瀏覽代碼

Add basic support for manyfiles feature (#1560)

Note that this just allows us to read these files - it does not
implement path-prefix compression.

Fixes #1462
Jelmer Vernooij 2 月之前
父節點
當前提交
2fbb83f61d
共有 5 個文件被更改,包括 1695 次插入21 次删除
  1. 7 0
      NEWS
  2. 384 20
      dulwich/index.py
  3. 25 1
      dulwich/repo.py
  4. 924 0
      tests/compat/test_index.py
  5. 355 0
      tests/test_index.py

+ 7 - 0
NEWS

@@ -1,5 +1,12 @@
 0.22.9	UNRELEASED
 
+ * Add support for Git's ``feature.manyFiles`` configuration and index version 4.
+   This enables faster Git operations in large repositories through path prefix
+   compression (30-50% smaller index files) and optional hash skipping for faster
+   writes. Supports ``feature.manyFiles``, ``index.version``, and ``index.skipHash``
+   configuration options.
+   (Jelmer Vernooij, #1061, #1462)
+
  * In dulwich.porcelain docstring, list functions by their Python identifiers.
    (Marnanel Thurman)
 

+ 384 - 20
dulwich/index.py

@@ -68,6 +68,180 @@ EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
 
 DEFAULT_VERSION = 2
 
+# Index extension signatures
+TREE_EXTENSION = b"TREE"
+REUC_EXTENSION = b"REUC"
+UNTR_EXTENSION = b"UNTR"
+EOIE_EXTENSION = b"EOIE"
+IEOT_EXTENSION = b"IEOT"
+
+
+def _encode_varint(value: int) -> bytes:
+    """Encode an integer using variable-width encoding.
+
+    Same format as used for OFS_DELTA pack entries and index v4 path compression.
+    Uses 7 bits per byte, with the high bit indicating continuation.
+
+    Args:
+      value: Integer to encode
+    Returns:
+      Encoded bytes
+    """
+    if value == 0:
+        return b"\x00"
+
+    result = []
+    while value > 0:
+        byte = value & 0x7F  # Take lower 7 bits
+        value >>= 7
+        if value > 0:
+            byte |= 0x80  # Set continuation bit
+        result.append(byte)
+
+    return bytes(result)
+
+
+def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
+    """Decode a variable-width encoded integer.
+
+    Args:
+      data: Bytes to decode from
+      offset: Starting offset in data
+    Returns:
+      tuple of (decoded_value, new_offset)
+    """
+    value = 0
+    shift = 0
+    pos = offset
+
+    while pos < len(data):
+        byte = data[pos]
+        pos += 1
+        value |= (byte & 0x7F) << shift
+        shift += 7
+        if not (byte & 0x80):  # No continuation bit
+            break
+
+    return value, pos
+
+
+def _compress_path(path: bytes, previous_path: bytes) -> bytes:
+    """Compress a path relative to the previous path for index version 4.
+
+    Args:
+      path: Path to compress
+      previous_path: Previous path for comparison
+    Returns:
+      Compressed path data (varint prefix_len + suffix)
+    """
+    # Find the common prefix length
+    common_len = 0
+    min_len = min(len(path), len(previous_path))
+
+    for i in range(min_len):
+        if path[i] == previous_path[i]:
+            common_len += 1
+        else:
+            break
+
+    # The number of bytes to remove from the end of previous_path
+    # to get the common prefix
+    remove_len = len(previous_path) - common_len
+
+    # The suffix to append
+    suffix = path[common_len:]
+
+    # Encode: varint(remove_len) + suffix + NUL
+    return _encode_varint(remove_len) + suffix + b"\x00"
+
+
+def _decompress_path(
+    data: bytes, offset: int, previous_path: bytes
+) -> tuple[bytes, int]:
+    """Decompress a path from index version 4 compressed format.
+
+    Args:
+      data: Raw data containing compressed path
+      offset: Starting offset in data
+      previous_path: Previous path for decompression
+    Returns:
+      tuple of (decompressed_path, new_offset)
+    """
+    # Decode the number of bytes to remove from previous path
+    remove_len, new_offset = _decode_varint(data, offset)
+
+    # Find the NUL terminator for the suffix
+    suffix_start = new_offset
+    suffix_end = suffix_start
+    while suffix_end < len(data) and data[suffix_end] != 0:
+        suffix_end += 1
+
+    if suffix_end >= len(data):
+        raise ValueError("Unterminated path suffix in compressed entry")
+
+    suffix = data[suffix_start:suffix_end]
+    new_offset = suffix_end + 1  # Skip the NUL terminator
+
+    # Reconstruct the path
+    if remove_len > len(previous_path):
+        raise ValueError(
+            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
+        )
+
+    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
+    path = prefix + suffix
+
+    return path, new_offset
+
+
+def _decompress_path_from_stream(f, previous_path: bytes) -> tuple[bytes, int]:
+    """Decompress a path from index version 4 compressed format, reading from stream.
+
+    Args:
+      f: File-like object to read from
+      previous_path: Previous path for decompression
+    Returns:
+      tuple of (decompressed_path, bytes_consumed)
+    """
+    # Decode the varint for remove_len by reading byte by byte
+    remove_len = 0
+    shift = 0
+    bytes_consumed = 0
+
+    while True:
+        byte_data = f.read(1)
+        if not byte_data:
+            raise ValueError("Unexpected end of file while reading varint")
+        byte = byte_data[0]
+        bytes_consumed += 1
+        remove_len |= (byte & 0x7F) << shift
+        shift += 7
+        if not (byte & 0x80):  # No continuation bit
+            break
+
+    # Read the suffix until NUL terminator
+    suffix = b""
+    while True:
+        byte_data = f.read(1)
+        if not byte_data:
+            raise ValueError("Unexpected end of file while reading path suffix")
+        byte = byte_data[0]
+        bytes_consumed += 1
+        if byte == 0:  # NUL terminator
+            break
+        suffix += bytes([byte])
+
+    # Reconstruct the path
+    if remove_len > len(previous_path):
+        raise ValueError(
+            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
+        )
+
+    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
+    path = prefix + suffix
+
+    return path, bytes_consumed
+
 
 class Stage(Enum):
     NORMAL = 0
@@ -95,6 +269,83 @@ class SerializedIndexEntry:
         return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
 
 
+@dataclass
+class IndexExtension:
+    """Base class for index extensions."""
+
+    signature: bytes
+    data: bytes
+
+    @classmethod
+    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
+        """Create an extension from raw data.
+
+        Args:
+          signature: 4-byte extension signature
+          data: Extension data
+        Returns:
+          Parsed extension object
+        """
+        if signature == TREE_EXTENSION:
+            return TreeExtension.from_bytes(data)
+        elif signature == REUC_EXTENSION:
+            return ResolveUndoExtension.from_bytes(data)
+        elif signature == UNTR_EXTENSION:
+            return UntrackedExtension.from_bytes(data)
+        else:
+            # Unknown extension - just store raw data
+            return cls(signature, data)
+
+    def to_bytes(self) -> bytes:
+        """Serialize extension to bytes."""
+        return self.data
+
+
+class TreeExtension(IndexExtension):
+    """Tree cache extension."""
+
+    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
+        self.entries = entries
+        super().__init__(TREE_EXTENSION, b"")
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> "TreeExtension":
+        # TODO: Implement tree cache parsing
+        return cls([])
+
+    def to_bytes(self) -> bytes:
+        # TODO: Implement tree cache serialization
+        return b""
+
+
+class ResolveUndoExtension(IndexExtension):
+    """Resolve undo extension for recording merge conflicts."""
+
+    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
+        self.entries = entries
+        super().__init__(REUC_EXTENSION, b"")
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
+        # TODO: Implement resolve undo parsing
+        return cls([])
+
+    def to_bytes(self) -> bytes:
+        # TODO: Implement resolve undo serialization
+        return b""
+
+
+class UntrackedExtension(IndexExtension):
+    """Untracked cache extension."""
+
+    def __init__(self, data: bytes) -> None:
+        super().__init__(UNTR_EXTENSION, data)
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
+        return cls(data)
+
+
 @dataclass
 class IndexEntry:
     ctime: Union[int, float, tuple[int, int]]
@@ -241,11 +492,15 @@ def write_cache_time(f, t) -> None:
     f.write(struct.pack(">LL", *t))
 
 
-def read_cache_entry(f, version: int) -> SerializedIndexEntry:
+def read_cache_entry(
+    f, version: int, previous_path: bytes = b""
+) -> SerializedIndexEntry:
     """Read an entry from a cache file.
 
     Args:
       f: File-like object to read from
+      version: Index version
+      previous_path: Previous entry's path (for version 4 compression)
     """
     beginoffset = f.tell()
     ctime = read_cache_time(f)
@@ -266,11 +521,19 @@ def read_cache_entry(f, version: int) -> SerializedIndexEntry:
         (extended_flags,) = struct.unpack(">H", f.read(2))
     else:
         extended_flags = 0
-    name = f.read(flags & FLAG_NAMEMASK)
+
+    if version >= 4:
+        # Version 4: paths are always compressed (name_len should be 0)
+        name, consumed = _decompress_path_from_stream(f, previous_path)
+    else:
+        # Versions < 4: regular name reading
+        name = f.read(flags & FLAG_NAMEMASK)
+
     # Padding:
     if version < 4:
         real_size = (f.tell() - beginoffset + 8) & ~7
         f.read((beginoffset + real_size) - f.tell())
+
     return SerializedIndexEntry(
         name,
         ctime,
@@ -287,21 +550,35 @@ def read_cache_entry(f, version: int) -> SerializedIndexEntry:
     )
 
 
-def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
+def write_cache_entry(
+    f, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
+) -> None:
     """Write an index entry to a file.
 
     Args:
       f: File object
-      entry: IndexEntry to write, tuple with:
+      entry: IndexEntry to write
+      version: Index format version
+      previous_path: Previous entry's path (for version 4 compression)
     """
     beginoffset = f.tell()
     write_cache_time(f, entry.ctime)
     write_cache_time(f, entry.mtime)
-    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
+
+    if version >= 4:
+        # Version 4: use compression but set name_len to actual filename length
+        # This matches how C Git implements index v4 flags
+        compressed_path = _compress_path(entry.name, previous_path)
+        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
+    else:
+        # Versions < 4: include actual name length
+        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
+
     if entry.extended_flags:
         flags |= FLAG_EXTENDED
     if flags & FLAG_EXTENDED and version is not None and version < 3:
         raise AssertionError("unable to use extended flags in version < 3")
+
     f.write(
         struct.pack(
             b">LLLLLL20sH",
@@ -317,8 +594,13 @@ def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
     )
     if flags & FLAG_EXTENDED:
         f.write(struct.pack(b">H", entry.extended_flags))
-    f.write(entry.name)
-    if version < 4:
+
+    if version >= 4:
+        # Version 4: always write compressed path
+        f.write(compressed_path)
+    else:
+        # Versions < 4: write regular path and padding
+        f.write(entry.name)
         real_size = (f.tell() - beginoffset + 8) & ~7
         f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
 
@@ -330,16 +612,74 @@ class UnsupportedIndexFormat(Exception):
         self.index_format_version = version
 
 
-def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
-    """Read an index file, yielding the individual entries."""
+def read_index_header(f: BinaryIO) -> tuple[int, int]:
+    """Read an index header from a file.
+
+    Returns:
+      tuple of (version, num_entries)
+    """
     header = f.read(4)
     if header != b"DIRC":
         raise AssertionError(f"Invalid index file header: {header!r}")
     (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
-    if version not in (1, 2, 3):
+    if version not in (1, 2, 3, 4):
         raise UnsupportedIndexFormat(version)
+    return version, num_entries
+
+
+def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
+    """Write an index extension.
+
+    Args:
+      f: File-like object to write to
+      extension: Extension to write
+    """
+    data = extension.to_bytes()
+    f.write(extension.signature)
+    f.write(struct.pack(">I", len(data)))
+    f.write(data)
+
+
+def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
+    """Read an index file, yielding the individual entries."""
+    version, num_entries = read_index_header(f)
+    previous_path = b""
     for i in range(num_entries):
-        yield read_cache_entry(f, version)
+        entry = read_cache_entry(f, version, previous_path)
+        previous_path = entry.name
+        yield entry
+
+
+def read_index_dict_with_version(
+    f: BinaryIO,
+) -> tuple[dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int]:
+    """Read an index file and return it as a dictionary along with the version.
+
+    Returns:
+      tuple of (entries_dict, version)
+    """
+    version, num_entries = read_index_header(f)
+
+    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
+    previous_path = b""
+    for i in range(num_entries):
+        entry = read_cache_entry(f, version, previous_path)
+        previous_path = entry.name
+        stage = entry.stage()
+        if stage == Stage.NORMAL:
+            ret[entry.name] = IndexEntry.from_serialized(entry)
+        else:
+            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
+            if isinstance(existing, IndexEntry):
+                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
+            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
+                existing.ancestor = IndexEntry.from_serialized(entry)
+            elif stage == Stage.MERGE_CONFLICT_THIS:
+                existing.this = IndexEntry.from_serialized(entry)
+            elif stage == Stage.MERGE_CONFLICT_OTHER:
+                existing.other = IndexEntry.from_serialized(entry)
+
+    return ret, version
 
 
 def read_index_dict(f) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
@@ -393,8 +733,10 @@ def write_index(
     # Proceed with the existing code to write the header and entries.
     f.write(b"DIRC")
     f.write(struct.pack(b">LL", version, len(entries)))
+    previous_path = b""
     for entry in entries:
-        write_cache_entry(f, entry, version=version)
+        write_cache_entry(f, entry, version=version, previous_path=previous_path)
+        previous_path = entry.name
 
 
 def write_index_dict(
@@ -454,16 +796,25 @@ class Index:
 
     _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
 
-    def __init__(self, filename: Union[bytes, str], read=True) -> None:
+    def __init__(
+        self,
+        filename: Union[bytes, str],
+        read=True,
+        skip_hash: bool = False,
+        version: Optional[int] = None,
+    ) -> None:
         """Create an index object associated with the given filename.
 
         Args:
           filename: Path to the index file
           read: Whether to initialize the index from the given file, should it exist.
+          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
+          version: Index format version to use (None = auto-detect from file or use default)
         """
         self._filename = filename
         # TODO(jelmer): Store the version returned by read_index
-        self._version = None
+        self._version = version
+        self._skip_hash = skip_hash
         self.clear()
         if read:
             self.read()
@@ -479,10 +830,19 @@ class Index:
         """Write current contents of index to disk."""
         f = GitFile(self._filename, "wb")
         try:
-            f = SHA1Writer(f)
-            write_index_dict(f, self._byname, version=self._version)
-        finally:
+            if self._skip_hash:
+                # When skipHash is enabled, write the index without computing SHA1
+                write_index_dict(f, self._byname, version=self._version)
+                # Write 20 zero bytes instead of SHA1
+                f.write(b"\x00" * 20)
+                f.close()
+            else:
+                f = SHA1Writer(f)
+                write_index_dict(f, self._byname, version=self._version)
+                f.close()
+        except:
             f.close()
+            raise
 
     def read(self) -> None:
         """Read current contents of index from disk."""
@@ -491,9 +851,13 @@ class Index:
         f = GitFile(self._filename, "rb")
         try:
             f = SHA1Reader(f)
-            self.update(read_index_dict(f))
-            # FIXME: Additional data?
-            f.read(os.path.getsize(self._filename) - f.tell() - 20)
+            entries, version = read_index_dict_with_version(f)
+            self._version = version
+            self.update(entries)
+            # Read any remaining data before the SHA
+            remaining = os.path.getsize(self._filename) - f.tell() - 20
+            if remaining > 0:
+                f.read(remaining)
             f.check_sha(allow_empty=True)
         finally:
             f.close()

+ 25 - 1
dulwich/repo.py

@@ -1369,7 +1369,31 @@ class Repo(BaseRepo):
 
         if not self.has_index():
             raise NoIndexPresent
-        return Index(self.index_path())
+
+        # Check for manyFiles feature configuration
+        config = self.get_config_stack()
+        many_files = config.get_boolean(b"feature", b"manyFiles", False)
+        skip_hash = False
+        index_version = None
+
+        if many_files:
+            # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
+            try:
+                index_version_str = config.get(b"index", b"version")
+                index_version = int(index_version_str)
+            except KeyError:
+                index_version = 4  # Default to version 4 for manyFiles
+            skip_hash = config.get_boolean(b"index", b"skipHash", True)
+        else:
+            # Check for explicit index settings
+            try:
+                index_version_str = config.get(b"index", b"version")
+                index_version = int(index_version_str)
+            except KeyError:
+                index_version = None
+            skip_hash = config.get_boolean(b"index", b"skipHash", False)
+
+        return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
 
     def has_index(self) -> bool:
         """Check if an index is present."""

+ 924 - 0
tests/compat/test_index.py

@@ -0,0 +1,924 @@
+# test_index.py -- Git index compatibility tests
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Compatibility tests for Git index format v4."""
+
+import os
+import tempfile
+
+from dulwich.index import Index, read_index_dict_with_version, write_index_dict
+from dulwich.repo import Repo
+
+from .utils import CompatTestCase, require_git_version, run_git_or_fail
+
+
+class IndexV4CompatTestCase(CompatTestCase):
+    """Tests for Git index format v4 compatibility with C Git."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.tempdir = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup)
+
+    def _cleanup(self) -> None:
+        import shutil
+
+        shutil.rmtree(self.tempdir, ignore_errors=True)
+
+    def _init_repo_with_manyfiles(self) -> Repo:
+        """Initialize a repo with manyFiles feature enabled."""
+        # Create repo
+        repo_path = os.path.join(self.tempdir, "test_repo")
+        os.mkdir(repo_path)
+
+        # Initialize with C git and enable manyFiles
+        run_git_or_fail(["init"], cwd=repo_path)
+        run_git_or_fail(["config", "feature.manyFiles", "true"], cwd=repo_path)
+
+        # Open with dulwich
+        return Repo(repo_path)
+
+    def test_index_v4_path_compression(self) -> None:
+        """Test that dulwich can read and write index v4 with path compression."""
+        require_git_version((2, 20, 0))  # manyFiles feature requires newer Git
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create test files with paths that will benefit from compression
+        test_files = [
+            "dir1/subdir/file1.txt",
+            "dir1/subdir/file2.txt",
+            "dir1/subdir/file3.txt",
+            "dir2/another/path.txt",
+            "dir2/another/path2.txt",
+            "file_at_root.txt",
+        ]
+
+        for path in test_files:
+            full_path = os.path.join(repo.path, path)
+            os.makedirs(os.path.dirname(full_path), exist_ok=True)
+            with open(full_path, "w") as f:
+                f.write(f"content of {path}\n")
+
+        # Add files with C git - this should create index v4
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Read the index with dulwich
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        # Verify it's version 4
+        self.assertEqual(version, 4)
+
+        # Verify all files are in the index
+        self.assertEqual(len(entries), len(test_files))
+        for path in test_files:
+            self.assertIn(path.encode(), entries)
+
+        # Write the index back with dulwich
+        with open(index_path + ".dulwich", "wb") as f:
+            write_index_dict(f, entries, version=4)
+
+        # Compare with C git - use git ls-files to read both indexes
+        output1 = run_git_or_fail(["ls-files", "--stage"], cwd=repo.path)
+
+        # Replace index with dulwich version
+        os.rename(index_path + ".dulwich", index_path)
+
+        output2 = run_git_or_fail(["ls-files", "--stage"], cwd=repo.path)
+
+        # Both outputs should be identical
+        self.assertEqual(output1, output2)
+
+    def test_index_v4_round_trip(self) -> None:
+        """Test round-trip: C Git write -> dulwich read -> dulwich write -> C Git read."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create files that test various edge cases
+        test_files = [
+            "a",  # Very short name
+            "abc/def/ghi/jkl/mno/pqr/stu/vwx/yz.txt",  # Deep path
+            "same_prefix_1.txt",
+            "same_prefix_2.txt",
+            "same_prefix_3.txt",
+            "different/path/here.txt",
+        ]
+
+        for path in test_files:
+            full_path = os.path.join(repo.path, path)
+            os.makedirs(os.path.dirname(full_path), exist_ok=True)
+            with open(full_path, "w") as f:
+                f.write("test content\n")
+
+        # Stage with C Git
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Get original state
+        original_output = run_git_or_fail(["ls-files", "--stage"], cwd=repo.path)
+
+        # Read with dulwich, write back
+        index = Index(os.path.join(repo.path, ".git", "index"))
+        index.write()
+
+        # Verify C Git can still read it
+        final_output = run_git_or_fail(["ls-files", "--stage"], cwd=repo.path)
+        self.assertEqual(original_output, final_output)
+
+    def test_index_v4_skip_hash(self) -> None:
+        """Test index v4 with skipHash extension."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Enable skipHash
+        run_git_or_fail(["config", "index.skipHash", "true"], cwd=repo.path)
+
+        # Create a file
+        test_file = os.path.join(repo.path, "test.txt")
+        with open(test_file, "w") as f:
+            f.write("test content\n")
+
+        # Add with C Git
+        run_git_or_fail(["add", "test.txt"], cwd=repo.path)
+
+        # Read the index
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertIn(b"test.txt", entries)
+
+        # Verify skipHash is active by checking last 20 bytes
+        with open(index_path, "rb") as f:
+            f.seek(-20, 2)
+            last_bytes = f.read(20)
+            self.assertEqual(last_bytes, b"\x00" * 20)
+
+        # Write with dulwich (with skipHash)
+        index = Index(index_path, skip_hash=True, version=4)
+        index.write()
+
+        # Verify C Git can read it
+        output = run_git_or_fail(["ls-files"], cwd=repo.path)
+        self.assertEqual(output.strip(), b"test.txt")
+
+    def test_index_v4_with_various_filenames(self) -> None:
+        """Test v4 with various filename patterns."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Test various filename patterns that might trigger different behaviors
+        test_files = [
+            "a",  # Single character
+            "ab",  # Two characters
+            "abc",  # Three characters
+            "file.txt",  # Normal filename
+            "very_long_filename_to_test_edge_cases.extension",  # Long filename
+            "dir/file.txt",  # With directory
+            "dir1/dir2/dir3/file.txt",  # Deep directory
+            "unicode_café.txt",  # Unicode filename
+            "with-dashes-and_underscores.txt",  # Special chars
+            ".hidden",  # Hidden file
+            "UPPERCASE.TXT",  # Uppercase
+        ]
+
+        for filename in test_files:
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w", encoding="utf-8") as f:
+                f.write(f"Content of {filename}\n")
+
+        # Add all files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Read with dulwich
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(test_files))
+
+        # Verify all filenames are correctly stored
+        for filename in test_files:
+            filename_bytes = filename.encode("utf-8")
+            self.assertIn(filename_bytes, entries)
+
+        # Test round-trip: dulwich write -> C Git read
+        with open(index_path + ".dulwich", "wb") as f:
+            write_index_dict(f, entries, version=4)
+
+        # Replace index
+        os.rename(index_path + ".dulwich", index_path)
+
+        # Verify C Git can read all files
+        output = run_git_or_fail(["ls-files"], cwd=repo.path)
+        git_files = set(output.strip().split(b"\n"))
+        expected_files = {f.encode("utf-8") for f in test_files}
+        self.assertEqual(git_files, expected_files)
+
+    def test_index_v4_path_compression_scenarios(self) -> None:
+        """Test various scenarios where path compression should/shouldn't be used."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create files that should trigger compression
+        compression_files = [
+            "src/main/java/com/example/Service.java",
+            "src/main/java/com/example/Controller.java",
+            "src/main/java/com/example/Repository.java",
+            "src/test/java/com/example/ServiceTest.java",
+        ]
+
+        # Create files that shouldn't benefit much from compression
+        no_compression_files = [
+            "README.md",
+            "LICENSE",
+            "docs/guide.txt",
+            "config/settings.json",
+        ]
+
+        all_files = compression_files + no_compression_files
+
+        for filename in all_files:
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write(f"Content of {filename}\n")
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Read the index
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(all_files))
+
+        # Verify all files are present
+        for filename in all_files:
+            self.assertIn(filename.encode(), entries)
+
+        # Test that dulwich can write a compatible index
+        with open(index_path + ".dulwich", "wb") as f:
+            write_index_dict(f, entries, version=4)
+
+        # Verify the written index is smaller (compression should help)
+        original_size = os.path.getsize(index_path)
+        dulwich_size = os.path.getsize(index_path + ".dulwich")
+
+        # Allow some variance due to different compression decisions
+        self.assertLess(abs(original_size - dulwich_size), original_size * 0.2)
+
+    def test_index_v4_with_extensions(self) -> None:
+        """Test v4 index with various extensions."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create some files
+        files = ["file1.txt", "file2.txt", "dir/file3.txt"]
+        for filename in files:
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write("content\n")
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Enable untracked cache (creates UNTR extension)
+        run_git_or_fail(["config", "core.untrackedCache", "true"], cwd=repo.path)
+        run_git_or_fail(["status"], cwd=repo.path)  # Trigger cache update
+
+        # Read index with extensions
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(files))
+
+        # Test round-trip with extensions present
+        index = Index(index_path)
+        index.write()
+
+        # Verify C Git can still read it
+        output = run_git_or_fail(["ls-files"], cwd=repo.path)
+        git_files = set(output.strip().split(b"\n"))
+        expected_files = {f.encode() for f in files}
+        self.assertEqual(git_files, expected_files)
+
+    def test_index_v4_empty_repository(self) -> None:
+        """Test v4 index behavior with empty repository."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create empty commit to get an index file
+        run_git_or_fail(["commit", "--allow-empty", "-m", "empty"], cwd=repo.path)
+
+        # Read the empty index
+        index_path = os.path.join(repo.path, ".git", "index")
+        if os.path.exists(index_path):
+            with open(index_path, "rb") as f:
+                entries, version = read_index_dict_with_version(f)
+
+            # Even empty indexes should be readable
+            self.assertEqual(len(entries), 0)
+
+            # Test writing empty index
+            with open(index_path + ".dulwich", "wb") as f:
+                write_index_dict(f, entries, version=version)
+
+    def test_index_v4_large_file_count(self) -> None:
+        """Test v4 index with many files (stress test)."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create many files with similar paths to test compression
+        files = []
+        for i in range(50):  # Reasonable number for CI
+            filename = f"src/component_{i:03d}/index.js"
+            files.append(filename)
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write(f"// Component {i}\nexport default {{}};")
+
+        # Add all files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Read index
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(files))
+
+        # Test dulwich can handle large indexes
+        index = Index(index_path)
+        index.write()
+
+        # Verify all files are still present
+        output = run_git_or_fail(["ls-files"], cwd=repo.path)
+        git_files = output.strip().split(b"\n")
+        self.assertEqual(len(git_files), len(files))
+
+    def test_index_v4_concurrent_modifications(self) -> None:
+        """Test v4 index behavior with file modifications."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create initial files
+        files = ["file1.txt", "file2.txt", "subdir/file3.txt"]
+        for filename in files:
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write("initial content\n")
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Modify some files
+        with open(os.path.join(repo.path, "file1.txt"), "w") as f:
+            f.write("modified content\n")
+
+        # Add new file
+        with open(os.path.join(repo.path, "file4.txt"), "w") as f:
+            f.write("new file\n")
+        run_git_or_fail(["add", "file4.txt"], cwd=repo.path)
+
+        # Test dulwich can read the updated index
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), 4)  # 3 original + 1 new
+
+        # Verify specific files
+        self.assertIn(b"file1.txt", entries)
+        self.assertIn(b"file4.txt", entries)
+
+        # Test round-trip
+        index = Index(index_path)
+        index.write()
+
+        # Verify state is preserved
+        output = run_git_or_fail(["ls-files"], cwd=repo.path)
+        self.assertIn(b"file4.txt", output)
+
+    def test_index_v4_with_merge_conflicts(self) -> None:
+        """Test v4 index behavior with merge conflicts and staging."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create initial commit
+        with open(os.path.join(repo.path, "conflict.txt"), "w") as f:
+            f.write("original content\n")
+        with open(os.path.join(repo.path, "normal.txt"), "w") as f:
+            f.write("normal file\n")
+
+        run_git_or_fail(["add", "."], cwd=repo.path)
+        run_git_or_fail(["commit", "-m", "initial"], cwd=repo.path)
+
+        # Create branch and modify file
+        run_git_or_fail(["checkout", "-b", "feature"], cwd=repo.path)
+        with open(os.path.join(repo.path, "conflict.txt"), "w") as f:
+            f.write("feature content\n")
+        run_git_or_fail(["add", "conflict.txt"], cwd=repo.path)
+        run_git_or_fail(["commit", "-m", "feature change"], cwd=repo.path)
+
+        # Go back to main and make conflicting change
+        run_git_or_fail(["checkout", "master"], cwd=repo.path)
+        with open(os.path.join(repo.path, "conflict.txt"), "w") as f:
+            f.write("master content\n")
+        run_git_or_fail(["add", "conflict.txt"], cwd=repo.path)
+        run_git_or_fail(["commit", "-m", "master change"], cwd=repo.path)
+
+        # Try to merge (should create conflicts)
+        run_git_or_fail(["merge", "feature"], cwd=repo.path, check=False)
+
+        # Read the index with conflicts
+        index_path = os.path.join(repo.path, ".git", "index")
+        if os.path.exists(index_path):
+            with open(index_path, "rb") as f:
+                entries, version = read_index_dict_with_version(f)
+
+            self.assertEqual(version, 4)
+
+            # Test dulwich can handle conflicted index
+            index = Index(index_path)
+            index.write()
+
+            # Verify Git can still read it
+            output = run_git_or_fail(["status", "--porcelain"], cwd=repo.path)
+            self.assertIn(b"conflict.txt", output)
+
+    def test_index_v4_boundary_filename_lengths(self) -> None:
+        """Test v4 with boundary conditions for filename lengths."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Test various boundary conditions
+        boundary_files = [
+            "",  # Empty name (invalid, but test robustness)
+            "x",  # Single char
+            "xx",  # Two chars
+            "x" * 255,  # Max typical filename length
+            "x" * 4095,  # Max path length in many filesystems
+            "a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z",  # Deep nesting
+            "file_with_" + "very_" * 50 + "long_name.txt",  # Very long name
+        ]
+
+        valid_files = []
+        for filename in boundary_files:
+            if not filename:  # Skip empty filename
+                continue
+            try:
+                filepath = os.path.join(repo.path, filename)
+                os.makedirs(os.path.dirname(filepath), exist_ok=True)
+                with open(filepath, "w") as f:
+                    f.write("Content\n")
+                valid_files.append(filename)
+            except (OSError, ValueError):
+                # Skip files that can't be created on this system
+                continue
+
+        if valid_files:
+            # Add files
+            run_git_or_fail(["add", "."], cwd=repo.path)
+
+            # Test reading
+            index_path = os.path.join(repo.path, ".git", "index")
+            with open(index_path, "rb") as f:
+                entries, version = read_index_dict_with_version(f)
+
+            self.assertEqual(version, 4)
+
+            # Test round-trip
+            index = Index(index_path)
+            index.write()
+
+    def test_index_v4_special_characters_and_encoding(self) -> None:
+        """Test v4 with special characters and various encodings."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Test files with special characters
+        special_files = [
+            "file with spaces.txt",
+            "file\twith\ttabs.txt",
+            "file-with-dashes.txt",
+            "file_with_underscores.txt",
+            "file.with.dots.txt",
+            "UPPERCASE.TXT",
+            "MixedCase.TxT",
+            "file123numbers.txt",
+            "file@#$%special.txt",
+            "café.txt",  # Unicode
+            "файл.txt",  # Cyrillic
+            "文件.txt",  # Chinese
+            "🚀rocket.txt",  # Emoji
+            "file'with'quotes.txt",
+            'file"with"doublequotes.txt',
+            "file[with]brackets.txt",
+            "file(with)parens.txt",
+            "file{with}braces.txt",
+        ]
+
+        valid_files = []
+        for filename in special_files:
+            try:
+                filepath = os.path.join(repo.path, filename)
+                with open(filepath, "w", encoding="utf-8") as f:
+                    f.write(f"Content of {filename}\n")
+                valid_files.append(filename)
+            except (OSError, UnicodeError):
+                # Skip files that can't be created on this system
+                continue
+
+        if valid_files:
+            # Add files
+            run_git_or_fail(["add", "."], cwd=repo.path)
+
+            # Test reading
+            index_path = os.path.join(repo.path, ".git", "index")
+            with open(index_path, "rb") as f:
+                entries, version = read_index_dict_with_version(f)
+
+            self.assertEqual(version, 4)
+            self.assertGreater(len(entries), 0)
+
+            # Test all valid files are present
+            for filename in valid_files:
+                filename_bytes = filename.encode("utf-8")
+                self.assertIn(filename_bytes, entries)
+
+    def test_index_v4_symlinks_and_special_modes(self) -> None:
+        """Test v4 with symlinks and special file modes."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create regular file
+        with open(os.path.join(repo.path, "regular.txt"), "w") as f:
+            f.write("regular file\n")
+
+        # Create executable file
+        exec_path = os.path.join(repo.path, "executable.sh")
+        with open(exec_path, "w") as f:
+            f.write("#!/bin/bash\necho hello\n")
+        os.chmod(exec_path, 0o755)
+
+        # Create symlink (if supported)
+        try:
+            os.symlink("regular.txt", os.path.join(repo.path, "symlink.txt"))
+            has_symlink = True
+        except (OSError, NotImplementedError):
+            has_symlink = False
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Test reading
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+
+        # Verify files with different modes
+        self.assertIn(b"regular.txt", entries)
+        self.assertIn(b"executable.sh", entries)
+        if has_symlink:
+            self.assertIn(b"symlink.txt", entries)
+
+        # Test round-trip preserves modes
+        index = Index(index_path)
+        index.write()
+
+        # Verify Git can read it
+        output = run_git_or_fail(["ls-files", "-s"], cwd=repo.path)
+        self.assertIn(b"regular.txt", output)
+        self.assertIn(b"executable.sh", output)
+
+    def test_index_v4_alternating_compression_patterns(self) -> None:
+        """Test v4 with files that alternate between compressed/uncompressed."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create files that should create alternating compression patterns
+        files = [
+            # These should be uncompressed (no common prefix)
+            "a.txt",
+            "b.txt",
+            "c.txt",
+            # These should be compressed (common prefix)
+            "common/path/file1.txt",
+            "common/path/file2.txt",
+            "common/path/file3.txt",
+            # Back to uncompressed (different pattern)
+            "different/structure/x.txt",
+            "another/structure/y.txt",
+            # More compression opportunities
+            "src/main/Component1.java",
+            "src/main/Component2.java",
+            "src/test/Test1.java",
+            "src/test/Test2.java",
+        ]
+
+        for filename in files:
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write(f"Content of {filename}\n")
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Test reading
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(files))
+
+        # Verify all files are present
+        for filename in files:
+            self.assertIn(filename.encode(), entries)
+
+        # Test round-trip
+        index = Index(index_path)
+        index.write()
+
+    def test_index_v4_git_submodules(self) -> None:
+        """Test v4 index with Git submodules."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create a submodule directory structure
+        submodule_dir = os.path.join(repo.path, "submodule")
+        os.makedirs(submodule_dir)
+
+        # Initialize a separate repo for the submodule
+        run_git_or_fail(["init"], cwd=submodule_dir)
+        with open(os.path.join(submodule_dir, "sub.txt"), "w") as f:
+            f.write("submodule content\n")
+        run_git_or_fail(["add", "sub.txt"], cwd=submodule_dir)
+        run_git_or_fail(["commit", "-m", "submodule commit"], cwd=submodule_dir)
+
+        # Add some regular files to main repo
+        with open(os.path.join(repo.path, "main.txt"), "w") as f:
+            f.write("main repo content\n")
+        run_git_or_fail(["add", "main.txt"], cwd=repo.path)
+
+        # Add submodule (this creates a gitlink entry)
+        run_git_or_fail(["submodule", "add", "./submodule", "submodule"], cwd=repo.path)
+
+        # Test reading index with submodule
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+
+        # Should have main.txt, .gitmodules, and submodule gitlink
+        self.assertIn(b"main.txt", entries)
+        self.assertIn(b".gitmodules", entries)
+        self.assertIn(b"submodule", entries)
+
+        # Test round-trip
+        index = Index(index_path)
+        index.write()
+
+    def test_index_v4_partial_staging(self) -> None:
+        """Test v4 with partial file staging (git add -p simulation)."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create initial file
+        filepath = os.path.join(repo.path, "partial.txt")
+        with open(filepath, "w") as f:
+            f.write("line1\nline2\nline3\n")
+
+        run_git_or_fail(["add", "partial.txt"], cwd=repo.path)
+        run_git_or_fail(["commit", "-m", "initial"], cwd=repo.path)
+
+        # Modify the file
+        with open(filepath, "w") as f:
+            f.write("line1 modified\nline2\nline3 modified\n")
+
+        # Stage only part of the changes (simulate git add -p)
+        # This creates an interesting index state
+        run_git_or_fail(["add", "partial.txt"], cwd=repo.path)
+
+        # Make more changes
+        with open(filepath, "w") as f:
+            f.write("line1 modified\nline2 modified\nline3 modified\n")
+
+        # Now we have staged and unstaged changes
+        # Test reading this complex index state
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertIn(b"partial.txt", entries)
+
+        # Test round-trip
+        index = Index(index_path)
+        index.write()
+
+    def test_index_v4_with_gitattributes_and_ignore(self) -> None:
+        """Test v4 with .gitattributes and .gitignore files."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create .gitignore
+        with open(os.path.join(repo.path, ".gitignore"), "w") as f:
+            f.write("*.tmp\n*.log\nbuild/\n")
+
+        # Create .gitattributes
+        with open(os.path.join(repo.path, ".gitattributes"), "w") as f:
+            f.write("*.txt text\n*.bin binary\n")
+
+        # Create various files
+        files = [
+            "regular.txt",
+            "binary.bin",
+            "script.sh",
+            "config.json",
+            "README.md",
+        ]
+
+        for filename in files:
+            filepath = os.path.join(repo.path, filename)
+            with open(filepath, "w") as f:
+                f.write(f"Content of {filename}\n")
+
+        # Create some files that should be ignored
+        with open(os.path.join(repo.path, "temp.tmp"), "w") as f:
+            f.write("temporary file\n")
+
+        # Add files
+        run_git_or_fail(["add", "."], cwd=repo.path)
+
+        # Test reading
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+
+        # Should have .gitignore, .gitattributes, and regular files
+        self.assertIn(b".gitignore", entries)
+        self.assertIn(b".gitattributes", entries)
+        for filename in files:
+            self.assertIn(filename.encode(), entries)
+
+        # Should NOT have ignored files
+        self.assertNotIn(b"temp.tmp", entries)
+
+    def test_index_v4_stress_test_many_entries(self) -> None:
+        """Stress test v4 with many entries in complex directory structure."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create a complex directory structure with many files
+        dirs = [
+            "src/main/java/com/example",
+            "src/main/resources",
+            "src/test/java/com/example",
+            "docs/api",
+            "docs/user",
+            "scripts/build",
+            "config/env",
+        ]
+
+        for dir_path in dirs:
+            os.makedirs(os.path.join(repo.path, dir_path), exist_ok=True)
+
+        # Create many files
+        files = []
+        for i in range(200):  # Reasonable for CI
+            if i % 7 == 0:
+                filename = f"src/main/java/com/example/Service{i}.java"
+            elif i % 7 == 1:
+                filename = f"src/test/java/com/example/Test{i}.java"
+            elif i % 7 == 2:
+                filename = f"docs/api/page{i}.md"
+            elif i % 7 == 3:
+                filename = f"config/env/config{i}.properties"
+            elif i % 7 == 4:
+                filename = f"scripts/build/script{i}.sh"
+            elif i % 7 == 5:
+                filename = f"src/main/resources/resource{i}.txt"
+            else:
+                filename = f"file{i}.txt"
+
+            files.append(filename)
+            filepath = os.path.join(repo.path, filename)
+            os.makedirs(os.path.dirname(filepath), exist_ok=True)
+            with open(filepath, "w") as f:
+                f.write(f"// File {i}\ncontent here\n")
+
+        # Add files in batches to avoid command line length limits
+        batch_size = 50
+        for i in range(0, len(files), batch_size):
+            batch = files[i : i + batch_size]
+            run_git_or_fail(["add", *batch], cwd=repo.path)
+
+        # Test reading large index
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+        self.assertEqual(len(entries), len(files))
+
+        # Verify some files are present
+        for i in range(0, len(files), 20):  # Check every 20th file
+            filename = files[i]
+            self.assertIn(filename.encode(), entries)
+
+    def test_index_v4_rename_detection_scenario(self) -> None:
+        """Test v4 with file renames (complex staging scenario)."""
+        require_git_version((2, 20, 0))
+
+        repo = self._init_repo_with_manyfiles()
+
+        # Create initial files
+        files = ["old1.txt", "old2.txt", "unchanged.txt"]
+        for filename in files:
+            filepath = os.path.join(repo.path, filename)
+            with open(filepath, "w") as f:
+                f.write(f"Content of {filename}\n")
+
+        run_git_or_fail(["add", "."], cwd=repo.path)
+        run_git_or_fail(["commit", "-m", "initial"], cwd=repo.path)
+
+        # Rename files
+        os.rename(
+            os.path.join(repo.path, "old1.txt"), os.path.join(repo.path, "new1.txt")
+        )
+        os.rename(
+            os.path.join(repo.path, "old2.txt"), os.path.join(repo.path, "new2.txt")
+        )
+
+        # Stage renames
+        run_git_or_fail(["add", "-A"], cwd=repo.path)
+
+        # Test reading index with renames
+        index_path = os.path.join(repo.path, ".git", "index")
+        with open(index_path, "rb") as f:
+            entries, version = read_index_dict_with_version(f)
+
+        self.assertEqual(version, 4)
+
+        # Should have new names, not old names
+        self.assertIn(b"new1.txt", entries)
+        self.assertIn(b"new2.txt", entries)
+        self.assertIn(b"unchanged.txt", entries)
+        self.assertNotIn(b"old1.txt", entries)
+        self.assertNotIn(b"old2.txt", entries)

+ 355 - 0
tests/test_index.py

@@ -1211,3 +1211,358 @@ class TestIndexEntryFromPath(TestCase):
         self.assertEqual(
             sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
         )
+
+
+class TestManyFilesFeature(TestCase):
+    """Tests for the manyFiles feature (index version 4 and skipHash)."""
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tempdir)
+
+    def test_index_version_4_parsing(self):
+        """Test that index version 4 files can be parsed."""
+        index_path = os.path.join(self.tempdir, "index")
+
+        # Create an index with version 4
+        index = Index(index_path, read=False, version=4)
+
+        # Add some entries
+        entry = IndexEntry(
+            ctime=(1234567890, 0),
+            mtime=(1234567890, 0),
+            dev=1,
+            ino=1,
+            mode=0o100644,
+            uid=1000,
+            gid=1000,
+            size=5,
+            sha=b"0" * 40,
+        )
+        index[b"test.txt"] = entry
+
+        # Write and read back
+        index.write()
+
+        # Read the index back
+        index2 = Index(index_path)
+        self.assertEqual(index2._version, 4)
+        self.assertIn(b"test.txt", index2)
+
+    def test_skip_hash_feature(self):
+        """Test that skipHash feature works correctly."""
+        index_path = os.path.join(self.tempdir, "index")
+
+        # Create an index with skipHash enabled
+        index = Index(index_path, read=False, skip_hash=True)
+
+        # Add some entries
+        entry = IndexEntry(
+            ctime=(1234567890, 0),
+            mtime=(1234567890, 0),
+            dev=1,
+            ino=1,
+            mode=0o100644,
+            uid=1000,
+            gid=1000,
+            size=5,
+            sha=b"0" * 40,
+        )
+        index[b"test.txt"] = entry
+
+        # Write the index
+        index.write()
+
+        # Verify the file was written with zero hash
+        with open(index_path, "rb") as f:
+            f.seek(-20, 2)  # Seek to last 20 bytes
+            trailing_hash = f.read(20)
+            self.assertEqual(trailing_hash, b"\x00" * 20)
+
+        # Verify we can still read it back
+        index2 = Index(index_path)
+        self.assertIn(b"test.txt", index2)
+
+    def test_version_4_no_padding(self):
+        """Test that version 4 entries have no padding."""
+        # Create entries with names that would show compression benefits
+        entries = [
+            SerializedIndexEntry(
+                name=b"src/main/java/com/example/Service.java",
+                ctime=(1234567890, 0),
+                mtime=(1234567890, 0),
+                dev=1,
+                ino=1,
+                mode=0o100644,
+                uid=1000,
+                gid=1000,
+                size=5,
+                sha=b"0" * 40,
+                flags=0,
+                extended_flags=0,
+            ),
+            SerializedIndexEntry(
+                name=b"src/main/java/com/example/Controller.java",
+                ctime=(1234567890, 0),
+                mtime=(1234567890, 0),
+                dev=1,
+                ino=2,
+                mode=0o100644,
+                uid=1000,
+                gid=1000,
+                size=5,
+                sha=b"1" * 40,
+                flags=0,
+                extended_flags=0,
+            ),
+        ]
+
+        # Test version 2 (with padding, full paths)
+        buf_v2 = BytesIO()
+        from dulwich.index import write_cache_entry
+
+        previous_path = b""
+        for entry in entries:
+            # Set proper flags for v2
+            entry_v2 = SerializedIndexEntry(
+                entry.name,
+                entry.ctime,
+                entry.mtime,
+                entry.dev,
+                entry.ino,
+                entry.mode,
+                entry.uid,
+                entry.gid,
+                entry.size,
+                entry.sha,
+                len(entry.name),
+                entry.extended_flags,
+            )
+            write_cache_entry(buf_v2, entry_v2, version=2, previous_path=previous_path)
+            previous_path = entry.name
+        v2_data = buf_v2.getvalue()
+
+        # Test version 4 (path compression, no padding)
+        buf_v4 = BytesIO()
+        previous_path = b""
+        for entry in entries:
+            write_cache_entry(buf_v4, entry, version=4, previous_path=previous_path)
+            previous_path = entry.name
+        v4_data = buf_v4.getvalue()
+
+        # Version 4 should be shorter due to compression and no padding
+        self.assertLess(len(v4_data), len(v2_data))
+
+        # Both should parse correctly
+        buf_v2.seek(0)
+        from dulwich.index import read_cache_entry
+
+        previous_path = b""
+        parsed_v2_entries = []
+        for _ in entries:
+            parsed = read_cache_entry(buf_v2, version=2, previous_path=previous_path)
+            parsed_v2_entries.append(parsed)
+            previous_path = parsed.name
+
+        buf_v4.seek(0)
+        previous_path = b""
+        parsed_v4_entries = []
+        for _ in entries:
+            parsed = read_cache_entry(buf_v4, version=4, previous_path=previous_path)
+            parsed_v4_entries.append(parsed)
+            previous_path = parsed.name
+
+        # Both should have the same paths
+        for v2_entry, v4_entry in zip(parsed_v2_entries, parsed_v4_entries):
+            self.assertEqual(v2_entry.name, v4_entry.name)
+            self.assertEqual(v2_entry.sha, v4_entry.sha)
+
+
+class TestManyFilesRepoIntegration(TestCase):
+    """Tests for manyFiles feature integration with Repo."""
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tempdir)
+
+    def test_repo_with_manyfiles_config(self):
+        """Test that a repository with feature.manyFiles=true uses the right settings."""
+        from dulwich.repo import Repo
+
+        # Create a new repository
+        repo = Repo.init(self.tempdir)
+
+        # Set feature.manyFiles=true in config
+        config = repo.get_config()
+        config.set(b"feature", b"manyFiles", b"true")
+        config.write_to_path()
+
+        # Open the index - should have skipHash enabled and version 4
+        index = repo.open_index()
+        self.assertTrue(index._skip_hash)
+        self.assertEqual(index._version, 4)
+
+    def test_repo_with_explicit_index_settings(self):
+        """Test that explicit index.version and index.skipHash work."""
+        from dulwich.repo import Repo
+
+        # Create a new repository
+        repo = Repo.init(self.tempdir)
+
+        # Set explicit index settings
+        config = repo.get_config()
+        config.set(b"index", b"version", b"3")
+        config.set(b"index", b"skipHash", b"false")
+        config.write_to_path()
+
+        # Open the index - should respect explicit settings
+        index = repo.open_index()
+        self.assertFalse(index._skip_hash)
+        self.assertEqual(index._version, 3)
+
+
+class TestPathPrefixCompression(TestCase):
+    """Tests for index version 4 path prefix compression."""
+
+    def setUp(self):
+        self.tempdir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.tempdir)
+
+    def test_varint_encoding_decoding(self):
+        """Test variable-width integer encoding and decoding."""
+        from dulwich.index import _decode_varint, _encode_varint
+
+        test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]
+
+        for value in test_values:
+            encoded = _encode_varint(value)
+            decoded, _ = _decode_varint(encoded, 0)
+            self.assertEqual(value, decoded, f"Failed for value {value}")
+
+    def test_path_compression_simple(self):
+        """Test simple path compression cases."""
+        from dulwich.index import _compress_path, _decompress_path
+
+        # Test case 1: No common prefix
+        compressed = _compress_path(b"file1.txt", b"")
+        decompressed, _ = _decompress_path(compressed, 0, b"")
+        self.assertEqual(b"file1.txt", decompressed)
+
+        # Test case 2: Common prefix
+        compressed = _compress_path(b"src/file2.txt", b"src/file1.txt")
+        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
+        self.assertEqual(b"src/file2.txt", decompressed)
+
+        # Test case 3: Completely different paths
+        compressed = _compress_path(b"docs/readme.md", b"src/file1.txt")
+        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
+        self.assertEqual(b"docs/readme.md", decompressed)
+
+    def test_path_compression_deep_directories(self):
+        """Test compression with deep directory structures."""
+        from dulwich.index import _compress_path, _decompress_path
+
+        path1 = b"src/main/java/com/example/service/UserService.java"
+        path2 = b"src/main/java/com/example/service/OrderService.java"
+        path3 = b"src/main/java/com/example/model/User.java"
+
+        # Compress path2 relative to path1
+        compressed = _compress_path(path2, path1)
+        decompressed, _ = _decompress_path(compressed, 0, path1)
+        self.assertEqual(path2, decompressed)
+
+        # Compress path3 relative to path2
+        compressed = _compress_path(path3, path2)
+        decompressed, _ = _decompress_path(compressed, 0, path2)
+        self.assertEqual(path3, decompressed)
+
+    def test_index_version_4_with_compression(self):
+        """Test full index version 4 write/read with path compression."""
+        index_path = os.path.join(self.tempdir, "index")
+
+        # Create an index with version 4
+        index = Index(index_path, read=False, version=4)
+
+        # Add multiple entries with common prefixes
+        paths = [
+            b"src/main/java/App.java",
+            b"src/main/java/Utils.java",
+            b"src/main/resources/config.properties",
+            b"src/test/java/AppTest.java",
+            b"docs/README.md",
+            b"docs/INSTALL.md",
+        ]
+
+        for i, path in enumerate(paths):
+            entry = IndexEntry(
+                ctime=(1234567890, 0),
+                mtime=(1234567890, 0),
+                dev=1,
+                ino=i + 1,
+                mode=0o100644,
+                uid=1000,
+                gid=1000,
+                size=10,
+                sha=f"{i:040d}".encode(),
+            )
+            index[path] = entry
+
+        # Write and read back
+        index.write()
+
+        # Read the index back
+        index2 = Index(index_path)
+        self.assertEqual(index2._version, 4)
+
+        # Verify all paths were preserved correctly
+        for path in paths:
+            self.assertIn(path, index2)
+
+        # Verify the index file is smaller than version 2 would be
+        with open(index_path, "rb") as f:
+            v4_size = len(f.read())
+
+        # Create equivalent version 2 index for comparison
+        index_v2_path = os.path.join(self.tempdir, "index_v2")
+        index_v2 = Index(index_v2_path, read=False, version=2)
+        for path in paths:
+            entry = IndexEntry(
+                ctime=(1234567890, 0),
+                mtime=(1234567890, 0),
+                dev=1,
+                ino=1,
+                mode=0o100644,
+                uid=1000,
+                gid=1000,
+                size=10,
+                sha=b"0" * 40,
+            )
+            index_v2[path] = entry
+        index_v2.write()
+
+        with open(index_v2_path, "rb") as f:
+            v2_size = len(f.read())
+
+        # Version 4 should be smaller due to compression
+        self.assertLess(
+            v4_size, v2_size, "Version 4 index should be smaller than version 2"
+        )
+
+    def test_path_compression_edge_cases(self):
+        """Test edge cases in path compression."""
+        from dulwich.index import _compress_path, _decompress_path
+
+        # Empty paths
+        compressed = _compress_path(b"", b"")
+        decompressed, _ = _decompress_path(compressed, 0, b"")
+        self.assertEqual(b"", decompressed)
+
+        # Path identical to previous
+        compressed = _compress_path(b"same.txt", b"same.txt")
+        decompressed, _ = _decompress_path(compressed, 0, b"same.txt")
+        self.assertEqual(b"same.txt", decompressed)
+
+        # Path shorter than previous
+        compressed = _compress_path(b"short", b"very/long/path/file.txt")
+        decompressed, _ = _decompress_path(compressed, 0, b"very/long/path/file.txt")
+        self.assertEqual(b"short", decompressed)