Coverage for ana/raw.py : 44%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-FileCopyrightText: 2020 Jonathan Pieper <jpieper@stud.uni-frankfurt.de>
2#
3# SPDX-License-Identifier: GPL-3.0-or-later
5"""Measurements with time signal only."""
7from .single import SingleM
9import os
10import logging
11import numpy as np
12import pandas as pd
14try:
15 from spectrumanalyzer import SpectrumAnalyzer
16except ImportError:
17 pass
20class RAW(SingleM):
21 def __init__(self, data, **kwargs):
22 """
23 RAW Measurement:
24 Measurements with time signal only.
26 .. code-block:: python
27 :linenos:
28 :caption: Example
30 data = np.array(timesignal, shape=(n,))
31 data = {
32 'data': np.array(timesignal, shape=(n,))
33 'info': {'Nr': 123, 'Add Info': 'Important Informations'}
34 }
35 raw = ana.RAW(data)
37 :param data: Different possibilities.
39 :param rate: At which rate was the timesignal measured (samplingrate) [Hz]
40 :type rate: float, default: 2e-4
42 :info: First Spectrum parameters
43 :param nof_first_spectra: Cut timesignal into # pieces and average.
44 :type nof_first_spectra: int, default: 64
46 :param first_spectra_highest_frequency: Cut higher frequencies in final spectrum than this.
47 :type first_spectra_highest_frequency: float, default: rate/8
49 :param downsample: Downsample the timesignal before calculation.
50 :type downsamle: bool, default: True
52 :param calc_first: Triggers calculation of first spectrum.
53 :type calc_first: bool, default: False
55 :info: Second Spectrum parameters
56 :param highest_octave: The highest octave starting point [Hz] of the second spectrum.
57 :type highest_octave: int, default: 64
59 :param minimum_points_in_octave: If octave would contain less points, stop.
60 :type minimum_points_in_octave: int, default: 10
62 :param calc_second: Triggers calculation of second spectrum.
63 :type calc_second: bool, default: False
64 """
65 super().__init__()
66 self.logger = logging.getLogger('RAW')
67 self.name = 'RAW'
68 # Default Parameter for the first and second spectrum
69 timesignal = data.get('data',
70 kwargs.get('timesignal',
71 pd.Series([], dtype='float64')
72 )
73 )
74 self.info = data.get('info', {})
75 self.data = data
76 self.rate = kwargs.get('rate', 2e-4)
78 # Define dummies
79 self.avg = 0
80 self.spectrum = None
81 self.spec = []
82 self.avg_spec = pd.DataFrame()
83 self.freq2 = np.array([])
84 self.time_signal_second_spectrum_transposed_normalized = np.array([])
85 self.second_spectrum_time_array = np.array([])
87 self.timesignal = self.check_timesignal(timesignal)
88 self.n = int(1024 * self.avg)
89 self.highest_octave = kwargs.get('highest_octave', 64)
90 self.minimum_points_in_octave = kwargs.get('minimum_points_in_octave',
91 10)
92 self.shifting_method = False
93 self.short_mode = 1
94 self.filter_type = 'lowpass'
95 self.passband_factor = 1
96 self.filter_order = 6
97 self.filter_analog = False
98 self.downsample = kwargs.get('downsample', True)
99 self.f_max = kwargs.get(
100 'first_spectra_highest_frequency', self.rate / 8)
101 self.num = kwargs.get('nof_first_spectra', 64)
103 self.info.update({
104 "Time": self.n / self.rate,
105 "Num": self.n,
106 "Rate": self.rate,
107 "Avg": self.avg,
108 "downsample": self.downsample,
109 "f_max": self.f_max,
110 "NofSpectra": self.num,
111 })
113 if kwargs.get('calc_first'):
114 self.calc_first_spectrum()
116 if self.avg_spec.empty:
117 return
119 max_freq = self.avg_spec['freq'].max()
120 self.info.update({
121 # "f_min": freq[0],
122 # 'f_max': freq.max(),
123 "f_min": self.avg_spec['freq'].min(),
124 'f_max': max_freq,
125 })
127 if kwargs.get('calc_second'):
128 self.calc_second_spectrum(highest_octave=max_freq)
130 def check_timesignal(self, timesignal):
131 """ converts pd.DataFrame['Vx'] and Series into numpy array.
133 :param timesignal: input timesignal
134 :return: converted timesignal.
135 :rtype: numpy.ndarray
136 """
137 if isinstance(timesignal, pd.DataFrame):
138 ts = np.array(timesignal['Vx'])
139 elif isinstance(timesignal, pd.Series):
140 ts = timesignal.to_numpy()
141 else:
142 ts = timesignal
144 self.avg = (len(ts) / 1024.)
145 if self.avg != int(self.avg):
146 self.avg = int(len(ts) // 1024)
147 ts = ts[:int(self.avg * 1024)]
149 if self.rate == 0:
150 self.rate = 1
152 return ts
154 def calc_first_spectrum(self):
155 """
156 Sets the variable ``spectrum`` to a
157 :class:`spectrumanalyzer.SpectrumAnalyzer` object.
159 .. important::
160 Calculates the first spectrum.
163 :raises NameError: spectrumanalyzer is not installed
164 :return: None
166 """
168 try:
169 self.spectrum = SpectrumAnalyzer(
170 # filepath=filename,
171 timesignal=self.timesignal,
172 samplingrate=self.rate,
173 averages=self.avg,
174 shifting_method=self.shifting_method,
175 short_mode=self.short_mode,
176 filter_type=self.filter_type,
177 passband_factor=self.passband_factor,
178 filter_order=self.filter_order,
179 filter_analog=self.filter_analog,
180 first_spectra_highest_frequency=self.f_max,
181 downsample=self.downsample,
182 )
183 except NameError:
184 raise NameError("SpectrumAnalyzer module not installed.")
186 for spectrum_arr, freq, k in self.spectrum.cut_timesignal(self.num):
187 self.spec.append(spectrum_arr)
189 self.avg_spec = pd.DataFrame({
190 'freq': self.spectrum.frequency_span_array,
191 'S': self.spectrum.first_spectrum})
193 def calc_second_spectrum(self, **kwargs):
194 """
195 Calculating the second spectrum.
196 """
198 self.spectrum.calculate_second_spectrum(
199 highest_octave=kwargs.get('highest_octave', self.highest_octave),
200 minimum_points_in_octave=kwargs.get('minimum_points_in_octave',
201 self.minimum_points_in_octave),
202 peak_filter=kwargs.get('peak_filter', False)
203 )
205 self.freq2 = self.spectrum.frequency_span_array[
206 self.spectrum.frequency_span_array > 0
207 ]
209 octaves = [self.highest_octave]
210 for i in range(30):
211 freq = np.fft.fftfreq(
212 int(self.n / self.spectrum.number_of_first_spectra),
213 1. / self.rate)[3:]
214 freq = freq[freq > 0]
215 freq = freq[freq < self.f_max]
216 points_in_octave = len(
217 freq[freq < self.highest_octave / (2. ** (i + 1))])
218 number_of_octaves = i
219 if points_in_octave < self.minimum_points_in_octave:
220 break
221 else:
222 octaves = np.append([min(octaves) / 2.], octaves)
224 self.info.update({
225 'NofOct': number_of_octaves,
226 })
228 # Calculate Timesignal if needed
229 if not kwargs.get('skip_timesignal', False):
230 self.calc_time_signal_second_spectrum()
232 def calc_time_signal_second_spectrum(self):
233 time_signal_second_spectrum_transposed = \
234 self.spectrum.time_signal_second_spectrum.transpose()
235 second_spectrum_time_array = \
236 np.arange(0, len(time_signal_second_spectrum_transposed[0])) * \
237 self.spectrum.timestep_second_spectrum
238 time_signal_second_spectrum_transposed_normalized = \
239 np.zeros((len(time_signal_second_spectrum_transposed),
240 len(time_signal_second_spectrum_transposed[0])))
241 time_signal_second_spectrum_transposed_normalized = \
242 np.fliplr(time_signal_second_spectrum_transposed_normalized)
244 for p in range(self.spectrum.number_of_octaves):
245 time_signal_second_spectrum_transposed_normalized[p] = \
246 time_signal_second_spectrum_transposed[p] / np.mean(
247 time_signal_second_spectrum_transposed[p])
249 self.time_signal_second_spectrum_transposed_normalized = \
250 time_signal_second_spectrum_transposed_normalized
251 self.second_spectrum_time_array = second_spectrum_time_array
254 def plot_spectrum(self, ax, ):
255 self.avg_spec.plot(x='freq', y='S', loglog=True, ax=ax)
257 def plot_time(self, ax, ):
258 ax.plot(np.arange(len(self.timesignal)), self.timesignal,
259 color='red', label='time', linewidth=.2)
261 def subtract_mean(self, inplace=False):
262 """Subtracts mean value from timesignal.
264 :param inplace: Change variable inside this object?
265 :type inplace: bool
267 :return: normalized timesignal
268 """
269 norm_ts = self.timesignal - np.mean(self.timesignal, axis=1)
271 if inplace:
272 self.timesignal = norm_ts
274 return norm_ts
276 def write(self, file):
277 """
278 Writing data to a file.
280 Parameters
281 ----------
282 file: str
283 """
284 if self.avg_spec.empty:
285 return
287 if not os.path.exists(file):
288 os.makedirs(file)
290 all_first_spectra = self.spectrum.first_spectra_df.loc[:]
291 all_first_spectra['Avg'] = self.avg_spec['S']
293 all_first_spectra.to_csv("%s_first.dat" % file, index=False)
294 # all_first_spectra.to_feather("%s_first.feather" % file)
296 if not self.spectrum.second_spectra.any():
297 return
299 all_second_spectra = pd.DataFrame(self.spectrum.second_spectra,
300 index=False)
301 all_second_spectra['Frequency'] = \
302 self.spectrum.frequency_span_array_second_spectra
303 all_second_spectra.to_csv('%s_second.dat' % file,
304 index=False)
305 # all_second_spectra.to_feather('%s_second.feather' % file)
307 all_second_spectra_time = pd.DataFrame(
308 self.spectrum.time_signal_second_spectrum)
309 all_second_spectra_time.to_csv('%s_second_time.dat' % file,
310 index=False)
311 # all_second_spectra_time.to_feather('%s_second_time.feather' % file)
313 def read(self, file):
314 """
315 Reading data from file.
317 Parameters
318 ----------
319 file: str
320 Filename base to read data from.
322 Returns
323 -------
324 self
325 """
327 try:
328 first_spectra = pd.read_csv("%s_first.dat" % file)
329 except FileNotFoundError:
330 self.logger.error('First Spectrum at %s_first.dat not found!',
331 file)
332 return False
334 try:
335 self.spectrum = SpectrumAnalyzer(
336 timesignal=self.timesignal,
337 samplingrate=self.rate,
338 averages=self.avg,
339 shifting_method=self.shifting_method,
340 short_mode=self.short_mode,
341 filter_type=self.filter_type,
342 passband_factor=self.passband_factor,
343 filter_order=self.filter_order,
344 filter_analog=self.filter_analog,
345 first_spectra_highest_frequency=self.f_max,
346 downsample=self.downsample,
347 )
348 except Exception as e:
349 self.logger.error("Can not create SpectrumAnalyzer Object: %s", e)
350 return False
352 try:
353 self.avg_spec = first_spectra[['Frequency', 'Avg']].loc[:]
354 self.avg_spec.rename({'Avg': 'S',
355 'Frequency': 'freq'},
356 axis='columns', inplace=True)
358 self.num = first_spectra.shape[1] - 2
359 self.spectrum.number_of_first_spectra = self.num
361 self.spectrum.frequency_span_array = first_spectra['Frequency']
362 self.spectrum.finalize_first_spectrum(
363 first_spectra.drop('Avg', axis=1))
364 except Exception as e:
365 self.logger.error("Can not load variables: %s", e)
366 return False
368 try:
369 second_spectrum = pd.read_csv("%s_second.dat" % file)
370 second_spectrum_time = pd.read_csv("%s_second_time.dat" % file)
371 except FileNotFoundError:
372 self.logger.info('Second Spectrum (%s_second.dat) not found!',
373 file)
374 return True
376 try:
377 self.spectrum.frequency_span_array_second_spectra = \
378 second_spectrum['Frequency']
379 self.spectrum.second_spectra = second_spectrum.drop('Frequency',
380 axis=1)
381 self.spectrum.time_signal_second_spectrum = second_spectrum_time
382 except Exception as e:
383 self.logger.error("Can not load second spectrum variables: %s", e)
384 return False
386 self.logger.warning("Second Spectrum loaded but not all variables"
387 "are implemented yet. Only access to \n"
388 "- second_spectra\n"
389 "- time_signal_second_spectrum\n"
390 "- frequency_span_array_second_spectra")
391 return True