# SPDX-FileCopyrightText: 2020/2021 Jonathan Pieper <ody55eus@mailbox.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Handles multiple Magnetic Flux Noise (MFN) Measurements
taken with the SR830 DAQ."""
import logging
import os
from glob import glob
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import numpy as np
import pandas as pd
import scipy.stats
import seaborn as sns
from matplotlib.colors import LogNorm
from .raw import RAW
from .multiple import MultipleM
def load_meas_info(file='data/mfn_info.csv'):
    """Load the measurement overview table and drop erroneous measurements."""
meas_info = pd.read_csv(file, index_col=0)
meas_info.index.name = 'Nr'
    # Drop measurements where an error occurred.
meas_info = meas_info[meas_info['Error'] != True]
meas_info.drop('Error', axis=1, inplace=True)
return meas_info
class MFN(MultipleM):
def __init__(self, files, **kwargs):
r"""Magnetic Flux Noise Measurements.
Class to handle and plot MFN Measurements
taken with the SR830 DAQ.
        If a measurement number is given, the files are loaded from the
        folder ``data/MFN/m%s/*.dat``; otherwise ``files`` must be a list
        of filenames:
.. code:: python
:linenos:
:caption: Example
arg = glob('data/MFN/mXXX/m[0-9.]*.dat')
mfn = MFN(arg)
:param files: list of files or measurement number.
:param kwargs: Different additional parameters.
:param int nr: Measurement number
(default: None).
:param bool calc_first: Calculating first spectrum
(default: True).
:param bool calc_second: Calculating second spectrum
(default: False).
:param bool downsample: downsample timesignal for first spectrum
(default: False).
:param int num_first_spectra: Number of spectra to average
(default: 64).
:param float timeconstant: Lock-In timeconstant to cut first spectrum
at corresponding frequency (see ``cut_fmax``).
:param float cut_fmax: Cut first spectrum at this frequency
            (default: :math:`f_{max} = \frac{1}{2 \pi \tau}`).
:param default_cm: matplotlib color map for contour plots
(default: style['default_cm']).
:type default_cm: matplotlib.cm
"""
super().__init__()
self.name = 'Magnetic Flux Noise'
self.default_cm = kwargs.get('default_cm', self.style.get('default_cm'))
self.style['default_cm'] = self.default_cm
self.info = {
'Nr': kwargs.get('nr'),
'first': kwargs.get('calc_first', True),
'second': kwargs.get('calc_second', False),
'num': kwargs.get('num_first_spectra', 64),
'downsample': kwargs.get('downsample', False),
'length': kwargs.get('length'),
}
if isinstance(files, int):
self.info['Nr'] = files
files = glob('data/MFN/m%s/*' % self.info['Nr'])
self.logger = logging.getLogger('MFNMeasurement (%s)' %
self.info['Nr'])
self.data, self.measurements = self.load_meas(files, **kwargs)
# Stop here if we have no data
if len(self.data) == 0:
return
# Cut too long measurements
if kwargs.get('equalize_length', True):
self.__equalize_length()
self.info['timeconstant'] = kwargs.get('timeconstant')
# Cut higher frequencies if timeconstant is known
fmax_rate = self.info.get('rate') / (2 * np.pi) if self.info.get('rate') else 1e6
fmax_tc = 1 / (2 * np.pi * self.info['timeconstant']) if self.info['timeconstant'] else 1e6
fmax = np.min([fmax_rate, fmax_tc])
self.info['fmax'] = fmax
if fmax < 1e6 or kwargs.get('cut_fmax'):
self.cut_fmax(kwargs.get('cut_fmax', fmax))
    def load_meas(self, files, **kwargs):
"""
        Load a single magnetic flux noise DAQ measurement.

        The files in the measurement folder contain multiple measurements,
        one measurement (time signal) per magnetic field.
:param files: files to load
:return: DataFrame Data, dict Measurement Objects
:rtype: tuple
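        ``load_meas`` is called by ``__init__``; a minimal usage sketch
        (the measurement number 431 is an assumption for illustration):

        .. code:: python
            :caption: Example

            mfn = MFN(431)  # loads data/MFN/m431/* via load_meas
            all_data, measurements = mfn.data, mfn.measurements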
"""
max_len = 0
meas = {}
all_data = pd.DataFrame({'Field': [], 'Time': [], 'Vx': [], 'Vy': []})
for f in files:
info_dict = self.get_info_from_name(f)
field = info_dict.get('field')
nr = info_dict.get('nr')
if not self.info.get('Nr'):
self.info['Nr'] = nr
data_df = pd.read_csv(f, sep='\t')
if data_df.empty:
continue
if len(data_df['Vx']) % 1024:
d = data_df['Vx'].iloc[:-(len(data_df['Vx']) % 1024)]
else:
d = data_df.Vx
max_len = len(d)
data = {
'data': d,
'info': {
'Nr': nr,
'field': field,
'rate': 1 / data_df.time.diff().mean(),
'length': max_len * data_df.time.diff().mean(),
}
}
if not self.info['length']:
self.info['length'] = max_len * data_df.time.diff().mean()
if not self.info.get('rate'):
self.info['rate'] = 1 / data_df.time.diff().mean()
# Calculate the first and second spectrum
meas[field] = self._get_raw_meas(data, **kwargs)
            if meas[field] is None:
meas.pop(field)
tmp_df = pd.DataFrame(
{'Field': field,
'Time': data_df.time.iloc[:max_len],
'Vx': data_df.Vx.iloc[:max_len] * kwargs.get('factor', 1e3),
'Vy': data_df.Vy.iloc[:max_len] * kwargs.get('factor', 1e3),
})
all_data = pd.concat([all_data, tmp_df]) # , how='outer')
return all_data, meas
def _get_raw_meas(self, data, **kwargs):
"""
Returns the RAW Measurement.
:param dict data: time signal and info
        :param bool calculate_second_by_hand: if True, calculate the second
            spectrum explicitly via ``calc_second_spectrum`` (default: False).
:return: RAWMeasurement
"""
if len(data['data']) == 0:
return None
ret = RAW(data,
rate=data['info']['rate'],
nof_first_spectra=self.info.get('num', 64),
calc_first=self.info['first'],
calc_second=self.info['second'],
downsample=self.info['downsample'],
**kwargs.get('raw_kwargs', dict())
)
if kwargs.get('calculate_second_by_hand', False):
ret.calc_second_spectrum(
highest_octave=kwargs.get(
'highest_octave', ret.avg_spec.freq.max()),
minimum_points_in_octave=kwargs.get(
'minimum_points_in_octave', 3),
peak_filter=kwargs.get(
'peak_filter', False),
)
return ret
    def cut_fmax(self, fmax):
"""Cut frequencies higher than maximum allowed frequencies.
Args:
fmax:
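        A minimal sketch of the relation used in ``__init__`` (the
        timeconstant value and the ``mfn`` instance are assumptions):

        .. code:: python
            :caption: Example

            tau = 0.1                      # Lock-In timeconstant in s
            fmax = 1 / (2 * np.pi * tau)   # ~1.59 Hz
            mfn.cut_fmax(fmax)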
"""
for field, spec in self.measurements.items():
# Drop all frequencies larger than fmax
s = spec.spectrum.first_spectra_df
s.drop(s.query('Frequency > %s' % fmax).index.tolist(),
axis=0, inplace=True
)
freq = spec.spectrum.frequency_span_array
freq = freq[freq <= fmax]
spec.spectrum.frequency_span_array = freq
# Reset First Spectrum Variables
spec.spectrum.finalize_first_spectrum(s)
spec.avg_spec = pd.DataFrame({
'freq': freq,
'S': spec.spectrum.first_spectrum_avg.loc[:] # copy
})
def __equalize_length(self):
"""
        Ensure all fields have the same number of samples.

        Fields with more samples than the median are trimmed (and their
        spectra recalculated); fields with fewer samples are removed.
"""
# Check if all fields have same amount of data
nofvalues = self.data.groupby('Field').count()
if nofvalues.Vx.max() - nofvalues.Vx.min() > 0:
fields_with_more_data = nofvalues[
nofvalues.Vx > nofvalues.Vx.median()].index.tolist()
fields_with_less_data = nofvalues[
nofvalues.Vx < nofvalues.Vx.median()].index.tolist()
self.logger.warning(
                'Not all fields have the same amount of data (median = %s):\n' % (
nofvalues.Vx.quantile(.5))
+ 'Fields with more data: %s\n' % fields_with_more_data
+ 'Fields with less data: %s\n' % fields_with_less_data
+ '%s\n' % nofvalues.Vx.describe()
+ 'To avoid data loss use the option "equalize_length=False"!')
for field in self.data.Field.unique():
condition = 'Field == %s' % field
if field in fields_with_more_data:
# Cut data
cut_data = self.data.query(condition).iloc[
:int(nofvalues.Vx.median())]
self.data.drop(
self.data.query(condition).index.tolist(),
axis=0, inplace=True)
self.data = pd.merge(self.data, cut_data, how='outer')
if self.data.query(condition).shape[0] > 0:
# recalculate Spectrum
rate = 1 / self.data.query(condition).Time.diff().median()
self.measurements[field] = self._get_raw_meas(
{'data': self.data.query(condition).Vx.to_numpy(),
'info': {
'Nr': self.info['Nr'],
'field': field,
'rate': rate,
}
},
)
else:
# Remove field from measurements
self.measurements.pop(field)
self.data.drop(
self.data.query(condition).index.tolist(),
axis=0, inplace=True)
self.logger.error(
'This field (%s) has more data '
'(%s) than median (%s): Cutting data!' % (
field,
nofvalues.Vx.loc[field],
nofvalues.Vx.median()
)
)
if field in fields_with_less_data:
self.logger.error(
'This field (%s) has less data '
'(%s) than median (%s): Deleting field from data!' % (
field,
nofvalues.Vx.loc[field],
nofvalues.Vx.median()
)
)
self.data.drop(
self.data.query(condition).index,
inplace=True)
self.measurements.pop(field)
    def plot_first_and_second(self, **kwargs):
"""
Args:
**kwargs:
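        A minimal usage sketch (the measurement number and field values
        are assumptions for illustration):

        .. code:: python
            :caption: Example

            mfn = MFN(431, calc_second=True)
            mfn.plot_first_and_second(fields=[0.0, 0.05, 0.1])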
"""
sns.set_palette(sns.color_palette("hls", len(self.measurements)))
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1,
figsize=(11.69, 8.27), )
fields = kwargs.get('fields', self.data.sort_values(
'Field')['Field'].unique())
for field in fields:
# Setting shortcuts
try:
meas = self.measurements[field]
except KeyError:
continue
s = meas.avg_spec
ax1.plot(field, s.S.sum(), 'o')
if hasattr(meas.spectrum, 'second_spectra'):
marker = "+x*ds"
color = "rbgyk"
for octave in range(
meas.spectrum.number_of_octaves):
second = meas.spectrum.second_spectra[octave]
ax2.plot(field,
second.sum(),
"%s%s" % (color[octave], marker[octave])
)
ax2.set_title('$\\sum$ Second Spectrum')
ax2.set_ylabel('$S$ [a.u.]')
# Legend for second spectrum
field = fields[-1]
labels = []
if hasattr(self.measurements[field].spectrum, 'second_spectra'):
for octave in range(
self.measurements[field].spectrum.number_of_octaves):
labels.append('%s Octave' % (octave + 1))
ax2.legend(labels)
ax1.set_title(
            'm%s: $\\sum$ First Spectrum Noise' % self.info['Nr'])
ax1.set_ylabel(
'$\\langle S_V \\rangle$ [$\\mathrm{V}^2/\\mathrm{Hz}$]')
for ax in [ax2, ax1]:
ax.set_yscale('log')
plt.show()
    def plot_alpha(self, field, s, ax=None, **kwargs):
"""
        Fits a spectrum with a power law and plots the exponent alpha.

        :param field: magnetic field value used as x coordinate.
        :param s: pandas.DataFrame of the spectrum (columns ``freq`` and ``S``).
        :param ax: axis where to plot the data.
        :param fit_range: fit only frequencies below this value (default: 0.7).
        :return: linear regression fit and the plotted line object.
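        A minimal sketch of extracting the exponent alpha of S ~ 1/f**alpha
        (the ``mfn`` instance and the field value 0.0 are assumptions):

        .. code:: python
            :caption: Example

            fig, ax = plt.subplots()
            s = mfn.measurements[0.0].avg_spec
            fit, line = mfn.plot_alpha(0.0, s, ax=ax, fit_range=0.7)
            alpha = -fit.slope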
"""
if not ax:
fig, ax = plt.subplots()
# Fitting alpha
s['lnf'] = np.log10(s.freq)
s['lnS'] = np.log10(s.S)
fit_indices = s.freq < kwargs.get('fit_range', 0.7)
f = scipy.stats.linregress(
s.lnf.loc[fit_indices],
s.lnS.loc[fit_indices])
plt_obj, = ax.plot(field, -f.slope, kwargs.get('alpha_symbol', 'o'), label='$\\alpha$')
return f, plt_obj
    def plot_info2(self, **kwargs):
"""
Args:
**kwargs:
"""
if kwargs.get('fig'):
fig = kwargs.get('fig')
(ax11, ax12) = kwargs.get('axes')
else:
fig, (ax11, ax12) = plt.subplots(nrows=2, ncols=1,
figsize=(5, 10), )
fields = kwargs.get('fields',
self.data.sort_values('Field')['Field'].unique())
self.plot_noise_chararcteristics(ax11, fields=fields)
self.spectra_field_contour(ax12, fields=fields)
    def plot_noise_chararcteristics(self, ax=None, fields=None, **kwargs):
"""
Args:
ax:
fields:
**kwargs:
"""
if not ax:
fig, ax = plt.subplots()
        if fields is None:
            fields = kwargs.get('fields',
                                self.data.sort_values('Field')['Field'].unique())
par2 = ax.twinx()
spectra_df = pd.DataFrame()
for i, field in enumerate(fields):
# Setting shortcut for Average Spectrum s
try:
meas = self.measurements[field]
                if meas is None or len(meas.avg_spec) < 2:
continue
except KeyError:
continue
s = meas.avg_spec
spectra_df['S%s' % field] = s.S
f, alpha = self.plot_alpha(field, s, par2, alpha_symbol="bo",
**kwargs)
            meas.info['alpha'] = -f.slope
meas.info['power'] = f.intercept
integral, = ax.plot(field, s.S.sum() * s.freq.diff().iloc[1],
'gd', label='Integral')
intercept, = ax.plot(field, 10 ** f.intercept, 'r*',
label='$S(f=1~\\mathrm{Hz})$')
ax.set_yscale('log')
plt.legend(handles=[integral, intercept, alpha], frameon=True,
fancybox=True, framealpha=.8)
# ('Integral', '$S(f=1~\\mathrm{Hz})$', '$\\alpha$'))
    def plot_info(self, show_field=0, **kwargs):
"""Plots basic mfn infos
Args:
show_field:
kwargs:
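        A minimal usage sketch (the measurement number and field value
        are assumptions for illustration):

        .. code:: python
            :caption: Example

            mfn = MFN(431, calc_first=True)
            mfn.plot_info(show_field=0.0, unit='m')
            plt.show()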
"""
sns.set_palette(sns.color_palette("hls", len(self.measurements)))
fig, ((ax11, ax12),
(ax21, ax22),
(ax31, ax32)) = plt.subplots(nrows=3, ncols=2,
figsize=self.style.get('figsize'),
gridspec_kw=kwargs.get('gridspec_kw', dict()),
# wspace=.3,
# hspace=.45)
)
fields = kwargs.get('fields',
self.data.sort_values('Field')['Field'].unique())
spectra_df = pd.DataFrame()
second = pd.DataFrame()
spectra_fit = dict()
for i, field in enumerate(fields):
# Setting shortcut for Average Spectrum s
try:
meas = self.measurements[field]
                if meas is None or len(meas.avg_spec) < 2:
continue
except KeyError:
continue
s = meas.avg_spec
spectra_df['S%s' % field] = s.S
ax11.loglog(s.freq, s.S*s.freq, label='%s T' % field)
f, _ = self.plot_alpha(field, s, ax21, **kwargs)
            meas.info['alpha'] = -f.slope
meas.info['amplitude'] = f.intercept
meas.info['power'] = s.S.sum() * s.freq.diff().iloc[1]
ax22.plot(field, meas.info['power'], 'o')
spectra_fit[field] = {
                'alpha': meas.info['alpha'],
'amplitude': meas.info['amplitude'],
'power': meas.info['power']}
            if field != float(field):
                raise ValueError('Field value %r is not a plain float' % field)
time_df = self.data.query('Field == %f' % field)
m = time_df['Vx'].mean()
std = time_df['Vx'].std()
ax32.errorbar(field, m, yerr=2 * std, elinewidth=2, capsize=4)
if hasattr(meas.spectrum, 'second_spectra'):
if len(meas.spectrum.second_spectra) > 0:
second['S%s' % field] = meas.spectrum.second_spectra[0]
self.spectra_fit_df = pd.DataFrame(spectra_fit).T
self.spectra_fit_df.index.name = 'Field'
# Plot Contour Plot of Averaged Spectra per Field
if kwargs.get('plot_field_contour', True):
self.spectra_field_contour(ax12, **kwargs)
else:
field = fields[-1]
if kwargs.get('plot_second_sum', False):
marker = "+x*ds"
color = "rbgyk"
for octave in range(
self.measurements[
field].spectrum.number_of_octaves):
ax12.plot(field,
self.measurements[
field].spectrum.second_spectra[
octave].sum(),
"%s%s" % (color[octave], marker[octave])
)
ax12.set_ylabel('$S_2 (f)$')
elif kwargs.get('plot_second_freq', False):
frequencies = self.measurements[
field].spectrum.frequency_span_array_second_spectra
smin, smax = second.min().min(), second.max().max()
levels = np.logspace(np.log10(smin),
np.log10(smax), 20)
cs = ax12.contourf(fields,
frequencies,
second,
norm=LogNorm(vmin=smin, vmax=smax),
levels=levels,
cmap=kwargs.get('contour_cm',
self.style.get('default_cm')),
)
cbar = plt.colorbar(cs, ax=ax12)
cbar.set_ticklabels(['%.2e' % _ for _ in levels])
ax12.set_ylabel('$f$ [$\\mathrm{Hz}$]')
ax12.set_title('Second Spectrum over Field')
ax12.set_xlabel('$\\mu_0 H_{ext}$ [$T$]')
else:
self.data.query("Field == @show_field").plot(
x='Time', y='Vx', ax=ax12)
# # Plot 1/f line in Spectrum
# xmin, ymin = kwargs.get('xymin', (2e-1, 5e-14))
# factor, alpha = (1.5, 2)
# ax11.plot([xmin, xmin * factor],
# [ymin, ymin / (factor ** alpha)],
# 'k--')
# ax11.annotate('$1/f^{}$'.format(alpha),
# (
# xmin * np.sqrt(factor),
# ymin / (factor ** (alpha / 2))
# )
# )
if not kwargs.get('disable_PSD_grid'):
#ax11.set_xticks([.2+_/10 for _ in range(9)], minor=True)
ax11.grid(b=True, which='minor',
color='#cccccc',
linestyle='-.',
alpha=.3)
# Set nice title and axis labels
ax11.set_title('\\textbf{I. PSD}')
ax11.set_xlabel('$f$ [$\\mathrm{Hz}$]')
        ax11.set_ylabel('$S_V (f) \\cdot f$')
        ax21.set_title('\\textbf{III. Slope} $\\mathbf{S_V \\sim 1/f^{\\alpha}}$')
ax21.set_ylabel('$\\alpha$')
ax22.set_title('\\textbf{IV. PSD Integral}')
for ax in [ax22, ax12]:
ax.set_yscale('log')
self.plot_field_contour(ax31, show_field, **kwargs)
ax32.set_title('\\textbf{VI. Hysteresis Loop}')
ax32.set_ylabel('$V_H$ [$%s\\mathrm{V}$]' % (kwargs.get('unit', 'm')))
ax32.set_xlabel('$\\mu_0 H_{ext}$ [$T$]')
if not kwargs.get('notitle'):
fig.suptitle('m%d: ' % self.info.get('Nr', 431)
+ 'Rate=$%d\\,\\mathrm{Hz}$, ' % self.info['rate']
+ 'Length=$%s\\,\\mathrm{s}$ %s' % (self.info['length'],
kwargs.get(
'add_info', '')))
    def plot_field_contour(self, ax31, show_field=0, **kwargs):
        """Plot the time-resolved PSD contour (or the timesignal) for one field.

        Args:
            ax31: axis to plot into.
            show_field: field value whose data is shown.
            **kwargs: ``plot_timesignal``, ``smin``, ``smax``, ``numlevels``,
                ``contour_cm`` and ``unit``.
        """
# Show specific field
field_data = self.data.query('Field == %f' % show_field)
if field_data.empty:
self.logger.error("Can not find timesignal for field (%s)" %
show_field)
else:
try:
field_spectra = self.measurements[show_field]
if kwargs.get('plot_timesignal', False):
field_data.plot(x='Time', y='Vx', legend=False, ax=ax31)
ax31.set_title(
'Timesignal ($\\mu_0 H_{ext} = %.2f\\,\\mathrm{mT}$)' % (
show_field * 1e3))
ax31.set_ylabel(
'$V_H$ [$%s\\mathrm{V}$]' % (kwargs.get('unit', 'm')))
# show chunksize of first spectra generated
num = field_spectra.spectrum.number_of_first_spectra
len_first_spectra = len(field_data) / num
for i in range(1, num):
x = field_data.iloc[int(len_first_spectra * i)]['Time']
ax31.axvline(x, linewidth=.5)
elif len(field_spectra.spectrum.first_spectra_df) > 1:
s = field_spectra.spectrum.first_spectra_df
freq = s.Frequency.to_numpy()
first_spectra = s.drop('Frequency', axis=1)
                    freq_table = np.tile(freq, (first_spectra.shape[1], 1)).T
first_spectra = first_spectra * freq_table
freq = field_spectra.avg_spec.freq
smin = kwargs.get('smin', first_spectra.min().min())
smax = kwargs.get('smax', first_spectra.max().max())
levels = np.logspace(np.log10(smin),
np.log10(smax),
kwargs.get('numlevels', 10))
times = np.linspace(0, field_spectra.info['Time'],
field_spectra.num)
cs = ax31.contourf(
times, # x = Time
freq, # y = Frequency
first_spectra, # z = all spectra
norm=LogNorm(vmin=smin, vmax=smax),
levels=levels,
cmap=kwargs.get('contour_cm',
self.style.get('default_cm')),
# levels=kwargs.get('contour_levels'),
)
self.time_spectra_field_contour_df = first_spectra.copy(deep=True)
self.time_spectra_field_contour_df['freq'] = freq
self.time_spectra_field_contour_df.set_index('freq', inplace=True)
self.time_spectra_field_contour_df.rename(
{'S{}'.format(_): times[_] for _ in range(field_spectra.num)},
axis='columns', inplace=True)
cbar = plt.gcf().colorbar(cs, ax=ax31)
                    cbar.set_label('$S_V^{(n)} (f) \\cdot f$')
cbar.set_ticklabels(['%.2e' % _ for _ in levels])
ax31.set_yscale('log')
ax31.set_title(
'\\textbf{V. Time-resolved PSD} ($\\mu_0 H_{ext} = '
'%.0f\\,\\mathrm{mT}$)' % (show_field * 1e3)
)
ax31.set_ylabel('$f$ [Hz]')
except Exception as e:
self.logger.error("Can not plot field (H = %s) data: %s" %
(show_field, e))
raise ValueError("Can not plot field (H = %s) data: %s" %
(show_field, e))
ax31.set_xlabel('Time [s]')
    def plot_various_noise_repr(self):
        """Plot the PSD values of the lowest 16 frequency bins versus field."""
sns.set_palette(sns.color_palette("hls", len(self.measurements)))
fig, axes = plt.subplots(nrows=4, ncols=4,
gridspec_kw=dict(wspace=.3, hspace=.45))
for i in range(16):
ax = axes[i // 4][i % 4]
for field in self.data.sort_values(
'Field', ascending=False)['Field'].unique():
# Setting shortcut for Average Spectrum s
s = self.measurements[field].avg_spec
ax.plot(field, s.iloc[i]['S'], 'o')
ax.set_title('$S_V (f=%.3f)$' % s.iloc[i].freq)
plt.show()
    def plot_various_noise_fits(self, **kwargs):
"""
Args:
**kwargs:
"""
sns.set_palette(sns.color_palette("hls", len(self.measurements)))
fig, axes = plt.subplots(nrows=kwargs.get('nrows', 3),
ncols=kwargs.get('ncols', 3),
gridspec_kw=dict(wspace=.3, hspace=.45),
figsize=(11.69, 8.27))
for i, fit_range in enumerate(kwargs.get('fit_ranges', range(3, 11))):
            ncols = kwargs.get('ncols', 3)
            ax = axes[i // ncols][i % ncols]
for field in self.data.sort_values('Field')['Field'].unique():
# Setting shortcut for Average Spectrum s
s = self.measurements[field].avg_spec
fit = scipy.stats.linregress(s.lnf.iloc[:fit_range],
s.lnS.iloc[:fit_range])
if kwargs.get('value', 'intercept') == 'intercept':
ax.plot(field, 10 ** fit.intercept, 'o')
ax.set_yscale('log')
else:
ax.plot(field, -1 * fit.slope, 'o')
ax.set_title('Fit Range = %s' % fit_range)
fig.suptitle(kwargs.get('title', 'Noise Fit at 1 Hz'))
    def plot_compare_timesignals(self, fields, **kwargs):
""" Plots a grid of 2x2 subplots to compare timesignals and histogram.
Args:
fields: list
factor: float Default: 1e3 (mV)
nrows: int Default: len(fields)
ncols: int Default: 2
sharey: bool Default: True
figsize: tuple Default: (11.69, 8.27) in
plot_range: int Default: -1
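        A minimal usage sketch (the ``mfn`` instance and the field values
        are assumptions for illustration):

        .. code:: python
            :caption: Example

            fig, axes, twinax = mfn.plot_compare_timesignals(
                [0.0, 0.05], plot_range=10000)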
"""
fig, axes = plt.subplots(nrows=kwargs.get('nrows', 1),
ncols=kwargs.get('ncols', len(fields)),
sharey=kwargs.get('sharey', False),
figsize=kwargs.get('figsize', (11.69, 8.27)),
**kwargs.get('subplot_kw', dict()))
twinax = []
factor = kwargs.get('factor', 1e3)
colors = kwargs.get('colors',['red',
'blue',
'green',
'orange',
'purple'])
for i, (f, c) in enumerate(zip(fields, colors)):
ax = axes[i]
d = self.data[self.data['Field'] ==
f].iloc[:kwargs.get('plot_range', -1)]
ax.plot(d['Time'], d['Vx']*factor, color=c, **kwargs.get('plot_kw', {}))
twinax.append(ax.twiny())
sns.distplot(d['Vx']*factor, ax=twinax[i], vertical=True, color=c, **kwargs.get('dist_kw', {}))
ax.legend(['$V_H (\\mathbf{\\mu_0 H_{ext}=%s\\,mT})$' % int(float(f * 1e3))])
return fig, axes, twinax
    def plot_various_timesignals(self, fields, **kwargs):
""" Plots a grid of 9 subplots with different timesignals
Args:
fields: list
nrows: int Default: len(fields)
ncols: int Default: 2
sharey: bool Default: True
figsize: tuple Default: (11.69, 8.27) in
plot_range: int Default: -1
"""
fig, axes = plt.subplots(nrows=kwargs.get('nrows', len(fields)),
ncols=kwargs.get('ncols', 2),
sharey=kwargs.get('sharey', True),
figsize=kwargs.get('figsize', (11.69, 8.27))
)
for i, f in enumerate(fields):
            ncols = kwargs.get('ncols', 2)
            ax = axes[i // ncols][i % ncols]
d = self.data[self.data['Field'] ==
f].iloc[:kwargs.get('plot_range', -1)]
ax.plot(d['Time'], d['Vx'])
ax.legend(['$V_H (\\mu_0 H_{ext}=%.1f\\,\\mathrm{mT})$' % (f * 1e3)])
    def spectra_field_contour(self, ax=None, **kwargs):
"""
Args:
ax:
**kwargs:
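        A minimal usage sketch (the ``mfn`` instance is an assumption):

        .. code:: python
            :caption: Example

            fig, ax = plt.subplots()
            mfn.spectra_field_contour(ax, numlevels=15)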
"""
if not ax:
fig, ax = plt.subplots()
fields = kwargs.get('fields',
self.data.sort_values('Field')['Field'].unique())
# Create variable for third dimension
spectra_df = pd.DataFrame()
for f in fields:
average_spectrum = self.measurements[f].avg_spec
spectra_df[f] = average_spectrum.S * \
average_spectrum.freq
frequencies = self.measurements[f].avg_spec['freq']
smin = kwargs.get('smin', spectra_df.min().min())
smax = kwargs.get('smax', spectra_df.max().max())
levels = np.logspace(np.log10(smin),
np.log10(smax),
kwargs.get('numlevels', 10))
size_fields = len(fields)
size_freq = len(frequencies)
if spectra_df.shape != (size_freq, size_fields):
            self.logger.error(
                'Cannot plot contour: spectra_df.shape (%s) != '
                '(size_freq (%s), size_fields (%s))',
                spectra_df.shape, size_freq, size_fields)
return
if size_fields < 2 or size_freq < 2:
self.logger.error('Cannot plot contour: ' + \
'size_freq (%s), size_fields (%s) incorrect' % (
size_freq, size_fields))
return
cs = ax.contourf(fields * 1e3, frequencies, spectra_df,
norm=LogNorm(vmin=smin, vmax=smax),
levels=levels,
cmap=kwargs.get('contour_cm',
self.style.get('default_cm')),
)
self.spectra_field_contour_df = spectra_df.copy(deep=True)
self.spectra_field_contour_df['freq'] = frequencies
self.spectra_field_contour_df.set_index('freq', inplace=True)
if kwargs.get('cbar', True):
cbar = plt.colorbar(cs, ax=ax)
cbar.set_ticklabels(['%.2e' % _ for _ in levels])
            cbar.set_label('$S_{V} (f) \\cdot f$')
ax.set_yscale('log')
ax.set_ylabel('$f$ [$\\mathrm{Hz}$]')
ax.set_xlabel('$\\mu_0 H_{ext}$ [$\\mathrm{mT}$]')
ax.set_title(kwargs.get('title', '\\textbf{II. PSD Contour Plot}'))
    def write(self, file):
"""Write data to a file.
Args:
file (str):
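        A minimal roundtrip sketch (the prefix ``results/m431`` is an
        assumption for illustration):

        .. code:: python
            :caption: Example

            mfn.write('results/m431')
            mfn_loaded = MFN([], nr=431)   # empty container
            mfn_loaded.read('results/m431')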
"""
        dirname = os.path.dirname(file)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
self.data.to_csv("%s_fields.dat" % file, index=False)
for field, m in self.measurements.items():
m.write("%s_spectrum_%s" % (file, field))
    def read(self, file):
"""Write data to a file.
Args:
file (str):
"""
self.data = pd.read_csv("%s_fields.dat" % file)
for field in self.data['Field'].unique():
df = self.data.query('Field == %s' % field)
self.measurements[field] = \
RAW({'data': df['Vx']},
rate=1 / df['Time'].diff().median(),
)
if not self.measurements[field].read("%s_spectrum_%s" % (file,
field)):
self.measurements.pop(field)
    def subtract_mean(self, inplace=False, **kwargs):
"""Subtracts mean value from timesignal.
        :param inplace: if True, also write the ``normedVx`` column into
            ``self.data``.
:type inplace: bool
:param fields: list of fields to fit
:return: normalized timesignal
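        A minimal usage sketch (the ``mfn`` instance is an assumption):

        .. code:: python
            :caption: Example

            normed = mfn.subtract_mean()
            normed.groupby('Field')['normedVx'].std()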
"""
df_fit = pd.DataFrame()
fields = kwargs.get('fields',
self.data.sort_values('Field')['Field'].unique())
for field in fields:
signal = self.data.query('Field == @field').copy()
mean = signal.Vx.mean()
signal.loc[:, 'normedVx'] = signal.Vx.copy() - mean
            if inplace:
                self.data.loc[self.data.Field == field, 'normedVx'] = \
                    signal['normedVx']
df_fit = pd.concat([df_fit, signal])
return df_fit
    def plot_multiple_histograms(self, steps=4,
fields=pd.Series(dtype=np.float64),
factor=1e3,
xlim=(-1.3, 1.3),
**kwargs):
"""Plots multiple histograms using seaborn ``sns.FacetGrid``
with kdeplot.
        Default kwargs for the subplots:
            kde1_kwargs = dict(clip_on=True, shade=True, alpha=1, lw=1.5, bw=.2)
            kde2_kwargs = kde1_kwargs updated with dict(lw=2, bw=.2, color='w')
            dist_kwargs = dict(hist=True, norm_hist=True, hist_kws={'alpha': .5})
            ax_kwargs = dict(y=0, lw=2, clip_on=True); xlim is applied to the grid
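        A minimal usage sketch (the ``mfn`` instance and the step count are
        assumptions for illustration):

        .. code:: python
            :caption: Example

            mfn.plot_multiple_histograms(steps=2, factor=1e3,
                                         xlim=(-1.3, 1.3))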
"""
if fields.empty:
fields = self.data.Field.unique()
field_range = abs(fields.max() - fields.min())
stepsize = (field_range / steps)
for i in range(steps):
start = fields.min() + i * stepsize
end = start + stepsize
df_tmp = self.subtract_mean().query('Field >= @start and Field < @end').copy()
df_tmp.loc[:, 'normedVx'] *= factor
# Initialize the FacetGrid object
pal = sns.dark_palette(color="blue", n_colors=10, input='huls')
face_kwargs = dict(aspect=5, height=.8, palette=pal)
face_kwargs.update(kwargs.get('face_kwargs', {}))
kde1_kwargs = dict(clip_on=True, shade=True, alpha=1, lw=1.5, bw=.2)
kde1_kwargs.update(kwargs.get('kde1_kwargs', {}))
kde2_kwargs = kde1_kwargs.copy()
kde2_kwargs.update(dict(lw=2, bw=.2, color='w'))
kde2_kwargs.update(kwargs.get('kde2_kwargs', {}))
dist_kwargs = dict(hist=True, norm_hist=True, hist_kws={'alpha': .5})
dist_kwargs.update(kwargs.get('dist_kwargs', {}))
ax_kwargs = dict(y=0, lw=2, clip_on=True)
ax_kwargs.update(kwargs.get('ax_kwargs', {}))
g = sns.FacetGrid(df_tmp, row="Field", hue="Field", **face_kwargs)
g.map(sns.kdeplot, "normedVx", **kde1_kwargs)
g.map(sns.kdeplot, "normedVx", **kde2_kwargs)
# g.map(sns.distplot, "normedVx", **dist_kwargs)
g.map(plt.axhline, **ax_kwargs).set(xlim=xlim)
def label(x, color, label):
ax = plt.gca()
ax.text(0, kwargs.get('adjust_hspace', .3), label, fontweight="bold", color=color,
ha="left", va="center", fontsize=16,
transform=ax.transAxes)
g.map(label, "normedVx")
# Set the subplots to overlap
            g.fig.subplots_adjust(hspace=-kwargs.get('adjust_hspace', .3))
# Remove axes details that don't play well with overlap
g.set_titles("")
g.set(yticks=[]) # , xticks=[-100 + 50*_ for _ in range(5)])
g.despine(bottom=True, left=True)
unit = 'mV' if factor == 1e3 else '\\mu{}V' if factor == 1e6 else 'V'
plt.gca().set_xlabel('$V_H$ [$\\mathrm{%s}$]' % unit)
    def load_voltages(self, nr=508, figsize=(16, 12)):
        """Load a voltage-dependence measurement and plot its spectra.

        Loads all files of measurement ``nr`` (one file per input voltage),
        calculates the first spectra and plots them together with insets
        for the 1 Hz intercept and the slope alpha.
        """
def load_data(datapath):
meas_data = {}
meas_info = {}
all_data = {}
for f in datapath:
f_info = self.get_info_from_name(f)
sr = f_info['Vin']
nr = f_info['nr']
meas_info[sr] = f_info
meas_data[sr] = pd.read_csv(f, sep='\t')
new_df = meas_data[sr]
new_df['Vin'] = float(sr[:-2])
if nr in all_data.keys():
all_data[nr] = pd.concat([all_data[nr], new_df])
else:
all_data[nr] = new_df
return meas_data, meas_info, all_data
def calc_PSD(meas_data):
meas_obj = {}
for sr, data_df in meas_data.items():
if len(data_df['Vx']) % 1024:
avg = len(data_df['Vx']) // 1024
d = data_df['Vx'].iloc[:-(len(data_df['Vx']) % 1024)]
else:
d = data_df.Vx
max_len = len(d)
data = {
'data': d,
'info': {
'Nr': meas_info[sr]['nr'],
'rate': 1 / data_df.time.diff().mean(),
'length': max_len * data_df.time.diff().mean(),
}
}
meas_obj[sr] = RAW(data,
rate=data['info']['rate'],
nof_first_spectra=32,
calc_first=True,
downsample=False,
)
return meas_obj
def merge_data(meas_obj, cutoff_frequency=.9):
diff_voltages = pd.DataFrame()
for sr, m in meas_obj.items():
s = m.avg_spec
s = s[s.freq < cutoff_frequency]
if len(s) < 2:
continue
newdf = pd.DataFrame()
newdf['freq'] = s.freq
newdf['S'] = s.S
newdf['Vin'] = float(sr[:-2])
diff_voltages = pd.concat([diff_voltages, newdf])
return diff_voltages
def plot_PSD_classic(diff_voltages, title, groupby_category='Vin',
num=10, style=[['science'], {'context': 'talk', 'style': 'white', 'palette': 'bright', }]):
set_style(style)
c1 = sns.color_palette("hls", num)
sns.set_palette(c1)
fig, ax = plt.subplots(figsize=(12, 8))
# g = sns.relplot(x='freq', y='S', hue='Vin', data=diff_voltages, height=5, kind='line')
grouped = diff_voltages.groupby(groupby_category)
for group in grouped.groups.keys():
grouped.get_group(group).plot(x='freq', y='S', kind='line',
loglog=True, ax=ax,
label=group,
xlabel='Frequency [Hz]',
ylabel='$S_{V_H}$ [$\\mathrm{V}^2/\\mathrm{Hz}$]',
)
ax.set_title(title)
# save_plot('m506', 'png')
def show_PSD_classic(diff_voltages, title, ax=None, groupby_category='Vin',
num=10, style=[['science'], {'context': 'talk', 'style': 'white', 'palette': 'bright', }]):
if not ax:
fig, ax = plt.subplots(figsize=(12, 8))
set_style(style)
c1 = sns.color_palette("hls", num)
sns.set_palette(c1)
# g = sns.relplot(x='freq', y='S', hue='Vin', data=diff_voltages, height=5, kind='line')
grouped = diff_voltages.groupby(groupby_category)
for group in grouped.groups.keys():
grouped.get_group(group).plot(x='freq', y='S', kind='line',
loglog=True, ax=ax,
label=group,
xlabel='Frequency [Hz]',
ylabel='$S_{V_H}$ [$\\mathrm{V}^2/\\mathrm{Hz}$]',
)
ax.set_title(title)
return ax
datapath = glob('./data/MFN/m%s/*' % nr)
meas_data, meas_info, all_data = load_data(datapath)
meas_obj = calc_PSD(meas_data)
diff_voltages = merge_data(meas_obj)
fig, ax = plt.subplots(figsize=figsize)
show_PSD_classic(diff_voltages, 'm%s: Compare different Voltages' % nr, ax=ax)
inset2 = inset_axes(ax, width='100%', height='100%',
bbox_to_anchor=(.54, .75, .3, .25),
bbox_transform=ax.transAxes)
inset3 = inset_axes(ax, width='100%', height='100%',
bbox_to_anchor=(.1, .1, .3, .25),
bbox_transform=ax.transAxes)
grouped = diff_voltages.groupby('Vin')
for group in grouped.groups.keys():
g = grouped.get_group(group)
            fit_area = g.query('freq > %f and freq < %f' % (2e-2, 7e-1)).copy()
fit_area['lnf'] = np.log10(fit_area.freq)
fit_area['lnS'] = np.log10(fit_area.S)
fit = scipy.stats.linregress(fit_area.lnf, fit_area.lnS)
intercept, slope = fit.intercept, -fit.slope
voltage = group
inset2.plot(voltage, 10 ** intercept, 'o')
inset3.plot(voltage, slope, 'o')
        inset2.set_xlabel('Voltage [$\\mathrm{V}$]')
inset2.set_ylabel('$S_{V_H} (f=1\\;$Hz$)$')
inset2.set_yscale('log')
        inset3.set_xlabel('Voltage [$\\mathrm{V}$]')
inset3.set_ylabel('$\\alpha$')