# Source code for ana.handle

# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""
Handling multiple measurements.
"""

from typing import Any, Callable

from .raw import RAW
from .hloop import Hloop
from .signalanalyzer import SA
from .multiple import MultipleM

import logging
import re
import os
from glob import glob

import pandas as pd
import numpy as np

import matplotlib.pyplot as plt


class HandleM(MultipleM):
    """Measurement handler: loads, stores and plots several measurements.

    Measurements are kept in ``self.data`` keyed by their measurement
    number. ``self.info`` is a table (one row per measurement) built from
    the per-measurement info dictionaries in ``self.info_dict``.
    """

    # File name layout: <prefix>m<Nr><technique><structure><additional>.dat
    _STRUCT_FILE_RE = re.compile(
        r'(.*)[mM]([0-9.]+)(.*)([Pp]lusses|[Cc]rosses|[Ee]mpty)(.*)\.dat')
    # Fallback layout without a structure name: <prefix>m<Nr><additional>.dat
    _PLAIN_FILE_RE = re.compile(r'(.*)[mM]([0-9.]+)(.*)\.dat')
    # Temperature columns contributed by the 'LS' instrument.
    _TEMP_COLUMNS = tuple("Temp%s" % i for i in range(6))

    def __init__(self, *args, **kwargs):
        """Create the handler and optionally load measurements.

        Args:
            *args: Unused.
            **kwargs: ``data`` (mapping or iterable of pairs
                ``nr -> {'data': DataFrame, 'info': dict}``) is processed
                and parsed directly. Otherwise ``directory`` (str) is
                searched for ``*.dat`` files. Remaining keyword arguments
                are forwarded to :meth:`parse_data` / :meth:`load_files`.

        Note:
            **Measurement Handler:**
            This class represents all other measurements and is the
            interface to other measurement objects.
        """
        super().__init__()
        self.logger = logging.getLogger('Handle')
        self.data = {}
        self.info = pd.DataFrame()
        self.info_dict = {}
        self.sa_dict = {}
        self.hloop_dict = {}

        # Pop both selectors so they are not passed on a second time
        # (``parse_data(data, data=...)`` would raise a TypeError).
        data = kwargs.pop('data', None)
        directory = kwargs.pop('directory', None)
        if data:
            processed = {}
            # dict() accepts a mapping as well as an iterable of pairs.
            for nr, entry in dict(data).items():
                processed.update(
                    self._process_data(nr, entry['data'], entry['info']))
            self.parse_data(processed, **kwargs)
        elif directory:
            self.load_files(directory, **kwargs)

    def __getitem__(self, key):
        """Return a measurement by number.

        Args:
            key: A string such as ``'12'`` or ``'m12.1'`` returns the stored
                measurement object. Any other key is formatted into a
                ``Nr == <key>`` query and passed to ``self.query``.

        Raises:
            AttributeError: A string key does not name a stored measurement.
        """
        if isinstance(key, str):
            # Strip the optional leading 'm' of names like 'm12.1'.
            number = key[1:] if key[:1] in ('m', 'M') else key
            measurement = self.data.get(float(number))
            if measurement is None:
                raise AttributeError("Measurement Nr. %s not found" % key)
            return measurement
        return self.query('Nr == %f' % key)

    def __setitem__(self, key, val):
        """Reject item assignment; only a warning is logged.

        Args:
            key: Ignored.
            val: Ignored.
        """
        self.logger.warning("Setting Items is not allowed.")

    def __contains__(self, key):
        """Return True if ``key`` is a stored measurement number.

        Args:
            key: Measurement number to look up in ``self.data``.
        """
        return key in self.data

    def load_files(self, directory, **kwargs):
        """Load all ``*.dat`` files below a directory (recursively).

        Args:
            directory (str): directory to search in
            **kwargs: forwarded to :meth:`load_folder`

        Returns:
            None
        """
        file_list = glob(os.path.join(directory, '**', '*.dat'),
                         recursive=True)
        if file_list:
            self.load_folder(file_list, **kwargs)
        else:
            self.logger.error('No Files found: %s', directory)

    def get_custom_header(self, kind='RAW', instruments=None):
        """Help Function to get column names for EVE Measurements.

        Args:
            kind (str, optional): Kind of measurement, default: 'RAW'.
                'RAW' and 'SA' return fixed headers; any other value builds
                the header from ``instruments``.
            instruments (list, optional): Used instruments in order of
                appearance, default: ['t', 'LS', 'IPS', 'SR830']

        Returns:
            tuple: (list header, int skiprows)
        """
        if kind == 'RAW':
            return ['Time', "Vx", "Vy"], 0
        if kind == 'SA':
            return ['f', 'Vx', 'Vy'], 6

        if not instruments:
            instruments = ['t', 'LS', 'IPS', 'SR830']
        columns = {
            't': ["Time"],
            'LS': list(self._TEMP_COLUMNS),
            'IPS': ["B"],
            'SR830': ["Vx", "Vy", "Vr", "Vth"],
        }
        def_header = []
        for instrument in instruments:
            # Unknown instrument names contribute no columns.
            def_header += columns.get(instrument, [])
        return def_header, 3

    def _parse_filename(self, filename):
        """Split a file name into (prefix, nr, technique, structure, extra).

        Returns None if the name matches neither file name layout.
        Technique and structure are 'NA' for the layout without a
        structure name.
        """
        match = self._STRUCT_FILE_RE.match(filename)
        if match:
            return match.groups()
        match = self._PLAIN_FILE_RE.match(filename)
        if match:
            prefix, nr, add_info = match.groups()
            return prefix, nr, 'NA', 'NA', add_info
        return None

    def load_folder(self, file_list, **kwargs):
        """Read measurement files and register them via :meth:`parse_data`.

        Files whose name cannot be parsed are skipped with a log message.
        Whole measurement numbers are read as 'SA' measurements, numbers
        with a fractional part (sub-measurements) as 'RAW'.

        Args:
            file_list (list): paths of the files to read
            **kwargs: ``sep``, ``names`` and ``skiprows`` override the
                ``pandas.read_csv`` defaults; ``drop_unused_columns``
                (default True) removes temperature and Vr/Vth columns.
        """
        if not file_list:
            self.logger.warning('No files to load.')
            return
        self.logger.warning('Start loading folder: %s', file_list[0])

        d_dict = {}
        for f in file_list:
            filename = os.path.basename(f)
            parsed = self._parse_filename(filename)
            if parsed is None:
                self.logger.warning("Regex doesn't match: %s", f)
                continue
            # NOTE: 'folder' is the part of the file name in front of the
            # measurement number, not the containing directory.
            folder, nr, m_type, struct, add_info = parsed

            try:
                nr = float(nr)
            except ValueError:
                self.logger.error(
                    'Not a valid Measurement Nr. (%s)\n'
                    'Debug kind/struct/f (%s, %s, %s)',
                    nr, m_type, struct, f)
                continue

            subnr = False
            if nr == int(nr):
                nr = int(nr)
                def_header, def_skiprows = self.get_custom_header('SA')
            else:
                subnr = str(nr).partition('.')[2]
                if 'SR830' in add_info:
                    def_header, def_skiprows = self.get_custom_header('RAW')
                else:
                    # NOTE(review): kind defaults to 'RAW', so this call
                    # returns the RAW header and the instrument list is
                    # not used -- confirm whether that is intended.
                    def_header, def_skiprows = self.get_custom_header(
                        instruments=['t', 'LS', 'IPS', 'SR830'])

            data = pd.read_csv(f,
                               sep=kwargs.get('sep', '\t'),
                               names=kwargs.get('names', def_header),
                               skiprows=kwargs.get('skiprows', def_skiprows))

            info = {'type': 'RAW' if subnr else 'SA',
                    'Nr': nr,
                    'filename': filename,
                    'folder': folder,
                    'technique': m_type.strip('_'),
                    'Structure': struct,
                    'Additional': add_info.strip('_')}
            if subnr:
                info['subnr'] = subnr

            d_dict.update(self._process_data(nr, data, info))

            if (kwargs.get('drop_unused_columns', True)
                    and 'Temp0' in def_header):
                # errors='ignore': custom ``names`` may lack these columns.
                d_dict[nr]['data'] = d_dict[nr]['data'].drop(
                    list(self._TEMP_COLUMNS) + ["Vr", "Vth"],
                    axis=1, errors='ignore')

        self.parse_data(d_dict)

    def parse_data(self, data, **kwargs):
        """imports data and sets info variable.

        .. code-block:: python

            h.parse_data({0: {'data': pd.DataFrame,
                              'info': dict(Nr=0, **kw)}})

        Args:
            data (dict): Data to parse, keyed by measurement number. Each
                value is a dict with the keys 'data' and 'info'; entries
                without info are skipped.
            **kwargs: forwarded to the Hloop/RAW constructors. Non-empty
                'Structure', 'Angle' and 'Additional' values from the info
                dict override them for that measurement only.

        Returns:
            None
        """
        self.logger.debug('Start parsing data: %s', data.items())
        for nr, entry in data.items():
            info = entry.get('info')
            if not info:
                continue

            m_type = info.get('type', 'Unknown')
            self.info_dict[nr] = info

            # Work on a copy so that settings of one measurement do not
            # leak into the following ones.
            m_kwargs = dict(kwargs)
            for field in ('Structure', 'Angle', 'Additional'):
                if info.get(field):
                    m_kwargs[field] = info[field]

            if m_type == 'SA':
                self.data[nr] = SA(entry)
                self.sa_dict[nr] = self.data[nr]
            elif m_type == 'Hloop':
                self.data[nr] = Hloop(nr, down_only=True, **m_kwargs)
                self.hloop_dict[nr] = self.data[nr]
            elif m_type == 'RAW':
                self.data[nr] = RAW(entry, **m_kwargs)
                # RAW measurements are registered in hloop_dict as well.
                self.hloop_dict[nr] = self.data[nr]
            else:
                self.logger.error(
                    'Measurement %s (type: %s) not processed (unknown type).',
                    nr, m_type)

        self.info = pd.DataFrame(self.info_dict).T
        self.logger.debug('Parsing Measurement Finished\n'
                          'Debug size data/info (%s, %s)',
                          len(self.data), self.info.shape)

    def _process_data(self, nr, data, info):
        """Convert columns to numeric values and add statistics to info.

        For every column ``<col>`` the keys ``<col>_count``, ``<col>_mean``,
        ``<col>_var``, ``<col>_min`` and ``<col>_max`` are written to
        ``info``. ``data`` and ``info`` are modified in place.

        Args:
            nr: measurement number, used as key of the result
            data: column container (``pandas.DataFrame``)
            info (dict): information about the measurement

        Returns:
            dict: ``{nr: {'data': data, 'info': info}}``
        """
        self.logger.info('%s\n%s', data, info)

        for key in list(data.keys()):
            if data[key].dtype == object:
                try:
                    data[key] = pd.to_numeric(data[key], errors='coerce')
                except (TypeError, ValueError):
                    self.logger.error('Not Numeric column: %s/%s', nr, key)
                    # Numeric statistics are undefined for this column.
                    info['%s_count' % key] = data[key].count()
                    continue

            column = data[key]
            info['%s_count' % key] = column.count()
            info['%s_mean' % key] = column.mean()
            info['%s_var' % key] = column.var()
            info['%s_min' % key] = column.min()
            info['%s_max' % key] = column.max()

        return {nr: {'data': data, 'info': info}}

    def plot(self, *args, **kwargs):
        """Plotting a Measurement

        Args:
            lofm (dict): measurements to plot, given as keyword or as first
                positional argument: ``{nr: (label, plot_options)}``
            figsize: tuple size of figure
            fit_range: tuple/list passed to the measurement's ``fit`` and
                to ``numpy.linspace``
            show_fit: bool, draw the fitted line
            show_fit_label: bool, append the fit result to the label
            norm_S: bool, plot ``Vx * f`` instead of the default column
            plot_settings: dict, overrides the default axis settings
            grid_options: dict, overrides the default grid settings
            f_settings: dict, 1/f line
            f2_settings: dict, 1/f^2 line

        Returns:
            tuple: Figure, Axis (None if no measurements were given)

        Raises:
            KeyError: Measurement does not exist.
        """
        lofm = kwargs.get('lofm')
        if lofm is None and args:
            lofm = args[0]
        if not lofm:
            self.logger.warning("No list of measurements (lofm) available.")
            return

        fit_range = kwargs.get('fit_range')
        fig, ax = plt.subplots(figsize=kwargs.get('figsize', (16, 12)))
        for i, j in lofm.items():
            if i not in self.data:
                raise KeyError(i)
            measurement = self.data[i]

            label = 'm%s: %s' % (i, j[0])
            # Copy: the caller's options must not be modified below.
            plot_options = dict(j[1])

            if fit_range:
                intercept, slope = measurement.fit(fit_range)
                x = np.linspace(*fit_range)
                if kwargs.get('show_fit'):
                    ax.plot(x, (10 ** intercept * np.power(x, slope)),
                            label='Fit m%s' % i)
                if kwargs.get('show_fit_label'):
                    label += ' $\\alpha = %.3f, S_R(f=0) = 10^{%.3f}$' % (
                        slope, intercept)

            if kwargs.get('norm_S'):
                measurement.df['norm'] = (measurement.df['Vx']
                                          * measurement.df['f'])
                plot_options['plot_y'] = 'norm'

            measurement.plot(label=label, ax=ax,
                             dont_set_plot_settings=True,
                             **plot_options)

        # Axis decoration is applied once, through the last plotted
        # measurement.
        last = self.data[i]

        plot_settings = dict(
            xlim=(1e-2, 1e0),
            ylim=(1e-7, 5e-2),
            title='($90^\\circ$) Field sweeps',
            grid=dict(which='minor', color='#ffff99',
                      linestyle='--', alpha=.5))
        plot_settings.update(kwargs.get('plot_settings') or {})
        last._set_plot_settings(**plot_settings)

        grid_options = dict(visible=True, which='minor', color='#cccccc',
                            linestyle='-.', alpha=.3)
        grid_options.update(kwargs.get('grid_options') or {})
        if 'b' in grid_options:
            # Matplotlib renamed the grid argument 'b' to 'visible';
            # keep accepting the old name from callers.
            grid_options['visible'] = grid_options.pop('b')
        plt.grid(**grid_options)

        # Draw 1 over f lines
        f_settings = dict(xmin=1e-2, ymin=1e-6)
        f_settings.update(kwargs.get('f_settings') or {})
        f2_settings = dict(ymin=1e-3, alpha=2,
                           plot_style='b--', an_color='blue')
        f2_settings.update(kwargs.get('f2_settings') or {})
        last._draw_oneoverf(ax, **f_settings)
        last._draw_oneoverf(ax, **f2_settings)

        return fig, ax

    def __repr__(self):
        """Return one line per measurement with type/structure/technique."""
        ret = 'HandleM\n'
        if not self.data:
            return ret + 'Empty'
        for nr, info in self.info_dict.items():
            # Info dicts passed in by the caller may lack some of the keys.
            ret += 'Nr. %s:\t Type: %s\t Structure: %s\tTechnique: %s\n' % (
                nr,
                info.get('type', 'Unknown'),
                info.get('Structure', 'NA'),
                info.get('technique', 'NA'))
        return ret

    def get_info(self):
        """Returns information about all measurements and splits it into
        groups (SA/RAW/MFN/VH).

        Returns:
            tuple: (info table, dict with the keys 'vh', 'mfn', 'sa', 'raw').
            A group is empty if the column it is selected by is missing.
        """
        info = self.info

        def select(column, value):
            # An empty handler has an info table without any columns.
            if column not in info:
                return info.iloc[0:0]
            return info[info[column] == value]

        groups = dict(vh=select('technique', 'VH'),
                      mfn=select('technique', 'MFN'),
                      sa=select('type', 'SA'),
                      raw=select('type', 'RAW'))
        return info, groups