Jelmer Vernooij 2 месяцев назад
Родитель
Сommit
2fbdf9851b
4 измененных файлов с 324 добавлено и 0 удалено
  1. 4 0
      NEWS
  2. 4 0
      dulwich/cli.py
  3. 167 0
      dulwich/refs.py
  4. 149 0
      tests/test_refs.py

+ 4 - 0
NEWS

@@ -1,5 +1,9 @@
 0.24.8	UNRELEASED
 
+ * Add support for namespace isolation via ``NamespacedRefsContainer``.
+   Implements Git's namespace feature for isolating refs within a single
+   repository using the ``refs/namespaces/`` prefix. (Jelmer Vernooij, #1809)
+
 0.24.7	2025-10-23
 
  * Add sparse index support for improved performance with large repositories.

+ 4 - 0
dulwich/cli.py

@@ -28,6 +28,10 @@ no means intended to be a full-blown Git command-line interface but just
 a way to test Dulwich.
 """
 
+# TODO: Add support for GIT_NAMESPACE environment variable by wrapping
+# repository refs with NamespacedRefsContainer when the environment
+# variable is set. See issue #1809 and dulwich.refs.NamespacedRefsContainer.
+
 import argparse
 import io
 import logging

+ 167 - 0
dulwich/refs.py

@@ -1872,6 +1872,173 @@ class locked_ref:
         self._deleted = True
 
 
+class NamespacedRefsContainer(RefsContainer):
+    """Wrapper that adds namespace prefix to all ref operations.
+
+    This implements Git's GIT_NAMESPACE feature, which stores refs under
+    refs/namespaces/<namespace>/ and filters operations to only show refs
+    within that namespace.
+
+    Example:
+        With namespace "foo", a ref "refs/heads/master" is stored as
+        "refs/namespaces/foo/refs/heads/master" in the underlying container.
+    """
+
+    def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
+        """Initialize NamespacedRefsContainer.
+
+        Args:
+          refs: The underlying refs container to wrap
+          namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
+        """
+        super().__init__(logger=refs._logger)
+        self._refs = refs
+        # Build namespace prefix: refs/namespaces/<namespace>/
+        # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
+        namespace_parts = namespace.split(b"/")
+        self._namespace_prefix = b""
+        for part in namespace_parts:
+            self._namespace_prefix += b"refs/namespaces/" + part + b"/"
+
+    def _apply_namespace(self, name: bytes) -> bytes:
+        """Apply namespace prefix to a ref name."""
+        # HEAD and other special refs are not namespaced
+        if name == HEADREF or not name.startswith(b"refs/"):
+            return name
+        return self._namespace_prefix + name
+
+    def _strip_namespace(self, name: bytes) -> Optional[bytes]:
+        """Remove namespace prefix from a ref name.
+
+        Returns None if the ref is not in our namespace.
+        """
+        # HEAD and other special refs are not namespaced
+        if name == HEADREF or not name.startswith(b"refs/"):
+            return name
+        if name.startswith(self._namespace_prefix):
+            return name[len(self._namespace_prefix) :]
+        return None
+
+    def allkeys(self) -> set[bytes]:
+        """Return all reference keys in this namespace."""
+        keys = set()
+        for key in self._refs.allkeys():
+            stripped = self._strip_namespace(key)
+            if stripped is not None:
+                keys.add(stripped)
+        return keys
+
+    def read_loose_ref(self, name: bytes) -> Optional[bytes]:
+        """Read a loose reference."""
+        return self._refs.read_loose_ref(self._apply_namespace(name))
+
+    def get_packed_refs(self) -> dict[Ref, ObjectID]:
+        """Get packed refs within this namespace."""
+        packed = {}
+        for name, value in self._refs.get_packed_refs().items():
+            stripped = self._strip_namespace(name)
+            if stripped is not None:
+                packed[stripped] = value
+        return packed
+
+    def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
+        """Add packed refs with namespace prefix."""
+        namespaced_refs = {
+            self._apply_namespace(name): value for name, value in new_refs.items()
+        }
+        self._refs.add_packed_refs(namespaced_refs)
+
+    def get_peeled(self, name: bytes) -> Optional[ObjectID]:
+        """Return the cached peeled value of a ref."""
+        return self._refs.get_peeled(self._apply_namespace(name))
+
+    def set_symbolic_ref(
+        self,
+        name: bytes,
+        other: bytes,
+        committer: Optional[bytes] = None,
+        timestamp: Optional[int] = None,
+        timezone: Optional[int] = None,
+        message: Optional[bytes] = None,
+    ) -> None:
+        """Make a ref point at another ref."""
+        self._refs.set_symbolic_ref(
+            self._apply_namespace(name),
+            self._apply_namespace(other),
+            committer=committer,
+            timestamp=timestamp,
+            timezone=timezone,
+            message=message,
+        )
+
+    def set_if_equals(
+        self,
+        name: bytes,
+        old_ref: Optional[bytes],
+        new_ref: bytes,
+        committer: Optional[bytes] = None,
+        timestamp: Optional[int] = None,
+        timezone: Optional[int] = None,
+        message: Optional[bytes] = None,
+    ) -> bool:
+        """Set a refname to new_ref only if it currently equals old_ref."""
+        return self._refs.set_if_equals(
+            self._apply_namespace(name),
+            old_ref,
+            new_ref,
+            committer=committer,
+            timestamp=timestamp,
+            timezone=timezone,
+            message=message,
+        )
+
+    def add_if_new(
+        self,
+        name: bytes,
+        ref: bytes,
+        committer: Optional[bytes] = None,
+        timestamp: Optional[int] = None,
+        timezone: Optional[int] = None,
+        message: Optional[bytes] = None,
+    ) -> bool:
+        """Add a new reference only if it does not already exist."""
+        return self._refs.add_if_new(
+            self._apply_namespace(name),
+            ref,
+            committer=committer,
+            timestamp=timestamp,
+            timezone=timezone,
+            message=message,
+        )
+
+    def remove_if_equals(
+        self,
+        name: bytes,
+        old_ref: Optional[bytes],
+        committer: Optional[bytes] = None,
+        timestamp: Optional[int] = None,
+        timezone: Optional[int] = None,
+        message: Optional[bytes] = None,
+    ) -> bool:
+        """Remove a refname only if it currently equals old_ref."""
+        return self._refs.remove_if_equals(
+            self._apply_namespace(name),
+            old_ref,
+            committer=committer,
+            timestamp=timestamp,
+            timezone=timezone,
+            message=message,
+        )
+
+    def pack_refs(self, all: bool = False) -> None:
+        """Pack loose refs into packed-refs file.
+
+        Note: This packs all refs in the underlying container, not just
+        those in the namespace.
+        """
+        self._refs.pack_refs(all=all)
+
+
 def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
     """Filter refs to only include those with a given prefix.
 

+ 149 - 0
tests/test_refs.py

@@ -33,6 +33,7 @@ from dulwich.objects import ZERO_SHA
 from dulwich.refs import (
     DictRefsContainer,
     InfoRefsContainer,
+    NamespacedRefsContainer,
     SymrefLoop,
     _split_ref_line,
     check_ref_format,
@@ -1301,3 +1302,151 @@ class RefUtilityFunctionsTests(TestCase):
 
         with self.assertRaises(ValueError):
             extract_tag_name(b"v1.0")
+
+
+class NamespacedRefsContainerTests(TestCase):
+    """Tests for NamespacedRefsContainer."""
+
+    def setUp(self) -> None:
+        TestCase.setUp(self)
+        # Create an underlying refs container
+        self._underlying_refs = DictRefsContainer(dict(_TEST_REFS))
+        # Create a namespaced view
+        self._refs = NamespacedRefsContainer(self._underlying_refs, b"foo")
+
+    def test_namespace_prefix_simple(self) -> None:
+        """Test simple namespace prefix."""
+        refs = NamespacedRefsContainer(self._underlying_refs, b"foo")
+        self.assertEqual(b"refs/namespaces/foo/", refs._namespace_prefix)
+
+    def test_namespace_prefix_nested(self) -> None:
+        """Test nested namespace prefix."""
+        refs = NamespacedRefsContainer(self._underlying_refs, b"foo/bar")
+        self.assertEqual(
+            b"refs/namespaces/foo/refs/namespaces/bar/", refs._namespace_prefix
+        )
+
+    def test_allkeys_empty_namespace(self) -> None:
+        """Test that newly created namespace has no refs except HEAD."""
+        # HEAD is shared across namespaces, so it appears even in empty namespace
+        self.assertEqual({b"HEAD"}, self._refs.allkeys())
+
+    def test_setitem_and_getitem(self) -> None:
+        """Test setting and getting refs in namespace."""
+        sha = b"9" * 40
+        self._refs[b"refs/heads/master"] = sha
+        self.assertEqual(sha, self._refs[b"refs/heads/master"])
+
+        # Verify it's stored with the namespace prefix in underlying container
+        self.assertIn(
+            b"refs/namespaces/foo/refs/heads/master", self._underlying_refs.allkeys()
+        )
+        self.assertEqual(
+            sha, self._underlying_refs[b"refs/namespaces/foo/refs/heads/master"]
+        )
+
+    def test_head_not_namespaced(self) -> None:
+        """Test that HEAD is not namespaced."""
+        sha = b"a" * 40
+        self._refs[b"HEAD"] = sha
+        self.assertEqual(sha, self._refs[b"HEAD"])
+
+        # HEAD should be directly in the underlying container, not namespaced
+        self.assertIn(b"HEAD", self._underlying_refs.allkeys())
+        self.assertNotIn(b"refs/namespaces/foo/HEAD", self._underlying_refs.allkeys())
+
+    def test_isolation_between_namespaces(self) -> None:
+        """Test that different namespaces are isolated."""
+        sha1 = b"a" * 40
+        sha2 = b"b" * 40
+
+        # Create two different namespaces
+        refs_foo = NamespacedRefsContainer(self._underlying_refs, b"foo")
+        refs_bar = NamespacedRefsContainer(self._underlying_refs, b"bar")
+
+        # Set ref in foo namespace
+        refs_foo[b"refs/heads/master"] = sha1
+
+        # Set ref in bar namespace
+        refs_bar[b"refs/heads/master"] = sha2
+
+        # Each namespace should only see its own refs (plus shared HEAD)
+        self.assertEqual(sha1, refs_foo[b"refs/heads/master"])
+        self.assertEqual(sha2, refs_bar[b"refs/heads/master"])
+        self.assertEqual({b"HEAD", b"refs/heads/master"}, refs_foo.allkeys())
+        self.assertEqual({b"HEAD", b"refs/heads/master"}, refs_bar.allkeys())
+
+    def test_allkeys_filters_namespace(self) -> None:
+        """Test that allkeys only returns refs in the namespace."""
+        # Add refs in multiple namespaces
+        self._underlying_refs[b"refs/namespaces/foo/refs/heads/master"] = b"a" * 40
+        self._underlying_refs[b"refs/namespaces/foo/refs/heads/develop"] = b"b" * 40
+        self._underlying_refs[b"refs/namespaces/bar/refs/heads/feature"] = b"c" * 40
+        self._underlying_refs[b"refs/heads/global"] = b"d" * 40
+
+        # Only refs in 'foo' namespace should be visible (plus HEAD which is shared)
+        foo_refs = NamespacedRefsContainer(self._underlying_refs, b"foo")
+        self.assertEqual(
+            {b"HEAD", b"refs/heads/master", b"refs/heads/develop"}, foo_refs.allkeys()
+        )
+
+    def test_set_symbolic_ref(self) -> None:
+        """Test symbolic ref creation in namespace."""
+        sha = b"e" * 40
+        self._refs[b"refs/heads/develop"] = sha
+        self._refs.set_symbolic_ref(b"refs/heads/main", b"refs/heads/develop")
+
+        # Both target and link should be namespaced
+        self.assertIn(
+            b"refs/namespaces/foo/refs/heads/main", self._underlying_refs.allkeys()
+        )
+        self.assertEqual(
+            b"ref: refs/namespaces/foo/refs/heads/develop",
+            self._underlying_refs.read_loose_ref(
+                b"refs/namespaces/foo/refs/heads/main"
+            ),
+        )
+
+    def test_remove_if_equals(self) -> None:
+        """Test removing refs from namespace."""
+        sha = b"f" * 40
+        self._refs[b"refs/heads/temp"] = sha
+
+        # Remove the ref
+        self.assertTrue(self._refs.remove_if_equals(b"refs/heads/temp", sha))
+        self.assertNotIn(b"refs/heads/temp", self._refs.allkeys())
+        self.assertNotIn(
+            b"refs/namespaces/foo/refs/heads/temp", self._underlying_refs.allkeys()
+        )
+
+    def test_get_packed_refs(self) -> None:
+        """Test get_packed_refs returns empty dict for DictRefsContainer."""
+        # DictRefsContainer doesn't support packed refs, so just verify
+        # the wrapper returns an empty dict
+        packed = self._refs.get_packed_refs()
+        self.assertEqual({}, packed)
+
+    def test_add_if_new(self) -> None:
+        """Test add_if_new in namespace."""
+        sha = b"1" * 40
+        # Should succeed - ref doesn't exist
+        self.assertTrue(self._refs.add_if_new(b"refs/heads/new", sha))
+        self.assertEqual(sha, self._refs[b"refs/heads/new"])
+
+        # Should fail - ref already exists
+        self.assertFalse(self._refs.add_if_new(b"refs/heads/new", b"2" * 40))
+        self.assertEqual(sha, self._refs[b"refs/heads/new"])
+
+    def test_set_if_equals(self) -> None:
+        """Test set_if_equals in namespace."""
+        sha1 = b"a" * 40
+        sha2 = b"b" * 40
+        self._refs[b"refs/heads/test"] = sha1
+
+        # Should fail with wrong old value
+        self.assertFalse(self._refs.set_if_equals(b"refs/heads/test", b"c" * 40, sha2))
+        self.assertEqual(sha1, self._refs[b"refs/heads/test"])
+
+        # Should succeed with correct old value
+        self.assertTrue(self._refs.set_if_equals(b"refs/heads/test", sha1, sha2))
+        self.assertEqual(sha2, self._refs[b"refs/heads/test"])