|
@@ -163,6 +163,17 @@ class PackedObjectContainer(ObjectContainer):
|
|
|
allow_missing: bool = False,
|
|
allow_missing: bool = False,
|
|
|
convert_ofs_delta: bool = True,
|
|
convert_ofs_delta: bool = True,
|
|
|
) -> Iterator["UnpackedObject"]:
|
|
) -> Iterator["UnpackedObject"]:
|
|
|
|
|
+ """Iterate over unpacked objects from a subset of SHAs.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ shas: Set of object SHAs to retrieve
|
|
|
|
|
+ include_comp: Include compressed data if True
|
|
|
|
|
+ allow_missing: If True, skip missing objects
|
|
|
|
|
+ convert_ofs_delta: If True, convert offset deltas to ref deltas
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Iterator of UnpackedObject instances
|
|
|
|
|
+ """
|
|
|
raise NotImplementedError(self.iter_unpacked_subset)
|
|
raise NotImplementedError(self.iter_unpacked_subset)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1426,18 +1437,45 @@ class PackData:
|
|
|
|
|
|
|
|
@property
|
|
@property
|
|
|
def filename(self):
|
|
def filename(self):
|
|
|
|
|
+ """Get the filename of the pack file.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Base filename without directory path
|
|
|
|
|
+ """
|
|
|
return os.path.basename(self._filename)
|
|
return os.path.basename(self._filename)
|
|
|
|
|
|
|
|
@property
|
|
@property
|
|
|
def path(self):
|
|
def path(self):
|
|
|
|
|
+ """Get the full path of the pack file.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Full path to the pack file
|
|
|
|
|
+ """
|
|
|
return self._filename
|
|
return self._filename
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_file(cls, file, size=None):
|
|
def from_file(cls, file, size=None):
|
|
|
|
|
+ """Create a PackData object from an open file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ file: Open file object
|
|
|
|
|
+ size: Optional file size
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ PackData instance
|
|
|
|
|
+ """
|
|
|
return cls(str(file), file=file, size=size)
|
|
return cls(str(file), file=file, size=size)
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_path(cls, path: Union[str, os.PathLike]):
|
|
def from_path(cls, path: Union[str, os.PathLike]):
|
|
|
|
|
+ """Create a PackData object from a file path.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ path: Path to the pack file
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ PackData instance
|
|
|
|
|
+ """
|
|
|
return cls(filename=path)
|
|
return cls(filename=path)
|
|
|
|
|
|
|
|
def close(self) -> None:
|
|
def close(self) -> None:
|
|
@@ -1689,6 +1727,15 @@ class DeltaChainIterator(Generic[T]):
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def for_pack_data(cls, pack_data: PackData, resolve_ext_ref=None):
|
|
def for_pack_data(cls, pack_data: PackData, resolve_ext_ref=None):
|
|
|
|
|
+ """Create a DeltaChainIterator from pack data.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ pack_data: PackData object to iterate
|
|
|
|
|
+ resolve_ext_ref: Optional function to resolve external refs
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ DeltaChainIterator instance
|
|
|
|
|
+ """
|
|
|
walker = cls(None, resolve_ext_ref=resolve_ext_ref)
|
|
walker = cls(None, resolve_ext_ref=resolve_ext_ref)
|
|
|
walker.set_pack_data(pack_data)
|
|
walker.set_pack_data(pack_data)
|
|
|
for unpacked in pack_data.iter_unpacked(include_comp=False):
|
|
for unpacked in pack_data.iter_unpacked(include_comp=False):
|
|
@@ -1704,6 +1751,17 @@ class DeltaChainIterator(Generic[T]):
|
|
|
allow_missing: bool = False,
|
|
allow_missing: bool = False,
|
|
|
resolve_ext_ref=None,
|
|
resolve_ext_ref=None,
|
|
|
):
|
|
):
|
|
|
|
|
+ """Create a DeltaChainIterator for a subset of objects.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ pack: Pack object containing the data
|
|
|
|
|
+ shas: Iterable of object SHAs to include
|
|
|
|
|
+ allow_missing: If True, skip missing objects
|
|
|
|
|
+ resolve_ext_ref: Optional function to resolve external refs
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ DeltaChainIterator instance
|
|
|
|
|
+ """
|
|
|
walker = cls(None, resolve_ext_ref=resolve_ext_ref)
|
|
walker = cls(None, resolve_ext_ref=resolve_ext_ref)
|
|
|
walker.set_pack_data(pack.data)
|
|
walker.set_pack_data(pack.data)
|
|
|
todo = set()
|
|
todo = set()
|
|
@@ -1737,6 +1795,11 @@ class DeltaChainIterator(Generic[T]):
|
|
|
return walker
|
|
return walker
|
|
|
|
|
|
|
|
def record(self, unpacked: UnpackedObject) -> None:
|
|
def record(self, unpacked: UnpackedObject) -> None:
|
|
|
|
|
+ """Record an unpacked object for later processing.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ unpacked: UnpackedObject to record
|
|
|
|
|
+ """
|
|
|
type_num = unpacked.pack_type_num
|
|
type_num = unpacked.pack_type_num
|
|
|
offset = unpacked.offset
|
|
offset = unpacked.offset
|
|
|
assert offset is not None
|
|
assert offset is not None
|
|
@@ -1752,6 +1815,11 @@ class DeltaChainIterator(Generic[T]):
|
|
|
self._full_ofs.append((offset, type_num))
|
|
self._full_ofs.append((offset, type_num))
|
|
|
|
|
|
|
|
def set_pack_data(self, pack_data: PackData) -> None:
|
|
def set_pack_data(self, pack_data: PackData) -> None:
|
|
|
|
|
+ """Set the pack data for iteration.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ pack_data: PackData object to use
|
|
|
|
|
+ """
|
|
|
self._file = pack_data._file
|
|
self._file = pack_data._file
|
|
|
|
|
|
|
|
def _walk_all_chains(self):
|
|
def _walk_all_chains(self):
|
|
@@ -1837,6 +1905,14 @@ class UnpackedObjectIterator(DeltaChainIterator[UnpackedObject]):
|
|
|
"""Delta chain iterator that yield unpacked objects."""
|
|
"""Delta chain iterator that yield unpacked objects."""
|
|
|
|
|
|
|
|
def _result(self, unpacked):
|
|
def _result(self, unpacked):
|
|
|
|
|
+ """Return the unpacked object.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ unpacked: The unpacked object
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ The unpacked object unchanged
|
|
|
|
|
+ """
|
|
|
return unpacked
|
|
return unpacked
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1846,6 +1922,14 @@ class PackIndexer(DeltaChainIterator[PackIndexEntry]):
|
|
|
_compute_crc32 = True
|
|
_compute_crc32 = True
|
|
|
|
|
|
|
|
def _result(self, unpacked):
|
|
def _result(self, unpacked):
|
|
|
|
|
+ """Convert unpacked object to pack index entry.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ unpacked: The unpacked object
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Tuple of (sha, offset, crc32) for index entry
|
|
|
|
|
+ """
|
|
|
return unpacked.sha(), unpacked.offset, unpacked.crc32
|
|
return unpacked.sha(), unpacked.offset, unpacked.crc32
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1853,6 +1937,14 @@ class PackInflater(DeltaChainIterator[ShaFile]):
|
|
|
"""Delta chain iterator that yields ShaFile objects."""
|
|
"""Delta chain iterator that yields ShaFile objects."""
|
|
|
|
|
|
|
|
def _result(self, unpacked):
|
|
def _result(self, unpacked):
|
|
|
|
|
+ """Convert unpacked object to ShaFile.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ unpacked: The unpacked object
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ ShaFile object from the unpacked data
|
|
|
|
|
+ """
|
|
|
return unpacked.sha_file()
|
|
return unpacked.sha_file()
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1860,15 +1952,36 @@ class SHA1Reader(BinaryIO):
|
|
|
"""Wrapper for file-like object that remembers the SHA1 of its data."""
|
|
"""Wrapper for file-like object that remembers the SHA1 of its data."""
|
|
|
|
|
|
|
|
def __init__(self, f) -> None:
|
|
def __init__(self, f) -> None:
|
|
|
|
|
+ """Initialize SHA1Reader.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ f: File-like object to wrap
|
|
|
|
|
+ """
|
|
|
self.f = f
|
|
self.f = f
|
|
|
self.sha1 = sha1(b"")
|
|
self.sha1 = sha1(b"")
|
|
|
|
|
|
|
|
def read(self, size: int = -1) -> bytes:
|
|
def read(self, size: int = -1) -> bytes:
|
|
|
|
|
+ """Read bytes and update SHA1.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ size: Number of bytes to read, -1 for all
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Bytes read from file
|
|
|
|
|
+ """
|
|
|
data = self.f.read(size)
|
|
data = self.f.read(size)
|
|
|
self.sha1.update(data)
|
|
self.sha1.update(data)
|
|
|
return data
|
|
return data
|
|
|
|
|
|
|
|
def check_sha(self, allow_empty: bool = False) -> None:
|
|
def check_sha(self, allow_empty: bool = False) -> None:
|
|
|
|
|
+ """Check if the SHA1 matches the expected value.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ allow_empty: Allow empty SHA1 hash
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ ChecksumMismatch: If SHA1 doesn't match
|
|
|
|
|
+ """
|
|
|
stored = self.f.read(20)
|
|
stored = self.f.read(20)
|
|
|
# If git option index.skipHash is set the index will be empty
|
|
# If git option index.skipHash is set the index will be empty
|
|
|
if stored != self.sha1.digest() and (
|
|
if stored != self.sha1.digest() and (
|
|
@@ -1878,62 +1991,121 @@ class SHA1Reader(BinaryIO):
|
|
|
raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored))
|
|
raise ChecksumMismatch(self.sha1.hexdigest(), sha_to_hex(stored))
|
|
|
|
|
|
|
|
def close(self):
|
|
def close(self):
|
|
|
|
|
+ """Close the underlying file."""
|
|
|
return self.f.close()
|
|
return self.f.close()
|
|
|
|
|
|
|
|
def tell(self) -> int:
|
|
def tell(self) -> int:
|
|
|
|
|
+ """Return current file position."""
|
|
|
return self.f.tell()
|
|
return self.f.tell()
|
|
|
|
|
|
|
|
# BinaryIO abstract methods
|
|
# BinaryIO abstract methods
|
|
|
def readable(self) -> bool:
|
|
def readable(self) -> bool:
|
|
|
|
|
+ """Check if file is readable."""
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
def writable(self) -> bool:
|
|
def writable(self) -> bool:
|
|
|
|
|
+ """Check if file is writable."""
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
def seekable(self) -> bool:
|
|
def seekable(self) -> bool:
|
|
|
|
|
+ """Check if file is seekable."""
|
|
|
return getattr(self.f, "seekable", lambda: False)()
|
|
return getattr(self.f, "seekable", lambda: False)()
|
|
|
|
|
|
|
|
def seek(self, offset: int, whence: int = 0) -> int:
|
|
def seek(self, offset: int, whence: int = 0) -> int:
|
|
|
|
|
+ """Seek to position in file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ offset: Position offset
|
|
|
|
|
+ whence: Reference point (0=start, 1=current, 2=end)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ New file position
|
|
|
|
|
+ """
|
|
|
return self.f.seek(offset, whence)
|
|
return self.f.seek(offset, whence)
|
|
|
|
|
|
|
|
def flush(self) -> None:
|
|
def flush(self) -> None:
|
|
|
|
|
+ """Flush the file buffer."""
|
|
|
if hasattr(self.f, "flush"):
|
|
if hasattr(self.f, "flush"):
|
|
|
self.f.flush()
|
|
self.f.flush()
|
|
|
|
|
|
|
|
def readline(self, size: int = -1) -> bytes:
|
|
def readline(self, size: int = -1) -> bytes:
|
|
|
|
|
+ """Read a line from the file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ size: Maximum bytes to read
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Line read from file
|
|
|
|
|
+ """
|
|
|
return self.f.readline(size)
|
|
return self.f.readline(size)
|
|
|
|
|
|
|
|
def readlines(self, hint: int = -1) -> list[bytes]:
|
|
def readlines(self, hint: int = -1) -> list[bytes]:
|
|
|
|
|
+ """Read all lines from the file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ hint: Approximate number of bytes to read
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ List of lines
|
|
|
|
|
+ """
|
|
|
return self.f.readlines(hint)
|
|
return self.f.readlines(hint)
|
|
|
|
|
|
|
|
def writelines(self, lines) -> None:
|
|
def writelines(self, lines) -> None:
|
|
|
|
|
+ """Not supported for read-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("writelines")
|
|
raise UnsupportedOperation("writelines")
|
|
|
|
|
|
|
|
def write(self, data) -> int:
|
|
def write(self, data) -> int:
|
|
|
|
|
+ """Not supported for read-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("write")
|
|
raise UnsupportedOperation("write")
|
|
|
|
|
|
|
|
def __enter__(self):
|
|
def __enter__(self):
|
|
|
|
|
+ """Enter context manager."""
|
|
|
return self
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
def __exit__(self, type, value, traceback):
|
|
|
|
|
+ """Exit context manager and close file."""
|
|
|
self.close()
|
|
self.close()
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
def __iter__(self):
|
|
|
|
|
+ """Return iterator over lines."""
|
|
|
return self
|
|
return self
|
|
|
|
|
|
|
|
def __next__(self) -> bytes:
|
|
def __next__(self) -> bytes:
|
|
|
|
|
+ """Get next line from file.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Next line
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ StopIteration: When no more lines
|
|
|
|
|
+ """
|
|
|
line = self.readline()
|
|
line = self.readline()
|
|
|
if not line:
|
|
if not line:
|
|
|
raise StopIteration
|
|
raise StopIteration
|
|
|
return line
|
|
return line
|
|
|
|
|
|
|
|
def fileno(self) -> int:
|
|
def fileno(self) -> int:
|
|
|
|
|
+ """Return file descriptor number."""
|
|
|
return self.f.fileno()
|
|
return self.f.fileno()
|
|
|
|
|
|
|
|
def isatty(self) -> bool:
|
|
def isatty(self) -> bool:
|
|
|
|
|
+ """Check if file is a terminal."""
|
|
|
return getattr(self.f, "isatty", lambda: False)()
|
|
return getattr(self.f, "isatty", lambda: False)()
|
|
|
|
|
|
|
|
def truncate(self, size: Optional[int] = None) -> int:
|
|
def truncate(self, size: Optional[int] = None) -> int:
|
|
|
|
|
+ """Not supported for read-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("truncate")
|
|
raise UnsupportedOperation("truncate")
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1941,17 +2113,35 @@ class SHA1Writer(BinaryIO):
|
|
|
"""Wrapper for file-like object that remembers the SHA1 of its data."""
|
|
"""Wrapper for file-like object that remembers the SHA1 of its data."""
|
|
|
|
|
|
|
|
def __init__(self, f) -> None:
|
|
def __init__(self, f) -> None:
|
|
|
|
|
+ """Initialize SHA1Writer.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ f: File-like object to wrap
|
|
|
|
|
+ """
|
|
|
self.f = f
|
|
self.f = f
|
|
|
self.length = 0
|
|
self.length = 0
|
|
|
self.sha1 = sha1(b"")
|
|
self.sha1 = sha1(b"")
|
|
|
|
|
|
|
|
def write(self, data) -> int:
|
|
def write(self, data) -> int:
|
|
|
|
|
+ """Write data and update SHA1.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ data: Data to write
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Number of bytes written
|
|
|
|
|
+ """
|
|
|
self.sha1.update(data)
|
|
self.sha1.update(data)
|
|
|
self.f.write(data)
|
|
self.f.write(data)
|
|
|
self.length += len(data)
|
|
self.length += len(data)
|
|
|
return len(data)
|
|
return len(data)
|
|
|
|
|
|
|
|
def write_sha(self):
|
|
def write_sha(self):
|
|
|
|
|
+ """Write the SHA1 digest to the file.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ The SHA1 digest bytes
|
|
|
|
|
+ """
|
|
|
sha = self.sha1.digest()
|
|
sha = self.sha1.digest()
|
|
|
assert len(sha) == 20
|
|
assert len(sha) == 20
|
|
|
self.f.write(sha)
|
|
self.f.write(sha)
|
|
@@ -1959,65 +2149,124 @@ class SHA1Writer(BinaryIO):
|
|
|
return sha
|
|
return sha
|
|
|
|
|
|
|
|
def close(self):
|
|
def close(self):
|
|
|
|
|
+ """Close the file after writing SHA1.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ The SHA1 digest bytes
|
|
|
|
|
+ """
|
|
|
sha = self.write_sha()
|
|
sha = self.write_sha()
|
|
|
self.f.close()
|
|
self.f.close()
|
|
|
return sha
|
|
return sha
|
|
|
|
|
|
|
|
def offset(self):
|
|
def offset(self):
|
|
|
|
|
+ """Get the total number of bytes written.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Total bytes written
|
|
|
|
|
+ """
|
|
|
return self.length
|
|
return self.length
|
|
|
|
|
|
|
|
def tell(self) -> int:
|
|
def tell(self) -> int:
|
|
|
|
|
+ """Return current file position."""
|
|
|
return self.f.tell()
|
|
return self.f.tell()
|
|
|
|
|
|
|
|
# BinaryIO abstract methods
|
|
# BinaryIO abstract methods
|
|
|
def readable(self) -> bool:
|
|
def readable(self) -> bool:
|
|
|
|
|
+ """Check if file is readable."""
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
def writable(self) -> bool:
|
|
def writable(self) -> bool:
|
|
|
|
|
+ """Check if file is writable."""
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
def seekable(self) -> bool:
|
|
def seekable(self) -> bool:
|
|
|
|
|
+ """Check if file is seekable."""
|
|
|
return getattr(self.f, "seekable", lambda: False)()
|
|
return getattr(self.f, "seekable", lambda: False)()
|
|
|
|
|
|
|
|
def seek(self, offset: int, whence: int = 0) -> int:
|
|
def seek(self, offset: int, whence: int = 0) -> int:
|
|
|
|
|
+ """Seek to position in file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ offset: Position offset
|
|
|
|
|
+ whence: Reference point (0=start, 1=current, 2=end)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ New file position
|
|
|
|
|
+ """
|
|
|
return self.f.seek(offset, whence)
|
|
return self.f.seek(offset, whence)
|
|
|
|
|
|
|
|
def flush(self) -> None:
|
|
def flush(self) -> None:
|
|
|
|
|
+ """Flush the file buffer."""
|
|
|
if hasattr(self.f, "flush"):
|
|
if hasattr(self.f, "flush"):
|
|
|
self.f.flush()
|
|
self.f.flush()
|
|
|
|
|
|
|
|
def readline(self, size: int = -1) -> bytes:
|
|
def readline(self, size: int = -1) -> bytes:
|
|
|
|
|
+ """Not supported for write-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("readline")
|
|
raise UnsupportedOperation("readline")
|
|
|
|
|
|
|
|
def readlines(self, hint: int = -1) -> list[bytes]:
|
|
def readlines(self, hint: int = -1) -> list[bytes]:
|
|
|
|
|
+ """Not supported for write-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("readlines")
|
|
raise UnsupportedOperation("readlines")
|
|
|
|
|
|
|
|
def writelines(self, lines) -> None:
|
|
def writelines(self, lines) -> None:
|
|
|
|
|
+ """Write multiple lines to the file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ lines: Iterable of lines to write
|
|
|
|
|
+ """
|
|
|
for line in lines:
|
|
for line in lines:
|
|
|
self.write(line)
|
|
self.write(line)
|
|
|
|
|
|
|
|
def read(self, size: int = -1) -> bytes:
|
|
def read(self, size: int = -1) -> bytes:
|
|
|
|
|
+ """Not supported for write-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("read")
|
|
raise UnsupportedOperation("read")
|
|
|
|
|
|
|
|
def __enter__(self):
|
|
def __enter__(self):
|
|
|
|
|
+ """Enter context manager."""
|
|
|
return self
|
|
return self
|
|
|
|
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
def __exit__(self, type, value, traceback):
|
|
|
|
|
+ """Exit context manager and close file."""
|
|
|
self.close()
|
|
self.close()
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
def __iter__(self):
|
|
|
|
|
+ """Return iterator."""
|
|
|
return self
|
|
return self
|
|
|
|
|
|
|
|
def __next__(self) -> bytes:
|
|
def __next__(self) -> bytes:
|
|
|
|
|
+ """Not supported for write-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("__next__")
|
|
raise UnsupportedOperation("__next__")
|
|
|
|
|
|
|
|
def fileno(self) -> int:
|
|
def fileno(self) -> int:
|
|
|
|
|
+ """Return file descriptor number."""
|
|
|
return self.f.fileno()
|
|
return self.f.fileno()
|
|
|
|
|
|
|
|
def isatty(self) -> bool:
|
|
def isatty(self) -> bool:
|
|
|
|
|
+ """Check if file is a terminal."""
|
|
|
return getattr(self.f, "isatty", lambda: False)()
|
|
return getattr(self.f, "isatty", lambda: False)()
|
|
|
|
|
|
|
|
def truncate(self, size: Optional[int] = None) -> int:
|
|
def truncate(self, size: Optional[int] = None) -> int:
|
|
|
|
|
+ """Not supported for write-only file.
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ UnsupportedOperation: Always raised
|
|
|
|
|
+ """
|
|
|
raise UnsupportedOperation("truncate")
|
|
raise UnsupportedOperation("truncate")
|
|
|
|
|
|
|
|
|
|
|
|
@@ -2341,6 +2590,14 @@ def generate_unpacked_objects(
|
|
|
|
|
|
|
|
|
|
|
|
|
def full_unpacked_object(o: ShaFile) -> UnpackedObject:
|
|
def full_unpacked_object(o: ShaFile) -> UnpackedObject:
|
|
|
|
|
+ """Create an UnpackedObject from a ShaFile.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ o: ShaFile object to convert
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ UnpackedObject with full object data
|
|
|
|
|
+ """
|
|
|
return UnpackedObject(
|
|
return UnpackedObject(
|
|
|
o.type_num,
|
|
o.type_num,
|
|
|
delta_base=None,
|
|
delta_base=None,
|
|
@@ -2419,6 +2676,8 @@ def write_pack_objects(
|
|
|
|
|
|
|
|
|
|
|
|
|
class PackChunkGenerator:
|
|
class PackChunkGenerator:
|
|
|
|
|
+ """Generator for pack data chunks."""
|
|
|
|
|
+
|
|
|
def __init__(
|
|
def __init__(
|
|
|
self,
|
|
self,
|
|
|
num_records=None,
|
|
num_records=None,
|