Преглед изворни кода

Remove porcelain module usage from lower-level tests

Jelmer Vernooij пре 1 недеља
родитељ
комит
bffdcc3808
2 измењених фајлова са 117 додато и 106 уклоњено
  1. 72 59
      tests/test_repository.py
  2. 45 47
      tests/test_worktree.py

+ 72 - 59
tests/test_repository.py

@@ -31,9 +31,10 @@ import tempfile
 import time
 import warnings
 
-from dulwich import errors, objects, porcelain
+from dulwich import errors, objects
 from dulwich.config import Config
 from dulwich.errors import NotGitRepository
+from dulwich.index import get_unstaged_changes as _get_unstaged_changes
 from dulwich.object_store import tree_lookup_path
 from dulwich.repo import (
     InvalidUserIdentity,
@@ -50,6 +51,14 @@ from . import TestCase, skipIf
 missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097"
 
 
+def get_unstaged_changes(repo):
+    """Helper to get unstaged changes for a repo."""
+    index = repo.open_index()
+    normalizer = repo.get_blob_normalizer()
+    filter_callback = normalizer.checkin_normalize if normalizer else None
+    return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
+
+
 class CreateRepositoryTests(TestCase):
     def assertFileContentsEqual(self, expected, repo, path) -> None:
         f = repo.get_named_file(path)
@@ -1705,96 +1714,88 @@ class BuildRepoRootTests(TestCase):
 
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage(["new_dir/foo"])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("something new")
-        self._repo.get_worktree().unstage(["new_dir/foo"])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [
-                {"add": [], "delete": [], "modify": []},
-                [os.fsencode(os.path.join("new_dir", "foo"))],
-                [],
-            ],
-            status,
-        )
+        wt.unstage(["new_dir/foo"])
+
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([os.fsencode(os.path.join("new_dir", "foo"))], unstaged)
 
     def test_unstage_while_no_commit(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.unstage([file])
+
+        # Check that file is no longer in index
+        index = self._repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_add_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+        wt.stage([file])
+        wt.unstage([file])
+
+        # Check that file is no longer in index
+        index = self._repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_modify_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("broken")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
+        wt.stage([file])
+        wt.unstage([file])
 
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_unstage_remove_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         os.remove(full_path)
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        wt.unstage([file])
+
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_reset_index(self) -> None:
         r = self._repo
@@ -1803,20 +1804,32 @@ class BuildRepoRootTests(TestCase):
         with open(os.path.join(r.path, "b"), "wb") as f:
             f.write(b"added")
         r.get_worktree().stage(["a", "b"])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [
-                {"add": [os.fsencode("b")], "delete": [], "modify": [os.fsencode("a")]},
-                [],
-                [],
-            ],
-            status,
-        )
+
+        # Check staged changes using lower-level APIs
+        index = r.open_index()
+        staged = {"add": [], "delete": [], "modify": []}
+        try:
+            head_commit = r[b"HEAD"]
+            tree_id = head_commit.tree
+        except KeyError:
+            tree_id = None
+
+        for change in index.changes_from_tree(r.object_store, tree_id):
+            if not change[0][0]:
+                staged["add"].append(change[0][1])
+            elif not change[1][1]:
+                staged["delete"].append(change[0][1])
+            else:
+                staged["modify"].append(change[0][1])
+
+        self.assertEqual({"add": [b"b"], "delete": [], "modify": [b"a"]}, staged)
+
         r.get_worktree().reset_index()
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("b")]], status
-        )
+
+        # After reset, check that nothing is staged and b is untracked
+        index = r.open_index()
+        self.assertNotIn(b"b", index)
+        self.assertIn(b"a", index)
 
     @skipIf(
         sys.platform in ("win32", "darwin"),
@@ -2353,7 +2366,7 @@ class SharedRepositoryTests(TestCase):
         with open(test_file, "wb") as f:
             f.write(b"test content")
         # Stage the file
-        porcelain.add(repo, [test_file])
+        repo.get_worktree().stage(["test.txt"])
 
         # Check index file permissions
         index_path = repo.index_path()

+ 45 - 47
tests/test_worktree.py

@@ -27,8 +27,9 @@ import stat
 import tempfile
 from unittest import skipIf
 
-from dulwich import porcelain
 from dulwich.errors import CommitError
+from dulwich.index import get_unstaged_changes as _get_unstaged_changes
+from dulwich.objects import Commit
 from dulwich.object_store import tree_lookup_path
 from dulwich.repo import Repo
 from dulwich.worktree import (
@@ -47,6 +48,14 @@ from dulwich.worktree import (
 from . import TestCase
 
 
+def get_unstaged_changes(repo):
+    """Helper to get unstaged changes for a repo."""
+    index = repo.open_index()
+    normalizer = repo.get_blob_normalizer()
+    filter_callback = normalizer.checkin_normalize if normalizer else None
+    return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
+
+
 class WorkTreeTestCase(TestCase):
     """Base test case for WorkTree tests."""
 
@@ -166,9 +175,8 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
 
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage(["new_dir/foo"])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
@@ -176,15 +184,9 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         with open(full_path, "a") as f:
             f.write("something new")
         self.worktree.unstage(["new_dir/foo"])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [
-                {"add": [], "delete": [], "modify": []},
-                [os.fsencode(os.path.join("new_dir", "foo"))],
-                [],
-            ],
-            status,
-        )
+
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([os.fsencode(os.path.join("new_dir", "foo"))], unstaged)
 
     def test_unstage_while_no_commit(self):
         """Test unstaging when there are no commits."""
@@ -192,31 +194,30 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+
+        # Check that file is no longer in index
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_add_file(self):
         """Test unstaging a newly added file."""
         file = "foo"
         full_path = os.path.join(self.repo.path, file)
-        porcelain.commit(
-            self.repo,
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+
+        # Check that file is no longer in index
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_modify_file(self):
         """Test unstaging a modified file."""
@@ -224,22 +225,19 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage([file])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("broken")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
 
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_unstage_remove_file(self):
         """Test unstaging a removed file."""
@@ -247,19 +245,17 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage([file])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         os.remove(full_path)
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
 
 class WorkTreeCommitTests(WorkTreeTestCase):
@@ -920,8 +916,9 @@ class TemporaryWorktreeTests(TestCase):
         readme_path = os.path.join(self.repo_path, "README.md")
         with open(readme_path, "w") as f:
             f.write("# Test Repository\n")
-        porcelain.add(self.repo, [readme_path])
-        porcelain.commit(self.repo, message=b"Initial commit")
+        wt = self.repo.get_worktree()
+        wt.stage(["README.md"])
+        wt.commit(message=b"Initial commit")
 
     def test_temporary_worktree_creates_and_cleans_up(self) -> None:
         """Test that temporary worktree is created and cleaned up."""
@@ -990,8 +987,9 @@ class TemporaryWorktreeTests(TestCase):
         with open(test_file, "w") as f:
             f.write("Hello, world!")
 
-        porcelain.add(self.repo, [test_file])
-        porcelain.commit(self.repo, message=b"Initial commit")
+        wt = self.repo.get_worktree()
+        wt.stage(["test.txt"])
+        wt.commit(message=b"Initial commit")
 
         with temporary_worktree(self.repo) as worktree:
             # Check that the file exists in the worktree
@@ -1007,6 +1005,6 @@ class TemporaryWorktreeTests(TestCase):
             with open(wt_test_file, "w") as f:
                 f.write("Modified content")
 
-            # Changes should be visible in status
-            status = porcelain.status(worktree)
-            self.assertIn(os.fsencode("test.txt"), status.unstaged)
+            # Changes should be visible as unstaged
+            unstaged = get_unstaged_changes(worktree)
+            self.assertIn(os.fsencode("test.txt"), unstaged)