Răsfoiți Sursa

Implement mv porcelain. Fixes #1633

Jelmer Vernooij 1 lună în urmă
părinte
comite
57af1664e2
5 a modificat fișierele cu 283 adăugiri și 30 ștergeri
  1. 0 30
      CLAUDE.md
  2. 2 0
      NEWS
  3. 17 0
      dulwich/cli.py
  4. 103 0
      dulwich/porcelain.py
  5. 161 0
      tests/test_porcelain.py

+ 0 - 30
CLAUDE.md

@@ -1,30 +0,0 @@
-# CLAUDE.md
-
-This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
-
-## Build/Test Commands
-- Build: `make build`
-- Run all tests: `make check`
-- Run single test: `PYTHONPATH=$(pwd) python3 -m unittest tests.test_module_name.TestClassName.test_method_name`
-- Type checking: `make typing` or `python3 -m mypy dulwich`
-- Lint code: `make style` or `ruff check .`
-- Fix lint issues: `make fix` or `ruff check --fix .`
-- Format code: `make reformat` or `ruff format .`
-- Generate coverage: `make coverage` or `make coverage-html` for HTML report
-
-## Code Style Guidelines
-- Follow PEP8 with accommodations listed in `pyproject.toml` (ruff config)
-- Use Google-style docstrings for public methods, functions and classes
-- Triple quotes should always be """, single quotes are ' (unless " reduces escaping)
-- Git paths/filenames are treated as bytestrings (bytes), not unicode strings
-- On-disk filenames: use regular strings or pathlib.Path objects
-- Ensure all functionality is available in pure Python (Rust implementations optional)
-- Add unit tests for new functionality and bug fixes
-- All contributions must be under Apache License 2.0+ or GPL 2.0+
-- When adding new test files, ensure the test accumulation functions are updated
-  (i.e. ``self_test_suite()`` in `tests/__init__.py` or ``test_suite()`` in `tests/compat/__init__.py`)
-- Do not ignore exceptions. Never catch ``Exception`` unless you're going to
-  re-raise it, always something more specific. Catch specific exceptions. Don't catch OSError, but specific subclasses.
-- Keep code in try/except blocks to a minimum, and use else:. This keeps the code that can raise an exception
-  to a minimum.
-- There is almost never a good reason to catch AttributeError.

+ 2 - 0
NEWS

@@ -66,6 +66,8 @@
    Supports filtering author, committer, and message fields.
    (#745, Jelmer Vernooij)
 
+ * Add ``mv`` porcelain command. (Jelmer Vernooij, #1633)
+
 0.23.0	2025-06-21
 
  * Add basic ``rebase`` subcommand. (Jelmer Vernooij)

+ 17 - 0
dulwich/cli.py

@@ -201,6 +201,22 @@ class cmd_rm(Command):
         porcelain.remove(".", paths=args.path, cached=args.cached)
 
 
+class cmd_mv(Command):
+    def run(self, argv) -> None:
+        parser = argparse.ArgumentParser()
+        parser.add_argument(
+            "-f",
+            "--force",
+            action="store_true",
+            help="Force move even if destination exists",
+        )
+        parser.add_argument("source", type=Path)
+        parser.add_argument("destination", type=Path)
+        args = parser.parse_args(argv)
+
+        porcelain.mv(".", args.source, args.destination, force=args.force)
+
+
 class cmd_fetch_pack(Command):
     def run(self, argv) -> None:
         parser = argparse.ArgumentParser()
@@ -1795,6 +1811,7 @@ commands = {
     "revert": cmd_revert,
     "rev-list": cmd_rev_list,
     "rm": cmd_rm,
+    "mv": cmd_mv,
     "show": cmd_show,
     "stash": cmd_stash,
     "status": cmd_status,

+ 103 - 0
dulwich/porcelain.py

@@ -44,6 +44,7 @@ Currently implemented:
  * ls_tree
  * merge
  * merge_tree
+ * mv/move
  * prune
  * pull
  * push
@@ -832,6 +833,108 @@ def remove(repo=".", paths=None, cached=False) -> None:
 rm = remove
 
 
+def mv(
+    repo: Union[str, os.PathLike, BaseRepo],
+    source: Union[str, bytes, os.PathLike],
+    destination: Union[str, bytes, os.PathLike],
+    force: bool = False,
+) -> None:
+    """Move or rename a file, directory, or symlink.
+
+    Args:
+      repo: Path to the repository
+      source: Path to move from
+      destination: Path to move to
+      force: Force move even if destination exists
+
+    Raises:
+      Error: If source doesn't exist, is not tracked, or destination already exists (without force)
+    """
+    with open_repo_closing(repo) as r:
+        index = r.open_index()
+
+        # Handle paths - convert to string if necessary
+        if isinstance(source, bytes):
+            source = source.decode(sys.getfilesystemencoding())
+        elif hasattr(source, "__fspath__"):
+            source = os.fspath(source)
+        else:
+            source = str(source)
+
+        if isinstance(destination, bytes):
+            destination = destination.decode(sys.getfilesystemencoding())
+        elif hasattr(destination, "__fspath__"):
+            destination = os.fspath(destination)
+        else:
+            destination = str(destination)
+
+        # Get full paths
+        if os.path.isabs(source):
+            source_full_path = source
+        else:
+            # Treat relative paths as relative to the repository root
+            source_full_path = os.path.join(r.path, source)
+
+        if os.path.isabs(destination):
+            destination_full_path = destination
+        else:
+            # Treat relative paths as relative to the repository root
+            destination_full_path = os.path.join(r.path, destination)
+
+        # Check if destination is a directory
+        if os.path.isdir(destination_full_path):
+            # Move source into destination directory
+            basename = os.path.basename(source_full_path)
+            destination_full_path = os.path.join(destination_full_path, basename)
+
+        # Convert to tree paths for index
+        source_tree_path = path_to_tree_path(r.path, source_full_path)
+        destination_tree_path = path_to_tree_path(r.path, destination_full_path)
+
+        # Check if source exists in index
+        if source_tree_path not in index:
+            raise Error(f"source '{source}' is not under version control")
+
+        # Check if source exists in filesystem
+        if not os.path.exists(source_full_path):
+            raise Error(f"source '{source}' does not exist")
+
+        # Check if destination already exists
+        if os.path.exists(destination_full_path) and not force:
+            raise Error(f"destination '{destination}' already exists (use -f to force)")
+
+        # Check if destination is already in index
+        if destination_tree_path in index and not force:
+            raise Error(
+                f"destination '{destination}' already exists in index (use -f to force)"
+            )
+
+        # Get the index entry for the source
+        source_entry = index[source_tree_path]
+
+        # Convert to bytes for file operations
+        source_full_path_bytes = os.fsencode(source_full_path)
+        destination_full_path_bytes = os.fsencode(destination_full_path)
+
+        # Create parent directory for destination if needed
+        dest_dir = os.path.dirname(destination_full_path_bytes)
+        if dest_dir and not os.path.exists(dest_dir):
+            os.makedirs(dest_dir)
+
+        # Move the file in the filesystem
+        if os.path.exists(destination_full_path_bytes) and force:
+            os.remove(destination_full_path_bytes)
+        os.rename(source_full_path_bytes, destination_full_path_bytes)
+
+        # Update the index
+        del index[source_tree_path]
+        index[destination_tree_path] = source_entry
+        index.write()
+
+
+move = mv
+
+
 def commit_decode(commit, contents, default_encoding=DEFAULT_ENCODING):
     if commit.encoding:
         encoding = commit.encoding.decode("ascii")

+ 161 - 0
tests/test_porcelain.py

@@ -1610,6 +1610,167 @@ class RemoveTests(PorcelainTestCase):
         self.assertFalse(os.path.exists(fullpath))
 
 
+class MvTests(PorcelainTestCase):
+    def test_mv_file(self) -> None:
+        # Create a file
+        fullpath = os.path.join(self.repo.path, "foo")
+        with open(fullpath, "w") as f:
+            f.write("BAR")
+
+        # Add and commit the file
+        porcelain.add(self.repo.path, paths=[fullpath])
+        porcelain.commit(
+            repo=self.repo,
+            message=b"test",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Move the file
+        porcelain.mv(self.repo.path, "foo", "bar")
+
+        # Verify old path doesn't exist and new path does
+        self.assertFalse(os.path.exists(os.path.join(self.repo.path, "foo")))
+        self.assertTrue(os.path.exists(os.path.join(self.repo.path, "bar")))
+
+        # Verify index was updated
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
+        self.assertIn(b"bar", index)
+
+    def test_mv_file_to_existing_directory(self) -> None:
+        # Create a file and a directory
+        fullpath = os.path.join(self.repo.path, "foo")
+        with open(fullpath, "w") as f:
+            f.write("BAR")
+
+        dirpath = os.path.join(self.repo.path, "mydir")
+        os.makedirs(dirpath)
+
+        # Add and commit the file
+        porcelain.add(self.repo.path, paths=[fullpath])
+        porcelain.commit(
+            repo=self.repo,
+            message=b"test",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Move the file into the directory
+        porcelain.mv(self.repo.path, "foo", "mydir")
+
+        # Verify file moved into directory
+        self.assertFalse(os.path.exists(os.path.join(self.repo.path, "foo")))
+        self.assertTrue(os.path.exists(os.path.join(self.repo.path, "mydir", "foo")))
+
+        # Verify index was updated
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
+        self.assertIn(b"mydir/foo", index)
+
+    def test_mv_file_force_overwrite(self) -> None:
+        # Create two files
+        fullpath1 = os.path.join(self.repo.path, "foo")
+        with open(fullpath1, "w") as f:
+            f.write("FOO")
+
+        fullpath2 = os.path.join(self.repo.path, "bar")
+        with open(fullpath2, "w") as f:
+            f.write("BAR")
+
+        # Add and commit both files
+        porcelain.add(self.repo.path, paths=[fullpath1, fullpath2])
+        porcelain.commit(
+            repo=self.repo,
+            message=b"test",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Try to move without force (should fail)
+        self.assertRaises(porcelain.Error, porcelain.mv, self.repo.path, "foo", "bar")
+
+        # Move with force
+        porcelain.mv(self.repo.path, "foo", "bar", force=True)
+
+        # Verify foo doesn't exist and bar has foo's content
+        self.assertFalse(os.path.exists(os.path.join(self.repo.path, "foo")))
+        with open(os.path.join(self.repo.path, "bar")) as f:
+            self.assertEqual(f.read(), "FOO")
+
+    def test_mv_file_not_tracked(self) -> None:
+        # Create an untracked file
+        fullpath = os.path.join(self.repo.path, "untracked")
+        with open(fullpath, "w") as f:
+            f.write("UNTRACKED")
+
+        # Try to move it (should fail)
+        self.assertRaises(
+            porcelain.Error, porcelain.mv, self.repo.path, "untracked", "tracked"
+        )
+
+    def test_mv_file_not_exists(self) -> None:
+        # Try to move a non-existent file
+        self.assertRaises(
+            porcelain.Error, porcelain.mv, self.repo.path, "nonexistent", "destination"
+        )
+
+    def test_mv_absolute_paths(self) -> None:
+        # Create a file
+        fullpath = os.path.join(self.repo.path, "foo")
+        with open(fullpath, "w") as f:
+            f.write("BAR")
+
+        # Add and commit the file
+        porcelain.add(self.repo.path, paths=[fullpath])
+        porcelain.commit(
+            repo=self.repo,
+            message=b"test",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Move using absolute paths
+        dest_path = os.path.join(self.repo.path, "bar")
+        porcelain.mv(self.repo.path, fullpath, dest_path)
+
+        # Verify file moved
+        self.assertFalse(os.path.exists(fullpath))
+        self.assertTrue(os.path.exists(dest_path))
+
+    def test_mv_from_different_directory(self) -> None:
+        # Create a subdirectory with a file
+        subdir = os.path.join(self.repo.path, "mydir")
+        os.makedirs(subdir)
+        fullpath = os.path.join(subdir, "myfile")
+        with open(fullpath, "w") as f:
+            f.write("BAR")
+
+        # Add and commit the file
+        porcelain.add(self.repo.path, paths=[fullpath])
+        porcelain.commit(
+            repo=self.repo,
+            message=b"test",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Change to a different directory and move the file
+        cwd = os.getcwd()
+        tempdir = tempfile.mkdtemp()
+        try:
+            os.chdir(tempdir)
+            # Move the file using relative path from repository root
+            porcelain.mv(self.repo.path, "mydir/myfile", "renamed")
+        finally:
+            os.chdir(cwd)
+            os.rmdir(tempdir)
+
+        # Verify file was moved
+        self.assertFalse(os.path.exists(fullpath))
+        self.assertTrue(os.path.exists(os.path.join(self.repo.path, "renamed")))
+
+
 class LogTests(PorcelainTestCase):
     def test_simple(self) -> None:
         c1, c2, c3 = build_commit_graph(