Jelmer Vernooij пре 2 месеци
родитељ
комит
8d9a033c73
1 измењених фајлова са 30 додато и 17 уклоњено
  1. 30 17
      dulwich/graph.py

+ 30 - 17
dulwich/graph.py

@@ -22,9 +22,13 @@
 
 from collections.abc import Iterator
 from heapq import heappop, heappush
-from typing import Generic, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, TypeVar
+
+if TYPE_CHECKING:
+    from .repo import BaseRepo
 
 from .lru_cache import LRUCache
+from .objects import ObjectID
 
 T = TypeVar("T")
 
@@ -52,7 +56,13 @@ class WorkList(Generic[T]):
             yield (-pr, cmt)
 
 
-def _find_lcas(lookup_parents, c1, c2s, lookup_stamp, min_stamp=0):
+def _find_lcas(
+    lookup_parents: Callable[[ObjectID], list[ObjectID]],
+    c1: ObjectID,
+    c2s: list[ObjectID],
+    lookup_stamp: Callable[[ObjectID], int],
+    min_stamp: int = 0,
+) -> list[ObjectID]:
     cands = []
     cstates = {}
 
@@ -62,7 +72,7 @@ def _find_lcas(lookup_parents, c1, c2s, lookup_stamp, min_stamp=0):
     _DNC = 4  # Do Not Consider
     _LCA = 8  # potential LCA (Lowest Common Ancestor)
 
-    def _has_candidates(wlst, cstates) -> bool:
+    def _has_candidates(wlst: WorkList[ObjectID], cstates: dict[ObjectID, int]) -> bool:
         for dt, cmt in wlst.iter():
             if cmt in cstates:
                 if not ((cstates[cmt] & _DNC) == _DNC):
@@ -71,7 +81,7 @@ def _find_lcas(lookup_parents, c1, c2s, lookup_stamp, min_stamp=0):
 
     # initialize the working list states with ancestry info
     # note possibility of c1 being one of c2s should be handled
-    wlst = WorkList()
+    wlst: WorkList[bytes] = WorkList()
     cstates[c1] = _ANC_OF_1
     wlst.add((lookup_stamp(c1), c1))
     for c2 in c2s:
@@ -82,7 +92,10 @@ def _find_lcas(lookup_parents, c1, c2s, lookup_stamp, min_stamp=0):
     # loop while at least one working list commit is still viable (not marked as _DNC)
     # adding any parents to the list in a breadth first manner
     while _has_candidates(wlst, cstates):
-        dt, cmt = wlst.get()
+        result = wlst.get()
+        if result is None:
+            break
+        dt, cmt = result
         # Look only at ANCESTRY and _DNC flags so that already
         # found _LCAs can still be marked _DNC by lower _LCAS
         cflags = cstates[cmt] & (_ANC_OF_1 | _ANC_OF_2 | _DNC)
@@ -120,7 +133,7 @@ def _find_lcas(lookup_parents, c1, c2s, lookup_stamp, min_stamp=0):
 
 
 # actual git sorts these based on commit times
-def find_merge_base(repo, commit_ids):
+def find_merge_base(repo: "BaseRepo", commit_ids: list[ObjectID]) -> list[ObjectID]:
     """Find lowest common ancestors of commit_ids[0] and *any* of commits_ids[1:].
 
     Args:
@@ -129,15 +142,15 @@ def find_merge_base(repo, commit_ids):
     Returns:
       list of lowest common ancestor commit_ids
     """
-    cmtcache = LRUCache(max_cache=128)
+    cmtcache: LRUCache[ObjectID, Any] = LRUCache(max_cache=128)
     parents_provider = repo.parents_provider()
 
-    def lookup_stamp(cmtid):
+    def lookup_stamp(cmtid: ObjectID) -> int:
         if cmtid not in cmtcache:
             cmtcache[cmtid] = repo.object_store[cmtid]
         return cmtcache[cmtid].commit_time
 
-    def lookup_parents(cmtid):
+    def lookup_parents(cmtid: ObjectID) -> list[ObjectID]:
         commit = None
         if cmtid in cmtcache:
             commit = cmtcache[cmtid]
@@ -156,7 +169,7 @@ def find_merge_base(repo, commit_ids):
     return lcas
 
 
-def find_octopus_base(repo, commit_ids):
+def find_octopus_base(repo: "BaseRepo", commit_ids: list[ObjectID]) -> list[ObjectID]:
     """Find lowest common ancestors of *all* provided commit_ids.
 
     Args:
@@ -165,15 +178,15 @@ def find_octopus_base(repo, commit_ids):
     Returns:
       list of lowest common ancestor commit_ids
     """
-    cmtcache = LRUCache(max_cache=128)
+    cmtcache: LRUCache[ObjectID, Any] = LRUCache(max_cache=128)
     parents_provider = repo.parents_provider()
 
-    def lookup_stamp(cmtid):
+    def lookup_stamp(cmtid: ObjectID) -> int:
         if cmtid not in cmtcache:
             cmtcache[cmtid] = repo.object_store[cmtid]
         return cmtcache[cmtid].commit_time
 
-    def lookup_parents(cmtid):
+    def lookup_parents(cmtid: ObjectID) -> list[ObjectID]:
         commit = None
         if cmtid in cmtcache:
             commit = cmtcache[cmtid]
@@ -195,7 +208,7 @@ def find_octopus_base(repo, commit_ids):
     return lcas
 
 
-def can_fast_forward(repo, c1, c2):
+def can_fast_forward(repo: "BaseRepo", c1: bytes, c2: bytes) -> bool:
     """Is it possible to fast-forward from c1 to c2?
 
     Args:
@@ -203,15 +216,15 @@ def can_fast_forward(repo, c1, c2):
       c1: Commit id for first commit
       c2: Commit id for second commit
     """
-    cmtcache = LRUCache(max_cache=128)
+    cmtcache: LRUCache[ObjectID, Any] = LRUCache(max_cache=128)
     parents_provider = repo.parents_provider()
 
-    def lookup_stamp(cmtid):
+    def lookup_stamp(cmtid: ObjectID) -> int:
         if cmtid not in cmtcache:
             cmtcache[cmtid] = repo.object_store[cmtid]
         return cmtcache[cmtid].commit_time
 
-    def lookup_parents(cmtid):
+    def lookup_parents(cmtid: ObjectID) -> list[ObjectID]:
         commit = None
         if cmtid in cmtcache:
             commit = cmtcache[cmtid]