|
@@ -262,36 +262,87 @@ def parse_filter_spec(spec: str | bytes) -> FilterSpec:
|
|
|
|
|
|
|
|
Raises:
|
|
Raises:
|
|
|
ValueError: If spec is not a valid filter specification
|
|
ValueError: If spec is not a valid filter specification
|
|
|
|
|
+
|
|
|
|
|
+ Examples:
|
|
|
|
|
+ >>> parse_filter_spec("blob:none")
|
|
|
|
|
+ BlobNoneFilter()
|
|
|
|
|
+ >>> parse_filter_spec("blob:limit=1m")
|
|
|
|
|
+ BlobLimitFilter(limit=1048576)
|
|
|
|
|
+ >>> parse_filter_spec("tree:0")
|
|
|
|
|
+ TreeDepthFilter(max_depth=0)
|
|
|
"""
|
|
"""
|
|
|
if isinstance(spec, bytes):
|
|
if isinstance(spec, bytes):
|
|
|
- spec = spec.decode("utf-8")
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ spec = spec.decode("utf-8")
|
|
|
|
|
+ except UnicodeDecodeError as e:
|
|
|
|
|
+ raise ValueError(f"Filter specification must be valid UTF-8: {e}")
|
|
|
|
|
|
|
|
spec = spec.strip()
|
|
spec = spec.strip()
|
|
|
|
|
|
|
|
|
|
+ if not spec:
|
|
|
|
|
+ raise ValueError("Filter specification cannot be empty")
|
|
|
|
|
+
|
|
|
if spec == "blob:none":
|
|
if spec == "blob:none":
|
|
|
return BlobNoneFilter()
|
|
return BlobNoneFilter()
|
|
|
elif spec.startswith("blob:limit="):
|
|
elif spec.startswith("blob:limit="):
|
|
|
limit_str = spec[11:] # len('blob:limit=') == 11
|
|
limit_str = spec[11:] # len('blob:limit=') == 11
|
|
|
- limit = _parse_size(limit_str)
|
|
|
|
|
- return BlobLimitFilter(limit)
|
|
|
|
|
|
|
+ if not limit_str:
|
|
|
|
|
+ raise ValueError("blob:limit requires a size value (e.g., blob:limit=1m)")
|
|
|
|
|
+ try:
|
|
|
|
|
+ limit = _parse_size(limit_str)
|
|
|
|
|
+ if limit < 0:
|
|
|
|
|
+ raise ValueError(f"blob:limit size must be non-negative, got {limit_str}")
|
|
|
|
|
+ return BlobLimitFilter(limit)
|
|
|
|
|
+ except ValueError as e:
|
|
|
|
|
+ raise ValueError(f"Invalid blob:limit specification: {e}")
|
|
|
elif spec.startswith("tree:"):
|
|
elif spec.startswith("tree:"):
|
|
|
depth_str = spec[5:] # len('tree:') == 5
|
|
depth_str = spec[5:] # len('tree:') == 5
|
|
|
|
|
+ if not depth_str:
|
|
|
|
|
+ raise ValueError("tree filter requires a depth value (e.g., tree:0)")
|
|
|
try:
|
|
try:
|
|
|
depth = int(depth_str)
|
|
depth = int(depth_str)
|
|
|
|
|
+ if depth < 0:
|
|
|
|
|
+ raise ValueError(f"tree depth must be non-negative, got {depth}")
|
|
|
return TreeDepthFilter(depth)
|
|
return TreeDepthFilter(depth)
|
|
|
- except ValueError:
|
|
|
|
|
- raise ValueError(f"Invalid tree depth: {depth_str}")
|
|
|
|
|
|
|
+ except ValueError as e:
|
|
|
|
|
+ raise ValueError(f"Invalid tree filter: {e}")
|
|
|
elif spec.startswith("sparse:oid="):
|
|
elif spec.startswith("sparse:oid="):
|
|
|
oid_str = spec[11:] # len('sparse:oid=') == 11
|
|
oid_str = spec[11:] # len('sparse:oid=') == 11
|
|
|
- # Convert to bytes for OID
|
|
|
|
|
- oid = oid_str.encode("ascii")
|
|
|
|
|
|
|
+ if not oid_str:
|
|
|
|
|
+ raise ValueError("sparse:oid requires an object ID (e.g., sparse:oid=abc123...)")
|
|
|
|
|
+ # Validate OID format (should be 40 hex chars for SHA-1 or 64 for SHA-256)
|
|
|
|
|
+ if len(oid_str) not in (40, 64):
|
|
|
|
|
+ raise ValueError(
|
|
|
|
|
+ f"sparse:oid requires a valid object ID (40 or 64 hex chars), got {len(oid_str)} chars"
|
|
|
|
|
+ )
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Convert to bytes and validate hex
|
|
|
|
|
+ oid = oid_str.encode("ascii")
|
|
|
|
|
+ int(oid_str, 16) # Validate it's valid hex
|
|
|
|
|
+ except (ValueError, UnicodeEncodeError):
|
|
|
|
|
+ raise ValueError(f"sparse:oid must be a hexadecimal object ID, got: {oid_str}")
|
|
|
return SparseOidFilter(oid)
|
|
return SparseOidFilter(oid)
|
|
|
elif spec.startswith("combine:"):
|
|
elif spec.startswith("combine:"):
|
|
|
- filter_specs = spec[8:].split("+") # len('combine:') == 8
|
|
|
|
|
- filters = [parse_filter_spec(f) for f in filter_specs]
|
|
|
|
|
|
|
+ filter_str = spec[8:] # len('combine:') == 8
|
|
|
|
|
+ if not filter_str:
|
|
|
|
|
+ raise ValueError("combine filter requires at least one filter (e.g., combine:blob:none+tree:0)")
|
|
|
|
|
+ filter_specs = filter_str.split("+")
|
|
|
|
|
+ if len(filter_specs) < 2:
|
|
|
|
|
+ raise ValueError(
|
|
|
|
|
+ "combine filter requires at least two filters separated by '+'"
|
|
|
|
|
+ )
|
|
|
|
|
+ try:
|
|
|
|
|
+ filters = [parse_filter_spec(f) for f in filter_specs]
|
|
|
|
|
+ except ValueError as e:
|
|
|
|
|
+ raise ValueError(f"Invalid filter in combine specification: {e}")
|
|
|
return CombineFilter(filters)
|
|
return CombineFilter(filters)
|
|
|
else:
|
|
else:
|
|
|
- raise ValueError(f"Unknown filter specification: {spec}")
|
|
|
|
|
|
|
+ # Provide helpful error message with supported formats
|
|
|
|
|
+ raise ValueError(
|
|
|
|
|
+ f"Unknown filter specification: '{spec}'. "
|
|
|
|
|
+ f"Supported formats: blob:none, blob:limit=<n>[kmg], tree:<depth>, "
|
|
|
|
|
+ f"sparse:oid=<oid>, combine:<filter>+<filter>+..."
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
|
|
|
|
|
def filter_pack_objects(
|
|
def filter_pack_objects(
|