2
0
Эх сурвалжийг харах

Implement git reflog expire and delete commands

Fixes #1798
Jelmer Vernooij 3 сар өмнө
parent
commit
a928159d92

+ 3 - 0
NEWS

@@ -5,6 +5,9 @@
    such as Windows GUI apps, apps started with ``pythonw``, or apps using
    ``ProcessPoolExecutor``. (Jelmer Vernooij, #1939)
 
+ * Add support for ``dulwich reflog expire`` and ``dulwich reflog delete`` commands.
+   (Jelmer Vernooij, #1798)
+
  * Add ``dulwich grep`` command.
    Supports regular expressions, case-insensitive search, line numbers, pathspec
    filtering, and respecting .gitignore patterns. (Jelmer Vernooij, #1776)

+ 149 - 4
dulwich/cli.py

@@ -154,6 +154,10 @@ def parse_relative_time(time_str: str) -> int:
             "days": 86400,
             "week": 604800,
             "weeks": 604800,
+            "month": 2592000,  # 30 days
+            "months": 2592000,
+            "year": 31536000,  # 365 days
+            "years": 31536000,
         }
 
         if unit in multipliers:
@@ -166,6 +170,44 @@ def parse_relative_time(time_str: str) -> int:
         raise
 
 
+def parse_time_to_timestamp(time_spec: str) -> int:
+    """Parse a time specification and return a Unix timestamp.
+
+    Args:
+        time_spec: Time specification. Can be:
+            - A Unix timestamp (integer as string)
+            - A relative time like "2 weeks ago"
+            - "now" for current time
+            - "all" to expire all entries (returns future time)
+            - "never" to never expire (returns 0 - epoch start)
+
+    Returns:
+        Unix timestamp
+
+    Raises:
+        ValueError: If the time specification cannot be parsed
+    """
+    import time
+
+    # Handle special cases
+    if time_spec == "all":
+        # Expire all entries - set to future time so everything is "older"
+        return int(time.time()) + (100 * 365 * 24 * 60 * 60)  # 100 years in future
+    if time_spec == "never":
+        # Never expire - set to epoch start so nothing is older
+        return 0
+
+    # Try parsing as direct Unix timestamp
+    try:
+        return int(time_spec)
+    except ValueError:
+        pass
+
+    # Parse relative time and convert to timestamp
+    seconds_ago = parse_relative_time(time_spec)
+    return int(time.time()) - seconds_ago
+
+
 def format_bytes(bytes: float) -> str:
     """Format bytes as human-readable string.
 
@@ -2239,15 +2281,71 @@ class cmd_reflog(Command):
         Args:
             args: Command line arguments
         """
-        parser = argparse.ArgumentParser()
-        parser.add_argument(
+        parser = argparse.ArgumentParser(prog="dulwich reflog")
+        subparsers = parser.add_subparsers(dest="subcommand", help="Subcommand")
+
+        # Show subcommand (default when no subcommand is specified)
+        show_parser = subparsers.add_parser(
+            "show", help="Show reflog entries (default)", add_help=False
+        )
+        show_parser.add_argument(
             "ref", nargs="?", default="HEAD", help="Reference to show reflog for"
         )
-        parser.add_argument(
+        show_parser.add_argument(
             "--all", action="store_true", help="Show reflogs for all refs"
         )
-        parsed_args = parser.parse_args(args)
 
+        # Expire subcommand
+        expire_parser = subparsers.add_parser("expire", help="Expire reflog entries")
+        expire_parser.add_argument(
+            "ref", nargs="?", help="Reference to expire reflog for"
+        )
+        expire_parser.add_argument(
+            "--all", action="store_true", help="Expire reflogs for all refs"
+        )
+        expire_parser.add_argument(
+            "--expire",
+            type=str,
+            help="Expire entries older than time (e.g., '90 days ago', 'all', 'never')",
+        )
+        expire_parser.add_argument(
+            "--expire-unreachable",
+            type=str,
+            help="Expire unreachable entries older than time",
+        )
+        expire_parser.add_argument(
+            "--dry-run", "-n", action="store_true", help="Show what would be expired"
+        )
+
+        # Delete subcommand
+        delete_parser = subparsers.add_parser(
+            "delete", help="Delete specific reflog entry"
+        )
+        delete_parser.add_argument(
+            "refspec", help="Reference specification (e.g., HEAD@{1})"
+        )
+        delete_parser.add_argument(
+            "--rewrite",
+            action="store_true",
+            help="Rewrite subsequent entries to maintain consistency",
+        )
+
+        # If no arguments or first arg is not a subcommand, treat as show
+        if not args or (args[0] not in ["show", "expire", "delete"]):
+            # Parse as show command
+            parsed_args = parser.parse_args(["show", *list(args)])
+        else:
+            parsed_args = parser.parse_args(args)
+
+        if parsed_args.subcommand == "expire":
+            self._run_expire(parsed_args)
+        elif parsed_args.subcommand == "delete":
+            self._run_delete(parsed_args)
+        else:  # show or default
+            self._run_show(parsed_args)
+
+    def _run_show(self, parsed_args: argparse.Namespace) -> None:
+        """Show reflog entries."""
         with Repo(".") as repo:
             config = repo.get_config_stack()
             with get_pager(config=config, cmd_name="reflog") as outstream:
@@ -2281,6 +2379,53 @@ class cmd_reflog(Command):
                             f"{short_new} {ref.decode('utf-8', 'replace')}@{{{i}}}: {message}\n"
                         )
 
+    def _run_expire(self, parsed_args: argparse.Namespace) -> None:
+        """Expire reflog entries."""
+        # Parse time specifications
+        expire_time = None
+        expire_unreachable_time = None
+
+        if parsed_args.expire:
+            expire_time = parse_time_to_timestamp(parsed_args.expire)
+        if parsed_args.expire_unreachable:
+            expire_unreachable_time = parse_time_to_timestamp(
+                parsed_args.expire_unreachable
+            )
+
+        # Execute expire
+        result = porcelain.reflog_expire(
+            repo=".",
+            ref=parsed_args.ref,
+            all=parsed_args.all,
+            expire_time=expire_time,
+            expire_unreachable_time=expire_unreachable_time,
+            dry_run=parsed_args.dry_run,
+        )
+
+        # Print results
+        for ref_name, count in result.items():
+            ref_str = ref_name.decode("utf-8", "replace")
+            if parsed_args.dry_run:
+                print(f"Would expire {count} entries from {ref_str}")
+            else:
+                print(f"Expired {count} entries from {ref_str}")
+
+    def _run_delete(self, parsed_args: argparse.Namespace) -> None:
+        """Delete a specific reflog entry."""
+        from dulwich.reflog import parse_reflog_spec
+
+        # Parse refspec (e.g., "HEAD@{1}" or "refs/heads/master@{2}")
+        ref, index = parse_reflog_spec(parsed_args.refspec)
+
+        # Execute delete
+        porcelain.reflog_delete(
+            repo=".",
+            ref=ref,
+            index=index,
+            rewrite=parsed_args.rewrite,
+        )
+        print(f"Deleted entry {ref.decode('utf-8', 'replace')}@{{{index}}}")
+
 
 class cmd_reset(Command):
     """Reset current HEAD to the specified state."""

+ 141 - 0
dulwich/porcelain.py

@@ -7149,6 +7149,147 @@ def reflog(
                     yield (ref_bytes, entry)
 
 
+def reflog_expire(
+    repo: RepoPath = ".",
+    ref: Optional[Union[str, bytes]] = None,
+    all: bool = False,
+    expire_time: Optional[int] = None,
+    expire_unreachable_time: Optional[int] = None,
+    dry_run: bool = False,
+) -> dict[bytes, int]:
+    """Expire reflog entries based on age and reachability.
+
+    Args:
+        repo: Path to repository or a Repo object
+        ref: Reference name (if not using --all)
+        all: If True, expire reflogs for all refs
+        expire_time: Expire entries older than this timestamp (seconds since epoch)
+        expire_unreachable_time: Expire unreachable entries older than this timestamp
+        dry_run: If True, show what would be expired without making changes
+
+    Returns:
+        Dictionary mapping ref names to number of expired entries
+    """
+    import os
+    import time
+
+    from .reflog import expire_reflog, iter_reflogs
+
+    if not all and ref is None:
+        raise ValueError("Must specify either ref or all=True")
+
+    if isinstance(ref, str):
+        ref = ref.encode("utf-8")
+
+    # Default expire times if not specified
+    if expire_time is None and expire_unreachable_time is None:
+        # Default: expire entries older than 90 days, unreachable older than 30 days
+        now = int(time.time())
+        expire_time = now - (90 * 24 * 60 * 60)
+        expire_unreachable_time = now - (30 * 24 * 60 * 60)
+
+    result = {}
+
+    with open_repo_closing(repo) as r:
+        # Determine which refs to process
+        refs_to_process: list[bytes] = []
+        if all:
+            logs_dir = os.path.join(r.controldir(), "logs")
+            refs_to_process = list(iter_reflogs(logs_dir))
+        else:
+            assert ref is not None  # Already checked above
+            refs_to_process = [ref]
+
+        # Build set of reachable objects if we have unreachable expiration time
+        reachable_objects: Optional[set[bytes]] = None
+        if expire_unreachable_time is not None:
+            from .gc import find_reachable_objects
+
+            reachable_objects = find_reachable_objects(
+                r.object_store, r.refs, include_reflogs=False
+            )
+
+        # Process each ref
+        for ref_name in refs_to_process:
+            reflog_path = r._reflog_path(ref_name)
+            if not os.path.exists(reflog_path):
+                continue
+
+            # Create reachability checker
+            def is_reachable(sha: bytes) -> bool:
+                if reachable_objects is None:
+                    # No unreachable expiration, so assume everything is reachable
+                    return True
+                return sha in reachable_objects
+
+            # Open the reflog file
+            if dry_run:
+                # For dry run, just read and count what would be expired
+                with open(reflog_path, "rb") as f:
+                    from .reflog import read_reflog
+
+                    count = 0
+                    for entry in read_reflog(f):
+                        is_obj_reachable = is_reachable(entry.new_sha)
+                        should_expire = False
+
+                        if is_obj_reachable and expire_time is not None:
+                            if entry.timestamp < expire_time:
+                                should_expire = True
+                        elif (
+                            not is_obj_reachable and expire_unreachable_time is not None
+                        ):
+                            if entry.timestamp < expire_unreachable_time:
+                                should_expire = True
+
+                        if should_expire:
+                            count += 1
+
+                    result[ref_name] = count
+            else:
+                # Actually expire entries
+                with open(reflog_path, "r+b") as f:  # type: ignore[assignment]
+                    count = expire_reflog(
+                        f,
+                        expire_time=expire_time,
+                        expire_unreachable_time=expire_unreachable_time,
+                        reachable_checker=is_reachable,
+                    )
+                    result[ref_name] = count
+
+    return result
+
+
+def reflog_delete(
+    repo: RepoPath = ".",
+    ref: Union[str, bytes] = b"HEAD",
+    index: int = 0,
+    rewrite: bool = False,
+) -> None:
+    """Delete a specific reflog entry.
+
+    Args:
+        repo: Path to repository or a Repo object
+        ref: Reference name
+        index: Reflog entry index (0 = newest, in Git reflog order)
+        rewrite: If True, rewrite old_sha of subsequent entries to maintain consistency
+    """
+    import os
+
+    from .reflog import drop_reflog_entry
+
+    if isinstance(ref, str):
+        ref = ref.encode("utf-8")
+
+    with open_repo_closing(repo) as r:
+        reflog_path = r._reflog_path(ref)
+        if not os.path.exists(reflog_path):
+            raise ValueError(f"No reflog for ref {ref.decode()}")
+
+        with open(reflog_path, "r+b") as f:
+            drop_reflog_entry(f, index, rewrite=rewrite)
+
+
 def lfs_track(
     repo: Union[str, os.PathLike[str], Repo] = ".",
     patterns: Optional[Sequence[str]] = None,

+ 118 - 1
dulwich/reflog.py

@@ -22,7 +22,7 @@
 """Utilities for reading and generating reflogs."""
 
 import collections
-from collections.abc import Generator
+from collections.abc import Callable, Generator
 from typing import IO, BinaryIO, Optional, Union
 
 from .file import _GitFile
@@ -34,6 +34,45 @@ Entry = collections.namedtuple(
 )
 
 
+def parse_reflog_spec(refspec: Union[str, bytes]) -> tuple[bytes, int]:
+    """Parse a reflog specification like 'HEAD@{1}' or 'refs/heads/master@{2}'.
+
+    Args:
+        refspec: Reflog specification (e.g., 'HEAD@{1}', 'master@{0}')
+
+    Returns:
+        Tuple of (ref_name, index) where index is in Git reflog order (0 = newest)
+
+    Raises:
+        ValueError: If the refspec is not a valid reflog specification
+    """
+    if isinstance(refspec, str):
+        refspec = refspec.encode("utf-8")
+
+    if b"@{" not in refspec:
+        raise ValueError(
+            f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
+        )
+
+    ref, rest = refspec.split(b"@{", 1)
+    if not rest.endswith(b"}"):
+        raise ValueError(
+            f"Invalid reflog spec: {refspec!r}. Expected format: ref@{{n}}"
+        )
+
+    index_str = rest[:-1]
+    if not index_str.isdigit():
+        raise ValueError(
+            f"Invalid reflog index: {index_str!r}. Expected integer in ref@{{n}}"
+        )
+
+    # Use HEAD if no ref specified (e.g., "@{1}")
+    if not ref:
+        ref = b"HEAD"
+
+    return ref, int(index_str)
+
+
 def format_reflog_line(
     old_sha: Optional[bytes],
     new_sha: bytes,
@@ -162,6 +201,84 @@ def drop_reflog_entry(f: BinaryIO, index: int, rewrite: bool = False) -> None:
     f.truncate()
 
 
+def expire_reflog(
+    f: BinaryIO,
+    expire_time: Optional[int] = None,
+    expire_unreachable_time: Optional[int] = None,
+    reachable_checker: Optional[Callable[[bytes], bool]] = None,
+) -> int:
+    """Expire reflog entries based on age and reachability.
+
+    Args:
+        f: File-like object for the reflog
+        expire_time: Expire entries older than this timestamp (seconds since epoch).
+            If None, entries are not expired based on age alone.
+        expire_unreachable_time: Expire unreachable entries older than this
+            timestamp. If None, unreachable entries are not expired.
+        reachable_checker: Optional callable that takes a SHA and returns True
+            if the commit is reachable. If None, all entries are considered
+            reachable.
+
+    Returns:
+        Number of entries expired
+    """
+    if expire_time is None and expire_unreachable_time is None:
+        return 0
+
+    entries = []
+    offset = f.tell()
+    for line in f:
+        entries.append((offset, parse_reflog_line(line)))
+        offset = f.tell()
+
+    # Filter entries that should be kept
+    kept_entries = []
+    expired_count = 0
+
+    for offset, entry in entries:
+        should_expire = False
+
+        # Check if entry is reachable
+        is_reachable = True
+        if reachable_checker is not None:
+            is_reachable = reachable_checker(entry.new_sha)
+
+        # Apply expiration rules
+        # Check the appropriate expiration time based on reachability
+        if is_reachable:
+            if expire_time is not None and entry.timestamp < expire_time:
+                should_expire = True
+        else:
+            if (
+                expire_unreachable_time is not None
+                and entry.timestamp < expire_unreachable_time
+            ):
+                should_expire = True
+
+        if should_expire:
+            expired_count += 1
+        else:
+            kept_entries.append((offset, entry))
+
+    # Write back the kept entries
+    if expired_count > 0:
+        f.seek(0)
+        for _, entry in kept_entries:
+            f.write(
+                format_reflog_line(
+                    entry.old_sha,
+                    entry.new_sha,
+                    entry.committer,
+                    entry.timestamp,
+                    entry.timezone,
+                    entry.message,
+                )
+            )
+        f.truncate()
+
+    return expired_count
+
+
 def iter_reflogs(logs_dir: str) -> Generator[bytes, None, None]:
     """Iterate over all reflogs in a repository.
 

+ 49 - 4
tests/test_cli.py

@@ -143,6 +143,51 @@ class HelperFunctionsTest(TestCase):
         result = launch_editor(b"Test template content")
         self.assertEqual(b"Test template content", result)
 
+    def test_parse_relative_time(self):
+        """Test parsing relative time strings."""
+        from dulwich.cli import parse_relative_time
+
+        self.assertEqual(0, parse_relative_time("now"))
+        self.assertEqual(60, parse_relative_time("1 minute ago"))
+        self.assertEqual(120, parse_relative_time("2 minutes ago"))
+        self.assertEqual(3600, parse_relative_time("1 hour ago"))
+        self.assertEqual(7200, parse_relative_time("2 hours ago"))
+        self.assertEqual(86400, parse_relative_time("1 day ago"))
+        self.assertEqual(172800, parse_relative_time("2 days ago"))
+        self.assertEqual(604800, parse_relative_time("1 week ago"))
+        self.assertEqual(1209600, parse_relative_time("2 weeks ago"))
+        self.assertEqual(2592000, parse_relative_time("1 month ago"))
+        self.assertEqual(31536000, parse_relative_time("1 year ago"))
+
+        # Test invalid formats
+        with self.assertRaises(ValueError):
+            parse_relative_time("invalid")
+        with self.assertRaises(ValueError):
+            parse_relative_time("2 days")  # Missing "ago"
+        with self.assertRaises(ValueError):
+            parse_relative_time("two days ago")  # Not a number
+
+    def test_parse_time_to_timestamp(self):
+        """Test parsing time specifications to Unix timestamps."""
+        import time
+
+        from dulwich.cli import parse_time_to_timestamp
+
+        # Test special values
+        self.assertEqual(0, parse_time_to_timestamp("never"))
+        future_time = parse_time_to_timestamp("all")
+        self.assertGreater(future_time, int(time.time()))
+
+        # Test Unix timestamp
+        self.assertEqual(1234567890, parse_time_to_timestamp("1234567890"))
+
+        # Test relative time
+        now = int(time.time())
+        result = parse_time_to_timestamp("1 day ago")
+        expected = now - 86400
+        # Allow 2 second tolerance for test execution time
+        self.assertAlmostEqual(expected, result, delta=2)
+
 
 class AddCommandTest(DulwichCliTestCase):
     """Tests for add command."""
@@ -3076,12 +3121,12 @@ class ParseRelativeTimeTestCase(TestCase):
     def test_invalid_unit(self):
         """Test invalid time units."""
         with self.assertRaises(ValueError) as cm:
-            parse_relative_time("5 months ago")
-        self.assertIn("Unknown time unit: months", str(cm.exception))
+            parse_relative_time("5 fortnights ago")
+        self.assertIn("Unknown time unit: fortnights", str(cm.exception))
 
         with self.assertRaises(ValueError) as cm:
-            parse_relative_time("2 years ago")
-        self.assertIn("Unknown time unit: years", str(cm.exception))
+            parse_relative_time("2 decades ago")
+        self.assertIn("Unknown time unit: decades", str(cm.exception))
 
     def test_singular_plural(self):
         """Test that both singular and plural forms work."""

+ 316 - 0
tests/test_porcelain.py

@@ -9455,6 +9455,322 @@ class ReflogTest(PorcelainTestCase):
         self.assertIn(b"HEAD", refs_seen)
         self.assertIn(b"refs/heads/master", refs_seen)
 
+    def test_reflog_expire_by_time(self):
+        """Test expiring reflog entries by timestamp."""
+        # Create commits
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        tree.add(b"test", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        commit1 = Commit()
+        commit1.tree = tree.id
+        commit1.author = b"Test Author <test@example.com>"
+        commit1.committer = b"Test Author <test@example.com>"
+        commit1.commit_time = 1000000000
+        commit1.commit_timezone = 0
+        commit1.author_time = 1000000000
+        commit1.author_timezone = 0
+        commit1.message = b"Old commit"
+        self.repo.object_store.add_object(commit1)
+
+        commit2 = Commit()
+        commit2.tree = tree.id
+        commit2.author = b"Test Author <test@example.com>"
+        commit2.committer = b"Test Author <test@example.com>"
+        commit2.commit_time = 2000000000
+        commit2.commit_timezone = 0
+        commit2.author_time = 2000000000
+        commit2.author_timezone = 0
+        commit2.message = b"Recent commit"
+        self.repo.object_store.add_object(commit2)
+
+        # Write reflog entries with different timestamps
+        self.repo._write_reflog(
+            b"HEAD",
+            ZERO_SHA,
+            commit1.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: Old commit",
+        )
+        self.repo._write_reflog(
+            b"HEAD",
+            commit1.id,
+            commit2.id,
+            b"Test Author <test@example.com>",
+            2000000000,
+            0,
+            b"commit: Recent commit",
+        )
+
+        # Expire entries older than timestamp 1500000000
+        result = porcelain.reflog_expire(
+            self.repo_path, ref=b"HEAD", expire_time=1500000000
+        )
+        self.assertEqual(1, result[b"HEAD"])
+
+        # Check that only the recent entry remains
+        entries = list(porcelain.reflog(self.repo_path, b"HEAD"))
+        self.assertEqual(1, len(entries))
+        self.assertEqual(commit2.id, entries[0].new_sha)
+
+    def test_reflog_expire_all(self):
+        """Test expiring reflog entries for all refs."""
+        # Create a commit
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        tree.add(b"test", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        commit = Commit()
+        commit.tree = tree.id
+        commit.author = b"Test Author <test@example.com>"
+        commit.committer = b"Test Author <test@example.com>"
+        commit.commit_time = 1000000000
+        commit.commit_timezone = 0
+        commit.author_time = 1000000000
+        commit.author_timezone = 0
+        commit.message = b"Test commit"
+        self.repo.object_store.add_object(commit)
+
+        # Write old reflog entries for multiple refs
+        self.repo._write_reflog(
+            b"HEAD",
+            ZERO_SHA,
+            commit.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: Test commit",
+        )
+        self.repo._write_reflog(
+            b"refs/heads/master",
+            ZERO_SHA,
+            commit.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: Test commit",
+        )
+
+        # Expire all old entries
+        result = porcelain.reflog_expire(
+            self.repo_path, all=True, expire_time=2000000000
+        )
+
+        # Should have expired entries from both refs
+        self.assertIn(b"HEAD", result)
+        self.assertIn(b"refs/heads/master", result)
+        self.assertEqual(1, result[b"HEAD"])
+        self.assertEqual(1, result[b"refs/heads/master"])
+
+    def test_reflog_expire_dry_run(self):
+        """Test dry-run mode for reflog expire."""
+        # Create a commit
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        tree.add(b"test", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        commit = Commit()
+        commit.tree = tree.id
+        commit.author = b"Test Author <test@example.com>"
+        commit.committer = b"Test Author <test@example.com>"
+        commit.commit_time = 1000000000
+        commit.commit_timezone = 0
+        commit.author_time = 1000000000
+        commit.author_timezone = 0
+        commit.message = b"Test commit"
+        self.repo.object_store.add_object(commit)
+
+        # Write old reflog entry
+        self.repo._write_reflog(
+            b"HEAD",
+            ZERO_SHA,
+            commit.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: Test commit",
+        )
+
+        # Dry run expire
+        result = porcelain.reflog_expire(
+            self.repo_path, ref=b"HEAD", expire_time=2000000000, dry_run=True
+        )
+        self.assertEqual(1, result[b"HEAD"])
+
+        # Entry should still exist
+        entries = list(porcelain.reflog(self.repo_path, b"HEAD"))
+        self.assertEqual(1, len(entries))
+
+    def test_reflog_delete(self):
+        """Test deleting specific reflog entry."""
+        # Create commits
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        tree.add(b"test", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        commit1 = Commit()
+        commit1.tree = tree.id
+        commit1.author = b"Test Author <test@example.com>"
+        commit1.committer = b"Test Author <test@example.com>"
+        commit1.commit_time = 1000000000
+        commit1.commit_timezone = 0
+        commit1.author_time = 1000000000
+        commit1.author_timezone = 0
+        commit1.message = b"First commit"
+        self.repo.object_store.add_object(commit1)
+
+        commit2 = Commit()
+        commit2.tree = tree.id
+        commit2.author = b"Test Author <test@example.com>"
+        commit2.committer = b"Test Author <test@example.com>"
+        commit2.commit_time = 2000000000
+        commit2.commit_timezone = 0
+        commit2.author_time = 2000000000
+        commit2.author_timezone = 0
+        commit2.message = b"Second commit"
+        self.repo.object_store.add_object(commit2)
+
+        # Write two reflog entries
+        self.repo._write_reflog(
+            b"HEAD",
+            ZERO_SHA,
+            commit1.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: First commit",
+        )
+        self.repo._write_reflog(
+            b"HEAD",
+            commit1.id,
+            commit2.id,
+            b"Test Author <test@example.com>",
+            2000000000,
+            0,
+            b"commit: Second commit",
+        )
+
+        # Delete the most recent entry (index 0)
+        porcelain.reflog_delete(self.repo_path, ref=b"HEAD", index=0)
+
+        # Should only have one entry left
+        entries = list(porcelain.reflog(self.repo_path, b"HEAD"))
+        self.assertEqual(1, len(entries))
+        self.assertEqual(commit1.id, entries[0].new_sha)
+
+    def test_reflog_expire_unreachable(self):
+        """Test expiring unreachable reflog entries."""
+        # Create commits
+        blob = Blob.from_string(b"test content")
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        tree.add(b"test", 0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        # Create commit 1 - will be reachable (pointed to by HEAD)
+        commit1 = Commit()
+        commit1.tree = tree.id
+        commit1.author = b"Test Author <test@example.com>"
+        commit1.committer = b"Test Author <test@example.com>"
+        commit1.commit_time = 1000000000
+        commit1.commit_timezone = 0
+        commit1.author_time = 1000000000
+        commit1.author_timezone = 0
+        commit1.message = b"Reachable commit"
+        self.repo.object_store.add_object(commit1)
+
+        # Create commit 2 - will be unreachable
+        commit2 = Commit()
+        commit2.tree = tree.id
+        commit2.author = b"Test Author <test@example.com>"
+        commit2.committer = b"Test Author <test@example.com>"
+        commit2.commit_time = 1500000000
+        commit2.commit_timezone = 0
+        commit2.author_time = 1500000000
+        commit2.author_timezone = 0
+        commit2.message = b"Unreachable commit"
+        self.repo.object_store.add_object(commit2)
+
+        # Create commit 3 - will also be reachable (pointed to by master)
+        commit3 = Commit()
+        commit3.tree = tree.id
+        commit3.author = b"Test Author <test@example.com>"
+        commit3.committer = b"Test Author <test@example.com>"
+        commit3.commit_time = 2000000000
+        commit3.commit_timezone = 0
+        commit3.author_time = 2000000000
+        commit3.author_timezone = 0
+        commit3.message = b"Another reachable commit"
+        self.repo.object_store.add_object(commit3)
+
+        # Set up refs to make commit1 and commit3 reachable
+        # HEAD is a symbolic ref to refs/heads/master by default, so we set master first
+        self.repo.refs[b"refs/heads/master"] = commit1.id
+        # Create another branch pointing to commit3
+        self.repo.refs[b"refs/heads/feature"] = commit3.id
+
+        # Write reflog entries for all three commits
+        self.repo._write_reflog(
+            b"HEAD",
+            ZERO_SHA,
+            commit1.id,
+            b"Test Author <test@example.com>",
+            1000000000,
+            0,
+            b"commit: Reachable commit",
+        )
+        self.repo._write_reflog(
+            b"HEAD",
+            commit1.id,
+            commit2.id,
+            b"Test Author <test@example.com>",
+            1500000000,
+            0,
+            b"commit: Unreachable commit",
+        )
+        self.repo._write_reflog(
+            b"HEAD",
+            commit2.id,
+            commit3.id,
+            b"Test Author <test@example.com>",
+            2000000000,
+            0,
+            b"commit: Another reachable commit",
+        )
+
+        # Expire unreachable entries older than a time that includes commit2
+        # but not the reachable commits
+        result = porcelain.reflog_expire(
+            self.repo_path,
+            ref=b"HEAD",
+            expire_unreachable_time=1600000000,  # After commit2, before commit3
+        )
+
+        # Should have expired only commit2
+        self.assertEqual(1, result[b"HEAD"])
+
+        # Verify the remaining entries
+        entries = list(porcelain.reflog(self.repo_path, b"HEAD"))
+        self.assertEqual(2, len(entries))
+        # commit1 and commit3 should remain (both reachable)
+        self.assertEqual(commit1.id, entries[0].new_sha)
+        self.assertEqual(commit3.id, entries[1].new_sha)
+
 
 class WriteCommitGraphTests(PorcelainTestCase):
     """Tests for the write_commit_graph porcelain function."""

+ 138 - 0
tests/test_reflog.py

@@ -27,9 +27,11 @@ from io import BytesIO
 from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
 from dulwich.reflog import (
     drop_reflog_entry,
+    expire_reflog,
     format_reflog_line,
     iter_reflogs,
     parse_reflog_line,
+    parse_reflog_spec,
     read_reflog,
 )
 from dulwich.repo import Repo
@@ -37,6 +39,50 @@ from dulwich.repo import Repo
 from . import TestCase
 
 
+class ReflogSpecTests(TestCase):
+    def test_parse_reflog_spec_basic(self) -> None:
+        # Test basic reflog spec
+        ref, index = parse_reflog_spec("HEAD@{1}")
+        self.assertEqual(b"HEAD", ref)
+        self.assertEqual(1, index)
+
+    def test_parse_reflog_spec_with_full_ref(self) -> None:
+        # Test with full ref name
+        ref, index = parse_reflog_spec("refs/heads/master@{5}")
+        self.assertEqual(b"refs/heads/master", ref)
+        self.assertEqual(5, index)
+
+    def test_parse_reflog_spec_bytes(self) -> None:
+        # Test with bytes input
+        ref, index = parse_reflog_spec(b"develop@{0}")
+        self.assertEqual(b"develop", ref)
+        self.assertEqual(0, index)
+
+    def test_parse_reflog_spec_no_ref(self) -> None:
+        # Test with no ref (defaults to HEAD)
+        ref, index = parse_reflog_spec("@{2}")
+        self.assertEqual(b"HEAD", ref)
+        self.assertEqual(2, index)
+
+    def test_parse_reflog_spec_invalid_no_brace(self) -> None:
+        # Test invalid spec without @{
+        with self.assertRaises(ValueError) as cm:
+            parse_reflog_spec("HEAD")
+        self.assertIn("Expected format: ref@{n}", str(cm.exception))
+
+    def test_parse_reflog_spec_invalid_no_closing_brace(self) -> None:
+        # Test invalid spec without closing brace
+        with self.assertRaises(ValueError) as cm:
+            parse_reflog_spec("HEAD@{1")
+        self.assertIn("Expected format: ref@{n}", str(cm.exception))
+
+    def test_parse_reflog_spec_invalid_non_numeric(self) -> None:
+        # Test invalid spec with non-numeric index
+        with self.assertRaises(ValueError) as cm:
+            parse_reflog_spec("HEAD@{foo}")
+        self.assertIn("Expected integer", str(cm.exception))
+
+
 class ReflogLineTests(TestCase):
     def test_format(self) -> None:
         self.assertEqual(
@@ -264,3 +310,95 @@ class RepoReflogTests(TestCase):
         self.assertIn(b"HEAD", reflogs)
         self.assertIn(b"refs/heads/master", reflogs)
         self.assertIn(b"refs/heads/develop", reflogs)
+
+
+class ReflogExpireTests(TestCase):
+    def setUp(self) -> None:
+        TestCase.setUp(self)
+        # Create a reflog with entries at different timestamps
+        self.f = BytesIO()
+        # Old entry (timestamp: 1000000000)
+        self.f.write(
+            b"0000000000000000000000000000000000000000 "
+            b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa "
+            b"Test <test@example.com> 1000000000 +0000\told entry\n"
+        )
+        # Medium entry (timestamp: 1500000000)
+        self.f.write(
+            b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa "
+            b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb "
+            b"Test <test@example.com> 1500000000 +0000\tmedium entry\n"
+        )
+        # Recent entry (timestamp: 2000000000)
+        self.f.write(
+            b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb "
+            b"cccccccccccccccccccccccccccccccccccccccc "
+            b"Test <test@example.com> 2000000000 +0000\trecent entry\n"
+        )
+        self.f.seek(0)
+
+    def _read_log(self):
+        self.f.seek(0)
+        return list(read_reflog(self.f))
+
+    def test_expire_no_criteria(self) -> None:
+        # If no expiration criteria, nothing should be expired
+        count = expire_reflog(self.f)
+        self.assertEqual(0, count)
+        log = self._read_log()
+        self.assertEqual(3, len(log))
+
+    def test_expire_by_time(self) -> None:
+        # Expire entries older than timestamp 1600000000
+        # Should remove the first two entries
+        count = expire_reflog(self.f, expire_time=1600000000)
+        self.assertEqual(2, count)
+        log = self._read_log()
+        self.assertEqual(1, len(log))
+        self.assertEqual(b"recent entry", log[0].message)
+
+    def test_expire_unreachable(self) -> None:
+        # Test expiring unreachable entries
+        # Mark the middle entry as unreachable
+        def reachable_checker(sha):
+            return sha != b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
+
+        count = expire_reflog(
+            self.f,
+            expire_unreachable_time=1600000000,
+            reachable_checker=reachable_checker,
+        )
+        self.assertEqual(1, count)
+        log = self._read_log()
+        self.assertEqual(2, len(log))
+        # First and third entries should remain
+        self.assertEqual(b"old entry", log[0].message)
+        self.assertEqual(b"recent entry", log[1].message)
+
+    def test_expire_mixed(self) -> None:
+        # Test with both expire_time and expire_unreachable_time
+        def reachable_checker(sha):
+            # Only the most recent entry is reachable
+            return sha == b"cccccccccccccccccccccccccccccccccccccccc"
+
+        count = expire_reflog(
+            self.f,
+            expire_time=1800000000,  # Would expire first two if reachable
+            expire_unreachable_time=1200000000,  # Would expire first if unreachable
+            reachable_checker=reachable_checker,
+        )
+        # First entry is unreachable and old enough -> expired
+        # Second entry is unreachable but not old enough -> kept
+        # Third entry is reachable and recent -> kept
+        self.assertEqual(1, count)
+        log = self._read_log()
+        self.assertEqual(2, len(log))
+        self.assertEqual(b"medium entry", log[0].message)
+        self.assertEqual(b"recent entry", log[1].message)
+
+    def test_expire_all_entries(self) -> None:
+        # Expire all entries
+        count = expire_reflog(self.f, expire_time=3000000000)
+        self.assertEqual(3, count)
+        log = self._read_log()
+        self.assertEqual(0, len(log))