Explorar o código

Add maintenance subcommand (#1960)

Jelmer Vernooij hai 2 meses
pai
achega
8323849830
Modificáronse 6 ficheiros con 1080 adicións e 19 borrados
  1. 5 0
      NEWS
  2. 117 0
      dulwich/cli.py
  3. 511 0
      dulwich/maintenance.py
  4. 55 0
      dulwich/porcelain.py
  5. 78 19
      tests/test_cli.py
  6. 314 0
      tests/test_maintenance.py

+ 5 - 0
NEWS

@@ -13,6 +13,11 @@
    ``porcelain.submodule_update()``.
    (Jelmer Vernooij, #1813)
 
+ * Add support for ``git maintenance`` command to optimize Git repository data.
+   Implements gc, commit-graph, loose-objects, incremental-repack, pack-refs, and
+   prefetch tasks. Supports automatic maintenance with ``--auto`` flag and task-specific
+   configuration. (Jelmer Vernooij)
+
 0.24.7	2025-10-23
 
  * Add sparse index support for improved performance with large repositories.

+ 117 - 0
dulwich/cli.py

@@ -4321,6 +4321,122 @@ class cmd_gc(Command):
         return None
 
 
+class cmd_maintenance(Command):
+    """Run tasks to optimize Git repository data."""
+
+    def run(self, args: Sequence[str]) -> Optional[int]:
+        """Execute the maintenance command.
+
+        Args:
+            args: Command line arguments
+        """
+        parser = argparse.ArgumentParser(
+            description="Run tasks to optimize Git repository data"
+        )
+        subparsers = parser.add_subparsers(
+            dest="subcommand", help="Maintenance subcommand"
+        )
+
+        # maintenance run subcommand
+        run_parser = subparsers.add_parser("run", help="Run maintenance tasks")
+        run_parser.add_argument(
+            "--task",
+            action="append",
+            dest="tasks",
+            help="Run a specific task (can be specified multiple times)",
+        )
+        run_parser.add_argument(
+            "--auto",
+            action="store_true",
+            help="Only run tasks if needed",
+        )
+        run_parser.add_argument(
+            "--quiet",
+            "-q",
+            action="store_true",
+            help="Only report errors",
+        )
+
+        # maintenance start subcommand (placeholder)
+        subparsers.add_parser("start", help="Start background maintenance")
+
+        # maintenance stop subcommand (placeholder)
+        subparsers.add_parser("stop", help="Stop background maintenance")
+
+        # maintenance register subcommand
+        subparsers.add_parser("register", help="Register repository for maintenance")
+
+        # maintenance unregister subcommand
+        unregister_parser = subparsers.add_parser(
+            "unregister", help="Unregister repository from maintenance"
+        )
+        unregister_parser.add_argument(
+            "--force",
+            action="store_true",
+            help="Don't error if repository is not registered",
+        )
+
+        parsed_args = parser.parse_args(args)
+
+        if not parsed_args.subcommand:
+            parser.print_help()
+            return 1
+
+        if parsed_args.subcommand == "run":
+            # Progress callback
+            def progress(msg: str) -> None:
+                if not parsed_args.quiet:
+                    logger.info(msg)
+
+            try:
+                result = porcelain.maintenance_run(
+                    ".",
+                    tasks=parsed_args.tasks,
+                    auto=parsed_args.auto,
+                    progress=progress if not parsed_args.quiet else None,
+                )
+
+                # Report results
+                if not parsed_args.quiet:
+                    if result.tasks_succeeded:
+                        logger.info("\nSuccessfully completed tasks:")
+                        for task in result.tasks_succeeded:
+                            logger.info(f"  - {task}")
+
+                    if result.tasks_failed:
+                        logger.error("\nFailed tasks:")
+                        for task in result.tasks_failed:
+                            error_msg = result.errors.get(task, "Unknown error")
+                            logger.error(f"  - {task}: {error_msg}")
+                        return 1
+
+            except porcelain.Error as e:
+                logger.error("%s", e)
+                return 1
+        elif parsed_args.subcommand == "register":
+            porcelain.maintenance_register(".")
+            logger.info("Repository registered for background maintenance")
+        elif parsed_args.subcommand == "unregister":
+            try:
+                force = getattr(parsed_args, "force", False)
+                porcelain.maintenance_unregister(".", force=force)
+            except ValueError as e:
+                logger.error(str(e))
+                return 1
+            logger.info("Repository unregistered from background maintenance")
+        elif parsed_args.subcommand in ("start", "stop"):
+            # TODO: Implement background maintenance scheduling
+            logger.error(
+                f"The '{parsed_args.subcommand}' subcommand is not yet implemented"
+            )
+            return 1
+        else:
+            parser.print_help()
+            return 1
+
+        return None
+
+
 class cmd_grep(Command):
     """Search for patterns in tracked files."""
 
@@ -5787,6 +5903,7 @@ commands = {
     "ls-files": cmd_ls_files,
     "ls-remote": cmd_ls_remote,
     "ls-tree": cmd_ls_tree,
+    "maintenance": cmd_maintenance,
     "mailsplit": cmd_mailsplit,
     "merge": cmd_merge,
     "merge-base": cmd_merge_base,

+ 511 - 0
dulwich/maintenance.py

@@ -0,0 +1,511 @@
+"""Git maintenance implementation.
+
+This module provides the git maintenance functionality for optimizing
+and maintaining Git repositories.
+"""
+
+import logging
+import os
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import TYPE_CHECKING, Callable, Optional
+
+if TYPE_CHECKING:
+    from .repo import BaseRepo, Repo
+
+logger = logging.getLogger(__name__)
+
+
+class MaintenanceSchedule(str, Enum):
+    """Maintenance schedule types."""
+
+    HOURLY = "hourly"
+    DAILY = "daily"
+    WEEKLY = "weekly"
+
+
+@dataclass
+class MaintenanceResult:
+    """Result from running maintenance tasks."""
+
+    tasks_run: list[str] = field(default_factory=list)
+    tasks_succeeded: list[str] = field(default_factory=list)
+    tasks_failed: list[str] = field(default_factory=list)
+    errors: dict[str, str] = field(default_factory=dict)
+
+
+class MaintenanceTask(ABC):
+    """Base class for maintenance tasks."""
+
+    name: str = ""
+
+    def __init__(
+        self,
+        repo: "BaseRepo",
+        auto: bool = False,
+        progress: Optional[Callable[[str], None]] = None,
+    ) -> None:
+        """Initialize maintenance task.
+
+        Args:
+            repo: Repository object
+            auto: If True, only run if needed
+            progress: Optional progress callback
+        """
+        self.repo = repo
+        self.auto = auto
+        self.progress = progress
+
+    @abstractmethod
+    def run(self) -> bool:
+        """Run the maintenance task.
+
+        Returns:
+            True if successful, False otherwise
+        """
+
+    def is_enabled(self) -> bool:
+        """Check if task is enabled in repository configuration.
+
+        Returns:
+            True if task is enabled
+        """
+        if not self.name:
+            return False
+
+        config = self.repo.get_config()
+
+        try:
+            enabled = config.get_boolean(
+                (b"maintenance", self.name.encode()), b"enabled"
+            )
+            return enabled if enabled is not None else self.default_enabled()
+        except KeyError:
+            # Return default enabled state
+            return self.default_enabled()
+
+    def default_enabled(self) -> bool:
+        """Return default enabled state for this task.
+
+        Returns:
+            True if task should be enabled by default
+        """
+        return False
+
+
+class GcTask(MaintenanceTask):
+    """Garbage collection maintenance task."""
+
+    name = "gc"
+
+    def default_enabled(self) -> bool:
+        """GC is enabled by default."""
+        return True
+
+    def run(self) -> bool:
+        """Run garbage collection.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        from .gc import garbage_collect
+        from .repo import Repo
+
+        if self.progress:
+            self.progress("Running gc task")
+        assert isinstance(self.repo, Repo)
+        garbage_collect(self.repo, auto=self.auto, progress=self.progress)
+        return True
+
+
+class CommitGraphTask(MaintenanceTask):
+    """Commit-graph maintenance task."""
+
+    name = "commit-graph"
+
+    def default_enabled(self) -> bool:
+        """Commit-graph is enabled by default."""
+        return True
+
+    def run(self) -> bool:
+        """Update commit-graph file.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        if self.progress:
+            self.progress("Running commit-graph task")
+
+        # Get all refs
+        refs = list(self.repo.refs.as_dict().values())
+        if refs:
+            self.repo.object_store.write_commit_graph(refs, reachable=True)
+        return True
+
+
+class LooseObjectsTask(MaintenanceTask):
+    """Loose-objects maintenance task.
+
+    This packs loose objects that are not already packed.
+    """
+
+    name = "loose-objects"
+
+    def run(self) -> bool:
+        """Pack loose objects.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        from .object_store import PackBasedObjectStore
+
+        if self.progress:
+            self.progress("Running loose-objects task")
+
+        # Pack loose objects using the object store's method
+        assert isinstance(self.repo.object_store, PackBasedObjectStore)
+        count = self.repo.object_store.pack_loose_objects(progress=self.progress)
+
+        if self.progress and count > 0:
+            self.progress(f"Packed {count} loose objects")
+
+        return True
+
+
+class IncrementalRepackTask(MaintenanceTask):
+    """Incremental-repack maintenance task.
+
+    This consolidates pack files incrementally.
+    """
+
+    name = "incremental-repack"
+
+    def run(self) -> bool:
+        """Consolidate pack files incrementally.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        from .object_store import PackBasedObjectStore
+
+        if self.progress:
+            self.progress("Running incremental-repack task")
+
+        # Get all packs sorted by size
+        assert isinstance(self.repo.object_store, PackBasedObjectStore)
+        packs = self.repo.object_store.packs
+        if len(packs) <= 1:
+            # Nothing to consolidate
+            if self.progress:
+                self.progress("No packs to consolidate")
+            return True
+
+        # In auto mode, only repack if there are many small packs
+        # This is a heuristic similar to git's auto gc behavior
+        if self.auto:
+            # Only repack if we have more than 50 packs
+            # (matching git's gc.autoPackLimit default)
+            if len(packs) < 50:
+                if self.progress:
+                    self.progress(
+                        f"Skipping incremental repack: only {len(packs)} packs"
+                    )
+                return True
+
+        # Perform a full repack to consolidate all packs
+        if self.progress:
+            self.progress(f"Consolidating {len(packs)} pack files")
+
+        count = self.repo.object_store.repack(progress=self.progress)
+
+        if self.progress:
+            self.progress(f"Repacked {count} objects")
+
+        return True
+
+
+class PackRefsTask(MaintenanceTask):
+    """Pack-refs maintenance task."""
+
+    name = "pack-refs"
+
+    def run(self) -> bool:
+        """Pack loose references.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        if self.progress:
+            self.progress("Running pack-refs task")
+
+        self.repo.refs.pack_refs(all=True)
+        return True
+
+
+class PrefetchTask(MaintenanceTask):
+    """Prefetch maintenance task.
+
+    This prefetches remote refs to keep the object database up-to-date.
+    """
+
+    name = "prefetch"
+
+    def run(self) -> bool:
+        """Prefetch remote refs.
+
+        Returns:
+            True if successful, False otherwise
+        """
+        from .porcelain import fetch
+        from .repo import Repo
+
+        if self.progress:
+            self.progress("Running prefetch task")
+
+        config = self.repo.get_config()
+
+        # Get all configured remotes
+        remotes = set()
+        for section in config.sections():
+            if len(section) == 2 and section[0] == b"remote":
+                remotes.add(section[1].decode())
+
+        if not remotes:
+            if self.progress:
+                self.progress("No remotes configured, skipping prefetch")
+            return True
+
+        # Fetch from each remote
+        success = True
+        for remote_name in sorted(remotes):
+            try:
+                if self.progress:
+                    self.progress(f"Fetching from {remote_name}")
+
+                # Fetch quietly without updating working tree
+                # The fetch operation will update refs under refs/remotes/
+                assert isinstance(self.repo, Repo)
+                fetch(
+                    self.repo,
+                    remote_location=remote_name,
+                    quiet=True,
+                )
+            except Exception as e:
+                # Log error and mark as failed
+                logger.error(f"Failed to fetch from {remote_name}: {e}")
+                success = False
+
+        return success
+
+
+# Registry of available maintenance tasks
+MAINTENANCE_TASKS: dict[str, type[MaintenanceTask]] = {
+    "gc": GcTask,
+    "commit-graph": CommitGraphTask,
+    "loose-objects": LooseObjectsTask,
+    "incremental-repack": IncrementalRepackTask,
+    "pack-refs": PackRefsTask,
+    "prefetch": PrefetchTask,
+}
+
+
+def get_enabled_tasks(
+    repo: "BaseRepo",
+    task_filter: Optional[list[str]] = None,
+) -> list[str]:
+    """Get list of enabled maintenance tasks.
+
+    Args:
+        repo: Repository object
+        task_filter: Optional list of specific task names to run
+
+    Returns:
+        List of enabled task names
+    """
+    if task_filter:
+        # Validate requested tasks exist
+        return [name for name in task_filter if name in MAINTENANCE_TASKS]
+
+    enabled_tasks = []
+
+    # Check each task to see if it's enabled
+    for task_name, task_class in MAINTENANCE_TASKS.items():
+        # Create temporary task instance to check if enabled
+        task = task_class(repo, auto=False, progress=None)
+        if task.is_enabled():
+            enabled_tasks.append(task_name)
+
+    return enabled_tasks
+
+
+def run_maintenance(
+    repo: "BaseRepo",
+    tasks: Optional[list[str]] = None,
+    auto: bool = False,
+    progress: Optional[Callable[[str], None]] = None,
+) -> MaintenanceResult:
+    """Run maintenance tasks on a repository.
+
+    Args:
+        repo: Repository object
+        tasks: Optional list of specific task names to run
+        auto: If True, only run tasks if needed
+        progress: Optional progress callback
+
+    Returns:
+        MaintenanceResult with task execution results
+    """
+    result = MaintenanceResult()
+
+    enabled_tasks = get_enabled_tasks(repo, tasks)
+
+    for task_name in enabled_tasks:
+        result.tasks_run.append(task_name)
+
+        task_class = MAINTENANCE_TASKS.get(task_name)
+        if not task_class:
+            result.tasks_failed.append(task_name)
+            result.errors[task_name] = "Unknown task"
+            continue
+
+        try:
+            task = task_class(repo, auto=auto, progress=progress)
+            success = task.run()
+
+            if success:
+                result.tasks_succeeded.append(task_name)
+            else:
+                result.tasks_failed.append(task_name)
+        except Exception as e:
+            result.tasks_failed.append(task_name)
+            result.errors[task_name] = str(e)
+            logger.error(f"Task {task_name} failed: {e}")
+
+    return result
+
+
+def register_repository(repo: "Repo") -> None:
+    """Register a repository for background maintenance.
+
+    This adds the repository to the global maintenance.repo config and sets
+    up recommended configuration for scheduled maintenance.
+
+    Args:
+        repo: Repository to register
+    """
+    from .config import ConfigFile
+
+    repo_path = os.path.abspath(repo.path)
+
+    # Get global config path
+    global_config_path = os.path.expanduser("~/.gitconfig")
+    try:
+        global_config = ConfigFile.from_path(global_config_path)
+    except FileNotFoundError:
+        # Create new config file if it doesn't exist
+        global_config = ConfigFile()
+        global_config.path = global_config_path
+
+    # Add repository to maintenance.repo list
+    # Check if already registered
+    repo_path_bytes = repo_path.encode()
+    try:
+        existing_repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+    except KeyError:
+        existing_repos = []
+
+    if repo_path_bytes in existing_repos:
+        # Already registered
+        return
+
+    # Add to global config
+    global_config.set((b"maintenance",), b"repo", repo_path_bytes)
+
+    # Set up incremental strategy in global config if not already set
+    try:
+        global_config.get((b"maintenance",), b"strategy")
+    except KeyError:
+        global_config.set((b"maintenance",), b"strategy", b"incremental")
+
+    # Configure task schedules for incremental strategy
+    schedule_config = {
+        b"commit-graph": b"hourly",
+        b"prefetch": b"hourly",
+        b"loose-objects": b"daily",
+        b"incremental-repack": b"daily",
+    }
+
+    for task, schedule in schedule_config.items():
+        try:
+            global_config.get((b"maintenance", task), b"schedule")
+        except KeyError:
+            global_config.set((b"maintenance", task), b"schedule", schedule)
+
+    global_config.write_to_path()
+
+    # Disable foreground auto maintenance in the repository
+    repo_config = repo.get_config()
+    repo_config.set((b"maintenance",), b"auto", False)
+    repo_config.write_to_path()
+
+
+def unregister_repository(repo: "Repo", force: bool = False) -> None:
+    """Unregister a repository from background maintenance.
+
+    This removes the repository from the global maintenance.repo config.
+
+    Args:
+        repo: Repository to unregister
+        force: If True, don't error if repository is not registered
+
+    Raises:
+        ValueError: If repository is not registered and force is False
+    """
+    from .config import ConfigFile
+
+    repo_path = os.path.abspath(repo.path)
+
+    # Get global config
+    global_config_path = os.path.expanduser("~/.gitconfig")
+    try:
+        global_config = ConfigFile.from_path(global_config_path)
+    except FileNotFoundError:
+        if not force:
+            raise ValueError(
+                f"Repository {repo_path} is not registered for maintenance"
+            )
+        return
+
+    # Check if repository is registered
+    repo_path_bytes = repo_path.encode()
+    try:
+        existing_repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+    except KeyError:
+        if not force:
+            raise ValueError(
+                f"Repository {repo_path} is not registered for maintenance"
+            )
+        return
+
+    if repo_path_bytes not in existing_repos:
+        if not force:
+            raise ValueError(
+                f"Repository {repo_path} is not registered for maintenance"
+            )
+        return
+
+    # Remove from list
+    existing_repos.remove(repo_path_bytes)
+
+    # Delete the maintenance section and recreate it with remaining repos
+    try:
+        del global_config[(b"maintenance",)]
+    except KeyError:
+        pass
+
+    # Re-add remaining repos
+    for remaining_repo in existing_repos:
+        global_config.set((b"maintenance",), b"repo", remaining_repo)
+
+    global_config.write_to_path()

+ 55 - 0
dulwich/porcelain.py

@@ -120,6 +120,7 @@ else:
 if TYPE_CHECKING:
     from .filter_branch import CommitData
     from .gc import GCStats
+    from .maintenance import MaintenanceResult
 
 from . import replace_me
 from .archive import tar_stream
@@ -6386,6 +6387,60 @@ def prune(
             r.object_store.prune(grace_period=grace_period)
 
 
+def maintenance_run(
+    repo: RepoPath,
+    tasks: Optional[list[str]] = None,
+    auto: bool = False,
+    progress: Optional[Callable[[str], None]] = None,
+) -> "MaintenanceResult":
+    """Run maintenance tasks on a repository.
+
+    Args:
+      repo: Path to the repository or a Repo object
+      tasks: Optional list of specific task names to run
+             (e.g., ['gc', 'commit-graph', 'pack-refs'])
+      auto: If True, only run tasks if needed
+      progress: Optional progress callback
+
+    Returns:
+      MaintenanceResult object with task execution results
+    """
+    from .maintenance import run_maintenance
+
+    with open_repo_closing(repo) as r:
+        return run_maintenance(r, tasks=tasks, auto=auto, progress=progress)
+
+
+def maintenance_register(repo: RepoPath) -> None:
+    """Register a repository for background maintenance.
+
+    This adds the repository to the global maintenance.repo config and sets
+    up recommended configuration for scheduled maintenance.
+
+    Args:
+      repo: Path to the repository or repository object
+    """
+    from .maintenance import register_repository
+
+    with open_repo_closing(repo) as r:
+        register_repository(r)
+
+
+def maintenance_unregister(repo: RepoPath, force: bool = False) -> None:
+    """Unregister a repository from background maintenance.
+
+    This removes the repository from the global maintenance.repo config.
+
+    Args:
+      repo: Path to the repository or repository object
+      force: If True, don't error if repository is not registered
+    """
+    from .maintenance import unregister_repository
+
+    with open_repo_closing(repo) as r:
+        unregister_repository(r, force=force)
+
+
 def count_objects(repo: RepoPath = ".", verbose: bool = False) -> CountObjectsResult:
     """Count unpacked objects and their disk usage.
 

+ 78 - 19
tests/test_cli.py

@@ -34,6 +34,9 @@ from unittest.mock import MagicMock, patch
 
 from dulwich import cli
 from dulwich.cli import (
+    AutoFlushBinaryIOWrapper,
+    AutoFlushTextIOWrapper,
+    _should_auto_flush,
     detect_terminal_width,
     format_bytes,
     launch_editor,
@@ -3677,7 +3680,6 @@ class GitFlushTest(TestCase):
 
     def test_should_auto_flush_with_git_flush_1(self):
         """Test that GIT_FLUSH=1 enables auto-flushing."""
-        from dulwich.cli import _should_auto_flush
 
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = True
@@ -3686,8 +3688,6 @@ class GitFlushTest(TestCase):
 
     def test_should_auto_flush_with_git_flush_0(self):
         """Test that GIT_FLUSH=0 disables auto-flushing."""
-        from dulwich.cli import _should_auto_flush
-
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = True
 
@@ -3695,8 +3695,6 @@ class GitFlushTest(TestCase):
 
     def test_should_auto_flush_auto_detect_tty(self):
         """Test that auto-detect returns False for TTY (no flush needed)."""
-        from dulwich.cli import _should_auto_flush
-
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = True
 
@@ -3704,8 +3702,6 @@ class GitFlushTest(TestCase):
 
     def test_should_auto_flush_auto_detect_pipe(self):
         """Test that auto-detect returns True for pipes (flush needed)."""
-        from dulwich.cli import _should_auto_flush
-
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = False
 
@@ -3713,8 +3709,6 @@ class GitFlushTest(TestCase):
 
     def test_text_wrapper_flushes_on_write(self):
         """Test that AutoFlushTextIOWrapper flushes after write."""
-        from dulwich.cli import AutoFlushTextIOWrapper
-
         mock_stream = MagicMock()
         wrapper = AutoFlushTextIOWrapper(mock_stream)
 
@@ -3735,8 +3729,6 @@ class GitFlushTest(TestCase):
 
     def test_binary_wrapper_flushes_on_write(self):
         """Test that AutoFlushBinaryIOWrapper flushes after write."""
-        from dulwich.cli import AutoFlushBinaryIOWrapper
-
         mock_stream = MagicMock()
         wrapper = AutoFlushBinaryIOWrapper(mock_stream)
 
@@ -3746,8 +3738,6 @@ class GitFlushTest(TestCase):
 
     def test_text_wrapper_env_classmethod(self):
         """Test that AutoFlushTextIOWrapper.env() respects GIT_FLUSH."""
-        from dulwich.cli import AutoFlushTextIOWrapper
-
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = False
 
@@ -3759,8 +3749,6 @@ class GitFlushTest(TestCase):
 
     def test_binary_wrapper_env_classmethod(self):
         """Test that AutoFlushBinaryIOWrapper.env() respects GIT_FLUSH."""
-        from dulwich.cli import AutoFlushBinaryIOWrapper
-
         mock_stream = MagicMock()
         mock_stream.isatty.return_value = False
 
@@ -3772,8 +3760,6 @@ class GitFlushTest(TestCase):
 
     def test_wrapper_delegates_attributes(self):
         """Test that wrapper delegates unknown attributes to stream."""
-        from dulwich.cli import AutoFlushTextIOWrapper
-
         mock_stream = MagicMock()
         mock_stream.encoding = "utf-8"
         wrapper = AutoFlushTextIOWrapper(mock_stream)
@@ -3782,8 +3768,6 @@ class GitFlushTest(TestCase):
 
     def test_wrapper_context_manager(self):
         """Test that wrapper supports context manager protocol."""
-        from dulwich.cli import AutoFlushTextIOWrapper
-
         mock_stream = MagicMock()
         wrapper = AutoFlushTextIOWrapper(mock_stream)
 
@@ -3791,5 +3775,80 @@ class GitFlushTest(TestCase):
             self.assertIs(w, wrapper)
 
 
+class MaintenanceCommandTest(DulwichCliTestCase):
+    """Tests for maintenance command."""
+
+    def setUp(self):
+        super().setUp()
+        # Set up a temporary HOME for testing global config
+        self.temp_home = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_home)
+        self.overrideEnv("HOME", self.temp_home)
+
+    def test_maintenance_run_default(self):
+        """Test maintenance run with default tasks."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "run")
+        self.assertIsNone(result)
+
+    def test_maintenance_run_specific_task(self):
+        """Test maintenance run with a specific task."""
+        result, _stdout, _stderr = self._run_cli(
+            "maintenance", "run", "--task", "pack-refs"
+        )
+        self.assertIsNone(result)
+
+    def test_maintenance_run_multiple_tasks(self):
+        """Test maintenance run with multiple specific tasks."""
+        result, _stdout, _stderr = self._run_cli(
+            "maintenance", "run", "--task", "pack-refs", "--task", "gc"
+        )
+        self.assertIsNone(result)
+
+    def test_maintenance_run_quiet(self):
+        """Test maintenance run with quiet flag."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "run", "--quiet")
+        self.assertIsNone(result)
+
+    def test_maintenance_run_auto(self):
+        """Test maintenance run with auto flag."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "run", "--auto")
+        self.assertIsNone(result)
+
+    def test_maintenance_no_subcommand(self):
+        """Test maintenance command without subcommand shows help."""
+        result, _stdout, _stderr = self._run_cli("maintenance")
+        self.assertEqual(result, 1)
+
+    def test_maintenance_register(self):
+        """Test maintenance register subcommand."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "register")
+        self.assertIsNone(result)
+
+    def test_maintenance_unregister(self):
+        """Test maintenance unregister subcommand."""
+        # First register
+        _result, _stdout, _stderr = self._run_cli("maintenance", "register")
+
+        # Then unregister
+        result, _stdout, _stderr = self._run_cli("maintenance", "unregister")
+        self.assertIsNone(result)
+
+    def test_maintenance_unregister_not_registered(self):
+        """Test unregistering a repository that is not registered."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "unregister")
+        self.assertEqual(result, 1)
+
+    def test_maintenance_unregister_force(self):
+        """Test unregistering with --force flag."""
+        result, _stdout, _stderr = self._run_cli("maintenance", "unregister", "--force")
+        self.assertIsNone(result)
+
+    def test_maintenance_unimplemented_subcommand(self):
+        """Test unimplemented maintenance subcommands."""
+        for subcommand in ["start", "stop"]:
+            result, _stdout, _stderr = self._run_cli("maintenance", subcommand)
+            self.assertEqual(result, 1)
+
+
 if __name__ == "__main__":
     unittest.main()

+ 314 - 0
tests/test_maintenance.py

@@ -0,0 +1,314 @@
+# test_maintenance.py -- tests for maintenance functionality
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for dulwich.maintenance."""
+
+import tempfile
+
+from dulwich import porcelain
+from dulwich.maintenance import (
+    CommitGraphTask,
+    GcTask,
+    IncrementalRepackTask,
+    LooseObjectsTask,
+    PackRefsTask,
+    PrefetchTask,
+    get_enabled_tasks,
+    run_maintenance,
+)
+from dulwich.objects import Blob
+from dulwich.repo import Repo
+
+from . import TestCase
+
+
+class MaintenanceTaskTestCase(TestCase):
+    """Base class for maintenance task tests."""
+
+    def setUp(self):
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup_test_dir)
+        self.repo = Repo.init(self.test_dir)
+        self.addCleanup(self.repo.close)
+
+    def _cleanup_test_dir(self):
+        import shutil
+
+        shutil.rmtree(self.test_dir)
+
+    def _create_commit(self):
+        """Create a simple commit in the test repository."""
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+        return blob
+
+
+class GcTaskTest(MaintenanceTaskTestCase):
+    """Tests for GcTask."""
+
+    def test_default_enabled(self):
+        """Test that GC task is enabled by default."""
+        task = GcTask(self.repo)
+        self.assertTrue(task.default_enabled())
+        self.assertTrue(task.is_enabled())
+
+    def test_run(self):
+        """Test running GC task."""
+        self._create_commit()
+        task = GcTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class CommitGraphTaskTest(MaintenanceTaskTestCase):
+    """Tests for CommitGraphTask."""
+
+    def test_default_enabled(self):
+        """Test that commit-graph task is enabled by default."""
+        task = CommitGraphTask(self.repo)
+        self.assertTrue(task.default_enabled())
+        self.assertTrue(task.is_enabled())
+
+    def test_run(self):
+        """Test running commit-graph task."""
+        self._create_commit()
+        task = CommitGraphTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class LooseObjectsTaskTest(MaintenanceTaskTestCase):
+    """Tests for LooseObjectsTask."""
+
+    def test_default_enabled(self):
+        """Test that loose-objects task is disabled by default."""
+        task = LooseObjectsTask(self.repo)
+        self.assertFalse(task.default_enabled())
+
+    def test_run(self):
+        """Test running loose-objects task."""
+        self._create_commit()
+        task = LooseObjectsTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class IncrementalRepackTaskTest(MaintenanceTaskTestCase):
+    """Tests for IncrementalRepackTask."""
+
+    def test_default_enabled(self):
+        """Test that incremental-repack task is disabled by default."""
+        task = IncrementalRepackTask(self.repo)
+        self.assertFalse(task.default_enabled())
+
+    def test_run_no_packs(self):
+        """Test running incremental-repack with no packs."""
+        task = IncrementalRepackTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+    def test_run_auto_few_packs(self):
+        """Test that auto mode skips repacking when there are few packs."""
+        self._create_commit()
+        task = IncrementalRepackTask(self.repo, auto=True)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class PackRefsTaskTest(MaintenanceTaskTestCase):
+    """Tests for PackRefsTask."""
+
+    def test_default_enabled(self):
+        """Test that pack-refs task is disabled by default."""
+        task = PackRefsTask(self.repo)
+        self.assertFalse(task.default_enabled())
+
+    def test_run(self):
+        """Test running pack-refs task."""
+        task = PackRefsTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class PrefetchTaskTest(MaintenanceTaskTestCase):
+    """Tests for PrefetchTask."""
+
+    def test_default_enabled(self):
+        """Test that prefetch task is disabled by default."""
+        task = PrefetchTask(self.repo)
+        self.assertFalse(task.default_enabled())
+
+    def test_run_no_remotes(self):
+        """Test running prefetch with no remotes configured."""
+        task = PrefetchTask(self.repo)
+        result = task.run()
+        self.assertTrue(result)
+
+
+class MaintenanceFunctionsTest(MaintenanceTaskTestCase):
+    """Tests for maintenance module functions."""
+
+    def test_get_enabled_tasks_default(self):
+        """Test getting enabled tasks with defaults."""
+        enabled = get_enabled_tasks(self.repo)
+        # By default, only gc and commit-graph are enabled
+        self.assertIn("gc", enabled)
+        self.assertIn("commit-graph", enabled)
+        self.assertNotIn("loose-objects", enabled)
+        self.assertNotIn("incremental-repack", enabled)
+        self.assertNotIn("pack-refs", enabled)
+        self.assertNotIn("prefetch", enabled)
+
+    def test_get_enabled_tasks_with_filter(self):
+        """Test getting enabled tasks with a filter."""
+        enabled = get_enabled_tasks(self.repo, ["gc", "pack-refs"])
+        self.assertEqual(set(enabled), {"gc", "pack-refs"})
+
+    def test_get_enabled_tasks_invalid(self):
+        """Test that invalid task names are ignored."""
+        enabled = get_enabled_tasks(self.repo, ["gc", "invalid-task"])
+        self.assertEqual(enabled, ["gc"])
+
+    def test_run_maintenance(self):
+        """Test running maintenance tasks."""
+        self._create_commit()
+        result = run_maintenance(self.repo)
+        self.assertIn("gc", result.tasks_run)
+        self.assertIn("commit-graph", result.tasks_run)
+        self.assertIn("gc", result.tasks_succeeded)
+        self.assertIn("commit-graph", result.tasks_succeeded)
+        self.assertEqual(len(result.tasks_failed), 0)
+
+    def test_run_maintenance_specific_tasks(self):
+        """Test running specific maintenance tasks."""
+        result = run_maintenance(self.repo, tasks=["pack-refs"])
+        self.assertEqual(result.tasks_run, ["pack-refs"])
+        self.assertEqual(result.tasks_succeeded, ["pack-refs"])
+        self.assertEqual(len(result.tasks_failed), 0)
+
+    def test_run_maintenance_with_progress(self):
+        """Test running maintenance with progress callback."""
+        messages = []
+
+        def progress(msg):
+            messages.append(msg)
+
+        self._create_commit()
+        result = run_maintenance(self.repo, progress=progress)
+        self.assertGreater(len(messages), 0)
+        self.assertIn("gc", result.tasks_succeeded)
+
+
+class PorcelainMaintenanceTest(MaintenanceTaskTestCase):
+    """Tests for porcelain.maintenance_run function."""
+
+    def test_maintenance_run(self):
+        """Test porcelain maintenance_run function."""
+        self._create_commit()
+        result = porcelain.maintenance_run(self.test_dir)
+        self.assertIn("gc", result.tasks_succeeded)
+        self.assertIn("commit-graph", result.tasks_succeeded)
+
+    def test_maintenance_run_with_tasks(self):
+        """Test porcelain maintenance_run with specific tasks."""
+        result = porcelain.maintenance_run(self.test_dir, tasks=["pack-refs"])
+        self.assertEqual(result.tasks_run, ["pack-refs"])
+        self.assertEqual(result.tasks_succeeded, ["pack-refs"])
+
+
+class MaintenanceRegisterTest(MaintenanceTaskTestCase):
+    """Tests for maintenance register/unregister."""
+
+    def setUp(self):
+        super().setUp()
+        # Set up a temporary HOME for testing global config
+        self.temp_home = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup_temp_home)
+        self.overrideEnv("HOME", self.temp_home)
+
+    def _cleanup_temp_home(self):
+        import shutil
+
+        shutil.rmtree(self.temp_home)
+
+    def test_register_repository(self):
+        """Test registering a repository for maintenance."""
+        porcelain.maintenance_register(self.test_dir)
+
+        # Verify repository was added to global config
+        import os
+
+        from dulwich.config import ConfigFile
+
+        global_config_path = os.path.expanduser("~/.gitconfig")
+        global_config = ConfigFile.from_path(global_config_path)
+
+        repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+        self.assertIn(self.test_dir.encode(), repos)
+
+        # Verify strategy was set
+        strategy = global_config.get((b"maintenance",), b"strategy")
+        self.assertEqual(strategy, b"incremental")
+
+        # Verify auto maintenance was disabled in repo
+        repo_config = self.repo.get_config()
+        auto = repo_config.get_boolean((b"maintenance",), b"auto")
+        self.assertFalse(auto)
+
+    def test_register_already_registered(self):
+        """Test registering an already registered repository."""
+        porcelain.maintenance_register(self.test_dir)
+        # Should not error when registering again
+        porcelain.maintenance_register(self.test_dir)
+
+    def test_unregister_repository(self):
+        """Test unregistering a repository."""
+        # First register
+        porcelain.maintenance_register(self.test_dir)
+
+        # Then unregister
+        porcelain.maintenance_unregister(self.test_dir)
+
+        # Verify repository was removed from global config
+        import os
+
+        from dulwich.config import ConfigFile
+
+        global_config_path = os.path.expanduser("~/.gitconfig")
+        global_config = ConfigFile.from_path(global_config_path)
+
+        try:
+            repos = list(global_config.get_multivar((b"maintenance",), b"repo"))
+            self.assertNotIn(self.test_dir.encode(), repos)
+        except KeyError:
+            # No repos registered, which is fine
+            pass
+
+    def test_unregister_not_registered(self):
+        """Test unregistering a repository that is not registered."""
+        with self.assertRaises(ValueError):
+            porcelain.maintenance_unregister(self.test_dir)
+
+    def test_unregister_not_registered_force(self):
+        """Test unregistering with force flag."""
+        # Should not error with force=True
+        porcelain.maintenance_unregister(self.test_dir, force=True)