|
@@ -55,7 +55,8 @@ class ObjectStoreTests:
|
|
|
store: "BaseObjectStore"
|
|
|
|
|
|
assertEqual: Callable[[object, object], None]
|
|
|
- assertRaises: Callable[[type[Exception], Callable[[], Any]], None]
|
|
|
+ # For type checker purposes - actual implementation supports both styles
|
|
|
+ assertRaises: Callable[..., Any]
|
|
|
assertNotIn: Callable[[object, object], None]
|
|
|
assertNotEqual: Callable[[object, object], None]
|
|
|
assertIn: Callable[[object, object], None]
|
|
@@ -259,10 +260,39 @@ class ObjectStoreTests:
|
|
|
self.assertEqual(
|
|
|
[testobject.id], list(self.store.iter_prefix(testobject.id[:10]))
|
|
|
)
|
|
|
- self.assertEqual(
|
|
|
- [testobject.id], list(self.store.iter_prefix(testobject.id[:4]))
|
|
|
+
|
|
|
+ def test_iterobjects_subset_all_present(self) -> None:
|
|
|
+ """Test iterating over a subset of objects that all exist."""
|
|
|
+ blob1 = make_object(Blob, data=b"blob 1 data")
|
|
|
+ blob2 = make_object(Blob, data=b"blob 2 data")
|
|
|
+ self.store.add_object(blob1)
|
|
|
+ self.store.add_object(blob2)
|
|
|
+
|
|
|
+ objects = list(self.store.iterobjects_subset([blob1.id, blob2.id]))
|
|
|
+ self.assertEqual(2, len(objects))
|
|
|
+ object_ids = set(o.id for o in objects)
|
|
|
+ self.assertEqual(set([blob1.id, blob2.id]), object_ids)
|
|
|
+
|
|
|
+ def test_iterobjects_subset_missing_not_allowed(self) -> None:
|
|
|
+ """Test iterating with missing objects when not allowed."""
|
|
|
+ blob1 = make_object(Blob, data=b"blob 1 data")
|
|
|
+ self.store.add_object(blob1)
|
|
|
+ missing_sha = b"1" * 40
|
|
|
+
|
|
|
+ with self.assertRaises(KeyError):
|
|
|
+ list(self.store.iterobjects_subset([blob1.id, missing_sha]))
|
|
|
+
|
|
|
+ def test_iterobjects_subset_missing_allowed(self) -> None:
|
|
|
+ """Test iterating with missing objects when allowed."""
|
|
|
+ blob1 = make_object(Blob, data=b"blob 1 data")
|
|
|
+ self.store.add_object(blob1)
|
|
|
+ missing_sha = b"1" * 40
|
|
|
+
|
|
|
+ objects = list(
|
|
|
+ self.store.iterobjects_subset([blob1.id, missing_sha], allow_missing=True)
|
|
|
)
|
|
|
- self.assertEqual([testobject.id], list(self.store.iter_prefix(b"")))
|
|
|
+ self.assertEqual(1, len(objects))
|
|
|
+ self.assertEqual(blob1.id, objects[0].id)
|
|
|
|
|
|
def test_iter_prefix_not_found(self) -> None:
|
|
|
self.assertEqual([], list(self.store.iter_prefix(b"1" * 40)))
|