Jelmer Vernooij 4 months ago
parent
commit
3759bf0e9e

+ 5 - 4
dulwich/bundle.py

@@ -20,7 +20,8 @@
 
 """Bundle format support."""
 
-from typing import Dict, List, Optional, Sequence, Tuple, Union
+from collections.abc import Sequence
+from typing import Optional, Union
 
 from .pack import PackData, write_pack_data
 
@@ -28,9 +29,9 @@ from .pack import PackData, write_pack_data
 class Bundle:
     version: Optional[int]
 
-    capabilities: Dict[str, str]
-    prerequisites: List[Tuple[bytes, str]]
-    references: Dict[str, bytes]
+    capabilities: dict[str, str]
+    prerequisites: list[tuple[bytes, str]]
+    references: dict[str, bytes]
     pack_data: Union[PackData, Sequence[bytes]]
 
     def __repr__(self) -> str:
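
The pattern repeated throughout this commit is the PEP 585 migration: deprecated typing aliases (Dict, List, Tuple, Set, Type, Deque) become the subscriptable builtins, and abstract collection types (Sequence, Iterable, Iterator, ...) move to collections.abc. Optional and Union stay in typing, since the X | Y spelling would require Python 3.10. A minimal before/after sketch, assuming Python 3.9+, with bytes standing in for PackData to keep it self-contained:

    from collections.abc import Sequence
    from typing import Optional, Union

    # Before (aliases deprecated since Python 3.9):
    #   from typing import Dict, List, Optional, Sequence, Tuple, Union
    #   capabilities: Dict[str, str]
    #   prerequisites: List[Tuple[bytes, str]]

    version: Optional[int] = None
    capabilities: dict[str, str] = {}
    prerequisites: list[tuple[bytes, str]] = []
    pack_data: Union[bytes, Sequence[bytes]] = b""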

+ 6 - 6
dulwich/cli.py

@@ -33,7 +33,7 @@ import os
 import signal
 import sys
 from getopt import getopt
-from typing import ClassVar, Dict, Optional, Type
+from typing import ClassVar, Optional
 
 from dulwich import porcelain
 
@@ -644,8 +644,8 @@ class cmd_remote_add(Command):
 
 
 class SuperCommand(Command):
-    subcommands: ClassVar[Dict[str, Type[Command]]] = {}
-    default_command: ClassVar[Optional[Type[Command]]] = None
+    subcommands: ClassVar[dict[str, type[Command]]] = {}
+    default_command: ClassVar[Optional[type[Command]]] = None
 
     def run(self, args):
         if not args and not self.default_command:
@@ -663,7 +663,7 @@ class SuperCommand(Command):
 
 
 class cmd_remote(SuperCommand):
-    subcommands: ClassVar[Dict[str, Type[Command]]] = {
+    subcommands: ClassVar[dict[str, type[Command]]] = {
         "add": cmd_remote_add,
     }
 
@@ -684,7 +684,7 @@ class cmd_submodule_init(Command):
 
 
 class cmd_submodule(SuperCommand):
-    subcommands: ClassVar[Dict[str, Type[Command]]] = {
+    subcommands: ClassVar[dict[str, type[Command]]] = {
         "init": cmd_submodule_init,
     }
 
@@ -736,7 +736,7 @@ class cmd_stash_pop(Command):
 
 
 class cmd_stash(SuperCommand):
-    subcommands: ClassVar[Dict[str, Type[Command]]] = {
+    subcommands: ClassVar[dict[str, type[Command]]] = {
         "list": cmd_stash_list,
         "pop": cmd_stash_pop,
         "push": cmd_stash_push,

+ 23 - 28
dulwich/client.py

@@ -45,6 +45,7 @@ import select
 import socket
 import subprocess
 import sys
+from collections.abc import Iterable, Iterator
 from contextlib import closing
 from io import BufferedReader, BytesIO
 from typing import (
@@ -52,13 +53,7 @@ from typing import (
     TYPE_CHECKING,
     Callable,
     ClassVar,
-    Dict,
-    Iterable,
-    Iterator,
-    List,
     Optional,
-    Set,
-    Tuple,
     Union,
 )
 from urllib.parse import quote as urlquote
@@ -210,7 +205,7 @@ class ReportStatusParser:
     def __init__(self) -> None:
         self._done = False
         self._pack_status = None
-        self._ref_statuses: List[bytes] = []
+        self._ref_statuses: list[bytes] = []
 
     def check(self):
         """Check if there were any errors and, if so, raise exceptions.
@@ -272,7 +267,7 @@ def read_server_capabilities(pkt_seq):
 
 def read_pkt_refs_v2(
     pkt_seq,
-) -> Tuple[Dict[bytes, bytes], Dict[bytes, bytes], Dict[bytes, bytes]]:
+) -> tuple[dict[bytes, bytes], dict[bytes, bytes], dict[bytes, bytes]]:
     refs = {}
     symrefs = {}
     peeled = {}
@@ -295,7 +290,7 @@ def read_pkt_refs_v2(
     return refs, symrefs, peeled
 
 
-def read_pkt_refs_v1(pkt_seq) -> Tuple[Dict[bytes, bytes], Set[bytes]]:
+def read_pkt_refs_v1(pkt_seq) -> tuple[dict[bytes, bytes], set[bytes]]:
     server_capabilities = None
     refs = {}
     # Receive refs from server
@@ -324,7 +319,7 @@ class FetchPackResult:
       agent: User agent string
     """
 
-    _FORWARDED_ATTRS: ClassVar[Set[str]] = {
+    _FORWARDED_ATTRS: ClassVar[set[str]] = {
         "clear",
         "copy",
         "fromkeys",
@@ -405,7 +400,7 @@ class SendPackResult:
         failed to update), or None if it was updated successfully
     """
 
-    _FORWARDED_ATTRS: ClassVar[Set[str]] = {
+    _FORWARDED_ATTRS: ClassVar[set[str]] = {
         "clear",
         "copy",
         "fromkeys",
@@ -489,8 +484,8 @@ def _read_shallow_updates(pkt_seq):
 
 class _v1ReceivePackHeader:
     def __init__(self, capabilities, old_refs, new_refs) -> None:
-        self.want: List[bytes] = []
-        self.have: List[bytes] = []
+        self.want: list[bytes] = []
+        self.have: list[bytes] = []
         self._it = self._handle_receive_pack_head(capabilities, old_refs, new_refs)
         self.sent_capabilities = False
 
@@ -546,7 +541,7 @@ class _v1ReceivePackHeader:
         yield None
 
 
-def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[Tuple[int, bytes]]:
+def _read_side_band64k_data(pkt_seq: Iterable[bytes]) -> Iterator[tuple[int, bytes]]:
     """Read per-channel data.
 
     This requires the side-band-64k capability.
@@ -654,7 +649,7 @@ def _handle_upload_pack_head(
 
 def _handle_upload_pack_tail(
     proto,
-    capabilities: Set[bytes],
+    capabilities: set[bytes],
     graph_walker,
     pack_data: Callable[[bytes], None],
     progress: Optional[Callable[[bytes], None]] = None,
@@ -797,7 +792,7 @@ class GitClient:
         path,
         update_refs,
         generate_pack_data: Callable[
-            [Set[bytes], Set[bytes], bool], Tuple[int, Iterator[UnpackedObject]]
+            [set[bytes], set[bytes], bool], tuple[int, Iterator[UnpackedObject]]
         ],
         progress=None,
     ):
@@ -924,11 +919,11 @@ class GitClient:
         path: str,
         target: Repo,
         determine_wants: Optional[
-            Callable[[Dict[bytes, bytes], Optional[int]], List[bytes]]
+            Callable[[dict[bytes, bytes], Optional[int]], list[bytes]]
         ] = None,
         progress: Optional[Callable[[bytes], None]] = None,
         depth: Optional[int] = None,
-        ref_prefix: Optional[List[bytes]] = [b"HEAD", b"refs/"],
+        ref_prefix: Optional[list[bytes]] = [b"HEAD", b"refs/"],
         filter_spec: Optional[bytes] = None,
         protocol_version: Optional[int] = None,
     ) -> FetchPackResult:
@@ -1065,9 +1060,9 @@ class GitClient:
     def _handle_receive_pack_tail(
         self,
         proto: Protocol,
-        capabilities: Set[bytes],
+        capabilities: set[bytes],
         progress: Optional[Callable[[bytes], None]] = None,
-    ) -> Optional[Dict[bytes, Optional[str]]]:
+    ) -> Optional[dict[bytes, Optional[str]]]:
         """Handle the tail of a 'git-receive-pack' request.
 
         Args:
@@ -1637,7 +1632,7 @@ class SubprocessWrapper:
         self.proc.wait()
 
 
-def find_git_command() -> List[str]:
+def find_git_command() -> list[str]:
     """Find command to run for system Git (usually C Git)."""
     if sys.platform == "win32":  # support .exe, .bat and .cmd
         try:  # to avoid overhead
@@ -1836,7 +1831,7 @@ class LocalGitClient(GitClient):
         pack_data,
         progress=None,
         depth=None,
-        ref_prefix: Optional[List[bytes]] = [b"HEAD", b"refs/"],
+        ref_prefix: Optional[list[bytes]] = [b"HEAD", b"refs/"],
         filter_spec: Optional[bytes] = None,
         protocol_version: Optional[int] = None,
     ) -> FetchPackResult:
@@ -2090,7 +2085,7 @@ class SSHGitClient(TraditionalGitClient):
             "GIT_SSH_COMMAND", os.environ.get("GIT_SSH")
         )
         super().__init__(**kwargs)
-        self.alternative_paths: Dict[bytes, bytes] = {}
+        self.alternative_paths: dict[bytes, bytes] = {}
         if vendor is not None:
             self.ssh_vendor = vendor
         else:
@@ -2355,8 +2350,8 @@ class AbstractHttpGitClient(GitClient):
 
     def _discover_references(
         self, service, base_url, protocol_version=None
-    ) -> Tuple[
-        Dict[Ref, ObjectID], Set[bytes], str, Dict[Ref, Ref], Dict[Ref, ObjectID]
+    ) -> tuple[
+        dict[Ref, ObjectID], set[bytes], str, dict[Ref, Ref], dict[Ref, ObjectID]
     ]:
         if (
             protocol_version is not None
@@ -2824,7 +2819,7 @@ def _win32_url_to_path(parsed) -> str:
 
 def get_transport_and_path_from_url(
     url: str, config: Optional[Config] = None, operation: Optional[str] = None, **kwargs
-) -> Tuple[GitClient, str]:
+) -> tuple[GitClient, str]:
     """Obtain a git client from a URL.
 
     Args:
@@ -2869,7 +2864,7 @@ def _get_transport_and_path_from_url(url, config, operation, **kwargs):
     raise ValueError(f"unknown scheme '{parsed.scheme}'")
 
 
-def parse_rsync_url(location: str) -> Tuple[Optional[str], str, str]:
+def parse_rsync_url(location: str) -> tuple[Optional[str], str, str]:
     """Parse a rsync-style URL."""
     if ":" in location and "@" not in location:
         # SSH with no user@, zero or one leading slash.
@@ -2893,7 +2888,7 @@ def get_transport_and_path(
     config: Optional[Config] = None,
     operation: Optional[str] = None,
     **kwargs,
-) -> Tuple[GitClient, str]:
+) -> tuple[GitClient, str]:
     """Obtain a git client from a URL.
 
     Args:
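
Callback annotations gain the most readability from the new spelling: Callable itself still comes from typing, while its argument and return containers use builtins, as in the generate_pack_data parameter of the GitClient hunk above. A sketch of a function satisfying that signature, with a stub class standing in for dulwich's UnpackedObject:

    from collections.abc import Iterator
    from typing import Callable

    class UnpackedObject:  # stand-in for dulwich.pack.UnpackedObject
        pass

    def generate_pack_data(
        have: set[bytes], want: set[bytes], ofs_delta: bool
    ) -> tuple[int, Iterator[UnpackedObject]]:
        objects = [UnpackedObject() for _ in want - have]
        return len(objects), iter(objects)

    # The callback annotation as spelled in the hunk above:
    GeneratePackData = Callable[
        [set[bytes], set[bytes], bool], tuple[int, Iterator[UnpackedObject]]
    ]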

+ 15 - 21
dulwich/config.py

@@ -28,18 +28,12 @@ Todo:
 
 import os
 import sys
+from collections.abc import Iterable, Iterator, KeysView, MutableMapping
 from contextlib import suppress
 from typing import (
     Any,
     BinaryIO,
-    Dict,
-    Iterable,
-    Iterator,
-    KeysView,
-    List,
-    MutableMapping,
     Optional,
-    Tuple,
     Union,
     overload,
 )
@@ -61,8 +55,8 @@ def lower_key(key):
 
 class CaseInsensitiveOrderedMultiDict(MutableMapping):
     def __init__(self) -> None:
-        self._real: List[Any] = []
-        self._keyed: Dict[Any, Any] = {}
+        self._real: list[Any] = []
+        self._keyed: dict[Any, Any] = {}
 
     @classmethod
     def make(cls, dict_in=None):
@@ -85,7 +79,7 @@ class CaseInsensitiveOrderedMultiDict(MutableMapping):
     def __len__(self) -> int:
         return len(self._keyed)
 
-    def keys(self) -> KeysView[Tuple[bytes, ...]]:
+    def keys(self) -> KeysView[tuple[bytes, ...]]:
         return self._keyed.keys()
 
     def items(self):
@@ -139,8 +133,8 @@ class CaseInsensitiveOrderedMultiDict(MutableMapping):
 
 Name = bytes
 NameLike = Union[bytes, str]
-Section = Tuple[bytes, ...]
-SectionLike = Union[bytes, str, Tuple[Union[bytes, str], ...]]
+Section = tuple[bytes, ...]
+SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
 Value = bytes
 ValueLike = Union[bytes, str]
 
@@ -218,7 +212,7 @@ class Config:
         """
         raise NotImplementedError(self.set)
 
-    def items(self, section: SectionLike) -> Iterator[Tuple[Name, Value]]:
+    def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
         """Iterate over the configuration pairs for a specific section.
 
         Args:
@@ -293,7 +287,7 @@ class ConfigDict(Config, MutableMapping[Section, MutableMapping[Name, Value]]):
 
     def _check_section_and_name(
         self, section: SectionLike, name: NameLike
-    ) -> Tuple[Section, Name]:
+    ) -> tuple[Section, Name]:
         if not isinstance(section, tuple):
             section = (section,)
 
@@ -355,7 +349,7 @@ class ConfigDict(Config, MutableMapping[Section, MutableMapping[Name, Value]]):
 
     def items(  # type: ignore[override]
         self, section: Section
-    ) -> Iterator[Tuple[Name, Value]]:
+    ) -> Iterator[tuple[Name, Value]]:
         return self._values.get(section).items()
 
     def sections(self) -> Iterator[Section]:
@@ -469,7 +463,7 @@ def _strip_comments(line: bytes) -> bytes:
     return line
 
 
-def _parse_section_header_line(line: bytes) -> Tuple[Section, bytes]:
+def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
     # Parse section header ("[bla]")
     line = _strip_comments(line).rstrip()
     in_quotes = False
@@ -667,7 +661,7 @@ class StackedConfig(Config):
     """Configuration which reads from multiple config files.."""
 
     def __init__(
-        self, backends: List[ConfigFile], writable: Optional[ConfigFile] = None
+        self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
     ) -> None:
         self.backends = backends
         self.writable = writable
@@ -680,7 +674,7 @@ class StackedConfig(Config):
         return cls(cls.default_backends())
 
     @classmethod
-    def default_backends(cls) -> List[ConfigFile]:
+    def default_backends(cls) -> list[ConfigFile]:
         """Retrieve the default configuration.
 
         See git-config(1) for details on the files searched.
@@ -738,13 +732,13 @@ class StackedConfig(Config):
                     yield section
 
 
-def read_submodules(path: str) -> Iterator[Tuple[bytes, bytes, bytes]]:
+def read_submodules(path: str) -> Iterator[tuple[bytes, bytes, bytes]]:
     """Read a .gitmodules file."""
     cfg = ConfigFile.from_path(path)
     return parse_submodules(cfg)
 
 
-def parse_submodules(config: ConfigFile) -> Iterator[Tuple[bytes, bytes, bytes]]:
+def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
     """Parse a gitmodules GitConfig file, returning submodules.
 
     Args:
@@ -767,7 +761,7 @@ def parse_submodules(config: ConfigFile) -> Iterator[Tuple[bytes, bytes, bytes]]
                 pass
 
 
-def iter_instead_of(config: Config, push: bool = False) -> Iterable[Tuple[str, str]]:
+def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
     """Iterate over insteadOf / pushInsteadOf values."""
     for section in config.sections():
         if section[0] != b"url":

+ 3 - 3
dulwich/contrib/diffstat.py

@@ -34,7 +34,7 @@
 
 import re
 import sys
-from typing import List, Optional, Tuple
+from typing import Optional
 
 # only needs to detect git style diffs as this is for
 # use with dulwich
@@ -56,8 +56,8 @@ _GIT_UNCHANGED_START = b" "
 
 
 def _parse_patch(
-    lines: List[bytes],
-) -> Tuple[List[bytes], List[bool], List[Tuple[int, int]]]:
+    lines: list[bytes],
+) -> tuple[list[bytes], list[bool], list[tuple[int, int]]]:
     """Parse a git style diff or patch to generate diff stats.
 
     Args:

+ 2 - 1
dulwich/credentials.py

@@ -26,7 +26,8 @@ https://git-scm.com/book/en/v2/Git-Tools-Credential-Storage
 """
 
 import sys
-from typing import Iterator, Optional
+from collections.abc import Iterator
+from typing import Optional
 from urllib.parse import ParseResult, urlparse
 
 from .config import ConfigDict, SectionLike

+ 7 - 7
dulwich/diff_tree.py

@@ -24,7 +24,7 @@ import stat
 from collections import defaultdict, namedtuple
 from io import BytesIO
 from itertools import chain
-from typing import Dict, List, Optional
+from typing import Optional
 
 from .object_store import BaseObjectStore
 from .objects import S_ISGITLINK, ObjectID, ShaFile, Tree, TreeEntry
@@ -59,8 +59,8 @@ class TreeChange(namedtuple("TreeChange", ["type", "old", "new"])):
         return cls(CHANGE_DELETE, old, _NULL_ENTRY)
 
 
-def _tree_entries(path: str, tree: Tree) -> List[TreeEntry]:
-    result: List[TreeEntry] = []
+def _tree_entries(path: str, tree: Tree) -> list[TreeEntry]:
+    result: list[TreeEntry] = []
     if not tree:
         return result
     for entry in tree.iteritems(name_order=True):
@@ -241,7 +241,7 @@ def _all_same(seq, key):
 
 def tree_changes_for_merge(
     store: BaseObjectStore,
-    parent_tree_ids: List[ObjectID],
+    parent_tree_ids: list[ObjectID],
     tree_id: ObjectID,
     rename_detector=None,
 ):
@@ -270,7 +270,7 @@ def tree_changes_for_merge(
         for t in parent_tree_ids
     ]
     num_parents = len(parent_tree_ids)
-    changes_by_path: Dict[str, List[Optional[TreeChange]]] = defaultdict(
+    changes_by_path: dict[str, list[Optional[TreeChange]]] = defaultdict(
         lambda: [None] * num_parents
     )
 
@@ -308,7 +308,7 @@ def tree_changes_for_merge(
 _BLOCK_SIZE = 64
 
 
-def _count_blocks(obj: ShaFile) -> Dict[int, int]:
+def _count_blocks(obj: ShaFile) -> dict[int, int]:
     """Count the blocks in an object.
 
     Splits the data into blocks either on lines or <=64-byte chunks of lines.
@@ -319,7 +319,7 @@ def _count_blocks(obj: ShaFile) -> Dict[int, int]:
     Returns:
       A dict of block hashcode -> total bytes occurring.
     """
-    block_counts: Dict[int, int] = defaultdict(int)
+    block_counts: dict[int, int] = defaultdict(int)
     block = BytesIO()
     n = 0
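
Annotating a defaultdict with the plain dict generic, as _count_blocks does above, is worth a note: the annotation describes the mapping interface while the runtime object remains a defaultdict. A simplified sketch (the real function splits on lines or <=64-byte chunks of lines; this one uses fixed-size chunks):

    from collections import defaultdict

    def count_blocks(data: bytes, block_size: int = 64) -> dict[int, int]:
        # Map block hashcode -> total bytes occurring, per the docstring above.
        block_counts: dict[int, int] = defaultdict(int)
        for i in range(0, len(data), block_size):
            block = data[i : i + block_size]
            block_counts[hash(block)] += len(block)
        return block_counts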
 

+ 3 - 4
dulwich/fastexport.py

@@ -22,7 +22,6 @@
 """Fast export/import functionality."""
 
 import stat
-from typing import Dict, Tuple
 
 from fastimport import commands, parser, processor
 from fastimport import errors as fastimport_errors
@@ -43,7 +42,7 @@ class GitFastExporter:
     def __init__(self, outf, store) -> None:
         self.outf = outf
         self.store = store
-        self.markers: Dict[bytes, bytes] = {}
+        self.markers: dict[bytes, bytes] = {}
         self._marker_idx = 0
 
     def print_cmd(self, cmd):
@@ -126,8 +125,8 @@ class GitImportProcessor(processor.ImportProcessor):
         processor.ImportProcessor.__init__(self, params, verbose)
         self.repo = repo
         self.last_commit = ZERO_SHA
-        self.markers: Dict[bytes, bytes] = {}
-        self._contents: Dict[bytes, Tuple[int, bytes]] = {}
+        self.markers: dict[bytes, bytes] = {}
+        self._contents: dict[bytes, tuple[int, bytes]] = {}
 
     def lookup_object(self, objectish):
         if objectish.startswith(b":"):

+ 3 - 3
dulwich/file.py

@@ -23,7 +23,7 @@
 import os
 import sys
 import warnings
-from typing import ClassVar, Set
+from typing import ClassVar
 
 
 def ensure_dir_exists(dirname):
@@ -115,7 +115,7 @@ class _GitFile:
         released. Typically this will happen in a finally block.
     """
 
-    PROXY_PROPERTIES: ClassVar[Set[str]] = {
+    PROXY_PROPERTIES: ClassVar[set[str]] = {
         "closed",
         "encoding",
         "errors",
@@ -124,7 +124,7 @@ class _GitFile:
         "newlines",
         "softspace",
     }
-    PROXY_METHODS: ClassVar[Set[str]] = {
+    PROXY_METHODS: ClassVar[set[str]] = {
         "__iter__",
         "flush",
         "fileno",

+ 4 - 4
dulwich/greenthreads.py

@@ -22,7 +22,7 @@
 
 """Utility module for querying an ObjectStore with gevent."""
 
-from typing import FrozenSet, Optional, Set, Tuple
+from typing import Optional
 
 import gevent
 from gevent import pool
@@ -95,7 +95,7 @@ class GreenThreadsMissingObjectFinder(MissingObjectFinder):
         want_commits, want_tags = _split_commits_and_tags(
             object_store, wants, ignore_unknown=False, pool=p
         )
-        all_ancestors: FrozenSet[ObjectID] = frozenset(
+        all_ancestors: frozenset[ObjectID] = frozenset(
             _collect_ancestors(object_store, have_commits)[0]
         )
         missing_commits, common_commits = _collect_ancestors(
@@ -109,8 +109,8 @@ class GreenThreadsMissingObjectFinder(MissingObjectFinder):
             self.sha_done.add(t)
         missing_tags = want_tags.difference(have_tags)
         wants = missing_commits.union(missing_tags)
-        self.objects_to_send: Set[
-            Tuple[ObjectID, Optional[bytes], Optional[int], bool]
+        self.objects_to_send: set[
+            tuple[ObjectID, Optional[bytes], Optional[int], bool]
         ] = {(w, None, 0, False) for w in wants}
         if progress is None:
             self.progress = lambda x: None

+ 5 - 4
dulwich/ignore.py

@@ -24,8 +24,9 @@ For details for the matching rules, see https://git-scm.com/docs/gitignore
 
 import os.path
 import re
+from collections.abc import Iterable
 from contextlib import suppress
-from typing import TYPE_CHECKING, BinaryIO, Dict, Iterable, List, Optional, Union
+from typing import TYPE_CHECKING, BinaryIO, Optional, Union
 
 if TYPE_CHECKING:
     from .repo import Repo
@@ -193,7 +194,7 @@ class IgnoreFilter:
     def __init__(
         self, patterns: Iterable[bytes], ignorecase: bool = False, path=None
     ) -> None:
-        self._patterns: List[Pattern] = []
+        self._patterns: list[Pattern] = []
         self._ignorecase = ignorecase
         self._path = path
         for pattern in patterns:
@@ -290,10 +291,10 @@ class IgnoreFilterManager:
     def __init__(
         self,
         top_path: str,
-        global_filters: List[IgnoreFilter],
+        global_filters: list[IgnoreFilter],
         ignorecase: bool,
     ) -> None:
-        self._path_filters: Dict[str, Optional[IgnoreFilter]] = {}
+        self._path_filters: dict[str, Optional[IgnoreFilter]] = {}
         self._top_path = top_path
         self._global_filters = global_filters
         self._ignorecase = ignorecase

+ 24 - 28
dulwich/index.py

@@ -24,18 +24,14 @@ import os
 import stat
 import struct
 import sys
+from collections.abc import Iterable, Iterator
 from dataclasses import dataclass
 from enum import Enum
 from typing import (
     Any,
     BinaryIO,
     Callable,
-    Dict,
-    Iterable,
-    Iterator,
-    List,
     Optional,
-    Tuple,
     Union,
 )
 
@@ -82,8 +78,8 @@ class Stage(Enum):
 @dataclass
 class SerializedIndexEntry:
     name: bytes
-    ctime: Union[int, float, Tuple[int, int]]
-    mtime: Union[int, float, Tuple[int, int]]
+    ctime: Union[int, float, tuple[int, int]]
+    mtime: Union[int, float, tuple[int, int]]
     dev: int
     ino: int
     mode: int
@@ -100,8 +96,8 @@ class SerializedIndexEntry:
 
 @dataclass
 class IndexEntry:
-    ctime: Union[int, float, Tuple[int, int]]
-    mtime: Union[int, float, Tuple[int, int]]
+    ctime: Union[int, float, tuple[int, int]]
+    mtime: Union[int, float, tuple[int, int]]
     dev: int
     ino: int
     mode: int
@@ -163,7 +159,7 @@ class UnmergedEntries(Exception):
     """Unmerged entries exist in the index."""
 
 
-def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
+def pathsplit(path: bytes) -> tuple[bytes, bytes]:
     """Split a /-delimited path into a directory part and a basename.
 
     Args:
@@ -314,14 +310,14 @@ def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
         yield read_cache_entry(f, version)
 
 
-def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
+def read_index_dict(f) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
     """Read an index file and return it as a dictionary.
        The dict key is a tuple of path and stage number, as
             path alone is not unique.
     Args:
       f: File object to read from.
     """
-    ret: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
+    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
     for entry in read_index(f):
         stage = entry.stage()
         if stage == Stage.NORMAL:
@@ -340,7 +336,7 @@ def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
 
 
 def write_index(
-    f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
+    f: BinaryIO, entries: list[SerializedIndexEntry], version: Optional[int] = None
 ):
     """Write an index file.
 
@@ -359,7 +355,7 @@ def write_index(
 
 def write_index_dict(
     f: BinaryIO,
-    entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
+    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
     version: Optional[int] = None,
 ) -> None:
     """Write an index file based on the contents of a dictionary.
@@ -412,7 +408,7 @@ def cleanup_mode(mode: int) -> int:
 class Index:
     """A Git Index file."""
 
-    _byname: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
+    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
 
     def __init__(self, filename: Union[bytes, str], read=True) -> None:
         """Create an index object associated with the given filename.
@@ -491,7 +487,7 @@ class Index:
             raise UnmergedEntries
         return value.mode
 
-    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
+    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
         """Iterate over path, sha, mode tuples for use with commit_tree."""
         for path in self:
             entry = self[path]
@@ -520,13 +516,13 @@ class Index:
 
     def iteritems(
         self,
-    ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
+    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
         return iter(self._byname.items())
 
-    def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
+    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
         return iter(self._byname.items())
 
-    def update(self, entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
+    def update(self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
         for key, value in entries.items():
             self[key] = value
 
@@ -570,7 +566,7 @@ class Index:
 
 
 def commit_tree(
-    object_store: ObjectContainer, blobs: Iterable[Tuple[bytes, bytes, int]]
+    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
 ) -> bytes:
     """Commit a new tree.
 
@@ -580,7 +576,7 @@ def commit_tree(
     Returns:
       SHA1 of the created tree.
     """
-    trees: Dict[bytes, Any] = {b"": {}}
+    trees: dict[bytes, Any] = {b"": {}}
 
     def add_tree(path):
         if path in trees:
@@ -627,15 +623,15 @@ def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
 
 def changes_from_tree(
     names: Iterable[bytes],
-    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
+    lookup_entry: Callable[[bytes], tuple[bytes, int]],
     object_store: ObjectContainer,
     tree: Optional[bytes],
     want_unchanged=False,
 ) -> Iterable[
-    Tuple[
-        Tuple[Optional[bytes], Optional[bytes]],
-        Tuple[Optional[int], Optional[int]],
-        Tuple[Optional[bytes], Optional[bytes]],
+    tuple[
+        tuple[Optional[bytes], Optional[bytes]],
+        tuple[Optional[int], Optional[int]],
+        tuple[Optional[bytes], Optional[bytes]],
     ]
 ]:
     """Find the differences between the contents of a tree and
@@ -1089,7 +1085,7 @@ def iter_fresh_entries(
     paths: Iterable[bytes],
     root_path: bytes,
     object_store: Optional[ObjectContainer] = None,
-) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
+) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
     """Iterate over current versions of index entries on disk.
 
     Args:
@@ -1109,7 +1105,7 @@ def iter_fresh_entries(
 
 def iter_fresh_objects(
     paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
-) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
+) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
     """Iterate over versions of objects on disk referenced by index.
 
     Args:
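
The index.py dataclasses take the same treatment; ctime and mtime accept an int, a float, or a (seconds, nanoseconds) tuple. A trimmed sketch of the shape, keeping only the fields discussed here:

    from dataclasses import dataclass
    from typing import Union

    @dataclass
    class IndexEntry:  # trimmed; the real class also carries dev, ino, mode, etc.
        ctime: Union[int, float, tuple[int, int]]
        mtime: Union[int, float, tuple[int, int]]
        sha: bytes

    entry = IndexEntry(ctime=(1700000000, 0), mtime=1700000000.5, sha=b"\x00" * 20)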

+ 4 - 3
dulwich/lru_cache.py

@@ -21,7 +21,8 @@
 
 """A simple least-recently-used (LRU) cache."""
 
-from typing import Callable, Dict, Generic, Iterable, Iterator, Optional, TypeVar
+from collections.abc import Iterable, Iterator
+from typing import Callable, Generic, Optional, TypeVar
 
 _null_key = object()
 
@@ -74,7 +75,7 @@ class LRUCache(Generic[K, V]):
     def __init__(
         self, max_cache: int = 100, after_cleanup_count: Optional[int] = None
     ) -> None:
-        self._cache: Dict[K, _LRUNode[K, V]] = {}
+        self._cache: dict[K, _LRUNode[K, V]] = {}
         # The "HEAD" of the lru linked list
         self._most_recently_used = None
         # The "TAIL" of the lru linked list
@@ -209,7 +210,7 @@ class LRUCache(Generic[K, V]):
         """
         return self._cache.keys()
 
-    def items(self) -> Dict[K, V]:
+    def items(self) -> dict[K, V]:
         """Get the key:value pairs as a dict."""
         return {k: n.value for k, n in self._cache.items()}
 

+ 2 - 2
dulwich/mailmap.py

@@ -20,7 +20,7 @@
 
 """Mailmap file reader."""
 
-from typing import Dict, Optional, Tuple
+from typing import Optional
 
 
 def parse_identity(text):
@@ -64,7 +64,7 @@ class Mailmap:
     """Class for accessing a mailmap file."""
 
     def __init__(self, map=None) -> None:
-        self._table: Dict[Tuple[Optional[str], str], Tuple[str, str]] = {}
+        self._table: dict[tuple[Optional[str], str], tuple[str, str]] = {}
         if map:
             for canonical_identity, from_identity in map:
                 self.add_entry(canonical_identity, from_identity)

+ 26 - 33
dulwich/object_store.py

@@ -27,20 +27,13 @@ import os
 import stat
 import sys
 import warnings
+from collections.abc import Iterable, Iterator, Sequence
 from contextlib import suppress
 from io import BytesIO
 from typing import (
     Callable,
-    Dict,
-    FrozenSet,
-    Iterable,
-    Iterator,
-    List,
     Optional,
     Protocol,
-    Sequence,
-    Set,
-    Tuple,
     cast,
 )
 
@@ -96,7 +89,7 @@ PACK_MODE = 0o444 if sys.platform != "win32" else 0o644
 
 
 class PackContainer(Protocol):
-    def add_pack(self) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
+    def add_pack(self) -> tuple[BytesIO, Callable[[], None], Callable[[], None]]:
         """Add a new pack."""
 
 
@@ -104,8 +97,8 @@ class BaseObjectStore:
     """Object store interface."""
 
     def determine_wants_all(
-        self, refs: Dict[Ref, ObjectID], depth: Optional[int] = None
-    ) -> List[ObjectID]:
+        self, refs: dict[Ref, ObjectID], depth: Optional[int] = None
+    ) -> list[ObjectID]:
         def _want_deepen(sha):
             if not depth:
                 return False
@@ -286,7 +279,7 @@ class BaseObjectStore:
 
     def generate_pack_data(
         self, have, want, shallow=None, progress=None, ofs_delta=True
-    ) -> Tuple[int, Iterator[UnpackedObject]]:
+    ) -> tuple[int, Iterator[UnpackedObject]]:
         """Generate pack data objects for a set of wants/haves.
 
         Args:
@@ -373,10 +366,10 @@ class BaseObjectStore:
 
 class PackBasedObjectStore(BaseObjectStore):
     def __init__(self, pack_compression_level=-1) -> None:
-        self._pack_cache: Dict[str, Pack] = {}
+        self._pack_cache: dict[str, Pack] = {}
         self.pack_compression_level = pack_compression_level
 
-    def add_pack(self) -> Tuple[BytesIO, Callable[[], None], Callable[[], None]]:
+    def add_pack(self) -> tuple[BytesIO, Callable[[], None], Callable[[], None]]:
         """Add a new pack to this object store."""
         raise NotImplementedError(self.add_pack)
 
@@ -446,7 +439,7 @@ class PackBasedObjectStore(BaseObjectStore):
 
     def generate_pack_data(
         self, have, want, shallow=None, progress=None, ofs_delta=True
-    ) -> Tuple[int, Iterator[UnpackedObject]]:
+    ) -> tuple[int, Iterator[UnpackedObject]]:
         """Generate pack data objects for a set of wants/haves.
 
         Args:
@@ -616,7 +609,7 @@ class PackBasedObjectStore(BaseObjectStore):
         allow_missing: bool = False,
         convert_ofs_delta: bool = True,
     ) -> Iterator[ShaFile]:
-        todo: Set[bytes] = set(shas)
+        todo: set[bytes] = set(shas)
         for p in self._iter_cached_packs():
             for unpacked in p.iter_unpacked_subset(
                 todo,
@@ -653,7 +646,7 @@ class PackBasedObjectStore(BaseObjectStore):
     def iterobjects_subset(
         self, shas: Iterable[bytes], *, allow_missing: bool = False
     ) -> Iterator[ShaFile]:
-        todo: Set[bytes] = set(shas)
+        todo: set[bytes] = set(shas)
         for p in self._iter_cached_packs():
             for o in p.iterobjects_subset(todo, allow_missing=True):
                 yield o
@@ -716,7 +709,7 @@ class PackBasedObjectStore(BaseObjectStore):
 
     def add_objects(
         self,
-        objects: Sequence[Tuple[ShaFile, Optional[str]]],
+        objects: Sequence[tuple[ShaFile, Optional[str]]],
         progress: Optional[Callable[[str], None]] = None,
     ) -> None:
         """Add a set of objects to this object store.
@@ -1076,7 +1069,7 @@ class MemoryObjectStore(BaseObjectStore):
 
     def __init__(self) -> None:
         super().__init__()
-        self._data: Dict[str, ShaFile] = {}
+        self._data: dict[str, ShaFile] = {}
         self.pack_compression_level = -1
 
     def _to_hexsha(self, sha):
@@ -1222,7 +1215,7 @@ def tree_lookup_path(lookup_obj, root_sha, path):
 
 
 def _collect_filetree_revs(
-    obj_store: ObjectContainer, tree_sha: ObjectID, kset: Set[ObjectID]
+    obj_store: ObjectContainer, tree_sha: ObjectID, kset: set[ObjectID]
 ) -> None:
     """Collect SHA1s of files and directories for specified tree.
 
@@ -1242,7 +1235,7 @@ def _collect_filetree_revs(
 
 def _split_commits_and_tags(
     obj_store: ObjectContainer, lst, *, ignore_unknown=False
-) -> Tuple[Set[bytes], Set[bytes], Set[bytes]]:
+) -> tuple[set[bytes], set[bytes], set[bytes]]:
     """Split object id list into three lists with commit, tag, and other SHAs.
 
     Commits referenced by tags are included into commits
@@ -1257,9 +1250,9 @@ def _split_commits_and_tags(
         silently.
     Returns: A tuple of (commits, tags, others) SHA1s
     """
-    commits: Set[bytes] = set()
-    tags: Set[bytes] = set()
-    others: Set[bytes] = set()
+    commits: set[bytes] = set()
+    tags: set[bytes] = set()
+    others: set[bytes] = set()
     for e in lst:
         try:
             o = obj_store[e]
@@ -1339,7 +1332,7 @@ class MissingObjectFinder:
             shallow=shallow,
             get_parents=self._get_parents,
         )
-        self.remote_has: Set[bytes] = set()
+        self.remote_has: set[bytes] = set()
         # Now, fill sha_done with commits and revisions of
         # files and directories known to be both locally
         # and on target. Thus these commits and files
@@ -1355,8 +1348,8 @@ class MissingObjectFinder:
 
         # in fact, what we 'want' is commits, tags, and others
         # we've found missing
-        self.objects_to_send: Set[
-            Tuple[ObjectID, Optional[bytes], Optional[int], bool]
+        self.objects_to_send: set[
+            tuple[ObjectID, Optional[bytes], Optional[int], bool]
         ] = {(w, None, Commit.type_num, False) for w in missing_commits}
         missing_tags = want_tags.difference(have_tags)
         self.objects_to_send.update(
@@ -1375,11 +1368,11 @@ class MissingObjectFinder:
         return self.remote_has
 
     def add_todo(
-        self, entries: Iterable[Tuple[ObjectID, Optional[bytes], Optional[int], bool]]
+        self, entries: Iterable[tuple[ObjectID, Optional[bytes], Optional[int], bool]]
     ):
         self.objects_to_send.update([e for e in entries if e[0] not in self.sha_done])
 
-    def __next__(self) -> Tuple[bytes, Optional[PackHint]]:
+    def __next__(self) -> tuple[bytes, Optional[PackHint]]:
         while True:
             if not self.objects_to_send:
                 self.progress(
@@ -1444,7 +1437,7 @@ class ObjectStoreGraphWalker:
         """
         self.heads = set(local_heads)
         self.get_parents = get_parents
-        self.parents: Dict[ObjectID, Optional[List[ObjectID]]] = {}
+        self.parents: dict[ObjectID, Optional[list[ObjectID]]] = {}
         if shallow is None:
             shallow = set()
         self.shallow = shallow
@@ -1726,8 +1719,8 @@ class BucketBasedObjectStore(PackBasedObjectStore):
 def _collect_ancestors(
     store: ObjectContainer,
     heads,
-    common: FrozenSet[ObjectID] = frozenset(),
-    shallow: FrozenSet[ObjectID] = frozenset(),
+    common: frozenset[ObjectID] = frozenset(),
+    shallow: frozenset[ObjectID] = frozenset(),
     get_parents=lambda commit: commit.parents,
 ):
     """Collect all ancestors of heads up to (excluding) those in common.
@@ -1790,7 +1783,7 @@ def iter_tree_contents(
             yield entry
 
 
-def peel_sha(store: ObjectContainer, sha: bytes) -> Tuple[ShaFile, ShaFile]:
+def peel_sha(store: ObjectContainer, sha: bytes) -> tuple[ShaFile, ShaFile]:
     """Peel all tags from a SHA.
 
     Args:
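
frozenset subscripts directly as well (frozenset[ObjectID]), and because it is immutable it is a safe default argument value, unlike the mutable [b"HEAD", b"refs/"] list default that survives in the client.py hunks above. A sketch, with ObjectID = bytes as in dulwich:

    ObjectID = bytes

    def collect_ancestors(
        heads: set[ObjectID],
        common: frozenset[ObjectID] = frozenset(),
        shallow: frozenset[ObjectID] = frozenset(),
    ) -> set[ObjectID]:
        # Immutable defaults are shared across calls but cannot be mutated,
        # so the usual mutable-default pitfall does not apply here.
        return {h for h in heads if h not in common and h not in shallow}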

+ 20 - 25
dulwich/objects.py

@@ -28,18 +28,13 @@ import stat
 import warnings
 import zlib
 from collections import namedtuple
+from collections.abc import Iterable, Iterator
 from hashlib import sha1
 from io import BytesIO
 from typing import (
     TYPE_CHECKING,
     BinaryIO,
-    Dict,
-    Iterable,
-    Iterator,
-    List,
     Optional,
-    Tuple,
-    Type,
     Union,
 )
 
@@ -183,7 +178,7 @@ def serializable_property(name: str, docstring: Optional[str] = None):
     return property(get, set, doc=docstring)
 
 
-def object_class(type: Union[bytes, int]) -> Optional[Type["ShaFile"]]:
+def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
     """Get the object class corresponding to the given type.
 
     Args:
@@ -280,7 +275,7 @@ class ShaFile:
     _needs_serialization: bool
     type_name: bytes
     type_num: int
-    _chunked_text: Optional[List[bytes]]
+    _chunked_text: Optional[list[bytes]]
     _sha: Union[FixedSha, None, "HASH"]
 
     @staticmethod
@@ -335,7 +330,7 @@ class ShaFile:
             self.as_legacy_object_chunks(compression_level=compression_level)
         )
 
-    def as_raw_chunks(self) -> List[bytes]:
+    def as_raw_chunks(self) -> list[bytes]:
         """Return chunks with serialization of the object.
 
         Returns: List of strings, not necessarily one per line
@@ -372,7 +367,7 @@ class ShaFile:
         self.set_raw_chunks([text], sha)
 
     def set_raw_chunks(
-        self, chunks: List[bytes], sha: Optional[ObjectID] = None
+        self, chunks: list[bytes], sha: Optional[ObjectID] = None
     ) -> None:
         """Set the contents of this object from a list of chunks."""
         self._chunked_text = chunks
@@ -431,10 +426,10 @@ class ShaFile:
         self._chunked_text = []
         self._needs_serialization = True
 
-    def _deserialize(self, chunks: List[bytes]) -> None:
+    def _deserialize(self, chunks: list[bytes]) -> None:
         raise NotImplementedError(self._deserialize)
 
-    def _serialize(self) -> List[bytes]:
+    def _serialize(self) -> list[bytes]:
         raise NotImplementedError(self._serialize)
 
     @classmethod
@@ -471,7 +466,7 @@ class ShaFile:
 
     @staticmethod
     def from_raw_chunks(
-        type_num: int, chunks: List[bytes], sha: Optional[ObjectID] = None
+        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
     ):
         """Creates an object of the indicated type from the raw chunks given.
 
@@ -591,7 +586,7 @@ class Blob(ShaFile):
     type_name = b"blob"
     type_num = 3
 
-    _chunked_text: List[bytes]
+    _chunked_text: list[bytes]
 
     def __init__(self) -> None:
         super().__init__()
@@ -611,7 +606,7 @@ class Blob(ShaFile):
     def _get_chunked(self):
         return self._chunked_text
 
-    def _set_chunked(self, chunks: List[bytes]):
+    def _set_chunked(self, chunks: list[bytes]):
         self._chunked_text = chunks
 
     def _serialize(self):
@@ -641,7 +636,7 @@ class Blob(ShaFile):
         """
         super().check()
 
-    def splitlines(self) -> List[bytes]:
+    def splitlines(self) -> list[bytes]:
         """Return list of lines in this blob.
 
         This preserves the original line endings.
@@ -671,7 +666,7 @@ class Blob(ShaFile):
 
 def _parse_message(
     chunks: Iterable[bytes],
-) -> Iterator[Union[Tuple[None, None], Tuple[Optional[bytes], bytes]]]:
+) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
     """Parse a message with a list of fields and a body.
 
     Args:
@@ -1027,7 +1022,7 @@ def sorted_tree_items(entries, name_order: bool):
         yield TreeEntry(name, mode, hexsha)
 
 
-def key_entry(entry: Tuple[bytes, Tuple[int, ObjectID]]) -> bytes:
+def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
     """Sort key for tree entry.
 
     Args:
@@ -1039,7 +1034,7 @@ def key_entry(entry: Tuple[bytes, Tuple[int, ObjectID]]) -> bytes:
     return name
 
 
-def key_entry_name_order(entry: Tuple[bytes, Tuple[int, ObjectID]]) -> bytes:
+def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
     """Sort key for tree entry in name order."""
     return entry[0]
 
@@ -1083,7 +1078,7 @@ class Tree(ShaFile):
 
     def __init__(self) -> None:
         super().__init__()
-        self._entries: Dict[bytes, Tuple[int, bytes]] = {}
+        self._entries: dict[bytes, tuple[int, bytes]] = {}
 
     @classmethod
     def from_path(cls, filename):
@@ -1201,7 +1196,7 @@ class Tree(ShaFile):
         return list(serialize_tree(self.iteritems()))
 
     def as_pretty_string(self) -> str:
-        text: List[str] = []
+        text: list[str] = []
         for name, mode, hexsha in self.iteritems():
             text.append(pretty_format_tree_entry(name, mode, hexsha))
         return "".join(text)
@@ -1387,11 +1382,11 @@ class Commit(ShaFile):
 
     def __init__(self) -> None:
         super().__init__()
-        self._parents: List[bytes] = []
+        self._parents: list[bytes] = []
         self._encoding = None
-        self._mergetag: List[Tag] = []
+        self._mergetag: list[Tag] = []
         self._gpgsig = None
-        self._extra: List[Tuple[bytes, bytes]] = []
+        self._extra: list[tuple[bytes, bytes]] = []
         self._author_timezone_neg_utc = False
         self._commit_timezone_neg_utc = False
 
@@ -1655,7 +1650,7 @@ OBJECT_CLASSES = (
     Tag,
 )
 
-_TYPE_MAP: Dict[Union[bytes, int], Type[ShaFile]] = {}
+_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}
 
 for cls in OBJECT_CLASSES:
     _TYPE_MAP[cls.type_name] = cls
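
object_class and _TYPE_MAP above form a registry keyed by both type name and type number; type["ShaFile"] (lowercase) now annotates the values. A condensed, self-contained sketch of the lookup:

    from typing import Optional, Union

    class ShaFile:
        type_name: bytes
        type_num: int

    class Blob(ShaFile):
        type_name = b"blob"
        type_num = 3

    _TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}
    for cls in (Blob,):  # dulwich registers Blob, Tree, Commit, Tag
        _TYPE_MAP[cls.type_name] = cls
        _TYPE_MAP[cls.type_num] = cls

    def object_class(type_: Union[bytes, int]) -> Optional[type[ShaFile]]:
        return _TYPE_MAP.get(type_)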

+ 4 - 3
dulwich/objectspec.py

@@ -20,7 +20,8 @@
 
 """Object specification."""
 
-from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple, Union
+from collections.abc import Iterator
+from typing import TYPE_CHECKING, Optional, Union
 
 from .objects import Commit, ShaFile, Tree
 
@@ -102,7 +103,7 @@ def parse_reftuple(
     rh_container: Union["Repo", "RefsContainer"],
     refspec: Union[str, bytes],
     force: bool = False,
-) -> Tuple[Optional["Ref"], Optional["Ref"], bool]:
+) -> tuple[Optional["Ref"], Optional["Ref"], bool]:
     """Parse a reftuple spec.
 
     Args:
@@ -142,7 +143,7 @@ def parse_reftuple(
 def parse_reftuples(
     lh_container: Union["Repo", "RefsContainer"],
     rh_container: Union["Repo", "RefsContainer"],
-    refspecs: Union[bytes, List[bytes]],
+    refspecs: Union[bytes, list[bytes]],
     force: bool = False,
 ):
     """Parse a list of reftuple specs to a list of reftuples.

+ 49 - 56
dulwich/pack.py

@@ -47,6 +47,7 @@ import struct
 import sys
 import warnings
 import zlib
+from collections.abc import Iterable, Iterator, Sequence
 from hashlib import sha1
 from itertools import chain
 from os import SEEK_CUR, SEEK_END
@@ -54,17 +55,9 @@ from struct import unpack_from
 from typing import (
     BinaryIO,
     Callable,
-    Deque,
-    Dict,
     Generic,
-    Iterable,
-    Iterator,
-    List,
     Optional,
     Protocol,
-    Sequence,
-    Set,
-    Tuple,
     TypeVar,
     Union,
 )
@@ -97,10 +90,10 @@ DEFAULT_PACK_DELTA_WINDOW_SIZE = 10
 PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024
 
 
-OldUnpackedObject = Union[Tuple[Union[bytes, int], List[bytes]], List[bytes]]
-ResolveExtRefFn = Callable[[bytes], Tuple[int, OldUnpackedObject]]
+OldUnpackedObject = Union[tuple[Union[bytes, int], list[bytes]], list[bytes]]
+ResolveExtRefFn = Callable[[bytes], tuple[int, OldUnpackedObject]]
 ProgressFn = Callable[[int, str], None]
-PackHint = Tuple[int, Optional[bytes]]
+PackHint = tuple[int, Optional[bytes]]
 
 
 class UnresolvedDeltas(Exception):
@@ -116,7 +109,7 @@ class ObjectContainer(Protocol):
 
     def add_objects(
         self,
-        objects: Sequence[Tuple[ShaFile, Optional[str]]],
+        objects: Sequence[tuple[ShaFile, Optional[str]]],
         progress: Optional[Callable[[str], None]] = None,
     ) -> None:
         """Add a set of objects to this object store.
@@ -146,7 +139,7 @@ class PackedObjectContainer(ObjectContainer):
 
     def iter_unpacked_subset(
         self,
-        shas: Set[bytes],
+        shas: set[bytes],
         include_comp: bool = False,
         allow_missing: bool = False,
         convert_ofs_delta: bool = True,
@@ -164,13 +157,13 @@ class UnpackedObjectStream:
 
 def take_msb_bytes(
     read: Callable[[int], bytes], crc32: Optional[int] = None
-) -> Tuple[List[int], Optional[int]]:
+) -> tuple[list[int], Optional[int]]:
     """Read bytes marked with most significant bit.
 
     Args:
       read: Read function
     """
-    ret: List[int] = []
+    ret: list[int] = []
     while len(ret) == 0 or ret[-1] & 0x80:
         b = read(1)
         if crc32 is not None:
@@ -209,10 +202,10 @@ class UnpackedObject:
     ]
 
     obj_type_num: Optional[int]
-    obj_chunks: Optional[List[bytes]]
+    obj_chunks: Optional[list[bytes]]
     delta_base: Union[None, bytes, int]
-    decomp_chunks: List[bytes]
-    comp_chunks: Optional[List[bytes]]
+    decomp_chunks: list[bytes]
+    comp_chunks: Optional[list[bytes]]
 
     # TODO(dborowitz): read_zlib_chunks and unpack_object could very well be
     # methods of this object.
@@ -232,7 +225,7 @@ class UnpackedObject:
         self.pack_type_num = pack_type_num
         self.delta_base = delta_base
         self.comp_chunks = None
-        self.decomp_chunks: List[bytes] = decomp_chunks or []
+        self.decomp_chunks: list[bytes] = decomp_chunks or []
         if decomp_chunks is not None and decomp_len is None:
             self.decomp_len = sum(map(len, decomp_chunks))
         else:
@@ -443,7 +436,7 @@ def bisect_find_sha(start, end, sha, unpack_name):
     return None
 
 
-PackIndexEntry = Tuple[bytes, int, Optional[int]]
+PackIndexEntry = tuple[bytes, int, Optional[int]]
 
 
 class PackIndex:
@@ -598,7 +591,7 @@ class FilePackIndex(PackIndex):
     present.
     """
 
-    _fan_out_table: List[int]
+    _fan_out_table: list[int]
 
     def __init__(self, filename, file=None, contents=None, size=None) -> None:
         """Create a pack index object.
@@ -835,7 +828,7 @@ class PackIndex2(FilePackIndex):
         return unpack_from(">L", self._contents, self._crc32_table_offset + i * 4)[0]
 
 
-def read_pack_header(read) -> Tuple[int, int]:
+def read_pack_header(read) -> tuple[int, int]:
     """Read the header of a pack file.
 
     Args:
@@ -868,7 +861,7 @@ def unpack_object(
     compute_crc32=False,
     include_comp=False,
     zlib_bufsize=_ZLIB_BUFSIZE,
-) -> Tuple[UnpackedObject, bytes]:
+) -> tuple[UnpackedObject, bytes]:
     """Unpack a Git object.
 
     Args:
@@ -964,7 +957,7 @@ class PackStreamReader:
         self._offset = 0
         self._rbuf = BytesIO()
         # trailer is a deque to avoid memory allocation on small reads
-        self._trailer: Deque[bytes] = deque()
+        self._trailer: deque[bytes] = deque()
         self._zlib_bufsize = zlib_bufsize
 
     def _read(self, read, size):
@@ -1218,7 +1211,7 @@ class PackData:
         else:
             self._file = file
         (version, self._num_objects) = read_pack_header(self._file.read)
-        self._offset_cache = LRUSizeCache[int, Tuple[int, OldUnpackedObject]](
+        self._offset_cache = LRUSizeCache[int, tuple[int, OldUnpackedObject]](
             1024 * 1024 * 20, compute_size=_compute_object_size
         )
 
@@ -1394,7 +1387,7 @@ class PackData:
         unpacked.offset = offset
         return unpacked
 
-    def get_object_at(self, offset: int) -> Tuple[int, OldUnpackedObject]:
+    def get_object_at(self, offset: int) -> tuple[int, OldUnpackedObject]:
         """Given an offset in to the packfile return the object that is there.
 
         Using the associated index the location of an object can be looked up,
@@ -1439,10 +1432,10 @@ class DeltaChainIterator(Generic[T]):
     def __init__(self, file_obj, *, resolve_ext_ref=None) -> None:
         self._file = file_obj
         self._resolve_ext_ref = resolve_ext_ref
-        self._pending_ofs: Dict[int, List[int]] = defaultdict(list)
-        self._pending_ref: Dict[bytes, List[int]] = defaultdict(list)
-        self._full_ofs: List[Tuple[int, int]] = []
-        self._ext_refs: List[bytes] = []
+        self._pending_ofs: dict[int, list[int]] = defaultdict(list)
+        self._pending_ref: dict[bytes, list[int]] = defaultdict(list)
+        self._full_ofs: list[tuple[int, int]] = []
+        self._ext_refs: list[bytes] = []
 
     @classmethod
     def for_pack_data(cls, pack_data: PackData, resolve_ext_ref=None):
@@ -1541,7 +1534,7 @@ class DeltaChainIterator(Generic[T]):
         raise NotImplementedError
 
     def _resolve_object(
-        self, offset: int, obj_type_num: int, base_chunks: List[bytes]
+        self, offset: int, obj_type_num: int, base_chunks: list[bytes]
     ) -> UnpackedObject:
         self._file.seek(offset)
         unpacked, _ = unpack_object(
@@ -1558,7 +1551,7 @@ class DeltaChainIterator(Generic[T]):
             unpacked.obj_chunks = apply_delta(base_chunks, unpacked.decomp_chunks)
         return unpacked
 
-    def _follow_chain(self, offset: int, obj_type_num: int, base_chunks: List[bytes]):
+    def _follow_chain(self, offset: int, obj_type_num: int, base_chunks: list[bytes]):
         # Unlike PackData.get_object_at, there is no need to cache offsets as
         # this approach by design inflates each object exactly once.
         todo = [(offset, obj_type_num, base_chunks)]
@@ -1736,7 +1729,7 @@ def write_pack_object(write, type, object, sha=None, compression_level=-1):
 
 def write_pack(
     filename,
-    objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+    objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]],
     *,
     deltify: Optional[bool] = None,
     delta_window_size: Optional[int] = None,
@@ -1788,9 +1781,9 @@ def write_pack_header(write, num_objects):
 
 def find_reusable_deltas(
     container: PackedObjectContainer,
-    object_ids: Set[bytes],
+    object_ids: set[bytes],
     *,
-    other_haves: Optional[Set[bytes]] = None,
+    other_haves: Optional[set[bytes]] = None,
     progress=None,
 ) -> Iterator[UnpackedObject]:
     if other_haves is None:
@@ -1817,7 +1810,7 @@ def find_reusable_deltas(
 
 
 def deltify_pack_objects(
-    objects: Union[Iterator[bytes], Iterator[Tuple[ShaFile, Optional[bytes]]]],
+    objects: Union[Iterator[bytes], Iterator[tuple[ShaFile, Optional[bytes]]]],
     *,
     window_size: Optional[int] = None,
     progress=None,
@@ -1846,7 +1839,7 @@ def deltify_pack_objects(
 
 
 def sort_objects_for_delta(
-    objects: Union[Iterator[ShaFile], Iterator[Tuple[ShaFile, Optional[PackHint]]]],
+    objects: Union[Iterator[ShaFile], Iterator[tuple[ShaFile, Optional[PackHint]]]],
 ) -> Iterator[ShaFile]:
     magic = []
     for entry in objects:
@@ -1873,7 +1866,7 @@ def deltas_from_sorted_objects(
     if window_size is None:
         window_size = DEFAULT_PACK_DELTA_WINDOW_SIZE
 
-    possible_bases: Deque[Tuple[bytes, int, List[bytes]]] = deque()
+    possible_bases: deque[tuple[bytes, int, list[bytes]]] = deque()
     for i, o in enumerate(objects):
         if progress is not None and i % 1000 == 0:
             progress(("generating deltas: %d\r" % (i,)).encode("utf-8"))
@@ -1908,13 +1901,13 @@ def deltas_from_sorted_objects(
 
 
 def pack_objects_to_data(
-    objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+    objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]],
     *,
     deltify: Optional[bool] = None,
     delta_window_size: Optional[int] = None,
     ofs_delta: bool = True,
     progress=None,
-) -> Tuple[int, Iterator[UnpackedObject]]:
+) -> tuple[int, Iterator[UnpackedObject]]:
     """Create pack data from objects.
 
     Args:
@@ -1950,12 +1943,12 @@ def pack_objects_to_data(
 
 def generate_unpacked_objects(
     container: PackedObjectContainer,
-    object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
+    object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]],
     delta_window_size: Optional[int] = None,
     deltify: Optional[bool] = None,
     reuse_deltas: bool = True,
     ofs_delta: bool = True,
-    other_haves: Optional[Set[bytes]] = None,
+    other_haves: Optional[set[bytes]] = None,
     progress=None,
 ) -> Iterator[UnpackedObject]:
     """Create pack data from objects.
@@ -2002,12 +1995,12 @@ def full_unpacked_object(o: ShaFile) -> UnpackedObject:
 def write_pack_from_container(
     write,
     container: PackedObjectContainer,
-    object_ids: Sequence[Tuple[ObjectID, Optional[PackHint]]],
+    object_ids: Sequence[tuple[ObjectID, Optional[PackHint]]],
     delta_window_size: Optional[int] = None,
     deltify: Optional[bool] = None,
     reuse_deltas: bool = True,
     compression_level: int = -1,
-    other_haves: Optional[Set[bytes]] = None,
+    other_haves: Optional[set[bytes]] = None,
 ):
     """Write a new pack data file.
 
@@ -2041,7 +2034,7 @@ def write_pack_from_container(
 
 def write_pack_objects(
     write,
-    objects: Union[Sequence[ShaFile], Sequence[Tuple[ShaFile, Optional[bytes]]]],
+    objects: Union[Sequence[ShaFile], Sequence[tuple[ShaFile, Optional[bytes]]]],
     *,
     delta_window_size: Optional[int] = None,
     deltify: Optional[bool] = None,
@@ -2078,7 +2071,7 @@ class PackChunkGenerator:
         reuse_compressed=True,
     ) -> None:
         self.cs = sha1(b"")
-        self.entries: Dict[Union[int, bytes], Tuple[int, int]] = {}
+        self.entries: dict[Union[int, bytes], tuple[int, int]] = {}
         self._it = self._pack_data_chunks(
             num_records=num_records,
             records=records,
@@ -2126,7 +2119,7 @@ class PackChunkGenerator:
                 progress(
                     ("writing pack data: %d/%d\r" % (i, num_records)).encode("ascii")
                 )
-            raw: Union[List[bytes], Tuple[int, List[bytes]], Tuple[bytes, List[bytes]]]
+            raw: Union[list[bytes], tuple[int, list[bytes]], tuple[bytes, list[bytes]]]
             if unpacked.delta_base is not None:
                 try:
                     base_offset, base_crc32 = self.entries[unpacked.delta_base]
@@ -2383,11 +2376,11 @@ def write_pack_index_v2(
     f = SHA1Writer(f)
     f.write(b"\377tOc")  # Magic!
     f.write(struct.pack(">L", 2))
-    fan_out_table: Dict[int, int] = defaultdict(lambda: 0)
+    fan_out_table: dict[int, int] = defaultdict(lambda: 0)
     for name, offset, entry_checksum in entries:
         fan_out_table[ord(name[:1])] += 1
     # Fan-out table
-    largetable: List[int] = []
+    largetable: list[int] = []
     for i in range(0x100):
         f.write(struct.pack(b">L", fan_out_table[i]))
         fan_out_table[i + 1] += fan_out_table[i]
@@ -2542,7 +2535,7 @@ class Pack:
         except KeyError:
             return False
 
-    def get_raw(self, sha1: bytes) -> Tuple[int, bytes]:
+    def get_raw(self, sha1: bytes) -> tuple[int, bytes]:
         offset = self.index.object_offset(sha1)
         obj_type, obj = self.data.get_object_at(offset)
         type_num, chunks = self.resolve_object(offset, obj_type, obj)
@@ -2581,8 +2574,8 @@ class Pack:
         allow_missing: bool = False,
         convert_ofs_delta: bool = False,
     ) -> Iterator[UnpackedObject]:
-        ofs_pending: Dict[int, List[UnpackedObject]] = defaultdict(list)
-        ofs: Dict[bytes, int] = {}
+        ofs_pending: dict[int, list[UnpackedObject]] = defaultdict(list)
+        ofs: dict[bytes, int] = {}
         todo = set(shas)
         for unpacked in self.iter_unpacked(include_comp=include_comp):
             sha = unpacked.sha()
@@ -2634,7 +2627,7 @@ class Pack:
                 keepfile.write(b"\n")
         return keepfile_name
 
-    def get_ref(self, sha: bytes) -> Tuple[Optional[int], int, OldUnpackedObject]:
+    def get_ref(self, sha: bytes) -> tuple[Optional[int], int, OldUnpackedObject]:
         """Get the object for a ref SHA, only looking in this pack."""
         # TODO: cache these results
         try:
@@ -2651,7 +2644,7 @@ class Pack:
 
     def resolve_object(
         self, offset: int, type: int, obj, get_ref=None
-    ) -> Tuple[int, Iterable[bytes]]:
+    ) -> tuple[int, Iterable[bytes]]:
         """Resolve an object, possibly resolving deltas when necessary.
 
         Returns: Tuple with object type and contents.
@@ -2743,12 +2736,12 @@ class Pack:
 
 def extend_pack(
     f: BinaryIO,
-    object_ids: Set[ObjectID],
+    object_ids: set[ObjectID],
     get_raw,
     *,
     compression_level=-1,
     progress=None,
-) -> Tuple[bytes, List]:
+) -> tuple[bytes, list]:
     """Extend a pack file with more objects.
 
     The caller should make sure that object_ids does not contain any objects
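
pack.py also drops typing.Deque: since Python 3.9 collections.deque is itself subscriptable (like defaultdict), so deque[bytes] works directly. A hypothetical sketch modeled on PackStreamReader's trailer buffer:

    from collections import deque

    class TrailerBuffer:  # hypothetical; PackStreamReader does far more
        def __init__(self) -> None:
            # A deque avoids memory allocation on many small reads; the
            # annotation uses the builtin-generic spelling.
            self._trailer: deque[bytes] = deque()

        def feed(self, chunk: bytes) -> None:
            self._trailer.append(chunk)

        def trailer(self) -> bytes:
            return b"".join(self._trailer)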

+ 5 - 5
dulwich/porcelain.py

@@ -75,7 +75,7 @@ from collections import namedtuple
 from contextlib import closing, contextmanager
 from io import BytesIO, RawIOBase
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Optional, Union
 
 from .archive import tar_stream
 from .client import get_transport_and_path
@@ -1167,7 +1167,7 @@ def reset(repo, mode, treeish="HEAD"):
 
 def get_remote_repo(
     repo: Repo, remote_location: Optional[Union[str, bytes]] = None
-) -> Tuple[Optional[str], str]:
+) -> tuple[Optional[str], str]:
     config = repo.get_config()
     if remote_location is None:
         remote_location = get_branch_remote(repo)
@@ -1763,7 +1763,7 @@ def fetch(
 def for_each_ref(
     repo: Union[Repo, str] = ".",
     pattern: Optional[Union[str, bytes]] = None,
-) -> List[Tuple[bytes, bytes, bytes]]:
+) -> list[tuple[bytes, bytes, bytes]]:
     """Iterate over all refs that match the (optional) pattern.
 
     Args:
@@ -1779,7 +1779,7 @@ def for_each_ref(
         refs = r.get_refs()
 
     if pattern:
-        matching_refs: Dict[bytes, bytes] = {}
+        matching_refs: dict[bytes, bytes] = {}
         pattern_parts = pattern.split(b"/")
         for ref, sha in refs.items():
             matches = False
@@ -1802,7 +1802,7 @@ def for_each_ref(
 
         refs = matching_refs
 
-    ret: List[Tuple[bytes, bytes, bytes]] = [
+    ret: list[tuple[bytes, bytes, bytes]] = [
         (sha, r.get_object(sha).type_name, ref)
         for ref, sha in sorted(
             refs.items(),

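Note that Optional and Union are still imported from typing throughout: builtin generics only require Python 3.9, while the PEP 604 spelling (str | bytes, X | None) is a 3.10+ feature at runtime, which is presumably why the unions were left alone. A sketch with a hypothetical helper mirroring the (remote_name, remote_location) shape that get_remote_repo() returns:

    from typing import Optional, Union

    def describe_remote(location: Optional[Union[str, bytes]] = None) -> tuple[Optional[str], str]:
        # Hypothetical: resolve a remote location, returning
        # (remote_name, remote_location) like get_remote_repo().
        if location is None:
            return ("origin", ".")
        if isinstance(location, bytes):
            location = location.decode("utf-8")
        return (None, location)

    assert describe_remote(b"upstream") == (None, "upstream")
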
+ 8 - 8
dulwich/refs.py

@@ -24,7 +24,7 @@
 import os
 import warnings
 from contextlib import suppress
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Optional
 
 from .errors import PackedRefsException, RefFormatError
 from .file import GitFile, ensure_dir_exists
@@ -149,7 +149,7 @@ class RefsContainer:
         """
         raise NotImplementedError(self.get_packed_refs)
 
-    def add_packed_refs(self, new_refs: Dict[Ref, Optional[ObjectID]]):
+    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]):
         """Add the given refs as packed refs.
 
         Args:
@@ -172,7 +172,7 @@ class RefsContainer:
     def import_refs(
         self,
         base: Ref,
-        other: Dict[Ref, ObjectID],
+        other: dict[Ref, ObjectID],
         committer: Optional[bytes] = None,
         timestamp: Optional[bytes] = None,
         timezone: Optional[bytes] = None,
@@ -291,7 +291,7 @@ class RefsContainer:
         """
         raise NotImplementedError(self.read_loose_ref)
 
-    def follow(self, name) -> Tuple[List[bytes], bytes]:
+    def follow(self, name) -> tuple[list[bytes], bytes]:
         """Follow a reference name.
 
         Returns: a tuple of (refnames, sha), where refnames are the names of
@@ -444,8 +444,8 @@ class DictRefsContainer(RefsContainer):
     def __init__(self, refs, logger=None) -> None:
         super().__init__(logger=logger)
         self._refs = refs
-        self._peeled: Dict[bytes, ObjectID] = {}
-        self._watchers: Set[Any] = set()
+        self._peeled: dict[bytes, ObjectID] = {}
+        self._watchers: set[Any] = set()
 
     def allkeys(self):
         return self._refs.keys()
@@ -702,7 +702,7 @@ class DiskRefsContainer(RefsContainer):
                         self._packed_refs[name] = sha
         return self._packed_refs
 
-    def add_packed_refs(self, new_refs: Dict[Ref, Optional[ObjectID]]):
+    def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]):
         """Add the given refs as packed refs.
 
         Args:
@@ -1245,7 +1245,7 @@ def _set_head(refs, head_ref, ref_message):
 def _import_remote_refs(
     refs_container: RefsContainer,
     remote_name: str,
-    refs: Dict[str, str],
+    refs: dict[str, str],
     message: Optional[bytes] = None,
     prune: bool = False,
     prune_tags: bool = False,

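Ref and ObjectID in the refs.py hunks are bytes aliases defined in dulwich; in add_packed_refs() a None target marks a ref for removal. A self-contained sketch (aliases redeclared locally, drop_deletions is hypothetical):

    from typing import Optional

    Ref = bytes
    ObjectID = bytes

    def drop_deletions(new_refs: dict[Ref, Optional[ObjectID]]) -> dict[Ref, ObjectID]:
        # Keep only the refs that still point at an object; None values
        # are deletion markers, as in add_packed_refs().
        return {ref: sha for ref, sha in new_refs.items() if sha is not None}

    assert drop_deletions({b"refs/heads/main": b"a" * 40, b"refs/heads/gone": None}) == {
        b"refs/heads/main": b"a" * 40
    }
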
+ 22 - 27
dulwich/repo.py

@@ -33,19 +33,14 @@ import stat
 import sys
 import time
 import warnings
+from collections.abc import Iterable
 from io import BytesIO
 from typing import (
     TYPE_CHECKING,
     Any,
     BinaryIO,
     Callable,
-    Dict,
-    FrozenSet,
-    Iterable,
-    List,
     Optional,
-    Set,
-    Tuple,
     Union,
 )
 
@@ -149,7 +144,7 @@ class DefaultIdentityNotFound(Exception):
 
 
 # TODO(jelmer): Cache?
-def _get_default_identity() -> Tuple[str, str]:
+def _get_default_identity() -> tuple[str, str]:
     import socket
 
     for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
@@ -257,7 +252,7 @@ def check_user_identity(identity):
 
 def parse_graftpoints(
     graftpoints: Iterable[bytes],
-) -> Dict[bytes, List[bytes]]:
+) -> dict[bytes, list[bytes]]:
     """Convert a list of graftpoints into a dict.
 
     Args:
@@ -288,7 +283,7 @@ def parse_graftpoints(
     return grafts
 
 
-def serialize_graftpoints(graftpoints: Dict[bytes, List[bytes]]) -> bytes:
+def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
     """Convert a dictionary of grafts into string.
 
     The graft dictionary is:
@@ -376,8 +371,8 @@ class BaseRepo:
         self.object_store = object_store
         self.refs = refs
 
-        self._graftpoints: Dict[bytes, List[bytes]] = {}
-        self.hooks: Dict[str, Hook] = {}
+        self._graftpoints: dict[bytes, list[bytes]] = {}
+        self.hooks: dict[str, Hook] = {}
 
     def _determine_file_mode(self) -> bool:
         """Probe the file-system to determine whether permissions can be trusted.
@@ -540,8 +535,8 @@ class BaseRepo:
         if not isinstance(wants, list):
             raise TypeError("determine_wants() did not return a list")
 
-        shallows: FrozenSet[ObjectID] = getattr(graph_walker, "shallow", frozenset())
-        unshallows: FrozenSet[ObjectID] = getattr(
+        shallows: frozenset[ObjectID] = getattr(graph_walker, "shallow", frozenset())
+        unshallows: frozenset[ObjectID] = getattr(
             graph_walker, "unshallow", frozenset()
         )
 
@@ -594,8 +589,8 @@ class BaseRepo:
 
     def generate_pack_data(
         self,
-        have: List[ObjectID],
-        want: List[ObjectID],
+        have: list[ObjectID],
+        want: list[ObjectID],
         progress: Optional[Callable[[str], None]] = None,
         ofs_delta: Optional[bool] = None,
     ):
@@ -616,7 +611,7 @@ class BaseRepo:
         )
 
     def get_graph_walker(
-        self, heads: Optional[List[ObjectID]] = None
+        self, heads: Optional[list[ObjectID]] = None
     ) -> ObjectStoreGraphWalker:
         """Retrieve a graph walker.
 
@@ -638,7 +633,7 @@ class BaseRepo:
             heads, parents_provider.get_parents, shallow=self.get_shallow()
         )
 
-    def get_refs(self) -> Dict[bytes, bytes]:
+    def get_refs(self) -> dict[bytes, bytes]:
         """Get dictionary with all refs.
 
         Returns: A ``dict`` mapping ref names to SHA1s
@@ -683,7 +678,7 @@ class BaseRepo:
             shallows=self.get_shallow(),
         )
 
-    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> List[bytes]:
+    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
         """Retrieve the parents of a specific commit.
 
         If the specific commit is a graftpoint, the graft parents
@@ -735,14 +730,14 @@ class BaseRepo:
         from .config import ConfigFile, StackedConfig
 
         local_config = self.get_config()
-        backends: List[ConfigFile] = [local_config]
+        backends: list[ConfigFile] = [local_config]
         if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
             backends.append(self.get_worktree_config())
 
         backends += StackedConfig.default_backends()
         return StackedConfig(backends, writable=local_config)
 
-    def get_shallow(self) -> Set[ObjectID]:
+    def get_shallow(self) -> set[ObjectID]:
         """Get the set of shallow commits.
 
         Returns: Set of shallow commits.
@@ -784,7 +779,7 @@ class BaseRepo:
             return cached
         return peel_sha(self.object_store, self.refs[ref])[1].id
 
-    def get_walker(self, include: Optional[List[bytes]] = None, *args, **kwargs):
+    def get_walker(self, include: Optional[list[bytes]] = None, *args, **kwargs):
         """Obtain a walker for this repository.
 
         Args:
@@ -889,7 +884,7 @@ class BaseRepo:
         )
         return get_user_identity(config)
 
-    def _add_graftpoints(self, updated_graftpoints: Dict[bytes, List[bytes]]):
+    def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]):
         """Add or modify graftpoints.
 
         Args:
@@ -902,7 +897,7 @@ class BaseRepo:
 
         self._graftpoints.update(updated_graftpoints)
 
-    def _remove_graftpoints(self, to_remove: List[bytes] = []) -> None:
+    def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
         """Remove graftpoints.
 
         Args:
@@ -930,7 +925,7 @@ class BaseRepo:
         tree: Optional[ObjectID] = None,
         encoding: Optional[bytes] = None,
         ref: Ref = b"HEAD",
-        merge_heads: Optional[List[ObjectID]] = None,
+        merge_heads: Optional[list[ObjectID]] = None,
         no_verify: bool = False,
         sign: bool = False,
     ):
@@ -1439,7 +1434,7 @@ class Repo(BaseRepo):
                     index[tree_path] = index_entry_from_stat(st, blob.id)
         index.write()
 
-    def unstage(self, fs_paths: List[str]):
+    def unstage(self, fs_paths: list[str]):
         """Unstage specific file in the index
         Args:
           fs_paths: a list of files to unstage,
@@ -1843,10 +1838,10 @@ class MemoryRepo(BaseRepo):
     def __init__(self) -> None:
         from .config import ConfigFile
 
-        self._reflog: List[Any] = []
+        self._reflog: list[Any] = []
         refs_container = DictRefsContainer({}, logger=self._append_reflog)
         BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
-        self._named_files: Dict[str, bytes] = {}
+        self._named_files: dict[str, bytes] = {}
         self.bare = True
         self._config = ConfigFile()
         self._description = None

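frozenset is subscriptable on 3.9+ as well, which is what lets fetch_pack_data above drop typing.FrozenSet. A sketch with a stub graph walker (all names hypothetical):

    class _StubGraphWalker:
        shallow = frozenset({b"c" * 40})

    def split_shallow(graph_walker) -> tuple[frozenset[bytes], frozenset[bytes]]:
        # getattr() with a frozenset() default mirrors how the optional
        # .shallow / .unshallow attributes are read in fetch_pack_data().
        shallows: frozenset[bytes] = getattr(graph_walker, "shallow", frozenset())
        unshallows: frozenset[bytes] = getattr(graph_walker, "unshallow", frozenset())
        return shallows, unshallows

    assert split_shallow(_StubGraphWalker()) == (frozenset({b"c" * 40}), frozenset())
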
+ 17 - 16
dulwich/server.py

@@ -49,8 +49,9 @@ import socketserver
 import sys
 import time
 import zlib
+from collections.abc import Iterable
 from functools import partial
-from typing import Dict, Iterable, List, Optional, Set, Tuple, cast
+from typing import Optional, cast
 from typing import Protocol as TypingProtocol
 
 from dulwich import log_utils
@@ -142,7 +143,7 @@ class BackendRepo(TypingProtocol):
     object_store: PackedObjectContainer
     refs: RefsContainer
 
-    def get_refs(self) -> Dict[bytes, bytes]:
+    def get_refs(self) -> dict[bytes, bytes]:
         """Get all the refs in the repository.
 
         Returns: dict of name -> sha
@@ -224,7 +225,7 @@ class PackHandler(Handler):
 
     def __init__(self, backend, proto, stateless_rpc=False) -> None:
         super().__init__(backend, proto, stateless_rpc)
-        self._client_capabilities: Optional[Set[bytes]] = None
+        self._client_capabilities: Optional[set[bytes]] = None
         # Flags needed for the no-done capability
         self._done_received = False
 
@@ -331,7 +332,7 @@ class UploadPackHandler(PackHandler):
         else:
             self.write_pack_data = self.proto.write
 
-    def get_tagged(self, refs=None, repo=None) -> Dict[ObjectID, ObjectID]:
+    def get_tagged(self, refs=None, repo=None) -> dict[ObjectID, ObjectID]:
         """Get a dict of peeled values of tags to their original tag shas.
 
         Args:
@@ -471,7 +472,7 @@ def _find_shallow(store: ObjectContainer, heads, depth):
         considered shallow and unshallow according to the arguments. Note that
         these sets may overlap if a commit is reachable along multiple paths.
     """
-    parents: Dict[bytes, List[bytes]] = {}
+    parents: dict[bytes, list[bytes]] = {}
 
     def get_parents(sha):
         result = parents.get(sha, None)
@@ -570,12 +571,12 @@ class _ProtocolGraphWalker:
         self.proto = handler.proto
         self.stateless_rpc = handler.stateless_rpc
         self.advertise_refs = handler.advertise_refs
-        self._wants: List[bytes] = []
-        self.shallow: Set[bytes] = set()
-        self.client_shallow: Set[bytes] = set()
-        self.unshallow: Set[bytes] = set()
+        self._wants: list[bytes] = []
+        self.shallow: set[bytes] = set()
+        self.client_shallow: set[bytes] = set()
+        self.unshallow: set[bytes] = set()
         self._cached = False
-        self._cache: List[bytes] = []
+        self._cache: list[bytes] = []
         self._cache_index = 0
         self._impl = None
 
@@ -770,7 +771,7 @@ class SingleAckGraphWalkerImpl:
 
     def __init__(self, walker) -> None:
         self.walker = walker
-        self._common: List[bytes] = []
+        self._common: list[bytes] = []
 
     def ack(self, have_ref):
         if not self._common:
@@ -815,7 +816,7 @@ class MultiAckGraphWalkerImpl:
     def __init__(self, walker) -> None:
         self.walker = walker
         self._found_base = False
-        self._common: List[bytes] = []
+        self._common: list[bytes] = []
 
     def ack(self, have_ref):
         self._common.append(have_ref)
@@ -873,7 +874,7 @@ class MultiAckDetailedGraphWalkerImpl:
 
     def __init__(self, walker) -> None:
         self.walker = walker
-        self._common: List[bytes] = []
+        self._common: list[bytes] = []
 
     def ack(self, have_ref):
         # Should only be called iff have_ref is common
@@ -955,8 +956,8 @@ class ReceivePackHandler(PackHandler):
         ]
 
     def _apply_pack(
-        self, refs: List[Tuple[bytes, bytes, bytes]]
-    ) -> List[Tuple[bytes, bytes]]:
+        self, refs: list[tuple[bytes, bytes, bytes]]
+    ) -> list[tuple[bytes, bytes]]:
         all_exceptions = (
             IOError,
             OSError,
@@ -1014,7 +1015,7 @@ class ReceivePackHandler(PackHandler):
 
         return status
 
-    def _report_status(self, status: List[Tuple[bytes, bytes]]) -> None:
+    def _report_status(self, status: list[tuple[bytes, bytes]]) -> None:
         if self.has_capability(CAPABILITY_SIDE_BAND_64K):
             writer = BufferedPktLineWriter(
                 lambda d: self.proto.write_sideband(SIDE_BAND_CHANNEL_DATA, d)

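The other half of the cleanup is that abstract container types (Iterable, Iterator, Sequence, ...) now come from collections.abc; their typing aliases have been deprecated since Python 3.9. A sketch loosely modelled on the (ref, status) pairs that _report_status() consumes — format_statuses is hypothetical:

    from collections.abc import Iterable, Iterator

    def format_statuses(statuses: Iterable[tuple[bytes, bytes]]) -> Iterator[bytes]:
        # One payload line per (ref, status) pair.
        for ref, status in statuses:
            yield ref + b" " + status + b"\n"

    assert list(format_statuses([(b"refs/heads/main", b"ok")])) == [b"refs/heads/main ok\n"]
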
+ 2 - 2
dulwich/submodule.py

@@ -20,13 +20,13 @@
 
 """Working with Git submodules."""
 
-from typing import Iterator, Tuple
+from collections.abc import Iterator
 
 from .object_store import iter_tree_contents
 from .objects import S_ISGITLINK
 
 
-def iter_cached_submodules(store, root_tree_id: bytes) -> Iterator[Tuple[str, bytes]]:
+def iter_cached_submodules(store, root_tree_id: bytes) -> Iterator[tuple[str, bytes]]:
     """Iterate over cached submodules.
 
     Args:

+ 10 - 10
dulwich/walk.py

@@ -23,7 +23,7 @@
 import collections
 import heapq
 from itertools import chain
-from typing import Deque, Dict, List, Optional, Set, Tuple
+from typing import Optional
 
 from .diff_tree import (
     RENAME_CHANGE_TYPES,
@@ -51,7 +51,7 @@ class WalkEntry:
         self.commit = commit
         self._store = walker.store
         self._get_parents = walker.get_parents
-        self._changes: Dict[str, List[TreeChange]] = {}
+        self._changes: dict[str, list[TreeChange]] = {}
         self._rename_detector = walker.rename_detector
 
     def changes(self, path_prefix=None):
@@ -127,10 +127,10 @@ class _CommitTimeQueue:
         self._store = walker.store
         self._get_parents = walker.get_parents
         self._excluded = walker.excluded
-        self._pq: List[Tuple[int, Commit]] = []
-        self._pq_set: Set[ObjectID] = set()
-        self._seen: Set[ObjectID] = set()
-        self._done: Set[ObjectID] = set()
+        self._pq: list[tuple[int, Commit]] = []
+        self._pq_set: set[ObjectID] = set()
+        self._seen: set[ObjectID] = set()
+        self._done: set[ObjectID] = set()
         self._min_time = walker.since
         self._last = None
         self._extra_commits_left = _MAX_EXTRA_COMMITS
@@ -233,12 +233,12 @@ class Walker:
     def __init__(
         self,
         store,
-        include: List[bytes],
-        exclude: Optional[List[bytes]] = None,
+        include: list[bytes],
+        exclude: Optional[list[bytes]] = None,
         order: str = "date",
         reverse: bool = False,
         max_entries: Optional[int] = None,
-        paths: Optional[List[bytes]] = None,
+        paths: Optional[list[bytes]] = None,
         rename_detector: Optional[RenameDetector] = None,
         follow: bool = False,
         since: Optional[int] = None,
@@ -297,7 +297,7 @@ class Walker:
 
         self._num_entries = 0
         self._queue = queue_cls(self)
-        self._out_queue: Deque[WalkEntry] = collections.deque()
+        self._out_queue: collections.deque[WalkEntry] = collections.deque()
 
     def _path_matches(self, changed_path):
         if changed_path is None:

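collections.deque is itself generic at runtime on 3.9+, so typing.Deque (previously used for Walker._out_queue) is no longer needed either. A two-line sketch:

    import collections

    out_queue: collections.deque[str] = collections.deque()
    out_queue.append("walk entry")
    assert out_queue.popleft() == "walk entry"
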
+ 8 - 7
dulwich/web.py

@@ -25,8 +25,9 @@ import os
 import re
 import sys
 import time
+from collections.abc import Iterator
 from io import BytesIO
-from typing import Callable, ClassVar, Dict, Iterator, List, Optional, Tuple
+from typing import Callable, ClassVar, Optional
 from urllib.parse import parse_qs
 from wsgiref.simple_server import (
     ServerHandler,
@@ -258,7 +259,7 @@ def _chunk_iter(f):
 class ChunkReader:
     def __init__(self, f) -> None:
         self._iter = _chunk_iter(f)
-        self._buffer: List[bytes] = []
+        self._buffer: list[bytes] = []
 
     def read(self, n):
         while sum(map(len, self._buffer)) < n:
@@ -334,8 +335,8 @@ class HTTPGitRequest:
         self.dumb = dumb
         self.handlers = handlers
         self._start_response = start_response
-        self._cache_headers: List[Tuple[str, str]] = []
-        self._headers: List[Tuple[str, str]] = []
+        self._cache_headers: list[tuple[str, str]] = []
+        self._headers: list[tuple[str, str]] = []
 
     def add_header(self, name, value):
         """Add a header to the response."""
@@ -345,7 +346,7 @@ class HTTPGitRequest:
         self,
         status: str = HTTP_OK,
         content_type: Optional[str] = None,
-        headers: Optional[List[Tuple[str, str]]] = None,
+        headers: Optional[list[tuple[str, str]]] = None,
     ):
         """Begin a response with the given status and other headers."""
         if headers:
@@ -394,8 +395,8 @@ class HTTPGitApplication:
     """
 
     services: ClassVar[
-        Dict[
-            Tuple[str, re.Pattern],
+        dict[
+            tuple[str, re.Pattern],
             Callable[[HTTPGitRequest, Backend, re.Match], Iterator[bytes]],
         ]
     ] = {

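ClassVar has no builtin counterpart and stays in typing, but everything inside its brackets can use builtin generics, as in HTTPGitApplication.services above. A sketch with hypothetical names:

    import re
    from typing import ClassVar

    class Services:
        routes: ClassVar[dict[tuple[str, re.Pattern], str]] = {
            ("GET", re.compile("/info/refs$")): "get_info_refs",
        }

    assert any(method == "GET" for method, _ in Services.routes)
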
+ 1 - 2
fuzzing/fuzz-targets/test_utils.py

@@ -1,11 +1,10 @@
-from typing import List  # pragma: no cover
 
 import atheris  # pragma: no cover
 
 
 @atheris.instrument_func
 def is_expected_exception(
-    error_message_list: List[str], exception: Exception
+    error_message_list: list[str], exception: Exception
 ):  # pragma: no cover
     """Checks if the message of a given exception matches any of the expected error messages.
 

+ 2 - 2
tests/__init__.py

@@ -37,7 +37,7 @@ import tempfile
 
 # If Python itself provides an exception, use that
 import unittest
-from typing import ClassVar, List
+from typing import ClassVar
 from unittest import SkipTest, expectedFailure, skipIf
 from unittest import TestCase as _TestCase
 
@@ -67,7 +67,7 @@ class BlackboxTestCase(TestCase):
     """Blackbox testing."""
 
     # TODO(jelmer): Include more possible binary paths.
-    bin_directories: ClassVar[List[str]] = [
+    bin_directories: ClassVar[list[str]] = [
         os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "bin")),
         "/usr/bin",
         "/usr/local/bin",

+ 1 - 2
tests/compat/test_web.py

@@ -27,7 +27,6 @@ warning: these tests should be fairly stable, but when writing/debugging new
 
 import sys
 import threading
-from typing import Tuple
 from wsgiref import simple_server
 
 from dulwich.server import DictBackend, ReceivePackHandler, UploadPackHandler
@@ -78,7 +77,7 @@ class SmartWebTestCase(WebTests, CompatTestCase):
     This server test case does not use side-band-64k in git-receive-pack.
     """
 
-    min_git_version: Tuple[int, ...] = (1, 6, 6)
+    min_git_version: tuple[int, ...] = (1, 6, 6)
 
     def _handlers(self):
         return {b"git-receive-pack": NoSideBand64kReceivePackHandler}

+ 1 - 2
tests/compat/utils.py

@@ -30,7 +30,6 @@ import subprocess
 import sys
 import tempfile
 import time
-from typing import Tuple
 
 from dulwich.protocol import TCP_GIT_PORT
 from dulwich.repo import Repo
@@ -236,7 +235,7 @@ class CompatTestCase(TestCase):
     min_git_version.
     """
 
-    min_git_version: Tuple[int, ...] = (1, 5, 0)
+    min_git_version: tuple[int, ...] = (1, 5, 0)
 
     def setUp(self):
         super().setUp()

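The builtin tuple supports the same variadic form as typing.Tuple, so tuple[int, ...] still annotates a version tuple of any length:

    min_git_version: tuple[int, ...] = (1, 5, 0)
    assert all(isinstance(part, int) for part in min_git_version)
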
+ 3 - 3
tests/contrib/test_release_robot.py

@@ -26,7 +26,7 @@ import shutil
 import tempfile
 import time
 import unittest
-from typing import ClassVar, Dict, List, Optional, Tuple
+from typing import ClassVar, Optional
 
 from dulwich.contrib import release_robot
 from dulwich.repo import Repo
@@ -68,9 +68,9 @@ class GetRecentTagsTest(unittest.TestCase):
     # Git repo for dulwich project
     test_repo = os.path.join(BASEDIR, "dulwich_test_repo.zip")
     committer = b"Mark Mikofski <mark.mikofski@sunpowercorp.com>"
-    test_tags: ClassVar[List[bytes]] = [b"v0.1a", b"v0.1"]
+    test_tags: ClassVar[list[bytes]] = [b"v0.1a", b"v0.1"]
     tag_test_data: ClassVar[
-        Dict[bytes, Tuple[int, bytes, Optional[Tuple[int, bytes]]]]
+        dict[bytes, tuple[int, bytes, Optional[tuple[int, bytes]]]]
     ] = {
         test_tags[0]: (1484788003, b"3" * 40, None),
         test_tags[1]: (1484788314, b"1" * 40, (1484788401, b"2" * 40)),

+ 3 - 4
tests/test_client.py

@@ -25,7 +25,6 @@ import sys
 import tempfile
 import warnings
 from io import BytesIO
-from typing import Dict
 from unittest.mock import patch
 from urllib.parse import quote as urlquote
 from urllib.parse import urlparse
@@ -1094,7 +1093,7 @@ class HttpGitClientTests(TestCase):
         # otherwise without an active internet connection
         class PoolManagerMock:
             def __init__(self) -> None:
-                self.headers: Dict[str, str] = {}
+                self.headers: dict[str, str] = {}
 
             def request(
                 self,
@@ -1165,7 +1164,7 @@ class HttpGitClientTests(TestCase):
         # otherwise without an active internet connection
         class PoolManagerMock:
             def __init__(self) -> None:
-                self.headers: Dict[str, str] = {}
+                self.headers: dict[str, str] = {}
 
             def request(
                 self,
@@ -1200,7 +1199,7 @@ class HttpGitClientTests(TestCase):
         # otherwise without an active internet connection
         class PoolManagerMock:
             def __init__(self) -> None:
-                self.headers: Dict[str, str] = {}
+                self.headers: dict[str, str] = {}
 
             def request(
                 self,

+ 1 - 2
tests/test_pack.py

@@ -28,7 +28,6 @@ import tempfile
 import zlib
 from hashlib import sha1
 from io import BytesIO
-from typing import Set
 
 from dulwich.errors import ApplyDeltaError, ChecksumMismatch
 from dulwich.file import GitFile
@@ -987,7 +986,7 @@ class TestPackIterator(DeltaChainIterator):
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
-        self._unpacked_offsets: Set[int] = set()
+        self._unpacked_offsets: set[int] = set()
 
     def _result(self, unpacked):
         """Return entries in the same format as build_pack."""

+ 3 - 3
tests/test_refs.py

@@ -24,7 +24,7 @@ import os
 import sys
 import tempfile
 from io import BytesIO
-from typing import ClassVar, Dict
+from typing import ClassVar
 
 from dulwich import errors
 from dulwich.file import GitFile
@@ -797,7 +797,7 @@ class ParseSymrefValueTests(TestCase):
 
 
 class StripPeeledRefsTests(TestCase):
-    all_refs: ClassVar[Dict[bytes, bytes]] = {
+    all_refs: ClassVar[dict[bytes, bytes]] = {
         b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
         b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
         b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",
@@ -805,7 +805,7 @@ class StripPeeledRefsTests(TestCase):
         b"refs/tags/2.0.0": b"0749936d0956c661ac8f8d3483774509c165f89e",
         b"refs/tags/2.0.0^{}": b"0749936d0956c661ac8f8d3483774509c165f89e",
     }
-    non_peeled_refs: ClassVar[Dict[bytes, bytes]] = {
+    non_peeled_refs: ClassVar[dict[bytes, bytes]] = {
         b"refs/heads/master": b"8843d7f92416211de9ebb963ff4ce28125932878",
         b"refs/heads/testing": b"186a005b134d8639a58b6731c7c1ea821a6eedba",
         b"refs/tags/1.0.0": b"a93db4b0360cc635a2b93675010bac8d101f73f0",

+ 4 - 5
tests/test_server.py

@@ -25,7 +25,6 @@ import shutil
 import sys
 import tempfile
 from io import BytesIO
-from typing import Dict, List
 
 from dulwich.errors import (
     GitProtocolError,
@@ -67,8 +66,8 @@ SIX = b"6" * 40
 
 class TestProto:
     def __init__(self) -> None:
-        self._output: List[bytes] = []
-        self._received: Dict[int, List[bytes]] = {0: [], 1: [], 2: [], 3: []}
+        self._output: list[bytes] = []
+        self._received: dict[int, list[bytes]] = {0: [], 1: [], 2: [], 3: []}
 
     def set_output(self, output_lines):
         self._output = output_lines
@@ -587,8 +586,8 @@ class ProtocolGraphWalkerTestCase(TestCase):
 
 class TestProtocolGraphWalker:
     def __init__(self) -> None:
-        self.acks: List[bytes] = []
-        self.lines: List[bytes] = []
+        self.acks: list[bytes] = []
+        self.lines: list[bytes] = []
         self.wants_satisified = False
         self.stateless_rpc = None
         self.advertise_refs = False

+ 1 - 2
tests/test_web.py

@@ -24,7 +24,6 @@ import gzip
 import os
 import re
 from io import BytesIO
-from typing import Type
 
 from dulwich.object_store import MemoryObjectStore
 from dulwich.objects import Blob
@@ -96,7 +95,7 @@ class TestHTTPGitRequest(HTTPGitRequest):
 class WebTestCase(TestCase):
     """Base TestCase with useful instance vars and utility functions."""
 
-    _req_class: Type[HTTPGitRequest] = TestHTTPGitRequest
+    _req_class: type[HTTPGitRequest] = TestHTTPGitRequest
 
     def setUp(self):
         super().setUp()
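
Finally, lowercase type[...] replaces typing.Type for class-object annotations like WebTestCase._req_class above. A self-contained sketch with stand-in classes:

    class Request:
        pass

    class StubRequest(Request):
        pass

    req_class: type[Request] = StubRequest
    assert issubclass(req_class, Request)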