Browse Source

Add more typing for pack.

Jelmer Vernooij 2 years ago
parent
commit
90b10d6dd5
1 changed files with 25 additions and 20 deletions
  1. 25 20
      dulwich/pack.py

+ 25 - 20
dulwich/pack.py

@@ -468,6 +468,9 @@ class PackIndex:
     def close(self):
         pass
 
+    def check(self) -> None:
+        pass
+
 
 class MemoryPackIndex(PackIndex):
     """Pack index that is stored entirely in memory."""
@@ -1293,6 +1296,14 @@ class PackData:
         if actual != stored:
             raise ChecksumMismatch(stored, actual)
 
+    def get_unpacked_object_at(self, offset: int, *, include_comp: bool = False) -> UnpackedObject:
+        """Given offset in the packfile return a UnpackedObject.
+        """
+        assert offset >= self._header_size
+        self._file.seek(offset)
+        unpacked, _ = unpack_object(self._file.read, include_comp=include_comp)
+        return unpacked
+
     def get_compressed_data_at(self, offset):
         """Given offset in the packfile return compressed data that is there.
 
@@ -1300,32 +1311,28 @@ class PackData:
         and then the packfile can be asked directly for that object using this
         function.
         """
-        assert offset >= self._header_size
-        self._file.seek(offset)
-        unpacked, _ = unpack_object(self._file.read, include_comp=True)
+        unpacked = self.get_unpacked_object_at(offset, include_comp=True)
         return (
             unpacked.pack_type_num,
             unpacked.delta_base,
             unpacked.comp_chunks,
         )
 
-    def get_decompressed_data_at(self, offset):
+    def get_decompressed_data_at(self, offset: int) -> Tuple[int, Optional[bytes], List[bytes]]:
         """Given an offset in the packfile, decompress the data that is there.
 
         Using the associated index the location of an object can be looked up,
         and then the packfile can be asked directly for that object using this
         function.
         """
-        assert offset >= self._header_size
-        self._file.seek(offset)
-        unpacked, _ = unpack_object(self._file.read, include_comp=False)
+        unpacked = self.get_unpacked_object_at(offset, include_comp=False)
         return (
             unpacked.pack_type_num,
             unpacked.delta_base,
             unpacked.decomp_chunks,
         )
 
-    def get_object_at(self, offset):
+    def get_object_at(self, offset: int):
         """Given an offset in to the packfile return the object that is there.
 
         Using the associated index the location of an object can be looked up,
@@ -1336,9 +1343,7 @@ class PackData:
             return self._offset_cache[offset]
         except KeyError:
             pass
-        assert offset >= self._header_size
-        self._file.seek(offset)
-        unpacked, _ = unpack_object(self._file.read)
+        unpacked = self.get_unpacked_object_at(offset, include_comp=False)
         return (unpacked.pack_type_num, unpacked._obj())
 
 
@@ -2221,7 +2226,7 @@ class Pack:
         """Iterate over all the sha1s of the objects in this pack."""
         return iter(self.index)
 
-    def check_length_and_checksum(self):
+    def check_length_and_checksum(self) -> None:
         """Sanity check the length and checksum of the pack index and data."""
         assert len(self.index) == len(self.data)
         idx_stored_checksum = self.index.get_pack_checksum()
@@ -2232,7 +2237,7 @@ class Pack:
                 sha_to_hex(data_stored_checksum),
             )
 
-    def check(self):
+    def check(self) -> None:
         """Check the integrity of this pack.
 
         Raises:
@@ -2244,10 +2249,10 @@ class Pack:
             obj.check()
         # TODO: object connectivity checks
 
-    def get_stored_checksum(self):
+    def get_stored_checksum(self) -> bytes:
         return self.data.get_stored_checksum()
 
-    def __contains__(self, sha1):
+    def __contains__(self, sha1: bytes) -> bool:
         """Check whether this pack contains a particular SHA1."""
         try:
             self.index.object_index(sha1)
@@ -2255,7 +2260,7 @@ class Pack:
         except KeyError:
             return False
 
-    def get_raw_unresolved(self, sha1):
+    def get_raw_unresolved(self, sha1: bytes) -> Tuple[int, Optional[bytes], List[bytes]]:
         """Get raw unresolved data for a SHA.
 
         Args:
@@ -2270,13 +2275,13 @@ class Pack:
             obj_type = REF_DELTA
         return (obj_type, delta_base, chunks)
 
-    def get_raw(self, sha1):
+    def get_raw(self, sha1: bytes) -> Tuple[int, bytes]:
         offset = self.index.object_index(sha1)
         obj_type, obj = self.data.get_object_at(offset)
         type_num, chunks = self.resolve_object(offset, obj_type, obj)
         return type_num, b"".join(chunks)
 
-    def __getitem__(self, sha1):
+    def __getitem__(self, sha1: bytes) -> bytes:
         """Retrieve the specified SHA1."""
         type, uncomp = self.get_raw(sha1)
         return ShaFile.from_raw_string(type, uncomp, sha=sha1)
@@ -2311,7 +2316,7 @@ class Pack:
                 keepfile.write(b"\n")
         return keepfile_name
 
-    def get_ref(self, sha) -> Tuple[int, int, UnpackedObject]:
+    def get_ref(self, sha: bytes) -> Tuple[int, int, UnpackedObject]:
         """Get the object for a ref SHA, only looking in this pack."""
         # TODO: cache these results
         try:
@@ -2326,7 +2331,7 @@ class Pack:
             raise KeyError(sha)
         return offset, type, obj
 
-    def resolve_object(self, offset, type, obj, get_ref=None):
+    def resolve_object(self, offset: int, type: int, obj, get_ref=None) -> Tuple[int, Iterable[bytes]]:
         """Resolve an object, possibly resolving deltas when necessary.
 
         Returns: Tuple with object type and contents.