Source code for ana.raw

# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Measurements with time signal only."""

from .single import SingleM

import os
import logging
import numpy as np
import pandas as pd

try:
    from spectrumanalyzer import SpectrumAnalyzer
except ImportError:
    pass


[docs]class RAW(SingleM): def __init__(self, data, **kwargs): """ RAW Measurement: Measurements with time signal only. .. code-block:: python :linenos: :caption: Example data = np.array(timesignal, shape=(n,)) data = { 'data': np.array(timesignal, shape=(n,)) 'info': {'Nr': 123, 'Add Info': 'Important Informations'} } raw = ana.RAW(data) :param data: Different possibilities. :param rate: At which rate was the timesignal measured (samplingrate) [Hz] :type rate: float, default: 2e-4 :info: First Spectrum parameters :param nof_first_spectra: Cut timesignal into # pieces and average. :type nof_first_spectra: int, default: 64 :param first_spectra_highest_frequency: Cut higher frequencies in final spectrum than this. :type first_spectra_highest_frequency: float, default: rate/8 :param downsample: Downsample the timesignal before calculation. :type downsamle: bool, default: True :param calc_first: Triggers calculation of first spectrum. :type calc_first: bool, default: False :info: Second Spectrum parameters :param highest_octave: The highest octave starting point [Hz] of the second spectrum. :type highest_octave: int, default: 64 :param minimum_points_in_octave: If octave would contain less points, stop. :type minimum_points_in_octave: int, default: 10 :param calc_second: Triggers calculation of second spectrum. :type calc_second: bool, default: False """ super().__init__() self.logger = logging.getLogger('RAW') self.name = 'RAW' # Default Parameter for the first and second spectrum timesignal = data.get('data', kwargs.get('timesignal', pd.Series([], dtype='float64') ) ) self.info = data.get('info', {}) self.data = data self.rate = kwargs.get('rate', 2e-4) # Define dummies self.avg = 0 self.spectrum = None self.spec = [] self.avg_spec = pd.DataFrame() self.freq2 = np.array([]) self.time_signal_second_spectrum_transposed_normalized = np.array([]) self.second_spectrum_time_array = np.array([]) self.timesignal = self.check_timesignal(timesignal) self.n = int(1024 * self.avg) self.highest_octave = kwargs.get('highest_octave', 64) self.minimum_points_in_octave = kwargs.get('minimum_points_in_octave', 10) self.shifting_method = False self.short_mode = 1 self.filter_type = 'lowpass' self.passband_factor = 1 self.filter_order = 6 self.filter_analog = False self.downsample = kwargs.get('downsample', True) self.f_max = kwargs.get( 'first_spectra_highest_frequency', self.rate / 8) self.num = kwargs.get('nof_first_spectra', 64) self.info.update({ "Time": self.n / self.rate, "Num": self.n, "Rate": self.rate, "Avg": self.avg, "downsample": self.downsample, "f_max": self.f_max, "NofSpectra": self.num, }) if kwargs.get('calc_first'): self.calc_first_spectrum() if self.avg_spec.empty: return max_freq = self.avg_spec['freq'].max() self.info.update({ # "f_min": freq[0], # 'f_max': freq.max(), "f_min": self.avg_spec['freq'].min(), 'f_max': max_freq, }) if kwargs.get('calc_second'): self.calc_second_spectrum(highest_octave=max_freq)
[docs] def check_timesignal(self, timesignal): """ converts pd.DataFrame['Vx'] and Series into numpy array. :param timesignal: input timesignal :return: converted timesignal. :rtype: numpy.ndarray """ if isinstance(timesignal, pd.DataFrame): ts = np.array(timesignal['Vx']) elif isinstance(timesignal, pd.Series): ts = timesignal.to_numpy() else: ts = timesignal self.avg = (len(ts) / 1024.) if self.avg != int(self.avg): self.avg = int(len(ts) // 1024) ts = ts[:int(self.avg * 1024)] if self.rate == 0: self.rate = 1 return ts
[docs] def calc_first_spectrum(self): """ Sets the variable ``spectrum`` to a :class:`spectrumanalyzer.SpectrumAnalyzer` object. .. important:: Calculates the first spectrum. :raises NameError: spectrumanalyzer is not installed :return: None """ try: self.spectrum = SpectrumAnalyzer( # filepath=filename, timesignal=self.timesignal, samplingrate=self.rate, averages=self.avg, shifting_method=self.shifting_method, short_mode=self.short_mode, filter_type=self.filter_type, passband_factor=self.passband_factor, filter_order=self.filter_order, filter_analog=self.filter_analog, first_spectra_highest_frequency=self.f_max, downsample=self.downsample, ) except NameError: raise NameError("SpectrumAnalyzer module not installed.") for spectrum_arr, freq, k in self.spectrum.cut_timesignal(self.num): self.spec.append(spectrum_arr) self.avg_spec = pd.DataFrame({ 'freq': self.spectrum.frequency_span_array, 'S': self.spectrum.first_spectrum})
[docs] def calc_second_spectrum(self, **kwargs): """ Calculating the second spectrum. """ self.spectrum.calculate_second_spectrum( highest_octave=kwargs.get('highest_octave', self.highest_octave), minimum_points_in_octave=kwargs.get('minimum_points_in_octave', self.minimum_points_in_octave), peak_filter=kwargs.get('peak_filter', False) ) self.freq2 = self.spectrum.frequency_span_array[ self.spectrum.frequency_span_array > 0 ] octaves = [self.highest_octave] for i in range(30): freq = np.fft.fftfreq( int(self.n / self.spectrum.number_of_first_spectra), 1. / self.rate)[3:] freq = freq[freq > 0] freq = freq[freq < self.f_max] points_in_octave = len( freq[freq < self.highest_octave / (2. ** (i + 1))]) number_of_octaves = i if points_in_octave < self.minimum_points_in_octave: break else: octaves = np.append([min(octaves) / 2.], octaves) self.info.update({ 'NofOct': number_of_octaves, }) # Calculate Timesignal if needed if not kwargs.get('skip_timesignal', False): self.calc_time_signal_second_spectrum()
[docs] def calc_time_signal_second_spectrum(self): time_signal_second_spectrum_transposed = \ self.spectrum.time_signal_second_spectrum.transpose() second_spectrum_time_array = \ np.arange(0, len(time_signal_second_spectrum_transposed[0])) * \ self.spectrum.timestep_second_spectrum time_signal_second_spectrum_transposed_normalized = \ np.zeros((len(time_signal_second_spectrum_transposed), len(time_signal_second_spectrum_transposed[0]))) time_signal_second_spectrum_transposed_normalized = \ np.fliplr(time_signal_second_spectrum_transposed_normalized) for p in range(self.spectrum.number_of_octaves): time_signal_second_spectrum_transposed_normalized[p] = \ time_signal_second_spectrum_transposed[p] / np.mean( time_signal_second_spectrum_transposed[p]) self.time_signal_second_spectrum_transposed_normalized = \ time_signal_second_spectrum_transposed_normalized self.second_spectrum_time_array = second_spectrum_time_array
[docs] def plot_spectrum(self, ax, ): self.avg_spec.plot(x='freq', y='S', loglog=True, ax=ax)
[docs] def plot_time(self, ax, ): ax.plot(np.arange(len(self.timesignal)), self.timesignal, color='red', label='time', linewidth=.2)
[docs] def subtract_mean(self, inplace=False): """Subtracts mean value from timesignal. :param inplace: Change variable inside this object? :type inplace: bool :return: normalized timesignal """ norm_ts = self.timesignal - np.mean(self.timesignal, axis=1) if inplace: self.timesignal = norm_ts return norm_ts
[docs] def write(self, file): """ Writing data to a file. Parameters ---------- file: str """ if self.avg_spec.empty: return if not os.path.exists(file): os.makedirs(file) all_first_spectra = self.spectrum.first_spectra_df.loc[:] all_first_spectra['Avg'] = self.avg_spec['S'] all_first_spectra.to_csv("%s_first.dat" % file, index=False) # all_first_spectra.to_feather("%s_first.feather" % file) if not self.spectrum.second_spectra.any(): return all_second_spectra = pd.DataFrame(self.spectrum.second_spectra, index=False) all_second_spectra['Frequency'] = \ self.spectrum.frequency_span_array_second_spectra all_second_spectra.to_csv('%s_second.dat' % file, index=False) # all_second_spectra.to_feather('%s_second.feather' % file) all_second_spectra_time = pd.DataFrame( self.spectrum.time_signal_second_spectrum) all_second_spectra_time.to_csv('%s_second_time.dat' % file, index=False)
# all_second_spectra_time.to_feather('%s_second_time.feather' % file)
[docs] def read(self, file): """ Reading data from file. Parameters ---------- file: str Filename base to read data from. Returns ------- self """ try: first_spectra = pd.read_csv("%s_first.dat" % file) except FileNotFoundError: self.logger.error('First Spectrum at %s_first.dat not found!', file) return False try: self.spectrum = SpectrumAnalyzer( timesignal=self.timesignal, samplingrate=self.rate, averages=self.avg, shifting_method=self.shifting_method, short_mode=self.short_mode, filter_type=self.filter_type, passband_factor=self.passband_factor, filter_order=self.filter_order, filter_analog=self.filter_analog, first_spectra_highest_frequency=self.f_max, downsample=self.downsample, ) except Exception as e: self.logger.error("Can not create SpectrumAnalyzer Object: %s", e) return False try: self.avg_spec = first_spectra[['Frequency', 'Avg']].loc[:] self.avg_spec.rename({'Avg': 'S', 'Frequency': 'freq'}, axis='columns', inplace=True) self.num = first_spectra.shape[1] - 2 self.spectrum.number_of_first_spectra = self.num self.spectrum.frequency_span_array = first_spectra['Frequency'] self.spectrum.finalize_first_spectrum( first_spectra.drop('Avg', axis=1)) except Exception as e: self.logger.error("Can not load variables: %s", e) return False try: second_spectrum = pd.read_csv("%s_second.dat" % file) second_spectrum_time = pd.read_csv("%s_second_time.dat" % file) except FileNotFoundError: self.logger.info('Second Spectrum (%s_second.dat) not found!', file) return True try: self.spectrum.frequency_span_array_second_spectra = \ second_spectrum['Frequency'] self.spectrum.second_spectra = second_spectrum.drop('Frequency', axis=1) self.spectrum.time_signal_second_spectrum = second_spectrum_time except Exception as e: self.logger.error("Can not load second spectrum variables: %s", e) return False self.logger.warning("Second Spectrum loaded but not all variables" "are implemented yet. Only access to \n" "- second_spectra\n" "- time_signal_second_spectrum\n" "- frequency_span_array_second_spectra") return True