Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# SPDX-FileCopyrightText: 2020 Jonathan Pieper <jpieper@stud.uni-frankfurt.de> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4 

5"""Measurements with time signal only.""" 

6 

7from .single import SingleM 

8 

9import os 

10import logging 

11import numpy as np 

12import pandas as pd 

13 

14try: 

15 from spectrumanalyzer import SpectrumAnalyzer 

16except ImportError: 

17 pass 

18 

19 

20class RAW(SingleM): 

21 def __init__(self, data, **kwargs): 

22 """ 

23 RAW Measurement: 

24 Measurements with time signal only. 

25 

26 .. code-block:: python 

27 :linenos: 

28 :caption: Example 

29 

30 data = np.array(timesignal, shape=(n,)) 

31 data = { 

32 'data': np.array(timesignal, shape=(n,)) 

33 'info': {'Nr': 123, 'Add Info': 'Important Informations'} 

34 } 

35 raw = ana.RAW(data) 

36 

37 :param data: Different possibilities. 

38 

39 :param rate: At which rate was the timesignal measured (samplingrate) [Hz] 

40 :type rate: float, default: 2e-4 

41 

42 :info: First Spectrum parameters 

43 :param nof_first_spectra: Cut timesignal into # pieces and average. 

44 :type nof_first_spectra: int, default: 64 

45 

46 :param first_spectra_highest_frequency: Cut higher frequencies in final spectrum than this. 

47 :type first_spectra_highest_frequency: float, default: rate/8 

48 

49 :param downsample: Downsample the timesignal before calculation. 

50 :type downsamle: bool, default: True 

51 

52 :param calc_first: Triggers calculation of first spectrum. 

53 :type calc_first: bool, default: False 

54 

55 :info: Second Spectrum parameters 

56 :param highest_octave: The highest octave starting point [Hz] of the second spectrum. 

57 :type highest_octave: int, default: 64 

58 

59 :param minimum_points_in_octave: If octave would contain less points, stop. 

60 :type minimum_points_in_octave: int, default: 10 

61 

62 :param calc_second: Triggers calculation of second spectrum. 

63 :type calc_second: bool, default: False 

64 """ 

65 super().__init__() 

66 self.logger = logging.getLogger('RAW') 

67 self.name = 'RAW' 

68 # Default Parameter for the first and second spectrum 

69 timesignal = data.get('data', 

70 kwargs.get('timesignal', 

71 pd.Series([], dtype='float64') 

72 ) 

73 ) 

74 self.info = data.get('info', {}) 

75 self.data = data 

76 self.rate = kwargs.get('rate', 2e-4) 

77 

78 # Define dummies 

79 self.avg = 0 

80 self.spectrum = None 

81 self.spec = [] 

82 self.avg_spec = pd.DataFrame() 

83 self.freq2 = np.array([]) 

84 self.time_signal_second_spectrum_transposed_normalized = np.array([]) 

85 self.second_spectrum_time_array = np.array([]) 

86 

87 self.timesignal = self.check_timesignal(timesignal) 

88 self.n = int(1024 * self.avg) 

89 self.highest_octave = kwargs.get('highest_octave', 64) 

90 self.minimum_points_in_octave = kwargs.get('minimum_points_in_octave', 

91 10) 

92 self.shifting_method = False 

93 self.short_mode = 1 

94 self.filter_type = 'lowpass' 

95 self.passband_factor = 1 

96 self.filter_order = 6 

97 self.filter_analog = False 

98 self.downsample = kwargs.get('downsample', True) 

99 self.f_max = kwargs.get( 

100 'first_spectra_highest_frequency', self.rate / 8) 

101 self.num = kwargs.get('nof_first_spectra', 64) 

102 

103 self.info.update({ 

104 "Time": self.n / self.rate, 

105 "Num": self.n, 

106 "Rate": self.rate, 

107 "Avg": self.avg, 

108 "downsample": self.downsample, 

109 "f_max": self.f_max, 

110 "NofSpectra": self.num, 

111 }) 

112 

113 if kwargs.get('calc_first'): 

114 self.calc_first_spectrum() 

115 

116 if self.avg_spec.empty: 

117 return 

118 

119 max_freq = self.avg_spec['freq'].max() 

120 self.info.update({ 

121 # "f_min": freq[0], 

122 # 'f_max': freq.max(), 

123 "f_min": self.avg_spec['freq'].min(), 

124 'f_max': max_freq, 

125 }) 

126 

127 if kwargs.get('calc_second'): 

128 self.calc_second_spectrum(highest_octave=max_freq) 

129 

130 def check_timesignal(self, timesignal): 

131 """ converts pd.DataFrame['Vx'] and Series into numpy array. 

132 

133 :param timesignal: input timesignal 

134 :return: converted timesignal. 

135 :rtype: numpy.ndarray 

136 """ 

137 if isinstance(timesignal, pd.DataFrame): 

138 ts = np.array(timesignal['Vx']) 

139 elif isinstance(timesignal, pd.Series): 

140 ts = timesignal.to_numpy() 

141 else: 

142 ts = timesignal 

143 

144 self.avg = (len(ts) / 1024.) 

145 if self.avg != int(self.avg): 

146 self.avg = int(len(ts) // 1024) 

147 ts = ts[:int(self.avg * 1024)] 

148 

149 if self.rate == 0: 

150 self.rate = 1 

151 

152 return ts 

153 

154 def calc_first_spectrum(self): 

155 """ 

156 Sets the variable ``spectrum`` to a 

157 :class:`spectrumanalyzer.SpectrumAnalyzer` object. 

158 

159 .. important:: 

160 Calculates the first spectrum. 

161 

162  

163 :raises NameError: spectrumanalyzer is not installed 

164 :return: None 

165 

166 """ 

167 

168 try: 

169 self.spectrum = SpectrumAnalyzer( 

170 # filepath=filename, 

171 timesignal=self.timesignal, 

172 samplingrate=self.rate, 

173 averages=self.avg, 

174 shifting_method=self.shifting_method, 

175 short_mode=self.short_mode, 

176 filter_type=self.filter_type, 

177 passband_factor=self.passband_factor, 

178 filter_order=self.filter_order, 

179 filter_analog=self.filter_analog, 

180 first_spectra_highest_frequency=self.f_max, 

181 downsample=self.downsample, 

182 ) 

183 except NameError: 

184 raise NameError("SpectrumAnalyzer module not installed.") 

185 

186 for spectrum_arr, freq, k in self.spectrum.cut_timesignal(self.num): 

187 self.spec.append(spectrum_arr) 

188 

189 self.avg_spec = pd.DataFrame({ 

190 'freq': self.spectrum.frequency_span_array, 

191 'S': self.spectrum.first_spectrum}) 

192 

193 def calc_second_spectrum(self, **kwargs): 

194 """ 

195 Calculating the second spectrum. 

196 """ 

197 

198 self.spectrum.calculate_second_spectrum( 

199 highest_octave=kwargs.get('highest_octave', self.highest_octave), 

200 minimum_points_in_octave=kwargs.get('minimum_points_in_octave', 

201 self.minimum_points_in_octave), 

202 peak_filter=kwargs.get('peak_filter', False) 

203 ) 

204 

205 self.freq2 = self.spectrum.frequency_span_array[ 

206 self.spectrum.frequency_span_array > 0 

207 ] 

208 

209 octaves = [self.highest_octave] 

210 for i in range(30): 

211 freq = np.fft.fftfreq( 

212 int(self.n / self.spectrum.number_of_first_spectra), 

213 1. / self.rate)[3:] 

214 freq = freq[freq > 0] 

215 freq = freq[freq < self.f_max] 

216 points_in_octave = len( 

217 freq[freq < self.highest_octave / (2. ** (i + 1))]) 

218 number_of_octaves = i 

219 if points_in_octave < self.minimum_points_in_octave: 

220 break 

221 else: 

222 octaves = np.append([min(octaves) / 2.], octaves) 

223 

224 self.info.update({ 

225 'NofOct': number_of_octaves, 

226 }) 

227 

228 # Calculate Timesignal if needed 

229 if not kwargs.get('skip_timesignal', False): 

230 self.calc_time_signal_second_spectrum() 

231 

232 def calc_time_signal_second_spectrum(self): 

233 time_signal_second_spectrum_transposed = \ 

234 self.spectrum.time_signal_second_spectrum.transpose() 

235 second_spectrum_time_array = \ 

236 np.arange(0, len(time_signal_second_spectrum_transposed[0])) * \ 

237 self.spectrum.timestep_second_spectrum 

238 time_signal_second_spectrum_transposed_normalized = \ 

239 np.zeros((len(time_signal_second_spectrum_transposed), 

240 len(time_signal_second_spectrum_transposed[0]))) 

241 time_signal_second_spectrum_transposed_normalized = \ 

242 np.fliplr(time_signal_second_spectrum_transposed_normalized) 

243 

244 for p in range(self.spectrum.number_of_octaves): 

245 time_signal_second_spectrum_transposed_normalized[p] = \ 

246 time_signal_second_spectrum_transposed[p] / np.mean( 

247 time_signal_second_spectrum_transposed[p]) 

248 

249 self.time_signal_second_spectrum_transposed_normalized = \ 

250 time_signal_second_spectrum_transposed_normalized 

251 self.second_spectrum_time_array = second_spectrum_time_array 

252 

253 

254 def plot_spectrum(self, ax, ): 

255 self.avg_spec.plot(x='freq', y='S', loglog=True, ax=ax) 

256 

257 def plot_time(self, ax, ): 

258 ax.plot(np.arange(len(self.timesignal)), self.timesignal, 

259 color='red', label='time', linewidth=.2) 

260 

261 def subtract_mean(self, inplace=False): 

262 """Subtracts mean value from timesignal. 

263 

264 :param inplace: Change variable inside this object? 

265 :type inplace: bool 

266 

267 :return: normalized timesignal 

268 """ 

269 norm_ts = self.timesignal - np.mean(self.timesignal, axis=1) 

270 

271 if inplace: 

272 self.timesignal = norm_ts 

273 

274 return norm_ts 

275 

276 def write(self, file): 

277 """ 

278 Writing data to a file. 

279 

280 Parameters 

281 ---------- 

282 file: str 

283 """ 

284 if self.avg_spec.empty: 

285 return 

286 

287 if not os.path.exists(file): 

288 os.makedirs(file) 

289 

290 all_first_spectra = self.spectrum.first_spectra_df.loc[:] 

291 all_first_spectra['Avg'] = self.avg_spec['S'] 

292 

293 all_first_spectra.to_csv("%s_first.dat" % file, index=False) 

294 # all_first_spectra.to_feather("%s_first.feather" % file) 

295 

296 if not self.spectrum.second_spectra.any(): 

297 return 

298 

299 all_second_spectra = pd.DataFrame(self.spectrum.second_spectra, 

300 index=False) 

301 all_second_spectra['Frequency'] = \ 

302 self.spectrum.frequency_span_array_second_spectra 

303 all_second_spectra.to_csv('%s_second.dat' % file, 

304 index=False) 

305 # all_second_spectra.to_feather('%s_second.feather' % file) 

306 

307 all_second_spectra_time = pd.DataFrame( 

308 self.spectrum.time_signal_second_spectrum) 

309 all_second_spectra_time.to_csv('%s_second_time.dat' % file, 

310 index=False) 

311 # all_second_spectra_time.to_feather('%s_second_time.feather' % file) 

312 

313 def read(self, file): 

314 """ 

315 Reading data from file. 

316 

317 Parameters 

318 ---------- 

319 file: str 

320 Filename base to read data from. 

321 

322 Returns 

323 ------- 

324 self 

325 """ 

326 

327 try: 

328 first_spectra = pd.read_csv("%s_first.dat" % file) 

329 except FileNotFoundError: 

330 self.logger.error('First Spectrum at %s_first.dat not found!', 

331 file) 

332 return False 

333 

334 try: 

335 self.spectrum = SpectrumAnalyzer( 

336 timesignal=self.timesignal, 

337 samplingrate=self.rate, 

338 averages=self.avg, 

339 shifting_method=self.shifting_method, 

340 short_mode=self.short_mode, 

341 filter_type=self.filter_type, 

342 passband_factor=self.passband_factor, 

343 filter_order=self.filter_order, 

344 filter_analog=self.filter_analog, 

345 first_spectra_highest_frequency=self.f_max, 

346 downsample=self.downsample, 

347 ) 

348 except Exception as e: 

349 self.logger.error("Can not create SpectrumAnalyzer Object: %s", e) 

350 return False 

351 

352 try: 

353 self.avg_spec = first_spectra[['Frequency', 'Avg']].loc[:] 

354 self.avg_spec.rename({'Avg': 'S', 

355 'Frequency': 'freq'}, 

356 axis='columns', inplace=True) 

357 

358 self.num = first_spectra.shape[1] - 2 

359 self.spectrum.number_of_first_spectra = self.num 

360 

361 self.spectrum.frequency_span_array = first_spectra['Frequency'] 

362 self.spectrum.finalize_first_spectrum( 

363 first_spectra.drop('Avg', axis=1)) 

364 except Exception as e: 

365 self.logger.error("Can not load variables: %s", e) 

366 return False 

367 

368 try: 

369 second_spectrum = pd.read_csv("%s_second.dat" % file) 

370 second_spectrum_time = pd.read_csv("%s_second_time.dat" % file) 

371 except FileNotFoundError: 

372 self.logger.info('Second Spectrum (%s_second.dat) not found!', 

373 file) 

374 return True 

375 

376 try: 

377 self.spectrum.frequency_span_array_second_spectra = \ 

378 second_spectrum['Frequency'] 

379 self.spectrum.second_spectra = second_spectrum.drop('Frequency', 

380 axis=1) 

381 self.spectrum.time_signal_second_spectrum = second_spectrum_time 

382 except Exception as e: 

383 self.logger.error("Can not load second spectrum variables: %s", e) 

384 return False 

385 

386 self.logger.warning("Second Spectrum loaded but not all variables" 

387 "are implemented yet. Only access to \n" 

388 "- second_spectra\n" 

389 "- time_signal_second_spectrum\n" 

390 "- frequency_span_array_second_spectra") 

391 return True