# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Handling multiple measurements.
"""
from typing import Any, Callable
from .raw import RAW
from .hloop import Hloop
from .signalanalyzer import SA
from .multiple import MultipleM
import logging
import re
import os
from glob import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
[docs]class HandleM(MultipleM):
def __init__(self, *args, **kwargs):
"""
Args:
args:
kwargs:
Note:
**Measurement Handler:** This class represents all other
measurements and is the interface to other measurement objets.
"""
super().__init__()
self.logger = logging.getLogger('Handle')
self.data = {}
self.info = pd.DataFrame()
self.info_dict = {}
self.sa_dict = {}
self.hloop_dict = {}
if kwargs.get('data'):
data = kwargs.get('data')
for nr, d in data:
data.update(
self._process_data(nr, d['data'], d['info']))
self.parse_data(data, **kwargs)
elif kwargs.get('directory'):
self.load_files(kwargs.pop('directory'), **kwargs)
def __getitem__(self, key):
"""
Args:
key:
"""
if isinstance(key, str):
if key[1] == 'm':
d = self.data.get(float(key[1:]))
if not d:
raise AttributeError("Measurement Nr. %s not found" % key)
else:
return d
else:
d = self.data.get(float(key))
if not d:
raise AttributeError("Measurement Nr. %s not found" % key)
else:
return d
return self.query('Nr == %f' % key)
def __setitem__(self, key, val):
"""
Args:
key:
val:
"""
self.logger.warning("Setting Items is not allowed.")
return
def __contains__(self, key):
"""
Args:
key:
"""
return (key in self.data)
[docs] def load_files(self, directory, **kwargs):
"""Loads all files from a specific directory and creates objects for
found measurements.
Args:
directory (str): directory to search in
**kwargs:
Returns:
None
"""
file_list = glob(os.sep.join([directory, '**/*.dat']), recursive=True)
if file_list:
self.load_folder(file_list, **kwargs)
else:
self.logger.error('No Files found: %s' % file_list)
[docs] def get_custom_header(self, kind='RAW',
instruments=None):
"""Help Function to get column names for EVE Measurements.
Args:
kind (str, optional): Kind of measurement, default: 'RAW'
instruments (list, optional): Used instruments in order of
appearance, default: ['t', 'LS', 'IPS', 'SR830']
Returns:
tuple: (list header, int skiprows)
"""
if not instruments:
instruments = ['t', 'LS', 'IPS', 'SR830']
if kind == 'RAW':
return ['Time', "Vx", "Vy"], 0
elif kind == 'SA':
return ['f', 'Vx', 'Vy'], 6
def_skiprows = 3
def_header = []
for k in instruments:
if k == 't':
def_header += ["Time", ]
if k == 'LS':
def_header += ["Temp%s" % i for i in range(6)]
if k == 'IPS':
def_header += ["B", ]
if k == 'SR830':
def_header += [
"Vx", "Vy", "Vr", "Vth",
# ["V%s%d" % (var, gpib) for gpib in [8, 9, 13]
# for var in ['x', 'y', 'r', 'th',]]
]
return def_header, def_skiprows
[docs] def load_folder(self, file_list, **kwargs):
"""
Args:
file_list:
**kwargs:
"""
self.logger.warning('Start loading folder: %s' % file_list[0])
d_dict = dict()
for f in file_list:
directory, filename = f.split(os.sep)[-2:]
regex = re.match(
'(.*)[mM]([0-9.]+)(.*)([Pp]lusses|[Cc]rosses|[Ee]mpty)(.*).dat',
filename)
if not regex:
regex = re.match('(.*)[mM]([0-9.]+)(.*).dat', filename)
if not regex:
self.logger.warning("Regex doesn't match: %s" % f)
continue
else:
folder, nr, add_info = regex.groups()
struct = None
m_type = 'NA'
struct = 'NA'
else:
folder, nr, m_type, struct, add_info = regex.groups()
try:
nr = float(nr)
except Exception:
self.logger.error('Not a valid Measurement Nr. (%s)\n' % nr + \
'Debug kind/struct/f (%s, %s, %s)' % (
m_type, struct, f))
continue
if nr == int(nr):
nr = int(nr)
def_header, def_skiprows = self.get_custom_header('SA')
elif 'SR830' in add_info:
def_header, def_skiprows = self.get_custom_header('RAW')
else:
def_header, def_skiprows = self.get_custom_header(
instruments=['t', 'LS', 'IPS', 'SR830'])
data = pd.read_csv(f, sep=kwargs.get('sep', '\t'),
# header=kwargs.get('header', def_header_index),
names=kwargs.get('names', def_header),
skiprows=kwargs.get('skiprows', def_skiprows))
if int(nr) == nr:
nr = int(nr)
subnr = False
else:
i, subnr = str(nr).split('.')
info = {'type': 'SA',
'Nr': nr,
'filename': f.split(os.sep)[-1],
'folder': folder,
'technique': m_type.strip('_'),
'Structure': struct,
'Additional': add_info.strip('_')}
if subnr:
info['type'] = 'RAW'
info['subnr'] = subnr
d_dict.update(self._process_data(nr, data, info))
if kwargs.get('drop_unused_columns', True):
if 'Temp0' in def_header:
d_dict[nr]['data'] = d_dict[nr]['data'].drop(
["Temp%s" % i for i in range(6)] + \
["Vr", "Vth"],
axis=1,
)
self.parse_data(d_dict)
[docs] def parse_data(self, data, **kwargs):
"""imports data and sets info variable.
.. code-block:: python
h.parse_data({'data': pd.DataFrame,
'info': dict(Nr=0, ** kw) })
Args:
data (dict): Data to parse
**kwargs:
Returns:
None
"""
self.logger.debug('Start parsing data: %s' % data.items())
for nr, df in data.items():
if not df.get('info'):
continue
m_type = df['info'].get('type', 'Unknown')
self.info_dict.update({nr: df['info']})
if df['info'].get('Structure'):
kwargs['Structure'] = df['info']['Structure']
if df['info'].get('Angle'):
kwargs['Angle'] = df['info']['Angle']
if df['info'].get('Additional'):
kwargs['Additional'] = df['info']['Additional']
if m_type == 'SA':
self.data[nr] = SA(df)
self.sa_dict[nr] = self.data[nr]
elif m_type == 'Hloop':
self.data[nr] = Hloop(nr, down_only=True,
**kwargs)
self.hloop_dict[nr] = self.data[nr]
elif m_type == 'RAW':
self.data[nr] = RAW(df, **kwargs)
self.hloop_dict[nr] = self.data[nr]
else:
self.logger.error(
'Measurement %s (type: %s) not processed (unknown type).' % (
nr, m_type))
self.info = pd.DataFrame(self.info_dict).T
# self.info.set_index('Nr', inplace=True)
self.logger.debug('Parsing Measurement Finished\n' + \
'Debug size data/info (%s, %s)' % (len(self.data),
self.info.shape))
def _process_data(self, nr, data, info):
"""
Args:
nr:
data:
info:
"""
self.logger.info('%s\n%s' % (data, info))
# Logging statistical values for info
for key in data.keys():
# Converting Data to numeric
if data[key].dtype == object:
try:
data[key] = pd.to_numeric(data[key], errors='coerce')
except Exception:
self.logger.error('Not Numeric column: %s/%s' % (nr, key))
info['%s_count' % key] = data[key].count()
info['%s_mean' % key] = data[key].mean()
info['%s_var' % key] = data[key].var()
info['%s_min' % key] = data[key].min()
info['%s_max' % key] = data[key].max()
d_dict = {nr: {
'data': data,
'info': info
}}
return d_dict
[docs] def plot(self, *args, **kwargs):
"""Plotting a Measurement
Args:
lofm: list of measurements
figsize: tuple size of figure
show_fit: bool
fit_range: tuple/list
plot_settings: list
grid_options: list
f_settings: 1/f line
f2_settings: 1/f^2 line
Returns:
tuple: Figure, Axis
Raises:
warning: lofm not available.
KeyError: Measurement does not exist.
"""
lofm = kwargs.get('lofm', args[0])
if not (lofm):
self.logger.warning("No list of measurements (lofm) available.")
return
fig, ax = plt.subplots(figsize=kwargs.get('figsize', (16, 12)))
for i, j in lofm.items():
if i not in self.data:
raise KeyError(i)
label = 'm%s: %s' % (i, j[0])
plot_options = j[1]
if kwargs.get('fit_range'):
intercept, slope = self.data[i].fit(kwargs.get('fit_range'))
x = np.linspace(*kwargs.get('fit_range'))
if kwargs.get('show_fit'):
ax.plot(x, (10 ** intercept * np.power(x, slope)),
label='Fit m%s' % i)
if kwargs.get('show_fit_label'):
label += ' $\\alpha = %.3f, S_R(f=0) = 10^{%.3f}$' % (
slope, intercept)
if kwargs.get('norm_S'):
self.data[i].df['norm'] = self.data[i].df['Vx'] * \
self.data[i].df['f']
plot_options['plot_y'] = 'norm'
self.data[i].plot(label=label, ax=ax,
dont_set_plot_settings=True,
**plot_options)
tmp = kwargs.get('plot_settings', dict())
if not (tmp):
tmp = {}
plot_settings = dict(
xlim=(1e-2, 1e0),
ylim=(1e-7, 5e-2),
title='($90^\circ$) Field sweeps',
grid=dict(
which='minor',
color='#ffff99',
linestyle='--',
alpha=.5))
plot_settings.update(tmp)
self.data[i]._set_plot_settings(**plot_settings)
grid_options = dict(
b=True,
which='minor',
color='#cccccc',
linestyle='-.',
alpha=.3)
grid_options.update(kwargs.get('grid_options', dict()))
plt.grid(**grid_options)
# Draw 1 over f lines
f_settings = dict(xmin=1e-2, ymin=1e-6)
f_settings.update(kwargs.get('f_settings', {}))
f2_settings = dict(ymin=1e-3, alpha=2,
plot_style='b--', an_color='blue')
f2_settings.update(kwargs.get('f2_settings', {}))
self.data[i]._draw_oneoverf(ax, **f_settings)
self.data[i]._draw_oneoverf(ax, **f2_settings)
return fig, ax
def __repr__(self):
ret = 'HandleM\n'
if not self.data:
return ret + 'Empty'
for nr, info in self.info_dict.items():
ret += 'Nr. %s:\t Type: %s\t Structure: %s\tTechnique: %s\n' % (
nr,
info['type'],
info['Structure'],
info['technique'])
return ret
[docs] def get_info(self):
"""Returns information about all measurements
and splits it into groups (SA/RAV/MFN/VH)."""
info = self.info
sa = info[info['type'] == 'SA']
raw = info[info['type'] == 'RAW']
mfn = info[info['technique'] == 'MFN']
vh = info[info['technique'] == 'VH']
groups = dict(vh=vh, mfn=mfn, sa=sa, raw=raw)
return info, groups