Browse Source

Add support for git grep command (#1938)

Fixes #1776
Jelmer Vernooij 3 months ago
parent
commit
cea101fb8b
5 changed files with 373 additions and 0 deletions
  1. 4 0
      NEWS
  2. 74 0
      dulwich/cli.py
  3. 124 0
      dulwich/porcelain.py
  4. 50 0
      tests/test_cli.py
  5. 121 0
      tests/test_porcelain.py

+ 4 - 0
NEWS

@@ -1,5 +1,9 @@
 0.24.6	2025-10-17
 
+ * Add ``dulwich grep`` command.
+   Supports regular expressions, case-insensitive search, line numbers, pathspec
+   filtering, and respecting .gitignore patterns. (Jelmer Vernooij, #1776)
+
  * Add support for octopus merge strategy. (Jelmer Vernooij, #1816)
 
  * Add support for ``git show-branch`` command to display branches and their

+ 74 - 0
dulwich/cli.py

@@ -3906,6 +3906,79 @@ class cmd_gc(Command):
         return None
 
 
+class cmd_grep(Command):
+    """Search tracked files for lines matching a regular expression."""
+
+    def run(self, args: Sequence[str]) -> None:
+        """Parse the grep arguments and print the matching lines via the pager.
+
+        Args:
+            args: Command line arguments
+        """
+        parser = argparse.ArgumentParser()
+
+        # Positionals: the pattern is mandatory, the revision and the
+        # pathspecs are optional.
+        parser.add_argument("pattern", help="Regular expression pattern to search for")
+        parser.add_argument(
+            "revision",
+            nargs="?",
+            default=None,
+            help="Revision to search (defaults to HEAD)",
+        )
+        parser.add_argument(
+            "pathspecs",
+            nargs="*",
+            help="Path patterns to limit search",
+        )
+
+        # Options.
+        parser.add_argument(
+            "-i",
+            "--ignore-case",
+            action="store_true",
+            help="Perform case-insensitive matching",
+        )
+        parser.add_argument(
+            "-n",
+            "--line-number",
+            action="store_true",
+            help="Show line numbers for matches",
+        )
+        parser.add_argument(
+            "--max-depth",
+            type=int,
+            default=None,
+            help="Maximum directory depth to search",
+        )
+        parser.add_argument(
+            "--no-ignore",
+            action="store_true",
+            help="Do not respect .gitignore patterns",
+        )
+        opts = parser.parse_args(args)
+
+        rev = opts.revision
+        paths = list(opts.pathspecs)
+
+        # argparse always binds the first optional positional to "revision".
+        # When that value contains "*", "/" or "." it is reinterpreted as the
+        # first pathspec instead. As a consequence, names such as "v1.0" or
+        # "origin/main" are searched as paths rather than as revisions.
+        if rev and any(marker in rev for marker in ("*", "/", ".")):
+            paths.insert(0, rev)
+            rev = None
+
+        with Repo(".") as repo:
+            config = repo.get_config_stack()
+            with get_pager(config=config, cmd_name="grep") as outstream:
+                porcelain.grep(
+                    repo,
+                    opts.pattern,
+                    outstream=outstream,
+                    rev=rev,
+                    # An empty pathspec list means "no restriction"
+                    pathspecs=paths or None,
+                    ignore_case=opts.ignore_case,
+                    line_number=opts.line_number,
+                    max_depth=opts.max_depth,
+                    respect_ignores=not opts.no_ignore,
+                )
+
+
 class cmd_count_objects(Command):
     """Count unpacked number of objects and their disk consumption."""
 
@@ -5291,6 +5364,7 @@ commands = {
     "format-patch": cmd_format_patch,
     "fsck": cmd_fsck,
     "gc": cmd_gc,
+    "grep": cmd_grep,
     "help": cmd_help,
     "init": cmd_init,
     "lfs": cmd_lfs,

+ 124 - 0
dulwich/porcelain.py

@@ -39,6 +39,7 @@ Currently implemented:
  * fetch
  * filter_branch
  * for_each_ref
+ * grep
  * init
  * ls_files
  * ls_remote
@@ -85,6 +86,7 @@ import fnmatch
 import logging
 import os
 import posixpath
+import re
 import stat
 import sys
 import time
@@ -3197,6 +3199,128 @@ def get_untracked_paths(
     yield from ignored_dirs
 
 
+def _grep_path_selected(
+    path: bytes, path_str: str, pathspecs: Sequence[tuple[bytes, str]]
+) -> bool:
+    """Check whether a tree path is selected by any of the given pathspecs.
+
+    Args:
+      path: Path of the tree entry, as bytes
+      path_str: ``path`` decoded to text, used for glob matching
+      pathspecs: Pairs of (raw pathspec, decoded pathspec)
+
+    Returns:
+      True if the path equals a pathspec, lies below a pathspec that names a
+      leading directory, or matches a pathspec as an fnmatch-style glob
+    """
+    for spec, spec_str in pathspecs:
+        # An empty pathspec has always selected every path; keep doing so.
+        if not spec or path == spec:
+            return True
+        # Leading directories only match on a "/" boundary, so "sub" selects
+        # "sub/x" but neither "subdir/x" nor "sub.txt".
+        directory = spec if spec.endswith(b"/") else spec + b"/"
+        if path.startswith(directory) or fnmatch.fnmatch(path_str, spec_str):
+            return True
+    return False
+
+
+def grep(
+    repo: RepoPath,
+    pattern: Union[str, bytes],
+    *,
+    outstream: TextIO = sys.stdout,
+    rev: Optional[Union[str, bytes]] = None,
+    pathspecs: Optional[Sequence[Union[str, bytes]]] = None,
+    ignore_case: bool = False,
+    line_number: bool = False,
+    max_depth: Optional[int] = None,
+    respect_ignores: bool = True,
+) -> None:
+    """Search for a pattern in the files of a commit's tree.
+
+    Every matching line is written to ``outstream`` as ``path:line`` or,
+    when ``line_number`` is set, as ``path:lineno:line``. Paths and lines are
+    decoded as UTF-8, with undecodable bytes replaced.
+
+    Args:
+      repo: Path to repository or Repo object
+      pattern: Regular expression pattern to search for; a str pattern is
+        encoded as UTF-8 and matched against the raw bytes of each line
+      outstream: Stream to write results to
+      rev: Revision to search in (defaults to HEAD)
+      pathspecs: Optional list of path patterns to limit search. A pathspec
+        selects a file with exactly that path, every file below that
+        directory, or every path matching it as an fnmatch-style glob
+      ignore_case: Whether to perform case-insensitive matching
+      line_number: Whether to output line numbers
+      max_depth: Maximum number of directory levels (``/`` separators in the
+        path) a file may be nested to be searched
+      respect_ignores: Whether to respect .gitignore patterns
+
+    Raises:
+      ValueError: If the pattern is not a valid regular expression, if there
+        is no HEAD commit, or if ``rev`` does not name a commit
+    """
+    from .object_store import iter_tree_contents
+
+    # Compile the pattern as a bytes regex, since blob contents are bytes
+    flags = re.IGNORECASE if ignore_case else 0
+    try:
+        if isinstance(pattern, bytes):
+            compiled_pattern = re.compile(pattern, flags)
+        else:
+            compiled_pattern = re.compile(pattern.encode("utf-8"), flags)
+    except re.error as e:
+        raise ValueError(f"Invalid regular expression: {e}") from e
+
+    # Normalise the pathspecs once, keeping both the raw bytes (for prefix
+    # matching) and the decoded text (for glob matching)
+    selected: Optional[list[tuple[bytes, str]]] = None
+    if pathspecs:
+        selected = []
+        for p in pathspecs:
+            raw = p if isinstance(p, bytes) else p.encode("utf-8")
+            selected.append((raw, raw.decode("utf-8", errors="replace")))
+
+    with open_repo_closing(repo) as r:
+        # Get the tree to search
+        if rev is None:
+            try:
+                commit = r[b"HEAD"]
+                assert isinstance(commit, Commit)
+            except KeyError as e:
+                raise ValueError("No HEAD commit found") from e
+        else:
+            rev_bytes = rev if isinstance(rev, bytes) else rev.encode("utf-8")
+            commit_obj = parse_commit(r, rev_bytes)
+            if commit_obj is None:
+                raise ValueError(f"Invalid revision: {rev}")
+            commit = commit_obj
+
+        tree = r[commit.tree]
+        assert isinstance(tree, Tree)
+
+        # Set up ignore filter if requested
+        ignore_manager = None
+        if respect_ignores:
+            ignore_manager = IgnoreFilterManager.from_repo(r)
+
+        # Iterate through all files in the tree
+        for entry in iter_tree_contents(r.object_store, tree.id):
+            path, mode, sha = entry.path, entry.mode, entry.sha
+            assert path is not None
+            assert mode is not None
+            assert sha is not None
+
+            # Skip directories
+            if stat.S_ISDIR(mode):
+                continue
+
+            # Check max depth
+            if max_depth is not None and path.count(b"/") > max_depth:
+                continue
+
+            # Decode the path once; it is reused for glob matching, the
+            # ignore check and the output
+            path_str = path.decode("utf-8", errors="replace")
+
+            # Check pathspecs
+            if selected and not _grep_path_selected(path, path_str, selected):
+                continue
+
+            # Check ignore patterns
+            if ignore_manager and ignore_manager.is_ignored(path_str) is True:
+                continue
+
+            # Get the blob content
+            blob = r[sha]
+            assert isinstance(blob, Blob)
+
+            lines = blob.data.split(b"\n")
+            # A trailing newline terminates the last line, it does not start
+            # another one. Dropping the empty tail keeps patterns that match
+            # the empty string (e.g. "^$") from reporting a phantom line, and
+            # makes an empty blob contain no lines at all.
+            if not lines[-1]:
+                lines.pop()
+
+            # Search for pattern in the blob
+            for line_num, line in enumerate(lines, 1):
+                if not compiled_pattern.search(line):
+                    continue
+                line_str = line.decode("utf-8", errors="replace")
+                if line_number:
+                    outstream.write(f"{path_str}:{line_num}:{line_str}\n")
+                else:
+                    outstream.write(f"{path_str}:{line_str}\n")
+
+
 def get_tree_changes(repo: RepoPath) -> dict[str, list[Union[str, bytes]]]:
     """Return add/delete/modify changes to tree by comparing index to HEAD.
 

+ 50 - 0
tests/test_cli.py

@@ -2434,6 +2434,56 @@ class FsckCommandTest(DulwichCliTestCase):
         # Should complete without errors
 
 
+class GrepCommandTest(DulwichCliTestCase):
+    """Exercise the grep command through the CLI."""
+
+    def _commit_files(self, files, message):
+        """Write each name -> text pair into the repo, then add and commit them."""
+        for name, text in files.items():
+            with open(os.path.join(self.repo_path, name), "w") as f:
+                f.write(text)
+        self._run_cli("add", *files)
+        self._run_cli("commit", f"--message={message}")
+
+    def _grep(self, *argv):
+        """Run the grep command and return its stdout with CRLF folded to LF."""
+        _result, stdout, _stderr = self._run_cli("grep", *argv)
+        return stdout.replace("\r\n", "\n")
+
+    def test_grep_basic(self):
+        """Only the file containing the pattern is reported."""
+        self._commit_files(
+            {"file1.txt": "hello world\n", "file2.txt": "foo bar\n"}, "Add files"
+        )
+
+        self.assertEqual("file1.txt:hello world\n", self._grep("world"))
+
+    def test_grep_line_numbers(self):
+        """-n prefixes every match with its line number."""
+        self._commit_files({"test.txt": "line1\nline2\nline3\n"}, "Add test")
+
+        expected = "test.txt:1:line1\ntest.txt:2:line2\ntest.txt:3:line3\n"
+        self.assertEqual(expected, self._grep("-n", "line"))
+
+    def test_grep_case_insensitive(self):
+        """-i matches regardless of letter case."""
+        self._commit_files({"case.txt": "Hello World\n"}, "Add case")
+
+        self.assertEqual("case.txt:Hello World\n", self._grep("-i", "hello"))
+
+    def test_grep_no_matches(self):
+        """Nothing is printed when no line matches."""
+        self._commit_files({"empty.txt": "nothing here\n"}, "Add empty")
+
+        # Compare the raw stdout here: no newline normalisation is applied
+        _result, stdout, _stderr = self._run_cli("grep", "nonexistent")
+        self.assertEqual("", stdout)
+
+
 class RepackCommandTest(DulwichCliTestCase):
     """Tests for repack command."""
 

+ 121 - 0
tests/test_porcelain.py

@@ -10157,3 +10157,124 @@ class CherryTests(PorcelainTestCase):
         status, commit_sha, _message = results[0]
         self.assertEqual("-", status)
         self.assertEqual(head_commit, commit_sha)
+
+
+class GrepTests(PorcelainTestCase):
+    """Tests for porcelain.grep."""
+
+    def _commit_files(self, files: dict[str, str], message: bytes) -> None:
+        """Write each "/"-separated path -> text pair, then add and commit them."""
+        for relpath, text in files.items():
+            with open(os.path.join(self.repo_path, *relpath.split("/")), "w") as f:
+                f.write(text)
+        porcelain.add(self.repo, paths=list(files))
+        porcelain.commit(self.repo, message=message)
+
+    def test_basic_grep(self) -> None:
+        """Test basic pattern matching in files."""
+        self._commit_files(
+            {
+                "foo.txt": "hello world\ngoodbye world\n",
+                "bar.txt": "foo bar\nbaz qux\n",
+            },
+            b"Add test files",
+        )
+
+        # Only foo.txt contains "world"
+        out = StringIO()
+        porcelain.grep(self.repo, "world", outstream=out)
+
+        self.assertEqual(
+            "foo.txt:hello world\nfoo.txt:goodbye world\n",
+            out.getvalue().replace("\r\n", "\n"),
+        )
+
+    def test_grep_with_line_numbers(self) -> None:
+        """Test grep with line numbers."""
+        self._commit_files(
+            {"test.txt": "line one\nline two\nline three\n"}, b"Add test file"
+        )
+
+        out = StringIO()
+        porcelain.grep(self.repo, "line", outstream=out, line_number=True)
+
+        self.assertEqual(
+            "test.txt:1:line one\ntest.txt:2:line two\ntest.txt:3:line three\n",
+            out.getvalue().replace("\r\n", "\n"),
+        )
+
+    def test_grep_case_insensitive(self) -> None:
+        """Test case-insensitive grep."""
+        self._commit_files(
+            {"case.txt": "Hello WORLD\nGoodbye world\n"}, b"Add case file"
+        )
+
+        out = StringIO()
+        porcelain.grep(self.repo, "HELLO", outstream=out, ignore_case=True)
+
+        self.assertEqual("case.txt:Hello WORLD\n", out.getvalue().replace("\r\n", "\n"))
+
+    def test_grep_with_pathspec(self) -> None:
+        """Test grep with pathspec filtering."""
+        os.makedirs(os.path.join(self.repo_path, "subdir"))
+        self._commit_files(
+            {
+                "file1.txt": "pattern match\n",
+                "subdir/file2.txt": "pattern match\n",
+            },
+            b"Add files",
+        )
+
+        # Both files match, but only the one below subdir/ is selected
+        out = StringIO()
+        porcelain.grep(self.repo, "pattern", outstream=out, pathspecs=["subdir/"])
+
+        self.assertEqual(
+            "subdir/file2.txt:pattern match\n",
+            out.getvalue().replace("\r\n", "\n"),
+        )
+
+    def test_grep_no_matches(self) -> None:
+        """Test grep with no matches."""
+        self._commit_files({"empty.txt": "nothing to see here\n"}, b"Add empty file")
+
+        out = StringIO()
+        porcelain.grep(self.repo, "nonexistent", outstream=out)
+
+        self.assertEqual("", out.getvalue())
+
+    def test_grep_regex_pattern(self) -> None:
+        """Test grep with regex patterns."""
+        self._commit_files(
+            {"regex.txt": "test123\ntest456\nnotest\n"}, b"Add regex file"
+        )
+
+        # "test" followed by digits: "notest" has no digits and is left out
+        out = StringIO()
+        porcelain.grep(self.repo, r"test\d+", outstream=out)
+
+        self.assertEqual(
+            "regex.txt:test123\nregex.txt:test456\n",
+            out.getvalue().replace("\r\n", "\n"),
+        )
+
+    def test_grep_invalid_pattern(self) -> None:
+        """Test grep with invalid regex pattern."""
+        self._commit_files({"test.txt": "test\n"}, b"Add test file")
+
+        out = StringIO()
+        with self.assertRaises(ValueError):
+            porcelain.grep(self.repo, "[invalid", outstream=out)
+
+    def test_grep_no_head(self) -> None:
+        """Test grep fails when there's no HEAD commit."""
+        # A freshly initialised repository has no commits, hence no HEAD
+        empty_repo = Repo.init(os.path.join(self.test_dir, "empty_repo"), mkdir=True)
+        self.addCleanup(empty_repo.close)
+
+        out = StringIO()
+        with self.assertRaises(ValueError):
+            porcelain.grep(empty_repo, "pattern", outstream=out)