123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
# bundle.py -- Bundle format support
# Copyright (C) 2020 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Bundle format support."""
import binascii
from typing import BinaryIO, Callable, Optional

from .pack import PackData, write_pack_data
class Bundle:
    """In-memory representation of a git bundle.

    The class defines no ``__init__``: callers create an empty instance and
    assign the attributes below before using ``repr()``, ``==`` or
    :meth:`store_objects`.
    """

    # Bundle format version; may be None (``write_bundle`` then chooses one).
    version: Optional[int]
    # Capability name -> value, or None for a capability without a value.
    capabilities: dict[str, Optional[str]]
    # (object id, comment) pairs.
    prerequisites: list[tuple[bytes, bytes]]
    # Ref name -> object id.
    references: dict[bytes, bytes]
    # Source of the packed objects; must provide ``iter_unpacked()``.
    pack_data: PackData

    def __repr__(self) -> str:
        """Return a debug string with every field except the pack data."""
        cls_name = type(self).__name__
        return (
            f"<{cls_name}(version={self.version}, "
            f"capabilities={self.capabilities}, "
            f"prerequisites={self.prerequisites}, "
            f"references={self.references})>"
        )

    def __eq__(self, other: object) -> bool:
        """Compare field by field; anything that is not a Bundle is unequal."""
        if not isinstance(other, type(self)):
            return False
        compared = (
            "version",
            "capabilities",
            "prerequisites",
            "references",
            "pack_data",
        )
        # Stops at the first differing field, in the order listed above.
        return not any(
            getattr(self, field) != getattr(other, field) for field in compared
        )

    def store_objects(
        self, object_store, progress: Optional[Callable[[str], None]] = None
    ):
        """Add every object carried by this bundle to an object store.

        Args:
            object_store: Destination store; only ``add_object`` is called.
            progress: Optional callback receiving a status message after
                every 100 stored objects and once more at the end.
        """
        from .objects import ShaFile

        count = 0
        for entry in self.pack_data.iter_unpacked():
            # Entries without decompressed chunks are skipped and not counted.
            if not entry.decomp_chunks:
                continue
            git_obj = ShaFile.from_raw_chunks(
                entry.obj_type_num, entry.decomp_chunks
            )
            object_store.add_object(git_obj)
            count += 1
            if progress and count % 100 == 0:
                progress(f"Stored {count} objects")

        if progress:
            progress(f"Stored {count} objects total")
def _read_bundle(f: BinaryIO, version: int) -> Bundle:
    """Parse the part of a bundle that follows its signature line.

    Args:
        f: Binary stream positioned just after the ``# vN git bundle`` line.
        version: Format version taken from the signature line. Capability
            (``@``) lines are only parsed when it is 3 or higher.

    Returns:
        A Bundle holding the parsed capabilities, prerequisites and
        references, plus a PackData built from the bytes that follow the
        blank separator line.

    Raises:
        ValueError: If the header ends before the blank separator line, a
            reference line has no ref name, or no pack data follows.
    """
    from io import BytesIO

    capabilities: dict[str, Optional[str]] = {}
    prerequisites: list[tuple[bytes, bytes]] = []
    references: dict[bytes, bytes] = {}

    line = f.readline()
    if version >= 3:
        while line.startswith(b"@"):
            # "@key=value" or a bare "@key"; partition() tells the two apart
            # without using exceptions for control flow.
            key, sep, value_bytes = line[1:].rstrip(b"\n").partition(b"=")
            value = value_bytes.decode("utf-8") if sep else None
            capabilities[key.decode("utf-8")] = value
            line = f.readline()

    while line.startswith(b"-"):
        # The comment after the object id is optional (git's own reader
        # tolerates its absence); a missing one is stored as b"".
        obj_id, _, comment = line[1:].rstrip(b"\n").partition(b" ")
        prerequisites.append((obj_id, comment))
        line = f.readline()

    while line != b"\n":
        if not line:
            # readline() returns b"" at EOF: the separator line never came.
            raise ValueError("Unexpected end of file in bundle header")
        obj_id, sep, ref = line.rstrip(b"\n").partition(b" ")
        if not sep:
            raise ValueError(f"Malformed bundle reference line: {line!r}")
        references[ref] = obj_id
        line = f.readline()

    # Extract pack data to a separate stream since PackData expects
    # the file to start with the PACK header at position 0.
    pack_bytes = f.read()
    if not pack_bytes:
        raise ValueError("Bundle file contains no pack data")
    pack_data = PackData.from_file(BytesIO(pack_bytes))

    ret = Bundle()
    ret.references = references
    ret.capabilities = capabilities
    ret.prerequisites = prerequisites
    ret.pack_data = pack_data
    ret.version = version
    return ret
def read_bundle(f: BinaryIO) -> Bundle:
    """Read a bundle from a binary stream.

    The first line selects the format version; the rest of the stream is
    handed to ``_read_bundle``.

    Args:
        f: Binary file-like object positioned at the start of the bundle.
            It must expose a ``seek`` attribute.

    Returns:
        The parsed Bundle.

    Raises:
        ValueError: If ``f`` has no ``seek`` attribute.
        AssertionError: If the first line is not a v2 or v3 bundle signature.
    """
    if not hasattr(f, "seek"):
        raise ValueError("Bundle file must be seekable")

    known_signatures = (
        (b"# v2 git bundle\n", 2),
        (b"# v3 git bundle\n", 3),
    )
    firstline = f.readline()
    for expected, bundle_version in known_signatures:
        if firstline == expected:
            return _read_bundle(f, bundle_version)
    raise AssertionError(f"unsupported bundle format header: {firstline!r}")
def write_bundle(f: BinaryIO, bundle: Bundle) -> None:
    """Serialise a bundle to a binary stream.

    Writes the signature line, capability lines (v3 only), prerequisite
    lines, reference lines, a blank separator line and finally the pack
    data via ``write_pack_data``.

    Args:
        f: Binary stream; only its ``write`` method is used.
        bundle: Bundle to serialise. When ``bundle.version`` is None the
            version is 3 if the bundle has capabilities, otherwise 2.

    Raises:
        AssertionError: If the version is neither 2 nor 3. Nothing has been
            written to ``f`` at that point.
    """
    version = bundle.version
    if version is None:
        # Capability lines are only emitted for v3, so their presence picks it.
        version = 3 if bundle.capabilities else 2

    if version == 2:
        signature = b"# v2 git bundle\n"
    elif version == 3:
        signature = b"# v3 git bundle\n"
    else:
        raise AssertionError(f"unknown version {version}")
    f.write(signature)

    if version == 3:
        for name, setting in bundle.capabilities.items():
            f.write(b"@" + name.encode("utf-8"))
            # A None value means a bare capability with no "=value" part.
            if setting is not None:
                f.write(b"=" + setting.encode("utf-8"))
            f.write(b"\n")

    for sha, note in bundle.prerequisites:
        f.write(b"-" + sha + b" " + note + b"\n")

    for ref_name, sha in bundle.references.items():
        f.write(sha + b" " + ref_name + b"\n")

    # Blank line separates the header from the pack stream.
    f.write(b"\n")

    pack = bundle.pack_data
    write_pack_data(
        f.write,
        num_records=len(pack),
        records=pack.iter_unpacked(),
    )
class _BundlePackData:
    """Minimal pack-data stand-in holding already generated pack records.

    Provides ``__len__`` and ``iter_unpacked()``, the two members that
    ``write_bundle`` uses on ``Bundle.pack_data``.
    """

    def __init__(self, count, objects):
        self._count = count
        # Materialise the iterable so it can be iterated more than once.
        self._objects = list(objects)

    def __len__(self):
        return self._count

    def iter_unpacked(self):
        return iter(self._objects)


def _normalize_prerequisite(prereq) -> bytes:
    """Return *prereq* as a hex object id (bytes), validating its shape.

    Accepts a 40-character hex id (``str`` or ``bytes``) or a 20-byte binary
    SHA. Raises ValueError for any other ``str``/``bytes`` input.
    """
    if isinstance(prereq, str):
        prereq = prereq.encode("utf-8")
    if not isinstance(prereq, bytes):
        # Other objects are assumed to be a binary SHA exposing ``.hex()``;
        # no length check is applied to them.
        return prereq.hex().encode("ascii")
    if len(prereq) == 40:
        try:
            # unhexlify() is strict: unlike bytes.fromhex() it does not skip
            # whitespace, so "38 hex digits + 2 spaces" is rejected here.
            binascii.unhexlify(prereq)
        except ValueError as exc:
            raise ValueError(f"Invalid prerequisite format: {prereq!r}") from exc
        return prereq
    if len(prereq) == 20:
        # Binary SHA: convert to hex for both the bundle and pack generation.
        return prereq.hex().encode("ascii")
    raise ValueError(f"Invalid prerequisite SHA length: {len(prereq)}")


def create_bundle_from_repo(
    repo,
    refs: Optional[list[bytes]] = None,
    prerequisites: Optional[list[bytes]] = None,
    version: Optional[int] = None,
    capabilities: Optional[dict[str, Optional[str]]] = None,
    progress: Optional[Callable[[str], None]] = None,
) -> Bundle:
    """Create a bundle from a repository.

    Args:
        repo: Repository object to create the bundle from. Uses ``repo.refs``
            (membership test, item access, ``keys()``, ``get_peeled()``) and
            ``repo.generate_pack_data(have=, want=, progress=)``.
        refs: Refs to include (defaults to all refs). Refs that are not in
            ``repo.refs`` are skipped.
        prerequisites: Commit SHAs the receiver must already have, each a
            40-character hex id or a 20-byte binary SHA.
        version: Bundle version (2 or 3); stored as given, None included.
        capabilities: Bundle capabilities (for v3 bundles).
        progress: Optional progress reporting function, passed through to
            ``repo.generate_pack_data``.

    Returns:
        Bundle object ready for writing.

    Raises:
        ValueError: If a prerequisite is not a valid 40-character hex id or
            20-byte binary SHA.
    """
    if refs is None:
        refs = list(repo.refs.keys())
    if prerequisites is None:
        prerequisites = []
    if capabilities is None:
        capabilities = {}

    # Map each requested ref to the object it should advertise.
    bundle_refs = {}
    want_objects = []
    for ref in refs:
        if ref not in repo.refs:
            continue
        ref_value = repo.refs[ref]
        # Only the peeled lookup can raise KeyError, so only it is guarded.
        try:
            peeled_value = repo.refs.get_peeled(ref)
        except KeyError:
            peeled_value = None
        if peeled_value is not None and peeled_value != ref_value:
            bundle_refs[ref] = peeled_value
        else:
            bundle_refs[ref] = ref_value
        want_objects.append(bundle_refs[ref])

    # The same hex ids go into the bundle header (with an empty comment)
    # and into the "have" set used for pack generation.
    have_objects = [_normalize_prerequisite(prereq) for prereq in prerequisites]
    bundle_prerequisites = [(hex_sha, b"") for hex_sha in have_objects]

    # Generate pack data containing all objects needed for the refs.
    pack_count, pack_objects = repo.generate_pack_data(
        have=have_objects,
        want=want_objects,
        progress=progress,
    )

    bundle = Bundle()
    bundle.version = version
    bundle.capabilities = capabilities
    bundle.prerequisites = bundle_prerequisites
    bundle.references = bundle_refs
    bundle.pack_data = _BundlePackData(pack_count, pack_objects)  # type: ignore[assignment]
    return bundle
|