Skip to content

Commit

Permalink
182 Phenopacket distance algorithm (#187)
Browse files Browse the repository at this point in the history
* added files for tree comparison

* implemented method signature of method to compare the structure of dicts

* added traversal of dict like tree

* added function to create dict from phenopacket

* implemented compare structure

* created method stub for difference tree method

* renamed `edit_distance.py` to `structure.py`

* created logic for difference tree for dicts

* added logic for lists and no match

* created test for assign_at

* debugged assign at

* added test for difference tree

* fixed mini bug in difference tree

* assign at can now add a new key to the dict

* fixed bug in difference tree

* fixed test structure

* had to adjust test condition

* removed trying out in main method

* wrote test case for compare structure

* updated traversal to include 'list' as if it was a key

* made it more clear that this is not the word list but a list obj

* removed warning that difference tree is not being created

* extended test cases for test_compare_structure

* added the option to include vals in compare structure

* added method definition of edit_distance

* added comment to describe

* checking that all costs are non-negative integers

* changed validating costs to a method that can be used to check costs in the method

ensures costs are valid also from supplied cost methods

* basic logic to check edit distance 0 if there is no difference in structure

* added method stub to calculate edit distance between 2 subtrees

* moved edit distance code to own file, restructured project

* finished logic of edit distance

* insertion and deletion cost is the same

* rewrote compare structure method, can now do only pos check w/o diff tree creation

* renamed modification to substitution

* rewrote logic of edit dist to avoid traversing the tree over and over again. only traversing once now

* turned costs given as parameters to methods

* can take ints as well as methods for costs

* updated method comments to reflect changes, also able to return float

* also updated def of private helper method

* added todo comment

* return 1 for the moment

* added subtree change cost

* updated check cost valid method to accept floats as well

* validating subtree cost

* renamed to subtree substitution cost

* line too long

* typo

* add missing comma

* remove diff assignment, not receiving diff tree anymore

* adding check cost valid to priv helper method

* added blueprint for calculating costs

* renamed check_cost_valid to validate cost

* added tests for edit dist

* artefact from copying from diff tree method in edit dist

* added more code for tests

* amended todo comment

* ipynb to test edit dist etc

* removed unused imports

* commented out test cases
  • Loading branch information
frehburg authored Nov 29, 2023
1 parent d2dffda commit 09cf8df
Show file tree
Hide file tree
Showing 8 changed files with 3,267 additions and 0 deletions.
166 changes: 166 additions & 0 deletions ERKER2Phenopackets/src/analysis/tree_comparison/edit_dist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import uuid
from collections import deque
from typing import Dict, Optional, Union, Callable, Any

from .structure import compare_structure

# Numeric type used to annotate edit costs and the resulting edit distance.
T = Union[int, float]


def edit_distance(
        d1: Dict, d2: Dict,
        d1_id: Optional[Union[int, str]] = uuid.uuid4(),
        d2_id: Optional[Union[int, str]] = uuid.uuid4(),
        subtree_substitution_cost: T = 1,
        insertion_cost: Union[int, float, Callable[[Any], T]] = 1,
        val_substitution_cost: Union[int, float, Callable[[Any, Any], T]] = 1
) -> T:
    """
    Calculates the edit distance between two dictionaries.

    Both dictionaries are walked breadth-first in lockstep. Whenever the keys
    at the same position differ, or two lists/tuples at the same position are
    unequal, the cost of the differing subtree is added to the distance.

    :param d1: First dictionary
    :type d1: Dict
    :param d2: Second dictionary
    :type d2: Dict
    :param d1_id: Identifier for first dictionary; the default is a UUID that is
        generated once, when the function is defined
    :type d1_id: Optional[Union[int, str]], optional
    :param d2_id: Identifier for second dictionary; the default is a UUID that
        is generated once, when the function is defined
    :type d2_id: Optional[Union[int, str]], optional
    :param subtree_substitution_cost: Cost for changing a subtree, if this is
        assigned (truthy), insertion cost and substitution cost are ignored,
        defaults 1
    :type subtree_substitution_cost: Union[int, float]
    :param insertion_cost: Cost for inserting a key, can be a method taking the
        inserted element as a parameter, defaults to 1
    :type insertion_cost: Union[int, float, Callable[[Any], Union[int, float]]]
    :param val_substitution_cost: Cost for changing a value, can be a method
        taking the first and the second value as argument, defaults to 1
    :type val_substitution_cost:
        Union[int, float, Callable[[Any, Any], Union[int, float]]]
    :return: Edit distance between the two dictionaries, 0 if they match
    :rtype: Union[int, float]
    :raises ValueError: if a numeric cost is negative
    """

    def validate_cost(cost_val: T, cost_label: str):
        """surround each cost call with this method to check if the cost is valid"""
        if not isinstance(cost_val, (int, float)) or cost_val < 0:
            raise ValueError(f'{cost_label} {cost_val} must be a non-negative '
                             f' integer or floating point number')
        return cost_val

    if isinstance(subtree_substitution_cost, (int, float)):
        validate_cost(
            subtree_substitution_cost,
            'subtree_substitution_cost'
        )

    # Numeric costs are wrapped into constant functions so the rest of the code
    # can always call them. The constant is captured under its own name: a
    # nested ``def insertion_cost(...)`` returning ``insertion_cost`` would
    # return the wrapper function itself instead of the number.
    if isinstance(insertion_cost, (int, float)):
        constant_insertion_cost = validate_cost(insertion_cost, 'insertion_cost')

        def insertion_cost_fn(inserted):
            return constant_insertion_cost
    else:
        insertion_cost_fn = insertion_cost

    if isinstance(val_substitution_cost, (int, float)):
        constant_val_substitution_cost = validate_cost(
            val_substitution_cost,
            'val_substitution_cost'
        )

        def val_substitution_cost_fn(val1, val2):
            return constant_val_substitution_cost
    else:
        val_substitution_cost_fn = val_substitution_cost

    # Values take part in the comparison whenever a value cost function exists;
    # a function object is truthy, so this is True for numbers and callables.
    equals = compare_structure(
        d1, d2,
        d1_id, d2_id,
        include_vals=bool(val_substitution_cost_fn),
        construct_diff_tree=False
    )

    # Only the truthiness of the result is used here.
    if equals:
        return 0

    def subtree_cost(subtree1: Dict, subtree2: Dict) -> T:
        """Cost of replacing ``subtree1`` by ``subtree2`` with the given costs."""
        return _calculate_edit_distance(
            subtree1=subtree1,
            subtree2=subtree2,
            validate_cost=validate_cost,
            # forward the caller's value instead of silently using the default
            subtree_substitution_cost=subtree_substitution_cost,
            insertion_cost=insertion_cost_fn,
            val_substitution_cost=val_substitution_cost_fn
        )

    cost = 0
    sequence_types = (list, tuple)

    # Pairs of nodes found at the same position in both trees.
    pending = deque([(d1, d2)])

    while pending:
        n1, n2 = pending.popleft()

        if isinstance(n1, dict) and isinstance(n2, dict):
            # NOTE(review): zip stops at the shorter dict, so surplus keys of
            # the longer dict are not charged here - confirm this is intended.
            for (k1, v1), (k2, v2) in zip(n1.items(), n2.items()):
                if k1 == k2:
                    pending.append((v1, v2))
                else:
                    cost += subtree_cost({k1: v1}, {k2: v2})

        elif isinstance(n1, sequence_types) and isinstance(n2, sequence_types):
            if n1 != n2:
                cost += subtree_cost({'k': n1}, {'k': n2})

    return cost


def _calculate_edit_distance(
subtree1: Dict, subtree2: Dict,
validate_cost: Callable[[T, str], T],
subtree_substitution_cost: T = 1,
insertion_cost: Union[int, float, Callable[[Any], T]] = 1,
val_substitution_cost: Union[int, float, Callable[[Any, Any], T]] = 1,
) -> T:
"""
Calculates the edit distance between two subtrees.
:param subtree1: First subtree
:type subtree1: Dict
:param subtree2: Second subtree
:type subtree2: Dict
:param validate_cost: Method to check if the cost is valid
:type validate_cost: Callable[[T, str], T]
:param subtree_substitution_cost: Cost for changing a subtree, if this is assigned,
insertion cost and substitution cost are ignored, defaults 1
:type subtree_substitution_cost: Union[int, float]
:param insertion_cost: Cost for inserting a key, can be a method taking the
inserted element as a parameter, defaults to 1
:type insertion_cost: Union[int, float, Callable[[Any], Union[int, float]]]
:param val_substitution_cost: Cost for changing a value, can be a method taking
the first and the second value as argument, defaults to 1
:type val_substitution_cost:
Union[int, float, Callable[[Any, Any], Union[int, float]]]
:return: Edit distance between the two subtrees
:rtype: int
"""
if subtree_substitution_cost:
return subtree_substitution_cost
else:
# TODO: the structure of the subtrees should roughly match, assign penalties
# otherwise
# just assign penalties for each key or value or list item that is different
# also count up the items in a not in b and in b not in a and assign penalties
cost = 69

if True: # insertion
inserted_item = None
cost += validate_cost(insertion_cost(inserted_item), 'insertion_cost')
if True: # modification
val1 = None
val2 = None
cost += validate_cost(
val_substitution_cost(val1, val2),
'val_substitution_cost'
)
return cost
173 changes: 173 additions & 0 deletions ERKER2Phenopackets/src/analysis/tree_comparison/structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import uuid
from collections import deque
from typing import Dict, Tuple, Union, Optional, List, Any

from .traverse import traverse


def compare_structure(
        d1: Dict, d2: Dict,
        d1_id: Optional[Union[int, str]] = uuid.uuid4(),
        d2_id: Optional[Union[int, str]] = uuid.uuid4(),
        include_vals: bool = False,
        construct_diff_tree: bool = True
) -> Union[Tuple[bool, Dict], bool]:
    """Compares if the structure of two dictionaries match.

    By structure we mean the keys and the order of the keys. The dictionaries
    are considered equal when both their breadth-first and their depth-first
    traversals (as produced by ``traverse``) are equal.

    :param d1: First dictionary
    :type d1: Dict
    :param d2: Second dictionary
    :type d2: Dict
    :param d1_id: Identifier for first dictionary; the default is a UUID that is
        generated once, when the function is defined
    :type d1_id: Optional[Union[int, str]], optional
    :param d2_id: Identifier for second dictionary; the default is a UUID that
        is generated once, when the function is defined
    :type d2_id: Optional[Union[int, str]], optional
    :param include_vals: Whether to include values in the comparison, defaults
        to False because this method is mainly concerned with the structure of
        the tree
    :type include_vals: bool, optional
    :param construct_diff_tree: Whether to construct a difference tree, defaults
        to True
    :type construct_diff_tree: bool, optional
    :return: If ``construct_diff_tree`` is True, a tuple of the comparison
        result and the difference tree (an empty dict when the structures
        match); otherwise only the comparison result as a bool
    :rtype: Union[Tuple[bool, Dict], bool]
    """
    bfs1 = traverse(d1, 'bfs', include_vals=include_vals)
    bfs2 = traverse(d2, 'bfs', include_vals=include_vals)

    # The depth-first traversals are only computed when the breadth-first
    # traversals already agree.
    if bfs1 == bfs2:
        dfs1 = traverse(d1, 'dfs', include_vals=include_vals)
        dfs2 = traverse(d2, 'dfs', include_vals=include_vals)

        if dfs1 == dfs2:
            # Parentheses are required: without them the conditional expression
            # binds only to the second tuple element and (True, True) would be
            # returned when no difference tree is requested.
            return (True, {}) if construct_diff_tree else True

    if construct_diff_tree:
        return False, create_difference_tree(d1, d2, d1_id, d2_id)
    return False


def create_difference_tree(d1: Dict, d2: Dict,
                           d1_id: Optional[Union[int, str]] = uuid.uuid4(),
                           d2_id: Optional[Union[int, str]] = uuid.uuid4()
                           ) -> Dict:
    """Creates a difference tree for two dictionaries.

    Both dictionaries are walked breadth-first in lockstep. Entries are written
    into the result with ``assign_dict_at`` at the key path of the current node:
    matching keys are recorded as ``{key: {}}``, equal lists/tuples are recorded
    as they are, and everything else is recorded as a dict that maps ``d1_id``
    and ``d2_id`` to the respective side.

    :param d1: First dictionary
    :type d1: Dict
    :param d2: Second dictionary
    :type d2: Dict
    :param d1_id: Identifier for first dictionary; the default is a UUID that is
        generated once, when the function is defined
    :type d1_id: Optional[Union[int, str]], optional
    :param d2_id: Identifier for second dictionary; the default is a UUID that
        is generated once, when the function is defined
    :type d2_id: Optional[Union[int, str]], optional
    :return: Difference tree
    :rtype: Dict
    """
    sequence_types = (list, tuple)
    tree = {}

    # Each entry holds the two nodes found at the same position together with
    # the key path leading to them; the leading None marks the root.
    pending = deque([(d1, d2, [None])])

    while pending:
        left, right, path = pending.popleft()

        if isinstance(left, dict) and isinstance(right, dict):
            # zip pairs the entries by position and stops at the shorter dict
            for (key_l, val_l), (key_r, val_r) in zip(left.items(), right.items()):
                if key_l == key_r:
                    tree = assign_dict_at(
                        d=tree,
                        key_path=path,
                        value={key_l: {}}
                    )
                    # descend into the children of the shared key
                    pending.append((val_l, val_r, path + [key_l]))
                else:
                    tree = assign_dict_at(
                        d=tree,
                        key_path=path,
                        value={
                            d1_id: {key_l: val_l},
                            d2_id: {key_r: val_r},
                        }
                    )
            continue

        both_sequences = (isinstance(left, sequence_types)
                          and isinstance(right, sequence_types))
        if both_sequences and left == right:
            payload = left
        else:
            # differing sequences and all remaining node kinds are stored
            # side by side under the two identifiers
            payload = {
                d1_id: left,
                d2_id: right,
            }

        tree = assign_dict_at(d=tree, key_path=path, value=payload)

    return tree


def assign_dict_at(d: Dict, key_path: List[Union[str, int]], value: Any) -> Dict:
    """
    Stores a value in a nested dictionary at the position given by a key path.

    Missing intermediate dictionaries are created on the way; ``None`` entries
    in the key path are skipped. If the last key is ``None`` nothing is stored
    in ``d``: a dict ``value`` is returned as is, an int or str ``value`` is
    returned as ``{value: {}}``.

    Example:
    >>> d = {'a': {'b': {}}}
    >>> assign_dict_at(d, ['a', 'b', 'c'], 2)
    {'a': {'b': {'c': 2}}}

    :param d: a dictionary, modified in place
    :type d: Dict
    :param key_path: a list of keys to traverse the dictionary with to get to
        the value
    :type key_path: List[Union[str, int]]
    :param value: the value to assign at the position specified by the key path
    :type value: Any
    :return: the dictionary with the value assigned at the position specified
        by the key path
    :rtype: Dict
    :raises ValueError: if the last key is ``None`` and ``value`` is neither a
        dict nor an int or str
    """
    cursor = d
    for step in key_path[:-1]:
        if step is not None:
            if step not in cursor:
                cursor[step] = {}
            cursor = cursor[step]

    leaf = key_path[-1]
    if leaf is not None:
        cursor[leaf] = value
        return d

    # No leaf key to assign to: the value itself becomes the result.
    if isinstance(value, dict):
        return value
    if isinstance(value, (int, str)):
        return {value: {}}

    raise ValueError(f'Could not insert {value} at {key_path} in {d}')
Loading

0 comments on commit 09cf8df

Please sign in to comment.