فهرست منبع

Add support for replace command

Implements the git replace functionality to create refs that replace
objects without altering repository history. This is useful for fixing
broken commits, replacing large files with placeholders, or temporarily
patching objects for testing.

Fixes #1834
Jelmer Vernooij 2 ماه پیش
والد
کامیت
daf8253919
6فایلهای تغییر یافته به همراه370 افزوده شده و 0 حذف شده
  1. 3 0
      NEWS
  2. 84 0
      dulwich/cli.py
  3. 80 0
      dulwich/porcelain.py
  4. 21 0
      dulwich/refs.py
  5. 89 0
      tests/test_cli.py
  6. 93 0
      tests/test_porcelain.py

+ 3 - 0
NEWS

@@ -24,6 +24,9 @@
    prefetch tasks. Supports automatic maintenance with ``--auto`` flag and task-specific
    prefetch tasks. Supports automatic maintenance with ``--auto`` flag and task-specific
    configuration. (Jelmer Vernooij)
    configuration. (Jelmer Vernooij)
 
 
+ * Add support for ``dulwich replace`` command to create refs that replace objects.
+   (Jelmer Vernooij, #1834)
+
 0.24.7	2025-10-23
 0.24.7	2025-10-23
 
 
  * Add sparse index support for improved performance with large repositories.
  * Add sparse index support for improved performance with large repositories.

+ 84 - 0
dulwich/cli.py

@@ -4112,6 +4112,89 @@ class cmd_notes(SuperCommand):
     default_command = cmd_notes_list
     default_command = cmd_notes_list
 
 
 
 
+class cmd_replace_list(Command):
+    """List all replacement refs."""
+
+    def run(self, args: Sequence[str]) -> None:
+        """Execute the replace-list command.
+
+        Args:
+            args: Command line arguments
+        """
+        parser = argparse.ArgumentParser()
+        parser.parse_args(args)
+
+        replacements = porcelain.replace_list(".")
+        for object_sha, replacement_sha in replacements:
+            sys.stdout.write(
+                f"{object_sha.decode('ascii')} -> {replacement_sha.decode('ascii')}\n"
+            )
+
+
+class cmd_replace_delete(Command):
+    """Delete a replacement ref."""
+
+    def run(self, args: Sequence[str]) -> Optional[int]:
+        """Execute the replace-delete command.
+
+        Args:
+            args: Command line arguments
+
+        Returns:
+            Exit code (0 for success, 1 for error)
+        """
+        parser = argparse.ArgumentParser()
+        parser.add_argument("object", help="Object whose replacement should be removed")
+        parsed_args = parser.parse_args(args)
+
+        try:
+            porcelain.replace_delete(".", parsed_args.object)
+            logger.info("Deleted replacement for %s", parsed_args.object)
+            return None
+        except KeyError as e:
+            logger.error(str(e))
+            return 1
+
+
+class cmd_replace(SuperCommand):
+    """Create, list, and delete replacement refs."""
+
+    subcommands: ClassVar[dict[str, type[Command]]] = {
+        "list": cmd_replace_list,
+        "delete": cmd_replace_delete,
+    }
+
+    default_command = cmd_replace_list
+
+    def run(self, args: Sequence[str]) -> Optional[int]:
+        """Execute the replace command.
+
+        Args:
+            args: Command line arguments
+
+        Returns:
+            Exit code (0 for success, 1 for error)
+        """
+        # Special case: if we have exactly 2 args and no subcommand, treat as create
+        if len(args) == 2 and args[0] not in self.subcommands:
+            # This is the create form: git replace <object> <replacement>
+            parser = argparse.ArgumentParser()
+            parser.add_argument("object", help="Object to replace")
+            parser.add_argument("replacement", help="Replacement object")
+            parsed_args = parser.parse_args(args)
+
+            porcelain.replace_create(".", parsed_args.object, parsed_args.replacement)
+            logger.info(
+                "Created replacement: %s -> %s",
+                parsed_args.object,
+                parsed_args.replacement,
+            )
+            return None
+
+        # Otherwise, delegate to supercommand handling
+        return super().run(args)
+
+
 class cmd_cherry(Command):
 class cmd_cherry(Command):
     """Find commits not merged upstream."""
     """Find commits not merged upstream."""
 
 
@@ -6030,6 +6113,7 @@ commands = {
     "reflog": cmd_reflog,
     "reflog": cmd_reflog,
     "remote": cmd_remote,
     "remote": cmd_remote,
     "repack": cmd_repack,
     "repack": cmd_repack,
+    "replace": cmd_replace,
     "reset": cmd_reset,
     "reset": cmd_reset,
     "revert": cmd_revert,
     "revert": cmd_revert,
     "rev-list": cmd_rev_list,
     "rev-list": cmd_rev_list,

+ 80 - 0
dulwich/porcelain.py

@@ -55,6 +55,7 @@ Currently implemented:
  * rm
  * rm
  * remote{_add}
  * remote{_add}
  * receive_pack
  * receive_pack
+ * replace{_create,_delete,_list}
  * reset
  * reset
  * revert
  * revert
  * sparse_checkout
  * sparse_checkout
@@ -192,12 +193,14 @@ from .refs import (
     LOCAL_BRANCH_PREFIX,
     LOCAL_BRANCH_PREFIX,
     LOCAL_NOTES_PREFIX,
     LOCAL_NOTES_PREFIX,
     LOCAL_REMOTE_PREFIX,
     LOCAL_REMOTE_PREFIX,
+    LOCAL_REPLACE_PREFIX,
     LOCAL_TAG_PREFIX,
     LOCAL_TAG_PREFIX,
     Ref,
     Ref,
     SymrefLoop,
     SymrefLoop,
     _import_remote_refs,
     _import_remote_refs,
     filter_ref_prefix,
     filter_ref_prefix,
     local_branch_name,
     local_branch_name,
+    local_replace_name,
     local_tag_name,
     local_tag_name,
     shorten_ref_name,
     shorten_ref_name,
 )
 )
@@ -2532,6 +2535,77 @@ def notes_list(repo: RepoPath, ref: bytes = b"commits") -> list[tuple[bytes, byt
         return r.notes.list_notes(notes_ref, config=config)
         return r.notes.list_notes(notes_ref, config=config)
 
 
 
 
+def replace_list(repo: RepoPath) -> list[tuple[bytes, bytes]]:
+    """List all replacement refs.
+
+    Args:
+      repo: Path to repository
+
+    Returns:
+      List of tuples of (object_sha, replacement_sha) where object_sha is the
+      object being replaced and replacement_sha is what it's replaced with
+    """
+    with open_repo_closing(repo) as r:
+        replacements = []
+        for ref in r.refs.keys():
+            if ref.startswith(LOCAL_REPLACE_PREFIX):
+                object_sha = ref[len(LOCAL_REPLACE_PREFIX) :]
+                replacement_sha = r.refs[ref]
+                replacements.append((object_sha, replacement_sha))
+        return replacements
+
+
+def replace_delete(repo: RepoPath, object_sha: Union[str, bytes]) -> None:
+    """Delete a replacement ref.
+
+    Args:
+      repo: Path to repository
+      object_sha: SHA of the object whose replacement should be removed
+    """
+    with open_repo_closing(repo) as r:
+        # Convert to bytes if string
+        if isinstance(object_sha, str):
+            object_sha_hex = object_sha.encode("ascii")
+        else:
+            object_sha_hex = object_sha
+
+        replace_ref = _make_replace_ref(object_sha_hex)
+        if replace_ref not in r.refs:
+            raise KeyError(
+                f"No replacement ref found for {object_sha_hex.decode('ascii')}"
+            )
+        del r.refs[replace_ref]
+
+
+def replace_create(
+    repo: RepoPath,
+    object_sha: Union[str, bytes],
+    replacement_sha: Union[str, bytes],
+) -> None:
+    """Create a replacement ref to replace one object with another.
+
+    Args:
+      repo: Path to repository
+      object_sha: SHA of the object to replace
+      replacement_sha: SHA of the replacement object
+    """
+    with open_repo_closing(repo) as r:
+        # Convert to bytes if string
+        if isinstance(object_sha, str):
+            object_sha_hex = object_sha.encode("ascii")
+        else:
+            object_sha_hex = object_sha
+
+        if isinstance(replacement_sha, str):
+            replacement_sha_hex = replacement_sha.encode("ascii")
+        else:
+            replacement_sha_hex = replacement_sha
+
+        # Create the replacement ref
+        replace_ref = _make_replace_ref(object_sha_hex)
+        r.refs[replace_ref] = replacement_sha_hex
+
+
 def reset(
 def reset(
     repo: Union[str, os.PathLike[str], Repo],
     repo: Union[str, os.PathLike[str], Repo],
     mode: str,
     mode: str,
@@ -3611,6 +3685,12 @@ def _make_tag_ref(name: Union[str, bytes]) -> Ref:
     return local_tag_name(name)
     return local_tag_name(name)
 
 
 
 
+def _make_replace_ref(name: Union[str, bytes]) -> Ref:
+    if isinstance(name, str):
+        name = name.encode(DEFAULT_ENCODING)
+    return local_replace_name(name)
+
+
 def branch_delete(
 def branch_delete(
     repo: RepoPath, name: Union[str, bytes, Sequence[Union[str, bytes]]]
     repo: RepoPath, name: Union[str, bytes, Sequence[Union[str, bytes]]]
 ) -> None:
 ) -> None:

+ 21 - 0
dulwich/refs.py

@@ -54,6 +54,7 @@ LOCAL_BRANCH_PREFIX = b"refs/heads/"
 LOCAL_TAG_PREFIX = b"refs/tags/"
 LOCAL_TAG_PREFIX = b"refs/tags/"
 LOCAL_REMOTE_PREFIX = b"refs/remotes/"
 LOCAL_REMOTE_PREFIX = b"refs/remotes/"
 LOCAL_NOTES_PREFIX = b"refs/notes/"
 LOCAL_NOTES_PREFIX = b"refs/notes/"
+LOCAL_REPLACE_PREFIX = b"refs/replace/"
 BAD_REF_CHARS = set(b"\177 ~^:?*[")
 BAD_REF_CHARS = set(b"\177 ~^:?*[")
 PEELED_TAG_SUFFIX = b"^{}"
 PEELED_TAG_SUFFIX = b"^{}"
 
 
@@ -1508,6 +1509,26 @@ def local_tag_name(name: bytes) -> bytes:
     return LOCAL_TAG_PREFIX + name
     return LOCAL_TAG_PREFIX + name
 
 
 
 
+def local_replace_name(name: bytes) -> bytes:
+    """Build a full replace ref from a short name.
+
+    Args:
+      name: Short replace name (object SHA) or full ref
+
+    Returns:
+      Full replace ref name (e.g., b"refs/replace/<sha>")
+
+    Examples:
+      >>> local_replace_name(b"abc123")
+      b'refs/replace/abc123'
+      >>> local_replace_name(b"refs/replace/abc123")
+      b'refs/replace/abc123'
+    """
+    if name.startswith(LOCAL_REPLACE_PREFIX):
+        return name
+    return LOCAL_REPLACE_PREFIX + name
+
+
 def extract_branch_name(ref: bytes) -> bytes:
 def extract_branch_name(ref: bytes) -> bytes:
     """Extract branch name from a full branch ref.
     """Extract branch name from a full branch ref.
 
 

+ 89 - 0
tests/test_cli.py

@@ -3958,5 +3958,94 @@ class InterpretTrailersCommandTest(DulwichCliTestCase):
         self.assertIn("Bug: 12345", stdout)
         self.assertIn("Bug: 12345", stdout)
 
 
 
 
+class ReplaceCommandTest(DulwichCliTestCase):
+    """Tests for replace command."""
+
+    def test_replace_create(self):
+        """Test creating a replacement ref."""
+        # Create two commits
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create replacement using the create form (decode to string for CLI)
+        c1_str = c1.id.decode("ascii")
+        c2_str = c2.id.decode("ascii")
+        _result, _stdout, _stderr = self._run_cli("replace", c1_str, c2_str)
+
+        # Verify the replacement ref was created
+        replace_ref = b"refs/replace/" + c1.id
+        self.assertIn(replace_ref, self.repo.refs.keys())
+        self.assertEqual(c2.id, self.repo.refs[replace_ref])
+
+    def test_replace_list_empty(self):
+        """Test listing replacements when there are none."""
+        _result, stdout, _stderr = self._run_cli("replace", "list")
+        self.assertEqual("", stdout)
+
+    def test_replace_list(self):
+        """Test listing replacement refs."""
+        # Create two commits
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create replacement
+        c1_str = c1.id.decode("ascii")
+        c2_str = c2.id.decode("ascii")
+        self._run_cli("replace", c1_str, c2_str)
+
+        # List replacements
+        _result, stdout, _stderr = self._run_cli("replace", "list")
+        self.assertIn(c1_str, stdout)
+        self.assertIn(c2_str, stdout)
+
+    def test_replace_default_list(self):
+        """Test that replace without subcommand defaults to list."""
+        # Create two commits
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create replacement
+        c1_str = c1.id.decode("ascii")
+        c2_str = c2.id.decode("ascii")
+        self._run_cli("replace", c1_str, c2_str)
+
+        # Call replace without subcommand (should list)
+        _result, stdout, _stderr = self._run_cli("replace")
+        self.assertIn(c1_str, stdout)
+        self.assertIn(c2_str, stdout)
+
+    def test_replace_delete(self):
+        """Test deleting a replacement ref."""
+        # Create two commits
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create replacement
+        c1_str = c1.id.decode("ascii")
+        c2_str = c2.id.decode("ascii")
+        self._run_cli("replace", c1_str, c2_str)
+
+        # Verify it exists
+        replace_ref = b"refs/replace/" + c1.id
+        self.assertIn(replace_ref, self.repo.refs.keys())
+
+        # Delete the replacement
+        _result, _stdout, _stderr = self._run_cli("replace", "delete", c1_str)
+
+        # Verify it's gone
+        self.assertNotIn(replace_ref, self.repo.refs.keys())
+
+    def test_replace_delete_nonexistent(self):
+        """Test deleting a nonexistent replacement ref fails."""
+        # Create a commit
+        [c1] = build_commit_graph(self.repo.object_store, [[1]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Try to delete a non-existent replacement
+        c1_str = c1.id.decode("ascii")
+        result, _stdout, _stderr = self._run_cli("replace", "delete", c1_str)
+        self.assertEqual(result, 1)
+
+
 if __name__ == "__main__":
 if __name__ == "__main__":
     unittest.main()
     unittest.main()

+ 93 - 0
tests/test_porcelain.py

@@ -10706,3 +10706,96 @@ class GrepTests(PorcelainTestCase):
         outstream = StringIO()
         outstream = StringIO()
         with self.assertRaises(ValueError):
         with self.assertRaises(ValueError):
             porcelain.grep(empty_repo, "pattern", outstream=outstream)
             porcelain.grep(empty_repo, "pattern", outstream=outstream)
+
+
+class ReplaceListTests(PorcelainTestCase):
+    def test_empty(self) -> None:
+        """Test listing replacements when there are none."""
+        replacements = porcelain.replace_list(self.repo)
+        self.assertEqual([], replacements)
+
+    def test_list_replacements(self) -> None:
+        """Test listing replacement refs."""
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create a replacement
+        porcelain.replace_create(self.repo, c1.id, c2.id)
+
+        # List replacements
+        replacements = porcelain.replace_list(self.repo)
+        self.assertEqual(1, len(replacements))
+        self.assertEqual((c1.id, c2.id), replacements[0])
+
+
+class ReplaceCreateTests(PorcelainTestCase):
+    def test_create_replacement(self) -> None:
+        """Test creating a replacement ref."""
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create a replacement
+        porcelain.replace_create(self.repo, c1.id, c2.id)
+
+        # Verify the replacement ref was created (c1.id is already 40-char hex bytes)
+        replace_ref = b"refs/replace/" + c1.id
+        self.assertIn(replace_ref, self.repo.refs)
+        self.assertEqual(c2.id, self.repo.refs[replace_ref])
+
+    def test_create_replacement_with_bytes(self) -> None:
+        """Test creating a replacement ref with bytes arguments."""
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create a replacement using bytes arguments
+        porcelain.replace_create(self.repo, c1.id, c2.id)
+
+        # Verify the replacement ref was created
+        replace_ref = b"refs/replace/" + c1.id
+        self.assertIn(replace_ref, self.repo.refs)
+        self.assertEqual(c2.id, self.repo.refs[replace_ref])
+
+
+class ReplaceDeleteTests(PorcelainTestCase):
+    def test_delete_replacement(self) -> None:
+        """Test deleting a replacement ref."""
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create a replacement
+        porcelain.replace_create(self.repo, c1.id, c2.id)
+
+        # Verify it exists
+        replacements = porcelain.replace_list(self.repo)
+        self.assertEqual(1, len(replacements))
+
+        # Delete the replacement
+        porcelain.replace_delete(self.repo, c1.id)
+
+        # Verify it's gone
+        replacements = porcelain.replace_list(self.repo)
+        self.assertEqual(0, len(replacements))
+
+    def test_delete_replacement_with_bytes(self) -> None:
+        """Test deleting a replacement ref with bytes argument."""
+        [c1, c2] = build_commit_graph(self.repo.object_store, [[1], [2]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Create a replacement
+        porcelain.replace_create(self.repo, c1.id, c2.id)
+
+        # Delete using bytes argument
+        porcelain.replace_delete(self.repo, c1.id)
+
+        # Verify it's gone
+        replacements = porcelain.replace_list(self.repo)
+        self.assertEqual(0, len(replacements))
+
+    def test_delete_nonexistent_replacement(self) -> None:
+        """Test deleting a replacement ref that doesn't exist raises KeyError."""
+        [c1] = build_commit_graph(self.repo.object_store, [[1]])
+        self.repo[b"HEAD"] = c1.id
+
+        # Try to delete a non-existent replacement
+        with self.assertRaises(KeyError):
+            porcelain.replace_delete(self.repo, c1.id)