Просмотр исходного кода

Ban porcelain/cli in lower level tests (#2060)

Jelmer Vernooij 1 неделя назад
Родитель
Сommit
953eb0cbdc

+ 1 - 6
dulwich/client.py

@@ -1478,12 +1478,7 @@ class GitClient:
 
     @staticmethod
     def _warn_filter_objects() -> None:
-        import warnings
-
-        warnings.warn(
-            "object filtering not recognized by server, ignoring",
-            UserWarning,
-        )
+        logging.warning("object filtering not recognized by server, ignoring")
 
 
 def check_wants(wants: Set[bytes], refs: Mapping[bytes, bytes]) -> None:

+ 7 - 6
dulwich/signature.py

@@ -1293,12 +1293,13 @@ class SSHCliSignatureVendor(SignatureSigner, SignatureVerifier):
                 args.extend(["-r", self.revocation_file])
 
             try:
-                subprocess.run(
-                    args,
-                    stdin=open(data_filename, "rb"),
-                    capture_output=True,
-                    check=True,
-                )
+                with open(data_filename, "rb") as data_file:
+                    subprocess.run(
+                        args,
+                        stdin=data_file,
+                        capture_output=True,
+                        check=True,
+                    )
             except subprocess.CalledProcessError as e:
                 raise BadSignature(
                     f"SSH signature verification failed: {e.stderr.decode('utf-8', errors='replace')}"

+ 12 - 3
tests/__init__.py

@@ -132,9 +132,6 @@ def self_test_suite() -> unittest.TestSuite:
         "bitmap",
         "blackbox",
         "bundle",
-        "cli",
-        "cli_cherry_pick",
-        "cli_merge",
         "client",
         "cloud_gcs",
         "commit_graph",
@@ -197,12 +194,24 @@ def self_test_suite() -> unittest.TestSuite:
         "worktree",
     ]
     module_names = ["tests.test_" + name for name in names]
+    cli_names = [
+        "cherry_pick",
+        "cli",
+        "merge",
+    ]
+    module_names += ["tests.cli.test_" + name for name in cli_names]
     porcelain_names = [
+        "annotate",
+        "bisect",
         "cherry_pick",
         "filters",
+        "ignore",
         "lfs",
+        "maintenance",
+        "mbox",
         "merge",
         "notes",
+        "rebase",
     ]
     module_names += ["tests.porcelain"] + [
         "tests.porcelain.test_" + name for name in porcelain_names

+ 0 - 0
tests/cli/__init__.py


+ 1 - 1
tests/test_cli_cherry_pick.py → tests/cli/test_cherry_pick.py

@@ -27,7 +27,7 @@ import tempfile
 from dulwich import porcelain
 from dulwich.cli import cmd_cherry_pick
 
-from . import TestCase
+from .. import TestCase
 
 
 class CherryPickCommandTests(TestCase):

+ 13 - 1
tests/test_cli.py → tests/cli/test_cli.py

@@ -24,6 +24,7 @@
 """Tests for dulwich.cli."""
 
 import io
+import logging
 import os
 import shutil
 import sys
@@ -47,7 +48,7 @@ from dulwich.tests.utils import (
     build_commit_graph,
 )
 
-from . import TestCase
+from .. import TestCase
 
 
 class DulwichCliTestCase(TestCase):
@@ -55,6 +56,17 @@ class DulwichCliTestCase(TestCase):
 
     def setUp(self) -> None:
         super().setUp()
+        # Suppress expected error logging during CLI tests
+        cli_logger = logging.getLogger("dulwich.cli")
+        original_cli_level = cli_logger.level
+        cli_logger.setLevel(logging.CRITICAL)
+        self.addCleanup(cli_logger.setLevel, original_cli_level)
+
+        root_logger = logging.getLogger()
+        original_root_level = root_logger.level
+        root_logger.setLevel(logging.CRITICAL)
+        self.addCleanup(root_logger.setLevel, original_root_level)
+
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.repo_path = os.path.join(self.test_dir, "repo")

+ 1 - 1
tests/test_cli_merge.py → tests/cli/test_merge.py

@@ -29,7 +29,7 @@ import unittest
 from dulwich import porcelain
 from dulwich.cli import main
 
-from . import DependencyMissing, TestCase
+from .. import DependencyMissing, TestCase
 
 
 class CLIMergeTests(TestCase):

+ 22 - 15
tests/compat/test_partial_clone.py

@@ -390,14 +390,15 @@ class PartialCloneClientTestCase(CompatTestCase):
             # Get all refs
             return list(refs.values())
 
-        # Fetch with filter
-        result = client.fetch(
-            path,
-            dest_repo,
-            determine_wants=determine_wants,
-            progress=None,
-            filter_spec=b"blob:none",
-        )
+        # Fetch with filter (may warn if server doesn't support filtering)
+        with self.assertLogs(level="WARNING"):
+            result = client.fetch(
+                path,
+                dest_repo,
+                determine_wants=determine_wants,
+                progress=None,
+                filter_spec=b"blob:none",
+            )
 
         # The fetch should succeed with partial clone
         self.assertIsNotNone(result)
@@ -430,13 +431,14 @@ class PartialCloneClientTestCase(CompatTestCase):
 
         client, path = get_transport_and_path(f"git://localhost:{daemon_port}/")
 
-        # Clone with blob:limit filter
-        cloned_repo = client.clone(
-            path,
-            dest_path,
-            mkdir=False,
-            filter_spec=b"blob:limit=100",
-        )
+        # Clone with blob:limit filter (may warn if server doesn't support filtering)
+        with self.assertLogs(level="WARNING"):
+            cloned_repo = client.clone(
+                path,
+                dest_path,
+                mkdir=False,
+                filter_spec=b"blob:limit=100",
+            )
         self.addCleanup(cloned_repo.close)
 
         # Verify clone succeeded
@@ -483,6 +485,11 @@ class PartialCloneClientTestCase(CompatTestCase):
         def cleanup_daemon():
             daemon_process.terminate()
             daemon_process.wait(timeout=2)
+            # Close pipes to avoid ResourceWarning
+            if daemon_process.stdout:
+                daemon_process.stdout.close()
+            if daemon_process.stderr:
+                daemon_process.stderr.close()
 
         self.addCleanup(cleanup_daemon)
 

+ 220 - 0
tests/porcelain/test_annotate.py

@@ -0,0 +1,220 @@
+# test_annotate.py -- tests for porcelain annotate
+# Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain annotate and blame functions."""
+
+import os
+import tempfile
+from unittest import TestCase
+
+from dulwich.objects import Blob, Commit, Tree
+from dulwich.porcelain import annotate, blame
+from dulwich.repo import Repo
+
+
+class PorcelainAnnotateTestCase(TestCase):
+    """Tests for the porcelain annotate function."""
+
+    def setUp(self) -> None:
+        self.temp_dir = tempfile.mkdtemp()
+        self.repo = Repo.init(self.temp_dir)
+
+    def tearDown(self) -> None:
+        self.repo.close()
+        import shutil
+
+        shutil.rmtree(self.temp_dir)
+
+    def _make_commit_with_file(
+        self,
+        filename: str,
+        content: bytes,
+        message: str,
+        parent: bytes | None = None,
+    ) -> bytes:
+        """Helper to create a commit with a file."""
+        # Create blob
+        blob = Blob()
+        blob.data = content
+        self.repo.object_store.add_object(blob)
+
+        # Create tree
+        tree = Tree()
+        tree.add(filename.encode(), 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        # Create commit
+        commit = Commit()
+        commit.tree = tree.id
+        commit.author = commit.committer = b"Test Author <test@example.com>"
+        commit.author_time = commit.commit_time = 1000000000
+        commit.author_timezone = commit.commit_timezone = 0
+        commit.encoding = b"UTF-8"
+        commit.message = message.encode("utf-8")
+
+        if parent:
+            commit.parents = [parent]
+        else:
+            commit.parents = []
+
+        self.repo.object_store.add_object(commit)
+
+        # Update HEAD
+        self.repo.refs[b"HEAD"] = commit.id
+
+        return commit.id
+
+    def test_porcelain_annotate(self) -> None:
+        """Test the porcelain annotate function."""
+        # Create commits
+        commit1_id = self._make_commit_with_file(
+            "file.txt", b"line1\nline2\n", "Initial commit"
+        )
+        self._make_commit_with_file(
+            "file.txt", b"line1\nline2\nline3\n", "Add third line", parent=commit1_id
+        )
+
+        # Test annotate
+        result = list(annotate(self.temp_dir, "file.txt"))
+
+        self.assertEqual(3, len(result))
+        # Check that each result has the right structure
+        for (commit, entry), line in result:
+            self.assertIsNotNone(commit)
+            self.assertIsNotNone(entry)
+            self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
+
+    def test_porcelain_annotate_with_committish(self) -> None:
+        """Test porcelain annotate with specific commit."""
+        # Create commits
+        commit1_id = self._make_commit_with_file(
+            "file.txt", b"original\n", "Initial commit"
+        )
+        self._make_commit_with_file(
+            "file.txt", b"modified\n", "Modify file", parent=commit1_id
+        )
+
+        # Annotate at first commit
+        result = list(
+            annotate(self.temp_dir, "file.txt", committish=commit1_id.decode())
+        )
+        self.assertEqual(1, len(result))
+        self.assertEqual(b"original\n", result[0][1])
+
+        # Annotate at HEAD (second commit)
+        result = list(annotate(self.temp_dir, "file.txt"))
+        self.assertEqual(1, len(result))
+        self.assertEqual(b"modified\n", result[0][1])
+
+    def test_blame_alias(self) -> None:
+        """Test that blame is an alias for annotate."""
+        self.assertIs(blame, annotate)
+
+
+class IntegrationTestCase(TestCase):
+    """Integration tests with more complex scenarios."""
+
+    def setUp(self) -> None:
+        self.temp_dir = tempfile.mkdtemp()
+        self.repo = Repo.init(self.temp_dir)
+
+    def tearDown(self) -> None:
+        self.repo.close()
+        import shutil
+
+        shutil.rmtree(self.temp_dir)
+
+    def _create_file_commit(
+        self,
+        filename: str,
+        content: bytes,
+        message: str,
+        parent: bytes | None = None,
+    ) -> bytes:
+        """Helper to create a commit with file content."""
+        # Write file to working directory
+        filepath = os.path.join(self.temp_dir, filename)
+        with open(filepath, "wb") as f:
+            f.write(content)
+
+        # Stage file
+        self.repo.get_worktree().stage([filename.encode()])
+
+        # Create commit
+        commit_id = self.repo.get_worktree().commit(
+            message=message.encode(),
+            committer=b"Test Committer <test@example.com>",
+            author=b"Test Author <test@example.com>",
+            commit_timestamp=1000000000,
+            commit_timezone=0,
+            author_timestamp=1000000000,
+            author_timezone=0,
+        )
+
+        return commit_id
+
+    def test_complex_file_history(self) -> None:
+        """Test annotating a file with complex history."""
+        # Initial commit with 3 lines
+        self._create_file_commit(
+            "complex.txt", b"First line\nSecond line\nThird line\n", "Initial commit"
+        )
+
+        # Add lines at the beginning and end
+        self._create_file_commit(
+            "complex.txt",
+            b"New first line\nFirst line\nSecond line\nThird line\nNew last line\n",
+            "Add lines at beginning and end",
+        )
+
+        # Modify middle line
+        self._create_file_commit(
+            "complex.txt",
+            b"New first line\nFirst line\n"
+            b"Modified second line\nThird line\n"
+            b"New last line\n",
+            "Modify middle line",
+        )
+
+        # Delete a line
+        self._create_file_commit(
+            "complex.txt",
+            b"New first line\nFirst line\nModified second line\nNew last line\n",
+            "Delete third line",
+        )
+
+        # Run annotate
+        result = list(annotate(self.temp_dir, "complex.txt"))
+
+        # Should have 4 lines
+        self.assertEqual(4, len(result))
+
+        # Verify each line comes from the correct commit
+        lines = [line for (commit, entry), line in result]
+        self.assertEqual(
+            [
+                b"New first line\n",
+                b"First line\n",
+                b"Modified second line\n",
+                b"New last line\n",
+            ],
+            lines,
+        )

+ 127 - 0
tests/porcelain/test_bisect.py

@@ -0,0 +1,127 @@
+# test_bisect.py -- tests for porcelain bisect
+# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain bisect functions."""
+
+import os
+import shutil
+import tempfile
+
+from dulwich import porcelain
+from dulwich.objects import Tree
+from dulwich.tests.utils import make_commit
+
+from .. import TestCase
+
+
+class BisectPorcelainTests(TestCase):
+    """Tests for porcelain bisect functions."""
+
+    def setUp(self) -> None:
+        self.test_dir = tempfile.mkdtemp()
+        self.repo = porcelain.init(self.test_dir)
+
+        # Create tree objects
+        tree = Tree()
+        self.repo.object_store.add_object(tree)
+
+        # Create some commits with proper trees
+        self.c1 = make_commit(id=b"1" * 40, message=b"initial commit", tree=tree.id)
+        self.c2 = make_commit(
+            id=b"2" * 40, message=b"second commit", parents=[b"1" * 40], tree=tree.id
+        )
+        self.c3 = make_commit(
+            id=b"3" * 40, message=b"third commit", parents=[b"2" * 40], tree=tree.id
+        )
+        self.c4 = make_commit(
+            id=b"4" * 40, message=b"fourth commit", parents=[b"3" * 40], tree=tree.id
+        )
+
+        # Add commits to object store
+        for commit in [self.c1, self.c2, self.c3, self.c4]:
+            self.repo.object_store.add_object(commit)
+
+        # Set HEAD to latest commit
+        self.repo.refs[b"HEAD"] = self.c4.id
+        self.repo.refs[b"refs/heads/master"] = self.c4.id
+
+    def tearDown(self) -> None:
+        shutil.rmtree(self.test_dir)
+
+    def test_bisect_start(self) -> None:
+        """Test bisect_start porcelain function."""
+        porcelain.bisect_start(self.test_dir)
+
+        # Check that bisect state files exist
+        self.assertTrue(
+            os.path.exists(os.path.join(self.repo.controldir(), "BISECT_START"))
+        )
+
+    def test_bisect_bad_good(self) -> None:
+        """Test marking commits as bad and good."""
+        porcelain.bisect_start(self.test_dir)
+        porcelain.bisect_bad(self.test_dir, self.c4.id.decode("ascii"))
+        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
+
+        # Check that refs were created
+        self.assertTrue(
+            os.path.exists(
+                os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
+            )
+        )
+        self.assertTrue(
+            os.path.exists(
+                os.path.join(
+                    self.repo.controldir(),
+                    "refs",
+                    "bisect",
+                    f"good-{self.c1.id.decode('ascii')}",
+                )
+            )
+        )
+
+    def test_bisect_log(self) -> None:
+        """Test getting bisect log."""
+        porcelain.bisect_start(self.test_dir)
+        porcelain.bisect_bad(self.test_dir, self.c4.id.decode("ascii"))
+        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
+
+        log = porcelain.bisect_log(self.test_dir)
+
+        self.assertIn("git bisect start", log)
+        self.assertIn("git bisect bad", log)
+        self.assertIn("git bisect good", log)
+
+    def test_bisect_reset(self) -> None:
+        """Test resetting bisect state."""
+        porcelain.bisect_start(self.test_dir)
+        porcelain.bisect_bad(self.test_dir)
+        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
+
+        porcelain.bisect_reset(self.test_dir)
+
+        # Check that bisect state files are removed
+        self.assertFalse(
+            os.path.exists(os.path.join(self.repo.controldir(), "BISECT_START"))
+        )
+        self.assertFalse(
+            os.path.exists(os.path.join(self.repo.controldir(), "refs", "bisect"))
+        )

+ 155 - 0
tests/porcelain/test_ignore.py

@@ -0,0 +1,155 @@
+# test_ignore.py -- tests for porcelain ignore (check_ignore)
+# Copyright (C) 2017 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain ignore (check_ignore) functions."""
+
+import os
+import shutil
+import tempfile
+
+from dulwich.porcelain import _quote_path, check_ignore
+from dulwich.repo import Repo
+
+from .. import TestCase
+
+
+class CheckIgnoreQuotePathTests(TestCase):
+    """Integration tests for check_ignore with quote_path parameter."""
+
+    def setUp(self) -> None:
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+
+    def test_quote_path_true_unicode_filenames(self) -> None:
+        """Test that quote_path=True returns quoted unicode filenames."""
+        # Create a repository
+        repo = Repo.init(self.test_dir)
+        self.addCleanup(repo.close)
+
+        # Create .gitignore with unicode patterns
+        gitignore_path = os.path.join(self.test_dir, ".gitignore")
+        with open(gitignore_path, "w", encoding="utf-8") as f:
+            f.write("тест*\n")
+            f.write("*.测试\n")
+
+        # Create unicode files
+        test_files = ["тест.txt", "файл.测试", "normal.txt"]
+        for filename in test_files:
+            filepath = os.path.join(self.test_dir, filename)
+            with open(filepath, "w", encoding="utf-8") as f:
+                f.write("test content")
+
+        # Test with quote_path=True (default)
+        abs_paths = [os.path.join(self.test_dir, f) for f in test_files]
+        ignored_quoted = set(check_ignore(self.test_dir, abs_paths, quote_path=True))
+
+        # Test with quote_path=False
+        ignored_unquoted = set(check_ignore(self.test_dir, abs_paths, quote_path=False))
+
+        # Verify quoted results
+        expected_quoted = {
+            '"\\321\\202\\320\\265\\321\\201\\321\\202.txt"',  # тест.txt
+            '"\\321\\204\\320\\260\\320\\271\\320\\273.\\346\\265\\213\\350\\257\\225"',  # файл.测试
+        }
+        self.assertEqual(ignored_quoted, expected_quoted)
+
+        # Verify unquoted results
+        expected_unquoted = {"тест.txt", "файл.测试"}
+        self.assertEqual(ignored_unquoted, expected_unquoted)
+
+    def test_quote_path_ascii_filenames(self) -> None:
+        """Test that ASCII filenames are unaffected by quote_path setting."""
+        # Create a repository
+        repo = Repo.init(self.test_dir)
+        self.addCleanup(repo.close)
+
+        # Create .gitignore
+        gitignore_path = os.path.join(self.test_dir, ".gitignore")
+        with open(gitignore_path, "w") as f:
+            f.write("*.tmp\n")
+            f.write("test*\n")
+
+        # Create ASCII files
+        test_files = ["test.txt", "file.tmp", "normal.txt"]
+        for filename in test_files:
+            filepath = os.path.join(self.test_dir, filename)
+            with open(filepath, "w") as f:
+                f.write("test content")
+
+        # Test both settings
+        abs_paths = [os.path.join(self.test_dir, f) for f in test_files]
+        ignored_quoted = set(check_ignore(self.test_dir, abs_paths, quote_path=True))
+        ignored_unquoted = set(check_ignore(self.test_dir, abs_paths, quote_path=False))
+
+        # Both should return the same results for ASCII filenames
+        expected = {"test.txt", "file.tmp"}
+        self.assertEqual(ignored_quoted, expected)
+        self.assertEqual(ignored_unquoted, expected)
+
+
+class QuotePathTests(TestCase):
+    """Tests for _quote_path function."""
+
+    def test_ascii_paths(self) -> None:
+        """Test that ASCII paths are not quoted."""
+        self.assertEqual(_quote_path("file.txt"), "file.txt")
+        self.assertEqual(_quote_path("dir/file.txt"), "dir/file.txt")
+        self.assertEqual(_quote_path("path with spaces.txt"), "path with spaces.txt")
+
+    def test_unicode_paths(self) -> None:
+        """Test that unicode paths are quoted with C-style escapes."""
+        # Russian characters
+        self.assertEqual(
+            _quote_path("тест.txt"), '"\\321\\202\\320\\265\\321\\201\\321\\202.txt"'
+        )
+        # Chinese characters
+        self.assertEqual(
+            _quote_path("файл.测试"),
+            '"\\321\\204\\320\\260\\320\\271\\320\\273.\\346\\265\\213\\350\\257\\225"',
+        )
+        # Mixed ASCII and unicode
+        self.assertEqual(
+            _quote_path("test-тест.txt"),
+            '"test-\\321\\202\\320\\265\\321\\201\\321\\202.txt"',
+        )
+
+    def test_special_characters(self) -> None:
+        """Test that special characters are properly escaped."""
+        # Quotes in filename
+        self.assertEqual(
+            _quote_path('file"with"quotes.txt'), '"file\\"with\\"quotes.txt"'
+        )
+        # Backslashes in filename
+        self.assertEqual(
+            _quote_path("file\\with\\backslashes.txt"),
+            '"file\\\\with\\\\backslashes.txt"',
+        )
+        # Mixed special chars and unicode
+        self.assertEqual(
+            _quote_path('тест"файл.txt'),
+            '"\\321\\202\\320\\265\\321\\201\\321\\202\\"\\321\\204\\320\\260\\320\\271\\320\\273.txt"',
+        )
+
+    def test_empty_and_edge_cases(self) -> None:
+        """Test edge cases."""
+        self.assertEqual(_quote_path(""), "")
+        self.assertEqual(_quote_path("a"), "a")  # Single ASCII char
+        self.assertEqual(_quote_path("я"), '"\\321\\217"')  # Single unicode char

+ 3 - 2
tests/porcelain/test_lfs.py

@@ -276,8 +276,9 @@ class LFSPorcelainTestCase(TestCase):
         with self.assertRaises(KeyError):
             config.get((b"filter", b"lfs"), b"smudge")
 
-        # Clone the repository
-        cloned_repo = porcelain.clone(source_dir, clone_dir)
+        # Clone the repository (may warn about missing LFS objects)
+        with self.assertLogs("dulwich.lfs", level="WARNING"):
+            cloned_repo = porcelain.clone(source_dir, clone_dir)
 
         # Verify that built-in LFS filter was used
         normalizer = cloned_repo.get_blob_normalizer()

+ 165 - 0
tests/porcelain/test_maintenance.py

@@ -0,0 +1,165 @@
+# test_maintenance.py -- tests for porcelain maintenance
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain maintenance functions."""
+
+import tempfile
+
+from dulwich import porcelain
+from dulwich.objects import Blob
+from dulwich.repo import Repo
+
+from .. import TestCase
+
+
+class PorcelainMaintenanceTestCase(TestCase):
+    """Base class for porcelain maintenance tests."""
+
+    def setUp(self):
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup_test_dir)
+        self.repo = Repo.init(self.test_dir)
+        self.addCleanup(self.repo.close)
+
+    def _cleanup_test_dir(self):
+        import shutil
+
+        shutil.rmtree(self.test_dir)
+
+    def _create_commit(self):
+        """Helper to create a test commit."""
+        blob = Blob.from_string(b"test content\n")
+        self.repo.object_store.add_object(blob)
+
+        from dulwich.objects import Commit, Tree
+
+        tree = Tree()
+        tree.add(b"testfile", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        commit = Commit()
+        commit.tree = tree.id
+        commit.author = commit.committer = b"Test <test@example.com>"
+        commit.author_time = commit.commit_time = 1000000000
+        commit.author_timezone = commit.commit_timezone = 0
+        commit.encoding = b"UTF-8"
+        commit.message = b"Test commit"
+        commit.parents = []
+        self.repo.object_store.add_object(commit)
+        self.repo.refs[b"refs/heads/master"] = commit.id
+        return commit.id
+
+
+class PorcelainMaintenanceTest(PorcelainMaintenanceTestCase):
+    """Tests for porcelain.maintenance_run function."""
+
+    def test_maintenance_run(self):
+        """Test porcelain maintenance_run function."""
+        self._create_commit()
+        result = porcelain.maintenance_run(self.test_dir)
+        self.assertIn("gc", result.tasks_succeeded)
+        self.assertIn("commit-graph", result.tasks_succeeded)
+
+    def test_maintenance_run_with_tasks(self):
+        """Test porcelain maintenance_run with specific tasks."""
+        result = porcelain.maintenance_run(self.test_dir, tasks=["pack-refs"])
+        self.assertEqual(result.tasks_run, ["pack-refs"])
+        self.assertEqual(result.tasks_succeeded, ["pack-refs"])
+
+
+class MaintenanceRegisterTest(PorcelainMaintenanceTestCase):
+    """Tests for maintenance register/unregister."""
+
+    def setUp(self):
+        super().setUp()
+        # Set up a temporary HOME for testing global config
+        self.temp_home = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup_temp_home)
+        self.overrideEnv("HOME", self.temp_home)
+
+    def _cleanup_temp_home(self):
+        import shutil
+
+        shutil.rmtree(self.temp_home)
+
+    def test_register_repository(self):
+        """Test registering a repository for maintenance."""
+        porcelain.maintenance_register(self.test_dir)
+
+        # Verify repository was added to global config
+        import os
+
+        from dulwich.config import ConfigFile
+
+        global_config_path = os.path.expanduser("~/.gitconfig")
+        global_config = ConfigFile.from_path(global_config_path)
+
+        repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+        self.assertIn(self.test_dir.encode(), repos)
+
+        # Verify strategy was set
+        strategy = global_config.get((b"maintenance",), b"strategy")
+        self.assertEqual(strategy, b"incremental")
+
+        # Verify auto maintenance was disabled in repo
+        repo_config = self.repo.get_config()
+        auto = repo_config.get_boolean((b"maintenance",), b"auto")
+        self.assertFalse(auto)
+
+    def test_register_already_registered(self):
+        """Test registering an already registered repository."""
+        porcelain.maintenance_register(self.test_dir)
+        # Should not error when registering again
+        porcelain.maintenance_register(self.test_dir)
+
+    def test_unregister_repository(self):
+        """Test unregistering a repository."""
+        # First register
+        porcelain.maintenance_register(self.test_dir)
+
+        # Then unregister
+        porcelain.maintenance_unregister(self.test_dir)
+
+        # Verify repository was removed from global config
+        import os
+
+        from dulwich.config import ConfigFile
+
+        global_config_path = os.path.expanduser("~/.gitconfig")
+        global_config = ConfigFile.from_path(global_config_path)
+
+        try:
+            repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+            self.assertNotIn(self.test_dir.encode(), repos)
+        except KeyError:
+            # No repos registered, which is fine
+            pass
+
+    def test_unregister_not_registered(self):
+        """Test unregistering a repository that is not registered."""
+        with self.assertRaises(ValueError):
+            porcelain.maintenance_unregister(self.test_dir)
+
+    def test_unregister_not_registered_force(self):
+        """Test unregistering with force flag."""
+        # Should not error with force=True
+        porcelain.maintenance_unregister(self.test_dir, force=True)

+ 159 - 0
tests/porcelain/test_mbox.py

@@ -0,0 +1,159 @@
+# test_mbox.py -- tests for porcelain mbox (mailsplit)
+# Copyright (C) 2020 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain mbox (mailsplit) functions."""
+
+import mailbox
+import os
+import tempfile
+
+from dulwich import porcelain
+
+from .. import TestCase
+
+
+class PorcelainMailsplitTests(TestCase):
+    """Tests for porcelain.mailsplit function."""
+
+    def test_mailsplit_mbox(self) -> None:
+        """Test porcelain mailsplit with mbox file."""
+        mbox_content = b"""\
+From alice@example.com Mon Jan 01 00:00:00 2025
+From: Alice <alice@example.com>
+Subject: Test
+
+Test message.
+"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            mbox_path = os.path.join(tmpdir, "test.mbox")
+            with open(mbox_path, "wb") as f:
+                f.write(mbox_content)
+
+            output_dir = os.path.join(tmpdir, "output")
+            os.makedirs(output_dir)
+
+            # Split using porcelain function
+            output_files = porcelain.mailsplit(
+                input_path=mbox_path, output_dir=output_dir
+            )
+
+            self.assertEqual(len(output_files), 1)
+            self.assertEqual(output_files[0], os.path.join(output_dir, "0001"))
+
+    def test_mailsplit_maildir(self) -> None:
+        """Test porcelain mailsplit with Maildir."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Create a Maildir
+            maildir_path = os.path.join(tmpdir, "maildir")
+            md = mailbox.Maildir(maildir_path)
+
+            msg = mailbox.MaildirMessage()
+            msg.set_payload(b"Test message")
+            msg["From"] = "test@example.com"
+            md.add(msg)
+
+            output_dir = os.path.join(tmpdir, "output")
+            os.makedirs(output_dir)
+
+            # Split using porcelain function with is_maildir=True
+            output_files = porcelain.mailsplit(
+                input_path=maildir_path, output_dir=output_dir, is_maildir=True
+            )
+
+            self.assertEqual(len(output_files), 1)
+            self.assertTrue(os.path.exists(output_files[0]))
+
+    def test_mailsplit_with_options(self) -> None:
+        """Test porcelain mailsplit with various options."""
+        mbox_content = b"""\
+From test@example.com Mon Jan 01 00:00:00 2025
+From: Test <test@example.com>
+Subject: Test
+
+Test message.
+"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            mbox_path = os.path.join(tmpdir, "test.mbox")
+            with open(mbox_path, "wb") as f:
+                f.write(mbox_content)
+
+            output_dir = os.path.join(tmpdir, "output")
+            os.makedirs(output_dir)
+
+            # Split with custom options
+            output_files = porcelain.mailsplit(
+                input_path=mbox_path,
+                output_dir=output_dir,
+                start_number=5,
+                precision=3,
+                keep_cr=True,
+            )
+
+            self.assertEqual(len(output_files), 1)
+            self.assertEqual(output_files[0], os.path.join(output_dir, "005"))
+
+    def test_mailsplit_mboxrd(self) -> None:
+        """Test porcelain mailsplit with mboxrd format."""
+        mbox_content = b"""\
+From test@example.com Mon Jan 01 00:00:00 2025
+From: Test <test@example.com>
+Subject: Test
+
+>From quoted text
+"""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            mbox_path = os.path.join(tmpdir, "test.mbox")
+            with open(mbox_path, "wb") as f:
+                f.write(mbox_content)
+
+            output_dir = os.path.join(tmpdir, "output")
+            os.makedirs(output_dir)
+
+            # Split with mboxrd=True
+            output_files = porcelain.mailsplit(
+                input_path=mbox_path, output_dir=output_dir, mboxrd=True
+            )
+
+            self.assertEqual(len(output_files), 1)
+
+            # Verify >From escaping was reversed
+            with open(output_files[0], "rb") as f:
+                content = f.read()
+                expected = b"""\
+From: Test <test@example.com>
+Subject: Test
+
+From quoted text
+"""
+                self.assertEqual(content, expected)
+
+    def test_mailsplit_maildir_requires_path(self) -> None:
+        """Test that mailsplit raises ValueError when is_maildir=True but no input_path."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            output_dir = os.path.join(tmpdir, "output")
+            os.makedirs(output_dir)
+
+            with self.assertRaises(ValueError) as cm:
+                porcelain.mailsplit(
+                    input_path=None, output_dir=output_dir, is_maildir=True
+                )
+
+            self.assertIn("required", str(cm.exception).lower())

+ 112 - 0
tests/porcelain/test_rebase.py

@@ -0,0 +1,112 @@
+# test_rebase.py -- tests for porcelain rebase
+# Copyright (C) 2025 Dulwich contributors
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain rebase functions."""
+
+import os
+import tempfile
+
+from dulwich import porcelain
+from dulwich.repo import Repo
+
+from .. import TestCase
+
+
+class RebasePorcelainTestCase(TestCase):
+    """Tests for the porcelain rebase function."""
+
+    def setUp(self):
+        """Set up test repository."""
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.repo = Repo.init(self.test_dir)
+
+        # Create initial commit
+        with open(os.path.join(self.test_dir, "README.md"), "wb") as f:
+            f.write(b"# Test Repository\n")
+
+        self.repo.get_worktree().stage(["README.md"])
+        self.initial_commit = self.repo.get_worktree().commit(
+            message=b"Initial commit",
+            committer=b"Test User <test@example.com>",
+            author=b"Test User <test@example.com>",
+        )
+
+    def tearDown(self):
+        """Clean up test directory."""
+        import shutil
+
+        shutil.rmtree(self.test_dir)
+
+    def test_porcelain_rebase(self):
+        """Test rebase through porcelain interface."""
+        # Create and checkout feature branch
+        self.repo.refs[b"refs/heads/feature"] = self.initial_commit
+        porcelain.checkout(self.repo, "feature")
+
+        # Add commit to feature branch
+        with open(os.path.join(self.test_dir, "feature.txt"), "wb") as f:
+            f.write(b"Feature file\n")
+
+        porcelain.add(self.repo, ["feature.txt"])
+        porcelain.commit(
+            self.repo,
+            message="Add feature",
+            author="Test User <test@example.com>",
+            committer="Test User <test@example.com>",
+        )
+
+        # Switch to main and add different commit
+        porcelain.checkout(self.repo, "master")
+
+        with open(os.path.join(self.test_dir, "main.txt"), "wb") as f:
+            f.write(b"Main file\n")
+
+        porcelain.add(self.repo, ["main.txt"])
+        porcelain.commit(
+            self.repo,
+            message="Main update",
+            author="Test User <test@example.com>",
+            committer="Test User <test@example.com>",
+        )
+
+        # Switch back to feature and rebase
+        porcelain.checkout(self.repo, "feature")
+
+        # Perform rebase
+        new_shas = porcelain.rebase(self.repo, "master")
+
+        # Should have rebased one commit
+        self.assertEqual(len(new_shas), 1)
+
+        # Check that the rebased commit has the correct parent and tree
+        feature_head = self.repo.refs[b"refs/heads/feature"]
+        feature_commit_obj = self.repo[feature_head]
+
+        # Should have master as parent
+        master_head = self.repo.refs[b"refs/heads/master"]
+        self.assertEqual(feature_commit_obj.parents, [master_head])
+
+        # Tree should have both files
+        tree = self.repo[feature_commit_obj.tree]
+        self.assertIn(b"feature.txt", tree)
+        self.assertIn(b"main.txt", tree)
+        self.assertIn(b"README.md", tree)

+ 0 - 197
tests/test_annotate.py

@@ -18,15 +18,12 @@
 
 """Tests for annotate support."""
 
-import os
 import tempfile
-import unittest
 from typing import Any
 from unittest import TestCase
 
 from dulwich.annotate import annotate_lines, update_lines
 from dulwich.objects import Blob, Commit, Tree
-from dulwich.porcelain import annotate, blame
 from dulwich.repo import Repo
 
 
@@ -211,197 +208,3 @@ class AnnotateLinesTestCase(TestCase):
 
         result = annotate_lines(self.repo.object_store, commit_id, b"nonexistent.txt")
         self.assertEqual([], result)
-
-
-class PorcelainAnnotateTestCase(TestCase):
-    """Tests for the porcelain annotate function."""
-
-    def setUp(self) -> None:
-        self.temp_dir = tempfile.mkdtemp()
-        self.repo = Repo.init(self.temp_dir)
-
-    def tearDown(self) -> None:
-        self.repo.close()
-        import shutil
-
-        shutil.rmtree(self.temp_dir)
-
-    def _make_commit_with_file(
-        self,
-        filename: str,
-        content: bytes,
-        message: str,
-        parent: bytes | None = None,
-    ) -> bytes:
-        """Helper to create a commit with a file."""
-        # Create blob
-        blob = Blob()
-        blob.data = content
-        self.repo.object_store.add_object(blob)
-
-        # Create tree
-        tree = Tree()
-        tree.add(filename.encode(), 0o100644, blob.id)
-        self.repo.object_store.add_object(tree)
-
-        # Create commit
-        commit = Commit()
-        commit.tree = tree.id
-        commit.author = commit.committer = b"Test Author <test@example.com>"
-        commit.author_time = commit.commit_time = 1000000000
-        commit.author_timezone = commit.commit_timezone = 0
-        commit.encoding = b"UTF-8"
-        commit.message = message.encode("utf-8")
-
-        if parent:
-            commit.parents = [parent]
-        else:
-            commit.parents = []
-
-        self.repo.object_store.add_object(commit)
-
-        # Update HEAD
-        self.repo.refs[b"HEAD"] = commit.id
-
-        return commit.id
-
-    def test_porcelain_annotate(self) -> None:
-        """Test the porcelain annotate function."""
-        # Create commits
-        commit1_id = self._make_commit_with_file(
-            "file.txt", b"line1\nline2\n", "Initial commit"
-        )
-        self._make_commit_with_file(
-            "file.txt", b"line1\nline2\nline3\n", "Add third line", parent=commit1_id
-        )
-
-        # Test annotate
-        result = list(annotate(self.temp_dir, "file.txt"))
-
-        self.assertEqual(3, len(result))
-        # Check that each result has the right structure
-        for (commit, entry), line in result:
-            self.assertIsNotNone(commit)
-            self.assertIsNotNone(entry)
-            self.assertIn(line, [b"line1\n", b"line2\n", b"line3\n"])
-
-    def test_porcelain_annotate_with_committish(self) -> None:
-        """Test porcelain annotate with specific commit."""
-        # Create commits
-        commit1_id = self._make_commit_with_file(
-            "file.txt", b"original\n", "Initial commit"
-        )
-        self._make_commit_with_file(
-            "file.txt", b"modified\n", "Modify file", parent=commit1_id
-        )
-
-        # Annotate at first commit
-        result = list(
-            annotate(self.temp_dir, "file.txt", committish=commit1_id.decode())
-        )
-        self.assertEqual(1, len(result))
-        self.assertEqual(b"original\n", result[0][1])
-
-        # Annotate at HEAD (second commit)
-        result = list(annotate(self.temp_dir, "file.txt"))
-        self.assertEqual(1, len(result))
-        self.assertEqual(b"modified\n", result[0][1])
-
-    def test_blame_alias(self) -> None:
-        """Test that blame is an alias for annotate."""
-        self.assertIs(blame, annotate)
-
-
-class IntegrationTestCase(TestCase):
-    """Integration tests with more complex scenarios."""
-
-    def setUp(self) -> None:
-        self.temp_dir = tempfile.mkdtemp()
-        self.repo = Repo.init(self.temp_dir)
-
-    def tearDown(self) -> None:
-        self.repo.close()
-        import shutil
-
-        shutil.rmtree(self.temp_dir)
-
-    def _create_file_commit(
-        self,
-        filename: str,
-        content: bytes,
-        message: str,
-        parent: bytes | None = None,
-    ) -> bytes:
-        """Helper to create a commit with file content."""
-        # Write file to working directory
-        filepath = os.path.join(self.temp_dir, filename)
-        with open(filepath, "wb") as f:
-            f.write(content)
-
-        # Stage file
-        self.repo.get_worktree().stage([filename.encode()])
-
-        # Create commit
-        commit_id = self.repo.get_worktree().commit(
-            message=message.encode(),
-            committer=b"Test Committer <test@example.com>",
-            author=b"Test Author <test@example.com>",
-            commit_timestamp=1000000000,
-            commit_timezone=0,
-            author_timestamp=1000000000,
-            author_timezone=0,
-        )
-
-        return commit_id
-
-    def test_complex_file_history(self) -> None:
-        """Test annotating a file with complex history."""
-        # Initial commit with 3 lines
-        self._create_file_commit(
-            "complex.txt", b"First line\nSecond line\nThird line\n", "Initial commit"
-        )
-
-        # Add lines at the beginning and end
-        self._create_file_commit(
-            "complex.txt",
-            b"New first line\nFirst line\nSecond line\nThird line\nNew last line\n",
-            "Add lines at beginning and end",
-        )
-
-        # Modify middle line
-        self._create_file_commit(
-            "complex.txt",
-            b"New first line\nFirst line\n"
-            b"Modified second line\nThird line\n"
-            b"New last line\n",
-            "Modify middle line",
-        )
-
-        # Delete a line
-        self._create_file_commit(
-            "complex.txt",
-            b"New first line\nFirst line\nModified second line\nNew last line\n",
-            "Delete third line",
-        )
-
-        # Run annotate
-        result = list(annotate(self.temp_dir, "complex.txt"))
-
-        # Should have 4 lines
-        self.assertEqual(4, len(result))
-
-        # Verify each line comes from the correct commit
-        lines = [line for (commit, entry), line in result]
-        self.assertEqual(
-            [
-                b"New first line\n",
-                b"First line\n",
-                b"Modified second line\n",
-                b"New last line\n",
-            ],
-            lines,
-        )
-
-
-if __name__ == "__main__":
-    unittest.main()

+ 2 - 98
tests/test_bisect.py

@@ -25,9 +25,8 @@ import os
 import shutil
 import tempfile
 
-from dulwich import porcelain
 from dulwich.bisect import BisectState
-from dulwich.objects import Tree
+from dulwich.repo import Repo
 from dulwich.tests.utils import make_commit
 
 from . import TestCase
@@ -38,7 +37,7 @@ class BisectStateTests(TestCase):
 
     def setUp(self) -> None:
         self.test_dir = tempfile.mkdtemp()
-        self.repo = porcelain.init(self.test_dir)
+        self.repo = Repo.init(self.test_dir)
 
     def tearDown(self) -> None:
         shutil.rmtree(self.test_dir)
@@ -158,98 +157,3 @@ class BisectStateTests(TestCase):
         # Reset
         state.reset()
         self.assertFalse(state.is_active)
-
-
-class BisectPorcelainTests(TestCase):
-    """Tests for porcelain bisect functions."""
-
-    def setUp(self) -> None:
-        self.test_dir = tempfile.mkdtemp()
-        self.repo = porcelain.init(self.test_dir)
-
-        # Create tree objects
-        tree = Tree()
-        self.repo.object_store.add_object(tree)
-
-        # Create some commits with proper trees
-        self.c1 = make_commit(id=b"1" * 40, message=b"initial commit", tree=tree.id)
-        self.c2 = make_commit(
-            id=b"2" * 40, message=b"second commit", parents=[b"1" * 40], tree=tree.id
-        )
-        self.c3 = make_commit(
-            id=b"3" * 40, message=b"third commit", parents=[b"2" * 40], tree=tree.id
-        )
-        self.c4 = make_commit(
-            id=b"4" * 40, message=b"fourth commit", parents=[b"3" * 40], tree=tree.id
-        )
-
-        # Add commits to object store
-        for commit in [self.c1, self.c2, self.c3, self.c4]:
-            self.repo.object_store.add_object(commit)
-
-        # Set HEAD to latest commit
-        self.repo.refs[b"HEAD"] = self.c4.id
-        self.repo.refs[b"refs/heads/master"] = self.c4.id
-
-    def tearDown(self) -> None:
-        shutil.rmtree(self.test_dir)
-
-    def test_bisect_start(self) -> None:
-        """Test bisect_start porcelain function."""
-        porcelain.bisect_start(self.test_dir)
-
-        # Check that bisect state files exist
-        self.assertTrue(
-            os.path.exists(os.path.join(self.repo.controldir(), "BISECT_START"))
-        )
-
-    def test_bisect_bad_good(self) -> None:
-        """Test marking commits as bad and good."""
-        porcelain.bisect_start(self.test_dir)
-        porcelain.bisect_bad(self.test_dir, self.c4.id.decode("ascii"))
-        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
-
-        # Check that refs were created
-        self.assertTrue(
-            os.path.exists(
-                os.path.join(self.repo.controldir(), "refs", "bisect", "bad")
-            )
-        )
-        self.assertTrue(
-            os.path.exists(
-                os.path.join(
-                    self.repo.controldir(),
-                    "refs",
-                    "bisect",
-                    f"good-{self.c1.id.decode('ascii')}",
-                )
-            )
-        )
-
-    def test_bisect_log(self) -> None:
-        """Test getting bisect log."""
-        porcelain.bisect_start(self.test_dir)
-        porcelain.bisect_bad(self.test_dir, self.c4.id.decode("ascii"))
-        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
-
-        log = porcelain.bisect_log(self.test_dir)
-
-        self.assertIn("git bisect start", log)
-        self.assertIn("git bisect bad", log)
-        self.assertIn("git bisect good", log)
-
-    def test_bisect_reset(self) -> None:
-        """Test resetting bisect state."""
-        porcelain.bisect_start(self.test_dir)
-        porcelain.bisect_bad(self.test_dir)
-        porcelain.bisect_good(self.test_dir, self.c1.id.decode("ascii"))
-
-        porcelain.bisect_reset(self.test_dir)
-
-        # Check that bisect state files are removed
-        self.assertFalse(
-            os.path.exists(os.path.join(self.repo.controldir(), "BISECT_START"))
-        )
-        self.assertFalse(
-            os.path.exists(os.path.join(self.repo.controldir(), "refs", "bisect"))
-        )

+ 46 - 20
tests/test_filters.py

@@ -29,7 +29,6 @@ import threading
 from collections.abc import Iterator
 from contextlib import contextmanager
 
-from dulwich import porcelain
 from dulwich.filters import (
     FilterContext,
     FilterError,
@@ -70,8 +69,17 @@ class GitAttributesFilterIntegrationTests(TestCase):
             f.write(b"*.bin -text\n")
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
-        porcelain.commit(self.repo, message=b"Add gitattributes")
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
+        worktree.commit(
+            message=b"Add gitattributes",
+            committer=b"Test <test@example.com>",
+            author=b"Test <test@example.com>",
+            commit_timestamp=1000000000,
+            author_timestamp=1000000000,
+            commit_timezone=0,
+            author_timezone=0,
+        )
 
         # Create text file with CRLF
         text_file = os.path.join(self.test_dir, "test.txt")
@@ -84,7 +92,7 @@ class GitAttributesFilterIntegrationTests(TestCase):
             f.write(b"binary\r\ndata\r\n")
 
         # Add files
-        porcelain.add(self.repo, paths=["test.txt", "test.bin"])
+        worktree.stage(["test.txt", "test.bin"])
 
         # Check that text file was normalized
         index = self.repo.open_index()
@@ -136,7 +144,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create file with sensitive content
         secret_file = os.path.join(self.test_dir, "password.secret")
@@ -144,7 +153,7 @@ sys.stdout.buffer.write(result)
             f.write(b"password123\ntoken456\n")
 
         # Add file
-        porcelain.add(self.repo, paths=["password.secret"])
+        worktree.stage(["password.secret"])
 
         # Check that content was filtered
         index = self.repo.open_index()
@@ -160,8 +169,17 @@ sys.stdout.buffer.write(result)
             f.write(b"*.txt text\n")
 
         # Add and commit .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
-        porcelain.commit(self.repo, message=b"Add gitattributes")
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
+        worktree.commit(
+            message=b"Add gitattributes",
+            committer=b"Test <test@example.com>",
+            author=b"Test <test@example.com>",
+            commit_timestamp=1000000000,
+            author_timestamp=1000000000,
+            commit_timezone=0,
+            author_timezone=0,
+        )
 
         # Remove .gitattributes from working tree
         os.remove(gitattributes_path)
@@ -221,7 +239,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create text file with lowercase and CRLF
         text_file = os.path.join(self.test_dir, "test.txt")
@@ -229,7 +248,7 @@ sys.stdout.buffer.write(result)
             f.write(b"hello\r\nworld\r\n")
 
         # Add file
-        porcelain.add(self.repo, paths=["test.txt"])
+        worktree.stage(["test.txt"])
 
         # Check that custom filter was applied (not just line ending conversion)
         index = self.repo.open_index()
@@ -264,7 +283,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create file that would use the filter
         secret_file = os.path.join(self.test_dir, "test.secret")
@@ -273,7 +293,7 @@ sys.stdout.buffer.write(result)
 
         # Adding file should raise error due to missing required filter
         with self.assertRaises(FilterError) as cm:
-            porcelain.add(self.repo, paths=["test.secret"])
+            worktree.stage(["test.secret"])
         self.assertIn(
             "Required filter 'required_filter' is not available", str(cm.exception)
         )
@@ -294,7 +314,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create file that would use the filter
         secret_file = os.path.join(self.test_dir, "test.secret")
@@ -303,7 +324,7 @@ sys.stdout.buffer.write(result)
 
         # Adding file should raise error due to failing required filter
         with self.assertRaises(FilterError) as cm:
-            porcelain.add(self.repo, paths=["test.secret"])
+            worktree.stage(["test.secret"])
         self.assertIn("Required clean filter failed", str(cm.exception))
 
     def test_required_filter_success(self) -> None:
@@ -322,7 +343,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create file that would use the filter
         secret_file = os.path.join(self.test_dir, "test.secret")
@@ -330,7 +352,7 @@ sys.stdout.buffer.write(result)
             f.write(b"hello world\n")
 
         # Adding file should work and apply filter
-        porcelain.add(self.repo, paths=["test.secret"])
+        worktree.stage(["test.secret"])
 
         # Check that content was filtered
         index = self.repo.open_index()
@@ -354,7 +376,8 @@ sys.stdout.buffer.write(result)
         config.write_to_path()
 
         # Add .gitattributes
-        porcelain.add(self.repo, paths=[".gitattributes"])
+        worktree = self.repo.get_worktree()
+        worktree.stage([".gitattributes"])
 
         # Create file that would use the filter
         test_file = os.path.join(self.test_dir, "test.txt")
@@ -362,7 +385,8 @@ sys.stdout.buffer.write(result)
             f.write(b"test content\n")
 
         # Adding file should work and fallback to original content
-        porcelain.add(self.repo, paths=["test.txt"])
+        with self.assertLogs(level="WARNING"):
+            worktree.stage(["test.txt"])
 
         # Check that original content was preserved
         index = self.repo.open_index()
@@ -556,7 +580,8 @@ while True:
         )
 
         test_data = b"hello world\n"
-        result = driver.clean(test_data)
+        with self.assertLogs(level="WARNING"):
+            result = driver.clean(test_data)
 
         # Should fallback to tr command and uppercase
         self.assertEqual(result, b"HELLO WORLD\n")
@@ -1133,7 +1158,8 @@ protocol.write_pkt_line(None)
             )
 
             # Should fallback to clean_cmd when process fails
-            result = driver.clean(b"test data")
+            with self.assertLogs(level="WARNING"):
+                result = driver.clean(b"test data")
             self.assertEqual(result, b"test data")
 
         finally:

+ 0 - 137
tests/test_ignore.py

@@ -36,7 +36,6 @@ from dulwich.ignore import (
     read_ignore_patterns,
     translate,
 )
-from dulwich.porcelain import _quote_path
 from dulwich.repo import Repo
 
 from . import TestCase
@@ -567,139 +566,3 @@ class IgnoreFilterManagerTests(TestCase):
         self.assertTrue(m.is_ignored("data/"))
         self.assertTrue(m.is_ignored("data/important/"))  # Cannot re-include
         self.assertTrue(m.is_ignored("data/important/file.txt"))
-
-
-class QuotePathTests(TestCase):
-    """Tests for _quote_path function."""
-
-    def test_ascii_paths(self) -> None:
-        """Test that ASCII paths are not quoted."""
-        self.assertEqual(_quote_path("file.txt"), "file.txt")
-        self.assertEqual(_quote_path("dir/file.txt"), "dir/file.txt")
-        self.assertEqual(_quote_path("path with spaces.txt"), "path with spaces.txt")
-
-    def test_unicode_paths(self) -> None:
-        """Test that unicode paths are quoted with C-style escapes."""
-        # Russian characters
-        self.assertEqual(
-            _quote_path("тест.txt"), '"\\321\\202\\320\\265\\321\\201\\321\\202.txt"'
-        )
-        # Chinese characters
-        self.assertEqual(
-            _quote_path("файл.测试"),
-            '"\\321\\204\\320\\260\\320\\271\\320\\273.\\346\\265\\213\\350\\257\\225"',
-        )
-        # Mixed ASCII and unicode
-        self.assertEqual(
-            _quote_path("test-тест.txt"),
-            '"test-\\321\\202\\320\\265\\321\\201\\321\\202.txt"',
-        )
-
-    def test_special_characters(self) -> None:
-        """Test that special characters are properly escaped."""
-        # Quotes in filename
-        self.assertEqual(
-            _quote_path('file"with"quotes.txt'), '"file\\"with\\"quotes.txt"'
-        )
-        # Backslashes in filename
-        self.assertEqual(
-            _quote_path("file\\with\\backslashes.txt"),
-            '"file\\\\with\\\\backslashes.txt"',
-        )
-        # Mixed special chars and unicode
-        self.assertEqual(
-            _quote_path('тест"файл.txt'),
-            '"\\321\\202\\320\\265\\321\\201\\321\\202\\"\\321\\204\\320\\260\\320\\271\\320\\273.txt"',
-        )
-
-    def test_empty_and_edge_cases(self) -> None:
-        """Test edge cases."""
-        self.assertEqual(_quote_path(""), "")
-        self.assertEqual(_quote_path("a"), "a")  # Single ASCII char
-        self.assertEqual(_quote_path("я"), '"\\321\\217"')  # Single unicode char
-
-
-class CheckIgnoreQuotePathTests(TestCase):
-    """Integration tests for check_ignore with quote_path parameter."""
-
-    def setUp(self) -> None:
-        self.test_dir = tempfile.mkdtemp()
-        self.addCleanup(shutil.rmtree, self.test_dir)
-
-    def test_quote_path_true_unicode_filenames(self) -> None:
-        """Test that quote_path=True returns quoted unicode filenames."""
-        from dulwich import porcelain
-
-        # Create a repository
-        repo = Repo.init(self.test_dir)
-        self.addCleanup(repo.close)
-
-        # Create .gitignore with unicode patterns
-        gitignore_path = os.path.join(self.test_dir, ".gitignore")
-        with open(gitignore_path, "w", encoding="utf-8") as f:
-            f.write("тест*\n")
-            f.write("*.测试\n")
-
-        # Create unicode files
-        test_files = ["тест.txt", "файл.测试", "normal.txt"]
-        for filename in test_files:
-            filepath = os.path.join(self.test_dir, filename)
-            with open(filepath, "w", encoding="utf-8") as f:
-                f.write("test content")
-
-        # Test with quote_path=True (default)
-        abs_paths = [os.path.join(self.test_dir, f) for f in test_files]
-        ignored_quoted = set(
-            porcelain.check_ignore(self.test_dir, abs_paths, quote_path=True)
-        )
-
-        # Test with quote_path=False
-        ignored_unquoted = set(
-            porcelain.check_ignore(self.test_dir, abs_paths, quote_path=False)
-        )
-
-        # Verify quoted results
-        expected_quoted = {
-            '"\\321\\202\\320\\265\\321\\201\\321\\202.txt"',  # тест.txt
-            '"\\321\\204\\320\\260\\320\\271\\320\\273.\\346\\265\\213\\350\\257\\225"',  # файл.测试
-        }
-        self.assertEqual(ignored_quoted, expected_quoted)
-
-        # Verify unquoted results
-        expected_unquoted = {"тест.txt", "файл.测试"}
-        self.assertEqual(ignored_unquoted, expected_unquoted)
-
-    def test_quote_path_ascii_filenames(self) -> None:
-        """Test that ASCII filenames are unaffected by quote_path setting."""
-        from dulwich import porcelain
-
-        # Create a repository
-        repo = Repo.init(self.test_dir)
-        self.addCleanup(repo.close)
-
-        # Create .gitignore
-        gitignore_path = os.path.join(self.test_dir, ".gitignore")
-        with open(gitignore_path, "w") as f:
-            f.write("*.tmp\n")
-            f.write("test*\n")
-
-        # Create ASCII files
-        test_files = ["test.txt", "file.tmp", "normal.txt"]
-        for filename in test_files:
-            filepath = os.path.join(self.test_dir, filename)
-            with open(filepath, "w") as f:
-                f.write("test content")
-
-        # Test both settings
-        abs_paths = [os.path.join(self.test_dir, f) for f in test_files]
-        ignored_quoted = set(
-            porcelain.check_ignore(self.test_dir, abs_paths, quote_path=True)
-        )
-        ignored_unquoted = set(
-            porcelain.check_ignore(self.test_dir, abs_paths, quote_path=False)
-        )
-
-        # Both should return the same results for ASCII filenames
-        expected = {"test.txt", "file.tmp"}
-        self.assertEqual(ignored_quoted, expected)
-        self.assertEqual(ignored_unquoted, expected)

+ 25 - 6
tests/test_lfs.py

@@ -27,7 +27,7 @@ import shutil
 import tempfile
 from pathlib import Path
 
-from dulwich import porcelain
+from dulwich.client import LocalGitClient
 from dulwich.lfs import LFSFilterDriver, LFSPointer, LFSStore
 from dulwich.repo import Repo
 
@@ -362,13 +362,23 @@ class LFSIntegrationTests(TestCase):
             f.write(pointer.to_bytes())
 
         # Commit files
-        porcelain.add(source_repo, paths=[".gitattributes", "test.bin"])
-        porcelain.commit(source_repo, message=b"Add LFS tracked file")
+        source_worktree = source_repo.get_worktree()
+        source_worktree.stage([b".gitattributes", b"test.bin"])
+        source_worktree.commit(
+            message=b"Add LFS tracked file",
+            committer=b"Test <test@example.com>",
+            author=b"Test <test@example.com>",
+            commit_timestamp=1000000000,
+            author_timestamp=1000000000,
+            commit_timezone=0,
+            author_timezone=0,
+        )
         source_repo.close()
 
         # Clone the repository
         target_dir = os.path.join(self.test_dir, "target")
-        target_repo = porcelain.clone(source_dir, target_dir)
+        client = LocalGitClient()
+        target_repo = client.clone(source_dir, target_dir)
 
         # Verify no LFS commands in config
         target_config = target_repo.get_config_stack()
@@ -410,8 +420,17 @@ class LFSIntegrationTests(TestCase):
             f.write(pointer.to_bytes())
 
         # Commit
-        porcelain.add(self.repo, paths=[".gitattributes", "data.dat"])
-        porcelain.commit(self.repo, message=b"Add LFS file")
+        worktree = self.repo.get_worktree()
+        worktree.stage([b".gitattributes", b"data.dat"])
+        worktree.commit(
+            message=b"Add LFS file",
+            committer=b"Test <test@example.com>",
+            author=b"Test <test@example.com>",
+            commit_timestamp=1000000000,
+            author_timestamp=1000000000,
+            commit_timezone=0,
+            author_timezone=0,
+        )
 
         # Reset index to trigger checkout with filter
         self.repo.get_worktree().reset_index()

+ 2 - 1
tests/test_lfs_integration.py

@@ -145,5 +145,6 @@ class LFSFilterIntegrationTests(TestCase):
         blob.data = pointer.to_bytes()
 
         # Checkout should return the pointer as-is when object is missing
-        checked_out = self.normalizer.checkout_normalize(blob, b"missing.bin")
+        with self.assertLogs("dulwich.lfs", level="WARNING"):
+            checked_out = self.normalizer.checkout_normalize(blob, b"missing.bin")
         self.assertEqual(checked_out.data, blob.data)

+ 0 - 97
tests/test_maintenance.py

@@ -23,7 +23,6 @@
 
 import tempfile
 
-from dulwich import porcelain
 from dulwich.maintenance import (
     CommitGraphTask,
     GcTask,
@@ -216,99 +215,3 @@ class MaintenanceFunctionsTest(MaintenanceTaskTestCase):
         result = run_maintenance(self.repo, progress=progress)
         self.assertGreater(len(messages), 0)
         self.assertIn("gc", result.tasks_succeeded)
-
-
-class PorcelainMaintenanceTest(MaintenanceTaskTestCase):
-    """Tests for porcelain.maintenance_run function."""
-
-    def test_maintenance_run(self):
-        """Test porcelain maintenance_run function."""
-        self._create_commit()
-        result = porcelain.maintenance_run(self.test_dir)
-        self.assertIn("gc", result.tasks_succeeded)
-        self.assertIn("commit-graph", result.tasks_succeeded)
-
-    def test_maintenance_run_with_tasks(self):
-        """Test porcelain maintenance_run with specific tasks."""
-        result = porcelain.maintenance_run(self.test_dir, tasks=["pack-refs"])
-        self.assertEqual(result.tasks_run, ["pack-refs"])
-        self.assertEqual(result.tasks_succeeded, ["pack-refs"])
-
-
-class MaintenanceRegisterTest(MaintenanceTaskTestCase):
-    """Tests for maintenance register/unregister."""
-
-    def setUp(self):
-        super().setUp()
-        # Set up a temporary HOME for testing global config
-        self.temp_home = tempfile.mkdtemp()
-        self.addCleanup(self._cleanup_temp_home)
-        self.overrideEnv("HOME", self.temp_home)
-
-    def _cleanup_temp_home(self):
-        import shutil
-
-        shutil.rmtree(self.temp_home)
-
-    def test_register_repository(self):
-        """Test registering a repository for maintenance."""
-        porcelain.maintenance_register(self.test_dir)
-
-        # Verify repository was added to global config
-        import os
-
-        from dulwich.config import ConfigFile
-
-        global_config_path = os.path.expanduser("~/.gitconfig")
-        global_config = ConfigFile.from_path(global_config_path)
-
-        repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
-        self.assertIn(self.test_dir.encode(), repos)
-
-        # Verify strategy was set
-        strategy = global_config.get((b"maintenance",), b"strategy")
-        self.assertEqual(strategy, b"incremental")
-
-        # Verify auto maintenance was disabled in repo
-        repo_config = self.repo.get_config()
-        auto = repo_config.get_boolean((b"maintenance",), b"auto")
-        self.assertFalse(auto)
-
-    def test_register_already_registered(self):
-        """Test registering an already registered repository."""
-        porcelain.maintenance_register(self.test_dir)
-        # Should not error when registering again
-        porcelain.maintenance_register(self.test_dir)
-
-    def test_unregister_repository(self):
-        """Test unregistering a repository."""
-        # First register
-        porcelain.maintenance_register(self.test_dir)
-
-        # Then unregister
-        porcelain.maintenance_unregister(self.test_dir)
-
-        # Verify repository was removed from global config
-        import os
-
-        from dulwich.config import ConfigFile
-
-        global_config_path = os.path.expanduser("~/.gitconfig")
-        global_config = ConfigFile.from_path(global_config_path)
-
-        try:
-            repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
-            self.assertNotIn(self.test_dir.encode(), repos)
-        except KeyError:
-            # No repos registered, which is fine
-            pass
-
-    def test_unregister_not_registered(self):
-        """Test unregistering a repository that is not registered."""
-        with self.assertRaises(ValueError):
-            porcelain.maintenance_unregister(self.test_dir)
-
-    def test_unregister_not_registered_force(self):
-        """Test unregistering with force flag."""
-        # Should not error with force=True
-        porcelain.maintenance_unregister(self.test_dir, force=True)

+ 0 - 130
tests/test_mbox.py

@@ -26,7 +26,6 @@ import os
 import tempfile
 from io import BytesIO
 
-from dulwich import porcelain
 from dulwich.mbox import split_maildir, split_mbox
 
 from . import TestCase
@@ -319,135 +318,6 @@ class SplitMaildirTests(TestCase):
             self.assertEqual(output_files[0], os.path.join(output_dir, "01"))
 
 
-class PorcelainMailsplitTests(TestCase):
-    """Tests for porcelain.mailsplit function."""
-
-    def test_mailsplit_mbox(self) -> None:
-        """Test porcelain mailsplit with mbox file."""
-        mbox_content = b"""\
-From alice@example.com Mon Jan 01 00:00:00 2025
-From: Alice <alice@example.com>
-Subject: Test
-
-Test message.
-"""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            mbox_path = os.path.join(tmpdir, "test.mbox")
-            with open(mbox_path, "wb") as f:
-                f.write(mbox_content)
-
-            output_dir = os.path.join(tmpdir, "output")
-            os.makedirs(output_dir)
-
-            # Split using porcelain function
-            output_files = porcelain.mailsplit(
-                input_path=mbox_path, output_dir=output_dir
-            )
-
-            self.assertEqual(len(output_files), 1)
-            self.assertEqual(output_files[0], os.path.join(output_dir, "0001"))
-
-    def test_mailsplit_maildir(self) -> None:
-        """Test porcelain mailsplit with Maildir."""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            # Create a Maildir
-            maildir_path = os.path.join(tmpdir, "maildir")
-            md = mailbox.Maildir(maildir_path)
-
-            msg = mailbox.MaildirMessage()
-            msg.set_payload(b"Test message")
-            msg["From"] = "test@example.com"
-            md.add(msg)
-
-            output_dir = os.path.join(tmpdir, "output")
-            os.makedirs(output_dir)
-
-            # Split using porcelain function with is_maildir=True
-            output_files = porcelain.mailsplit(
-                input_path=maildir_path, output_dir=output_dir, is_maildir=True
-            )
-
-            self.assertEqual(len(output_files), 1)
-            self.assertTrue(os.path.exists(output_files[0]))
-
-    def test_mailsplit_with_options(self) -> None:
-        """Test porcelain mailsplit with various options."""
-        mbox_content = b"""\
-From test@example.com Mon Jan 01 00:00:00 2025
-From: Test <test@example.com>
-Subject: Test
-
-Test message.
-"""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            mbox_path = os.path.join(tmpdir, "test.mbox")
-            with open(mbox_path, "wb") as f:
-                f.write(mbox_content)
-
-            output_dir = os.path.join(tmpdir, "output")
-            os.makedirs(output_dir)
-
-            # Split with custom options
-            output_files = porcelain.mailsplit(
-                input_path=mbox_path,
-                output_dir=output_dir,
-                start_number=5,
-                precision=3,
-                keep_cr=True,
-            )
-
-            self.assertEqual(len(output_files), 1)
-            self.assertEqual(output_files[0], os.path.join(output_dir, "005"))
-
-    def test_mailsplit_mboxrd(self) -> None:
-        """Test porcelain mailsplit with mboxrd format."""
-        mbox_content = b"""\
-From test@example.com Mon Jan 01 00:00:00 2025
-From: Test <test@example.com>
-Subject: Test
-
->From quoted text
-"""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            mbox_path = os.path.join(tmpdir, "test.mbox")
-            with open(mbox_path, "wb") as f:
-                f.write(mbox_content)
-
-            output_dir = os.path.join(tmpdir, "output")
-            os.makedirs(output_dir)
-
-            # Split with mboxrd=True
-            output_files = porcelain.mailsplit(
-                input_path=mbox_path, output_dir=output_dir, mboxrd=True
-            )
-
-            self.assertEqual(len(output_files), 1)
-
-            # Verify >From escaping was reversed
-            with open(output_files[0], "rb") as f:
-                content = f.read()
-                expected = b"""\
-From: Test <test@example.com>
-Subject: Test
-
-From quoted text
-"""
-                self.assertEqual(content, expected)
-
-    def test_mailsplit_maildir_requires_path(self) -> None:
-        """Test that mailsplit raises ValueError when is_maildir=True but no input_path."""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            output_dir = os.path.join(tmpdir, "output")
-            os.makedirs(output_dir)
-
-            with self.assertRaises(ValueError) as cm:
-                porcelain.mailsplit(
-                    input_path=None, output_dir=output_dir, is_maildir=True
-                )
-
-            self.assertIn("required", str(cm.exception).lower())
-
-
 class MailinfoTests(TestCase):
     """Tests for mbox.mailinfo function."""
 

+ 1 - 87
tests/test_rebase.py

@@ -22,8 +22,6 @@
 """Tests for dulwich.rebase."""
 
 import importlib.util
-import os
-import tempfile
 
 from dulwich.objects import Blob, Commit, Tree
 from dulwich.rebase import (
@@ -36,7 +34,7 @@ from dulwich.rebase import (
     rebase,
     start_interactive,
 )
-from dulwich.repo import MemoryRepo, Repo
+from dulwich.repo import MemoryRepo
 from dulwich.tests.utils import make_commit
 
 from . import DependencyMissing, TestCase
@@ -388,90 +386,6 @@ class RebaserTestCase(TestCase):
         self.assertNotIn(b"a.txt", new_c_tree)
 
 
-class RebasePorcelainTestCase(TestCase):
-    """Tests for the porcelain rebase function."""
-
-    def setUp(self):
-        """Set up test repository."""
-        super().setUp()
-        self.test_dir = tempfile.mkdtemp()
-        self.repo = Repo.init(self.test_dir)
-
-        # Create initial commit
-        with open(os.path.join(self.test_dir, "README.md"), "wb") as f:
-            f.write(b"# Test Repository\n")
-
-        self.repo.get_worktree().stage(["README.md"])
-        self.initial_commit = self.repo.get_worktree().commit(
-            message=b"Initial commit",
-            committer=b"Test User <test@example.com>",
-            author=b"Test User <test@example.com>",
-        )
-
-    def tearDown(self):
-        """Clean up test directory."""
-        import shutil
-
-        shutil.rmtree(self.test_dir)
-
-    def test_porcelain_rebase(self):
-        """Test rebase through porcelain interface."""
-        from dulwich import porcelain
-
-        # Create and checkout feature branch
-        self.repo.refs[b"refs/heads/feature"] = self.initial_commit
-        porcelain.checkout(self.repo, "feature")
-
-        # Add commit to feature branch
-        with open(os.path.join(self.test_dir, "feature.txt"), "wb") as f:
-            f.write(b"Feature file\n")
-
-        porcelain.add(self.repo, ["feature.txt"])
-        porcelain.commit(
-            self.repo,
-            message="Add feature",
-            author="Test User <test@example.com>",
-            committer="Test User <test@example.com>",
-        )
-
-        # Switch to main and add different commit
-        porcelain.checkout(self.repo, "master")
-
-        with open(os.path.join(self.test_dir, "main.txt"), "wb") as f:
-            f.write(b"Main file\n")
-
-        porcelain.add(self.repo, ["main.txt"])
-        porcelain.commit(
-            self.repo,
-            message="Main update",
-            author="Test User <test@example.com>",
-            committer="Test User <test@example.com>",
-        )
-
-        # Switch back to feature and rebase
-        porcelain.checkout(self.repo, "feature")
-
-        # Perform rebase
-        new_shas = porcelain.rebase(self.repo, "master")
-
-        # Should have rebased one commit
-        self.assertEqual(len(new_shas), 1)
-
-        # Check that the rebased commit has the correct parent and tree
-        feature_head = self.repo.refs[b"refs/heads/feature"]
-        feature_commit_obj = self.repo[feature_head]
-
-        # Should have master as parent
-        master_head = self.repo.refs[b"refs/heads/master"]
-        self.assertEqual(feature_commit_obj.parents, [master_head])
-
-        # Tree should have both files
-        tree = self.repo[feature_commit_obj.tree]
-        self.assertIn(b"feature.txt", tree)
-        self.assertIn(b"main.txt", tree)
-        self.assertIn(b"README.md", tree)
-
-
 class InteractiveRebaseTestCase(TestCase):
     """Tests for interactive rebase functionality."""
 

+ 74 - 60
tests/test_repository.py

@@ -31,9 +31,10 @@ import tempfile
 import time
 import warnings
 
-from dulwich import errors, objects, porcelain
+from dulwich import errors, objects
 from dulwich.config import Config
 from dulwich.errors import NotGitRepository
+from dulwich.index import get_unstaged_changes as _get_unstaged_changes
 from dulwich.object_store import tree_lookup_path
 from dulwich.repo import (
     InvalidUserIdentity,
@@ -50,6 +51,14 @@ from . import TestCase, skipIf
 missing_sha = b"b91fa4d900e17e99b433218e988c4eb4a3e9a097"
 
 
+def get_unstaged_changes(repo):
+    """Helper to get unstaged changes for a repo."""
+    index = repo.open_index()
+    normalizer = repo.get_blob_normalizer()
+    filter_callback = normalizer.checkin_normalize if normalizer else None
+    return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
+
+
 class CreateRepositoryTests(TestCase):
     def assertFileContentsEqual(self, expected, repo, path) -> None:
         f = repo.get_named_file(path)
@@ -539,7 +548,8 @@ class RepositoryRootTests(TestCase):
         self.addCleanup(shutil.rmtree, tmp_dir)
         t = Repo.init(tmp_dir)
         self.addCleanup(t.close)
-        r.fetch(t)
+        with self.assertLogs(level="WARNING"):
+            r.fetch(t)
         self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
         self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
         self.assertIn(b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", t)
@@ -1705,96 +1715,88 @@ class BuildRepoRootTests(TestCase):
 
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage(["new_dir/foo"])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("something new")
-        self._repo.get_worktree().unstage(["new_dir/foo"])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [
-                {"add": [], "delete": [], "modify": []},
-                [os.fsencode(os.path.join("new_dir", "foo"))],
-                [],
-            ],
-            status,
-        )
+        wt.unstage(["new_dir/foo"])
+
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([b"new_dir/foo"], unstaged)
 
     def test_unstage_while_no_commit(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.unstage([file])
+
+        # Check that file is no longer in index
+        index = self._repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_add_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+        wt.stage([file])
+        wt.unstage([file])
+
+        # Check that file is no longer in index
+        index = self._repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_modify_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("broken")
-        porcelain.add(self._repo, paths=[full_path])
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
+        wt.stage([file])
+        wt.unstage([file])
 
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_unstage_remove_file(self) -> None:
         file = "foo"
         full_path = os.path.join(self._repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self._repo, paths=[full_path])
-        porcelain.commit(
-            self._repo,
+        wt = self._repo.get_worktree()
+        wt.stage([file])
+        wt.commit(
             message=b"unitest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         os.remove(full_path)
-        self._repo.get_worktree().unstage([file])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        wt.unstage([file])
+
+        unstaged = get_unstaged_changes(self._repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_reset_index(self) -> None:
         r = self._repo
@@ -1803,20 +1805,32 @@ class BuildRepoRootTests(TestCase):
         with open(os.path.join(r.path, "b"), "wb") as f:
             f.write(b"added")
         r.get_worktree().stage(["a", "b"])
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [
-                {"add": [os.fsencode("b")], "delete": [], "modify": [os.fsencode("a")]},
-                [],
-                [],
-            ],
-            status,
-        )
+
+        # Check staged changes using lower-level APIs
+        index = r.open_index()
+        staged = {"add": [], "delete": [], "modify": []}
+        try:
+            head_commit = r[b"HEAD"]
+            tree_id = head_commit.tree
+        except KeyError:
+            tree_id = None
+
+        for change in index.changes_from_tree(r.object_store, tree_id):
+            if not change[0][0]:
+                staged["add"].append(change[0][1])
+            elif not change[1][1]:
+                staged["delete"].append(change[0][1])
+            else:
+                staged["modify"].append(change[0][1])
+
+        self.assertEqual({"add": [b"b"], "delete": [], "modify": [b"a"]}, staged)
+
         r.get_worktree().reset_index()
-        status = list(porcelain.status(self._repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("b")]], status
-        )
+
+        # After reset, check that nothing is staged and b is untracked
+        index = r.open_index()
+        self.assertNotIn(b"b", index)
+        self.assertIn(b"a", index)
 
     @skipIf(
         sys.platform in ("win32", "darwin"),
@@ -2353,7 +2367,7 @@ class SharedRepositoryTests(TestCase):
         with open(test_file, "wb") as f:
             f.write(b"test content")
         # Stage the file
-        porcelain.add(repo, [test_file])
+        repo.get_worktree().stage(["test.txt"])
 
         # Check index file permissions
         index_path = repo.index_path()

+ 89 - 12
tests/test_rerere.py

@@ -532,8 +532,12 @@ class RerereEndToEndTests(unittest.TestCase):
 
     def test_rerere_full_workflow(self) -> None:
         """Test complete rerere workflow with real merge conflicts."""
+        from dulwich.diff_tree import tree_changes
+        from dulwich.graph import find_merge_base
+        from dulwich.index import update_working_tree
+        from dulwich.merge import recursive_merge
         from dulwich.objects import Blob, Commit, Tree
-        from dulwich.porcelain import merge, rerere
+        from dulwich.rerere import rerere_auto
 
         # Create branch1: change "original line" to "branch1 change"
         blob_branch1 = Blob.from_string(b"line 1\nbranch1 change\nline 3\n")
@@ -583,10 +587,27 @@ class RerereEndToEndTests(unittest.TestCase):
             f.write(b"line 1\nbranch1 change\nline 3\n")
 
         # Merge branch2 into branch1 - should create conflict
-        merge_result, conflicts = merge(self.repo, b"branch2", no_commit=True)
+        # Using lower-level merge APIs
+        head_commit = commit_branch1
+        merge_commit = commit_branch2
+        merge_bases = find_merge_base(self.repo, [head_commit.id, merge_commit.id])
+        gitattributes = self.repo.get_gitattributes()
+        config = self.repo.get_config()
+        merged_tree, conflicts = recursive_merge(
+            self.repo.object_store,
+            merge_bases,
+            head_commit,
+            merge_commit,
+            gitattributes,
+            config,
+        )
+        self.repo.object_store.add_object(merged_tree)
+        changes = tree_changes(self.repo.object_store, head_commit.tree, merged_tree.id)
+        update_working_tree(
+            self.repo, head_commit.tree, merged_tree.id, change_iterator=changes
+        )
 
         # Should have conflicts
-        self.assertIsNone(merge_result)  # No commit created due to conflicts
         self.assertEqual([b"file.txt"], conflicts)
 
         # File should have conflict markers
@@ -597,7 +618,7 @@ class RerereEndToEndTests(unittest.TestCase):
         self.assertIn(b"branch2 change", content)
 
         # Record the conflict with rerere
-        recorded, resolved = rerere(self.repo)
+        recorded, resolved = rerere_auto(self.repo, self.tempdir, conflicts)
         self.assertEqual(1, len(recorded))
         self.assertEqual(0, len(resolved))  # No resolution yet
 
@@ -620,11 +641,28 @@ class RerereEndToEndTests(unittest.TestCase):
             f.write(b"line 1\nbranch1 change\nline 3\n")
 
         # Merge again - should create same conflict
-        _merge_result2, conflicts2 = merge(self.repo, b"branch2", no_commit=True)
+        merge_bases2 = find_merge_base(
+            self.repo, [commit_branch1.id, commit_branch2.id]
+        )
+        merged_tree2, conflicts2 = recursive_merge(
+            self.repo.object_store,
+            merge_bases2,
+            commit_branch1,
+            commit_branch2,
+            gitattributes,
+            config,
+        )
+        self.repo.object_store.add_object(merged_tree2)
+        changes2 = tree_changes(
+            self.repo.object_store, commit_branch1.tree, merged_tree2.id
+        )
+        update_working_tree(
+            self.repo, commit_branch1.tree, merged_tree2.id, change_iterator=changes2
+        )
         self.assertEqual([b"file.txt"], conflicts2)
 
         # Now rerere should recognize the conflict
-        recorded2, resolved2 = rerere(self.repo)
+        recorded2, resolved2 = rerere_auto(self.repo, self.tempdir, conflicts2)
         self.assertEqual(1, len(recorded2))
 
         # With autoupdate disabled, it shouldn't auto-apply
@@ -632,9 +670,12 @@ class RerereEndToEndTests(unittest.TestCase):
 
     def test_rerere_with_autoupdate(self) -> None:
         """Test rerere with autoupdate enabled."""
+        from dulwich.diff_tree import tree_changes
+        from dulwich.graph import find_merge_base
+        from dulwich.index import update_working_tree
+        from dulwich.merge import recursive_merge
         from dulwich.objects import Blob, Commit, Tree
-        from dulwich.porcelain import merge, rerere
-        from dulwich.rerere import RerereCache
+        from dulwich.rerere import RerereCache, rerere_auto
 
         # Enable autoupdate
         config = self.repo.get_config()
@@ -688,10 +729,28 @@ class RerereEndToEndTests(unittest.TestCase):
         with open(os.path.join(self.tempdir, "file.txt"), "wb") as f:
             f.write(b"line 1\nbranch1 change\nline 3\n")
 
-        merge(self.repo, b"branch2", no_commit=True)
+        # Perform merge using lower-level APIs
+        merge_bases = find_merge_base(self.repo, [commit_branch1.id, commit_branch2.id])
+        gitattributes = self.repo.get_gitattributes()
+        config = self.repo.get_config()
+        merged_tree, conflicts = recursive_merge(
+            self.repo.object_store,
+            merge_bases,
+            commit_branch1,
+            commit_branch2,
+            gitattributes,
+            config,
+        )
+        self.repo.object_store.add_object(merged_tree)
+        changes = tree_changes(
+            self.repo.object_store, commit_branch1.tree, merged_tree.id
+        )
+        update_working_tree(
+            self.repo, commit_branch1.tree, merged_tree.id, change_iterator=changes
+        )
 
         # Record conflict and resolution
-        recorded, _ = rerere(self.repo)
+        recorded, _ = rerere_auto(self.repo, self.tempdir, conflicts)
         conflict_id = recorded[0][1]
 
         resolved_content = b"line 1\nmerged change\nline 3\n"
@@ -706,10 +765,28 @@ class RerereEndToEndTests(unittest.TestCase):
         with open(os.path.join(self.tempdir, "file.txt"), "wb") as f:
             f.write(b"line 1\nbranch1 change\nline 3\n")
 
-        merge(self.repo, b"branch2", no_commit=True)
+        # Perform merge again using lower-level APIs
+        merge_bases2 = find_merge_base(
+            self.repo, [commit_branch1.id, commit_branch2.id]
+        )
+        merged_tree2, conflicts2 = recursive_merge(
+            self.repo.object_store,
+            merge_bases2,
+            commit_branch1,
+            commit_branch2,
+            gitattributes,
+            config,
+        )
+        self.repo.object_store.add_object(merged_tree2)
+        changes2 = tree_changes(
+            self.repo.object_store, commit_branch1.tree, merged_tree2.id
+        )
+        update_working_tree(
+            self.repo, commit_branch1.tree, merged_tree2.id, change_iterator=changes2
+        )
 
         # With autoupdate, rerere should auto-apply the resolution
-        recorded2, resolved2 = rerere(self.repo)
+        recorded2, resolved2 = rerere_auto(self.repo, self.tempdir, conflicts2)
         self.assertEqual(1, len(recorded2))
         self.assertEqual(1, len(resolved2))
         self.assertEqual(b"file.txt", resolved2[0])

+ 142 - 18
tests/test_source.py

@@ -21,6 +21,7 @@
 
 """Tests for scanning dulwich source code for compliance."""
 
+import ast
 import os
 import re
 import unittest
@@ -64,6 +65,76 @@ STANDARD_LICENSE_BLOCK = [
 ]
 
 
+def _get_python_files(directory_name):
+    """Get all Python files in a directory.
+
+    Args:
+        directory_name: Name of directory relative to project root (e.g., "dulwich", "tests")
+
+    Returns:
+        List of tuples of (Path object, relative path from project root)
+    """
+    project_root = Path(__file__).parent.parent
+    target_dir = project_root / directory_name
+    if not target_dir.exists():
+        raise RuntimeError(f"{directory_name} directory not found at {target_dir}")
+
+    python_files = []
+    for root, dirs, files in os.walk(target_dir):
+        # Skip build directories
+        if root.endswith(("build", "__pycache__")):
+            continue
+
+        for file in files:
+            if file.endswith(".py"):
+                file_path = Path(root) / file
+                rel_path = file_path.relative_to(project_root)
+                python_files.append((file_path, rel_path))
+
+    return python_files
+
+
+def _imports_module(file_path, module_name):
+    """Check if a Python file imports a specific module or any submodules.
+
+    Args:
+        file_path: Path to the Python file
+        module_name: Module name to check for (e.g., "dulwich.porcelain", "dulwich.cli")
+
+    Returns:
+        bool: True if the file imports the module or any submodule
+    """
+    with open(file_path, encoding="utf-8") as f:
+        tree = ast.parse(f.read(), filename=str(file_path))
+
+    for node in ast.walk(tree):
+        # Check "import dulwich.porcelain" or "import dulwich.porcelain.lfs"
+        if isinstance(node, ast.Import):
+            for alias in node.names:
+                if alias.name == module_name or alias.name.startswith(
+                    f"{module_name}."
+                ):
+                    return True
+
+        # Check "from dulwich.porcelain import ..." or "from dulwich import porcelain"
+        if isinstance(node, ast.ImportFrom):
+            # "from dulwich.porcelain import something"
+            # "from dulwich.porcelain.lfs import something"
+            if node.module == module_name or (
+                node.module and node.module.startswith(f"{module_name}.")
+            ):
+                return True
+            # Handle "from dulwich import porcelain"
+            if node.module and module_name.startswith(f"{node.module}."):
+                # e.g., module="dulwich", module_name="dulwich.porcelain"
+                suffix = module_name[len(node.module) + 1 :]
+                for alias in node.names:
+                    if alias.name == suffix:
+                        return True
+
+    return False
+
+
 class SourceCodeComplianceTests(unittest.TestCase):
     """Tests to ensure dulwich source code follows project standards."""
 
@@ -74,24 +145,7 @@ class SourceCodeComplianceTests(unittest.TestCase):
         Returns:
             List of tuples of (Path object, relative path from project root)
         """
-        project_root = Path(__file__).parent.parent
-        dulwich_dir = project_root / "dulwich"
-        if not dulwich_dir.exists():
-            raise RuntimeError(f"dulwich directory not found at {dulwich_dir}")
-
-        python_files = []
-        for root, dirs, files in os.walk(dulwich_dir):
-            # Skip build directories
-            if root.endswith(("build", "__pycache__")):
-                continue
-
-            for file in files:
-                if file.endswith(".py"):
-                    file_path = Path(root) / file
-                    rel_path = file_path.relative_to(project_root)
-                    python_files.append((file_path, rel_path))
-
-        return python_files
+        return _get_python_files("dulwich")
 
     @classmethod
     def _has_standard_preamble(cls, file_path: Path) -> tuple[bool, str]:
@@ -226,3 +280,73 @@ class SourceCodeComplianceTests(unittest.TestCase):
                 + "\n\nFiles allowed to use os.environ:\n"
                 + "\n".join(f"  - {f}" for f in sorted(allowed_files))
             )
+
+    def test_porcelain_usage_restricted_in_tests(self):
+        """Test that dulwich.porcelain is only used in allowed test directories."""
+        test_files = _get_python_files("tests")
+        self.assertGreater(len(test_files), 0, "No Python files found in tests/")
+
+        # Directories allowed to use porcelain
+        allowed_dirs = {
+            "tests/cli/",
+            "tests/porcelain/",
+            "tests/compat/",
+        }
+        # Individual test files allowed to use porcelain
+        allowed_files: set[str] = set()
+
+        files_with_violations = []
+
+        for file_path, rel_path in test_files:
+            # Convert to forward slashes for consistency
+            rel_path_str = str(rel_path).replace(os.sep, "/")
+
+            # Skip allowed directories
+            if any(rel_path_str.startswith(d) for d in allowed_dirs):
+                continue
+
+            # Skip allowed files
+            if rel_path_str in allowed_files:
+                continue
+
+            if _imports_module(file_path, "dulwich.porcelain"):
+                files_with_violations.append(rel_path_str)
+
+        if files_with_violations:
+            self.fail(
+                "The following test files use dulwich.porcelain but are not in the allowed list:\n"
+                + "\n".join(f"  - {f}" for f in files_with_violations)
+                + "\n\nLower-level tests should use dulwich APIs directly, not porcelain."
+                + "\n\nAllowed directories:\n"
+                + "\n".join(f"  - {d}" for d in sorted(allowed_dirs))
+                + "\nAllowed files:\n"
+                + "\n".join(f"  - {f}" for f in sorted(allowed_files))
+            )
+
+    def test_cli_usage_restricted_in_tests(self):
+        """Test that dulwich.cli is only used in CLI test directory."""
+        test_files = _get_python_files("tests")
+        self.assertGreater(len(test_files), 0, "No Python files found in tests/")
+
+        # Only CLI tests should import dulwich.cli
+        allowed_dir = "tests/cli/"
+
+        files_with_violations = []
+
+        for file_path, rel_path in test_files:
+            # Convert to forward slashes for consistency
+            rel_path_str = str(rel_path).replace(os.sep, "/")
+
+            # Skip allowed directory
+            if rel_path_str.startswith(allowed_dir):
+                continue
+
+            if _imports_module(file_path, "dulwich.cli"):
+                files_with_violations.append(rel_path_str)
+
+        if files_with_violations:
+            self.fail(
+                "The following test files use dulwich.cli but are not in tests/cli/:\n"
+                + "\n".join(f"  - {f}" for f in files_with_violations)
+                + "\n\nOnly CLI tests in tests/cli/ should import dulwich.cli."
+            )

+ 6 - 0
tests/test_web.py

@@ -22,6 +22,7 @@
 """Tests for the Git HTTP server."""
 
 import gzip
+import logging
 import os
 import re
 from io import BytesIO
@@ -101,6 +102,11 @@ class WebTestCase(TestCase):
 
     def setUp(self) -> None:
         super().setUp()
+        # Suppress expected error logging during web tests
+        web_logger = logging.getLogger("dulwich.web")
+        original_level = web_logger.level
+        web_logger.setLevel(logging.CRITICAL)
+        self.addCleanup(web_logger.setLevel, original_level)
         self._environ = {}
         self._req = self._req_class(
             self._environ, self._start_response, handlers=self._handlers()

+ 44 - 47
tests/test_worktree.py

@@ -27,8 +27,8 @@ import stat
 import tempfile
 from unittest import skipIf
 
-from dulwich import porcelain
 from dulwich.errors import CommitError
+from dulwich.index import get_unstaged_changes as _get_unstaged_changes
 from dulwich.object_store import tree_lookup_path
 from dulwich.repo import Repo
 from dulwich.worktree import (
@@ -47,6 +47,14 @@ from dulwich.worktree import (
 from . import TestCase
 
 
+def get_unstaged_changes(repo):
+    """Helper to get unstaged changes for a repo."""
+    index = repo.open_index()
+    normalizer = repo.get_blob_normalizer()
+    filter_callback = normalizer.checkin_normalize if normalizer else None
+    return list(_get_unstaged_changes(index, repo.path, filter_callback, False))
+
+
 class WorkTreeTestCase(TestCase):
     """Base test case for WorkTree tests."""
 
@@ -166,9 +174,8 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
 
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage(["new_dir/foo"])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
@@ -176,15 +183,9 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         with open(full_path, "a") as f:
             f.write("something new")
         self.worktree.unstage(["new_dir/foo"])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [
-                {"add": [], "delete": [], "modify": []},
-                [os.fsencode(os.path.join("new_dir", "foo"))],
-                [],
-            ],
-            status,
-        )
+
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([b"new_dir/foo"], unstaged)
 
     def test_unstage_while_no_commit(self):
         """Test unstaging when there are no commits."""
@@ -192,31 +193,30 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+
+        # Check that file is no longer in index
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_add_file(self):
         """Test unstaging a newly added file."""
         file = "foo"
         full_path = os.path.join(self.repo.path, file)
-        porcelain.commit(
-            self.repo,
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [], [os.fsencode("foo")]], status
-        )
+
+        # Check that file is no longer in index
+        index = self.repo.open_index()
+        self.assertNotIn(b"foo", index)
 
     def test_unstage_modify_file(self):
         """Test unstaging a modified file."""
@@ -224,22 +224,19 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage([file])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         with open(full_path, "a") as f:
             f.write("broken")
-        porcelain.add(self.repo, paths=[full_path])
+        self.worktree.stage([file])
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
 
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
     def test_unstage_remove_file(self):
         """Test unstaging a removed file."""
@@ -247,19 +244,17 @@ class WorkTreeUnstagingTests(WorkTreeTestCase):
         full_path = os.path.join(self.repo.path, file)
         with open(full_path, "w") as f:
             f.write("hello")
-        porcelain.add(self.repo, paths=[full_path])
-        porcelain.commit(
-            self.repo,
+        self.worktree.stage([file])
+        self.worktree.commit(
             message=b"unittest",
             committer=b"Jane <jane@example.com>",
             author=b"John <john@example.com>",
         )
         os.remove(full_path)
         self.worktree.unstage([file])
-        status = list(porcelain.status(self.repo))
-        self.assertEqual(
-            [{"add": [], "delete": [], "modify": []}, [os.fsencode("foo")], []], status
-        )
+
+        unstaged = get_unstaged_changes(self.repo)
+        self.assertEqual([os.fsencode("foo")], unstaged)
 
 
 class WorkTreeCommitTests(WorkTreeTestCase):
@@ -920,8 +915,9 @@ class TemporaryWorktreeTests(TestCase):
         readme_path = os.path.join(self.repo_path, "README.md")
         with open(readme_path, "w") as f:
             f.write("# Test Repository\n")
-        porcelain.add(self.repo, [readme_path])
-        porcelain.commit(self.repo, message=b"Initial commit")
+        wt = self.repo.get_worktree()
+        wt.stage(["README.md"])
+        wt.commit(message=b"Initial commit")
 
     def test_temporary_worktree_creates_and_cleans_up(self) -> None:
         """Test that temporary worktree is created and cleaned up."""
@@ -990,8 +986,9 @@ class TemporaryWorktreeTests(TestCase):
         with open(test_file, "w") as f:
             f.write("Hello, world!")
 
-        porcelain.add(self.repo, [test_file])
-        porcelain.commit(self.repo, message=b"Initial commit")
+        wt = self.repo.get_worktree()
+        wt.stage(["test.txt"])
+        wt.commit(message=b"Initial commit")
 
         with temporary_worktree(self.repo) as worktree:
             # Check that the file exists in the worktree
@@ -1007,6 +1004,6 @@ class TemporaryWorktreeTests(TestCase):
             with open(wt_test_file, "w") as f:
                 f.write("Modified content")
 
-            # Changes should be visible in status
-            status = porcelain.status(worktree)
-            self.assertIn(os.fsencode("test.txt"), status.unstaged)
+            # Changes should be visible as unstaged
+            unstaged = get_unstaged_changes(worktree)
+            self.assertIn(os.fsencode("test.txt"), unstaged)