import pathlib as plib
import numpy as np
import pandas as pd
import ele
import pubchempy as pcp
from rdkit import Chem
from rdkit.Chem import DataStructs
from rdkit.Chem import rdmolops
from rdkit.Chem.AllChem import ( # pylint: disable=no-name-in-module
GetMorganFingerprintAsBitVect,
)
from hplc_data_analysis.fragmenter import Fragmenter
[docs]
def get_compound_from_pubchempy(comp_name: str) -> pcp.Compound:
if not isinstance(comp_name, str) or comp_name.isspace():
print(f"WARNING get_compound_from_pubchempy got an invalid {comp_name =}")
return None
cond = True
while cond: # to deal with HTML issues on server sides (timeouts)
try:
# comp contains all info about the chemical from pubchem
try:
comp_inside_list = pcp.get_compounds(comp_name, "name")
except ValueError:
print(f"{comp_name = }")
return None
if comp_inside_list:
comp = comp_inside_list[0]
else:
print(
f"WARNING: name_to_properties {comp_name=} does not find an entry in pcp",
)
return None
cond = False
except pcp.PubChemHTTPError: # timeout error, simply try again
print("Caught: pcp.PubChemHTTPError (keep trying)")
return comp
def _order_columns_in_compounds_properties(
unsorted_df: pd.DataFrame | None,
) -> pd.DataFrame | None:
if unsorted_df is None:
return None
priority_cols: list[str] = [
"iupac_name",
"underiv_comp_name",
"molecular_formula",
"canonical_smiles",
"molecular_weight",
"xlogp",
]
# Define a custom sort key function
def sort_key(col):
if col in priority_cols:
return (-1, priority_cols.index(col))
if col.startswith("el_mf"):
return (2, col)
elif col.startswith("el_"):
return (1, col)
elif col.startswith("fg_mf_unclassified"):
return (5, col)
elif col.startswith("fg_mf"):
return (4, col)
elif col.startswith("fg_"):
return (3, col)
else:
return (0, col)
# Sort columns using the custom key
sorted_columns = sorted(unsorted_df.columns, key=sort_key)
sorted_df = unsorted_df.reindex(sorted_columns, axis=1)
sorted_df.index.name = "comp_name"
# Reindex the DataFrame with the sorted columns
return sorted_df
[docs]
def name_to_properties(
comp_name: str,
dict_classes_to_codes: dict[str:str],
dict_classes_to_mass_fractions: dict[str:float],
df: pd.DataFrame = pd.DataFrame(),
precision_sum_elements: float = 0.05,
precision_sum_functional_group: float = 0.05,
) -> pd.DataFrame:
"""
used to retrieve chemical properties of the compound indicated by the
comp_name and to store those properties in the df
Parameters
----------
GCname : str
name from GC, used as a unique key.
search_name : str
name to be used to search on pubchem.
df : pd.DataFrame
that contains all searched compounds.
df_class_code_frac : pd.DataFrame
contains the list of functional group names, codes to be searched
and the weight fraction of each one to automatically calculate the
mass fraction of each compounds for each functional group.
Classes are given as smarts and are looked into the smiles of the comp.
Returns
-------
df : pd.DataFrame
updated dataframe with the searched compound.
CompNotFound : str
if GCname did not yield anything CompNotFound=GCname.
"""
if not isinstance(df, pd.DataFrame):
raise TypeError("The argument df must be a pd.DataFrame.")
if not isinstance(comp_name, str) or comp_name.isspace():
return _order_columns_in_compounds_properties(df)
if comp_name in df.index.tolist():
return _order_columns_in_compounds_properties(df)
comp = get_compound_from_pubchempy(comp_name)
if comp is None:
df.loc[comp_name, "iupac_name"] = "unidentified"
return _order_columns_in_compounds_properties(df)
try:
valid_iupac_name = comp.iupac_name.lower()
except AttributeError: # iupac_name not give
valid_iupac_name = comp_name.lower()
df.loc[comp_name, "iupac_name"] = valid_iupac_name
df.loc[comp_name, "molecular_formula"] = comp.molecular_formula
df.loc[comp_name, "canonical_smiles"] = comp.canonical_smiles
df.loc[comp_name, "molecular_weight"] = float(comp.molecular_weight)
try:
df.loc[comp_name, "xlogp"] = float(comp.xlogp)
except TypeError: # float() argument must be a string or a real number, not 'NoneType'
df.loc[comp_name, "xlogp"] = np.nan
elements = set(comp.to_dict()["elements"])
el_dict = {}
el_mf_dict = {}
for el in elements:
el_count = comp.to_dict()["elements"].count(el)
el_mass = ele.element_from_symbol(el).mass
# Using similar logic as in the fg_dict example
if el not in el_dict:
el_dict[el] = 0
el_mf_dict[el] = 0.0
el_dict[el] += int(el_count)
el_mf_dict[el] += float(el_count) * float(el_mass) / float(comp.molecular_weight)
# Now, update the DataFrame in a similar way to the fg_dict example
for key, value in el_dict.items():
df.at[comp_name, f"el_{key}"] = int(value)
for key, value in el_mf_dict.items():
df.at[comp_name, f"el_mf_{key}"] = float(value)
cols_el_mf = [col for col in df.columns if col.startswith("el_mf_")]
residual_els = df.loc[comp_name, cols_el_mf].sum() - 1
# check element sum
try:
assert residual_els <= precision_sum_elements
except AssertionError:
print(f"the total mass fraction of elements in {comp_name =} is > 0.001")
# apply fragmentation using the Fragmenter class (thanks simonmb)
frg = Fragmenter(
dict_classes_to_codes,
fragmentation_scheme_order=dict_classes_to_codes.keys(),
algorithm="simple",
)
fragmentation, _, _ = frg.fragment(comp.canonical_smiles)
fg_dict = {}
fg_mf_dict = {}
# Iterate over each item in the dictionary
for key, value in fragmentation.items():
# Determine the root key (the part before an underscore, if present)
root_key = key.split("_")[0]
# if root_key in hetero_atoms:
# pass
# Check if the root key is in the sum_dict; if not, initialize it
if root_key not in fg_dict:
fg_dict[root_key] = 0
fg_mf_dict[root_key] = 0
# Add the value to the corresponding root key in the sum_dict
fg_dict[root_key] += int(fragmentation[key])
fg_mf_dict[root_key] += (
float(fragmentation[key])
* float(dict_classes_to_mass_fractions[key])
/ df.loc[comp_name, "molecular_weight"].astype(float)
) # mass fraction of total
# Update df with fg_dict
for key, value in fg_dict.items():
df.at[comp_name, f"fg_{key}"] = int(value) # Update the cell
# Update df with fg_mf_dict
for key, value in fg_mf_dict.items():
df.at[comp_name, f"fg_mf_{key}"] = float(value) # Update the cell
cols_fg_mf = [col for col in df.columns if col.startswith("fg_mf")]
residual_fgs = df.loc[comp_name, cols_fg_mf].sum() - 1
try:
assert residual_fgs <= precision_sum_functional_group
except AssertionError:
print(f"{df.loc[comp_name, cols_fg_mf].sum()=}")
print(f"the total mass fraction of functional groups in {comp_name =} is > 0.05")
if residual_fgs < -precision_sum_functional_group:
df.at[comp_name, "fg_mf_unclassified"] = abs(residual_fgs)
df.loc[df["iupac_name"] != "unidentified"] = df.loc[df["iupac_name"] != "unidentified"].fillna(
0
)
return _order_columns_in_compounds_properties(df)
# %%
[docs]
def get_iupac_from_pcp(comp_name: str) -> str:
"""get iupac name for compound using pubchempy, needs internet connection
:param comp_name: _description_
:type comp_name: str
:return: lowercase iupac name for the compound
:rtype: str
"""
cond = True
while cond: # to deal with HTML issues on server sides (timeouts)
try:
# comp contains all info about the chemical from pubchem
try:
comp = pcp.get_compounds(comp_name, "name")[0]
except ValueError:
print(f"Calibration iupac addition: compund {comp_name} did not work")
try:
iup: str = comp.iupac_name
except AttributeError: # iupac_name not give
print(
f"Calibration iupac addition: compund {comp_name} does not have a iupac entry"
)
cond = False
except pcp.PubChemHTTPError: # timeout error, simply try again
print("Caught: pcp.PubChemHTTPError")
return iup.lower()
[docs]
def report_difference(rep1, rep2, diff_type="absolute"):
"""
calculates the ave, std and p percentage of the differnece between
two reports where columns and index are the same.
Replicates (indicated as XX_1, XX_2) are used for std.
Parameters
----------
rep1 : pd.DataFrame
report that is conisdered the reference to compute differences from.
rep2 : pd.DataFrame
report with the data to compute the difference.
diff_type : str, optional
type of difference, absolute vs relative (to rep1)
. The default is 'absolute'.
Returns
-------
dif_ave : pd.DataFrame
contains the average difference.
dif_std : pd.DataFrame
contains the std, same units as dif_ave.
dif_stdp : pd.DataFrame
contains the percentage std compared to ref1.
"""
idx_name = rep1.index.name
rep1 = rep1.transpose()
rep2 = rep2.transpose()
# put the exact same name on files (by removing the '_#' at end)
repl_idx1 = [i if "_" not in i else i.split("_")[0] for i in rep1.index.tolist()]
repl_idx2 = [i if "_" not in i else i.split("_")[0] for i in rep2.index.tolist()]
rep1.loc[:, idx_name] = repl_idx1
rep2.loc[:, idx_name] = repl_idx2
# compute files and std of files and update the index
rep_ave1 = rep1.groupby(idx_name, sort=False).mean().reset_index()
rep_std1 = rep1.groupby(idx_name, sort=False).std().reset_index()
rep_ave1.set_index(idx_name, inplace=True)
rep_std1.set_index(idx_name, inplace=True)
rep_ave2 = rep2.groupby(idx_name, sort=False).mean().reset_index()
rep_std2 = rep2.groupby(idx_name, sort=False).std().reset_index()
rep_ave2.set_index(idx_name, inplace=True)
rep_std2.set_index(idx_name, inplace=True)
if diff_type == "absolute":
dif_ave = rep_ave1 - rep_ave2
dif_std = np.sqrt(rep_std1**2 + rep_std2**2)
dif_stdp = np.sqrt(rep_std1**2 + rep_std2**2) / dif_ave * 100
if diff_type == "relative":
dif_ave = (rep_ave1 - rep_ave2) / rep_ave1
dif_std = np.sqrt(rep_std1**2 + rep_std2**2) / rep_ave1
dif_stdp = np.sqrt(rep_std1**2 + rep_std2**2) / rep_ave1 / dif_ave * 100
return dif_ave, dif_std, dif_stdp