|
|
@@ -22,11 +22,11 @@
|
|
|
"""Utilities for diffing files and trees."""
|
|
|
|
|
|
import stat
|
|
|
-from collections import defaultdict, namedtuple
|
|
|
+from collections import defaultdict
|
|
|
from collections.abc import Iterator
|
|
|
from io import BytesIO
|
|
|
from itertools import chain
|
|
|
-from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar
|
|
|
+from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional, TypeVar
|
|
|
|
|
|
from .object_store import BaseObjectStore
|
|
|
from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
|
|
|
@@ -41,7 +41,7 @@ CHANGE_UNCHANGED = "unchanged"
|
|
|
|
|
|
RENAME_CHANGE_TYPES = (CHANGE_RENAME, CHANGE_COPY)
|
|
|
|
|
|
-_NULL_ENTRY = TreeEntry(None, None, None)
|
|
|
+# _NULL_ENTRY removed - using None instead
|
|
|
|
|
|
_MAX_SCORE = 100
|
|
|
RENAME_THRESHOLD = 60
|
|
|
@@ -49,9 +49,13 @@ MAX_FILES = 200
|
|
|
REWRITE_THRESHOLD: Optional[int] = None
|
|
|
|
|
|
|
|
|
-class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
|
|
|
+class TreeChange(NamedTuple):
|
|
|
"""Named tuple a single change between two trees."""
|
|
|
|
|
|
+ type: str
|
|
|
+ old: Optional[TreeEntry]
|
|
|
+ new: Optional[TreeEntry]
|
|
|
+
|
|
|
@classmethod
|
|
|
def add(cls, new: TreeEntry) -> "TreeChange":
|
|
|
"""Create a TreeChange for an added entry.
|
|
|
@@ -62,7 +66,7 @@ class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
|
|
|
Returns:
|
|
|
TreeChange instance
|
|
|
"""
|
|
|
- return cls(CHANGE_ADD, _NULL_ENTRY, new)
|
|
|
+ return cls(CHANGE_ADD, None, new)
|
|
|
|
|
|
@classmethod
|
|
|
def delete(cls, old: TreeEntry) -> "TreeChange":
|
|
|
@@ -74,7 +78,7 @@ class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
|
|
|
Returns:
|
|
|
TreeChange instance
|
|
|
"""
|
|
|
- return cls(CHANGE_DELETE, old, _NULL_ENTRY)
|
|
|
+ return cls(CHANGE_DELETE, old, None)
|
|
|
|
|
|
|
|
|
def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
|
|
|
@@ -88,7 +92,7 @@ def _tree_entries(path: bytes, tree: Tree) -> list[TreeEntry]:
|
|
|
|
|
|
def _merge_entries(
|
|
|
path: bytes, tree1: Tree, tree2: Tree
|
|
|
-) -> list[tuple[TreeEntry, TreeEntry]]:
|
|
|
+) -> list[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
|
|
|
"""Merge the entries of two trees.
|
|
|
|
|
|
Args:
|
|
|
@@ -99,8 +103,7 @@ def _merge_entries(
|
|
|
Returns:
|
|
|
A list of pairs of TreeEntry objects for each pair of entries in
|
|
|
the trees. If an entry exists in one tree but not the other, the other
|
|
|
- entry will have all attributes set to None. If neither entry's path is
|
|
|
- None, they are guaranteed to match.
|
|
|
+ entry will be None. If both entries exist, they are guaranteed to match.
|
|
|
"""
|
|
|
entries1 = _tree_entries(path, tree1)
|
|
|
entries2 = _tree_entries(path, tree2)
|
|
|
@@ -108,32 +111,31 @@ def _merge_entries(
|
|
|
len1 = len(entries1)
|
|
|
len2 = len(entries2)
|
|
|
|
|
|
- result = []
|
|
|
+ result: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = []
|
|
|
while i1 < len1 and i2 < len2:
|
|
|
entry1 = entries1[i1]
|
|
|
entry2 = entries2[i2]
|
|
|
if entry1.path < entry2.path:
|
|
|
- result.append((entry1, _NULL_ENTRY))
|
|
|
+ result.append((entry1, None))
|
|
|
i1 += 1
|
|
|
elif entry1.path > entry2.path:
|
|
|
- result.append((_NULL_ENTRY, entry2))
|
|
|
+ result.append((None, entry2))
|
|
|
i2 += 1
|
|
|
else:
|
|
|
result.append((entry1, entry2))
|
|
|
i1 += 1
|
|
|
i2 += 1
|
|
|
for i in range(i1, len1):
|
|
|
- result.append((entries1[i], _NULL_ENTRY))
|
|
|
+ result.append((entries1[i], None))
|
|
|
for i in range(i2, len2):
|
|
|
- result.append((_NULL_ENTRY, entries2[i]))
|
|
|
+ result.append((None, entries2[i]))
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def _is_tree(entry: TreeEntry) -> bool:
|
|
|
- mode = entry.mode
|
|
|
- if mode is None:
|
|
|
+def _is_tree(entry: Optional[TreeEntry]) -> bool:
|
|
|
+ if entry is None:
|
|
|
return False
|
|
|
- return stat.S_ISDIR(mode)
|
|
|
+ return stat.S_ISDIR(entry.mode)
|
|
|
|
|
|
|
|
|
def walk_trees(
|
|
|
@@ -142,7 +144,7 @@ def walk_trees(
|
|
|
tree2_id: Optional[ObjectID],
|
|
|
prune_identical: bool = False,
|
|
|
paths: Optional[list[bytes]] = None,
|
|
|
-) -> Iterator[tuple[TreeEntry, TreeEntry]]:
|
|
|
+) -> Iterator[tuple[Optional[TreeEntry], Optional[TreeEntry]]]:
|
|
|
"""Recursively walk all the entries of two trees.
|
|
|
|
|
|
Iteration is depth-first pre-order, as in e.g. os.walk.
|
|
|
@@ -157,15 +159,14 @@ def walk_trees(
|
|
|
Returns:
|
|
|
Iterator over Pairs of TreeEntry objects for each pair of entries
|
|
|
in the trees and their subtrees recursively. If an entry exists in one
|
|
|
- tree but not the other, the other entry will have all attributes set
|
|
|
- to None. If neither entry's path is None, they are guaranteed to
|
|
|
- match.
|
|
|
+ tree but not the other, the other entry will be None. If both entries
|
|
|
+ exist, they are guaranteed to match.
|
|
|
"""
|
|
|
# This could be fairly easily generalized to >2 trees if we find a use
|
|
|
# case.
|
|
|
- mode1 = (tree1_id and stat.S_IFDIR) or None
|
|
|
- mode2 = (tree2_id and stat.S_IFDIR) or None
|
|
|
- todo = [(TreeEntry(b"", mode1, tree1_id), TreeEntry(b"", mode2, tree2_id))]
|
|
|
+ entry1 = TreeEntry(b"", stat.S_IFDIR, tree1_id) if tree1_id else None
|
|
|
+ entry2 = TreeEntry(b"", stat.S_IFDIR, tree2_id) if tree2_id else None
|
|
|
+ todo: list[tuple[Optional[TreeEntry], Optional[TreeEntry]]] = [(entry1, entry2)]
|
|
|
while todo:
|
|
|
entry1, entry2 = todo.pop()
|
|
|
is_tree1 = _is_tree(entry1)
|
|
|
@@ -173,9 +174,13 @@ def walk_trees(
|
|
|
if prune_identical and is_tree1 and is_tree2 and entry1 == entry2:
|
|
|
continue
|
|
|
|
|
|
- tree1 = (is_tree1 and store[entry1.sha]) or None
|
|
|
- tree2 = (is_tree2 and store[entry2.sha]) or None
|
|
|
- path = entry1.path or entry2.path
|
|
|
+ tree1 = (is_tree1 and entry1 and store[entry1.sha]) or None
|
|
|
+ tree2 = (is_tree2 and entry2 and store[entry2.sha]) or None
|
|
|
+ path = (
|
|
|
+ (entry1.path if entry1 else None)
|
|
|
+ or (entry2.path if entry2 else None)
|
|
|
+ or b""
|
|
|
+ )
|
|
|
|
|
|
# If we have path filters, check if we should process this tree
|
|
|
if paths is not None and (is_tree1 or is_tree2):
|
|
|
@@ -236,9 +241,9 @@ def walk_trees(
|
|
|
break
|
|
|
|
|
|
|
|
|
-def _skip_tree(entry: TreeEntry, include_trees: bool) -> TreeEntry:
|
|
|
- if entry.mode is None or (not include_trees and stat.S_ISDIR(entry.mode)):
|
|
|
- return _NULL_ENTRY
|
|
|
+def _skip_tree(entry: Optional[TreeEntry], include_trees: bool) -> Optional[TreeEntry]:
|
|
|
+ if entry is None or (not include_trees and stat.S_ISDIR(entry.mode)):
|
|
|
+ return None
|
|
|
return entry
|
|
|
|
|
|
|
|
|
@@ -290,22 +295,22 @@ def tree_changes(
|
|
|
entry1 = _skip_tree(entry1, include_trees)
|
|
|
entry2 = _skip_tree(entry2, include_trees)
|
|
|
|
|
|
- if entry1 != _NULL_ENTRY and entry2 != _NULL_ENTRY:
|
|
|
+ if entry1 is not None and entry2 is not None:
|
|
|
if (
|
|
|
stat.S_IFMT(entry1.mode) != stat.S_IFMT(entry2.mode)
|
|
|
and not change_type_same
|
|
|
):
|
|
|
# File type changed: report as delete/add.
|
|
|
yield TreeChange.delete(entry1)
|
|
|
- entry1 = _NULL_ENTRY
|
|
|
+ entry1 = None
|
|
|
change_type = CHANGE_ADD
|
|
|
elif entry1 == entry2:
|
|
|
change_type = CHANGE_UNCHANGED
|
|
|
else:
|
|
|
change_type = CHANGE_MODIFY
|
|
|
- elif entry1 != _NULL_ENTRY:
|
|
|
+ elif entry1 is not None:
|
|
|
change_type = CHANGE_DELETE
|
|
|
- elif entry2 != _NULL_ENTRY:
|
|
|
+ elif entry2 is not None:
|
|
|
change_type = CHANGE_ADD
|
|
|
else:
|
|
|
# Both were None because at least one was a tree.
|
|
|
@@ -359,7 +364,7 @@ def tree_changes_for_merge(
|
|
|
for t in parent_tree_ids
|
|
|
]
|
|
|
num_parents = len(parent_tree_ids)
|
|
|
- changes_by_path: dict[str, list[Optional[TreeChange]]] = defaultdict(
|
|
|
+ changes_by_path: dict[bytes, list[Optional[TreeChange]]] = defaultdict(
|
|
|
lambda: [None] * num_parents
|
|
|
)
|
|
|
|
|
|
@@ -367,13 +372,15 @@ def tree_changes_for_merge(
|
|
|
for i, parent_changes in enumerate(all_parent_changes):
|
|
|
for change in parent_changes:
|
|
|
if change.type == CHANGE_DELETE:
|
|
|
+ assert change.old is not None
|
|
|
path = change.old.path
|
|
|
else:
|
|
|
+ assert change.new is not None
|
|
|
path = change.new.path
|
|
|
changes_by_path[path][i] = change
|
|
|
|
|
|
def old_sha(c: TreeChange) -> Optional[ObjectID]:
|
|
|
- return c.old.sha
|
|
|
+ return c.old.sha if c.old is not None else None
|
|
|
|
|
|
def change_type(c: TreeChange) -> str:
|
|
|
return c.type
|
|
|
@@ -490,12 +497,13 @@ def _similarity_score(
|
|
|
|
|
|
def _tree_change_key(entry: TreeChange) -> tuple[bytes, bytes]:
|
|
|
# Sort by old path then new path. If only one exists, use it for both keys.
|
|
|
- path1 = entry.old.path
|
|
|
- path2 = entry.new.path
|
|
|
+ path1 = entry.old.path if entry.old is not None else None
|
|
|
+ path2 = entry.new.path if entry.new is not None else None
|
|
|
if path1 is None:
|
|
|
path1 = path2
|
|
|
if path2 is None:
|
|
|
path2 = path1
|
|
|
+ assert path1 is not None and path2 is not None
|
|
|
return (path1, path2)
|
|
|
|
|
|
|
|
|
@@ -545,11 +553,10 @@ class RenameDetector:
|
|
|
self._changes = []
|
|
|
|
|
|
def _should_split(self, change: TreeChange) -> bool:
|
|
|
- if (
|
|
|
- self._rewrite_threshold is None
|
|
|
- or change.type != CHANGE_MODIFY
|
|
|
- or change.old.sha == change.new.sha
|
|
|
- ):
|
|
|
+ if self._rewrite_threshold is None or change.type != CHANGE_MODIFY:
|
|
|
+ return False
|
|
|
+ assert change.old is not None and change.new is not None
|
|
|
+ if change.old.sha == change.new.sha:
|
|
|
return False
|
|
|
old_obj = self._store[change.old.sha]
|
|
|
new_obj = self._store[change.new.sha]
|
|
|
@@ -561,6 +568,7 @@ class RenameDetector:
|
|
|
elif change.type == CHANGE_DELETE:
|
|
|
self._deletes.append(change)
|
|
|
elif self._should_split(change):
|
|
|
+ assert change.old is not None and change.new is not None
|
|
|
self._deletes.append(TreeChange.delete(change.old))
|
|
|
self._adds.append(TreeChange.add(change.new))
|
|
|
elif (
|
|
|
@@ -588,18 +596,28 @@ class RenameDetector:
|
|
|
self._add_change(change)
|
|
|
|
|
|
def _prune(self, add_paths: set[bytes], delete_paths: set[bytes]) -> None:
|
|
|
- self._adds = [a for a in self._adds if a.new.path not in add_paths]
|
|
|
- self._deletes = [d for d in self._deletes if d.old.path not in delete_paths]
|
|
|
+ def check_add(a: TreeChange) -> bool:
|
|
|
+ assert a.new is not None
|
|
|
+ return a.new.path not in add_paths
|
|
|
+
|
|
|
+ def check_delete(d: TreeChange) -> bool:
|
|
|
+ assert d.old is not None
|
|
|
+ return d.old.path not in delete_paths
|
|
|
+
|
|
|
+ self._adds = [a for a in self._adds if check_add(a)]
|
|
|
+ self._deletes = [d for d in self._deletes if check_delete(d)]
|
|
|
|
|
|
def _find_exact_renames(self) -> None:
|
|
|
add_map = defaultdict(list)
|
|
|
for add in self._adds:
|
|
|
+ assert add.new is not None
|
|
|
add_map[add.new.sha].append(add.new)
|
|
|
delete_map = defaultdict(list)
|
|
|
for delete in self._deletes:
|
|
|
# Keep track of whether the delete was actually marked as a delete.
|
|
|
# If not, it needs to be marked as a copy.
|
|
|
is_delete = delete.type == CHANGE_DELETE
|
|
|
+ assert delete.old is not None
|
|
|
delete_map[delete.old.sha].append((delete.old, is_delete))
|
|
|
|
|
|
add_paths = set()
|
|
|
@@ -632,6 +650,7 @@ class RenameDetector:
|
|
|
def _rename_type(
|
|
|
self, check_paths: bool, delete: TreeChange, add: TreeChange
|
|
|
) -> str:
|
|
|
+ assert delete.old is not None and add.new is not None
|
|
|
if check_paths and delete.old.path == add.new.path:
|
|
|
# If the paths match, this must be a split modify, so make sure it
|
|
|
# comes out as a modify.
|
|
|
@@ -657,12 +676,14 @@ class RenameDetector:
|
|
|
block_cache = {}
|
|
|
check_paths = self._rename_threshold is not None
|
|
|
for delete in self._deletes:
|
|
|
+ assert delete.old is not None
|
|
|
if S_ISGITLINK(delete.old.mode):
|
|
|
continue # Git links don't exist in this repo.
|
|
|
old_sha = delete.old.sha
|
|
|
old_obj = self._store[old_sha]
|
|
|
block_cache[old_sha] = _count_blocks(old_obj)
|
|
|
for add in self._adds:
|
|
|
+ assert add.new is not None
|
|
|
if stat.S_IFMT(delete.old.mode) != stat.S_IFMT(add.new.mode):
|
|
|
continue
|
|
|
new_obj = self._store[add.new.sha]
|
|
|
@@ -680,6 +701,7 @@ class RenameDetector:
|
|
|
delete_paths = set()
|
|
|
add_paths = set()
|
|
|
for _, change in self._candidates:
|
|
|
+ assert change.old is not None and change.new is not None
|
|
|
new_path = change.new.path
|
|
|
if new_path in add_paths:
|
|
|
continue
|
|
|
@@ -701,17 +723,29 @@ class RenameDetector:
|
|
|
return
|
|
|
|
|
|
modifies = {}
|
|
|
- delete_map = {d.old.path: d for d in self._deletes}
|
|
|
+ delete_map = {}
|
|
|
+ for d in self._deletes:
|
|
|
+ assert d.old is not None
|
|
|
+ delete_map[d.old.path] = d
|
|
|
for add in self._adds:
|
|
|
+ assert add.new is not None
|
|
|
path = add.new.path
|
|
|
delete = delete_map.get(path)
|
|
|
- if delete is not None and stat.S_IFMT(delete.old.mode) == stat.S_IFMT(
|
|
|
- add.new.mode
|
|
|
- ):
|
|
|
- modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)
|
|
|
+ if delete is not None:
|
|
|
+ assert delete.old is not None
|
|
|
+ if stat.S_IFMT(delete.old.mode) == stat.S_IFMT(add.new.mode):
|
|
|
+ modifies[path] = TreeChange(CHANGE_MODIFY, delete.old, add.new)
|
|
|
+
|
|
|
+ def check_add_mod(a: TreeChange) -> bool:
|
|
|
+ assert a.new is not None
|
|
|
+ return a.new.path not in modifies
|
|
|
+
|
|
|
+ def check_delete_mod(d: TreeChange) -> bool:
|
|
|
+ assert d.old is not None
|
|
|
+ return d.old.path not in modifies
|
|
|
|
|
|
- self._adds = [a for a in self._adds if a.new.path not in modifies]
|
|
|
- self._deletes = [a for a in self._deletes if a.new.path not in modifies]
|
|
|
+ self._adds = [a for a in self._adds if check_add_mod(a)]
|
|
|
+ self._deletes = [d for d in self._deletes if check_delete_mod(d)]
|
|
|
self._changes += modifies.values()
|
|
|
|
|
|
def _sorted_changes(self) -> list[TreeChange]:
|