Jelmer Vernooij 2 maanden geleden
bovenliggende
commit
ea75abd39c

+ 10 - 0
dulwich/_typing.py

@@ -0,0 +1,10 @@
+"""Common type definitions for Dulwich."""
+
+import sys
+
+if sys.version_info >= (3, 12):
+    from collections.abc import Buffer
+else:
+    Buffer = bytes | bytearray | memoryview
+
+__all__ = ["Buffer"]

+ 3 - 5
dulwich/bitmap.py

@@ -32,7 +32,7 @@ import os
 import struct
 from collections.abc import Iterator
 from io import BytesIO
-from typing import IO, TYPE_CHECKING, Optional
+from typing import IO, TYPE_CHECKING
 
 from .file import GitFile
 
@@ -440,7 +440,7 @@ class PackBitmap:
 
 def read_bitmap(
     filename: str | os.PathLike[str],
-    pack_index: Optional["PackIndex"] = None,
+    pack_index: "PackIndex | None" = None,
 ) -> PackBitmap:
     """Read a bitmap index file.
 
@@ -459,9 +459,7 @@ def read_bitmap(
         return read_bitmap_file(f, pack_index=pack_index)
 
 
-def read_bitmap_file(
-    f: IO[bytes], pack_index: Optional["PackIndex"] = None
-) -> PackBitmap:
+def read_bitmap_file(f: IO[bytes], pack_index: "PackIndex | None" = None) -> PackBitmap:
     """Read bitmap data from a file object.
 
     Args:

+ 1 - 5
dulwich/cli.py

@@ -51,12 +51,8 @@ from typing import (
     TextIO,
 )
 
-if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
-else:
-    Buffer = bytes | bytearray | memoryview
-
 from dulwich import porcelain
+from dulwich._typing import Buffer
 
 from .bundle import Bundle, create_bundle_from_repo, read_bundle, write_bundle
 from .client import get_transport_and_path

+ 9 - 11
dulwich/client.py

@@ -55,8 +55,6 @@ from typing import (
     TYPE_CHECKING,
     Any,
     ClassVar,
-    Optional,
-    Union,
 )
 from urllib.parse import ParseResult, urljoin, urlparse, urlunparse, urlunsplit
 from urllib.parse import quote as urlquote
@@ -1110,7 +1108,7 @@ class GitClient:
         self,
         path: bytes,
         target: BaseRepo,
-        determine_wants: Optional["DetermineWantsFunc"] = None,
+        determine_wants: "DetermineWantsFunc | None" = None,
         progress: Callable[[bytes], None] | None = None,
         depth: int | None = None,
         ref_prefix: Sequence[Ref] | None = None,
@@ -2304,7 +2302,7 @@ class LocalGitClient(GitClient):
         self,
         path: bytes,
         target: BaseRepo,
-        determine_wants: Optional["DetermineWantsFunc"] = None,
+        determine_wants: "DetermineWantsFunc | None" = None,
         progress: Callable[[bytes], None] | None = None,
         depth: int | None = None,
         ref_prefix: Sequence[bytes] | None = None,
@@ -2632,7 +2630,7 @@ class BundleClient(GitClient):
         self,
         path: bytes,
         target: BaseRepo,
-        determine_wants: Optional["DetermineWantsFunc"] = None,
+        determine_wants: "DetermineWantsFunc | None" = None,
         progress: Callable[[bytes], None] | None = None,
         depth: int | None = None,
         ref_prefix: Sequence[Ref] | None = None,
@@ -3221,7 +3219,7 @@ def default_urllib3_manager(
     base_url: str | None = None,
     timeout: float | None = None,
     cert_reqs: str | None = None,
-) -> Union["urllib3.ProxyManager", "urllib3.PoolManager"]:
+) -> "urllib3.ProxyManager | urllib3.PoolManager":
     """Return urllib3 connection pool manager.
 
     Honour detected proxy configurations.
@@ -3989,7 +3987,7 @@ class AbstractHttpGitClient(GitClient):
         username: str | None = None,
         password: str | None = None,
         config: Config | None = None,
-        pool_manager: Optional["urllib3.PoolManager"] = None,
+        pool_manager: "urllib3.PoolManager | None" = None,
     ) -> "AbstractHttpGitClient":
         """Create an AbstractHttpGitClient from a parsed URL.
 
@@ -4084,7 +4082,7 @@ class Urllib3HttpGitClient(AbstractHttpGitClient):
         self,
         base_url: str,
         dumb: bool | None = None,
-        pool_manager: Optional["urllib3.PoolManager"] = None,
+        pool_manager: "urllib3.PoolManager | None" = None,
         config: Config | None = None,
         username: str | None = None,
         password: str | None = None,
@@ -4229,7 +4227,7 @@ def get_transport_and_path_from_url(
     password: str | None = None,
     key_filename: str | None = None,
     ssh_command: str | None = None,
-    pool_manager: Optional["urllib3.PoolManager"] = None,
+    pool_manager: "urllib3.PoolManager | None" = None,
 ) -> tuple[GitClient, str]:
     """Obtain a git client from a URL.
 
@@ -4282,7 +4280,7 @@ def _get_transport_and_path_from_url(
     password: str | None = None,
     key_filename: str | None = None,
     ssh_command: str | None = None,
-    pool_manager: Optional["urllib3.PoolManager"] = None,
+    pool_manager: "urllib3.PoolManager | None" = None,
 ) -> tuple[GitClient, str]:
     parsed = urlparse(url)
     if parsed.scheme == "git":
@@ -4377,7 +4375,7 @@ def get_transport_and_path(
     password: str | None = None,
     key_filename: str | None = None,
     ssh_command: str | None = None,
-    pool_manager: Optional["urllib3.PoolManager"] = None,
+    pool_manager: "urllib3.PoolManager | None" = None,
 ) -> tuple[GitClient, str]:
     """Obtain a git client from a URL.
 

+ 1 - 3
dulwich/config.py

@@ -45,7 +45,6 @@ from typing import (
     IO,
     Generic,
     TypeVar,
-    Union,
     overload,
 )
 
@@ -197,8 +196,7 @@ class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
     @classmethod
     def make(
         cls,
-        dict_in: Union[MutableMapping[K, V], "CaseInsensitiveOrderedMultiDict[K, V]"]
-        | None = None,
+        dict_in: "MutableMapping[K, V] | CaseInsensitiveOrderedMultiDict[K, V] | None" = None,
         default_factory: Callable[[], V] | None = None,
     ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
         """Create a CaseInsensitiveOrderedMultiDict from an existing mapping.

+ 3 - 3
dulwich/contrib/requests_vendor.py

@@ -33,7 +33,7 @@ This implementation is experimental and does not have any tests.
 
 from collections.abc import Callable, Iterator
 from io import BytesIO
-from typing import TYPE_CHECKING, Any, Optional
+from typing import TYPE_CHECKING, Any
 
 if TYPE_CHECKING:
     from ..config import ConfigFile
@@ -56,7 +56,7 @@ class RequestsHttpGitClient(AbstractHttpGitClient):
         self,
         base_url: str,
         dumb: bool | None = None,
-        config: Optional["ConfigFile"] = None,
+        config: "ConfigFile | None" = None,
         username: str | None = None,
         password: str | None = None,
         thin_packs: bool = True,
@@ -133,7 +133,7 @@ class RequestsHttpGitClient(AbstractHttpGitClient):
         return resp, read
 
 
-def get_session(config: Optional["ConfigFile"]) -> Session:
+def get_session(config: "ConfigFile | None") -> Session:
     """Create a requests session with Git configuration.
 
     Args:

+ 3 - 3
dulwich/contrib/swift.py

@@ -39,7 +39,7 @@ import zlib
 from collections.abc import Callable, Iterator, Mapping
 from configparser import ConfigParser
 from io import BytesIO
-from typing import Any, BinaryIO, Optional, cast
+from typing import Any, BinaryIO, cast
 
 from geventhttpclient import HTTPClient
 
@@ -231,7 +231,7 @@ def pack_info_create(pack_data: "PackData", pack_index: "PackIndex") -> bytes:
 
 def load_pack_info(
     filename: str,
-    scon: Optional["SwiftConnector"] = None,
+    scon: "SwiftConnector | None" = None,
     file: BinaryIO | None = None,
 ) -> dict[str, Any] | None:
     """Load pack info from Swift or file.
@@ -821,7 +821,7 @@ class SwiftObjectStore(PackBasedObjectStore):
         """
         f = BytesIO()
 
-        def commit() -> Optional["SwiftPack"]:
+        def commit() -> "SwiftPack | None":
             """Commit the pack to Swift storage.
 
             Returns:

+ 1 - 6
dulwich/diff.py

@@ -48,15 +48,10 @@ import io
 import logging
 import os
 import stat
-import sys
 from collections.abc import Iterable, Sequence
 from typing import BinaryIO
 
-if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
-else:
-    Buffer = bytes | bytearray | memoryview
-
+from ._typing import Buffer
 from .index import ConflictedIndexEntry, commit_index
 from .object_store import iter_tree_contents
 from .objects import S_ISGITLINK, Blob, Commit

+ 3 - 3
dulwich/diff_tree.py

@@ -27,7 +27,7 @@ from collections.abc import Callable, Iterator, Mapping, Sequence
 from collections.abc import Set as AbstractSet
 from io import BytesIO
 from itertools import chain
-from typing import TYPE_CHECKING, Any, NamedTuple, Optional, TypeVar
+from typing import TYPE_CHECKING, Any, NamedTuple, TypeVar
 
 from .object_store import BaseObjectStore
 from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
@@ -260,7 +260,7 @@ def tree_changes(
     tree1_id: ObjectID | None,
     tree2_id: ObjectID | None,
     want_unchanged: bool = False,
-    rename_detector: Optional["RenameDetector"] = None,
+    rename_detector: "RenameDetector | None" = None,
     include_trees: bool = False,
     change_type_same: bool = False,
     paths: Sequence[bytes] | None = None,
@@ -347,7 +347,7 @@ def tree_changes_for_merge(
     store: BaseObjectStore,
     parent_tree_ids: Sequence[ObjectID],
     tree_id: ObjectID,
-    rename_detector: Optional["RenameDetector"] = None,
+    rename_detector: "RenameDetector | None" = None,
 ) -> Iterator[list[TreeChange | None]]:
     """Get the tree changes for a merge tree relative to all its parents.
 

+ 2 - 2
dulwich/dumb.py

@@ -26,7 +26,7 @@ import tempfile
 import zlib
 from collections.abc import Callable, Iterator, Mapping, Sequence
 from io import BytesIO
-from typing import Any, Optional
+from typing import Any
 from urllib.parse import urljoin
 
 from .errors import NotGitRepository, ObjectFormatException
@@ -340,7 +340,7 @@ class DumbHTTPObjectStore(BaseObjectStore):
         self,
         objects: Sequence[tuple[ShaFile, str | None]],
         progress: Callable[[str], None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add a set of objects to this object store."""
         raise NotImplementedError("Cannot add objects to dumb HTTP repository")
 

+ 1 - 4
dulwich/file.py

@@ -28,10 +28,7 @@ from collections.abc import Iterable, Iterator
 from types import TracebackType
 from typing import IO, Any, ClassVar, Literal, overload
 
-if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
-else:
-    Buffer = bytes | bytearray | memoryview
+from ._typing import Buffer
 
 
 def ensure_dir_exists(

+ 6 - 6
dulwich/filters.py

@@ -25,7 +25,7 @@ import logging
 import subprocess
 import threading
 from collections.abc import Callable
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 from typing import Protocol as TypingProtocol
 
 from .attrs import GitAttributes
@@ -140,7 +140,7 @@ class ProcessFilterDriver:
         self._capabilities: set[bytes] = set()
         self._process_lock = threading.Lock()
 
-    def _get_or_start_process(self) -> Optional["Protocol"]:
+    def _get_or_start_process(self) -> "Protocol | None":
         """Get or start the long-running process filter."""
         if self._process is None and self.process_cmd:
             from .errors import GitProtocolError, HangupException
@@ -602,8 +602,8 @@ class FilterRegistry:
 
     def __init__(
         self,
-        config: Optional["StackedConfig"] = None,
-        repo: Optional["BaseRepo"] = None,
+        config: "StackedConfig | None" = None,
+        repo: "BaseRepo | None" = None,
     ) -> None:
         """Initialize FilterRegistry.
 
@@ -879,10 +879,10 @@ class FilterBlobNormalizer:
 
     def __init__(
         self,
-        config_stack: Optional["StackedConfig"],
+        config_stack: "StackedConfig | None",
         gitattributes: GitAttributes,
         filter_registry: FilterRegistry | None = None,
-        repo: Optional["BaseRepo"] = None,
+        repo: "BaseRepo | None" = None,
         filter_context: FilterContext | None = None,
     ) -> None:
         """Initialize FilterBlobNormalizer.

+ 5 - 5
dulwich/gc.py

@@ -1,12 +1,12 @@
 """Git garbage collection implementation."""
 
-import collections
 import logging
 import os
 import time
+from collections import deque
 from collections.abc import Callable
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 
 from dulwich.object_store import (
     BaseObjectStore,
@@ -54,7 +54,7 @@ def find_reachable_objects(
         Set of reachable object SHAs
     """
     reachable = set()
-    pending: collections.deque[ObjectID] = collections.deque()
+    pending: deque[ObjectID] = deque()
 
     # Start with all refs
     for ref in refs_container.allkeys():
@@ -313,7 +313,7 @@ def garbage_collect(
     return stats
 
 
-def should_run_gc(repo: "BaseRepo", config: Optional["Config"] = None) -> bool:
+def should_run_gc(repo: "BaseRepo", config: "Config | None" = None) -> bool:
     """Check if automatic garbage collection should run.
 
     Args:
@@ -372,7 +372,7 @@ def should_run_gc(repo: "BaseRepo", config: Optional["Config"] = None) -> bool:
 
 def maybe_auto_gc(
     repo: "Repo",
-    config: Optional["Config"] = None,
+    config: "Config | None" = None,
     progress: Callable[[str], None] | None = None,
 ) -> bool:
     """Run automatic garbage collection if needed.

+ 6 - 3
dulwich/hooks.py

@@ -226,9 +226,12 @@ class PostReceiveShellHook(ShellHook):
             out_data, err_data = p.communicate(in_data)
 
             if (p.returncode != 0) or err_data:
-                err_fmt = b"post-receive exit code: %d\n" + b"stdout:\n%s\nstderr:\n%s"
-                err_msg = err_fmt % (p.returncode, out_data, err_data)
-                raise HookError(err_msg.decode("utf-8", "backslashreplace"))
+                err_msg = (
+                    f"post-receive exit code: {p.returncode}\n"
+                    f"stdout:\n{out_data.decode('utf-8', 'backslashreplace')}\n"
+                    f"stderr:\n{err_data.decode('utf-8', 'backslashreplace')}"
+                )
+                raise HookError(err_msg)
             return out_data
         except OSError as err:
             raise HookError(repr(err)) from err

+ 2 - 2
dulwich/ignore.py

@@ -30,7 +30,7 @@ import os.path
 import re
 from collections.abc import Iterable, Sequence
 from contextlib import suppress
-from typing import TYPE_CHECKING, BinaryIO, Union
+from typing import TYPE_CHECKING, BinaryIO
 
 if TYPE_CHECKING:
     from .repo import Repo
@@ -38,7 +38,7 @@ if TYPE_CHECKING:
 from .config import Config, get_xdg_config_home_path
 
 
-def _pattern_to_str(pattern: Union["Pattern", bytes, str]) -> str:
+def _pattern_to_str(pattern: "Pattern | bytes | str") -> str:
     """Convert a pattern to string, handling both Pattern objects and raw patterns."""
     if isinstance(pattern, Pattern):
         pattern_data: bytes | str = pattern.pattern

+ 6 - 8
dulwich/index.py

@@ -44,8 +44,6 @@ from typing import (
     TYPE_CHECKING,
     Any,
     BinaryIO,
-    Optional,
-    Union,
 )
 
 if TYPE_CHECKING:
@@ -71,7 +69,7 @@ from .objects import (
 from .pack import ObjectContainer, SHA1Reader, SHA1Writer
 
 # Type alias for recursive tree structure used in commit_tree
-TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]
+TreeDict = dict[bytes, "TreeDict | tuple[int, bytes]"]
 
 # 2-bit stage (during merge)
 FLAG_STAGEMASK = 0x3000
@@ -1653,7 +1651,7 @@ if sys.platform == "win32":
 
         def __init__(self, errno: int, msg: str, filename: str | None) -> None:
             """Initialize WindowsSymlinkPermissionError."""
-            super(PermissionError, self).__init__(
+            super().__init__(
                 errno,
                 f"Unable to create symlink; do you have developer mode enabled? {msg}",
                 filename,
@@ -1888,7 +1886,7 @@ def build_index_from_tree(
         [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
     ]
     | None = None,
-    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
+    blob_normalizer: "FilterBlobNormalizer | None" = None,
     tree_encoding: str = "utf-8",
 ) -> None:
     """Generate and materialize index from a tree.
@@ -2159,7 +2157,7 @@ def _check_file_matches(
     entry_mode: int,
     current_stat: os.stat_result,
     honor_filemode: bool,
-    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
+    blob_normalizer: "FilterBlobNormalizer | None" = None,
     tree_path: bytes | None = None,
 ) -> bool:
     """Check if a file on disk matches the expected git object.
@@ -2255,7 +2253,7 @@ def _transition_to_file(
         [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
     ]
     | None,
-    blob_normalizer: Optional["FilterBlobNormalizer"],
+    blob_normalizer: "FilterBlobNormalizer | None",
     tree_encoding: str = "utf-8",
 ) -> None:
     """Transition any type to regular file or symlink."""
@@ -2515,7 +2513,7 @@ def update_working_tree(
     ]
     | None = None,
     force_remove_untracked: bool = False,
-    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
+    blob_normalizer: "FilterBlobNormalizer | None" = None,
     tree_encoding: str = "utf-8",
     allow_overwrite_modified: bool = False,
 ) -> None:

+ 9 - 11
dulwich/lfs.py

@@ -39,7 +39,7 @@ import os
 import tempfile
 from collections.abc import Iterable, Mapping
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, BinaryIO, Optional
+from typing import TYPE_CHECKING, Any, BinaryIO
 from urllib.parse import urljoin, urlparse
 from urllib.request import Request, urlopen
 
@@ -182,7 +182,7 @@ class LFSPointer:
         self.size = size
 
     @classmethod
-    def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
+    def from_bytes(cls, data: bytes) -> "LFSPointer | None":
         """Parse LFS pointer from bytes.
 
         Returns None if data is not a valid LFS pointer.
@@ -243,9 +243,7 @@ class LFSPointer:
 class LFSFilterDriver:
     """LFS filter driver implementation."""
 
-    def __init__(
-        self, lfs_store: "LFSStore", config: Optional["Config"] = None
-    ) -> None:
+    def __init__(self, lfs_store: "LFSStore", config: "Config | None" = None) -> None:
         """Initialize LFSFilterDriver."""
         self.lfs_store = lfs_store
         self.config = config
@@ -328,13 +326,13 @@ class LFSFilterDriver:
         """Clean up any resources held by this filter driver."""
         # LFSFilterDriver doesn't hold any resources that need cleanup
 
-    def reuse(self, config: Optional["Config"], filter_name: str) -> bool:
+    def reuse(self, config: "Config | None", filter_name: str) -> bool:
         """Check if this filter driver should be reused with the given configuration."""
         # LFSFilterDriver is stateless and lightweight, no need to cache
         return False
 
 
-def _get_lfs_user_agent(config: Optional["Config"]) -> str:
+def _get_lfs_user_agent(config: "Config | None") -> str:
     """Get User-Agent string for LFS requests, respecting git config."""
     try:
         if config:
@@ -385,7 +383,7 @@ def _is_valid_lfs_url(url: str) -> bool:
 class LFSClient:
     """Base class for LFS client operations."""
 
-    def __init__(self, url: str, config: Optional["Config"] = None) -> None:
+    def __init__(self, url: str, config: "Config | None" = None) -> None:
         """Initialize LFS client.
 
         Args:
@@ -427,7 +425,7 @@ class LFSClient:
         raise NotImplementedError
 
     @classmethod
-    def from_config(cls, config: "Config") -> Optional["LFSClient"]:
+    def from_config(cls, config: "Config") -> "LFSClient | None":
         """Create LFS client from git config.
 
         Returns the appropriate subclass (HTTPLFSClient or FileLFSClient)
@@ -491,7 +489,7 @@ class LFSClient:
 class HTTPLFSClient(LFSClient):
     """LFS client for HTTP/HTTPS operations."""
 
-    def __init__(self, url: str, config: Optional["Config"] = None) -> None:
+    def __init__(self, url: str, config: "Config | None" = None) -> None:
         """Initialize HTTP LFS client.
 
         Args:
@@ -711,7 +709,7 @@ class HTTPLFSClient(LFSClient):
 class FileLFSClient(LFSClient):
     """LFS client for file:// URLs that accesses local filesystem."""
 
-    def __init__(self, url: str, config: Optional["Config"] = None) -> None:
+    def __init__(self, url: str, config: "Config | None" = None) -> None:
         """Initialize File LFS client.
 
         Args:

+ 2 - 2
dulwich/line_ending.py

@@ -139,7 +139,7 @@ Sources:
 
 import logging
 from collections.abc import Callable, Mapping
-from typing import TYPE_CHECKING, Any, Optional
+from typing import TYPE_CHECKING, Any
 
 if TYPE_CHECKING:
     from .config import StackedConfig
@@ -176,7 +176,7 @@ class LineEndingFilter(FilterDriver):
 
     @classmethod
     def from_config(
-        cls, config: Optional["StackedConfig"], for_text_attr: bool = False
+        cls, config: "StackedConfig | None", for_text_attr: bool = False
     ) -> "LineEndingFilter":
         """Create a LineEndingFilter from git configuration.
 

+ 2 - 2
dulwich/lru_cache.py

@@ -23,7 +23,7 @@
 """A simple least-recently-used (LRU) cache."""
 
 from collections.abc import Callable, Iterable, Iterator
-from typing import Generic, Optional, TypeVar, cast
+from typing import Generic, TypeVar, cast
 
 _null_key = object()
 
@@ -37,7 +37,7 @@ class _LRUNode(Generic[K, V]):
 
     __slots__ = ("cleanup", "key", "next_key", "prev", "size", "value")
 
-    prev: Optional["_LRUNode[K, V]"]
+    prev: "_LRUNode[K, V] | None"
     next_key: K | object
     size: int | None
 

+ 1 - 1
dulwich/merge.py

@@ -24,7 +24,7 @@ def make_merge3(
     a: Sequence[bytes],
     b: Sequence[bytes],
     is_cherrypick: bool = False,
-    sequence_matcher: type["SequenceMatcherProtocol[bytes]"] | None = None,
+    sequence_matcher: "type[SequenceMatcherProtocol[bytes]] | None" = None,
 ) -> "merge3.Merge3[bytes]":
     """Return a Merge3 object, or raise ImportError if merge3 is not installed."""
     if merge3 is None:

+ 6 - 6
dulwich/notes.py

@@ -22,7 +22,7 @@
 
 import stat
 from collections.abc import Iterator, Sequence
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 
 from .objects import Blob, Tree
 
@@ -609,7 +609,7 @@ class Notes:
     def get_notes_ref(
         self,
         notes_ref: bytes | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
     ) -> bytes:
         """Get the notes reference to use.
 
@@ -631,7 +631,7 @@ class Notes:
         self,
         object_sha: bytes,
         notes_ref: bytes | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
     ) -> bytes | None:
         """Get the note for an object.
 
@@ -675,7 +675,7 @@ class Notes:
         author: bytes | None = None,
         committer: bytes | None = None,
         message: bytes | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
     ) -> bytes:
         """Set or update a note for an object.
 
@@ -759,7 +759,7 @@ class Notes:
         author: bytes | None = None,
         committer: bytes | None = None,
         message: bytes | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
     ) -> bytes | None:
         """Remove a note for an object.
 
@@ -837,7 +837,7 @@ class Notes:
     def list_notes(
         self,
         notes_ref: bytes | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
     ) -> list[tuple[bytes, bytes]]:
         """List all notes in a notes ref.
 

+ 10 - 11
dulwich/object_store.py

@@ -36,7 +36,6 @@ from pathlib import Path
 from typing import (
     TYPE_CHECKING,
     BinaryIO,
-    Optional,
     Protocol,
 )
 
@@ -351,7 +350,7 @@ class BaseObjectStore:
         self,
         objects: Sequence[tuple[ShaFile, str | None]],
         progress: Callable[..., None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add a set of objects to this object store.
 
         Args:
@@ -379,7 +378,7 @@ class BaseObjectStore:
         want_unchanged: bool = False,
         include_trees: bool = False,
         change_type_same: bool = False,
-        rename_detector: Optional["RenameDetector"] = None,
+        rename_detector: "RenameDetector | None" = None,
         paths: Sequence[bytes] | None = None,
     ) -> Iterator[
         tuple[
@@ -648,7 +647,7 @@ class BaseObjectStore:
             if sha.startswith(prefix):
                 yield sha
 
-    def get_commit_graph(self) -> Optional["CommitGraph"]:
+    def get_commit_graph(self) -> "CommitGraph | None":
         """Get the commit graph for this object store.
 
         Returns:
@@ -708,7 +707,7 @@ class PackCapableObjectStore(BaseObjectStore, PackedObjectContainer):
         count: int,
         unpacked_objects: Iterator["UnpackedObject"],
         progress: Callable[..., None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add pack data to this object store.
 
         Args:
@@ -805,7 +804,7 @@ class PackBasedObjectStore(PackCapableObjectStore, PackedObjectContainer):
         count: int,
         unpacked_objects: Iterator[UnpackedObject],
         progress: Callable[..., None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add pack data to this object store.
 
         Args:
@@ -1234,7 +1233,7 @@ class PackBasedObjectStore(PackCapableObjectStore, PackedObjectContainer):
         self,
         objects: Sequence[tuple[ShaFile, str | None]],
         progress: Callable[[str], None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add a set of objects to this object store.
 
         Args:
@@ -1253,8 +1252,8 @@ class DiskObjectStore(PackBasedObjectStore):
 
     path: str | os.PathLike[str]
     pack_dir: str | os.PathLike[str]
-    _alternates: list["BaseObjectStore"] | None
-    _commit_graph: Optional["CommitGraph"]
+    _alternates: "list[BaseObjectStore] | None"
+    _commit_graph: "CommitGraph | None"
 
     def __init__(
         self,
@@ -1735,7 +1734,7 @@ class DiskObjectStore(PackBasedObjectStore):
         f = os.fdopen(fd, "w+b")
         os.chmod(path, PACK_MODE)
 
-        def commit() -> Optional["Pack"]:
+        def commit() -> "Pack | None":
             if f.tell() > 0:
                 f.seek(0)
 
@@ -1837,7 +1836,7 @@ class DiskObjectStore(PackBasedObjectStore):
                     seen.add(sha)
                     yield sha
 
-    def get_commit_graph(self) -> Optional["CommitGraph"]:
+    def get_commit_graph(self) -> "CommitGraph | None":
         """Get the commit graph for this object store.
 
         Returns:

+ 7 - 8
dulwich/objects.py

@@ -36,7 +36,6 @@ from typing import (
     TYPE_CHECKING,
     NamedTuple,
     TypeVar,
-    Union,
 )
 
 if sys.version_info >= (3, 11):
@@ -395,11 +394,11 @@ class ShaFile:
     type_name: bytes
     type_num: int
     _chunked_text: list[bytes] | None
-    _sha: Union[FixedSha, None, "HASH"]
+    _sha: "FixedSha | None | HASH"
 
     @staticmethod
     def _parse_legacy_object_header(
-        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
+        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
     ) -> "ShaFile":
         """Parse a legacy object, creating it but not reading the file."""
         bufsize = 1024
@@ -500,7 +499,7 @@ class ShaFile:
 
     @staticmethod
     def _parse_object_header(
-        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
+        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
     ) -> "ShaFile":
         """Parse a new style object, creating it but not reading the file."""
         num_type = (ord(magic[0:1]) >> 4) & 7
@@ -529,7 +528,7 @@ class ShaFile:
         return (b0 & 0x8F) == 0x08 and (word % 31) == 0
 
     @classmethod
-    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
+    def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
         map = f.read()
         if not map:
             raise EmptyFileException("Corrupted empty file detected")
@@ -561,7 +560,7 @@ class ShaFile:
             return cls.from_file(f)
 
     @classmethod
-    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
+    def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
         """Get the contents of a SHA file on disk."""
         try:
             obj = cls._parse_file(f)
@@ -655,7 +654,7 @@ class ShaFile:
         """Returns the length of the raw string of this object."""
         return sum(map(len, self.as_raw_chunks()))
 
-    def sha(self) -> Union[FixedSha, "HASH"]:
+    def sha(self) -> "FixedSha | HASH":
         """The SHA1 object that is the name of this object."""
         if self._sha is None or self._needs_serialization:
             # this is a local because as_raw_chunks() overwrites self._sha
@@ -891,7 +890,7 @@ class Tag(ShaFile):
 
     _message: bytes | None
     _name: bytes | None
-    _object_class: type["ShaFile"] | None
+    _object_class: "type[ShaFile] | None"
     _object_sha: bytes | None
     _signature: bytes | None
     _tag_time: int | None

+ 9 - 9
dulwich/objectspec.py

@@ -22,7 +22,7 @@
 """Object specification."""
 
 from collections.abc import Sequence
-from typing import TYPE_CHECKING, Optional, Union
+from typing import TYPE_CHECKING
 
 from .objects import Commit, ShaFile, Tag, Tree
 from .refs import local_branch_name, local_tag_name
@@ -290,7 +290,7 @@ def parse_tree(repo: "BaseRepo", treeish: bytes | str | Tree | Commit | Tag) ->
     return o
 
 
-def parse_ref(container: Union["Repo", "RefsContainer"], refspec: str | bytes) -> "Ref":
+def parse_ref(container: "Repo | RefsContainer", refspec: str | bytes) -> "Ref":
     """Parse a string referring to a reference.
 
     Args:
@@ -316,11 +316,11 @@ def parse_ref(container: Union["Repo", "RefsContainer"], refspec: str | bytes) -
 
 
 def parse_reftuple(
-    lh_container: Union["Repo", "RefsContainer"],
-    rh_container: Union["Repo", "RefsContainer"],
+    lh_container: "Repo | RefsContainer",
+    rh_container: "Repo | RefsContainer",
     refspec: str | bytes,
     force: bool = False,
-) -> tuple[Optional["Ref"], Optional["Ref"], bool]:
+) -> tuple["Ref | None", "Ref | None", bool]:
     """Parse a reftuple spec.
 
     Args:
@@ -359,11 +359,11 @@ def parse_reftuple(
 
 
 def parse_reftuples(
-    lh_container: Union["Repo", "RefsContainer"],
-    rh_container: Union["Repo", "RefsContainer"],
+    lh_container: "Repo | RefsContainer",
+    rh_container: "Repo | RefsContainer",
     refspecs: bytes | Sequence[bytes],
     force: bool = False,
-) -> list[tuple[Optional["Ref"], Optional["Ref"], bool]]:
+) -> list[tuple["Ref | None", "Ref | None", bool]]:
     """Parse a list of reftuple specs to a list of reftuples.
 
     Args:
@@ -385,7 +385,7 @@ def parse_reftuples(
 
 
 def parse_refs(
-    container: Union["Repo", "RefsContainer"],
+    container: "Repo | RefsContainer",
     refspecs: bytes | str | Sequence[bytes | str],
 ) -> list["Ref"]:
     """Parse a list of refspecs to a list of refs.

+ 6 - 13
dulwich/pack.py

@@ -60,10 +60,8 @@ from typing import (
     Any,
     BinaryIO,
     Generic,
-    Optional,
     Protocol,
     TypeVar,
-    Union,
 )
 
 try:
@@ -73,11 +71,6 @@ except ImportError:
 else:
     has_mmap = True
 
-if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
-else:
-    Buffer = bytes | bytearray | memoryview
-
 if TYPE_CHECKING:
     from _hashlib import HASH as HashObject
 
@@ -137,7 +130,7 @@ class ObjectContainer(Protocol):
         self,
         objects: Sequence[tuple[ShaFile, str | None]],
         progress: Callable[..., None] | None = None,
-    ) -> Optional["Pack"]:
+    ) -> "Pack | None":
         """Add a set of objects to this object store.
 
         Args:
@@ -152,7 +145,7 @@ class ObjectContainer(Protocol):
     def __getitem__(self, sha1: bytes) -> ShaFile:
         """Retrieve an object."""
 
-    def get_commit_graph(self) -> Optional["CommitGraph"]:
+    def get_commit_graph(self) -> "CommitGraph | None":
         """Get the commit graph for this object store.
 
         Returns:
@@ -758,7 +751,7 @@ class FilePackIndex(PackIndex):
         self,
         filename: str | os.PathLike[str],
         file: IO[bytes] | _GitFile | None = None,
-        contents: Union[bytes, "mmap.mmap"] | None = None,
+        contents: "bytes | mmap.mmap | None" = None,
         size: int | None = None,
     ) -> None:
         """Create a pack index object.
@@ -1411,7 +1404,7 @@ class PackStreamCopier(PackStreamReader):
         read_all: Callable[[int], bytes],
         read_some: Callable[[int], bytes] | None,
         outfile: IO[bytes],
-        delta_iter: Optional["DeltaChainIterator[UnpackedObject]"] = None,
+        delta_iter: "DeltaChainIterator[UnpackedObject] | None" = None,
     ) -> None:
         """Initialize the copier.
 
@@ -2520,7 +2513,7 @@ def write_pack_object(
     write: Callable[[bytes], int],
     type: int,
     object: list[bytes] | tuple[bytes | int, list[bytes]],
-    sha: Optional["HashObject"] = None,
+    sha: "HashObject | None" = None,
     compression_level: int = -1,
 ) -> int:
     """Write pack object to a file.
@@ -3523,7 +3516,7 @@ class Pack:
         return self._idx
 
     @property
-    def bitmap(self) -> Optional["PackBitmap"]:
+    def bitmap(self) -> "PackBitmap | None":
         """The bitmap being used, if available.
 
         Returns:

+ 3 - 4
dulwich/patch.py

@@ -37,7 +37,6 @@ from typing import (
     IO,
     TYPE_CHECKING,
     BinaryIO,
-    Optional,
     TextIO,
 )
 
@@ -487,8 +486,8 @@ def gen_diff_header(
 # TODO(jelmer): Support writing unicode, rather than bytes.
 def write_blob_diff(
     f: IO[bytes],
-    old_file: tuple[bytes | None, int | None, Optional["Blob"]],
-    new_file: tuple[bytes | None, int | None, Optional["Blob"]],
+    old_file: tuple[bytes | None, int | None, "Blob | None"],
+    new_file: tuple[bytes | None, int | None, "Blob | None"],
     diff_algorithm: str | None = None,
 ) -> None:
     """Write blob diff.
@@ -506,7 +505,7 @@ def write_blob_diff(
     patched_old_path = patch_filename(old_path, b"a")
     patched_new_path = patch_filename(new_path, b"b")
 
-    def lines(blob: Optional["Blob"]) -> list[bytes]:
+    def lines(blob: "Blob | None") -> list[bytes]:
         """Split blob content into lines.
 
         Args:

+ 29 - 14
dulwich/porcelain.py

@@ -105,7 +105,6 @@ from typing import (
     TYPE_CHECKING,
     Any,
     BinaryIO,
-    Optional,
     TextIO,
     TypedDict,
     TypeVar,
@@ -114,10 +113,11 @@ from typing import (
 )
 
 if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
     from typing import override
 else:
-    from typing_extensions import Buffer, override
+    from typing_extensions import override
+
+from ._typing import Buffer
 
 if TYPE_CHECKING:
     import urllib3
@@ -244,7 +244,7 @@ class TransportKwargs(TypedDict, total=False):
     password: str | None
     key_filename: str | None
     ssh_command: str | None
-    pool_manager: Optional["urllib3.PoolManager"]
+    pool_manager: "urllib3.PoolManager | None"
 
 
 @dataclass
@@ -281,14 +281,25 @@ class NoneStream(RawIOBase):
         """
         return b""
 
-    @override
-    def readinto(self, b: Buffer) -> int | None:
-        return 0
+    if sys.version_info >= (3, 12):
+
+        @override
+        def readinto(self, b: Buffer) -> int | None:
+            return 0
+
+        @override
+        def write(self, b: Buffer) -> int | None:
+            return len(cast(bytes, b)) if b else 0
+
+    else:
 
-    @override
-    def write(self, b: Buffer) -> int | None:
-        # All Buffer implementations (bytes, bytearray, memoryview) support len()
-        return len(b) if b else 0  # type: ignore[arg-type]
+        @override
+        def readinto(self, b: bytearray | memoryview) -> int | None:  # type: ignore[override]
+            return 0
+
+        @override
+        def write(self, b: bytes | bytearray | memoryview) -> int | None:  # type: ignore[override]
+            return len(b) if b else 0
 
 
 default_bytes_out_stream: BinaryIO = cast(
@@ -3074,10 +3085,14 @@ def push(
         for ref, error in (result.ref_status or {}).items():
             if error is not None:
                 errstream.write(
-                    b"Push of ref %s failed: %s\n" % (ref, error.encode(err_encoding))
+                    f"Push of ref {ref.decode('utf-8', 'replace')} failed: {error}\n".encode(
+                        err_encoding
+                    )
                 )
             else:
-                errstream.write(b"Ref %s updated\n" % ref)
+                errstream.write(
+                    f"Ref {ref.decode('utf-8', 'replace')} updated\n".encode()
+                )
 
         if remote_name is not None:
             _import_remote_refs(r.refs, remote_name, remote_changed_refs)
@@ -7050,7 +7065,7 @@ def filter_branch(
     repo: RepoPath = ".",
     branch: str | bytes = "HEAD",
     *,
-    filter_fn: Callable[[Commit], Optional["CommitData"]] | None = None,
+    filter_fn: Callable[[Commit], "CommitData | None"] | None = None,
     filter_author: Callable[[bytes], bytes | None] | None = None,
     filter_committer: Callable[[bytes], bytes | None] | None = None,
     filter_message: Callable[[bytes], bytes | None] | None = None,

+ 1 - 1
dulwich/protocol.py

@@ -247,7 +247,7 @@ def pkt_line(data: bytes | None) -> bytes:
     """
     if data is None:
         return b"0000"
-    return ("%04x" % (len(data) + 4)).encode("ascii") + data
+    return f"{len(data) + 4:04x}".encode("ascii") + data
 
 
 def pkt_seq(*seq: bytes | None) -> bytes:

+ 2 - 2
dulwich/rebase.py

@@ -27,7 +27,7 @@ import subprocess
 from collections.abc import Callable, Sequence
 from dataclasses import dataclass
 from enum import Enum
-from typing import Optional, Protocol, TypedDict
+from typing import Protocol, TypedDict
 
 from dulwich.graph import find_merge_base
 from dulwich.merge import three_way_merge
@@ -164,7 +164,7 @@ class RebaseTodoEntry:
         return " ".join(parts)
 
     @classmethod
-    def from_string(cls, line: str) -> Optional["RebaseTodoEntry"]:
+    def from_string(cls, line: str) -> "RebaseTodoEntry | None":
         """Parse a todo entry from a line.
 
         Args:

+ 6 - 7
dulwich/repo.py

@@ -41,7 +41,6 @@ from typing import (
     TYPE_CHECKING,
     Any,
     BinaryIO,
-    Optional,
     TypeVar,
 )
 
@@ -918,7 +917,7 @@ class BaseRepo:
         reverse: bool = False,
         max_entries: int | None = None,
         paths: Sequence[bytes] | None = None,
-        rename_detector: Optional["RenameDetector"] = None,
+        rename_detector: "RenameDetector | None" = None,
         follow: bool = False,
         since: int | None = None,
         until: int | None = None,
@@ -1208,7 +1207,7 @@ class Repo(BaseRepo):
     path: str
     bare: bool
     object_store: DiskObjectStore
-    filter_context: Optional["FilterContext"]
+    filter_context: "FilterContext | None"
 
     def __init__(
         self,
@@ -1857,7 +1856,7 @@ class Repo(BaseRepo):
         controldir: str | bytes | os.PathLike[str],
         bare: bool,
         object_store: PackBasedObjectStore | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
         default_branch: bytes | None = None,
         symlinks: bool | None = None,
         format: int | None = None,
@@ -1892,7 +1891,7 @@ class Repo(BaseRepo):
         path: str | bytes | os.PathLike[str],
         *,
         mkdir: bool = False,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
         default_branch: bytes | None = None,
         symlinks: bool | None = None,
         format: int | None = None,
@@ -1982,7 +1981,7 @@ class Repo(BaseRepo):
         *,
         mkdir: bool = False,
         object_store: PackBasedObjectStore | None = None,
-        config: Optional["StackedConfig"] = None,
+        config: "StackedConfig | None" = None,
         default_branch: bytes | None = None,
         format: int | None = None,
     ) -> "Repo":
@@ -2217,7 +2216,7 @@ class MemoryRepo(BaseRepo):
     those have a stronger dependency on the filesystem.
     """
 
-    filter_context: Optional["FilterContext"]
+    filter_context: "FilterContext | None"
 
     def __init__(self) -> None:
         """Create a new repository in memory."""

+ 4 - 9
dulwich/server.py

@@ -43,24 +43,19 @@ Currently supported capabilities:
  * symref
 """
 
-import collections
 import os
 import socket
 import socketserver
 import sys
 import time
 import zlib
+from collections import deque
 from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
 from collections.abc import Set as AbstractSet
 from functools import partial
-from typing import IO, TYPE_CHECKING, Optional
+from typing import IO, TYPE_CHECKING
 from typing import Protocol as TypingProtocol
 
-if sys.version_info >= (3, 12):
-    from collections.abc import Buffer
-else:
-    Buffer = bytes | bytearray | memoryview
-
 if TYPE_CHECKING:
     from .object_store import BaseObjectStore
     from .repo import BaseRepo
@@ -181,7 +176,7 @@ class BackendRepo(TypingProtocol):
         *,
         get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
         depth: int | None = None,
-    ) -> Optional["MissingObjectFinder"]:
+    ) -> "MissingObjectFinder | None":
         """Yield the objects required for a list of commits.
 
         Args:
@@ -629,7 +624,7 @@ def _want_satisfied(
     Returns: True if the want is satisfied by the haves
     """
     o = store[want]
-    pending = collections.deque([o])
+    pending = deque([o])
     known = {want}
     while pending:
         commit = pending.popleft()

+ 4 - 4
dulwich/walk.py

@@ -21,8 +21,8 @@
 
 """General implementation of walking commits and their contents."""
 
-import collections
 import heapq
+from collections import defaultdict, deque
 from collections.abc import Callable, Iterator, Sequence
 from itertools import chain
 from typing import TYPE_CHECKING, Any, cast
@@ -338,7 +338,7 @@ class Walker:
 
         self._num_entries = 0
         self._queue = queue_cls(self)
-        self._out_queue: collections.deque[WalkEntry] = collections.deque()
+        self._out_queue: deque[WalkEntry] = deque()
 
     def _path_matches(self, changed_path: bytes | None) -> bool:
         if changed_path is None:
@@ -481,9 +481,9 @@ def _topo_reorder(
     Returns: iterator over WalkEntry objects from entries in FIFO order, except
         where a parent would be yielded before any of its children.
     """
-    todo: collections.deque[WalkEntry] = collections.deque()
+    todo: deque[WalkEntry] = deque()
     pending: dict[bytes, WalkEntry] = {}
-    num_children: dict[bytes, int] = collections.defaultdict(int)
+    num_children: dict[bytes, int] = defaultdict(int)
     for entry in entries:
         todo.append(entry)
         for p in get_parents(entry.commit):

+ 2 - 5
dulwich/web.py

@@ -59,7 +59,6 @@ from typing import (
     Any,
     BinaryIO,
     ClassVar,
-    Union,
     cast,
 )
 from urllib.parse import parse_qs
@@ -582,8 +581,7 @@ class HTTPGitRequest:
         environ: WSGIEnvironment,
         start_response: StartResponse,
         dumb: bool = False,
-        handlers: dict[bytes, Union["HandlerConstructor", Callable[..., Any]]]
-        | None = None,
+        handlers: dict[bytes, "HandlerConstructor | Callable[..., Any]"] | None = None,
     ) -> None:
         """Initialize HTTPGitRequest.
 
@@ -687,8 +685,7 @@ class HTTPGitApplication:
         self,
         backend: Backend,
         dumb: bool = False,
-        handlers: dict[bytes, Union["HandlerConstructor", Callable[..., Any]]]
-        | None = None,
+        handlers: dict[bytes, "HandlerConstructor | Callable[..., Any]"] | None = None,
         fallback_app: WSGIApplication | None = None,
     ) -> None:
         """Initialize HTTPGitApplication.