First processing steps

This commit is contained in:
2024-02-29 21:27:11 +01:00
parent debf013ee9
commit 66c6e6c931
5 changed files with 228 additions and 65 deletions

3
data/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
import pandas as pd
from data_loading import *
from data_processing import *

54
data/data_loading.py Normal file
View File

@@ -0,0 +1,54 @@
import pandas as pd
from utils.df_utils import *
def load_raw_metadata() -> pd.DataFrame:
"""
Load metadata.csv
:return:
"""
with open("../data_raw/metadata.csv") as f:
return pd.read_csv(f, index_col="file")
def load_raw_measure(filename: str) -> pd.DataFrame:
"""
Load a given measure session
:param filename:
:return:
"""
with open(f"../data_raw/{filename}") as f:
df_experiment = pd.DataFrame([x.split() for x in f.readlines()])
df_experiment.columns = df_experiment.iloc[0]
df_experiment = df_experiment[1:][["#Wave", "#Intensity"]].astype(float)
return df_experiment
def get_raw_measure_with_metadata(file: pd.Series) -> tuple[pd.DataFrame, list[pd.DataFrame]]:
"""
Load a given measure, slice it and give the corresponding metadata
:param file:
:return: dataframe containing the metadata with one row per sliced measure
:return: list of sliced measure dataframes
"""
df_experiment = load_raw_measure(file.name)
sliced_experiments = slice_df(df_experiment, df_experiment[df_experiment["#Wave"].diff() > 0].index)
sliced_experiments = [exp.set_index("#Wave") for exp in sliced_experiments]
file = pd.DataFrame(file).transpose()
metadata = pd.DataFrame(file.loc[file.index.repeat(len(sliced_experiments))])
return metadata, sliced_experiments
def load_data() -> tuple[pd.DataFrame, list[pd.DataFrame]]:
"""
Load all the available data, slice it into individual measures and give the corresponding metadata
:return: dataframe containing the metadata with one row per sliced measure
:return: list of sliced measure dataframes
"""
metadata, sliced_experiments = pd.DataFrame(), []
raw_metadata = load_raw_metadata()
for _, row in raw_metadata.iterrows():
temp_metadata, temp_sliced_experiments = get_raw_measure_with_metadata(row)
metadata = pd.concat([metadata, temp_metadata])
sliced_experiments.extend(temp_sliced_experiments)
return metadata, sliced_experiments

File diff suppressed because one or more lines are too long

88
data/data_processing.py Normal file
View File

@@ -0,0 +1,88 @@
from pybaselines import Baseline
import numpy as np
from math import factorial
def calculate_baseline(measure):
baseline_fitter = Baseline(x_data=measure.index)
bkg_2, params_2 = baseline_fitter.iasls(measure["#Intensity"], lam=10, p=1e-2)
return bkg_2
def adjust_baseline(measure, scale = False):
baseline = calculate_baseline(measure)
measure["#Intensity"] -= baseline
if scale:
measure["#Intensity"] /= baseline.max() - baseline.min()
return measure
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
r"""Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
The Savitzky-Golay filter removes high frequency noise from data.
It has the advantage of preserving the original shape and
features of the signal better than other types of filtering
approaches, such as moving averages techniques.
Parameters
----------
y : array_like, shape (N,)
the values of the time history of the signal.
window_size : int
the length of the window. Must be an odd integer number.
order : int
the order of the polynomial used in the filtering.
Must be less then `window_size` - 1.
deriv: int
the order of the derivative to compute (default = 0 means only smoothing)
Returns
-------
ys : ndarray, shape (N)
the smoothed signal (or it's n-th derivative).
Notes
-----
The Savitzky-Golay is a type of low-pass filter, particularly
suited for smoothing noisy data. The main idea behind this
approach is to make for each point a least-square fit with a
polynomial of high order over a odd-sized window centered at
the point.
Examples
--------
t = np.linspace(-4, 4, 500)
y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape)
ysg = savitzky_golay(y, window_size=31, order=4)
import matplotlib.pyplot as plt
plt.plot(t, y, label='Noisy signal')
plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal')
plt.plot(t, ysg, 'r', label='Filtered signal')
plt.legend()
plt.show()
References
----------
.. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of
Data by Simplified Least Squares Procedures. Analytical
Chemistry, 1964, 36 (8), pp 1627-1639.
.. [2] Numerical Recipes 3rd Edition: The Art of Scientific Computing
W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
Cambridge University Press ISBN-13: 9780521880688
"""
try:
window_size = np.abs(int(window_size))
order = np.abs(int(order))
except ValueError:
raise ValueError("window_size and order have to be of type int")
if window_size % 2 != 1 or window_size < 1:
raise TypeError("window_size size must be a positive odd number")
if window_size < order + 2:
raise TypeError("window_size is too small for the polynomials order")
order_range = range(order+1)
half_window = (window_size -1) // 2
# precompute coefficients
b = np.mat([[k**i for i in order_range] for k in range(-half_window, half_window+1)])
m = np.linalg.pinv(b).A[deriv] * rate**deriv * factorial(deriv)
# pad the signal at the extremes with
# values taken from the signal itself
firstvals = y[0] - np.abs( y[1:half_window+1][::-1] - y[0] )
lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1])
y = np.concatenate((firstvals, y, lastvals))
return np.convolve(m[::-1], y.T[0], mode='valid')