Просмотр исходного кода

Add support for core.preloadIndex configuration setting

This setting enables parallel stat operations when checking for unstaged
changes, improving performance on slow filesystems like NFS.

When core.preloadIndex is enabled, get_unstaged_changes() uses a thread
pool to parallelize filesystem operations. The implementation:
- Uses ThreadPoolExecutor with up to 8 workers (or CPU count if smaller)
- Falls back to serial processing if threading is unavailable
- Maintains compatibility by defaulting to serial processing

The setting is respected in both porcelain.status() and porcelain.add()
operations.

Fixes #1851
Jelmer Vernooij 6 месяцев назад
Родитель
Commit
e25f203197
5 измененных файлов с 192 добавлено и 28 удалено
  1. 4 0
      NEWS
  2. 87 26
      dulwich/index.py
  3. 16 2
      dulwich/porcelain.py
  4. 45 0
      tests/test_index.py
  5. 40 0
      tests/test_porcelain.py

+ 4 - 0
NEWS

@@ -46,6 +46,10 @@
  * Add IPv6 support for git:// protocol URLs.
  * Add IPv6 support for git:// protocol URLs.
    (Jelmer Vernooij, #1796)
    (Jelmer Vernooij, #1796)
 
 
+ * Add support for ``core.preloadIndex`` configuration setting to enable
+   parallel stat operations when checking for unstaged changes. This improves
+   performance on slow filesystems like NFS. (Jelmer Vernooij, #1851)
+
 0.24.1	2025-08-01
 0.24.1	2025-08-01
 
 
  * Require ``typing_extensions`` on Python 3.10.
  * Require ``typing_extensions`` on Python 3.10.

+ 87 - 26
dulwich/index.py

@@ -2440,10 +2440,55 @@ def update_working_tree(
     index.write()
     index.write()
 
 
 
 
def _check_entry_for_changes(
    tree_path: bytes,
    entry: Union[IndexEntry, ConflictedIndexEntry],
    root_path: bytes,
    filter_blob_callback: Optional[Callable] = None,
) -> Optional[bytes]:
    """Check a single index entry for changes.

    Args:
      tree_path: Path in the tree
      entry: Index entry to check
      root_path: Root filesystem path
      filter_blob_callback: Optional callback to filter blobs
    Returns: tree_path if changed, None otherwise
    """
    if isinstance(entry, ConflictedIndexEntry):
        # A conflicted entry always counts as unstaged.
        return tree_path

    fs_path = _tree_to_fs_path(root_path, tree_path)
    try:
        st = os.lstat(fs_path)
        mode = st.st_mode
        if stat.S_ISDIR(mode):
            return tree_path if _has_directory_changed(tree_path, entry) else None
        if not (stat.S_ISREG(mode) or stat.S_ISLNK(mode)):
            # Anything else (fifo, socket, device) is ignored.
            return None

        blob = blob_from_path_and_stat(fs_path, st)
        if filter_blob_callback is not None:
            blob = filter_blob_callback(blob, tree_path)
    except FileNotFoundError:
        # The file was removed, so we assume that counts as
        # different from whatever file used to exist.
        return tree_path

    return tree_path if blob.id != entry.sha else None
+
+
 def get_unstaged_changes(
 def get_unstaged_changes(
     index: Index,
     index: Index,
     root_path: Union[str, bytes],
     root_path: Union[str, bytes],
     filter_blob_callback: Optional[Callable] = None,
     filter_blob_callback: Optional[Callable] = None,
+    preload_index: bool = False,
 ) -> Generator[bytes, None, None]:
 ) -> Generator[bytes, None, None]:
     """Walk through an index and check for differences against working tree.
     """Walk through an index and check for differences against working tree.
 
 
@@ -2451,40 +2496,56 @@ def get_unstaged_changes(
       index: index to check
       index: index to check
       root_path: path in which to find files
       root_path: path in which to find files
       filter_blob_callback: Optional callback to filter blobs
       filter_blob_callback: Optional callback to filter blobs
+      preload_index: If True, use parallel threads to check files (requires threading support)
     Returns: iterator over paths with unstaged changes
     Returns: iterator over paths with unstaged changes
     """
     """
     # For each entry in the index check the sha1 & ensure not staged
     # For each entry in the index check the sha1 & ensure not staged
     if not isinstance(root_path, bytes):
     if not isinstance(root_path, bytes):
         root_path = os.fsencode(root_path)
         root_path = os.fsencode(root_path)
 
 
-    for tree_path, entry in index.iteritems():
-        full_path = _tree_to_fs_path(root_path, tree_path)
-        if isinstance(entry, ConflictedIndexEntry):
-            # Conflicted files are always unstaged
-            yield tree_path
-            continue
-
+    if preload_index:
+        # Use parallel processing for better performance on slow filesystems
         try:
         try:
-            st = os.lstat(full_path)
-            if stat.S_ISDIR(st.st_mode):
-                if _has_directory_changed(tree_path, entry):
-                    yield tree_path
-                continue
-
-            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
-                continue
-
-            blob = blob_from_path_and_stat(full_path, st)
-
-            if filter_blob_callback is not None:
-                blob = filter_blob_callback(blob, tree_path)
-        except FileNotFoundError:
-            # The file was removed, so we assume that counts as
-            # different from whatever file used to exist.
-            yield tree_path
+            import multiprocessing
+            from concurrent.futures import ThreadPoolExecutor
+        except ImportError:
+            # If threading is not available, fall back to serial processing
+            preload_index = False
         else:
         else:
-            if blob.id != entry.sha:
-                yield tree_path
+            # Collect all entries first
+            entries = list(index.iteritems())
+
+            # Use number of CPUs but cap at 8 threads to avoid overhead
+            num_workers = min(multiprocessing.cpu_count(), 8)
+
+            # Process entries in parallel
+            with ThreadPoolExecutor(max_workers=num_workers) as executor:
+                # Submit all tasks
+                futures = [
+                    executor.submit(
+                        _check_entry_for_changes,
+                        tree_path,
+                        entry,
+                        root_path,
+                        filter_blob_callback,
+                    )
+                    for tree_path, entry in entries
+                ]
+
+                # Yield results as they complete
+                for future in futures:
+                    result = future.result()
+                    if result is not None:
+                        yield result
+
+    if not preload_index:
+        # Serial processing
+        for tree_path, entry in index.iteritems():
+            result = _check_entry_for_changes(
+                tree_path, entry, root_path, filter_blob_callback
+            )
+            if result is not None:
+                yield result
 
 
 
 
 def _tree_to_fs_path(
 def _tree_to_fs_path(

+ 16 - 2
dulwich/porcelain.py

@@ -875,7 +875,14 @@ def add(
         index = r.open_index()
         index = r.open_index()
         normalizer = r.get_blob_normalizer()
         normalizer = r.get_blob_normalizer()
         filter_callback = normalizer.checkin_normalize
         filter_callback = normalizer.checkin_normalize
-        all_unstaged_paths = list(get_unstaged_changes(index, r.path, filter_callback))
+
+        # Check if core.preloadIndex is enabled
+        config = r.get_config_stack()
+        preload_index = config.get_boolean(b"core", b"preloadIndex", False)
+
+        all_unstaged_paths = list(
+            get_unstaged_changes(index, r.path, filter_callback, preload_index)
+        )
 
 
         if not paths:
         if not paths:
             # When no paths specified, add all untracked and modified files from repo root
             # When no paths specified, add all untracked and modified files from repo root
@@ -2651,7 +2658,14 @@ def status(
         index = r.open_index()
         index = r.open_index()
         normalizer = r.get_blob_normalizer()
         normalizer = r.get_blob_normalizer()
         filter_callback = normalizer.checkin_normalize
         filter_callback = normalizer.checkin_normalize
-        unstaged_changes = list(get_unstaged_changes(index, r.path, filter_callback))
+
+        # Check if core.preloadIndex is enabled
+        config = r.get_config_stack()
+        preload_index = config.get_boolean(b"core", b"preloadIndex", False)
+
+        unstaged_changes = list(
+            get_unstaged_changes(index, r.path, filter_callback, preload_index)
+        )
 
 
         untracked_paths = get_untracked_paths(
         untracked_paths = get_untracked_paths(
             r.path,
             r.path,

+ 45 - 0
tests/test_index.py

@@ -854,6 +854,51 @@ class GetUnstagedChangesTests(TestCase):
 
 
             self.assertEqual(list(changes), [b"foo1"])
             self.assertEqual(list(changes), [b"foo1"])
 
 
+    def test_get_unstaged_changes_with_preload(self) -> None:
+        """Unit test for get_unstaged_changes with preload_index=True."""
+        repo_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, repo_dir)
+        with Repo.init(repo_dir) as repo:
+            # Create multiple files to test parallel processing
+            files = []
+            for i in range(10):
+                filename = f"foo{i}"
+                fullpath = os.path.join(repo_dir, filename)
+                with open(fullpath, "wb") as f:
+                    f.write(b"origstuff" + str(i).encode())
+                files.append(filename)
+
+            repo.stage(files)
+            repo.do_commit(
+                b"test status",
+                author=b"author <email>",
+                committer=b"committer <email>",
+            )
+
+            # Modify some files
+            modified_files = [b"foo1", b"foo3", b"foo5", b"foo7"]
+            for filename in modified_files:
+                fullpath = os.path.join(repo_dir, filename.decode())
+                with open(fullpath, "wb") as f:
+                    f.write(b"newstuff")
+                os.utime(fullpath, (0, 0))
+
+            # Test with preload_index=False (serial)
+            changes_serial = list(
+                get_unstaged_changes(repo.open_index(), repo_dir, preload_index=False)
+            )
+            changes_serial.sort()
+
+            # Test with preload_index=True (parallel)
+            changes_parallel = list(
+                get_unstaged_changes(repo.open_index(), repo_dir, preload_index=True)
+            )
+            changes_parallel.sort()
+
+            # Both should return the same results
+            self.assertEqual(changes_serial, changes_parallel)
+            self.assertEqual(changes_serial, sorted(modified_files))
+
     def test_get_unstaged_deleted_changes(self) -> None:
     def test_get_unstaged_deleted_changes(self) -> None:
         """Unit test for get_unstaged_changes."""
         """Unit test for get_unstaged_changes."""
         repo_dir = tempfile.mkdtemp()
         repo_dir = tempfile.mkdtemp()

+ 40 - 0
tests/test_porcelain.py

@@ -5534,6 +5534,46 @@ class StatusTests(PorcelainTestCase):
         self.assertEqual(results.staged["add"][0], filename_add.encode("ascii"))
         self.assertEqual(results.staged["add"][0], filename_add.encode("ascii"))
         self.assertEqual(results.unstaged, [b"foo"])
         self.assertEqual(results.unstaged, [b"foo"])
 
 
+    def test_status_with_core_preloadindex(self) -> None:
+        """Test status with core.preloadIndex enabled."""
+        # Set core.preloadIndex to true
+        config = self.repo.get_config()
+        config.set(b"core", b"preloadIndex", b"true")
+        config.write_to_path()
+
+        # Create multiple files
+        files = []
+        for i in range(10):
+            filename = f"file{i}"
+            fullpath = os.path.join(self.repo.path, filename)
+            with open(fullpath, "w") as f:
+                f.write(f"content{i}")
+            files.append(fullpath)
+
+        porcelain.add(repo=self.repo.path, paths=files)
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"test preload status",
+            author=b"author <email>",
+            committer=b"committer <email>",
+        )
+
+        # Modify some files
+        modified_files = ["file1", "file3", "file5", "file7"]
+        for filename in modified_files:
+            fullpath = os.path.join(self.repo.path, filename)
+            with open(fullpath, "w") as f:
+                f.write("modified content")
+            os.utime(fullpath, (0, 0))
+
+        # Status should work correctly with preloadIndex enabled
+        results = porcelain.status(self.repo)
+
+        # Check that we detected the correct unstaged changes
+        unstaged_sorted = sorted(results.unstaged)
+        expected_sorted = sorted([f.encode("ascii") for f in modified_files])
+        self.assertEqual(unstaged_sorted, expected_sorted)
+
     def test_status_all(self) -> None:
     def test_status_all(self) -> None:
         del_path = os.path.join(self.repo.path, "foo")
         del_path = os.path.join(self.repo.path, "foo")
         mod_path = os.path.join(self.repo.path, "bar")
         mod_path = os.path.join(self.repo.path, "bar")