Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-FileCopyrightText: 2020 Jonathan Pieper <jpieper@stud.uni-frankfurt.de> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4 

5""" 

6Handling multiple measurements. 

7""" 

8 

9from typing import Any, Callable 

10 

11from .raw import RAW 

12from .hloop import Hloop 

13from .signalanalyzer import SA 

14from .multiple import MultipleM 

15 

16import logging 

17import re 

18import os 

19from glob import glob 

20 

21import pandas as pd 

22import numpy as np 

23 

24import matplotlib.pyplot as plt 

25 

26 

class HandleM(MultipleM):
    """Measurement handler: the interface to all other measurement objects.

    The handler keeps every loaded measurement in ``self.data`` (keyed by
    measurement number), the per-measurement metadata in ``self.info_dict``
    and a tabular view of that metadata in ``self.info``.

    Attributes are plain dictionaries / a DataFrame:

    * ``data``       -- ``{nr: measurement object}``
    * ``info_dict``  -- ``{nr: info dict}``
    * ``info``       -- ``pd.DataFrame(info_dict).T``
    * ``sa_dict``    -- the subset of ``data`` created as ``SA``
    * ``hloop_dict`` -- the subset of ``data`` created as ``Hloop`` or ``RAW``
    """

    # Filename patterns understood by :meth:`load_folder`.  Compiled once
    # instead of being re-specified for every file.
    # Groups: (folder, nr, technique, structure, additional info)
    _STRUCT_FILE_RE = re.compile(
        '(.*)[mM]([0-9.]+)(.*)([Pp]lusses|[Cc]rosses|[Ee]mpty)(.*).dat')
    # Groups: (folder, nr, additional info)
    _PLAIN_FILE_RE = re.compile('(.*)[mM]([0-9.]+)(.*).dat')

    def __init__(self, *args, **kwargs):
        """Create the handler and optionally load measurements.

        Args:
            *args: Unused; accepted for backward compatibility.
            **kwargs: Optional sources of measurements.

                * ``data``: mapping ``{nr: {'data': DataFrame, 'info': dict}}``
                  (or an iterable of ``(nr, entry)`` pairs).  Each entry is
                  passed through :meth:`_process_data` and then
                  :meth:`parse_data`.
                * ``directory``: directory handed to :meth:`load_files`
                  (only used when no ``data`` is given).

                All remaining keyword arguments are forwarded to
                :meth:`parse_data` / :meth:`load_files`.
        """
        super().__init__()
        self.logger = logging.getLogger('Handle')
        self.data = {}
        self.info = pd.DataFrame()
        self.info_dict = {}
        self.sa_dict = {}
        self.hloop_dict = {}

        # 'data' must be removed from kwargs: parse_data() takes it as its
        # first positional parameter, so leaving it in would raise
        # "got multiple values for argument 'data'".
        raw_data = kwargs.pop('data', None)
        if raw_data:
            entries = (raw_data.items() if hasattr(raw_data, 'items')
                       else raw_data)
            # Collect into a fresh dict; the input must not be modified
            # while it is being iterated.
            processed = {}
            for nr, entry in entries:
                processed.update(
                    self._process_data(nr, entry['data'], entry['info']))
            self.parse_data(processed, **kwargs)
        elif kwargs.get('directory'):
            self.load_files(kwargs.pop('directory'), **kwargs)

    def __getitem__(self, key):
        """Return a measurement by number.

        Args:
            key: Either a string such as ``'m12'``, ``'M12.1'`` or ``'12'``
                (looked up in ``self.data``) or a number, in which case the
                lookup is delegated to ``self.query('Nr == <key>')``
                (``query`` is not defined in this class; presumably it is
                inherited from ``MultipleM``).

        Raises:
            AttributeError: A string key does not refer to a loaded
                measurement.
            ValueError: A string key is not numeric after stripping the
                optional leading ``m``/``M``.
        """
        if isinstance(key, str):
            # Strip the optional measurement prefix ("m12" -> "12").
            number = key[1:] if key[:1] in ('m', 'M') else key
            measurement = self.data.get(float(number))
            if measurement is None:
                raise AttributeError("Measurement Nr. %s not found" % key)
            return measurement
        return self.query('Nr == %f' % key)

    def __setitem__(self, key, val):
        """Reject item assignment; only a warning is logged.

        Args:
            key: Ignored.
            val: Ignored.
        """
        self.logger.warning("Setting Items is not allowed.")

    def __contains__(self, key) -> bool:
        """Return True if ``key`` is a key of ``self.data``.

        Args:
            key: Measurement number as stored in ``self.data``.
        """
        return key in self.data

    def load_files(self, directory, **kwargs):
        """Find ``.dat`` files in a directory and load them.

        Args:
            directory (str): directory to search in
            **kwargs: forwarded to :meth:`load_folder`

        Returns:
            None
        """
        # NOTE(review): '**' is only expanded recursively by glob when it
        # is a complete path component, so '**.dat' behaves like '*.dat'
        # and only ``directory`` itself is searched.  Kept as is because
        # switching to '**/*.dat' would change which files are loaded.
        file_list = glob(os.sep.join([directory, '**.dat']), recursive=True)
        if file_list:
            self.load_folder(file_list, **kwargs)
        else:
            # Report where we looked; the (empty) result list says nothing.
            self.logger.error('No Files found: %s', directory)

    def get_custom_header(self, kind='RAW',
                          instruments=None):
        """Help Function to get column names for EVE Measurements.

        Args:
            kind (str, optional): Kind of measurement, default: 'RAW'.
                ``'RAW'`` and ``'SA'`` return fixed headers; any other
                value builds the header from ``instruments``.
            instruments (list, optional): Used instruments in order of
                appearance, default: ['t', 'LS', 'IPS', 'SR830'].
                Unknown instrument names contribute no columns.

        Returns:
            tuple: (list header, int skiprows)
        """
        if kind == 'RAW':
            return ['Time', "Vx", "Vy"], 0
        if kind == 'SA':
            return ['f', 'Vx', 'Vy'], 6

        if not instruments:
            instruments = ['t', 'LS', 'IPS', 'SR830']

        columns_by_instrument = {
            't': ["Time"],
            'LS': ["Temp%s" % i for i in range(6)],
            'IPS': ["B"],
            'SR830': ["Vx", "Vy", "Vr", "Vth"],
        }
        def_header = []
        for instrument in instruments:
            def_header += columns_by_instrument.get(instrument, [])
        return def_header, 3

    def _parse_filename(self, filename):
        """Split a measurement filename into its descriptive parts.

        Args:
            filename (str): file name without directory.

        Returns:
            tuple or None: ``(folder, nr, technique, structure, add_info)``
            as strings, with ``'NA'`` for technique and structure when the
            name carries no structure keyword; ``None`` if the name does
            not match either pattern.
        """
        match = self._STRUCT_FILE_RE.match(filename)
        if match:
            return match.groups()
        match = self._PLAIN_FILE_RE.match(filename)
        if match:
            folder, nr, add_info = match.groups()
            return folder, nr, 'NA', 'NA', add_info
        return None

    def load_folder(self, file_list, **kwargs):
        """Read a list of measurement files and register them.

        For every file the measurement number is taken from the file name.
        Integer numbers are read with the ``'SA'`` header and get info type
        ``'SA'``; fractional numbers get info type ``'RAW'`` and the part
        after the decimal point is stored as ``info['subnr']``.

        Args:
            file_list (list): paths of the files to read.
            **kwargs: ``sep``, ``names`` and ``skiprows`` override the
                arguments given to ``pd.read_csv``; ``drop_unused_columns``
                (default True) drops the temperature and ``Vr``/``Vth``
                columns when the instrument header was used.

        Returns:
            None
        """
        if not file_list:
            self.logger.error('No Files found: %s', file_list)
            return

        self.logger.warning('Start loading folder: %s', file_list[0])
        d_dict = dict()

        for f in file_list:
            # basename also works for paths without a directory part.
            filename = os.path.basename(f)
            parsed = self._parse_filename(filename)
            if parsed is None:
                self.logger.warning("Regex doesn't match: %s", f)
                continue
            folder, nr_text, m_type, struct, add_info = parsed

            try:
                nr = float(nr_text)
            except ValueError:
                self.logger.error(
                    'Not a valid Measurement Nr. (%s)\n'
                    'Debug kind/struct/f (%s, %s, %s)',
                    nr_text, m_type, struct, f)
                continue

            is_sub_measurement = not nr.is_integer()
            if not is_sub_measurement:
                nr = int(nr)
                def_header, def_skiprows = self.get_custom_header('SA')
            elif 'SR830' in add_info:
                def_header, def_skiprows = self.get_custom_header('RAW')
            else:
                # NOTE(review): get_custom_header() defaults to kind='RAW',
                # so this call returns the RAW header and ``instruments``
                # is ignored.  Kept unchanged because passing another kind
                # would alter how existing files are parsed -- confirm the
                # intended behaviour.
                def_header, def_skiprows = self.get_custom_header(
                    instruments=['t', 'LS', 'IPS', 'SR830'])

            data = pd.read_csv(f, sep=kwargs.get('sep', '\t'),
                               names=kwargs.get('names', def_header),
                               skiprows=kwargs.get('skiprows', def_skiprows))

            info = {'type': 'RAW' if is_sub_measurement else 'SA',
                    'Nr': nr,
                    'filename': filename,
                    'folder': folder,
                    'technique': m_type.strip('_'),
                    'Structure': struct,
                    'Additional': add_info.strip('_')}
            if is_sub_measurement:
                # Digits after the decimal point, e.g. 12.5 -> '5'.
                info['subnr'] = str(nr).partition('.')[2]

            d_dict.update(self._process_data(nr, data, info))
            if kwargs.get('drop_unused_columns', True):
                if 'Temp0' in def_header:
                    d_dict[nr]['data'] = d_dict[nr]['data'].drop(
                        ["Temp%s" % i for i in range(6)] + ["Vr", "Vth"],
                        axis=1,
                    )
        self.parse_data(d_dict)

    def parse_data(self, data, **kwargs):
        """Create measurement objects from data and update ``self.info``.

        .. code-block:: python

            h.parse_data({nr: {'data': pd.DataFrame,
                               'info': dict(Nr=nr, type='SA', **kw)}})

        Entries without a (non-empty) ``'info'`` are skipped.  The
        ``info['type']`` selects the class: ``'SA'``, ``'Hloop'`` or
        ``'RAW'``; other types are logged as errors and not stored in
        ``self.data`` (their info is still recorded).

        Args:
            data (dict): Data to parse, ``{nr: {'data': ..., 'info': ...}}``
            **kwargs: forwarded to ``Hloop`` / ``RAW``; ``Structure``,
                ``Angle`` and ``Additional`` are overridden per measurement
                by the values found in its info.

        Returns:
            None
        """
        self.logger.debug('Start parsing data: %s', data.items())
        for nr, entry in data.items():
            info = entry.get('info')
            if not info:
                continue
            m_type = info.get('type', 'Unknown')
            self.info_dict[nr] = info

            # Work on a per-measurement copy so that values of one
            # measurement do not leak into the following ones.
            m_kwargs = dict(kwargs)
            for field in ('Structure', 'Angle', 'Additional'):
                if info.get(field):
                    m_kwargs[field] = info[field]

            if m_type == 'SA':
                self.data[nr] = SA(entry)
                self.sa_dict[nr] = self.data[nr]
            elif m_type == 'Hloop':
                self.data[nr] = Hloop(nr, down_only=True,
                                      **m_kwargs)
                self.hloop_dict[nr] = self.data[nr]
            elif m_type == 'RAW':
                self.data[nr] = RAW(entry, **m_kwargs)
                # NOTE(review): RAW measurements are registered in
                # hloop_dict, exactly as before -- confirm this is intended.
                self.hloop_dict[nr] = self.data[nr]
            else:
                self.logger.error(
                    'Measurement %s (type: %s) not processed (unknown type).',
                    nr, m_type)

        self.info = pd.DataFrame(self.info_dict).T
        self.logger.debug('Parsing Measurement Finished\n'
                          'Debug size data/info (%s, %s)',
                          len(self.data), self.info.shape)

    def _process_data(self, nr, data, info):
        """Make columns numeric and add per-column statistics to ``info``.

        Both ``data`` and ``info`` are modified in place: object-typed
        columns are converted with ``pd.to_numeric(errors='coerce')`` and
        ``info`` gains ``<column>_count/_mean/_var/_min/_max`` entries.

        Args:
            nr: measurement number used as key of the result.
            data: the measurement data (column access via ``data[key]``,
                e.g. a DataFrame).
            info (dict): metadata of the measurement.

        Returns:
            dict: ``{nr: {'data': data, 'info': info}}``
        """
        # Lazy formatting: the (potentially large) table is only rendered
        # when the INFO level is actually enabled.
        self.logger.info('%s\n%s', data, info)

        for key in data.keys():
            if data[key].dtype == object:
                try:
                    data[key] = pd.to_numeric(data[key], errors='coerce')
                except (TypeError, ValueError):
                    self.logger.error('Not Numeric column: %s/%s', nr, key)

            column = data[key]
            for stat in ('count', 'mean', 'var', 'min', 'max'):
                info['%s_%s' % (key, stat)] = getattr(column, stat)()

        return {nr: {'data': data, 'info': info}}

    def plot(self, *args, **kwargs):
        """Plotting a Measurement

        Args:
            lofm: list of measurements, a mapping
                ``{nr: (description, plot_options)}``; may be given as
                first positional argument or as keyword.
            figsize: tuple size of figure
            show_fit: bool
            show_fit_label: bool, append the fit result to the label
            fit_range: tuple/list
            norm_S: bool, plot ``Vx * f`` (stored as column ``'norm'``)
            plot_settings: dict
            grid_options: dict
            f_settings: 1/f line
            f2_settings: 1/f^2 line

        Returns:
            tuple: Figure, Axis (``None`` if no ``lofm`` was given)

        Raises:
            KeyError: Measurement does not exist.
        """
        # Do not touch args[0] unless it is needed and present.
        if 'lofm' in kwargs:
            lofm = kwargs['lofm']
        elif args:
            lofm = args[0]
        else:
            lofm = None
        if not lofm:
            self.logger.warning("No list of measurements (lofm) available.")
            return

        # Validate before creating the figure so that nothing is left
        # half-drawn when a measurement is missing.
        for nr in lofm:
            if nr not in self.data:
                raise KeyError(nr)

        fit_range = kwargs.get('fit_range')
        fig, ax = plt.subplots(figsize=kwargs.get('figsize', (16, 12)))
        measurement = None
        for nr, entry in lofm.items():
            measurement = self.data[nr]
            label = 'm%s: %s' % (nr, entry[0])
            # Copy: the caller's options must not be modified below.
            plot_options = dict(entry[1])

            if fit_range:
                intercept, slope = measurement.fit(fit_range)
                x = np.linspace(*fit_range)
                if kwargs.get('show_fit'):
                    ax.plot(x, (10 ** intercept * np.power(x, slope)),
                            label='Fit m%s' % nr)
                if kwargs.get('show_fit_label'):
                    label += ' $\\alpha = %.3f, S_R(f=0) = 10^{%.3f}$' % (
                        slope, intercept)

            if kwargs.get('norm_S'):
                measurement.df['norm'] = \
                    measurement.df['Vx'] * measurement.df['f']
                plot_options['plot_y'] = 'norm'

            measurement.plot(label=label, ax=ax,
                             dont_set_plot_settings=True,
                             **plot_options)

        # The axis-wide settings are applied once, through the helpers of
        # the last plotted measurement.
        plot_settings = dict(
            xlim=(1e-2, 1e0),
            ylim=(1e-7, 5e-2),
            title=r'($90^\circ$) Field sweeps',
            grid=dict(
                which='minor',
                color='#ffff99',
                linestyle='--',
                alpha=.5))
        plot_settings.update(kwargs.get('plot_settings') or {})
        measurement._set_plot_settings(**plot_settings)

        grid_options = dict(
            which='minor',
            color='#cccccc',
            linestyle='-.',
            alpha=.3)
        grid_options.update(kwargs.get('grid_options') or {})
        # The on/off switch of grid() was renamed from ``b`` to ``visible``;
        # passing it positionally works with old and new Matplotlib.
        visible = grid_options.pop('visible', True)
        visible = grid_options.pop('b', visible)
        ax.grid(visible, **grid_options)

        # Draw 1 over f lines
        f_settings = dict(xmin=1e-2, ymin=1e-6)
        f_settings.update(kwargs.get('f_settings') or {})
        f2_settings = dict(ymin=1e-3, alpha=2,
                           plot_style='b--', an_color='blue')
        f2_settings.update(kwargs.get('f2_settings') or {})

        measurement._draw_oneoverf(ax, **f_settings)
        measurement._draw_oneoverf(ax, **f2_settings)

        return fig, ax

    def __repr__(self) -> str:
        """Return one line per measurement (number, type, structure,
        technique); missing info fields are shown as ``NA``."""
        if not self.data:
            return 'HandleM\nEmpty'

        lines = ['HandleM\n']
        for nr, info in self.info_dict.items():
            lines.append(
                'Nr. %s:\t Type: %s\t Structure: %s\tTechnique: %s\n' % (
                    nr,
                    info.get('type', 'NA'),
                    info.get('Structure', 'NA'),
                    info.get('technique', 'NA')))
        return ''.join(lines)

    def get_info(self):
        """Returns information about all measurements
        and splits it into groups (SA/RAW/MFN/VH).

        Returns:
            tuple: ``(info, groups)`` where ``info`` is ``self.info`` and
            ``groups`` is a dict with the keys ``vh``, ``mfn``, ``sa`` and
            ``raw``.  ``sa``/``raw`` are selected by the ``type`` column,
            ``mfn``/``vh`` by the ``technique`` column.  A group is empty
            when its column does not exist (e.g. nothing loaded yet).
        """
        info = self.info

        def select(column, value):
            # Rows whose ``column`` equals ``value``; empty if no column.
            if column not in info:
                return info.iloc[0:0]
            return info[info[column] == value]

        groups = dict(vh=select('technique', 'VH'),
                      mfn=select('technique', 'MFN'),
                      sa=select('type', 'SA'),
                      raw=select('type', 'RAW'))

        return info, groups