Просмотр исходного кода

Add git worktree repair command

Fixes #1799
Jelmer Vernooij 3 месяцев назад
Родитель
Сommit
3926daf5b6
5 измененных файлов с 367 добавлено и 0 удалено
  1. 4 0
      NEWS
  2. 38 0
      dulwich/cli.py
  3. 20 0
      dulwich/porcelain.py
  4. 130 0
      dulwich/worktree.py
  5. 175 0
      tests/test_worktree.py

+ 4 - 0
NEWS

@@ -15,6 +15,10 @@
    other filters that send status messages in final headers.
    other filters that send status messages in final headers.
    (Jelmer Vernooij, #1889)
    (Jelmer Vernooij, #1889)
 
 
+ * Add ``git worktree repair`` command to repair worktree administrative files
+   after worktrees or the main repository have been moved.
+   (Jelmer Vernooij, #1799)
+
 0.24.2	2025-09-25
 0.24.2	2025-09-25
 
 
  * Added ``porcelain.shortlog`` function to summarize commits by author,
  * Added ``porcelain.shortlog`` function to summarize commits by author,

+ 38 - 0
dulwich/cli.py

@@ -4369,6 +4369,43 @@ class cmd_worktree_move(Command):
         return 0
         return 0
 
 
 
 
+class cmd_worktree_repair(Command):
+    """Repair worktree administrative files."""
+
+    """Repair worktree administrative files."""
+
+    def run(self, args: Sequence[str]) -> Optional[int]:
+        """Execute the worktree-repair command.
+
+        Args:
+            args: Command line arguments
+        """
+        parser = argparse.ArgumentParser(
+            description="Repair worktree administrative files",
+            prog="dulwich worktree repair",
+        )
+        parser.add_argument(
+            "path",
+            nargs="*",
+            help="Paths to worktrees to repair (if not specified, repairs all)",
+        )
+
+        parsed_args = parser.parse_args(args)
+
+        from dulwich import porcelain
+
+        paths = parsed_args.path if parsed_args.path else None
+        repaired = porcelain.worktree_repair(repo=".", paths=paths)
+
+        if repaired:
+            for path in repaired:
+                logger.info("Repaired worktree: %s", path)
+        else:
+            logger.info("No worktrees needed repair")
+
+        return 0
+
+
 class cmd_worktree(SuperCommand):
 class cmd_worktree(SuperCommand):
     """Manage multiple working trees."""
     """Manage multiple working trees."""
 
 
@@ -4382,6 +4419,7 @@ class cmd_worktree(SuperCommand):
         "lock": cmd_worktree_lock,
         "lock": cmd_worktree_lock,
         "unlock": cmd_worktree_unlock,
         "unlock": cmd_worktree_unlock,
         "move": cmd_worktree_move,
         "move": cmd_worktree_move,
+        "repair": cmd_worktree_repair,
     }
     }
     default_command = cmd_worktree_list
     default_command = cmd_worktree_list
 
 

+ 20 - 0
dulwich/porcelain.py

@@ -7016,3 +7016,23 @@ def worktree_move(
 
 
     with open_repo_closing(repo) as r:
     with open_repo_closing(repo) as r:
         move_worktree(r, old_path, new_path)
         move_worktree(r, old_path, new_path)
+
+
+def worktree_repair(
+    repo: RepoPath = ".",
+    paths: Optional[list[Union[str, os.PathLike[str]]]] = None,
+) -> list[str]:
+    """Repair worktree administrative files.
+
+    Args:
+        repo: Path to repository
+        paths: Optional list of worktree paths to repair. If None, repairs
+               connections from the main repository to all linked worktrees.
+
+    Returns:
+        List of repaired worktree paths
+    """
+    from .worktree import repair_worktree
+
+    with open_repo_closing(repo) as r:
+        return repair_worktree(r, paths=paths)

+ 130 - 0
dulwich/worktree.py

@@ -238,6 +238,20 @@ class WorkTreeContainer:
         """
         """
         unlock_worktree(self._repo, path)
         unlock_worktree(self._repo, path)
 
 
+    def repair(
+        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
+    ) -> builtins.list[str]:
+        """Repair worktree administrative files.
+
+        Args:
+            paths: Optional list of worktree paths to repair. If None, repairs
+                   connections from the main repository to all linked worktrees.
+
+        Returns:
+            List of repaired worktree paths
+        """
+        return repair_worktree(self._repo, paths=paths)
+
     def __iter__(self) -> Iterator[WorkTreeInfo]:
     def __iter__(self) -> Iterator[WorkTreeInfo]:
         """Iterate over all worktrees."""
         """Iterate over all worktrees."""
         yield from self.list()
         yield from self.list()
@@ -1214,6 +1228,122 @@ def move_worktree(
         f.write(os.fsencode(gitdir_file) + b"\n")
         f.write(os.fsencode(gitdir_file) + b"\n")
 
 
 
 
+def repair_worktree(
+    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
+) -> list[str]:
+    """Repair worktree administrative files.
+
+    This repairs the connection between worktrees and the main repository
+    when they have been moved or become corrupted.
+
+    Args:
+        repo: The main repository
+        paths: Optional list of worktree paths to repair. If None, repairs
+               connections from the main repository to all linked worktrees.
+
+    Returns:
+        List of repaired worktree paths
+
+    Raises:
+        ValueError: If a specified path is not a valid worktree
+    """
+    repaired: list[str] = []
+    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
+
+    if paths:
+        # Repair specific worktrees
+        for path in paths:
+            path_str = os.fspath(path)
+            if isinstance(path_str, bytes):
+                path_str = os.fsdecode(path_str)
+            path_str = os.path.abspath(path_str)
+
+            # Check if this is a linked worktree
+            gitdir_file = os.path.join(path_str, ".git")
+            if not os.path.exists(gitdir_file):
+                raise ValueError(f"Not a valid worktree: {path_str}")
+
+            # Read the .git file to get the worktree control directory
+            try:
+                with open(gitdir_file, "rb") as f:
+                    gitdir_content = f.read().strip()
+                    if gitdir_content.startswith(b"gitdir: "):
+                        worktree_control_path = gitdir_content[8:].decode()
+                    else:
+                        raise ValueError(f"Invalid .git file in worktree: {path_str}")
+            except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
+                raise ValueError(
+                    f"Cannot read .git file in worktree: {path_str}"
+                ) from e
+
+            # Make the path absolute if it's relative
+            if not os.path.isabs(worktree_control_path):
+                worktree_control_path = os.path.abspath(
+                    os.path.join(path_str, worktree_control_path)
+                )
+
+            # Update the gitdir file in the worktree control directory
+            gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
+            if os.path.exists(gitdir_pointer):
+                # Update to point to the current location
+                with open(gitdir_pointer, "wb") as f:
+                    f.write(os.fsencode(gitdir_file) + b"\n")
+                repaired.append(path_str)
+    else:
+        # Repair from main repository to all linked worktrees
+        if not os.path.isdir(worktrees_dir):
+            return repaired
+
+        for entry in os.listdir(worktrees_dir):
+            worktree_control_path = os.path.join(worktrees_dir, entry)
+            if not os.path.isdir(worktree_control_path):
+                continue
+
+            # Read the gitdir file to find where the worktree thinks it is
+            gitdir_path = os.path.join(worktree_control_path, GITDIR)
+            try:
+                with open(gitdir_path, "rb") as f:
+                    gitdir_contents = f.read().strip()
+                    old_gitdir_location = os.fsdecode(gitdir_contents)
+            except (FileNotFoundError, PermissionError):
+                # Can't repair if we can't read the gitdir file
+                continue
+
+            # Get the worktree directory (remove .git suffix)
+            old_worktree_path = os.path.dirname(old_gitdir_location)
+
+            # Check if the .git file exists at the old location
+            if os.path.exists(old_gitdir_location):
+                # Try to read and update the .git file to ensure it points back correctly
+                try:
+                    with open(old_gitdir_location, "rb") as f:
+                        content = f.read().strip()
+                        if content.startswith(b"gitdir: "):
+                            current_pointer = content[8:].decode()
+                            if not os.path.isabs(current_pointer):
+                                current_pointer = os.path.abspath(
+                                    os.path.join(old_worktree_path, current_pointer)
+                                )
+
+                            # If it doesn't point to the right place, fix it
+                            expected_pointer = worktree_control_path
+                            if os.path.abspath(current_pointer) != os.path.abspath(
+                                expected_pointer
+                            ):
+                                # Update the .git file to point to the correct location
+                                with open(old_gitdir_location, "wb") as wf:
+                                    wf.write(
+                                        b"gitdir: "
+                                        + os.fsencode(worktree_control_path)
+                                        + b"\n"
+                                    )
+                                repaired.append(old_worktree_path)
+                except (PermissionError, UnicodeDecodeError):
+                    continue
+
+    return repaired
+
+
 @contextmanager
 @contextmanager
 def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
 def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
     """Create a temporary worktree that is automatically cleaned up.
     """Create a temporary worktree that is automatically cleaned up.

+ 175 - 0
tests/test_worktree.py

@@ -39,6 +39,7 @@ from dulwich.worktree import (
     move_worktree,
     move_worktree,
     prune_worktrees,
     prune_worktrees,
     remove_worktree,
     remove_worktree,
+    repair_worktree,
     temporary_worktree,
     temporary_worktree,
     unlock_worktree,
     unlock_worktree,
 )
 )
@@ -721,6 +722,180 @@ class WorkTreeOperationsTests(WorkTreeTestCase):
             move_worktree(self.repo, wt_path, new_path)
             move_worktree(self.repo, wt_path, new_path)
         self.assertIn("Path already exists", str(cm.exception))
         self.assertIn("Path already exists", str(cm.exception))
 
 
+    def test_repair_worktree_after_manual_move(self) -> None:
+        """Test repairing a worktree after manually moving it."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "original")
+        add_worktree(self.repo, wt_path)
+
+        # Manually move the worktree directory (simulating external move)
+        new_path = os.path.join(self.tempdir, "moved")
+        shutil.move(wt_path, new_path)
+
+        # At this point, the connection is broken
+        # Repair from the moved worktree
+        repaired = repair_worktree(self.repo, paths=[new_path])
+
+        # Should have repaired the worktree
+        self.assertEqual(len(repaired), 1)
+        self.assertEqual(repaired[0], new_path)
+
+        # Verify the worktree is now properly connected
+        worktrees = list_worktrees(self.repo)
+        paths = [wt.path for wt in worktrees]
+        self.assertIn(new_path, paths)
+
+    def test_repair_worktree_from_main_repo(self) -> None:
+        """Test repairing worktree connections from main repository."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Read the .git file to get the control directory
+        gitdir_file = os.path.join(wt_path, ".git")
+        with open(gitdir_file, "rb") as f:
+            content = f.read().strip()
+            control_dir = content[8:].decode()  # Remove "gitdir: " prefix
+
+        # Manually corrupt the .git file to point to wrong location
+        with open(gitdir_file, "wb") as f:
+            f.write(b"gitdir: /wrong/path\n")
+
+        # Repair from main repository
+        repaired = repair_worktree(self.repo)
+
+        # Should have repaired the connection
+        self.assertEqual(len(repaired), 1)
+        self.assertEqual(repaired[0], wt_path)
+
+        # Verify .git file now points to correct location
+        with open(gitdir_file, "rb") as f:
+            content = f.read().strip()
+            new_control_dir = content[8:].decode()
+            self.assertEqual(
+                os.path.abspath(new_control_dir), os.path.abspath(control_dir)
+            )
+
+    def test_repair_worktree_no_repairs_needed(self) -> None:
+        """Test repair when no repairs are needed."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Repair - should return empty list since nothing is broken
+        repaired = repair_worktree(self.repo)
+        self.assertEqual(len(repaired), 0)
+
+    def test_repair_invalid_worktree_path(self) -> None:
+        """Test that repairing an invalid path raises an error."""
+        with self.assertRaises(ValueError) as cm:
+            repair_worktree(self.repo, paths=["/nonexistent/path"])
+        self.assertIn("Not a valid worktree", str(cm.exception))
+
+    def test_repair_multiple_worktrees(self) -> None:
+        """Test repairing multiple worktrees at once."""
+        # Create two worktrees
+        wt_path1 = os.path.join(self.tempdir, "wt1")
+        wt_path2 = os.path.join(self.tempdir, "wt2")
+        add_worktree(self.repo, wt_path1, branch=b"branch1")
+        add_worktree(self.repo, wt_path2, branch=b"branch2")
+
+        # Manually move both worktrees
+        new_path1 = os.path.join(self.tempdir, "moved1")
+        new_path2 = os.path.join(self.tempdir, "moved2")
+        shutil.move(wt_path1, new_path1)
+        shutil.move(wt_path2, new_path2)
+
+        # Repair both at once
+        repaired = repair_worktree(self.repo, paths=[new_path1, new_path2])
+
+        # Both should be repaired
+        self.assertEqual(len(repaired), 2)
+        self.assertIn(new_path1, repaired)
+        self.assertIn(new_path2, repaired)
+
+    def test_repair_worktree_with_relative_paths(self) -> None:
+        """Test that repair handles worktrees with relative paths in gitdir."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Manually move the worktree
+        new_path = os.path.join(self.tempdir, "new-location")
+        shutil.move(wt_path, new_path)
+
+        # Repair from the new location
+        repaired = repair_worktree(self.repo, paths=[new_path])
+
+        # Should have repaired successfully
+        self.assertEqual(len(repaired), 1)
+        self.assertEqual(repaired[0], new_path)
+
+        # Verify the gitdir pointer was updated
+        from dulwich.repo import GITDIR, WORKTREES
+
+        worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
+        for entry in os.listdir(worktrees_dir):
+            gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
+            if os.path.exists(gitdir_path):
+                with open(gitdir_path, "rb") as f:
+                    content = f.read().strip()
+                    gitdir_location = os.fsdecode(content)
+                    # Should point to the new .git file location
+                    self.assertTrue(gitdir_location.endswith(".git"))
+
+    def test_repair_worktree_container_method(self) -> None:
+        """Test the WorkTreeContainer.repair() method."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Manually move it
+        new_path = os.path.join(self.tempdir, "moved")
+        shutil.move(wt_path, new_path)
+
+        # Use the container method to repair
+        repaired = self.repo.worktrees.repair(paths=[new_path])
+
+        # Should have repaired
+        self.assertEqual(len(repaired), 1)
+        self.assertEqual(repaired[0], new_path)
+
+    def test_repair_with_missing_gitdir_pointer(self) -> None:
+        """Test repair when gitdir pointer file is missing."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Find and remove the gitdir pointer file
+        from dulwich.repo import GITDIR, WORKTREES
+
+        worktrees_dir = os.path.join(self.repo.controldir(), WORKTREES)
+        for entry in os.listdir(worktrees_dir):
+            gitdir_path = os.path.join(worktrees_dir, entry, GITDIR)
+            if os.path.exists(gitdir_path):
+                os.remove(gitdir_path)
+
+        # Repair should not crash, but won't repair anything
+        repaired = repair_worktree(self.repo, paths=[wt_path])
+        self.assertEqual(len(repaired), 0)
+
+    def test_repair_worktree_with_corrupted_git_file(self) -> None:
+        """Test repair with a corrupted .git file."""
+        # Create a worktree
+        wt_path = os.path.join(self.tempdir, "worktree")
+        add_worktree(self.repo, wt_path)
+
+        # Corrupt the .git file
+        gitdir_file = os.path.join(wt_path, ".git")
+        with open(gitdir_file, "wb") as f:
+            f.write(b"invalid content\n")
+
+        # Attempting to repair should raise an error
+        with self.assertRaises(ValueError) as cm:
+            repair_worktree(self.repo, paths=[wt_path])
+        self.assertIn("Invalid .git file", str(cm.exception))
+
 
 
 class TemporaryWorktreeTests(TestCase):
 class TemporaryWorktreeTests(TestCase):
     """Tests for temporary_worktree context manager."""
     """Tests for temporary_worktree context manager."""