
Fix remaining mypy type errors and ruff linting issues

Jelmer Vernooij 5 months ago
parent
commit
d164c9ce50
7 changed files with 120 additions and 39 deletions
  1. dulwich/cli.py (+3 -3)
  2. dulwich/contrib/swift.py (+1 -1)
  3. dulwich/pack.py (+76 -32)
  4. dulwich/porcelain.py (+10 -3)
  5. dulwich/rebase.py (+4 -0)
  6. dulwich/refs.py (+15 -0)
  7. dulwich/web.py (+11 -0)

+ 3 - 3
dulwich/cli.py

@@ -857,7 +857,7 @@ class cmd_dump_pack(Command):

        basename, _ = os.path.splitext(args.filename)
        x = Pack(basename)
-        print(f"Object names checksum: {x.name()}")
+        print(f"Object names checksum: {x.name().decode('ascii', 'replace')}")
        print(f"Checksum: {sha_to_hex(x.get_stored_checksum())!r}")
        x.check()
        print(f"Length: {len(x)}")
@@ -865,9 +865,9 @@ class cmd_dump_pack(Command):
            try:
                print(f"\t{x[name]}")
            except KeyError as k:
-                print(f"\t{name}: Unable to resolve base {k}")
+                print(f"\t{name.decode('ascii', 'replace')}: Unable to resolve base {k!r}")
            except ApplyDeltaError as e:
-                print(f"\t{name}: Unable to apply delta: {e!r}")
+                print(f"\t{name.decode('ascii', 'replace')}: Unable to apply delta: {e!r}")


class cmd_dump_index(Command):
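
Context for the cli.py hunks above: Pack.name() returns the pack's hex SHA as bytes, so interpolating it straight into an f-string prints the b'...' repr. A standalone sketch of the difference (the SHA value is illustrative):

    sha = b"5d20c6f219a0"                   # bytes, as returned by Pack.name()
    print(f"Object names checksum: {sha}")  # -> Object names checksum: b'5d20c6f219a0'
    print(f"Object names checksum: {sha.decode('ascii', 'replace')}")  # -> ... 5d20c6f219a0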

+ 1 - 1
dulwich/contrib/swift.py

@@ -881,7 +881,7 @@ class SwiftObjectStore(PackBasedObjectStore):
        f = os.fdopen(fd, "w+b")
        try:
            pack_data = PackData(file=cast(_GitFile, f), filename=path)
-            indexer = PackIndexer(pack_data, resolve_ext_ref=None)
+            indexer = PackIndexer(cast(BinaryIO, pack_data._file), resolve_ext_ref=None)
            copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
            copier.verify()
            return self._complete_thin_pack(f, path, copier, indexer)
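
The cast here (and the ones in pack.py below) is the usual mypy escape hatch: typing.cast changes only the static type and does nothing at runtime. A minimal standalone sketch:

    import io
    from typing import BinaryIO, cast

    f: object = io.BytesIO(b"PACK")    # statically typed as object
    data = cast(BinaryIO, f).read(4)   # no runtime check; mypy now sees BinaryIO
    assert data == b"PACK"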

+ 76 - 32
dulwich/pack.py

@@ -1697,8 +1697,10 @@ class PackData:
        entries = self.sorted_entries(
            progress=progress, resolve_ext_ref=resolve_ext_ref
        )
+        checksum = self.calculate_checksum()
        with GitFile(filename, "wb") as f:
-            return write_pack_index_v1(f, entries, self.calculate_checksum())
+            write_pack_index_v1(cast(BinaryIO, f), cast(list[tuple[bytes, int, Optional[int]]], entries), checksum)
+        return checksum

    def create_index_v2(
        self,
@@ -1845,7 +1847,7 @@ class DeltaChainIterator(Generic[T]):
    _include_comp = False

    def __init__(
-        self, file_obj: Any, *, resolve_ext_ref: Optional[Callable] = None
+        self, file_obj: Optional[BinaryIO], *, resolve_ext_ref: Optional[Callable] = None
    ) -> None:
        """Initialize DeltaChainIterator.

@@ -1957,7 +1959,7 @@ class DeltaChainIterator(Generic[T]):
        Args:
          pack_data: PackData object to use
        """
-        self._file = pack_data._file
+        self._file = cast(BinaryIO, pack_data._file)

    def _walk_all_chains(self) -> Iterator[T]:
        for offset, type_num in self._full_ofs:
@@ -2010,7 +2012,9 @@ class DeltaChainIterator(Generic[T]):
        else:
            assert unpacked.pack_type_num in DELTA_TYPES
            unpacked.obj_type_num = obj_type_num
-            unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
+            base_data = b"".join(base_chunks) if base_chunks else b""
+            delta_data = b"".join(unpacked.decomp_chunks) if unpacked.decomp_chunks else b""
+            unpacked.obj_chunks = [apply_delta(base_data, delta_data)]
        return unpacked

    def _follow_chain(
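
The recurring pattern in the hunk above: apply_delta is now strictly bytes in, bytes out, so chunk lists are joined before the call and the result is wrapped back into a one-element chunk list. A standalone sketch with a stub standing in for pack.apply_delta (values illustrative):

    def apply_delta_stub(src: bytes, delta: bytes) -> bytes:  # illustrative stand-in
        return src + delta

    base_chunks = [b"hello ", b"world"]
    delta_chunks = [b"!"]
    base_data = b"".join(base_chunks) if base_chunks else b""
    delta_data = b"".join(delta_chunks) if delta_chunks else b""
    obj_chunks = [apply_delta_stub(base_data, delta_data)]    # back to chunk form
    assert obj_chunks == [b"hello world!"]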
@@ -2202,6 +2206,7 @@ class SHA1Reader(BinaryIO):
         raise UnsupportedOperation("write")
         raise UnsupportedOperation("write")
 
 
     def __enter__(self) -> "SHA1Reader":
     def __enter__(self) -> "SHA1Reader":
+        """Enter context manager."""
         return self
         return self
 
 
     def __exit__(
     def __exit__(
@@ -2210,9 +2215,11 @@ class SHA1Reader(BinaryIO):
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
+        """Exit context manager and close file."""
        self.close()

    def __iter__(self) -> "SHA1Reader":
+        """Return iterator for reading file lines."""
        return self

    def __next__(self) -> bytes:
@@ -2349,7 +2356,7 @@ class SHA1Writer(BinaryIO):
         """
         """
         raise UnsupportedOperation("readlines")
         raise UnsupportedOperation("readlines")
 
 
-    def writelines(self, lines: Iterable[bytes]) -> None:
+    def writelines(self, lines: Iterable[bytes], /) -> None:  # type: ignore[override]
         """Write multiple lines to the file.
         """Write multiple lines to the file.
 
 
         Args:
         Args:
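
Why the slash and the ignore: typeshed declares IO[bytes].writelines with a positional-only parameter accepting any iterable of buffers, so an override narrowing it to Iterable[bytes] needs the positional-only marker plus a targeted type: ignore[override] to pass mypy. A standalone sketch of positional-only parameters:

    from collections.abc import Iterable

    def write_all(lines: Iterable[bytes], /) -> int:
        """Positional-only parameter, mirroring typeshed's IO.writelines shape."""
        return sum(len(line) for line in lines)

    assert write_all([b"a", b"bc"]) == 3
    # write_all(lines=[b"a"]) would raise TypeError: 'lines' is positional-only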
@@ -2374,7 +2381,7 @@ class SHA1Writer(BinaryIO):
        self,
        type: Optional[type],
        value: Optional[BaseException],
-        traceback: Optional[Any],
+        traceback: Optional[TracebackType],
    ) -> None:
        """Exit context manager and close file."""
        self.close()
@@ -2409,7 +2416,7 @@


def pack_object_header(
-    type_num: int, delta_base: Optional[Any], size: int
+    type_num: int, delta_base: Optional[Union[bytes, int]], size: int
) -> bytearray:
    """Create a pack object header for the given object info.

@@ -2428,6 +2435,7 @@ def pack_object_header(
        size >>= 7
    header.append(c)
    if type_num == OFS_DELTA:
+        assert isinstance(delta_base, int)
        ret = [delta_base & 0x7F]
        delta_base >>= 7
        while delta_base:
@@ -2436,13 +2444,14 @@ def pack_object_header(
            delta_base >>= 7
        header.extend(ret)
    elif type_num == REF_DELTA:
+        assert isinstance(delta_base, bytes)
        assert len(delta_base) == 20
        header += delta_base
    return bytearray(header)


def pack_object_chunks(
-    type: int, object: ShaFile, compression_level: int = -1
+    type: int, object: Union[ShaFile, bytes, list[bytes], tuple[Union[bytes, int], Union[bytes, list[bytes]]]], compression_level: int = -1
) -> Iterator[bytes]:
    """Generate chunks for a pack object.

@@ -2453,14 +2462,27 @@ def pack_object_chunks(
    Returns: Chunks
    """
    if type in DELTA_TYPES:
-        delta_base, object = object
+        if isinstance(object, tuple):
+            delta_base, object = object
+        else:
+            raise TypeError("Delta types require a tuple of (delta_base, object)")
    else:
        delta_base = None
+
+    # Convert object to list of bytes chunks
    if isinstance(object, bytes):
-        object = [object]
-    yield bytes(pack_object_header(type, delta_base, sum(map(len, object))))
+        chunks = [object]
+    elif isinstance(object, list):
+        chunks = object
+    elif isinstance(object, ShaFile):
+        chunks = object.as_raw_chunks()
+    else:
+        # Shouldn't reach here with proper typing
+        raise TypeError(f"Unexpected object type: {object.__class__.__name__}")
+
+    yield bytes(pack_object_header(type, delta_base, sum(map(len, chunks))))
    compressor = zlib.compressobj(level=compression_level)
-    for data in object:
+    for data in chunks:
        yield compressor.compress(data)
    yield compressor.flush()
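
With the widened signature, pack_object_chunks accepts a ShaFile, raw bytes, a chunk list, or, for delta types, a (delta_base, data) tuple. A sketch of the accepted call shapes (blob contents and the delta payload are illustrative):

    from dulwich.objects import Blob
    from dulwich.pack import OFS_DELTA, pack_object_chunks

    blob = Blob.from_string(b"hello")
    full = b"".join(pack_object_chunks(blob.type_num, blob))       # ShaFile
    raw = b"".join(pack_object_chunks(blob.type_num, b"hello"))    # bytes
    delta = b"".join(pack_object_chunks(OFS_DELTA, (12, b"...")))  # (base, data) tuple
    # Passing a delta type without a tuple now raises TypeError.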
 
 
@@ -2469,9 +2491,9 @@ def write_pack_object(
    write: Callable[[bytes], int],
    type: int,
    object: ShaFile,
-    sha: Optional[bytes] = None,
+    sha: Optional[hashlib._Hash] = None,
    compression_level: int = -1,
-) -> bytes:
+) -> int:
    """Write pack object to a file.

    Args:
@@ -2480,7 +2502,7 @@ def write_pack_object(
      object: Object to write
      sha: Optional SHA-1 hasher to update
      compression_level: the zlib compression level
-    Returns: Tuple with offset at which the object was written, and crc32
+    Returns: CRC32 checksum of the written object
    """
    crc32 = 0
    for chunk in pack_object_chunks(type, object, compression_level=compression_level):
@@ -2595,7 +2617,7 @@ def deltify_pack_objects(
        delta_base is None for full text entries
    """

-    def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, None]]]:
+    def objects_with_hints() -> Iterator[tuple[ShaFile, tuple[int, Optional[bytes]]]]:
        for e in objects:
            if isinstance(e, ShaFile):
                yield (e, (e.type_num, None))
@@ -2668,7 +2690,7 @@ def deltas_from_sorted_objects(
                continue
            delta_len = 0
            delta = []
-            for chunk in create_delta(base, raw):
+            for chunk in create_delta(b"".join(base), b"".join(raw)):
                delta_len += len(chunk)
                if delta_len >= winner_len:
                    break
@@ -2724,7 +2746,7 @@ def pack_objects_to_data(
        )
    else:

-        def iter_without_path() -> Iterator[tuple[ShaFile, tuple[int, None]]]:
+        def iter_without_path() -> Iterator[UnpackedObject]:
            for o in objects:
                if isinstance(o, tuple):
                    yield full_unpacked_object(o[0])
@@ -2945,6 +2967,7 @@ class PackChunkGenerator:
                    raw = (offset - base_offset, unpacked.decomp_chunks)
            else:
                raw = unpacked.decomp_chunks
+            chunks: Union[list[bytes], Iterator[bytes]]
            if unpacked.comp_chunks is not None and reuse_compressed:
                chunks = unpacked.comp_chunks
            else:
@@ -3000,7 +3023,7 @@

def write_pack_index_v1(
    f: BinaryIO, entries: list[tuple[bytes, int, Optional[int]]], pack_checksum: bytes
-) -> None:
+) -> bytes:
    """Write a new pack index file.

    Args:
@@ -3058,7 +3081,7 @@ def _encode_copy_operation(start: int, length: int) -> bytes:
    return bytes(scratch)


-def create_delta(base_buf: bytes, target_buf: bytes) -> bytes:
+def create_delta(base_buf: bytes, target_buf: bytes) -> Iterator[bytes]:
    """Use python difflib to work out how to transform base_buf to target_buf.

    Args:
@@ -3119,7 +3142,7 @@ def apply_delta(src_buf: bytes, delta: bytes) -> bytes:
    index = 0
    delta_length = len(delta)

-    def get_delta_header_size(delta: bytes, index: list[int]) -> tuple[int, int]:
+    def get_delta_header_size(delta: bytes, index: int) -> tuple[int, int]:
        size = 0
        i = 0
        while delta:
@@ -3175,7 +3198,7 @@ def apply_delta(src_buf: bytes, delta: bytes) -> bytes:
    if dest_size != chunks_length(out):
        raise ApplyDeltaError("dest size incorrect")

-    return out
+    return b"".join(out)


def write_pack_index_v2(
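
Taken together, create_delta now advertises the generator it always was and apply_delta returns joined bytes rather than a chunk list, so a delta round trip reads as below (input strings illustrative):

    from dulwich.pack import apply_delta, create_delta

    base = b"the quick brown fox"
    target = b"the quick red fox jumps"
    delta = b"".join(create_delta(base, target))  # Iterator[bytes] -> bytes
    assert apply_delta(base, delta) == target     # bytes in, bytes out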
@@ -3437,14 +3460,16 @@ class Pack:
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
-        exc_tb: Optional[Any],
+        exc_tb: Optional[TracebackType],
    ) -> None:
        """Exit context manager."""
        self.close()

-    def __eq__(self, other: Any) -> bool:
+    def __eq__(self, other: object) -> bool:
        """Check equality with another pack."""
-        return isinstance(self, type(other)) and self.index == other.index
+        if not isinstance(other, Pack):
+            return False
+        return self.index == other.index

    def __len__(self) -> int:
        """Number of entries in this pack."""
@@ -3545,15 +3570,17 @@ class Pack:
    ) -> Iterator[UnpackedObject]:
        """Iterate over unpacked objects in subset."""
        ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list)
-        ofs: dict[bytes, int] = {}
+        ofs: dict[int, bytes] = {}
        todo = set(shas)
        for unpacked in self.iter_unpacked(include_comp=include_comp):
            sha = unpacked.sha()
-            ofs[unpacked.offset] = sha
+            if unpacked.offset is not None:
+                ofs[unpacked.offset] = sha
            hexsha = sha_to_hex(sha)
            if hexsha in todo:
                if unpacked.pack_type_num == OFS_DELTA:
                    assert isinstance(unpacked.delta_base, int)
+                    assert unpacked.offset is not None
                    base_offset = unpacked.offset - unpacked.delta_base
                    try:
                        unpacked.delta_base = ofs[base_offset]
@@ -3564,10 +3591,11 @@ class Pack:
                        unpacked.pack_type_num = REF_DELTA
                yield unpacked
                todo.remove(hexsha)
-            for child in ofs_pending.pop(unpacked.offset, []):
-                child.pack_type_num = REF_DELTA
-                child.delta_base = sha
-                yield child
+            if unpacked.offset is not None:
+                for child in ofs_pending.pop(unpacked.offset, []):
+                    child.pack_type_num = REF_DELTA
+                    child.delta_base = sha
+                    yield child
        assert not ofs_pending
        if not allow_missing and todo:
            raise UnresolvedDeltas(list(todo))
@@ -3650,7 +3678,23 @@ class Pack:
        # deltas all the way up the stack.
        chunks = base_obj
        for prev_offset, _delta_type, delta in reversed(delta_stack):
-            chunks = apply_delta(chunks, delta)
+            # Convert chunks to bytes for apply_delta if needed
+            if isinstance(chunks, list):
+                chunks_bytes = b"".join(chunks)
+            elif isinstance(chunks, tuple):
+                # For tuple type, second element is the actual data
+                _, chunk_data = chunks
+                if isinstance(chunk_data, list):
+                    chunks_bytes = b"".join(chunk_data)
+                else:
+                    chunks_bytes = chunk_data
+            else:
+                chunks_bytes = chunks
+
+            # Apply delta and convert back to list
+            result_bytes = apply_delta(chunks_bytes, delta)
+            chunks = [result_bytes]
+
            if prev_offset is not None:
                self.data._offset_cache[prev_offset] = base_type, chunks
        return base_type, chunks

+ 10 - 3
dulwich/porcelain.py

@@ -234,6 +234,7 @@ class NoneStream(RawIOBase):
     """Fallback if stdout or stderr are unavailable, does nothing."""
     """Fallback if stdout or stderr are unavailable, does nothing."""
 
 
     def read(self, size: int = -1) -> None:
     def read(self, size: int = -1) -> None:
+        """Read from stream (returns None as this is a null stream)."""
         return None
         return None
 
 
     def readall(self) -> bytes:
     def readall(self) -> bytes:
@@ -269,6 +270,7 @@ class Error(Exception):
     """Porcelain-based error."""
     """Porcelain-based error."""
 
 
     def __init__(self, msg: str) -> None:
     def __init__(self, msg: str) -> None:
+        """Initialize Error with message."""
         super().__init__(msg)
         super().__init__(msg)
 
 
 
 
@@ -340,7 +342,8 @@ def parse_timezone_format(tz_str: str) -> int:


def get_user_timezones() -> tuple[int, int]:
-    """Retrieve local timezone as described in
+    """Retrieve local timezone as described in git documentation.
+
    https://raw.githubusercontent.com/git/git/v2.3.0/Documentation/date-formats.txt
    Returns: A tuple containing author timezone, committer timezone.
    """
@@ -411,8 +414,7 @@ def path_to_tree_path(
    path: Union[str, bytes, os.PathLike],
    tree_encoding: str = DEFAULT_ENCODING,
) -> bytes:
-    """Convert a path to a path usable in an index, e.g. bytes and relative to
-    the repository root.
+    """Convert a path to a path usable in an index, e.g. bytes and relative to the repository root.

    Args:
      repopath: Repository path, absolute or relative to the cwd
@@ -462,6 +464,7 @@ class DivergedBranches(Error):
     """Branches have diverged and fast-forward is not possible."""
     """Branches have diverged and fast-forward is not possible."""
 
 
     def __init__(self, current_sha: bytes, new_sha: bytes) -> None:
     def __init__(self, current_sha: bytes, new_sha: bytes) -> None:
+        """Initialize DivergedBranches error with current and new SHA values."""
         self.current_sha = current_sha
         self.current_sha = current_sha
         self.new_sha = new_sha
         self.new_sha = new_sha
 
 
@@ -554,6 +557,7 @@ def symbolic_ref(


def pack_refs(repo: RepoPath, all: bool = False) -> None:
+    """Pack loose references into packed-refs file."""
    with open_repo_closing(repo) as repo_obj:
        repo_obj.refs.pack_refs(all=all)

@@ -1205,6 +1209,7 @@ move = mv
def commit_decode(
    commit: Commit, contents: bytes, default_encoding: str = DEFAULT_ENCODING
) -> str:
+    """Decode commit contents using the commit's encoding or default."""
    if commit.encoding:
        encoding = commit.encoding.decode("ascii")
    else:
@@ -1215,6 +1220,7 @@ def commit_decode(
def commit_encode(
    commit: Commit, contents: str, default_encoding: str = DEFAULT_ENCODING
) -> bytes:
+    """Encode commit contents using the commit's encoding or default."""
    if commit.encoding:
        encoding = commit.encoding.decode("ascii")
    else:
@@ -1393,6 +1399,7 @@ def show_object(
    decode: Callable[[bytes], str],
    outstream: TextIO,
) -> None:
+    """Show details of a git object."""
    handlers: dict[bytes, Callable[[RepoPath, Any, Any, TextIO], None]] = {
        b"tree": show_tree,
        b"blob": show_blob,

+ 4 - 0
dulwich/rebase.py

@@ -1210,12 +1210,16 @@ def _squash_commits(
    if not entry.commit_sha:
        raise RebaseError("No commit SHA for squash/fixup operation")
    commit_to_squash = repo[entry.commit_sha]
+    if not isinstance(commit_to_squash, Commit):
+        raise RebaseError(f"Expected commit, got {type(commit_to_squash).__name__}")

    # Get the previous commit (target of squash)
    previous_commit = rebaser._done[-1]

    # Cherry-pick the changes onto the previous commit
    parent = repo[commit_to_squash.parents[0]]
+    if not isinstance(parent, Commit):
+        raise RebaseError(f"Expected parent commit, got {type(parent).__name__}")

    # Perform three-way merge for the tree
    merged_tree, conflicts = three_way_merge(
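
Background on these guards: repo[sha] is typed to return a generic ShaFile, so attributes such as .parents only type-check after an isinstance narrow, which also guards at runtime. A minimal sketch of the pattern (repository path illustrative):

    from dulwich.objects import Commit
    from dulwich.repo import Repo

    repo = Repo(".")             # illustrative path
    obj = repo[repo.head()]      # statically a ShaFile
    if isinstance(obj, Commit):  # narrows obj to Commit for mypy
        print(obj.parents)       # attribute access now type-checks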

+ 15 - 0
dulwich/refs.py

@@ -164,6 +164,7 @@ class RefsContainer:
            ]
        ] = None,
    ) -> None:
+        """Initialize RefsContainer with optional logger function."""
        self._logger = logger

    def _log(
@@ -278,6 +279,7 @@ class RefsContainer:
        raise NotImplementedError(self.allkeys)

    def __iter__(self) -> Iterator[Ref]:
+        """Iterate over all reference keys."""
        return iter(self.allkeys())

    def keys(self, base=None):
@@ -387,6 +389,7 @@ class RefsContainer:
        return refnames, contents

    def __contains__(self, refname: bytes) -> bool:
+        """Check if a reference exists."""
        if self.read_ref(refname):
            return True
        return False
@@ -562,18 +565,22 @@ class DictRefsContainer(RefsContainer):
            ]
        ] = None,
    ) -> None:
+        """Initialize DictRefsContainer with refs dictionary and optional logger."""
        super().__init__(logger=logger)
        self._refs = refs
        self._peeled: dict[bytes, ObjectID] = {}
        self._watchers: set[Any] = set()

    def allkeys(self) -> set[bytes]:
+        """Return all reference keys."""
        return set(self._refs.keys())

    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
+        """Read a loose reference."""
        return self._refs.get(name, None)

    def get_packed_refs(self) -> dict[bytes, bytes]:
+        """Get packed references."""
        return {}

    def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
@@ -742,6 +749,7 @@ class DictRefsContainer(RefsContainer):
        return True

    def get_peeled(self, name: bytes) -> Optional[bytes]:
+        """Get peeled version of a reference."""
        return self._peeled.get(name)

    def _update(self, refs: dict[bytes, bytes]) -> None:
@@ -760,21 +768,26 @@ class InfoRefsContainer(RefsContainer):
     """Refs container that reads refs from a info/refs file."""
     """Refs container that reads refs from a info/refs file."""
 
 
     def __init__(self, f: BinaryIO) -> None:
     def __init__(self, f: BinaryIO) -> None:
+        """Initialize InfoRefsContainer from info/refs file."""
         self._refs: dict[bytes, bytes] = {}
         self._refs: dict[bytes, bytes] = {}
         self._peeled: dict[bytes, bytes] = {}
         self._peeled: dict[bytes, bytes] = {}
         refs = read_info_refs(f)
         refs = read_info_refs(f)
         (self._refs, self._peeled) = split_peeled_refs(refs)
         (self._refs, self._peeled) = split_peeled_refs(refs)
 
 
     def allkeys(self) -> set[bytes]:
     def allkeys(self) -> set[bytes]:
+        """Return all reference keys."""
         return set(self._refs.keys())
         return set(self._refs.keys())
 
 
     def read_loose_ref(self, name: bytes) -> Optional[bytes]:
     def read_loose_ref(self, name: bytes) -> Optional[bytes]:
+        """Read a loose reference."""
         return self._refs.get(name, None)
         return self._refs.get(name, None)
 
 
     def get_packed_refs(self) -> dict[bytes, bytes]:
     def get_packed_refs(self) -> dict[bytes, bytes]:
+        """Get packed references."""
         return {}
         return {}
 
 
     def get_peeled(self, name: bytes) -> Optional[bytes]:
     def get_peeled(self, name: bytes) -> Optional[bytes]:
+        """Get peeled version of a reference."""
         try:
         try:
             return self._peeled[name]
             return self._peeled[name]
         except KeyError:
         except KeyError:
@@ -819,6 +832,7 @@ class DiskRefsContainer(RefsContainer):
         return f"{self.__class__.__name__}({self.path!r})"
         return f"{self.__class__.__name__}({self.path!r})"
 
 
     def subkeys(self, base: bytes) -> set[bytes]:
     def subkeys(self, base: bytes) -> set[bytes]:
+        """Return subkeys under a given base reference path."""
         subkeys = set()
         subkeys = set()
         path = self.refpath(base)
         path = self.refpath(base)
         for root, unused_dirs, files in os.walk(path):
         for root, unused_dirs, files in os.walk(path):
@@ -838,6 +852,7 @@ class DiskRefsContainer(RefsContainer):
        return subkeys

    def allkeys(self) -> set[bytes]:
+        """Return all reference keys."""
        allkeys = set()
        if os.path.exists(self.refpath(HEADREF)):
            allkeys.add(HEADREF)

+ 11 - 0
dulwich/web.py

@@ -607,6 +607,7 @@ class HTTPGitApplication:
        environ: WSGIEnvironment,
        start_response: StartResponse,
    ) -> Iterable[bytes]:
+        """Handle WSGI request."""
        path = environ["PATH_INFO"]
        method = environ["REQUEST_METHOD"]
        req = HTTPGitRequest(
@@ -636,6 +637,7 @@ class GunzipFilter:
     """WSGI middleware that unzips gzip-encoded requests before passing on to the underlying application."""
     """WSGI middleware that unzips gzip-encoded requests before passing on to the underlying application."""
 
 
     def __init__(self, application: WSGIApplication) -> None:
     def __init__(self, application: WSGIApplication) -> None:
+        """Initialize GunzipFilter with WSGI application."""
         self.app = application
         self.app = application
 
 
     def __call__(
     def __call__(
@@ -643,6 +645,7 @@ class GunzipFilter:
        environ: WSGIEnvironment,
        start_response: StartResponse,
    ) -> Iterable[bytes]:
+        """Handle WSGI request with gzip decompression."""
        import gzip

        if environ.get("HTTP_CONTENT_ENCODING", "") == "gzip":
@@ -659,6 +662,7 @@ class LimitedInputFilter:
     """WSGI middleware that limits the input length of a request to that specified in Content-Length."""
     """WSGI middleware that limits the input length of a request to that specified in Content-Length."""
 
 
     def __init__(self, application: WSGIApplication) -> None:
     def __init__(self, application: WSGIApplication) -> None:
+        """Initialize LimitedInputFilter with WSGI application."""
         self.app = application
         self.app = application
 
 
     def __call__(
     def __call__(
@@ -666,6 +670,7 @@ class LimitedInputFilter:
        environ: WSGIEnvironment,
        start_response: StartResponse,
    ) -> Iterable[bytes]:
+        """Handle WSGI request with input length limiting."""
        # This is not necessary if this app is run from a conforming WSGI
        # server. Unfortunately, there's no way to tell that at this point.
        # TODO: git may used HTTP/1.1 chunked encoding instead of specifying
@@ -706,15 +711,18 @@ class ServerHandlerLogger(ServerHandler):
            None,
        ],
    ) -> None:
+        """Log exception using dulwich logger."""
        logger.exception(
            "Exception happened during processing of request",
            exc_info=exc_info,
        )

    def log_message(self, format: str, *args: object) -> None:
+        """Log message using dulwich logger."""
        logger.info(format, *args)

    def log_error(self, *args: object) -> None:
+        """Log error using dulwich logger."""
        logger.error(*args)

@@ -729,15 +737,18 @@ class WSGIRequestHandlerLogger(WSGIRequestHandler):
            None,
        ],
    ) -> None:
+        """Log exception using dulwich logger."""
        logger.exception(
            "Exception happened during processing of request",
            exc_info=exc_info,
        )

    def log_message(self, format: str, *args: object) -> None:
+        """Log message using dulwich logger."""
        logger.info(format, *args)

    def log_error(self, *args: object) -> None:
+        """Log error using dulwich logger."""
        logger.error(*args)

    def handle(self) -> None: