import marshal
from rdkit import Chem
from rdkit.Chem import DataStructs
from rdkit.Chem import rdmolops
from rdkit.Chem.AllChem import ( # pylint: disable=no-name-in-module
GetMorganFingerprintAsBitVect,
)
[docs]
class Fragmenter:
"""
Class taken from https://github.com/simonmb/fragmentation_algorithm.
The original version of this algorithm was published in:
"Flexible Heuristic Algorithm for Automatic Molecule Fragmentation:
Application to the UNIFAC Group Contribution Model
DOI: 10.1186/s13321-019-0382-39."
MIT License
...
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
# tested with Python 3.8.8 and RDKit version 2021.09.4
# does a substructure match and then checks whether the match
# is adjacent to previous matches
[docs]
@classmethod
def get_substruct_matches(
cls,
mol_searched_for,
mol_searched_in,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
):
valid_matches = []
if mol_searched_in.GetNumAtoms() >= mol_searched_for.GetNumAtoms():
matches = mol_searched_in.GetSubstructMatches(mol_searched_for)
if matches:
for match in matches:
add_this_match = True
if len(atomIdxs_to_which_new_matches_have_to_be_adjacent) > 0:
add_this_match = False
for i in match:
for neighbor in mol_searched_in.GetAtomWithIdx(i).GetNeighbors():
if (
neighbor.GetIdx()
in atomIdxs_to_which_new_matches_have_to_be_adjacent
):
add_this_match = True
break
if add_this_match:
valid_matches.append(match)
return valid_matches
# count heavier isotopes of hydrogen correctly
[docs]
@classmethod
def get_heavy_atom_count(cls, mol):
heavy_atom_count = 0
for atom in mol.GetAtoms():
if atom.GetAtomicNum() != 1:
heavy_atom_count += 1
return heavy_atom_count
def __init__(
self,
fragmentation_scheme={},
fragmentation_scheme_order=None,
match_hydrogens=False,
algorithm="",
n_atoms_cuttoff=-1,
function_to_choose_fragmentation=False,
n_max_fragmentations_to_find=-1,
):
if not type(fragmentation_scheme) is dict:
raise TypeError(
"fragmentation_scheme must be a dctionary with integers as keys and either strings or list of strings as values."
)
if len(fragmentation_scheme) == 0:
raise ValueError("fragmentation_scheme must be provided.")
if not algorithm in ["simple", "complete", "combined"]:
raise ValueError("Algorithm must be either simple ,complete or combined.")
if algorithm == "simple":
if n_max_fragmentations_to_find != -1:
raise ValueError(
"Setting n_max_fragmentations_to_find only makes sense with complete or combined algorithm."
)
self.algorithm = algorithm
if algorithm in ["combined", "complete"]:
if n_atoms_cuttoff == -1:
raise ValueError(
"n_atoms_cuttoff needs to be specified for complete or combined algorithms."
)
if function_to_choose_fragmentation == False:
raise ValueError(
"function_to_choose_fragmentation needs to be specified for complete or combined algorithms."
)
if not callable(function_to_choose_fragmentation):
raise TypeError("function_to_choose_fragmentation needs to be a function.")
else:
if type(function_to_choose_fragmentation([{}, {}])) != dict:
raise TypeError(
"function_to_choose_fragmentation needs to take a list of fragmentations and choose one of it"
)
if n_max_fragmentations_to_find != -1:
if n_max_fragmentations_to_find < 1:
raise ValueError("n_max_fragmentations_to_find has to be 1 or higher.")
if fragmentation_scheme_order is None:
fragmentation_scheme_order = []
if algorithm in ["simple", "combined"]:
assert len(fragmentation_scheme) == len(fragmentation_scheme_order)
else:
fragmentation_scheme_order = [key for key in fragmentation_scheme.keys()]
self.n_max_fragmentations_to_find = n_max_fragmentations_to_find
self.n_atoms_cuttoff = n_atoms_cuttoff
self.match_hydrogens = match_hydrogens
self.fragmentation_scheme = fragmentation_scheme
self.function_to_choose_fragmentation = function_to_choose_fragmentation
# create a lookup dictionaries to faster finding a group number
self._fragmentation_scheme_group_number_lookup = {}
self._fragmentation_scheme_pattern_lookup = {}
self.fragmentation_scheme_order = fragmentation_scheme_order
for group_number, list_SMARTS in fragmentation_scheme.items():
if type(list_SMARTS) is not list:
list_SMARTS = [list_SMARTS]
for SMARTS in list_SMARTS:
if SMARTS != "":
self._fragmentation_scheme_group_number_lookup[SMARTS] = group_number
mol_SMARTS = Chem.MolFromSmarts(SMARTS)
self._fragmentation_scheme_pattern_lookup[SMARTS] = mol_SMARTS
[docs]
def fragment(self, SMILES_or_molecule):
if type(SMILES_or_molecule) is str:
mol_SMILES = Chem.MolFromSmiles(SMILES_or_molecule)
mol_SMILES = Chem.AddHs(mol_SMILES) if self.match_hydrogens else mol_SMILES
is_valid_SMILES = mol_SMILES is not None
if not is_valid_SMILES:
raise ValueError("Following SMILES is not valid: " + SMILES_or_molecule)
else:
mol_SMILES = SMILES_or_molecule
# iterate over all separated molecules
success = []
fragmentation = {}
fragmentation_matches = {}
for mol in rdmolops.GetMolFrags(mol_SMILES, asMols=True):
this_mol_fragmentation, this_mol_success = self.__get_fragmentation(mol)
for SMARTS, matches in this_mol_fragmentation.items():
group_number = self._fragmentation_scheme_group_number_lookup[SMARTS]
if not group_number in fragmentation:
fragmentation[group_number] = 0
fragmentation_matches[group_number] = []
fragmentation[group_number] += len(matches)
fragmentation_matches[group_number].extend(matches)
success.append(this_mol_success)
return fragmentation, all(success), fragmentation_matches
[docs]
def fragment_complete(self, SMILES_or_molecule):
if type(SMILES_or_molecule) is str:
mol_SMILES = Chem.MolFromSmiles(SMILES_or_molecule)
mol_SMILES = Chem.AddHs(mol_SMILES) if self.match_hydrogens else mol_SMILES
is_valid_SMILES = mol_SMILES is not None
if not is_valid_SMILES:
raise ValueError("Following SMILES is not valid: " + SMILES_or_molecule)
else:
mol_SMILES = SMILES_or_molecule
if len(rdmolops.GetMolFrags(mol_SMILES)) != 1:
raise ValueError("fragment_complete does not accept multifragment molecules.")
temp_fragmentations, success = self.__complete_fragmentation(mol_SMILES)
fragmentations = []
fragmentations_matches = []
for temp_fragmentation in temp_fragmentations:
fragmentation = {}
fragmentation_matches = {}
for SMARTS, matches in temp_fragmentation.items():
group_number = self._fragmentation_scheme_group_number_lookup[SMARTS]
fragmentation[group_number] = len(matches)
fragmentation_matches[group_number] = matches
fragmentations.append(fragmentation)
fragmentations_matches.append(fragmentation_matches)
return fragmentations, success, fragmentations_matches
def __get_fragmentation(self, mol_SMILES):
success = False
fragmentation = {}
if self.algorithm in ["simple", "combined"]:
fragmentation, success = self.__simple_fragmentation(mol_SMILES)
if success:
return fragmentation, success
if self.algorithm in ["combined", "complete"]:
fragmentations, success = self.__complete_fragmentation(mol_SMILES)
if success:
fragmentation = self.function_to_choose_fragmentation(fragmentations)
return fragmentation, success
def __simple_fragmentation(self, mol_SMILES):
if self.match_hydrogens:
target_atom_count = len(mol_SMILES.GetAtoms())
else:
target_atom_count = Fragmenter.get_heavy_atom_count(mol_SMILES)
success = False
fragmentation = {}
fragmentation, atomIdxs_included_in_fragmentation = self.__search_non_overlapping_solution(
mol_SMILES, {}, set(), set()
)
success = len(atomIdxs_included_in_fragmentation) == target_atom_count
# if not successful, clean up molecule and search again
level = 1
while not success:
fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far = (
Fragmenter.__clean_molecule_surrounding_unmatched_atoms(
mol_SMILES, fragmentation, atomIdxs_included_in_fragmentation, level
)
)
level += 1
if len(atomIdxs_included_in_fragmentation_so_far) == 0:
break
fragmentation_so_far, atomIdxs_included_in_fragmentation_so_far = (
self.__search_non_overlapping_solution(
mol_SMILES,
fragmentation_so_far,
atomIdxs_included_in_fragmentation_so_far,
atomIdxs_included_in_fragmentation_so_far,
)
)
success = len(atomIdxs_included_in_fragmentation_so_far) == target_atom_count
if success:
fragmentation = fragmentation_so_far
return fragmentation, success
def __search_non_overlapping_solution(
self,
mol_searched_in,
fragmentation,
atomIdxs_included_in_fragmentation,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
):
n_atomIdxs_included_in_fragmentation = len(atomIdxs_included_in_fragmentation) - 1
while n_atomIdxs_included_in_fragmentation != len(atomIdxs_included_in_fragmentation):
n_atomIdxs_included_in_fragmentation = len(atomIdxs_included_in_fragmentation)
for group_number in self.fragmentation_scheme_order:
list_SMARTS = self.fragmentation_scheme[group_number]
if type(list_SMARTS) is not list:
list_SMARTS = [list_SMARTS]
for SMARTS in list_SMARTS:
if SMARTS != "":
fragmentation, atomIdxs_included_in_fragmentation = (
self.__get_next_non_overlapping_match(
mol_searched_in,
SMARTS,
fragmentation,
atomIdxs_included_in_fragmentation,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
)
)
return fragmentation, atomIdxs_included_in_fragmentation
def __get_next_non_overlapping_match(
self,
mol_searched_in,
SMARTS,
fragmentation,
atomIdxs_included_in_fragmentation,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
):
mol_searched_for = self._fragmentation_scheme_pattern_lookup[SMARTS]
if atomIdxs_to_which_new_matches_have_to_be_adjacent:
matches = Fragmenter.get_substruct_matches(
mol_searched_for,
mol_searched_in,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
)
else:
matches = Fragmenter.get_substruct_matches(mol_searched_for, mol_searched_in, set())
if matches:
for match in matches:
all_atoms_of_new_match_are_unassigned = (
atomIdxs_included_in_fragmentation.isdisjoint(match)
)
if all_atoms_of_new_match_are_unassigned:
if not SMARTS in fragmentation:
fragmentation[SMARTS] = []
fragmentation[SMARTS].append(match)
atomIdxs_included_in_fragmentation.update(match)
return fragmentation, atomIdxs_included_in_fragmentation
@classmethod
def __clean_molecule_surrounding_unmatched_atoms(
cls, mol_searched_in, fragmentation, atomIdxs_included_in_fragmentation, level
):
for i in range(0, level):
atoms_missing = set(
range(0, Fragmenter.get_heavy_atom_count(mol_searched_in))
).difference(atomIdxs_included_in_fragmentation)
new_fragmentation = marshal.loads(marshal.dumps(fragmentation))
for atomIdx in atoms_missing:
for neighbor in mol_searched_in.GetAtomWithIdx(atomIdx).GetNeighbors():
for smart, atoms_found in fragmentation.items():
for atoms in atoms_found:
if neighbor.GetIdx() in atoms:
if smart in new_fragmentation:
if new_fragmentation[smart].count(atoms) > 0:
new_fragmentation[smart].remove(atoms)
if smart in new_fragmentation:
if len(new_fragmentation[smart]) == 0:
new_fragmentation.pop(smart)
new_atomIdxs_included_in_fragmentation = set()
for i in new_fragmentation.values():
for j in i:
new_atomIdxs_included_in_fragmentation.update(j)
atomIdxs_included_in_fragmentation = new_atomIdxs_included_in_fragmentation
fragmentation = new_fragmentation
return fragmentation, atomIdxs_included_in_fragmentation
def __complete_fragmentation(self, mol_SMILES):
heavy_atom_count = Fragmenter.get_heavy_atom_count(mol_SMILES)
if heavy_atom_count > self.n_atoms_cuttoff:
return {}, False
completed_fragmentations = []
groups_leading_to_incomplete_fragmentations = []
(
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
incomplete_fragmentation_found,
) = self.__get_next_non_overlapping_adjacent_match_recursively(
mol_SMILES,
heavy_atom_count,
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
{},
set(),
set(),
self.n_max_fragmentations_to_find,
)
success = len(completed_fragmentations) > 0
return completed_fragmentations, success
def __get_next_non_overlapping_adjacent_match_recursively(
self,
mol_searched_in,
heavy_atom_count,
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
fragmentation_so_far,
atomIdxs_included_in_fragmentation_so_far,
atomIdxs_to_which_new_matches_have_to_be_adjacent,
n_max_fragmentations_to_find=-1,
):
n_completed_fragmentations = len(completed_fragmentations)
incomplete_fragmentation_found = False
complete_fragmentation_found = False
if len(completed_fragmentations) == n_max_fragmentations_to_find:
return (
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
incomplete_fragmentation_found,
)
for group_number in self.fragmentation_scheme_order:
list_SMARTS = self.fragmentation_scheme[group_number]
if complete_fragmentation_found:
break
if type(list_SMARTS) is not list:
list_SMARTS = [list_SMARTS]
for SMARTS in list_SMARTS:
if complete_fragmentation_found:
break
if SMARTS != "":
matches = Fragmenter.get_substruct_matches(
self._fragmentation_scheme_pattern_lookup[SMARTS],
mol_searched_in,
atomIdxs_included_in_fragmentation_so_far,
)
for match in matches:
# only allow non-overlapping matches
all_atoms_are_unassigned = (
atomIdxs_included_in_fragmentation_so_far.isdisjoint(match)
)
if not all_atoms_are_unassigned:
continue
# only allow matches that do not contain groups leading to incomplete matches
for (
groups_leading_to_incomplete_fragmentation
) in groups_leading_to_incomplete_fragmentations:
if Fragmenter.__is_fragmentation_subset_of_other_fragmentation(
groups_leading_to_incomplete_fragmentation,
fragmentation_so_far,
):
return (
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
incomplete_fragmentation_found,
)
# only allow matches that will lead to new fragmentations
use_this_match = True
n_found_groups = len(fragmentation_so_far)
for completed_fragmentation in completed_fragmentations:
if not SMARTS in completed_fragmentation:
continue
if n_found_groups == 0:
use_this_match = (
not Fragmenter.__is_match_contained_in_fragmentation(
match, SMARTS, completed_fragmentation
)
)
else:
if Fragmenter.__is_fragmentation_subset_of_other_fragmentation(
fragmentation_so_far, completed_fragmentation
):
use_this_match = (
not Fragmenter.__is_match_contained_in_fragmentation(
match, SMARTS, completed_fragmentation
)
)
if not use_this_match:
break
if not use_this_match:
continue
# make a deepcopy here, otherwise the variables are modified down the road
# marshal is used here because it works faster than copy.deepcopy
this_SMARTS_fragmentation_so_far = marshal.loads(
marshal.dumps(fragmentation_so_far)
)
this_SMARTS_atomIdxs_included_in_fragmentation_so_far = (
atomIdxs_included_in_fragmentation_so_far.copy()
)
if not SMARTS in this_SMARTS_fragmentation_so_far:
this_SMARTS_fragmentation_so_far[SMARTS] = []
this_SMARTS_fragmentation_so_far[SMARTS].append(match)
this_SMARTS_atomIdxs_included_in_fragmentation_so_far.update(match)
# only allow matches that do not contain groups leading to incomplete matches
for (
groups_leading_to_incomplete_match
) in groups_leading_to_incomplete_fragmentations:
if Fragmenter.__is_fragmentation_subset_of_other_fragmentation(
groups_leading_to_incomplete_match,
this_SMARTS_fragmentation_so_far,
):
use_this_match = False
break
if not use_this_match:
continue
# if the complete molecule has not been fragmented, continue to do so
if (
len(this_SMARTS_atomIdxs_included_in_fragmentation_so_far)
< heavy_atom_count
):
(
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
incomplete_fragmentation_found,
) = self.__get_next_non_overlapping_adjacent_match_recursively(
mol_searched_in,
heavy_atom_count,
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
this_SMARTS_fragmentation_so_far,
this_SMARTS_atomIdxs_included_in_fragmentation_so_far,
this_SMARTS_atomIdxs_included_in_fragmentation_so_far,
n_max_fragmentations_to_find,
)
break
# if the complete molecule has been fragmented, save and return
if (
len(this_SMARTS_atomIdxs_included_in_fragmentation_so_far)
== heavy_atom_count
):
completed_fragmentations.append(this_SMARTS_fragmentation_so_far)
complete_fragmentation_found = True
break
# if until here no new fragmentation was found check whether an incomplete fragmentation was found
if n_completed_fragmentations == len(completed_fragmentations):
if not incomplete_fragmentation_found:
incomplete_matched_groups = {}
if len(atomIdxs_included_in_fragmentation_so_far) > 0:
unassignes_atom_idx = set(range(0, heavy_atom_count)).difference(
atomIdxs_included_in_fragmentation_so_far
)
for atom_idx in unassignes_atom_idx:
neighbor_atoms_idx = [
i.GetIdx()
for i in mol_searched_in.GetAtomWithIdx(atom_idx).GetNeighbors()
]
for neighbor_atom_idx in neighbor_atoms_idx:
for (
found_smarts,
found_matches,
) in fragmentation_so_far.items():
for found_match in found_matches:
if neighbor_atom_idx in found_match:
if not found_smarts in incomplete_matched_groups:
incomplete_matched_groups[found_smarts] = []
if (
found_match
not in incomplete_matched_groups[found_smarts]
):
incomplete_matched_groups[found_smarts].append(
found_match
)
is_subset_of_groups_already_found = False
indexes_to_remove = []
for idx, groups_leading_to_incomplete_match in enumerate(
groups_leading_to_incomplete_fragmentations
):
is_subset_of_groups_already_found = (
Fragmenter.__is_fragmentation_subset_of_other_fragmentation(
incomplete_matched_groups,
groups_leading_to_incomplete_match,
)
)
if is_subset_of_groups_already_found:
indexes_to_remove.append(idx)
for index in sorted(indexes_to_remove, reverse=True):
del groups_leading_to_incomplete_fragmentations[index]
groups_leading_to_incomplete_fragmentations.append(incomplete_matched_groups)
groups_leading_to_incomplete_fragmentations = sorted(
groups_leading_to_incomplete_fragmentations, key=len
)
incomplete_fragmentation_found = True
return (
completed_fragmentations,
groups_leading_to_incomplete_fragmentations,
incomplete_fragmentation_found,
)
@classmethod
def __is_fragmentation_subset_of_other_fragmentation(cls, fragmentation, other_fragmentation):
n_found_groups = len(fragmentation)
n_found_other_groups = len(other_fragmentation)
if n_found_groups == 0:
return False
if n_found_other_groups < n_found_groups:
return False
n_found_SMARTS_that_are_subset = 0
for found_SMARTS, _ in fragmentation.items():
if found_SMARTS in other_fragmentation:
found_matches_set = set(frozenset(i) for i in fragmentation[found_SMARTS])
found_other_matches_set = set(
frozenset(i) for i in other_fragmentation[found_SMARTS]
)
if found_matches_set.issubset(found_other_matches_set):
n_found_SMARTS_that_are_subset += 1
else:
return False
return n_found_SMARTS_that_are_subset == n_found_groups
@classmethod
def __is_match_contained_in_fragmentation(cls, match, SMARTS, fragmentation):
if not SMARTS in fragmentation:
return False
found_matches_set = set(frozenset(i) for i in fragmentation[SMARTS])
match_set = set(match)
return match_set in found_matches_set
if __name__ == "__main__":
smiles = ["CCCCO", "CCCO", "CCO", "CO"]
fragmentation_scheme = {
"CH2": "[CH2]",
"OH": "[OH]",
"CH3": "[CH3]",
"CH2-CH2": "[CH2][CH2]",
}
fragmentation_scheme_order1 = ["CH2-CH2", "CH3", "CH2", "OH"]
print("simple algorithm 1")
frg = Fragmenter(
fragmentation_scheme,
fragmentation_scheme_order=fragmentation_scheme_order1,
algorithm="simple",
)
for smi in smiles:
fragmentation, success, fragmentation_matches = frg.fragment(smi)
print(smi, fragmentation)
print()
print("simple algorithm 2")
fragmentation_scheme_order2 = ["CH3", "CH2", "CH2-CH2", "OH"]
frg = Fragmenter(
fragmentation_scheme,
fragmentation_scheme_order=fragmentation_scheme_order2,
algorithm="simple",
)
for smi in smiles:
fragmentation, success, fragmentation_matches = frg.fragment(smi)
print(smi, fragmentation)
print()
print("complete algorithm 1")
frg = Fragmenter(
fragmentation_scheme,
algorithm="complete",
n_atoms_cuttoff=30,
function_to_choose_fragmentation=lambda x: x[0],
)
for smi in smiles:
fragmentation, success, fragmentation_matches = frg.fragment(smi)
print(smi, fragmentation)
print()
print("complete algorithm 2")
frg = Fragmenter(
fragmentation_scheme,
algorithm="complete",
n_atoms_cuttoff=30,
function_to_choose_fragmentation=lambda x: x[0],
)
for smi in smiles:
fragmentations, success, fragmentations_matches = frg.fragment_complete(smi)
print(smi, fragmentations)
print(
fragmentations_matches
) # some of the fragmentations are the same, but the found fragmentation_matches are different.