Source code for hplc_data_analysis.fragmenter

import marshal
from rdkit import Chem
from rdkit.Chem import DataStructs
from rdkit.Chem import rdmolops
from rdkit.Chem.AllChem import (  # pylint: disable=no-name-in-module
    GetMorganFingerprintAsBitVect,
)


[docs] class Fragmenter: """ Class taken from https://github.com/simonmb/fragmentation_algorithm. The original version of this algorithm was published in: "Flexible Heuristic Algorithm for Automatic Molecule Fragmentation: Application to the UNIFAC Group Contribution Model DOI: 10.1186/s13321-019-0382-39." MIT License ... THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ # tested with Python 3.8.8 and RDKit version 2021.09.4 # does a substructure match and then checks whether the match # is adjacent to previous matches
[docs] @classmethod def get_substruct_matches( cls, mol_searched_for, mol_searched_in, atomIdxs_to_which_new_matches_have_to_be_adjacent, ): valid_matches = [] if mol_searched_in.GetNumAtoms() >= mol_searched_for.GetNumAtoms(): matches = mol_searched_in.GetSubstructMatches(mol_searched_for) if matches: for match in matches: add_this_match = True if len(atomIdxs_to_which_new_matches_have_to_be_adjacent) > 0: add_this_match = False for i in match: for neighbor in mol_searched_in.GetAtomWithIdx(i).GetNeighbors(): if ( neighbor.GetIdx() in atomIdxs_to_which_new_matches_have_to_be_adjacent ): add_this_match = True break if add_this_match: valid_matches.append(match) return valid_matches
# count heavier isotopes of hydrogen correctly
[docs] @classmethod def get_heavy_atom_count(cls, mol): heavy_atom_count = 0 for atom in mol.GetAtoms(): if atom.GetAtomicNum() != 1: heavy_atom_count += 1 return heavy_atom_count
def __init__( self, fragmentation_scheme={}, fragmentation_scheme_order=None, match_hydrogens=False, algorithm="", n_atoms_cuttoff=-1, function_to_choose_fragmentation=False, n_max_fragmentations_to_find=-1, ): if not type(fragmentation_scheme) is dict: raise TypeError( "fragmentation_scheme must be a dctionary with integers as keys and either strings or list of strings as values." ) if len(fragmentation_scheme) == 0: raise ValueError("fragmentation_scheme must be provided.") if not algorithm in ["simple", "complete", "combined"]: raise ValueError("Algorithm must be either simple ,complete or combined.") if algorithm == "simple": if n_max_fragmentations_to_find != -1: raise ValueError( "Setting n_max_fragmentations_to_find only makes sense with complete or combined algorithm." ) self.algorithm = algorithm if algorithm in ["combined", "complete"]: if n_atoms_cuttoff == -1: raise ValueError( "n_atoms_cuttoff needs to be specified for complete or combined algorithms." ) if function_to_choose_fragmentation == False: raise ValueError( "function_to_choose_fragmentation needs to be specified for complete or combined algorithms." ) if not callable(function_to_choose_fragmentation): raise TypeError("function_to_choose_fragmentation needs to be a function.") else: if type(function_to_choose_fragmentation([{}, {}])) != dict: raise TypeError( "function_to_choose_fragmentation needs to take a list of fragmentations and choose one of it" ) if n_max_fragmentations_to_find != -1: if n_max_fragmentations_to_find < 1: raise ValueError("n_max_fragmentations_to_find has to be 1 or higher.") if fragmentation_scheme_order is None: fragmentation_scheme_order = [] if algorithm in ["simple", "combined"]: assert len(fragmentation_scheme) == len(fragmentation_scheme_order) else: fragmentation_scheme_order = [key for key in fragmentation_scheme.keys()] self.n_max_fragmentations_to_find = n_max_fragmentations_to_find self.n_atoms_cuttoff = n_atoms_cuttoff self.match_hydrogens = match_hydrogens self.fragmentation_scheme = fragmentation_scheme self.function_to_choose_fragmentation = function_to_choose_fragmentation # create a lookup dictionaries to faster finding a group number self._fragmentation_scheme_group_number_lookup = {} self._fragmentation_scheme_pattern_lookup = {} self.fragmentation_scheme_order = fragmentation_scheme_order for group_number, list_SMARTS in fragmentation_scheme.items(): if type(list_SMARTS) is not list: list_SMARTS = [list_SMARTS] for SMARTS in list_SMARTS: if SMARTS != "": self._fragmentation_scheme_group_number_lookup[SMARTS] = group_number mol_SMARTS = Chem.MolFromSmarts(SMARTS) self._fragmentation_scheme_pattern_lookup[SMARTS] = mol_SMARTS
[docs] def fragment(self, SMILES_or_molecule): if type(SMILES_or_molecule) is str: mol_SMILES = Chem.MolFromSmiles(SMILES_or_molecule) mol_SMILES = Chem.AddHs(mol_SMILES) if self.match_hydrogens else mol_SMILES is_valid_SMILES = mol_SMILES is not None if not is_valid_SMILES: raise ValueError("Following SMILES is not valid: " + SMILES_or_molecule) else: mol_SMILES = SMILES_or_molecule # iterate over all separated molecules success = [] fragmentation = {} fragmentation_matches = {} for mol in rdmolops.GetMolFrags(mol_SMILES, asMols=True): this_mol_fragmentation, this_mol_success = self.__get_fragmentation(mol) for SMARTS, matches in this_mol_fragmentation.items(): group_number = self._fragmentation_scheme_group_number_lookup[SMARTS] if not group_number in fragmentation: fragmentation[group_number] = 0 fragmentation_matches[group_number] = [] fragmentation[group_number] += len(matches) fragmentation_matches[group_number].extend(matches) success.append(this_mol_success) return fragmentation, all(success), fragmentation_matches
[docs] def fragment_complete(self, SMILES_or_molecule): if type(SMILES_or_molecule) is str: mol_SMILES = Chem.MolFromSmiles(SMILES_or_molecule) mol_SMILES = Chem.AddHs(mol_SMILES) if self.match_hydrogens else mol_SMILES is_valid_SMILES = mol_SMILES is not None if not is_valid_SMILES: raise ValueError("Following SMILES is not valid: " + SMILES_or_molecule) else: mol_SMILES = SMILES_or_molecule if len(rdmolops.GetMolFrags(mol_SMILES)) != 1: raise ValueError("fragment_complete does not accept multifragment molecules.") temp_fragmentations, success = self.__complete_fragmentation(mol_SMILES) fragmentations = [] fragmentations_matches = [] for temp_fragmentation in temp_fragmentations: fragmentation = {} fragmentation_matches = {} for SMARTS, matches in temp_fragmentation.items(): group_number = self._fragmentation_scheme_group_number_lookup[SMARTS] fragmentation[group_number] = len(matches) fragmentation_matches[group_number] = matches fragmentations.append(fragmentation) fragmentations_matches.append(fragmentation_matches) return fragmentations, success, fragmentations_matches
def __get_fragmentation(self, mol_SMILES): success = False fragmentation = {} if self.algorithm in ["simple", "combined"]: fragmentation, success = self.__simple_fragmentation(mol_SMILES) if success: return fragmentation, success if self.algorithm in ["combined", "complete"]: fragmentations, success = self.__complete_fragmentation(mol_SMILES) if success: fragmentation = self.function_to_choose_fragmentation(fragmentations) return fragmentation, success def __simple_fragmentation(self, mol_SMILES): if self.match_hydrogens: target_atom_count = len(mol_SMILES.GetAtoms()) else: target_atom_count = Fragmenter.get_heavy_atom_count(mol_SMILES) success = False fragmentation = {} fragmentation, atomIdxs_included_in_fragmentation = self.__search_non_overlapping_solution( mol_SMILES, {}, set(), set() ) success = len(atomIdxs_included_in_fragmentation) == target_atom_count # if not successful, clean up molecule and search again level = 1 while not success: fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far = ( Fragmenter.__clean_molecule_surrounding_unmatched_atoms( mol_SMILES, fragmentation, atomIdxs_included_in_fragmentation, level ) ) level += 1 if len(atomIdxs_included_in_fragmentation_so_far) == 0: break fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far = ( self.__search_non_overlapping_solution( mol_SMILES, fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far, ) ) success = len(atomIdxs_included_in_fragmentation_so_far) == target_atom_count if success: fragmentation = fragmentation_so_far return fragmentation, success def __search_non_overlapping_solution( self, mol_searched_in, fragmentation, atomIdxs_included_in_fragmentation, atomIdxs_to_which_new_matches_have_to_be_adjacent, ): n_atomIdxs_included_in_fragmentation = len(atomIdxs_included_in_fragmentation) - 1 while n_atomIdxs_included_in_fragmentation != len(atomIdxs_included_in_fragmentation): n_atomIdxs_included_in_fragmentation = len(atomIdxs_included_in_fragmentation) for group_number in self.fragmentation_scheme_order: list_SMARTS = self.fragmentation_scheme[group_number] if type(list_SMARTS) is not list: list_SMARTS = [list_SMARTS] for SMARTS in list_SMARTS: if SMARTS != "": fragmentation, atomIdxs_included_in_fragmentation = ( self.__get_next_non_overlapping_match( mol_searched_in, SMARTS, fragmentation, atomIdxs_included_in_fragmentation, atomIdxs_to_which_new_matches_have_to_be_adjacent, ) ) return fragmentation, atomIdxs_included_in_fragmentation def __get_next_non_overlapping_match( self, mol_searched_in, SMARTS, fragmentation, atomIdxs_included_in_fragmentation, atomIdxs_to_which_new_matches_have_to_be_adjacent, ): mol_searched_for = self._fragmentation_scheme_pattern_lookup[SMARTS] if atomIdxs_to_which_new_matches_have_to_be_adjacent: matches = Fragmenter.get_substruct_matches( mol_searched_for, mol_searched_in, atomIdxs_to_which_new_matches_have_to_be_adjacent, ) else: matches = Fragmenter.get_substruct_matches(mol_searched_for, mol_searched_in, set()) if matches: for match in matches: all_atoms_of_new_match_are_unassigned = ( atomIdxs_included_in_fragmentation.isdisjoint(match) ) if all_atoms_of_new_match_are_unassigned: if not SMARTS in fragmentation: fragmentation[SMARTS] = [] fragmentation[SMARTS].append(match) atomIdxs_included_in_fragmentation.update(match) return fragmentation, atomIdxs_included_in_fragmentation @classmethod def __clean_molecule_surrounding_unmatched_atoms( cls, mol_searched_in, fragmentation, atomIdxs_included_in_fragmentation, level ): for i in range(0, level): atoms_missing = set( range(0, Fragmenter.get_heavy_atom_count(mol_searched_in)) ).difference(atomIdxs_included_in_fragmentation) new_fragmentation = marshal.loads(marshal.dumps(fragmentation)) for atomIdx in atoms_missing: for neighbor in mol_searched_in.GetAtomWithIdx(atomIdx).GetNeighbors(): for smart, atoms_found in fragmentation.items(): for atoms in atoms_found: if neighbor.GetIdx() in atoms: if smart in new_fragmentation: if new_fragmentation[smart].count(atoms) > 0: new_fragmentation[smart].remove(atoms) if smart in new_fragmentation: if len(new_fragmentation[smart]) == 0: new_fragmentation.pop(smart) new_atomIdxs_included_in_fragmentation = set() for i in new_fragmentation.values(): for j in i: new_atomIdxs_included_in_fragmentation.update(j) atomIdxs_included_in_fragmentation = new_atomIdxs_included_in_fragmentation fragmentation = new_fragmentation return fragmentation, atomIdxs_included_in_fragmentation def __complete_fragmentation(self, mol_SMILES): heavy_atom_count = Fragmenter.get_heavy_atom_count(mol_SMILES) if heavy_atom_count > self.n_atoms_cuttoff: return {}, False completed_fragmentations = [] groups_leading_to_incomplete_fragmentations = [] ( completed_fragmentations, groups_leading_to_incomplete_fragmentations, incomplete_fragmentation_found, ) = self.__get_next_non_overlapping_adjacent_match_recursively( mol_SMILES, heavy_atom_count, completed_fragmentations, groups_leading_to_incomplete_fragmentations, {}, set(), set(), self.n_max_fragmentations_to_find, ) success = len(completed_fragmentations) > 0 return completed_fragmentations, success def __get_next_non_overlapping_adjacent_match_recursively( self, mol_searched_in, heavy_atom_count, completed_fragmentations, groups_leading_to_incomplete_fragmentations, fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far, atomIdxs_to_which_new_matches_have_to_be_adjacent, n_max_fragmentations_to_find=-1, ): n_completed_fragmentations = len(completed_fragmentations) incomplete_fragmentation_found = False complete_fragmentation_found = False if len(completed_fragmentations) == n_max_fragmentations_to_find: return ( completed_fragmentations, groups_leading_to_incomplete_fragmentations, incomplete_fragmentation_found, ) for group_number in self.fragmentation_scheme_order: list_SMARTS = self.fragmentation_scheme[group_number] if complete_fragmentation_found: break if type(list_SMARTS) is not list: list_SMARTS = [list_SMARTS] for SMARTS in list_SMARTS: if complete_fragmentation_found: break if SMARTS != "": matches = Fragmenter.get_substruct_matches( self._fragmentation_scheme_pattern_lookup[SMARTS], mol_searched_in, atomIdxs_included_in_fragmentation_so_far, ) for match in matches: # only allow non-overlapping matches all_atoms_are_unassigned = ( atomIdxs_included_in_fragmentation_so_far.isdisjoint(match) ) if not all_atoms_are_unassigned: continue # only allow matches that do not contain groups leading to incomplete matches for ( groups_leading_to_incomplete_fragmentation ) in groups_leading_to_incomplete_fragmentations: if Fragmenter.__is_fragmentation_subset_of_other_fragmentation( groups_leading_to_incomplete_fragmentation, fragmentation_so_far, ): return ( completed_fragmentations, groups_leading_to_incomplete_fragmentations, incomplete_fragmentation_found, ) # only allow matches that will lead to new fragmentations use_this_match = True n_found_groups = len(fragmentation_so_far) for completed_fragmentation in completed_fragmentations: if not SMARTS in completed_fragmentation: continue if n_found_groups == 0: use_this_match = ( not Fragmenter.__is_match_contained_in_fragmentation( match, SMARTS, completed_fragmentation ) ) else: if Fragmenter.__is_fragmentation_subset_of_other_fragmentation( fragmentation_so_far, completed_fragmentation ): use_this_match = ( not Fragmenter.__is_match_contained_in_fragmentation( match, SMARTS, completed_fragmentation ) ) if not use_this_match: break if not use_this_match: continue # make a deepcopy here, otherwise the variables are modified down the road # marshal is used here because it works faster than copy.deepcopy this_SMARTS_fragmentation_so_far = marshal.loads( marshal.dumps(fragmentation_so_far) ) this_SMARTS_atomIdxs_included_in_fragmentation_so_far = ( atomIdxs_included_in_fragmentation_so_far.copy() ) if not SMARTS in this_SMARTS_fragmentation_so_far: this_SMARTS_fragmentation_so_far[SMARTS] = [] this_SMARTS_fragmentation_so_far[SMARTS].append(match) this_SMARTS_atomIdxs_included_in_fragmentation_so_far.update(match) # only allow matches that do not contain groups leading to incomplete matches for ( groups_leading_to_incomplete_match ) in groups_leading_to_incomplete_fragmentations: if Fragmenter.__is_fragmentation_subset_of_other_fragmentation( groups_leading_to_incomplete_match, this_SMARTS_fragmentation_so_far, ): use_this_match = False break if not use_this_match: continue # if the complete molecule has not been fragmented, continue to do so if ( len(this_SMARTS_atomIdxs_included_in_fragmentation_so_far) < heavy_atom_count ): ( completed_fragmentations, groups_leading_to_incomplete_fragmentations, incomplete_fragmentation_found, ) = self.__get_next_non_overlapping_adjacent_match_recursively( mol_searched_in, heavy_atom_count, completed_fragmentations, groups_leading_to_incomplete_fragmentations, this_SMARTS_fragmentation_so_far, this_SMARTS_atomIdxs_included_in_fragmentation_so_far, this_SMARTS_atomIdxs_included_in_fragmentation_so_far, n_max_fragmentations_to_find, ) break # if the complete molecule has been fragmented, save and return if ( len(this_SMARTS_atomIdxs_included_in_fragmentation_so_far) == heavy_atom_count ): completed_fragmentations.append(this_SMARTS_fragmentation_so_far) complete_fragmentation_found = True break # if until here no new fragmentation was found check whether an incomplete fragmentation was found if n_completed_fragmentations == len(completed_fragmentations): if not incomplete_fragmentation_found: incomplete_matched_groups = {} if len(atomIdxs_included_in_fragmentation_so_far) > 0: unassignes_atom_idx = set(range(0, heavy_atom_count)).difference( atomIdxs_included_in_fragmentation_so_far ) for atom_idx in unassignes_atom_idx: neighbor_atoms_idx = [ i.GetIdx() for i in mol_searched_in.GetAtomWithIdx(atom_idx).GetNeighbors() ] for neighbor_atom_idx in neighbor_atoms_idx: for ( found_smarts, found_matches, ) in fragmentation_so_far.items(): for found_match in found_matches: if neighbor_atom_idx in found_match: if not found_smarts in incomplete_matched_groups: incomplete_matched_groups[found_smarts] = [] if ( found_match not in incomplete_matched_groups[found_smarts] ): incomplete_matched_groups[found_smarts].append( found_match ) is_subset_of_groups_already_found = False indexes_to_remove = [] for idx, groups_leading_to_incomplete_match in enumerate( groups_leading_to_incomplete_fragmentations ): is_subset_of_groups_already_found = ( Fragmenter.__is_fragmentation_subset_of_other_fragmentation( incomplete_matched_groups, groups_leading_to_incomplete_match, ) ) if is_subset_of_groups_already_found: indexes_to_remove.append(idx) for index in sorted(indexes_to_remove, reverse=True): del groups_leading_to_incomplete_fragmentations[index] groups_leading_to_incomplete_fragmentations.append(incomplete_matched_groups) groups_leading_to_incomplete_fragmentations = sorted( groups_leading_to_incomplete_fragmentations, key=len ) incomplete_fragmentation_found = True return ( completed_fragmentations, groups_leading_to_incomplete_fragmentations, incomplete_fragmentation_found, ) @classmethod def __is_fragmentation_subset_of_other_fragmentation(cls, fragmentation, other_fragmentation): n_found_groups = len(fragmentation) n_found_other_groups = len(other_fragmentation) if n_found_groups == 0: return False if n_found_other_groups < n_found_groups: return False n_found_SMARTS_that_are_subset = 0 for found_SMARTS, _ in fragmentation.items(): if found_SMARTS in other_fragmentation: found_matches_set = set(frozenset(i) for i in fragmentation[found_SMARTS]) found_other_matches_set = set( frozenset(i) for i in other_fragmentation[found_SMARTS] ) if found_matches_set.issubset(found_other_matches_set): n_found_SMARTS_that_are_subset += 1 else: return False return n_found_SMARTS_that_are_subset == n_found_groups @classmethod def __is_match_contained_in_fragmentation(cls, match, SMARTS, fragmentation): if not SMARTS in fragmentation: return False found_matches_set = set(frozenset(i) for i in fragmentation[SMARTS]) match_set = set(match) return match_set in found_matches_set
if __name__ == "__main__": smiles = ["CCCCO", "CCCO", "CCO", "CO"] fragmentation_scheme = { "CH2": "[CH2]", "OH": "[OH]", "CH3": "[CH3]", "CH2-CH2": "[CH2][CH2]", } fragmentation_scheme_order1 = ["CH2-CH2", "CH3", "CH2", "OH"] print("simple algorithm 1") frg = Fragmenter( fragmentation_scheme, fragmentation_scheme_order=fragmentation_scheme_order1, algorithm="simple", ) for smi in smiles: fragmentation, success, fragmentation_matches = frg.fragment(smi) print(smi, fragmentation) print() print("simple algorithm 2") fragmentation_scheme_order2 = ["CH3", "CH2", "CH2-CH2", "OH"] frg = Fragmenter( fragmentation_scheme, fragmentation_scheme_order=fragmentation_scheme_order2, algorithm="simple", ) for smi in smiles: fragmentation, success, fragmentation_matches = frg.fragment(smi) print(smi, fragmentation) print() print("complete algorithm 1") frg = Fragmenter( fragmentation_scheme, algorithm="complete", n_atoms_cuttoff=30, function_to_choose_fragmentation=lambda x: x[0], ) for smi in smiles: fragmentation, success, fragmentation_matches = frg.fragment(smi) print(smi, fragmentation) print() print("complete algorithm 2") frg = Fragmenter( fragmentation_scheme, algorithm="complete", n_atoms_cuttoff=30, function_to_choose_fragmentation=lambda x: x[0], ) for smi in smiles: fragmentations, success, fragmentations_matches = frg.fragment_complete(smi) print(smi, fragmentations) print( fragmentations_matches ) # some of the fragmentations are the same, but the found fragmentation_matches are different.