106 lines
4.1 KiB
Python
106 lines
4.1 KiB
Python
from pybaselines import Baseline
|
|
from sklearn.preprocessing import StandardScaler
|
|
from scipy.signal import savgol_filter
|
|
import pandas as pd
|
|
|
|
|
|
def interpolate_spectrum(exp, max_wave=1927, min_wave=181):
    """
    Interpolate a spectrum at integer wavelengths in the specified range.

    For every integer ``i`` in ``range(min_wave, max_wave)`` (``max_wave``
    itself is excluded) the samples are scanned in order:

    * if a sample sits exactly at ``i`` its intensity is used as-is;
    * otherwise, if ``i`` lies strictly between two consecutive samples whose
      index values are decreasing, the intensity is linearly interpolated
      between them;
    * if neither applies, the intensity is set to 0.

    :param exp: one measure; a DataFrame with an ``"#Intensity"`` column whose
        index holds the sample positions (only decreasing consecutive index
        values are bracketed for interpolation)
    :param max_wave: maximal wavelength (exclusive upper bound)
    :param min_wave: minimal wavelength (inclusive lower bound)
    :return: the interpolated experiment, a DataFrame with one
        ``"#Intensity"`` column indexed by the integer wavelengths
    """
    # Extract the raw arrays once; scalar lookups through pandas inside the
    # double loop are by far the most expensive part of the scan.
    positions = exp.index.to_numpy()
    intensities = exp["#Intensity"].to_numpy()
    last = len(positions) - 1

    wavelengths = list(range(min_wave, max_wave))
    values = []
    for wave in wavelengths:
        value = 0  # default when the wavelength is outside the sampled span
        for k, position in enumerate(positions):
            # The exact-match test must also cover the final sample; stopping
            # one short would turn a wavelength equal to the last index value
            # into 0 instead of its measured intensity.
            if position == wave:
                value = intensities[k]
                break
            if k < last and position > wave > positions[k + 1]:
                slope = (intensities[k] - intensities[k + 1]) / (
                    position - positions[k + 1])
                value = intensities[k] - slope * (position - wave)
                break
        values.append(value)

    # Build the frame in one go rather than enlarging it row by row.
    return pd.DataFrame({"#Intensity": values}, index=pd.Index(wavelengths))
|
|
|
|
|
|
def interpolate_experiments(sliced_experiments, max_wave=1927, min_wave=181):
    """
    Interpolate every experiment with ``interpolate_spectrum``.

    A progress line of the form ``<done> / <total>`` is printed after each
    experiment has been processed.

    :param sliced_experiments: sized collection of measures
    :param max_wave: maximal wavelength, forwarded to ``interpolate_spectrum``
    :param min_wave: minimal wavelength, forwarded to ``interpolate_spectrum``
    :return: list with one interpolated experiment per input, in input order
    """
    interpolated = []
    for done, spectrum in enumerate(sliced_experiments, start=1):
        interpolated.append(interpolate_spectrum(spectrum, max_wave, min_wave))
        print(done, "/", len(sliced_experiments))
    return interpolated
|
|
|
|
|
|
def calculate_baseline(x_data, measure, lam=10, p=1e-2):
    """
    Estimate the baseline of one measure with pybaselines' ``iasls`` fit.

    :param x_data: x values handed to the ``Baseline`` fitter
    :param measure: intensities of the measure to fit
    :param lam: forwarded to ``Baseline.iasls`` as ``lam``
    :param p: forwarded to ``Baseline.iasls`` as ``p``
    :return: the fitted baseline (first element of the ``iasls`` result)
    """
    fitter = Baseline(x_data=x_data)
    # iasls returns a (baseline, params) pair; only the baseline is needed.
    background, _ = fitter.iasls(measure, lam=lam, p=p)
    return background
|
|
|
|
|
|
def adjust_baseline(x_data, measure, lam=10, p=1e-2):
    """
    Subtract the fitted baseline from one measure.

    :param x_data: x values forwarded to ``calculate_baseline``
    :param measure: intensities of the measure
    :param lam: forwarded to ``calculate_baseline``
    :param p: forwarded to ``calculate_baseline``
    :return: ``measure`` minus its calculated baseline
    """
    return measure - calculate_baseline(x_data, measure, lam=lam, p=p)
|
|
|
|
|
|
def adjust_all_baselines(measures, lam=10, p=1e-2):
    """
    Subtract the fitted baseline from every row of ``measures``.

    The column labels, converted to ``int``, are used as the x values of the
    baseline fit. The input frame is left untouched; a deep copy is filled
    with the corrected rows and returned.

    :param measures: DataFrame with one measure per row and columns whose
        labels are convertible to ``int``
    :param lam: forwarded to ``adjust_baseline``
    :param p: forwarded to ``adjust_baseline``
    :return: copy of ``measures`` with each row baseline-corrected
    """
    x_data = measures.columns.astype(int)  # identical for every row
    result = measures.copy(deep=True)
    # iterrows() yields index *labels*; rows are written back by *position*
    # so frames with a non-default index (e.g. after a train/test split)
    # are updated in the right place instead of failing or hitting the
    # wrong row.
    for position, (_, row) in enumerate(measures.iterrows()):
        result.iloc[position] = adjust_baseline(x_data, row, lam=lam, p=p)
    return result
|
|
|
|
|
|
def scale_experiments(experiments):
    """
    Standardize the experiments with a ``StandardScaler``.

    The scaler is fitted on the transposed frame and the result is transposed
    back, so the scaler's per-column statistics are taken over each row of
    ``experiments``. The input frame is not modified.

    :param experiments: DataFrame with one experiment per row
    :return: deep copy of ``experiments`` holding the scaled values
    """
    scaler = StandardScaler()
    standardized = scaler.fit_transform(experiments.T)
    output = experiments.copy(deep=True)
    output.iloc[:, :] = standardized.T
    return output
|
|
|
|
|
|
def apply_smoothing(experiment, window_length=7, polyorder=3):
    """
    Smooth one experiment with a Savitzky-Golay filter.

    :param experiment: the values to smooth
    :param window_length: passed to ``savgol_filter`` as ``window_length``
    :param polyorder: passed to ``savgol_filter`` as ``polyorder``
    :return: the output of ``scipy.signal.savgol_filter``
    """
    smoothed = savgol_filter(
        experiment,
        window_length=window_length,
        polyorder=polyorder,
    )
    return smoothed
|
|
|
|
|
|
def smooth_experiments(experiments, window_length=7, polyorder=3):
    """
    Smooth every row of ``experiments`` with ``apply_smoothing``.

    The input frame is left untouched; a deep copy is filled with the
    smoothed rows and returned.

    :param experiments: DataFrame with one experiment per row
    :param window_length: forwarded to ``apply_smoothing``
    :param polyorder: forwarded to ``apply_smoothing``
    :return: copy of ``experiments`` with each row smoothed
    """
    result = experiments.copy(deep=True)
    # iterrows() yields index *labels*; rows are written back by *position*
    # so frames with a non-default index (e.g. after a train/test split)
    # are updated in the right place instead of failing or hitting the
    # wrong row.
    for position, (_, row) in enumerate(experiments.iterrows()):
        result.iloc[position, :] = apply_smoothing(
            row, window_length=window_length, polyorder=polyorder)
    return result
|
|
|
|
|
|
def categorize_metadata(metadata: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split the metadata into integer strain labels and encoded features.

    :param metadata: DataFrame containing at least the columns ``"strain"``,
        ``"replica"``, ``"phase"``, ``"substrate"`` and ``"confocalhigh"``
    :return: ``(truth, encoded)`` where ``truth`` is a one-column DataFrame of
        the categorical codes of ``"strain"`` (same index as ``metadata``) and
        ``encoded`` is ``metadata`` with ``"phase"``, ``"substrate"`` and
        ``"confocalhigh"`` replaced by integer dummy columns and
        ``"replica"`` / ``"strain"`` removed
    """
    strain_codes = pd.Categorical(metadata["strain"]).codes
    truth = pd.DataFrame(strain_codes, index=metadata.index)

    dummy_columns = ["phase", "substrate", "confocalhigh"]
    with_dummies = pd.get_dummies(data=metadata, columns=dummy_columns, dtype=int)
    encoded = with_dummies.drop(columns=["replica", "strain"])

    return truth, encoded
|
|
|
|
|
|
def process_experiments(experiments: pd.DataFrame, baseline_lam: int = 10, baseline_p: float = 1e-2,
                        smooth_window_length: int = 7, smooth_polyorder: int = 3) -> pd.DataFrame:
    """
    Run the preprocessing pipeline: baseline correction, scaling, smoothing.

    :param experiments: DataFrame with one experiment per row
    :param baseline_lam: ``lam`` passed to ``adjust_all_baselines``
    :param baseline_p: ``p`` passed to ``adjust_all_baselines``
    :param smooth_window_length: ``window_length`` passed to
        ``smooth_experiments``
    :param smooth_polyorder: ``polyorder`` passed to ``smooth_experiments``
    :return: the processed experiments
    """
    corrected = adjust_all_baselines(experiments, lam=baseline_lam, p=baseline_p)
    scaled = scale_experiments(corrected)
    return smooth_experiments(
        scaled,
        window_length=smooth_window_length,
        polyorder=smooth_polyorder,
    )
|
|
|
|
|
|
def process_train_test(
    params: dict,
    experiments_train: pd.DataFrame,
    metadata_train: pd.DataFrame,
    experiments_test: pd.DataFrame = None,
    metadata_test: pd.DataFrame = None,
    scale: bool = True,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build the train (and optionally test) feature matrices.

    The experiments are run through ``process_experiments`` and concatenated
    column-wise after the matching metadata. With ``scale`` enabled a
    ``StandardScaler`` is fitted on the training matrix only and then applied
    to both matrices.

    :param params: keyword arguments unpacked into ``process_experiments``
    :param experiments_train: training experiments
    :param metadata_train: metadata placed in front of the training features
    :param experiments_test: optional test experiments; ``None`` skips the
        test set
    :param metadata_test: metadata placed in front of the test features
    :param scale: whether to standardize the assembled matrices
    :return: ``(X_train, X_test)``; ``X_test`` is ``None`` when no test
        experiments were given. NOTE(review): with ``scale=True`` the values
        are whatever ``StandardScaler.transform`` returns, which may not be a
        DataFrame as the annotation suggests -- confirm.
    """
    X_train = pd.concat(
        [metadata_train, process_experiments(experiments_train, **params)],
        axis=1,
    )

    X_test = None
    if experiments_test is not None:
        X_test = pd.concat(
            [metadata_test, process_experiments(experiments_test, **params)],
            axis=1,
        )

    if not scale:
        return X_train, X_test

    # Fit on the training data only, then reuse the same statistics for test.
    fitted = StandardScaler().fit(X_train)
    X_train = fitted.transform(X_train)
    if X_test is not None:
        X_test = fitted.transform(X_test)
    return X_train, X_test
|