|
@@ -68,6 +68,13 @@ EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
|
|
|
|
|
|
DEFAULT_VERSION = 2
|
|
|
|
|
|
+# Index extension signatures
|
|
|
+TREE_EXTENSION = b"TREE"
|
|
|
+REUC_EXTENSION = b"REUC"
|
|
|
+UNTR_EXTENSION = b"UNTR"
|
|
|
+EOIE_EXTENSION = b"EOIE"
|
|
|
+IEOT_EXTENSION = b"IEOT"
|
|
|
+
|
|
|
|
|
|
def _encode_varint(value: int) -> bytes:
|
|
|
"""Encode an integer using variable-width encoding.
|
|
@@ -262,6 +269,83 @@ class SerializedIndexEntry:
|
|
|
return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
|
|
|
|
|
|
|
|
|
+@dataclass
|
|
|
+class IndexExtension:
|
|
|
+ """Base class for index extensions."""
|
|
|
+
|
|
|
+ signature: bytes
|
|
|
+ data: bytes
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
|
|
|
+ """Create an extension from raw data.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ signature: 4-byte extension signature
|
|
|
+ data: Extension data
|
|
|
+ Returns:
|
|
|
+ Parsed extension object
|
|
|
+ """
|
|
|
+ if signature == TREE_EXTENSION:
|
|
|
+ return TreeExtension.from_bytes(data)
|
|
|
+ elif signature == REUC_EXTENSION:
|
|
|
+ return ResolveUndoExtension.from_bytes(data)
|
|
|
+ elif signature == UNTR_EXTENSION:
|
|
|
+ return UntrackedExtension.from_bytes(data)
|
|
|
+ else:
|
|
|
+ # Unknown extension - just store raw data
|
|
|
+ return cls(signature, data)
|
|
|
+
|
|
|
+ def to_bytes(self) -> bytes:
|
|
|
+ """Serialize extension to bytes."""
|
|
|
+ return self.data
|
|
|
+
|
|
|
+
|
|
|
+class TreeExtension(IndexExtension):
|
|
|
+ """Tree cache extension."""
|
|
|
+
|
|
|
+ def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
|
|
|
+ self.entries = entries
|
|
|
+ super().__init__(TREE_EXTENSION, b"")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_bytes(cls, data: bytes) -> "TreeExtension":
|
|
|
+ # TODO: Implement tree cache parsing
|
|
|
+ return cls([])
|
|
|
+
|
|
|
+ def to_bytes(self) -> bytes:
|
|
|
+ # TODO: Implement tree cache serialization
|
|
|
+ return b""
|
|
|
+
|
|
|
+
|
|
|
+class ResolveUndoExtension(IndexExtension):
|
|
|
+ """Resolve undo extension for recording merge conflicts."""
|
|
|
+
|
|
|
+ def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
|
|
|
+ self.entries = entries
|
|
|
+ super().__init__(REUC_EXTENSION, b"")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
|
|
|
+ # TODO: Implement resolve undo parsing
|
|
|
+ return cls([])
|
|
|
+
|
|
|
+ def to_bytes(self) -> bytes:
|
|
|
+ # TODO: Implement resolve undo serialization
|
|
|
+ return b""
|
|
|
+
|
|
|
+
|
|
|
+class UntrackedExtension(IndexExtension):
|
|
|
+ """Untracked cache extension."""
|
|
|
+
|
|
|
+ def __init__(self, data: bytes) -> None:
|
|
|
+ super().__init__(UNTR_EXTENSION, data)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_bytes(cls, data: bytes) -> "UntrackedExtension":
|
|
|
+ return cls(data)
|
|
|
+
|
|
|
+
|
|
|
@dataclass
|
|
|
class IndexEntry:
|
|
|
ctime: Union[int, float, tuple[int, int]]
|
|
@@ -439,15 +523,13 @@ def read_cache_entry(
|
|
|
extended_flags = 0
|
|
|
|
|
|
if version >= 4:
|
|
|
- # Version 4: path is compressed, name length should be 0
|
|
|
name_len = flags & FLAG_NAMEMASK
|
|
|
- if name_len != 0:
|
|
|
- raise ValueError(
|
|
|
- f"Non-zero name length {name_len} in version 4 index entry"
|
|
|
- )
|
|
|
-
|
|
|
- # Read compressed path data byte by byte to avoid seeking
|
|
|
- name, consumed = _decompress_path_from_stream(f, previous_path)
|
|
|
+ if name_len == 0:
|
|
|
+ # Version 4: path is compressed
|
|
|
+ name, consumed = _decompress_path_from_stream(f, previous_path)
|
|
|
+ else:
|
|
|
+ # Version 4: uncompressed path (used when compression doesn't help)
|
|
|
+ name = f.read(name_len)
|
|
|
else:
|
|
|
# Versions < 4: regular name reading
|
|
|
name = f.read(flags & FLAG_NAMEMASK)
|
|
@@ -489,11 +571,20 @@ def write_cache_entry(
|
|
|
write_cache_time(f, entry.mtime)
|
|
|
|
|
|
if version >= 4:
|
|
|
- # Version 4: use path compression, set name length to 0
|
|
|
- flags = 0 | (entry.flags & ~FLAG_NAMEMASK)
|
|
|
+ # Version 4: decide whether to use compression
|
|
|
+ compressed_path = _compress_path(entry.name, previous_path)
|
|
|
+ if len(compressed_path) < len(entry.name) + 1: # +1 for NUL in uncompressed
|
|
|
+ # Compression helps, use it (set name length to 0)
|
|
|
+ flags = 0 | (entry.flags & ~FLAG_NAMEMASK)
|
|
|
+ use_compression = True
|
|
|
+ else:
|
|
|
+ # Compression doesn't help, use uncompressed
|
|
|
+ flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
|
|
|
+ use_compression = False
|
|
|
else:
|
|
|
# Versions < 4: include actual name length
|
|
|
flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
|
|
|
+ use_compression = False
|
|
|
|
|
|
if entry.extended_flags:
|
|
|
flags |= FLAG_EXTENDED
|
|
@@ -517,9 +608,12 @@ def write_cache_entry(
|
|
|
f.write(struct.pack(b">H", entry.extended_flags))
|
|
|
|
|
|
if version >= 4:
|
|
|
- # Version 4: write compressed path
|
|
|
- compressed_path = _compress_path(entry.name, previous_path)
|
|
|
- f.write(compressed_path)
|
|
|
+ if use_compression:
|
|
|
+ # Version 4: write compressed path
|
|
|
+ f.write(compressed_path)
|
|
|
+ else:
|
|
|
+ # Version 4: write uncompressed path
|
|
|
+ f.write(entry.name)
|
|
|
else:
|
|
|
# Versions < 4: write regular path and padding
|
|
|
f.write(entry.name)
|
|
@@ -549,6 +643,94 @@ def read_index_header(f: BinaryIO) -> tuple[int, int]:
|
|
|
return version, num_entries
|
|
|
|
|
|
|
|
|
+def read_index_extension(f: BinaryIO) -> tuple[bytes, bytes]:
|
|
|
+ """Read a single index extension.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ f: File-like object to read from
|
|
|
+ Returns:
|
|
|
+ tuple of (signature, data)
|
|
|
+ """
|
|
|
+ header = f.read(4)
|
|
|
+ if len(header) < 4:
|
|
|
+ raise ValueError("Truncated index extension header")
|
|
|
+
|
|
|
+ # Check if this is an extension signature (uppercase letters)
|
|
|
+ if not all(65 <= b <= 90 for b in header): # A-Z
|
|
|
+ # Not an extension, put it back
|
|
|
+ f.seek(-4, 1)
|
|
|
+ raise ValueError("Not an extension signature")
|
|
|
+
|
|
|
+ # Read extension size
|
|
|
+ size_bytes = f.read(4)
|
|
|
+ if len(size_bytes) < 4:
|
|
|
+ raise ValueError("Truncated index extension size")
|
|
|
+
|
|
|
+ (size,) = struct.unpack(">I", size_bytes)
|
|
|
+
|
|
|
+ # Read extension data
|
|
|
+ data = f.read(size)
|
|
|
+ if len(data) < size:
|
|
|
+ raise ValueError("Truncated index extension data")
|
|
|
+
|
|
|
+ return header, data
|
|
|
+
|
|
|
+
|
|
|
+def read_index_extensions(f: BinaryIO) -> list[IndexExtension]:
|
|
|
+ """Read all index extensions until EOF or checksum.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ f: File-like object to read from
|
|
|
+ Returns:
|
|
|
+ List of parsed extensions
|
|
|
+ """
|
|
|
+ extensions = []
|
|
|
+
|
|
|
+ while True:
|
|
|
+ start_pos = f.tell()
|
|
|
+ try:
|
|
|
+ # Try to read 20 bytes to check if it's the SHA1 checksum
|
|
|
+ potential_sha = f.read(20)
|
|
|
+ if len(potential_sha) < 20:
|
|
|
+ # End of file
|
|
|
+ break
|
|
|
+
|
|
|
+ # Check if we've reached the end (20 zero bytes for skipHash or actual SHA)
|
|
|
+ if potential_sha == b"\x00" * 20:
|
|
|
+ # skipHash case - we're done
|
|
|
+ break
|
|
|
+
|
|
|
+ # Check if this looks like a SHA1 (harder to detect, so we'll try to parse as extension first)
|
|
|
+ f.seek(start_pos)
|
|
|
+
|
|
|
+ try:
|
|
|
+ signature, data = read_index_extension(f)
|
|
|
+ extension = IndexExtension.from_raw(signature, data)
|
|
|
+ extensions.append(extension)
|
|
|
+ except ValueError:
|
|
|
+ # Not an extension, assume it's the SHA1
|
|
|
+ break
|
|
|
+
|
|
|
+ except Exception:
|
|
|
+ # Any error, assume we've hit the checksum
|
|
|
+ break
|
|
|
+
|
|
|
+ return extensions
|
|
|
+
|
|
|
+
|
|
|
+def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
|
|
|
+ """Write an index extension.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ f: File-like object to write to
|
|
|
+ extension: Extension to write
|
|
|
+ """
|
|
|
+ data = extension.to_bytes()
|
|
|
+ f.write(extension.signature)
|
|
|
+ f.write(struct.pack(">I", len(data)))
|
|
|
+ f.write(data)
|
|
|
+
|
|
|
+
|
|
|
def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
|
|
|
"""Read an index file, yielding the individual entries."""
|
|
|
version, num_entries = read_index_header(f)
|