Browse Source

Add more typing.

Jelmer Vernooij 2 years ago
parent
commit
450d6b364c
4 changed files with 54 additions and 48 deletions:
  1. dulwich/index.py         (+0, −1)
  2. dulwich/object_store.py  (+2, −1)
  3. dulwich/pack.py          (+51, −45)
  4. dulwich/server.py        (+1, −1)

+ 0 - 1
dulwich/index.py

@@ -32,7 +32,6 @@ from typing import (
     Dict,
     List,
     Optional,
-    TYPE_CHECKING,
     Iterable,
     Iterator,
     Tuple,

+ 2 - 1
dulwich/object_store.py

@@ -563,7 +563,7 @@ class PackBasedObjectStore(BaseObjectStore):
                 pass
         raise KeyError(hexsha)

-    def get_raw_unresolved(self, sha1: bytes) -> Tuple[int, Union[bytes, None], List[bytes]]:
+    def get_raw_unresolved(self, name: bytes) -> Tuple[int, Union[bytes, None], List[bytes]]:
         """Obtain the unresolved data for an object.

         Args:
@@ -1198,6 +1198,7 @@ def _collect_filetree_revs(obj_store: ObjectContainer, tree_sha: ObjectID, kset:
       kset: set to fill with references to files and directories
     """
     filetree = obj_store[tree_sha]
+    assert isinstance(filetree, Tree)
     for name, mode, sha in filetree.iteritems():
         if not S_ISGITLINK(mode) and sha not in kset:
             kset.add(sha)

+ 51 - 45
dulwich/pack.py

@@ -49,7 +49,7 @@ from itertools import chain

 import os
 import sys
-from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable
+from typing import Optional, Callable, Tuple, List, Deque, Union, Protocol, Iterable, Iterator
 import warnings

 from hashlib import sha1
@@ -102,7 +102,7 @@ class ObjectContainer(Protocol):
         """Add a single object to this object store."""

     def add_objects(
-            self, objects: Iterable[tuple[ShaFile, Optional[str]]],
+            self, objects: Iterable[Tuple[ShaFile, Optional[str]]],
             progress: Optional[Callable[[str], None]] = None) -> None:
         """Add a set of objects to this object store.

@@ -541,7 +541,7 @@ class FilePackIndex(PackIndex):
             self._contents, self._size = (contents, size)

     @property
-    def path(self):
+    def path(self) -> str:
         return self._filename

     def __eq__(self, other):
@@ -554,16 +554,16 @@ class FilePackIndex(PackIndex):

         return super().__eq__(other)

-    def close(self):
+    def close(self) -> None:
         self._file.close()
         if getattr(self._contents, "close", None) is not None:
             self._contents.close()

-    def __len__(self):
+    def __len__(self) -> int:
         """Return the number of entries in this pack index."""
         return self._fan_out_table[-1]

-    def _unpack_entry(self, i):
+    def _unpack_entry(self, i: int) -> Tuple[bytes, int, Optional[int]]:
         """Unpack the i-th entry in the index file.

         Returns: Tuple with object name (SHA), offset in pack file and CRC32
@@ -583,11 +583,11 @@ class FilePackIndex(PackIndex):
         """Unpack the crc32 checksum for the ith object from the index file."""
         raise NotImplementedError(self._unpack_crc32_checksum)

-    def _itersha(self):
+    def _itersha(self) -> Iterator[bytes]:
         for i in range(len(self)):
             yield self._unpack_name(i)

-    def iterentries(self):
+    def iterentries(self) -> Iterator[Tuple[bytes, int, Optional[int]]]:
         """Iterate over the entries in this pack index.

         Returns: iterator over tuples with object name, offset in packfile and
@@ -596,7 +596,7 @@ class FilePackIndex(PackIndex):
         for i in range(len(self)):
             yield self._unpack_entry(i)

-    def _read_fan_out_table(self, start_offset):
+    def _read_fan_out_table(self, start_offset: int):
         ret = []
         for i in range(0x100):
             fanout_entry = self._contents[
@@ -605,35 +605,35 @@ class FilePackIndex(PackIndex):
             ret.append(struct.unpack(">L", fanout_entry)[0])
         return ret

-    def check(self):
+    def check(self) -> None:
         """Check that the stored checksum matches the actual checksum."""
         actual = self.calculate_checksum()
         stored = self.get_stored_checksum()
         if actual != stored:
             raise ChecksumMismatch(stored, actual)

-    def calculate_checksum(self):
+    def calculate_checksum(self) -> bytes:
         """Calculate the SHA1 checksum over this pack index.

         Returns: This is a 20-byte binary digest
         """
         return sha1(self._contents[:-20]).digest()

-    def get_pack_checksum(self):
+    def get_pack_checksum(self) -> bytes:
         """Return the SHA1 checksum stored for the corresponding packfile.

         Returns: 20-byte binary digest
         """
         return bytes(self._contents[-40:-20])

-    def get_stored_checksum(self):
+    def get_stored_checksum(self) -> bytes:
         """Return the SHA1 checksum stored for this index.

         Returns: 20-byte binary digest
         """
         return bytes(self._contents[-20:])

-    def object_index(self, sha):
+    def object_index(self, sha: bytes) -> int:
         """Return the index in to the corresponding packfile for the object.

         Given the name of an object it will return the offset that object
@@ -672,7 +672,7 @@ class FilePackIndex(PackIndex):
 class PackIndex1(FilePackIndex):
     """Version 1 Pack Index file."""

-    def __init__(self, filename, file=None, contents=None, size=None):
+    def __init__(self, filename: str, file=None, contents=None, size=None):
         super().__init__(filename, file, contents, size)
         self.version = 1
         self._fan_out_table = self._read_fan_out_table(0)
@@ -697,7 +697,7 @@ class PackIndex1(FilePackIndex):
 class PackIndex2(FilePackIndex):
     """Version 2 Pack Index file."""

-    def __init__(self, filename, file=None, contents=None, size=None):
+    def __init__(self, filename: str, file=None, contents=None, size=None):
         super().__init__(filename, file, contents, size)
         if self._contents[:4] != b"\377tOc":
             raise AssertionError("Not a v2 pack index file")
@@ -735,7 +735,7 @@ class PackIndex2(FilePackIndex):
         return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]


-def read_pack_header(read):
+def read_pack_header(read) -> Tuple[Optional[int], Optional[int]]:
     """Read the header of a pack file.

     Args:
@@ -755,7 +755,7 @@ def read_pack_header(read):
     return (version, num_objects)


-def chunks_length(chunks):
+def chunks_length(chunks: Union[bytes, Iterable[bytes]]) -> int:
     if isinstance(chunks, bytes):
         return len(chunks)
     else:
@@ -768,7 +768,7 @@ def unpack_object(
     compute_crc32=False,
     include_comp=False,
     zlib_bufsize=_ZLIB_BUFSIZE,
-):
+) -> Tuple[UnpackedObject, bytes]:
     """Unpack a Git object.

     Args:
@@ -1624,14 +1624,13 @@ def write_pack_object(write, type, object, sha=None, compression_level=-1):


 def write_pack(
-    filename,
-    objects,
-    *,
-    deltify: Optional[bool] = None,
-    delta_window_size: Optional[int] = None,
-    compression_level: int = -1,
-    reuse_pack: Optional[PackedObjectContainer] = None):
-):
+        filename,
+        objects,
+        *,
+        deltify: Optional[bool] = None,
+        delta_window_size: Optional[int] = None,
+        compression_level: int = -1,
+        reuse_pack: Optional[PackedObjectContainer] = None):
     """Write a new pack data file.

     Args:
@@ -1746,7 +1745,11 @@ def deltify_pack_objects(
             possible_bases.pop()


-def pack_objects_to_data(objects):
+def pack_objects_to_data(
+        objects,
+        delta_window_size: Optional[int] = None,
+        deltify: Optional[bool] = None,
+        reuse_pack: Optional[PackedObjectContainer] = None):
     """Create pack data from objects

     Args:
@@ -1754,13 +1757,22 @@ def pack_objects_to_data(objects):
     Returns: Tuples with (type_num, hexdigest, delta base, object chunks)
     """
     count = len(objects)
-    return (
-        count,
-        (
-            (o.type_num, o.sha().digest(), None, o.as_raw_chunks())
-            for (o, path) in objects
-        ),
-    )
+    if deltify is None:
+        # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
+        # slow at the moment.
+        deltify = False
+    if deltify:
+        pack_contents = deltify_pack_objects(
+            objects, window_size=delta_window_size, reuse_pack=reuse_pack)
+        return (count, pack_contents)
+    else:
+        return (
+            count,
+            (
+                (o.type_num, o.sha().digest(), None, o.as_raw_chunks())
+                for (o, path) in objects
+            ),
+        )


 def write_pack_objects(
@@ -1789,16 +1801,10 @@ def write_pack_objects(
             DeprecationWarning, stacklevel=2)
         write = write.write

-    if deltify is None:
-        # PERFORMANCE/TODO(jelmer): This should be enabled but is *much* too
-        # slow at the moment.
-        deltify = False
-    if deltify:
-        pack_contents = deltify_pack_objects(
-            objects, window_size=delta_window_size, reuse_pack=reuse_pack)
-        pack_contents_count = len(objects)
-    else:
-        pack_contents_count, pack_contents = pack_objects_to_data(objects)
+    pack_contents_count, pack_contents = pack_objects_to_data(
+        objects, delta_window_size=delta_window_size,
+        deltify=deltify,
+        reuse_pack=reuse_pack)

     return write_pack_data(
         write,

+ 1 - 1
dulwich/server.py

@@ -472,7 +472,7 @@ def _find_shallow(store: ObjectContainer, heads, depth):
         considered shallow and unshallow according to the arguments. Note that
         these sets may overlap if a commit is reachable along multiple paths.
     """
-    parents: Dict[bytes, List[bytes]]  = {}
+    parents: Dict[bytes, List[bytes]] = {}

     def get_parents(sha):
         result = parents.get(sha, None)