# RamanClassifier/data/data_processing.py

import pandas as pd
from pybaselines import Baseline
from scipy.signal import savgol_filter
from sklearn.preprocessing import StandardScaler


def interpolate_spectrum(exp, max_wave=1927, min_wave=181):
    """
    Interpolate a spectrum at integer wavelengths in [min_wave, max_wave).

    :param exp: a single measurement, indexed by wavelength (descending order
        expected), with an "#Intensity" column
    :param max_wave: maximal wavelength (exclusive)
    :param min_wave: minimal wavelength (inclusive)
    :return: the interpolated spectrum as a DataFrame indexed by wavelength
    """
    result = pd.DataFrame(columns=["#Intensity"])
    for i in range(min_wave, max_wave):
        for k in range(len(exp.index) - 1):
            if exp.index[k] == i:
                # Exact match: take the measured intensity as-is.
                result.loc[i] = exp["#Intensity"].iloc[k]
                break
            if exp.index[k] > i > exp.index[k + 1]:
                # i lies between two measured wavelengths: interpolate linearly.
                result.loc[i] = exp["#Intensity"].iloc[k] - (
                    exp["#Intensity"].iloc[k] - exp["#Intensity"].iloc[k + 1]) / (
                    exp.index[k] - exp.index[k + 1]) * (exp.index[k] - i)
                break
        else:
            # i is outside the measured range: pad with zero intensity.
            result.loc[i] = 0
    return result
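

# A vectorized alternative to the loop above (a sketch, not part of the
# original pipeline): np.interp expects ascending x, so the spectrum is sorted
# first. The edge behaviour differs slightly: np.interp clamps to the nearest
# measured value, while interpolate_spectrum pads out-of-range wavelengths
# with 0.
def interpolate_spectrum_vectorized(exp, max_wave=1927, min_wave=181):
    import numpy as np  # local import so this sketch stays self-contained

    ordered = exp["#Intensity"].sort_index()
    grid = np.arange(min_wave, max_wave)
    values = np.interp(grid, ordered.index.to_numpy(), ordered.to_numpy())
    return pd.DataFrame({"#Intensity": values}, index=grid)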


def interpolate_experiments(sliced_experiments, max_wave=1927, min_wave=181):
    """Interpolate every measurement in a list, printing simple progress."""
    result = []
    for i, exp in enumerate(sliced_experiments):
        result.append(interpolate_spectrum(exp, max_wave, min_wave))
        print(i + 1, "/", len(sliced_experiments))
    return result


def calculate_baseline(x_data, measure, lam=10, p=1e-2):
    """Fit a baseline with pybaselines' improved asymmetric least squares (IAsLS)."""
    baseline_fitter = Baseline(x_data=x_data)
    baseline, _params = baseline_fitter.iasls(measure, lam=lam, p=p)
    return baseline


def adjust_baseline(x_data, measure, lam=10, p=1e-2):
    """Subtract the fitted baseline from a single measurement."""
    baseline = calculate_baseline(x_data, measure, lam=lam, p=p)
    return measure - baseline


def adjust_all_baselines(measures, lam=10, p=1e-2):
    """Baseline-correct every row of a DataFrame of measurements."""
    result = measures.copy(deep=True)
    x_data = measures.columns.astype(int)
    for index, row in result.iterrows():
        # .loc (not .iloc) so non-positional row labels are handled correctly.
        result.loc[index] = adjust_baseline(x_data, row, lam=lam, p=p)
    return result
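

# Minimal usage sketch for the baseline step (the peak positions and the
# sloping background below are invented for illustration, not taken from the
# project's data).
def _baseline_demo():
    import numpy as np  # local import so this sketch stays self-contained

    x = np.arange(181, 1927)
    # A fake Raman-like spectrum: two Gaussian peaks on a linear background.
    spectrum = (np.exp(-((x - 600) ** 2) / 200.0)
                + 0.5 * np.exp(-((x - 1200) ** 2) / 800.0)
                + 1e-4 * x)
    # adjust_baseline should remove the slope while keeping the peaks.
    return adjust_baseline(x, spectrum, lam=10, p=1e-2)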


def scale_experiments(experiments):
    """Standardize each spectrum (row) to zero mean and unit variance."""
    result = experiments.copy(deep=True)
    trans = StandardScaler()
    # StandardScaler works column-wise, so transpose to scale per spectrum.
    scaled = trans.fit_transform(experiments.transpose()).T
    result.iloc[:, :] = scaled
    return result


def apply_smoothing(experiment, window_length=7, polyorder=3):
    """Smooth a single spectrum with a Savitzky-Golay filter."""
    return savgol_filter(experiment, window_length, polyorder)


def smooth_experiments(experiments, window_length=7, polyorder=3):
    """Smooth every row of a DataFrame of spectra."""
    result = experiments.copy(deep=True)
    for index, row in result.iterrows():
        # .loc (not .iloc) so non-positional row labels are handled correctly.
        result.loc[index, :] = apply_smoothing(row, window_length=window_length,
                                               polyorder=polyorder)
    return result


def categorize_metadata(metadata: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split metadata into integer truth labels ("strain") and one-hot features."""
    truth = pd.DataFrame(pd.Categorical(metadata["strain"]).codes, index=metadata.index)
    encoded = pd.get_dummies(data=metadata, columns=["phase", "substrate", "confocalhigh"],
                             dtype=int)
    encoded.drop(columns=["replica", "strain"], inplace=True)
    return truth, encoded
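

# For reference, categorize_metadata assumes a metadata frame shaped roughly
# like the following (the column names come from the function above; the
# example values are hypothetical):
#
#     strain   phase         substrate  confocalhigh  replica
#  0  "WT"     "stationary"  "glass"    True          1
#  1  "mutA"   "log"         "steel"    False         2
#
# "strain" becomes integer category codes (the truth labels); "phase",
# "substrate" and "confocalhigh" are one-hot encoded; "replica" and "strain"
# are dropped from the returned feature frame.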


def process_experiments(experiments: pd.DataFrame, baseline_lam: int = 10, baseline_p: float = 1e-2,
                        smooth_window_length: int = 7, smooth_polyorder: int = 3) -> pd.DataFrame:
    """Full preprocessing pipeline: baseline correction, scaling, smoothing."""
    experiments = adjust_all_baselines(experiments, lam=baseline_lam, p=baseline_p)
    experiments = scale_experiments(experiments)
    experiments = smooth_experiments(experiments, window_length=smooth_window_length,
                                     polyorder=smooth_polyorder)
    return experiments


def process_train_test(params: dict, experiments_train: pd.DataFrame, metadata_train: pd.DataFrame,
                       experiments_test: pd.DataFrame | None = None,
                       metadata_test: pd.DataFrame | None = None,
                       scale: bool = True) -> tuple[pd.DataFrame, pd.DataFrame | None]:
    """Preprocess spectra, join them with metadata, and optionally standardize."""
    processed_train = process_experiments(experiments_train, **params)
    X_train = pd.concat([metadata_train, processed_train], axis=1)
    if experiments_test is not None:
        processed_test = process_experiments(experiments_test, **params)
        X_test = pd.concat([metadata_test, processed_test], axis=1)
    else:
        X_test = None
    if scale:
        # Fit the scaler on the training set only to avoid test-set leakage;
        # wrap the ndarray outputs back into DataFrames so the return values
        # match the annotated types.
        scaler = StandardScaler()
        scaler.fit(X_train)
        X_train = pd.DataFrame(scaler.transform(X_train),
                               index=X_train.index, columns=X_train.columns)
        if X_test is not None:
            X_test = pd.DataFrame(scaler.transform(X_test),
                                  index=X_test.index, columns=X_test.columns)
    return X_train, X_test
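

if __name__ == "__main__":
    # End-to-end sketch on synthetic spectra (the shapes and random values are
    # invented for illustration; the real project loads measured Raman data).
    import numpy as np

    rng = np.random.default_rng(0)
    waves = [str(w) for w in range(181, 1927)]
    experiments = pd.DataFrame(rng.random((4, len(waves))), columns=waves)
    processed = process_experiments(experiments)
    print("processed shape:", processed.shape)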