Browse Source

Add more typing checking for pack.

Jelmer Vernooij 2 years ago
parent
commit
6db10810ae
1 changed files with 13 additions and 14 deletions
  1. 13 14
      dulwich/pack.py

+ 13 - 14
dulwich/pack.py

@@ -49,7 +49,7 @@ from itertools import chain
 
 import os
 import sys
-from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable, Iterator
+from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable, Iterator, Dict
 import warnings
 
 from hashlib import sha1
@@ -1371,24 +1371,23 @@ class DeltaChainIterator:
     _compute_crc32 = False
     _include_comp = False
 
-    def __init__(self, file_obj, resolve_ext_ref=None):
+    def __init__(self, file_obj, *, resolve_ext_ref=None) -> None:
         self._file = file_obj
         self._resolve_ext_ref = resolve_ext_ref
-        self._pending_ofs = defaultdict(list)
-        self._pending_ref = defaultdict(list)
-        self._full_ofs = []
-        self._shas = {}
-        self._ext_refs = []
+        self._pending_ofs: Dict[int, List[int]] = defaultdict(list)
+        self._pending_ref: Dict[bytes, List[int]] = defaultdict(list)
+        self._full_ofs: List[Tuple[int, int]] = []
+        self._ext_refs: List[bytes] = []
 
     @classmethod
-    def for_pack_data(cls, pack_data, resolve_ext_ref=None):
+    def for_pack_data(cls, pack_data: PackData, resolve_ext_ref=None):
         walker = cls(None, resolve_ext_ref=resolve_ext_ref)
         walker.set_pack_data(pack_data)
         for unpacked in pack_data._iter_unpacked():
             walker.record(unpacked)
         return walker
 
-    def record(self, unpacked):
+    def record(self, unpacked: UnpackedObject) -> None:
         type_num = unpacked.pack_type_num
         offset = unpacked.offset
         if type_num == OFS_DELTA:
@@ -1399,7 +1398,7 @@ class DeltaChainIterator:
         else:
             self._full_ofs.append((offset, type_num))
 
-    def set_pack_data(self, pack_data):
+    def set_pack_data(self, pack_data: PackData) -> None:
         self._file = pack_data._file
 
     def _walk_all_chains(self):
@@ -1408,7 +1407,7 @@ class DeltaChainIterator:
         yield from self._walk_ref_chains()
         assert not self._pending_ofs
 
-    def _ensure_no_pending(self):
+    def _ensure_no_pending(self) -> None:
         if self._pending_ref:
             raise KeyError([sha_to_hex(s) for s in self._pending_ref])
 
@@ -1437,7 +1436,7 @@ class DeltaChainIterator:
     def _result(self, unpacked):
         return unpacked
 
-    def _resolve_object(self, offset, obj_type_num, base_chunks):
+    def _resolve_object(self, offset: int, obj_type_num: int, base_chunks: List[bytes]) -> UnpackedObject:
         self._file.seek(offset)
         unpacked, _ = unpack_object(
             self._file.read,
@@ -1453,7 +1452,7 @@ class DeltaChainIterator:
             unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
         return unpacked
 
-    def _follow_chain(self, offset, obj_type_num, base_chunks):
+    def _follow_chain(self, offset: int, obj_type_num: int, base_chunks: List[bytes]):
         # Unlike PackData.get_object_at, there is no need to cache offsets as
         # this approach by design inflates each object exactly once.
         todo = [(offset, obj_type_num, base_chunks)]
@@ -1467,7 +1466,7 @@ class DeltaChainIterator:
                 self._pending_ref.pop(unpacked.sha(), []),
             )
             todo.extend(
-                (new_offset, unpacked.obj_type_num, unpacked.obj_chunks)
+                (new_offset, unpacked.obj_type_num, unpacked.obj_chunks)  # type: ignore
                 for new_offset in unblocked
             )