Source code for ana.mfn

# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Handles multiple Magnetic Flux Noise (MFN) Measurements 
taken with the SR830 DAQ."""

import logging
import os
from glob import glob

import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import numpy as np
import pandas as pd
import scipy.stats
import seaborn as sns
from matplotlib.colors import LogNorm

from .raw import RAW
from .multiple import MultipleM


def load_meas_info(file='data/mfn_info.csv'):
    meas_info = pd.read_csv(file, index_col=0)
    meas_info.index.name = 'Nr'

    # Drop Measurements where an error occured.
    meas_info = meas_info[meas_info['Error'] != True]
    meas_info.drop('Error', axis=1, inplace=True)
    return meas_info


[docs]class MFN(MultipleM): def __init__(self, files, **kwargs): r"""Magnetic Flux Noise Measurements. Class to handle and plot MFN Measurements taken with the SR830 DAQ. if a measurement number is given the files are loaded from folder: ``data/MFN/m%s/*.dat`` else: files must be a list of filenames: .. code:: python :linenos: :caption: Example arg = glob('data/MFN/mXXX/m[0-9.]*.dat') mfn = MFN(arg) :param files: list of files or measurement number. :param kwargs: Different additional parameters. :param int nr: Measurement number (default: None). :param bool calc_first: Calculating first spectrum (default: True). :param bool calc_second: Calculating second spectrum (default: False). :param bool downsample: downsample timesignal for first spectrum (default: False). :param int num_first_spectra: Number of spectra to average (default: 64). :param float timeconstant: Lock-In timeconstant to cut first spectrum at corresponding frequency (see ``cut_fmax``). :param float cut_fmax: Cut first spectrum at this frequency (default: :math:`f_{max} = \frac{\tau}{2\pi}`). :param default_cm: matplotlib color map for contour plots (default: style['default_cm']). :type default_cm: matplotlib.cm """ super().__init__() self.name = 'Magnetic Flux Noise' self.default_cm = kwargs.get('default_cm', self.style.get('default_cm')) self.style['default_cm'] = self.default_cm self.info = { 'Nr': kwargs.get('nr'), 'first': kwargs.get('calc_first', True), 'second': kwargs.get('calc_second', False), 'num': kwargs.get('num_first_spectra', 64), 'downsample': kwargs.get('downsample', False), 'length': kwargs.get('length'), } if isinstance(files, int): self.info['Nr'] = files files = glob('data/MFN/m%s/*' % self.info['Nr']) self.logger = logging.getLogger('MFNMeasurement (%s)' % self.info['Nr']) self.data, self.measurements = self.load_meas(files, **kwargs) # Stop here if we have no data if len(self.data) == 0: return # Cut too long measurements if kwargs.get('equalize_length', True): self.__equalize_length() self.info['timeconstant'] = kwargs.get('timeconstant') # Cut higher frequencies if timeconstant is known fmax_rate = self.info.get('rate') / (2 * np.pi) if self.info.get('rate') else 1e6 fmax_tc = 1 / (2 * np.pi * self.info['timeconstant']) if self.info['timeconstant'] else 1e6 fmax = np.min([fmax_rate, fmax_tc]) self.info['fmax'] = fmax if fmax < 1e6 or kwargs.get('cut_fmax'): self.cut_fmax(kwargs.get('cut_fmax', fmax))
[docs] def load_meas(self, files, **kwargs): """ Loading a single magnetic flux noise DAQ measurement. Files in measrument folder contain multiple measurements with one measurement per field. :param files: files to load :return: DataFrame Data, dict Measurement Objects :rtype: tuple """ max_len = 0 meas = {} all_data = pd.DataFrame({'Field': [], 'Time': [], 'Vx': [], 'Vy': []}) for f in files: info_dict = self.get_info_from_name(f) field = info_dict.get('field') nr = info_dict.get('nr') if not self.info.get('Nr'): self.info['Nr'] = nr data_df = pd.read_csv(f, sep='\t') if data_df.empty: continue if len(data_df['Vx']) % 1024: d = data_df['Vx'].iloc[:-(len(data_df['Vx']) % 1024)] else: d = data_df.Vx max_len = len(d) data = { 'data': d, 'info': { 'Nr': nr, 'field': field, 'rate': 1 / data_df.time.diff().mean(), 'length': max_len * data_df.time.diff().mean(), } } if not self.info['length']: self.info['length'] = max_len * data_df.time.diff().mean() if not self.info.get('rate'): self.info['rate'] = 1 / data_df.time.diff().mean() # Calculate the first and second spectrum meas[field] = self._get_raw_meas(data, **kwargs) if meas[field] == None: meas.pop(field) tmp_df = pd.DataFrame( {'Field': field, 'Time': data_df.time.iloc[:max_len], 'Vx': data_df.Vx.iloc[:max_len] * kwargs.get('factor', 1e3), 'Vy': data_df.Vy.iloc[:max_len] * kwargs.get('factor', 1e3), }) all_data = pd.concat([all_data, tmp_df]) # , how='outer') return all_data, meas
def _get_raw_meas(self, data, **kwargs): """ Returns the RAW Measurement. :param dict data: time signal and info :param bool calculate_second_by_hand: if we calc :return: RAWMeasurement """ if len(data['data']) == 0: return None ret = RAW(data, rate=data['info']['rate'], nof_first_spectra=self.info.get('num', 64), calc_first=self.info['first'], calc_second=self.info['second'], downsample=self.info['downsample'], **kwargs.get('raw_kwargs', dict()) ) if kwargs.get('calculate_second_by_hand', False): ret.calc_second_spectrum( highest_octave=kwargs.get( 'highest_octave', ret.avg_spec.freq.max()), minimum_points_in_octave=kwargs.get( 'minimum_points_in_octave', 3), peak_filter=kwargs.get( 'peak_filter', False), ) return ret
[docs] def cut_fmax(self, fmax): """Cut frequencies higher than maximum allowed frequencies. Args: fmax: """ for field, spec in self.measurements.items(): # Drop all frequencies larger than fmax s = spec.spectrum.first_spectra_df s.drop(s.query('Frequency > %s' % fmax).index.tolist(), axis=0, inplace=True ) freq = spec.spectrum.frequency_span_array freq = freq[freq <= fmax] spec.spectrum.frequency_span_array = freq # Reset First Spectrum Variables spec.spectrum.finalize_first_spectrum(s) spec.avg_spec = pd.DataFrame({ 'freq': freq, 'S': spec.spectrum.first_spectrum_avg.loc[:] # copy })
def __equalize_length(self): """ Check that data for all fields have same amount of data. :return: """ # Check if all fields have same amount of data nofvalues = self.data.groupby('Field').count() if nofvalues.Vx.max() - nofvalues.Vx.min() > 0: fields_with_more_data = nofvalues[ nofvalues.Vx > nofvalues.Vx.median()].index.tolist() fields_with_less_data = nofvalues[ nofvalues.Vx < nofvalues.Vx.median()].index.tolist() self.logger.warning( 'Not all fields have same amount of data (median = %s):\n' % ( nofvalues.Vx.quantile(.5)) + 'Fields with more data: %s\n' % fields_with_more_data + 'Fields with less data: %s\n' % fields_with_less_data + '%s\n' % nofvalues.Vx.describe() + 'To avoid data loss use the option "equalize_length=False"!') for field in self.data.Field.unique(): condition = 'Field == %s' % field if field in fields_with_more_data: # Cut data cut_data = self.data.query(condition).iloc[ :int(nofvalues.Vx.median())] self.data.drop( self.data.query(condition).index.tolist(), axis=0, inplace=True) self.data = pd.merge(self.data, cut_data, how='outer') if self.data.query(condition).shape[0] > 0: # recalculate Spectrum rate = 1 / self.data.query(condition).Time.diff().median() self.measurements[field] = self._get_raw_meas( {'data': self.data.query(condition).Vx.to_numpy(), 'info': { 'Nr': self.info['Nr'], 'field': field, 'rate': rate, } }, ) else: # Remove field from measurements self.measurements.pop(field) self.data.drop( self.data.query(condition).index.tolist(), axis=0, inplace=True) self.logger.error( 'This field (%s) has more data ' '(%s) than median (%s): Cutting data!' % ( field, nofvalues.Vx.loc[field], nofvalues.Vx.median() ) ) if field in fields_with_less_data: self.logger.error( 'This field (%s) has less data ' '(%s) than median (%s): Deleting field from data!' % ( field, nofvalues.Vx.loc[field], nofvalues.Vx.median() ) ) self.data.drop( self.data.query(condition).index, inplace=True) self.measurements.pop(field)
[docs] def plot_first_and_second(self, **kwargs): """ Args: **kwargs: """ sns.set_palette(sns.color_palette("hls", len(self.measurements))) fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(11.69, 8.27), ) fields = kwargs.get('fields', self.data.sort_values( 'Field')['Field'].unique()) for field in fields: # Setting shortcuts try: meas = self.measurements[field] except KeyError: continue s = meas.avg_spec ax1.plot(field, s.S.sum(), 'o') if hasattr(meas.spectrum, 'second_spectra'): marker = "+x*ds" color = "rbgyk" for octave in range( meas.spectrum.number_of_octaves): second = meas.spectrum.second_spectra[octave] ax2.plot(field, second.sum(), "%s%s" % (color[octave], marker[octave]) ) ax2.set_title('$\\sum$ Second Spectrum') ax2.set_ylabel('$S$ [a.u.]') # Legend for second spectrum field = fields[-1] labels = [] if hasattr(self.measurements[field].spectrum, 'second_spectra'): for octave in range( self.measurements[field].spectrum.number_of_octaves): labels.append('%s Octave' % (octave + 1)) ax2.legend(labels) ax1.set_title( 'm%s: $\\sum$ First Spectrum Noise' % self.info['nr']) ax1.set_ylabel( '$\\langle S_V \\rangle$ [$\\mathrm{V}^2/\\mathrm{Hz}$]') for ax in [ax2, ax1]: ax.set_yscale('log') plt.show()
[docs] def plot_alpha(self, field, s, ax=None, **kwargs): """ Fits a spectrum and plots the slope alpha. :param s: pandas.DataFrame of the spectrum :param ax: axis where to plot the data. :return: linear regression fit """ if not ax: fig, ax = plt.subplots() # Fitting alpha s['lnf'] = np.log10(s.freq) s['lnS'] = np.log10(s.S) fit_indices = s.freq < kwargs.get('fit_range', 0.7) f = scipy.stats.linregress( s.lnf.loc[fit_indices], s.lnS.loc[fit_indices]) plt_obj, = ax.plot(field, -f.slope, kwargs.get('alpha_symbol', 'o'), label='$\\alpha$') return f, plt_obj
[docs] def plot_info2(self, **kwargs): """ Args: **kwargs: """ if kwargs.get('fig'): fig = kwargs.get('fig') (ax11, ax12) = kwargs.get('axes') else: fig, (ax11, ax12) = plt.subplots(nrows=2, ncols=1, figsize=(5, 10), ) fields = kwargs.get('fields', self.data.sort_values('Field')['Field'].unique()) self.plot_noise_chararcteristics(ax11, fields=fields) self.spectra_field_contour(ax12, fields=fields)
[docs] def plot_noise_chararcteristics(self, ax=None, fields=None, **kwargs): """ Args: ax: fields: **kwargs: """ if not ax: fig, ax = plt.subplots() fields = kwargs.get('fields', self.data.sort_values('Field')['Field'].unique()) par2 = ax.twinx() spectra_df = pd.DataFrame() for i, field in enumerate(fields): # Setting shortcut for Average Spectrum s try: meas = self.measurements[field] if meas == None or len(meas.avg_spec) < 2: continue except KeyError: continue s = meas.avg_spec spectra_df['S%s' % field] = s.S f, alpha = self.plot_alpha(field, s, par2, alpha_symbol="bo", **kwargs) meas.info['alpha'] = -f.slope, meas.info['power'] = f.intercept integral, = ax.plot(field, s.S.sum() * s.freq.diff().iloc[1], 'gd', label='Integral') intercept, = ax.plot(field, 10 ** f.intercept, 'r*', label='$S(f=1~\\mathrm{Hz})$') ax.set_yscale('log') plt.legend(handles=[integral, intercept, alpha], frameon=True, fancybox=True, framealpha=.8)
# ('Integral', '$S(f=1~\\mathrm{Hz})$', '$\\alpha$'))
[docs] def plot_info(self, show_field=0, **kwargs): """Plots basic mfn infos Args: show_field: kwargs: """ sns.set_palette(sns.color_palette("hls", len(self.measurements))) fig, ((ax11, ax12), (ax21, ax22), (ax31, ax32)) = plt.subplots(nrows=3, ncols=2, figsize=self.style.get('figsize'), gridspec_kw=kwargs.get('gridspec_kw', dict()), # wspace=.3, # hspace=.45) ) fields = kwargs.get('fields', self.data.sort_values('Field')['Field'].unique()) spectra_df = pd.DataFrame() second = pd.DataFrame() spectra_fit = dict() for i, field in enumerate(fields): # Setting shortcut for Average Spectrum s try: meas = self.measurements[field] if meas == None or len(meas.avg_spec) < 2: continue except KeyError: continue s = meas.avg_spec spectra_df['S%s' % field] = s.S ax11.loglog(s.freq, s.S*s.freq, label='%s T' % field) f, _ = self.plot_alpha(field, s, ax21, **kwargs) meas.info['alpha'] = -f.slope, meas.info['amplitude'] = f.intercept meas.info['power'] = s.S.sum() * s.freq.diff().iloc[1] ax22.plot(field, meas.info['power'], 'o') spectra_fit[field] = { 'alpha': meas.info['alpha'][0], 'amplitude': meas.info['amplitude'], 'power': meas.info['power']} if field != float(field): raise ValueError time_df = self.data.query('Field == %f' % field) m = time_df['Vx'].mean() std = time_df['Vx'].std() ax32.errorbar(field, m, yerr=2 * std, elinewidth=2, capsize=4) if hasattr(meas.spectrum, 'second_spectra'): if len(meas.spectrum.second_spectra) > 0: second['S%s' % field] = meas.spectrum.second_spectra[0] self.spectra_fit_df = pd.DataFrame(spectra_fit).T self.spectra_fit_df.index.name = 'Field' # Plot Contour Plot of Averaged Spectra per Field if kwargs.get('plot_field_contour', True): self.spectra_field_contour(ax12, **kwargs) else: field = fields[-1] if kwargs.get('plot_second_sum', False): marker = "+x*ds" color = "rbgyk" for octave in range( self.measurements[ field].spectrum.number_of_octaves): ax12.plot(field, self.measurements[ field].spectrum.second_spectra[ octave].sum(), "%s%s" % (color[octave], marker[octave]) ) ax12.set_ylabel('$S_2 (f)$') elif kwargs.get('plot_second_freq', False): frequencies = self.measurements[ field].spectrum.frequency_span_array_second_spectra smin, smax = second.min().min(), second.max().max() levels = np.logspace(np.log10(smin), np.log10(smax), 20) cs = ax12.contourf(fields, frequencies, second, norm=LogNorm(vmin=smin, vmax=smax), levels=levels, cmap=kwargs.get('contour_cm', self.style.get('default_cm')), ) cbar = plt.colorbar(cs, ax=ax12) cbar.set_ticklabels(['%.2e' % _ for _ in levels]) ax12.set_ylabel('$f$ [$\\mathrm{Hz}$]') ax12.set_title('Second Spectrum over Field') ax12.set_xlabel('$\\mu_0 H_{ext}$ [$T$]') else: self.data.query("Field == @show_field").plot( x='Time', y='Vx', ax=ax12) # # Plot 1/f line in Spectrum # xmin, ymin = kwargs.get('xymin', (2e-1, 5e-14)) # factor, alpha = (1.5, 2) # ax11.plot([xmin, xmin * factor], # [ymin, ymin / (factor ** alpha)], # 'k--') # ax11.annotate('$1/f^{}$'.format(alpha), # ( # xmin * np.sqrt(factor), # ymin / (factor ** (alpha / 2)) # ) # ) if not kwargs.get('disable_PSD_grid'): #ax11.set_xticks([.2+_/10 for _ in range(9)], minor=True) ax11.grid(b=True, which='minor', color='#cccccc', linestyle='-.', alpha=.3) # Set nice title and axis labels ax11.set_title('\\textbf{I. PSD}') ax11.set_xlabel('$f$ [$\\mathrm{Hz}$]') ax11.set_ylabel('$S_V (f) \cdot f$') ax21.set_title('\\textbf{III. Slope} $\\mathbf{S_V \sim 1/f^{\\alpha}}$') ax21.set_ylabel('$\\alpha$') ax22.set_title('\\textbf{IV. PSD Integral}') for ax in [ax22, ax12]: ax.set_yscale('log') self.plot_field_contour(ax31, show_field, **kwargs) ax32.set_title('\\textbf{VI. Hysteresis Loop}') ax32.set_ylabel('$V_H$ [$%s\\mathrm{V}$]' % (kwargs.get('unit', 'm'))) ax32.set_xlabel('$\\mu_0 H_{ext}$ [$T$]') if not kwargs.get('notitle'): fig.suptitle('m%d: ' % self.info.get('Nr', 431) + 'Rate=$%d\\,\\mathrm{Hz}$, ' % self.info['rate'] + 'Length=$%s\\,\\mathrm{s}$ %s' % (self.info['length'], kwargs.get( 'add_info', '')))
[docs] def plot_field_contour(self, ax31, show_field=0, **kwargs): # Show specific field field_data = self.data.query('Field == %f' % show_field) if field_data.empty: self.logger.error("Can not find timesignal for field (%s)" % show_field) else: try: field_spectra = self.measurements[show_field] if kwargs.get('plot_timesignal', False): field_data.plot(x='Time', y='Vx', legend=False, ax=ax31) ax31.set_title( 'Timesignal ($\\mu_0 H_{ext} = %.2f\\,\\mathrm{mT}$)' % ( show_field * 1e3)) ax31.set_ylabel( '$V_H$ [$%s\\mathrm{V}$]' % (kwargs.get('unit', 'm'))) # show chunksize of first spectra generated num = field_spectra.spectrum.number_of_first_spectra len_first_spectra = len(field_data) / num for i in range(1, num): x = field_data.iloc[int(len_first_spectra * i)]['Time'] ax31.axvline(x, linewidth=.5) elif len(field_spectra.spectrum.first_spectra_df) > 1: s = field_spectra.spectrum.first_spectra_df freq = s.Frequency.to_numpy() first_spectra = s.drop('Frequency', axis=1) freq_table = np.tile(freq, (64,1)).T first_spectra = first_spectra * freq_table freq = field_spectra.avg_spec.freq smin = kwargs.get('smin', first_spectra.min().min()) smax = kwargs.get('smax', first_spectra.max().max()) levels = np.logspace(np.log10(smin), np.log10(smax), kwargs.get('numlevels', 10)) times = np.linspace(0, field_spectra.info['Time'], field_spectra.num) cs = ax31.contourf( times, # x = Time freq, # y = Frequency first_spectra, # z = all spectra norm=LogNorm(vmin=smin, vmax=smax), levels=levels, cmap=kwargs.get('contour_cm', self.style.get('default_cm')), # levels=kwargs.get('contour_levels'), ) self.time_spectra_field_contour_df = first_spectra.copy(deep=True) self.time_spectra_field_contour_df['freq'] = freq self.time_spectra_field_contour_df.set_index('freq', inplace=True) self.time_spectra_field_contour_df.rename( {'S{}'.format(_): times[_] for _ in range(field_spectra.num)}, axis='columns', inplace=True) cbar = plt.gcf().colorbar(cs, ax=ax31) cbar.set_label('$S_V^{(n)} (f) \cdot f$') cbar.set_ticklabels(['%.2e' % _ for _ in levels]) ax31.set_yscale('log') ax31.set_title( '\\textbf{V. Time-resolved PSD} ($\\mu_0 H_{ext} = ' '%.0f\\,\\mathrm{mT}$)' % (show_field * 1e3) ) ax31.set_ylabel('$f$ [Hz]') except Exception as e: self.logger.error("Can not plot field (H = %s) data: %s" % (show_field, e)) raise ValueError("Can not plot field (H = %s) data: %s" % (show_field, e)) ax31.set_xlabel('Time [s]')
[docs] def plot_various_noise_repr(self): sns.set_palette(sns.color_palette("hls", len(self.measurements))) fig, axes = plt.subplots(nrows=4, ncols=4, gridspec_kw=dict(wspace=.3, hspace=.45)) for i in range(16): ax = axes[i // 4][i % 4] for field in self.data.sort_values( 'Field', ascending=False)['Field'].unique(): # Setting shortcut for Average Spectrum s s = self.measurements[field].avg_spec ax.plot(field, s.iloc[i]['S'], 'o') ax.set_title('$S_V (f=%.3f)$' % s.iloc[i].freq) plt.show()
[docs] def plot_various_noise_fits(self, **kwargs): """ Args: **kwargs: """ sns.set_palette(sns.color_palette("hls", len(self.measurements))) fig, axes = plt.subplots(nrows=kwargs.get('nrows', 3), ncols=kwargs.get('ncols', 3), gridspec_kw=dict(wspace=.3, hspace=.45), figsize=(11.69, 8.27)) for i, fit_range in enumerate(kwargs.get('fit_ranges', range(3, 11))): ax = axes[i // kwargs.get('nrows', 3)][i % kwargs.get('ncols', 3)] for field in self.data.sort_values('Field')['Field'].unique(): # Setting shortcut for Average Spectrum s s = self.measurements[field].avg_spec fit = scipy.stats.linregress(s.lnf.iloc[:fit_range], s.lnS.iloc[:fit_range]) if kwargs.get('value', 'intercept') == 'intercept': ax.plot(field, 10 ** fit.intercept, 'o') ax.set_yscale('log') else: ax.plot(field, -1 * fit.slope, 'o') ax.set_title('Fit Range = %s' % fit_range) fig.suptitle(kwargs.get('title', 'Noise Fit at 1 Hz'))
[docs] def plot_compare_timesignals(self, fields, **kwargs): """ Plots a grid of 2x2 subplots to compare timesignals and histogram. Args: fields: list factor: float Default: 1e3 (mV) nrows: int Default: len(fields) ncols: int Default: 2 sharey: bool Default: True figsize: tuple Default: (11.69, 8.27) in plot_range: int Default: -1 """ fig, axes = plt.subplots(nrows=kwargs.get('nrows', 1), ncols=kwargs.get('ncols', len(fields)), sharey=kwargs.get('sharey', False), figsize=kwargs.get('figsize', (11.69, 8.27)), **kwargs.get('subplot_kw', dict())) twinax = [] factor = kwargs.get('factor', 1e3) colors = kwargs.get('colors',['red', 'blue', 'green', 'orange', 'purple']) for i, (f, c) in enumerate(zip(fields, colors)): ax = axes[i] d = self.data[self.data['Field'] == f].iloc[:kwargs.get('plot_range', -1)] ax.plot(d['Time'], d['Vx']*factor, color=c, **kwargs.get('plot_kw', {})) twinax.append(ax.twiny()) sns.distplot(d['Vx']*factor, ax=twinax[i], vertical=True, color=c, **kwargs.get('dist_kw', {})) ax.legend(['$V_H (\\mathbf{\\mu_0 H_{ext}=%s\\,mT})$' % int(float(f * 1e3))]) return fig, axes, twinax
[docs] def plot_various_timesignals(self, fields, **kwargs): """ Plots a grid of 9 subplots with different timesignals Args: fields: list nrows: int Default: len(fields) ncols: int Default: 2 sharey: bool Default: True figsize: tuple Default: (11.69, 8.27) in plot_range: int Default: -1 """ fig, axes = plt.subplots(nrows=kwargs.get('nrows', len(fields)), ncols=kwargs.get('ncols', 2), sharey=kwargs.get('sharey', True), figsize=kwargs.get('figsize', (11.69, 8.27)) ) for i, f in enumerate(fields): ax = axes[i // kwargs.get('nrows', 3)][i % kwargs.get('ncols', 3)] d = self.data[self.data['Field'] == f].iloc[:kwargs.get('plot_range', -1)] ax.plot(d['Time'], d['Vx']) ax.legend(['$V_H (\\mu_0 H_{ext}=%.1f\\,\\mathrm{mT})$' % (f * 1e3)])
[docs] def spectra_field_contour(self, ax=None, **kwargs): """ Args: ax: **kwargs: """ if not ax: fig, ax = plt.subplots() fields = kwargs.get('fields', self.data.sort_values('Field')['Field'].unique()) # Create variable for third dimension spectra_df = pd.DataFrame() for f in fields: average_spectrum = self.measurements[f].avg_spec spectra_df[f] = average_spectrum.S * \ average_spectrum.freq frequencies = self.measurements[f].avg_spec['freq'] smin = kwargs.get('smin', spectra_df.min().min()) smax = kwargs.get('smax', spectra_df.max().max()) levels = np.logspace(np.log10(smin), np.log10(smax), kwargs.get('numlevels', 10)) size_fields = len(fields) size_freq = len(frequencies) if spectra_df.shape != (size_freq, size_fields): self.logger.error('Cannot plot contour:\n' + \ 'spectra_df.shape (%s) != ' + \ 'size_freq (%s), size_fields (%s)', ( spectra_df.shape, size_freq, size_fields)) return if size_fields < 2 or size_freq < 2: self.logger.error('Cannot plot contour: ' + \ 'size_freq (%s), size_fields (%s) incorrect' % ( size_freq, size_fields)) return cs = ax.contourf(fields * 1e3, frequencies, spectra_df, norm=LogNorm(vmin=smin, vmax=smax), levels=levels, cmap=kwargs.get('contour_cm', self.style.get('default_cm')), ) self.spectra_field_contour_df = spectra_df.copy(deep=True) self.spectra_field_contour_df['freq'] = frequencies self.spectra_field_contour_df.set_index('freq', inplace=True) if kwargs.get('cbar', True): cbar = plt.colorbar(cs, ax=ax) cbar.set_ticklabels(['%.2e' % _ for _ in levels]) cbar.set_label('$S_{V} (f) \cdot f$') ax.set_yscale('log') ax.set_ylabel('$f$ [$\\mathrm{Hz}$]') ax.set_xlabel('$\\mu_0 H_{ext}$ [$\\mathrm{mT}$]') ax.set_title(kwargs.get('title', '\\textbf{II. PSD Contour Plot}'))
[docs] def write(self, file): """Write data to a file. Args: file (str): """ if not os.path.exists(file): os.makedirs(file) self.data.to_csv("%s_fields.dat" % file, index=False) for field, m in self.measurements.items(): m.write("%s_spectrum_%s" % (file, field))
[docs] def read(self, file): """Write data to a file. Args: file (str): """ self.data = pd.read_csv("%s_fields.dat" % file) for field in self.data['Field'].unique(): df = self.data.query('Field == %s' % field) self.measurements[field] = \ RAW({'data': df['Vx']}, rate=1 / df['Time'].diff().median(), ) if not self.measurements[field].read("%s_spectrum_%s" % (file, field)): self.measurements.pop(field)
[docs] def subtract_mean(self, inplace=False, **kwargs): """Subtracts mean value from timesignal. :param inplace: Change variable inside this object? :type inplace: bool :param fields: list of fields to fit :return: normalized timesignal """ df_fit = pd.DataFrame() fields = kwargs.get('fields', self.data.sort_values('Field')['Field'].unique()) for field in fields: signal = self.data.query('Field == @field').copy() mean = signal.Vx.mean() signal.loc[:, 'normedVx'] = signal.Vx.copy() - mean if inplace: self.data[self.data.Field == field].loc[:, 'normedVx'] = \ signal.loc[:, 'normedVx'] df_fit = pd.concat([df_fit, signal]) return df_fit
[docs] def plot_multiple_histograms(self, steps=4, fields=pd.Series(dtype=np.float64), factor=1e3, xlim=(-1.3, 1.3), **kwargs): """Plots multiple histograms using seaborn ``sns.FacetGrid`` with kdeplot. default kwargs for subplots: kde1_kwargs = dict(clip_on=True, shade=True, alpha=1, lw=1.5, bw=.2) kde2_kwargs = kde1_kwargs.update(dict(lw=2, color='w') dist_kwargs = dict(hist=True, norm_hist=True, hist_kws={'alpha': .5}) ax_kwargs = dict(y=0, lw=2, clip_on=True).set(xlim=(-1.3,1.3)) """ if fields.empty: fields = self.data.Field.unique() field_range = abs(fields.max() - fields.min()) stepsize = (field_range / steps) for i in range(steps): start = fields.min() + i * stepsize end = start + stepsize df_tmp = self.subtract_mean().query('Field >= @start and Field < @end').copy() df_tmp.loc[:, 'normedVx'] *= factor # Initialize the FacetGrid object pal = sns.dark_palette(color="blue", n_colors=10, input='huls') face_kwargs = dict(aspect=5, height=.8, palette=pal) face_kwargs.update(kwargs.get('face_kwargs', {})) kde1_kwargs = dict(clip_on=True, shade=True, alpha=1, lw=1.5, bw=.2) kde1_kwargs.update(kwargs.get('kde1_kwargs', {})) kde2_kwargs = kde1_kwargs.copy() kde2_kwargs.update(dict(lw=2, bw=.2, color='w')) kde2_kwargs.update(kwargs.get('kde2_kwargs', {})) dist_kwargs = dict(hist=True, norm_hist=True, hist_kws={'alpha': .5}) dist_kwargs.update(kwargs.get('dist_kwargs', {})) ax_kwargs = dict(y=0, lw=2, clip_on=True) ax_kwargs.update(kwargs.get('ax_kwargs', {})) g = sns.FacetGrid(df_tmp, row="Field", hue="Field", **face_kwargs) g.map(sns.kdeplot, "normedVx", **kde1_kwargs) g.map(sns.kdeplot, "normedVx", **kde2_kwargs) # g.map(sns.distplot, "normedVx", **dist_kwargs) g.map(plt.axhline, **ax_kwargs).set(xlim=xlim) def label(x, color, label): ax = plt.gca() ax.text(0, kwargs.get('adjust_hspace', .3), label, fontweight="bold", color=color, ha="left", va="center", fontsize=16, transform=ax.transAxes) g.map(label, "normedVx") # Set the subplots to overlap g.fig.subplots_adjust(-kwargs.get('adjust_hspace', .3)) # Remove axes details that don't play well with overlap g.set_titles("") g.set(yticks=[]) # , xticks=[-100 + 50*_ for _ in range(5)]) g.despine(bottom=True, left=True) unit = 'mV' if factor == 1e3 else '\\mu{}V' if factor == 1e6 else 'V' plt.gca().set_xlabel('$V_H$ [$\\mathrm{%s}$]' % unit)
[docs] def load_voltages(self, nr=508, figsize=(16, 12)): def load_data(datapath): meas_data = {} meas_info = {} all_data = {} for f in datapath: f_info = self.get_info_from_name(f) sr = f_info['Vin'] nr = f_info['nr'] meas_info[sr] = f_info meas_data[sr] = pd.read_csv(f, sep='\t') new_df = meas_data[sr] new_df['Vin'] = float(sr[:-2]) if nr in all_data.keys(): all_data[nr] = pd.concat([all_data[nr], new_df]) else: all_data[nr] = new_df return meas_data, meas_info, all_data def calc_PSD(meas_data): meas_obj = {} for sr, data_df in meas_data.items(): if len(data_df['Vx']) % 1024: avg = len(data_df['Vx']) // 1024 d = data_df['Vx'].iloc[:-(len(data_df['Vx']) % 1024)] else: d = data_df.Vx max_len = len(d) data = { 'data': d, 'info': { 'Nr': meas_info[sr]['nr'], 'rate': 1 / data_df.time.diff().mean(), 'length': max_len * data_df.time.diff().mean(), } } meas_obj[sr] = RAW(data, rate=data['info']['rate'], nof_first_spectra=32, calc_first=True, downsample=False, ) return meas_obj def merge_data(meas_obj, cutoff_frequency=.9): diff_voltages = pd.DataFrame() for sr, m in meas_obj.items(): s = m.avg_spec s = s[s.freq < cutoff_frequency] if len(s) < 2: continue newdf = pd.DataFrame() newdf['freq'] = s.freq newdf['S'] = s.S newdf['Vin'] = float(sr[:-2]) diff_voltages = pd.concat([diff_voltages, newdf]) return diff_voltages def plot_PSD_classic(diff_voltages, title, groupby_category='Vin', num=10, style=[['science'], {'context': 'talk', 'style': 'white', 'palette': 'bright', }]): set_style(style) c1 = sns.color_palette("hls", num) sns.set_palette(c1) fig, ax = plt.subplots(figsize=(12, 8)) # g = sns.relplot(x='freq', y='S', hue='Vin', data=diff_voltages, height=5, kind='line') grouped = diff_voltages.groupby(groupby_category) for group in grouped.groups.keys(): grouped.get_group(group).plot(x='freq', y='S', kind='line', loglog=True, ax=ax, label=group, xlabel='Frequency [Hz]', ylabel='$S_{V_H}$ [$\\mathrm{V}^2/\\mathrm{Hz}$]', ) ax.set_title(title) # save_plot('m506', 'png') def show_PSD_classic(diff_voltages, title, ax=None, groupby_category='Vin', num=10, style=[['science'], {'context': 'talk', 'style': 'white', 'palette': 'bright', }]): if not ax: fig, ax = plt.subplots(figsize=(12, 8)) set_style(style) c1 = sns.color_palette("hls", num) sns.set_palette(c1) # g = sns.relplot(x='freq', y='S', hue='Vin', data=diff_voltages, height=5, kind='line') grouped = diff_voltages.groupby(groupby_category) for group in grouped.groups.keys(): grouped.get_group(group).plot(x='freq', y='S', kind='line', loglog=True, ax=ax, label=group, xlabel='Frequency [Hz]', ylabel='$S_{V_H}$ [$\\mathrm{V}^2/\\mathrm{Hz}$]', ) ax.set_title(title) return ax datapath = glob('./data/MFN/m%s/*' % nr) meas_data, meas_info, all_data = load_data(datapath) meas_obj = calc_PSD(meas_data) diff_voltages = merge_data(meas_obj) fig, ax = plt.subplots(figsize=figsize) show_PSD_classic(diff_voltages, 'm%s: Compare different Voltages' % nr, ax=ax) inset2 = inset_axes(ax, width='100%', height='100%', bbox_to_anchor=(.54, .75, .3, .25), bbox_transform=ax.transAxes) inset3 = inset_axes(ax, width='100%', height='100%', bbox_to_anchor=(.1, .1, .3, .25), bbox_transform=ax.transAxes) grouped = diff_voltages.groupby('Vin') for group in grouped.groups.keys(): g = grouped.get_group(group) fit_area = g.query('freq > %f and freq < %f' % (2e-2, 7e-1)) fit_area['lnf'] = np.log10(fit_area.freq) fit_area['lnS'] = np.log10(fit_area.S) fit = scipy.stats.linregress(fit_area.lnf, fit_area.lnS) intercept, slope = fit.intercept, -fit.slope voltage = group inset2.plot(voltage, 10 ** intercept, 'o') inset3.plot(voltage, slope, 'o') inset2.set_xlabel('Voltage [$\mathrm{V}$]') inset2.set_ylabel('$S_{V_H} (f=1\\;$Hz$)$') inset2.set_yscale('log') inset3.set_xlabel('Voltage [$\mathrm{V}$]') inset3.set_ylabel('$\\alpha$')