# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Measurements with time signal only."""
from .single import SingleM
import os
import logging
import numpy as np
import pandas as pd
try:
from spectrumanalyzer import SpectrumAnalyzer
except ImportError:
pass
[docs]class RAW(SingleM):
def __init__(self, data, **kwargs):
"""
RAW Measurement:
Measurements with time signal only.
.. code-block:: python
:linenos:
:caption: Example
data = np.array(timesignal, shape=(n,))
data = {
'data': np.array(timesignal, shape=(n,))
'info': {'Nr': 123, 'Add Info': 'Important Informations'}
}
raw = ana.RAW(data)
:param data: Different possibilities.
:param rate: At which rate was the timesignal measured (samplingrate) [Hz]
:type rate: float, default: 2e-4
:info: First Spectrum parameters
:param nof_first_spectra: Cut timesignal into # pieces and average.
:type nof_first_spectra: int, default: 64
:param first_spectra_highest_frequency: Cut higher frequencies in final spectrum than this.
:type first_spectra_highest_frequency: float, default: rate/8
:param downsample: Downsample the timesignal before calculation.
:type downsamle: bool, default: True
:param calc_first: Triggers calculation of first spectrum.
:type calc_first: bool, default: False
:info: Second Spectrum parameters
:param highest_octave: The highest octave starting point [Hz] of the second spectrum.
:type highest_octave: int, default: 64
:param minimum_points_in_octave: If octave would contain less points, stop.
:type minimum_points_in_octave: int, default: 10
:param calc_second: Triggers calculation of second spectrum.
:type calc_second: bool, default: False
"""
super().__init__()
self.logger = logging.getLogger('RAW')
self.name = 'RAW'
# Default Parameter for the first and second spectrum
timesignal = data.get('data',
kwargs.get('timesignal',
pd.Series([], dtype='float64')
)
)
self.info = data.get('info', {})
self.data = data
self.rate = kwargs.get('rate', 2e-4)
# Define dummies
self.avg = 0
self.spectrum = None
self.spec = []
self.avg_spec = pd.DataFrame()
self.freq2 = np.array([])
self.time_signal_second_spectrum_transposed_normalized = np.array([])
self.second_spectrum_time_array = np.array([])
self.timesignal = self.check_timesignal(timesignal)
self.n = int(1024 * self.avg)
self.highest_octave = kwargs.get('highest_octave', 64)
self.minimum_points_in_octave = kwargs.get('minimum_points_in_octave',
10)
self.shifting_method = False
self.short_mode = 1
self.filter_type = 'lowpass'
self.passband_factor = 1
self.filter_order = 6
self.filter_analog = False
self.downsample = kwargs.get('downsample', True)
self.f_max = kwargs.get(
'first_spectra_highest_frequency', self.rate / 8)
self.num = kwargs.get('nof_first_spectra', 64)
self.info.update({
"Time": self.n / self.rate,
"Num": self.n,
"Rate": self.rate,
"Avg": self.avg,
"downsample": self.downsample,
"f_max": self.f_max,
"NofSpectra": self.num,
})
if kwargs.get('calc_first'):
self.calc_first_spectrum()
if self.avg_spec.empty:
return
max_freq = self.avg_spec['freq'].max()
self.info.update({
# "f_min": freq[0],
# 'f_max': freq.max(),
"f_min": self.avg_spec['freq'].min(),
'f_max': max_freq,
})
if kwargs.get('calc_second'):
self.calc_second_spectrum(highest_octave=max_freq)
[docs] def check_timesignal(self, timesignal):
""" converts pd.DataFrame['Vx'] and Series into numpy array.
:param timesignal: input timesignal
:return: converted timesignal.
:rtype: numpy.ndarray
"""
if isinstance(timesignal, pd.DataFrame):
ts = np.array(timesignal['Vx'])
elif isinstance(timesignal, pd.Series):
ts = timesignal.to_numpy()
else:
ts = timesignal
self.avg = (len(ts) / 1024.)
if self.avg != int(self.avg):
self.avg = int(len(ts) // 1024)
ts = ts[:int(self.avg * 1024)]
if self.rate == 0:
self.rate = 1
return ts
[docs] def calc_first_spectrum(self):
"""
Sets the variable ``spectrum`` to a
:class:`spectrumanalyzer.SpectrumAnalyzer` object.
.. important::
Calculates the first spectrum.
:raises NameError: spectrumanalyzer is not installed
:return: None
"""
try:
self.spectrum = SpectrumAnalyzer(
# filepath=filename,
timesignal=self.timesignal,
samplingrate=self.rate,
averages=self.avg,
shifting_method=self.shifting_method,
short_mode=self.short_mode,
filter_type=self.filter_type,
passband_factor=self.passband_factor,
filter_order=self.filter_order,
filter_analog=self.filter_analog,
first_spectra_highest_frequency=self.f_max,
downsample=self.downsample,
)
except NameError:
raise NameError("SpectrumAnalyzer module not installed.")
for spectrum_arr, freq, k in self.spectrum.cut_timesignal(self.num):
self.spec.append(spectrum_arr)
self.avg_spec = pd.DataFrame({
'freq': self.spectrum.frequency_span_array,
'S': self.spectrum.first_spectrum})
[docs] def calc_second_spectrum(self, **kwargs):
"""
Calculating the second spectrum.
"""
self.spectrum.calculate_second_spectrum(
highest_octave=kwargs.get('highest_octave', self.highest_octave),
minimum_points_in_octave=kwargs.get('minimum_points_in_octave',
self.minimum_points_in_octave),
peak_filter=kwargs.get('peak_filter', False)
)
self.freq2 = self.spectrum.frequency_span_array[
self.spectrum.frequency_span_array > 0
]
octaves = [self.highest_octave]
for i in range(30):
freq = np.fft.fftfreq(
int(self.n / self.spectrum.number_of_first_spectra),
1. / self.rate)[3:]
freq = freq[freq > 0]
freq = freq[freq < self.f_max]
points_in_octave = len(
freq[freq < self.highest_octave / (2. ** (i + 1))])
number_of_octaves = i
if points_in_octave < self.minimum_points_in_octave:
break
else:
octaves = np.append([min(octaves) / 2.], octaves)
self.info.update({
'NofOct': number_of_octaves,
})
# Calculate Timesignal if needed
if not kwargs.get('skip_timesignal', False):
self.calc_time_signal_second_spectrum()
[docs] def calc_time_signal_second_spectrum(self):
time_signal_second_spectrum_transposed = \
self.spectrum.time_signal_second_spectrum.transpose()
second_spectrum_time_array = \
np.arange(0, len(time_signal_second_spectrum_transposed[0])) * \
self.spectrum.timestep_second_spectrum
time_signal_second_spectrum_transposed_normalized = \
np.zeros((len(time_signal_second_spectrum_transposed),
len(time_signal_second_spectrum_transposed[0])))
time_signal_second_spectrum_transposed_normalized = \
np.fliplr(time_signal_second_spectrum_transposed_normalized)
for p in range(self.spectrum.number_of_octaves):
time_signal_second_spectrum_transposed_normalized[p] = \
time_signal_second_spectrum_transposed[p] / np.mean(
time_signal_second_spectrum_transposed[p])
self.time_signal_second_spectrum_transposed_normalized = \
time_signal_second_spectrum_transposed_normalized
self.second_spectrum_time_array = second_spectrum_time_array
[docs] def plot_spectrum(self, ax, ):
self.avg_spec.plot(x='freq', y='S', loglog=True, ax=ax)
[docs] def plot_time(self, ax, ):
ax.plot(np.arange(len(self.timesignal)), self.timesignal,
color='red', label='time', linewidth=.2)
[docs] def subtract_mean(self, inplace=False):
"""Subtracts mean value from timesignal.
:param inplace: Change variable inside this object?
:type inplace: bool
:return: normalized timesignal
"""
norm_ts = self.timesignal - np.mean(self.timesignal, axis=1)
if inplace:
self.timesignal = norm_ts
return norm_ts
[docs] def write(self, file):
"""
Writing data to a file.
Parameters
----------
file: str
"""
if self.avg_spec.empty:
return
if not os.path.exists(file):
os.makedirs(file)
all_first_spectra = self.spectrum.first_spectra_df.loc[:]
all_first_spectra['Avg'] = self.avg_spec['S']
all_first_spectra.to_csv("%s_first.dat" % file, index=False)
# all_first_spectra.to_feather("%s_first.feather" % file)
if not self.spectrum.second_spectra.any():
return
all_second_spectra = pd.DataFrame(self.spectrum.second_spectra,
index=False)
all_second_spectra['Frequency'] = \
self.spectrum.frequency_span_array_second_spectra
all_second_spectra.to_csv('%s_second.dat' % file,
index=False)
# all_second_spectra.to_feather('%s_second.feather' % file)
all_second_spectra_time = pd.DataFrame(
self.spectrum.time_signal_second_spectrum)
all_second_spectra_time.to_csv('%s_second_time.dat' % file,
index=False)
# all_second_spectra_time.to_feather('%s_second_time.feather' % file)
[docs] def read(self, file):
"""
Reading data from file.
Parameters
----------
file: str
Filename base to read data from.
Returns
-------
self
"""
try:
first_spectra = pd.read_csv("%s_first.dat" % file)
except FileNotFoundError:
self.logger.error('First Spectrum at %s_first.dat not found!',
file)
return False
try:
self.spectrum = SpectrumAnalyzer(
timesignal=self.timesignal,
samplingrate=self.rate,
averages=self.avg,
shifting_method=self.shifting_method,
short_mode=self.short_mode,
filter_type=self.filter_type,
passband_factor=self.passband_factor,
filter_order=self.filter_order,
filter_analog=self.filter_analog,
first_spectra_highest_frequency=self.f_max,
downsample=self.downsample,
)
except Exception as e:
self.logger.error("Can not create SpectrumAnalyzer Object: %s", e)
return False
try:
self.avg_spec = first_spectra[['Frequency', 'Avg']].loc[:]
self.avg_spec.rename({'Avg': 'S',
'Frequency': 'freq'},
axis='columns', inplace=True)
self.num = first_spectra.shape[1] - 2
self.spectrum.number_of_first_spectra = self.num
self.spectrum.frequency_span_array = first_spectra['Frequency']
self.spectrum.finalize_first_spectrum(
first_spectra.drop('Avg', axis=1))
except Exception as e:
self.logger.error("Can not load variables: %s", e)
return False
try:
second_spectrum = pd.read_csv("%s_second.dat" % file)
second_spectrum_time = pd.read_csv("%s_second_time.dat" % file)
except FileNotFoundError:
self.logger.info('Second Spectrum (%s_second.dat) not found!',
file)
return True
try:
self.spectrum.frequency_span_array_second_spectra = \
second_spectrum['Frequency']
self.spectrum.second_spectra = second_spectrum.drop('Frequency',
axis=1)
self.spectrum.time_signal_second_spectrum = second_spectrum_time
except Exception as e:
self.logger.error("Can not load second spectrum variables: %s", e)
return False
self.logger.warning("Second Spectrum loaded but not all variables"
"are implemented yet. Only access to \n"
"- second_spectra\n"
"- time_signal_second_spectrum\n"
"- frequency_span_array_second_spectra")
return True