62 lines
2.3 KiB
Python
62 lines
2.3 KiB
Python
import pandas as pd
|
|
import os
|
|
from utils.df_utils import slice_df
|
|
|
|
|
|
def load_raw_metadata() -> pd.DataFrame:
|
|
"""
|
|
Load metadata.csv
|
|
:return:
|
|
"""
|
|
with open("../data_raw/metadata.csv") as f:
|
|
return pd.read_csv(f, index_col="file")
|
|
|
|
|
|
def load_raw_measure(filename: str) -> pd.DataFrame:
|
|
"""
|
|
Load a given measure session
|
|
:param filename:
|
|
:return:
|
|
"""
|
|
with open(f"../data_raw/{filename}") as f:
|
|
df_experiment = pd.DataFrame([x.split() for x in f.readlines()])
|
|
df_experiment.columns = df_experiment.iloc[0]
|
|
df_experiment = df_experiment[1:][["#Wave", "#Intensity"]].astype(float)
|
|
return df_experiment
|
|
|
|
|
|
def get_raw_measure_with_metadata(file: pd.Series) -> tuple[pd.DataFrame, list[pd.DataFrame]]:
|
|
"""
|
|
Load a given measure, slice it and give the corresponding metadata
|
|
:param file:
|
|
:return: dataframe containing the metadata with one row per sliced measure
|
|
:return: list of sliced measure dataframes
|
|
"""
|
|
df_experiment = load_raw_measure(file.name)
|
|
sliced_experiments = slice_df(df_experiment, df_experiment[df_experiment["#Wave"].diff() > 0].index)
|
|
sliced_experiments = [exp.set_index("#Wave") for exp in sliced_experiments]
|
|
file = pd.DataFrame(file).transpose()
|
|
metadata = pd.DataFrame(file.loc[file.index.repeat(len(sliced_experiments))])
|
|
return metadata, sliced_experiments
|
|
|
|
|
|
def load_raw_data() -> tuple[pd.DataFrame, list[pd.DataFrame]]:
|
|
"""
|
|
Load all the available data, slice it into individual measures and give the corresponding metadata
|
|
:return: dataframe containing the metadata with one row per sliced measure
|
|
:return: list of sliced measure dataframes
|
|
"""
|
|
metadata, sliced_experiments = pd.DataFrame(), []
|
|
raw_metadata = load_raw_metadata()
|
|
for _, row in raw_metadata.iterrows():
|
|
temp_metadata, temp_sliced_experiments = get_raw_measure_with_metadata(row)
|
|
metadata = pd.concat([metadata, temp_metadata])
|
|
sliced_experiments.extend(temp_sliced_experiments)
|
|
return metadata, sliced_experiments
|
|
|
|
|
|
def load_data(name: str, path: os.path = os.path.join("data")) -> tuple[pd.DataFrame, pd.DataFrame]:
|
|
metadata = pd.read_csv(os.path.join(path, name, "metadata.csv"))
|
|
experiments = pd.read_csv(os.path.join(path, name, "experiments.csv"), dtype=float)
|
|
return metadata, experiments
|