Source code for hplc_data_analysis.hplc

# %%
from __future__ import annotations
import pathlib as plib
from typing import Literal
import pandas as pd
import matplotlib.patches as mpatches
from matplotlib.axes import Axes
from hplc_data_analysis.pubchem import name_to_properties
from myfigure.myfigure import MyFigure, colors, hatches


[docs] class Project: def __init__( self, folder_path: plib.Path | str, projectname: str | None = None, file_load_skiprows: int = 18, file_load_delimiter: Literal["\t", ",", ";"] = "\t", columns_to_rename_and_keep_in_files: dict[str, str] | None = None, compounds_to_rename_in_files: dict[str, str] | None = None, param_to_axis_label: dict[str, str] | None = None, plot_font: Literal["Dejavu Sans", "Times New Roman"] = "Dejavu Sans", plot_grid: bool = False, ): self.folder_path = plib.Path(folder_path) self.out_path = plib.Path(folder_path, "output") if projectname is None: self.projectname = self.folder_path.parts[-1] else: self.projectname = projectname self.plot_font = plot_font self.plot_grid = plot_grid self.file_load_skiprows = file_load_skiprows self.file_load_delimiter = file_load_delimiter self.files_info_defauls_columns = [ "dilution_factor", "total_sample_conc_in_vial_mg_L", "sample_yield_on_feedstock_basis_fr", ] if columns_to_rename_and_keep_in_files is None: self.columns_to_rename_in_files = { "Name": "comp_name", "R.Time": "retention_time", "Height": "height", "Area": "area", "Conc.": "conc_vial_mg_L", } else: self.columns_to_rename_and_keep_in_files = columns_to_rename_and_keep_in_files self.columns_to_keep_in_files = self.columns_to_rename_in_files.values() if compounds_to_rename_in_files is None: self.compounds_to_rename_in_files = { "3-methyl-(2H)-furan-5-one": "4-methyl-2H-furan-5-one", "4-methyl-(2H)-furan-5-one": "4-methyl-2H-furan-5-one", "2,3-pentanedione": "pentane-2,3-dione", "(2R,3S,4R,5R)-2,3,4,5,6-pentahydroxyhexanoic acid": "gluconic acid", "5-(hydroxymethyl)furan-2-carbaldehyde": "5-HMF", } else: self.compounds_to_rename_in_files = compounds_to_rename_in_files if param_to_axis_label is None: self.param_to_axis_label = { "AdjArea": "Peak Area [-]", "conc_vial_mg_L": "vial conc. [mg/L] (ppm)", "conc_vial_if_undiluted_mg_L": "vial conc. 
[mg/L] (ppm)", "fraction_of_sample_fr": "mass fraction [g/g$_{sample}$]", "fraction_of_feedstock_fr": "mass fraction [g/g$_{feedstock}$]", } else: self.param_to_axis_label = param_to_axis_label self.acceptable_params: list[str] = list(self.param_to_axis_label.keys()) self.files_info: pd.DataFrame | None = None self.replicates_info: pd.DataFrame | None = None self.samples_info: pd.DataFrame | None = None self.list_of_unique_compounds: list[str] | None = None self.class_code_frac: pd.DataFrame | None = None self.dict_classes_to_codes: dict[str, str] | None = None self.dict_classes_to_mass_fractions: dict[str, float] | None = None self.compounds_properties: pd.DataFrame | None = None self.samples: dict[str, Sample] = {} self.file_dfs: dict[str, pd.DataFrame] = {} self.replicate_dfs: dict[str, pd.DataFrame] = {} self.sample_dfs: dict[str, pd.DataFrame] = {} self.sample_dfs_std: dict[str, pd.DataFrame] = {} self.samplenames: list[str] = [] self.files_reports: dict[str, pd.DataFrame] = {} self.replicates_reports: dict[str, pd.DataFrame] = {} self.samples_reports: dict[str, pd.DataFrame] = {} self.samples_reports_std: dict[str, pd.DataFrame] = {} self.files_aggrreps: dict[str, pd.DataFrame] = {} self.replicates_aggrreps: dict[str, pd.DataFrame] = {} self.samples_aggrreps: dict[str, pd.DataFrame] = {} self.samples_aggrreps_std: dict[str, pd.DataFrame] = {} self.list_of_files_param_reports = [] self.list_of_replicates_param_reports = [] self.list_of_samples_param_reports = [] self.list_of_files_param_aggrreps = [] self.list_of_replicates_param_aggrreps = [] self.list_of_samples_param_aggrreps = []
[docs] def load_files_info(self, update_saved_files_info: bool = True) -> pd.DataFrame: """ Loads the files information from an Excel file and returns it as a DataFrame. :param update_saved_files_info: Specifies whether to update the saved files_info.xlsx file. :type update_saved_files_info: bool, optional :return: The loaded files information as a DataFrame. :rtype: pd.DataFrame """ files_info_path = plib.Path(self.folder_path, "files_info.xlsx") if files_info_path.exists(): files_info = pd.read_excel(files_info_path, engine="openpyxl", index_col="filename") self.files_info = self._add_default_to_files_info(files_info) print("Info: files_info loaded") else: print("Info: files_info not found") self.files_info = self.create_files_info() if update_saved_files_info: self.files_info.to_excel(plib.Path(self.folder_path, "files_info.xlsx")) return self.files_info
[docs] def create_files_info(self, update_saved_files_info: bool = False) -> pd.DataFrame: """ """ filename = [a.parts[-1].split(".")[0] for a in list(self.folder_path.glob("**/*.txt"))] hplc_method = [f.split("_")[0] for f in filename] samplename = [f.split("_")[1] for f in filename] replicatenumber = [f.split("_")[2] for f in filename] replicatename = [s + "_" + r for s, r in zip(samplename, replicatenumber)] files_info_unsorted = pd.DataFrame( index=filename, data={ "hplc_method": hplc_method, "samplename": samplename, "replicatename": replicatename, }, ) files_info = files_info_unsorted.sort_index() files_info.index.name = "filename" self.files_info = self._add_default_to_files_info(files_info) if update_saved_files_info: self.files_info.to_excel(plib.Path(self.folder_path, "files_info.xlsx")) return self.files_info
def _add_default_to_files_info(self, files_info_no_defaults: pd.DataFrame) -> pd.DataFrame: """Add default values to the files_info DataFrame. This method takes a DataFrame `files_info_no_defaults` as input and adds default values to it. The default values are added for any columns specified in `self.files_info_defauls_columns`. Args: files_info_no_defaults (pd.DataFrame): The DataFrame containing files_info without default values. Returns: pd.DataFrame: The DataFrame with default values added. """ for col in self.files_info_defauls_columns: if col not in list(files_info_no_defaults): files_info_no_defaults[col] = 1 return files_info_no_defaults
[docs] def create_replicates_info(self): """Creates a summary 'replicates_info' DataFrame from 'files_info', aggregating data for each replicate, and updates the 'replicates_info' attribute with this summarized data.""" if self.files_info is None: _ = self.load_files_info() self.replicates_info = self.files_info.reset_index().groupby("replicatename").agg(list) self.replicates_info.reset_index(inplace=True) self.replicates_info.set_index("replicatename", drop=True, inplace=True) print("Info: create_replicates_info: replicates_info created") return self.replicates_info
[docs] def create_samples_info(self): """Creates a summary 'samples_info' DataFrame from 'files_info', aggregating data for each sample, and updates the 'samples_info' attribute with this summarized data.""" if self.replicates_info is None: _ = self.create_replicates_info() self.samples_info = self.files_info.reset_index().groupby("samplename").agg(list) self.replicates_info["samplename"] = [a[0] for a in self.replicates_info["samplename"]] self.samples_info.reset_index(inplace=True) self.samples_info.set_index("samplename", drop=True, inplace=True) print("Info: create_samples_info: samples_info created") return self.samples_info
[docs] def create_samples(self): if self.samples_info is None: self.create_samples_info() for samplename in self.samples_info.index.tolist(): sample_info = self.files_info.loc[self.files_info["samplename"] == samplename, :] self.samples[samplename] = Sample(self, samplename, sample_info)
[docs] def create_files_param_report(self, param="conc_vial_mg_L"): """ Create a report that consolidates the values of a specified parameter from different DataFrames, using the union of all indices found in the individual DataFrames. :param param: The parameter to extract from each DataFrame. Defaults to "conc_vial_mg_L". :return: A DataFrame containing the consolidated report. """ if not self.sample_dfs: self.create_samples() # Create a dictionary of Series, each Series named after the file and containing the 'param' values series_dict = { filename: self.file_dfs[filename][param].rename(filename) for filename in self.files_info.index if param in self.file_dfs[filename].columns } # Get the union of all indices from the individual DataFrames rep = pd.concat(series_dict.values(), axis=1, keys=series_dict.keys(), join="outer") # Reindex the DataFrame to include all unique indices, filling missing values with 0 rep = rep.sort_index(key=rep.max(axis=1).get, ascending=False) rep = rep.loc[:, rep.any(axis=0)] # Save and return the report self.files_reports[param] = rep.fillna(0) self.list_of_files_param_reports.append(param) return self.files_reports[param]
[docs] def create_replicates_param_report(self, param="conc_vial_mg_L"): """ Create a report that consolidates the values of a specified parameter from different DataFrames, using the union of all indices found in the individual DataFrames. :param param: The parameter to extract from each DataFrame. Defaults to "conc_vial_mg_L". :return: A DataFrame containing the consolidated report. """ if not self.sample_dfs: self.create_samples() # Create a dictionary of Series, each Series named after the replicate and containing the 'param' values series_dict = { replicatename: self.replicate_dfs[replicatename][param].rename(replicatename) for replicatename in self.replicates_info.index if param in self.replicate_dfs[replicatename].columns } # Get the union of all indices from the individual DataFrames rep = pd.concat(series_dict.values(), axis=1, keys=series_dict.keys(), join="outer") # Sort by the max value in each row, then filter out columns that only contain 0s rep = rep.sort_index(key=rep.max(axis=1).get, ascending=False) rep = rep.loc[:, rep.any(axis=0)] # Save and return the report self.replicates_reports[param] = rep.fillna(0) self.list_of_replicates_param_reports.append(param) return self.replicates_reports[param]
[docs] def create_samples_param_report(self, param="conc_vial_mg_L"): """ Create two reports that consolidate the average and standard deviation of a specified parameter from different sample DataFrames, assuming both sets of DataFrames share the same indices. :param param: The parameter to extract from each DataFrame. Defaults to "conc_vial_mg_L". :return: A tuple of two DataFrames containing the consolidated averages and standard deviations. """ if not self.sample_dfs: self.create_samples() series_dict = { samplename: self.sample_dfs[samplename][param].rename(samplename) for samplename in self.samples_info.index if param in self.sample_dfs[samplename].columns } series_dict_std = { samplename: self.sample_dfs_std[samplename][param].rename(samplename) for samplename in self.samples_info.index if param in self.sample_dfs_std[samplename].columns } # Get the union of all indices from the individual sample DataFrames (assuming indices are the same for std and avg) rep = pd.concat(series_dict.values(), axis=1, keys=series_dict.keys(), join="outer") rep_std = pd.concat( series_dict_std.values(), axis=1, keys=series_dict_std.keys(), join="outer" ) # Populate the DataFrames with values # Sort by the max value in each row and filter out columns that only contain 0s in the average report rep = rep.sort_index(key=rep.max(axis=1).get, ascending=False) rep = rep.loc[:, rep.any(axis=0)] # Ensure the standard deviation DataFrame aligns with the average DataFrame rep_std = rep_std.reindex_like(rep) # Save and return the reports self.samples_reports[param] = rep.fillna(0) self.samples_reports_std[param] = rep_std self.list_of_samples_param_reports.append(param) return self.samples_reports[param], self.samples_reports_std[param]
[docs] def load_class_code_frac(self): """ """ class_code_frac_path = plib.Path(self.folder_path, "classifications_codes_fractions.xlsx") if class_code_frac_path.exists(): self.class_code_frac = pd.read_excel(class_code_frac_path) else: raise FileNotFoundError('the file "classifications_codes_fractions.xlsx" was not found') all_classes = self.class_code_frac.classes.tolist() codes = self.class_code_frac.codes.tolist() # list of code for each class mfs = self.class_code_frac.mfs.tolist() # list of mass fraction of each class self.dict_classes_to_codes = dict(zip(all_classes, codes)) # dictionaries self.dict_classes_to_mass_fractions = dict(zip(all_classes, mfs)) # dictionaries return self.class_code_frac
[docs] def load_compounds_properties(self): """Attempts to load the 'compounds_properties.xlsx' file containing physical and chemical properties of compounds. If not found, it creates a new properties DataFrame and updates the 'compounds_properties_created' attribute.""" compounds_properties_path = plib.Path(self.folder_path, "compounds_properties.xlsx") if compounds_properties_path.exists(): self.compounds_properties = pd.read_excel( compounds_properties_path, index_col="comp_name" ) print("Info: compounds_properties loaded") else: print("Warning: compounds_properties.xlsx not found, creating it") self.compounds_properties = self.create_compounds_properties() return self.compounds_properties
[docs] def create_compounds_properties(self, update_saved_files_info: bool = True): """ """ if self.list_of_unique_compounds is None: self.create_list_of_unique_compounds() if self.class_code_frac is None: self.class_code_frac = self.load_class_code_frac() self.compounds_properties = pd.DataFrame() for name in self.list_of_unique_compounds: self.compounds_properties = name_to_properties( comp_name=name, dict_classes_to_codes=self.dict_classes_to_codes, dict_classes_to_mass_fractions=self.dict_classes_to_mass_fractions, df=self.compounds_properties, ) self.compounds_properties.index.name = "comp_name" # save db in the project folder in the input if update_saved_files_info: self.compounds_properties.to_excel( plib.Path(self.folder_path, "compounds_properties.xlsx") ) print("Info: create_compounds_properties: compounds_properties created and saved") return self.compounds_properties
[docs] def create_list_of_unique_compounds(self) -> list[str]: if len(self.sample_dfs) == 0: self.create_samples() all_compounds = pd.concat([df for df in self.sample_dfs.values()]) self.list_of_unique_compounds = pd.Index(all_compounds.index.unique()) return self.list_of_unique_compounds
[docs] def create_files_param_aggrrep(self, param="conc_vial_mg_L"): """Aggregates compound concentration data by functional group for each parameter across all FILES, providing a summarized view of functional group concentrations. This aggregation facilitates the understanding of functional group distribution across FILES.""" print("Info: create_param_aggrrep: ", param) if param not in self.acceptable_params: raise ValueError(f"{param = } is not an acceptable param") if param not in self.list_of_files_param_reports: self.create_files_param_report(param) if self.compounds_properties is None: self.load_compounds_properties() # fg = functional groups, mf = mass fraction filenames = self.files_info.index.tolist() _all_comps = self.files_reports[param].index.tolist() cols_with_fg_mf_labs = list(self.compounds_properties) fg_mf_labs = [ c for c in cols_with_fg_mf_labs if c.startswith("fg_mf_") if c != "fg_mf_total" ] fg_labs = [c[6:] for c in fg_mf_labs] # create a df with iupac name index and fg_mf columns (underiv and deriv) all_comps_df = self.compounds_properties all_comps_df = all_comps_df[~all_comps_df.index.duplicated(keep="first")] fg_mf_all = pd.DataFrame(index=_all_comps, columns=fg_mf_labs) for idx in fg_mf_all.index.tolist(): fg_mf_all.loc[idx, fg_mf_labs] = all_comps_df.loc[idx, fg_mf_labs] # create the aggregated dataframes and compute aggregated results aggrrep = pd.DataFrame(columns=filenames, index=fg_labs, dtype="float") aggrrep.index.name = param # is the parameter for col in filenames: list_iupac = self.files_reports[param].index signal = self.files_reports[param].loc[:, col].values for fg, fg_mf in zip(fg_labs, fg_mf_labs): # each compound contributes to the cumulative sum of each # functional group for the based on the mass fraction it has # of that functional group (fg_mf act as weights) # if fg_mf in subrep: multiply signal for weight and sum # to get aggregated weights = fg_mf_all.loc[list_iupac, fg_mf].astype(signal.dtype) aggrrep.loc[fg, col] 
= (signal * weights).sum() aggrrep = aggrrep.loc[(aggrrep != 0).any(axis=1), :] # drop rows with only 0 aggrrep = aggrrep.sort_index(key=aggrrep[filenames].max(1).get, ascending=False) self.files_aggrreps[param] = aggrrep self.list_of_files_param_aggrreps.append(param) return aggrrep
[docs] def create_replicates_param_aggrrep(self, param="conc_vial_mg_L"): """Aggregates compound concentration data by functional group for each parameter across all FILES, providing a summarized view of functional group concentrations. This aggregation facilitates the understanding of functional group distribution across FILES.""" print("Info: create_param_aggrrep: ", param) if param not in self.acceptable_params: raise ValueError(f"{param = } is not an acceptable param") if param not in self.list_of_replicates_param_reports: self.create_replicates_param_report(param) if self.compounds_properties is None: self.load_compounds_properties() # fg = functional groups, mf = mass fraction replicatenames = self.replicates_info.index.tolist() _all_comps = self.replicates_reports[param].index.tolist() cols_with_fg_mf_labs = list(self.compounds_properties) fg_mf_labs = [ c for c in cols_with_fg_mf_labs if c.startswith("fg_mf_") if c != "fg_mf_total" ] fg_labs = [c[6:] for c in fg_mf_labs] # create a df with iupac name index and fg_mf columns (underiv and deriv) comps_df = self.compounds_properties all_comps_df = comps_df all_comps_df = all_comps_df[~all_comps_df.index.duplicated(keep="first")] fg_mf_all = pd.DataFrame(index=_all_comps, columns=fg_mf_labs) for idx in fg_mf_all.index.tolist(): fg_mf_all.loc[idx, fg_mf_labs] = all_comps_df.loc[idx, fg_mf_labs] # create the aggregated dataframes and compute aggregated results aggrrep = pd.DataFrame(columns=replicatenames, index=fg_labs, dtype="float") aggrrep.index.name = param # is the parameter aggrrep.fillna(0, inplace=True) for col in replicatenames: list_iupac = self.replicates_reports[param].index signal = self.replicates_reports[param].loc[:, col].values for fg, fg_mf in zip(fg_labs, fg_mf_labs): # each compound contributes to the cumulative sum of each # functional group for the based on the mass fraction it has # of that functional group (fg_mf act as weights) # if fg_mf in subrep: multiply signal for weight and sum # 
to get aggregated weights = fg_mf_all.loc[list_iupac, fg_mf].astype(signal.dtype) aggrrep.loc[fg, col] = (signal * weights).sum() aggrrep = aggrrep.loc[(aggrrep != 0).any(axis=1), :] # drop rows with only 0 aggrrep = aggrrep.sort_index(key=aggrrep[replicatenames].max(1).get, ascending=False) self.replicates_aggrreps[param] = aggrrep self.list_of_replicates_param_aggrreps.append(param) return aggrrep
[docs] def create_samples_param_aggrrep(self, param: str = "conc_vial_mg_L"): print(f"Info: create_samples_param_aggrrep: {param = }") if param not in self.acceptable_params: raise ValueError(f"{param = } is not an acceptable param") if param not in self.list_of_replicates_param_aggrreps: self.create_replicates_param_aggrrep(param) replicate_to_sample_rename = dict( zip(self.replicates_info.index.tolist(), self.replicates_info["samplename"]) ) replicateagg = self.replicates_aggrreps[param].copy() replicateagg.rename(columns=replicate_to_sample_rename, inplace=True) self.samples_aggrreps[param] = replicateagg.T.groupby(by=replicateagg.columns).mean().T self.samples_aggrreps_std[param] = replicateagg.T.groupby(by=replicateagg.columns).std().T self.list_of_samples_param_aggrreps.append(param) return self.samples_aggrreps[param], self.samples_aggrreps_std[param]
[docs] def save_files_samples_reports(self): """""" for subfolder in [ "", "files", "replicates", "samples", "files_reports", "files_aggrreps", "replicates_reports", "replicates_aggrreps", "samples_reports", "samples_aggrreps", ]: plib.Path(self.out_path, subfolder).mkdir(parents=True, exist_ok=True) out_path = self.out_path # save files_info and samples_info to the general output folder if self.files_info is not None: self.files_info.to_excel(plib.Path(out_path, "files_info.xlsx")) if self.samples_info is not None: self.samples_info.to_excel(plib.Path(out_path, "samples_info.xlsx")) if self.file_dfs: for filename, df in self.file_dfs.items(): df.to_excel(plib.Path(out_path, "files", f"{filename}.xlsx")) if self.replicate_dfs: for replicatename, df in self.replicate_dfs.items(): df.to_excel(plib.Path(out_path, "replicates", f"{replicatename}.xlsx")) if self.sample_dfs: for samplename, df in self.sample_dfs.items(): df.to_excel(plib.Path(out_path, "samples", f"{samplename}.xlsx")) for samplename, df in self.sample_dfs_std.items(): df.to_excel(plib.Path(out_path, "samples", f"{samplename}_std.xlsx")) if self.files_reports: for param, df in self.files_reports.items(): df.to_excel(plib.Path(out_path, "files_reports", f"report_files_{param}.xlsx")) if self.files_aggrreps: for param, df in self.files_aggrreps.items(): df.to_excel( plib.Path(self.out_path, "files_aggrreps", f"aggrrep_files_{param}.xlsx") ) if self.replicates_reports: for param, df in self.replicates_reports.items(): df.to_excel( plib.Path(out_path, "replicates_reports", f"report_replicates_{param}.xlsx") ) if self.replicates_aggrreps: for param, df in self.replicates_aggrreps.items(): df.to_excel( plib.Path( self.out_path, "replicates_aggrreps", f"aggrrep_replicates_{param}.xlsx" ) ) if self.samples_reports: for param, df in self.samples_reports.items(): df.to_excel( plib.Path(self.out_path, "samples_reports", f"report_samples_{param}.xlsx") ) for param, df in self.samples_reports_std.items(): 
df.to_excel( plib.Path( self.out_path, "samples_reports", f"report_samples_{param}_std.xlsx", ) ) if self.samples_aggrreps: for param, df in self.samples_aggrreps.items(): df.to_excel( plib.Path( self.out_path, "samples_aggrreps", f"aggrrep_samples_{param}.xlsx", ) ) for param, df in self.samples_aggrreps_std.items(): df.to_excel( plib.Path( self.out_path, "samples_aggrreps", f"aggrrep_samples_{param}_std.xlsx", ) )
[docs] def plot_report( self, report_or_aggrrep: Literal["report", "aggrrep"] = "report", files_replicates_or_samples: Literal["files", "replicates", "samples"] = "samples", param: str = "conc_vial_mg_L", names_to_keep: list[str] | None = None, labels: list[str] | None = None, show_total_in_twinx: bool = False, y_axis_min_threshold: float | None = None, item_to_color_to_hatch: pd.DataFrame | None = None, yt_sum_label: str = "total\n(right axis)", remove_insignificant_values: bool = False, **kwargs, ) -> MyFigure: """ """ if param not in self.acceptable_params: raise ValueError(f"{param = } is not an acceptable param") out_path = plib.Path(self.out_path, "plots") out_path.mkdir(parents=True, exist_ok=True) if report_or_aggrrep == "report": # then use compounds reports if files_replicates_or_samples == "files": if param not in self.list_of_files_param_reports: self.create_files_param_report(param) df_ave = self.files_reports[param].T df_std = pd.DataFrame() if files_replicates_or_samples == "replicates": if param not in self.list_of_replicates_param_reports: self.create_replicates_param_report(param) df_ave = self.replicates_reports[param].T df_std = pd.DataFrame() elif files_replicates_or_samples == "samples": if param not in self.list_of_samples_param_reports: self.create_samples_param_report(param) df_ave = self.samples_reports[param].T df_std = self.samples_reports_std[param].T else: # use aggregated reports if files_replicates_or_samples == "files": if param not in self.list_of_files_param_aggrreps: self.create_files_param_aggrrep(param) df_ave = self.files_aggrreps[param].T df_std = pd.DataFrame() if files_replicates_or_samples == "replicates": if param not in self.list_of_replicates_param_aggrreps: self.create_replicates_param_aggrrep(param) df_ave = self.replicates_aggrreps[param].T df_std = pd.DataFrame() elif files_replicates_or_samples == "samples": if param not in self.list_of_samples_param_aggrreps: self.create_samples_param_aggrrep(param) df_ave = 
self.samples_aggrreps[param].T df_std = self.samples_aggrreps_std[param].T if names_to_keep is not None: df_ave = df_ave.loc[names_to_keep, :].copy() if files_replicates_or_samples == "samples": df_std = df_std.loc[names_to_keep, :].copy() if labels is not None: df_ave.index = labels if files_replicates_or_samples == "samples": df_std.index = labels if y_axis_min_threshold is not None: df_ave = df_ave.loc[:, (df_ave > y_axis_min_threshold).any(axis=0)].copy() if files_replicates_or_samples == "samples": df_std = df_std.loc[:, df_ave.columns].copy() if item_to_color_to_hatch is not None: # specific color and hatches to each fg plot_colors = [item_to_color_to_hatch.loc[item, "clr"] for item in df_ave.columns] plot_hatches = [item_to_color_to_hatch.loc[item, "htch"] for item in df_ave.columns] else: # no specific colors and hatches specified plot_colors = colors plot_hatches = hatches if df_std.isna().all().all() or df_std.empty: std_available = False else: std_available = True if remove_insignificant_values: if std_available: mask = (df_ave.abs() > df_std.abs()) | df_std.isna() df_ave = df_ave[mask] df_std = df_std[mask] default_kwargs = { "filename": report_or_aggrrep + files_replicates_or_samples + param, "out_path": out_path, "height": 4, "width": 4, "grid": self.plot_grid, "text_font": self.plot_font, "y_lab": self.param_to_axis_label[param], "yt_lab": self.param_to_axis_label[param], "twinx": True if show_total_in_twinx else False, "auto_apply_hatches_to_bars": False, } # Update kwargs with the default key-value pairs if the key is not present in kwargs kwargs = {**default_kwargs, **kwargs} myfig = MyFigure(rows=1, cols=1, **kwargs) df_ave.plot( ax=myfig.axs[0], kind="bar", width=0.9, edgecolor="k", legend=False, capsize=3, color=plot_colors, yerr=df_std if std_available else None, ) apply_hatches_to_ax(myfig.axs[0], plot_hatches) if show_total_in_twinx: myfig.axts[0].scatter( df_ave.index, df_ave.sum(axis=1).values, color="k", linestyle="None", edgecolor="k", 
facecolor="grey", s=100, label=yt_sum_label, alpha=0.5, ) if std_available: myfig.axts[0].errorbar( df_ave.index, df_ave.sum(axis=1).values, df_std.sum(axis=1).values, capsize=3, linestyle="None", color="grey", ecolor="k", label="_nolegend_", ) # Identify new patches added by the DataFrame plot myfig.save_figure() return myfig
class Sample:
    """Load the files of one sample and reduce them to replicates and averages.

    On creation every file listed in ``sample_info`` is loaded, files are
    merged into replicates, and the replicates are reduced to an average and
    a standard deviation. All results are also stored in the dictionaries of
    ``project`` (``file_dfs``, ``replicate_dfs``, ``sample_dfs``,
    ``sample_dfs_std``).

    :param project: the project the sample belongs to; loading settings are
        copied from it.
    :param samplename: name of the sample.
    :param sample_info: rows of the project's ``files_info`` for this sample,
        indexed by file name.
    """

    def __init__(
        self,
        project: "Project",
        samplename: str,
        sample_info: pd.DataFrame,
    ):
        self.projectname = project.projectname
        # project defaults
        self.file_load_skiprows = project.file_load_skiprows
        self.file_load_delimiter = project.file_load_delimiter
        self.files_info_defauls_columns = project.files_info_defauls_columns
        self.columns_to_keep_in_files = project.columns_to_keep_in_files
        self.columns_to_rename_in_files = project.columns_to_rename_in_files
        self.compounds_to_rename_in_files = project.compounds_to_rename_in_files
        self.folder_path = project.folder_path
        self.samplename = samplename
        self.sample_info = sample_info
        self.files: dict[str, pd.DataFrame] = {}
        self.replicates: dict[str, pd.DataFrame] = {}
        self.replicate_files: dict[str, pd.DataFrame] = {}

        # A replicate name appears once per file; visit each replicate only
        # once (in order of first appearance) so files are not reloaded.
        replicatenames = dict.fromkeys(self.sample_info["replicatename"].tolist())
        for replicatename in replicatenames:
            replicate_info = self.sample_info.loc[
                self.sample_info["replicatename"] == replicatename, :
            ]
            files_of_replicate = []
            for filename in replicate_info.index.tolist():
                file = self.load_single_file(filename)
                self.files[filename] = file
                project.file_dfs[filename] = file
                files_of_replicate.append(file)
            replicate = self.create_replicate_from_files(files_of_replicate, replicatename)
            self.replicates[replicatename] = replicate
            project.replicate_dfs[replicatename] = replicate

        ave, std = self.create_ave_std_from_replicates(list(self.replicates.values()))
        project.sample_dfs[self.samplename] = ave
        project.sample_dfs_std[self.samplename] = std

    def load_single_file(self, filename: str) -> pd.DataFrame:
        """Read ``<filename>.txt`` from the project folder and derive the extra columns.

        Columns are renamed and filtered, rows without a compound name or
        with a non-positive vial concentration are dropped, duplicated
        compounds keep their first row, and three columns are derived from
        the file's row in ``sample_info``.

        :param filename: file name without the ``.txt`` extension; it must be
            an index label of ``self.sample_info``.
        :return: the file data indexed by compound name.
        """
        file: pd.DataFrame = pd.read_csv(
            plib.Path(self.folder_path, filename + ".txt"),
            delimiter=self.file_load_delimiter,
            index_col=0,
            skiprows=self.file_load_skiprows,
        )
        file.rename(self.columns_to_rename_in_files, inplace=True, axis="columns")
        file = file.loc[file["comp_name"].notna(), list(self.columns_to_keep_in_files)]
        file.set_index("comp_name", inplace=True)
        file.rename(self.compounds_to_rename_in_files, inplace=True)
        if file.index.duplicated(keep="first").any():
            duplicates = file[file.index.duplicated(keep=False)]
            file = file[~file.index.duplicated(keep="first")]
            print(f"WARNING: duplicates in {filename = }")
            print(f"{duplicates = }, first instance has been kept")
        # copy so that the derived columns are written to an independent frame
        file = file.loc[file["conc_vial_mg_L"] > 0, :].copy()
        info = self.sample_info.loc[filename]
        file["conc_vial_if_undiluted_mg_L"] = file["conc_vial_mg_L"] * info["dilution_factor"]
        file["fraction_of_sample_fr"] = (
            file["conc_vial_mg_L"] / info["total_sample_conc_in_vial_mg_L"]
        )
        file["fraction_of_feedstock_fr"] = (
            file["fraction_of_sample_fr"] * info["sample_yield_on_feedstock_basis_fr"]
        )
        file.index.name = filename
        return file

    def create_replicate_from_files(self, files_to_merge, replicatename):
        """Merge the files of one replicate, keeping the maximum of each value.

        :param files_to_merge: list of file DataFrames indexed by compound.
        :param replicatename: name given to the index of the result.
        :return: one row per compound with the column-wise maximum over files.
        """
        replicate = pd.concat(files_to_merge, join="outer")
        replicate = replicate.groupby(replicate.index).max()
        replicate.index.name = replicatename
        return replicate

    def create_ave_std_from_replicates(self, replicates) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Average the replicates, counting a missing compound as 0.

        Every replicate is expanded to the union of the rows and columns of
        all replicates, so each mean and standard deviation is computed over
        the same number of replicates regardless of their order.

        :param replicates: non-empty list of replicate DataFrames.
        :return: tuple ``(average, standard deviation)``; also stored as
            ``self.ave`` and ``self.std``.
        :raises ValueError: if ``replicates`` is empty.
        """
        if not replicates:
            raise ValueError("at least one replicate is needed to compute the average")
        all_rows = replicates[0].index
        all_columns = replicates[0].columns
        for replicate in replicates[1:]:
            all_rows = all_rows.union(replicate.index)
            all_columns = all_columns.union(replicate.columns)
        # Aligning to the first replicate only would leave out, for the other
        # replicates, the compounds that the first one does not contain.
        filled = [
            replicate.reindex(index=all_rows, columns=all_columns).fillna(0)
            for replicate in replicates
        ]
        by_compound = pd.concat(filled).groupby(level=0)
        self.ave = by_compound.mean()
        self.std = by_compound.std()
        return self.ave, self.std
def apply_hatches_to_ax(ax: "Axes", hatches_list: list[str]) -> None:
    """Apply hatch patterns to the bars of a grouped bar plot.

    Bars are taken from ``ax.patches`` in order and are assumed to be stored
    series by series (all the bars of the first series, then all the bars of
    the second one, ...), with one bar per major x tick in each series. Every
    series gets one hatch; hatches are reused cyclically when there are fewer
    hatches than series. Bar edges are set to black.

    Nothing is done when there are no bars, no major x ticks or no hatches.

    :param ax: the axes holding the bar plot.
    :param hatches_list: hatch patterns, one per series.
    """
    bars = [b for b in ax.patches if isinstance(b, mpatches.Rectangle)]
    num_groups = len(ax.get_xticks(minor=False))
    # Guard the divisions below: an empty hatch list or an axis without ticks
    # would otherwise raise ZeroDivisionError.
    if not bars or not hatches_list or num_groups == 0:
        return
    for position, bar in enumerate(bars):
        # ``num_groups`` consecutive bars belong to the same series
        series_number = position // num_groups
        bar.set_hatch(hatches_list[series_number % len(hatches_list)])
        bar.set_edgecolor("k")
# %% # if __file__ == "main":