|
@@ -21,18 +21,33 @@
|
|
"""Bundle format support.
|
|
"""Bundle format support.
|
|
"""
|
|
"""
|
|
|
|
|
|
-from .pack import PackData
|
|
|
|
-from typing import Dict, List, Tuple
|
|
|
|
|
|
+from typing import Dict, List, Tuple, Optional, Union, Sequence
|
|
|
|
+from .pack import PackData, write_pack_data
|
|
|
|
|
|
|
|
|
|
class Bundle(object):
|
|
class Bundle(object):
|
|
|
|
|
|
- version: int
|
|
|
|
|
|
+ version: Optional[int] = None
|
|
|
|
|
|
- capabilities: Dict[str, str]
|
|
|
|
- prerequisites: List[Tuple[bytes, str]]
|
|
|
|
- references: Dict[str, bytes]
|
|
|
|
- pack_data: PackData
|
|
|
|
|
|
+ capabilities: Dict[str, str] = {}
|
|
|
|
+ prerequisites: List[Tuple[bytes, str]] = []
|
|
|
|
+ references: Dict[str, bytes] = {}
|
|
|
|
+ pack_data: Union[PackData, Sequence[bytes]] = []
|
|
|
|
+
|
|
|
|
+ def __eq__(self, other):
|
|
|
|
+ if not isinstance(other, type(self)):
|
|
|
|
+ return False
|
|
|
|
+ if self.version != other.version:
|
|
|
|
+ return False
|
|
|
|
+ if self.capabilities != other.capabilities:
|
|
|
|
+ return False
|
|
|
|
+ if self.prerequisites != other.prerequisites:
|
|
|
|
+ return False
|
|
|
|
+ if self.references != other.references:
|
|
|
|
+ return False
|
|
|
|
+ if self.pack_data != other.pack_data:
|
|
|
|
+ return False
|
|
|
|
+ return True
|
|
|
|
|
|
|
|
|
|
def _read_bundle(f, version):
|
|
def _read_bundle(f, version):
|
|
@@ -45,13 +60,15 @@ def _read_bundle(f, version):
|
|
line = line[1:].rstrip(b'\n')
|
|
line = line[1:].rstrip(b'\n')
|
|
try:
|
|
try:
|
|
key, value = line.split(b'=', 1)
|
|
key, value = line.split(b'=', 1)
|
|
- except IndexError:
|
|
|
|
|
|
+ except ValueError:
|
|
key = line
|
|
key = line
|
|
value = None
|
|
value = None
|
|
- capabilities[key] = value
|
|
|
|
|
|
+ else:
|
|
|
|
+ value = value.decode('utf-8')
|
|
|
|
+ capabilities[key.decode('utf-8')] = value
|
|
line = f.readline()
|
|
line = f.readline()
|
|
while line.startswith(b'-'):
|
|
while line.startswith(b'-'):
|
|
- (obj_id, comment) = line[1:].split(b' ', 1)
|
|
|
|
|
|
+ (obj_id, comment) = line[1:].rstrip(b'\n').split(b' ', 1)
|
|
prerequisites.append((obj_id, comment.decode('utf-8')))
|
|
prerequisites.append((obj_id, comment.decode('utf-8')))
|
|
line = f.readline()
|
|
line = f.readline()
|
|
while line != b'\n':
|
|
while line != b'\n':
|
|
@@ -64,6 +81,7 @@ def _read_bundle(f, version):
|
|
ret.capabilities = capabilities
|
|
ret.capabilities = capabilities
|
|
ret.prerequisites = prerequisites
|
|
ret.prerequisites = prerequisites
|
|
ret.pack_data = pack_data
|
|
ret.pack_data = pack_data
|
|
|
|
+ ret.version = version
|
|
return ret
|
|
return ret
|
|
|
|
|
|
|
|
|
|
@@ -76,3 +94,30 @@ def read_bundle(f):
|
|
return _read_bundle(f, 3)
|
|
return _read_bundle(f, 3)
|
|
raise AssertionError(
|
|
raise AssertionError(
|
|
'unsupported bundle format header: %r' % firstline)
|
|
'unsupported bundle format header: %r' % firstline)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def write_bundle(f, bundle):
|
|
|
|
+ version = bundle.version
|
|
|
|
+ if version is None:
|
|
|
|
+ if bundle.capabilities:
|
|
|
|
+ version = 3
|
|
|
|
+ else:
|
|
|
|
+ version = 2
|
|
|
|
+ if version == 2:
|
|
|
|
+ f.write(b'# v2 git bundle\n')
|
|
|
|
+ elif version == 3:
|
|
|
|
+ f.write(b'# v3 git bundle\n')
|
|
|
|
+ else:
|
|
|
|
+ raise AssertionError('unknown version %d' % version)
|
|
|
|
+ if version == 3:
|
|
|
|
+ for key, value in bundle.capabilities.items():
|
|
|
|
+ f.write(b'@' + key.encode('utf-8'))
|
|
|
|
+ if value is not None:
|
|
|
|
+ f.write(b'=' + value.encode('utf-8'))
|
|
|
|
+ f.write(b'\n')
|
|
|
|
+ for (obj_id, comment) in bundle.prerequisites:
|
|
|
|
+ f.write(b'-%s %s\n' % (obj_id, comment.encode('utf-8')))
|
|
|
|
+ for ref, obj_id in bundle.references.items():
|
|
|
|
+ f.write(b'%s %s\n' % (obj_id, ref))
|
|
|
|
+ f.write(b'\n')
|
|
|
|
+ write_pack_data(f, len(bundle.pack_data), iter(bundle.pack_data))
|