Explorar o código

Optimize update_working_tree and prevent overwriting uncommitted changes (#1708)

- Replace iter_tree_contents with diff_tree for massive performance
improvement
- Only process files that actually changed between trees
- Add safety check to prevent overwriting uncommitted changes by default
Jelmer Vernooij hai 1 mes
pai
achega
1587519944
Modificáronse 6 ficheiros con 1114 adicións e 236 borrados
  1. 4 0
      dulwich/errors.py
  2. 382 158
      dulwich/index.py
  3. 66 19
      dulwich/porcelain.py
  4. 4 0
      dulwich/stash.py
  5. 445 46
      tests/test_index.py
  6. 213 13
      tests/test_porcelain.py

+ 4 - 0
dulwich/errors.py

@@ -180,3 +180,7 @@ class RefFormatError(Exception):
 
 class HookError(Exception):
     """An error occurred while executing a hook."""
+
+
+class WorkingTreeModifiedError(Exception):
+    """Indicates that the working tree has modifications that would be overwritten."""

+ 382 - 158
dulwich/index.py

@@ -42,6 +42,8 @@ from typing import (
 )
 
 if TYPE_CHECKING:
+    from .config import Config
+    from .diff_tree import TreeChange
     from .file import _GitFile
     from .line_ending import BlobNormalizer
     from .repo import Repo
@@ -1298,15 +1300,59 @@ def build_file_from_blob(
 INVALID_DOTNAMES = (b".git", b".", b"..", b"")
 
 
+def _normalize_path_element_default(element: bytes) -> bytes:
+    """Normalize path element for default case-insensitive comparison."""
+    return element.lower()
+
+
+def _normalize_path_element_ntfs(element: bytes) -> bytes:
+    """Normalize path element for NTFS filesystem."""
+    return element.rstrip(b". ").lower()
+
+
+def _normalize_path_element_hfs(element: bytes) -> bytes:
+    """Normalize path element for HFS+ filesystem."""
+    import unicodedata
+
+    # Decode to Unicode (let UnicodeDecodeError bubble up)
+    element_str = element.decode("utf-8", errors="strict")
+
+    # Remove HFS+ ignorable characters
+    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
+    # Normalize to NFD
+    normalized = unicodedata.normalize("NFD", filtered)
+    return normalized.lower().encode("utf-8", errors="strict")
+
+
+def get_path_element_normalizer(config) -> Callable[[bytes], bytes]:
+    """Get the appropriate path element normalization function based on config.
+
+    Args:
+        config: Repository configuration object
+
+    Returns:
+        Function that normalizes path elements for the configured filesystem
+    """
+    import os
+    import sys
+
+    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
+        return _normalize_path_element_ntfs
+    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
+        return _normalize_path_element_hfs
+    else:
+        return _normalize_path_element_default
+
+
 def validate_path_element_default(element: bytes) -> bool:
-    return element.lower() not in INVALID_DOTNAMES
+    return _normalize_path_element_default(element) not in INVALID_DOTNAMES
 
 
 def validate_path_element_ntfs(element: bytes) -> bool:
-    stripped = element.rstrip(b". ").lower()
-    if stripped in INVALID_DOTNAMES:
+    normalized = _normalize_path_element_ntfs(element)
+    if normalized in INVALID_DOTNAMES:
         return False
-    if stripped == b"git~1":
+    if normalized == b"git~1":
         return False
     return True
 
@@ -1338,28 +1384,18 @@ def validate_path_element_hfs(element: bytes) -> bool:
     Equivalent to Git's is_hfs_dotgit and related checks.
     Uses NFD normalization and ignores HFS+ ignorable characters.
     """
-    import unicodedata
-
     try:
-        # Decode to Unicode
-        element_str = element.decode("utf-8", errors="strict")
+        normalized = _normalize_path_element_hfs(element)
     except UnicodeDecodeError:
         # Malformed UTF-8 - be conservative and reject
         return False
 
-    # Remove HFS+ ignorable characters (like Git's next_hfs_char)
-    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
-
-    # Normalize to NFD (HFS+ uses a variant of NFD)
-    normalized = unicodedata.normalize("NFD", filtered)
-
-    # Check against invalid names (case-insensitive)
-    normalized_bytes = normalized.encode("utf-8", errors="strict")
-    if normalized_bytes.lower() in INVALID_DOTNAMES:
+    # Check against invalid names
+    if normalized in INVALID_DOTNAMES:
         return False
 
     # Also check for 8.3 short name
-    if normalized_bytes.lower() == b"git~1":
+    if normalized == b"git~1":
         return False
 
     return True
@@ -1613,7 +1649,7 @@ def _check_symlink_matches(
 ) -> bool:
     """Check if symlink target matches expected target.
 
-    Returns True if symlink needs to be written, False if it matches.
+    Returns True if symlink matches, False if it doesn't match.
     """
     try:
         current_target = os.readlink(full_path)
@@ -1621,14 +1657,14 @@ def _check_symlink_matches(
         expected_target = blob_obj.as_raw_string()
         if isinstance(current_target, str):
             current_target = current_target.encode()
-        return current_target != expected_target
+        return current_target == expected_target
     except FileNotFoundError:
         # Symlink doesn't exist
-        return True
+        return False
     except OSError as e:
         if e.errno == errno.EINVAL:
             # Not a symlink
-            return True
+            return False
         raise
 
 
@@ -1644,19 +1680,43 @@ def _check_file_matches(
 ) -> bool:
     """Check if a file on disk matches the expected git object.
 
-    Returns True if file needs to be written, False if it matches.
+    Returns True if file matches, False if it doesn't match.
     """
     # Check mode first (if honor_filemode is True)
     if honor_filemode:
         current_mode = stat.S_IMODE(current_stat.st_mode)
         expected_mode = stat.S_IMODE(entry_mode)
-        if current_mode != expected_mode:
-            return True
+
+        # For regular files, only check the user executable bit, not group/other permissions
+        # This matches Git's behavior where umask differences don't count as modifications
+        if stat.S_ISREG(current_stat.st_mode):
+            # Normalize regular file modes to ignore group/other write permissions
+            current_mode_normalized = (
+                current_mode & 0o755
+            )  # Keep only user rwx and all read+execute
+            expected_mode_normalized = expected_mode & 0o755
+
+            # For Git compatibility, regular files should be either 644 or 755
+            if expected_mode_normalized not in (0o644, 0o755):
+                expected_mode_normalized = 0o644  # Default for regular files
+            if current_mode_normalized not in (0o644, 0o755):
+                # Determine if it should be executable based on user execute bit
+                if current_mode & 0o100:  # User execute bit is set
+                    current_mode_normalized = 0o755
+                else:
+                    current_mode_normalized = 0o644
+
+            if current_mode_normalized != expected_mode_normalized:
+                return False
+        else:
+            # For non-regular files (symlinks, etc.), check mode exactly
+            if current_mode != expected_mode:
+                return False
 
     # If mode matches (or we don't care), check content via size first
     blob_obj = repo_object_store[entry_sha]
     if current_stat.st_size != blob_obj.raw_length():
-        return True
+        return False
 
     # Size matches, check actual content
     try:
@@ -1668,9 +1728,9 @@ def _check_file_matches(
                     blob_obj, tree_path
                 )
                 expected_content = normalized_blob.as_raw_string()
-            return current_content != expected_content
+            return current_content == expected_content
     except (FileNotFoundError, PermissionError, IsADirectoryError):
-        return True
+        return False
 
 
 def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
@@ -1710,7 +1770,7 @@ def _transition_to_file(
         and not stat.S_ISLNK(entry.mode)
     ):
         # File to file - check if update needed
-        needs_update = _check_file_matches(
+        file_matches = _check_file_matches(
             object_store,
             full_path,
             entry.sha,
@@ -1720,13 +1780,15 @@ def _transition_to_file(
             blob_normalizer,
             path,
         )
+        needs_update = not file_matches
     elif (
         current_stat is not None
         and stat.S_ISLNK(current_stat.st_mode)
         and stat.S_ISLNK(entry.mode)
     ):
         # Symlink to symlink - check if update needed
-        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
+        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
+        needs_update = not symlink_matches
     else:
         needs_update = True
 
@@ -1810,16 +1872,142 @@ def _transition_to_absent(repo, path, full_path, current_stat, index):
     )
 
 
+def detect_case_only_renames(
+    changes: list["TreeChange"],
+    config: "Config",
+) -> list["TreeChange"]:
+    """Detect and transform case-only renames in a list of tree changes.
+
+    This function identifies file renames that only differ in case (e.g.,
+    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
+    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
+    based on the repository configuration.
+
+    Args:
+      changes: List of TreeChange objects representing file changes
+      config: Repository configuration object
+
+    Returns:
+      New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
+    """
+    from .diff_tree import (
+        CHANGE_ADD,
+        CHANGE_COPY,
+        CHANGE_DELETE,
+        CHANGE_MODIFY,
+        CHANGE_RENAME,
+        TreeChange,
+    )
+
+    # Build dictionaries of old and new paths with their normalized forms
+    old_paths_normalized = {}
+    new_paths_normalized = {}
+    old_changes = {}  # Map from old path to change object
+    new_changes = {}  # Map from new path to change object
+
+    # Get the appropriate normalizer based on config
+    normalize_func = get_path_element_normalizer(config)
+
+    def normalize_path(path: bytes) -> bytes:
+        """Normalize entire path using element normalization."""
+        return b"/".join(normalize_func(part) for part in path.split(b"/"))
+
+    # Pre-normalize all paths once to avoid repeated normalization
+    for change in changes:
+        if change.type == CHANGE_DELETE and change.old:
+            try:
+                normalized = normalize_path(change.old.path)
+            except UnicodeDecodeError:
+                import logging
+
+                logging.warning(
+                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
+                    change.old.path,
+                )
+            else:
+                old_paths_normalized[normalized] = change.old.path
+                old_changes[change.old.path] = change
+        elif change.type == CHANGE_RENAME and change.old:
+            # Treat RENAME as DELETE + ADD for case-only detection
+            try:
+                normalized = normalize_path(change.old.path)
+            except UnicodeDecodeError:
+                import logging
+
+                logging.warning(
+                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
+                    change.old.path,
+                )
+            else:
+                old_paths_normalized[normalized] = change.old.path
+                old_changes[change.old.path] = change
+
+        if (
+            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
+            and change.new
+        ):
+            try:
+                normalized = normalize_path(change.new.path)
+            except UnicodeDecodeError:
+                import logging
+
+                logging.warning(
+                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
+                    change.new.path,
+                )
+            else:
+                new_paths_normalized[normalized] = change.new.path
+                new_changes[change.new.path] = change
+
+    # Find case-only renames and transform changes
+    case_only_renames = set()
+    new_rename_changes = []
+
+    for norm_path, old_path in old_paths_normalized.items():
+        if norm_path in new_paths_normalized:
+            new_path = new_paths_normalized[norm_path]
+            if old_path != new_path:
+                # Found a case-only rename
+                old_change = old_changes[old_path]
+                new_change = new_changes[new_path]
+
+                # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair
+                if new_change.type == CHANGE_ADD:
+                    # Simple case: DELETE + ADD becomes RENAME
+                    rename_change = TreeChange(
+                        CHANGE_RENAME, old_change.old, new_change.new
+                    )
+                else:
+                    # Complex case: DELETE + MODIFY becomes RENAME
+                    # Use the old file from DELETE and new file from MODIFY
+                    rename_change = TreeChange(
+                        CHANGE_RENAME, old_change.old, new_change.new
+                    )
+
+                new_rename_changes.append(rename_change)
+
+                # Mark the old changes for removal
+                case_only_renames.add(old_change)
+                case_only_renames.add(new_change)
+
+    # Return new list with original ADD/DELETE changes replaced by renames
+    result = [change for change in changes if change not in case_only_renames]
+    result.extend(new_rename_changes)
+    return result
+
+
 def update_working_tree(
     repo: "Repo",
     old_tree_id: Optional[bytes],
     new_tree_id: bytes,
+    change_iterator: Iterator["TreeChange"],
     honor_filemode: bool = True,
     validate_path_element: Optional[Callable[[bytes], bool]] = None,
     symlink_fn: Optional[Callable] = None,
     force_remove_untracked: bool = False,
     blob_normalizer: Optional["BlobNormalizer"] = None,
     tree_encoding: str = "utf-8",
+    allow_overwrite_modified: bool = False,
 ) -> None:
     """Update the working tree and index to match a new tree.
 
@@ -1833,6 +2021,7 @@ def update_working_tree(
       repo: Repository object
       old_tree_id: SHA of the tree before the update
       new_tree_id: SHA of the tree to update to
+      change_iterator: Iterator of TreeChange objects to apply
       honor_filemode: An optional flag to honor core.filemode setting
       validate_path_element: Function to validate path elements to check out
       symlink_fn: Function to use for creating symlinks
@@ -1841,168 +2030,203 @@ def update_working_tree(
       blob_normalizer: An optional BlobNormalizer to use for converting line
         endings when writing blobs to the working directory.
       tree_encoding: Encoding used for tree paths (default: utf-8)
+      allow_overwrite_modified: If False, raise an error when attempting to
+        overwrite files that have been modified compared to old_tree_id
     """
     if validate_path_element is None:
         validate_path_element = validate_path_element_default
 
+    from .diff_tree import (
+        CHANGE_ADD,
+        CHANGE_COPY,
+        CHANGE_DELETE,
+        CHANGE_MODIFY,
+        CHANGE_RENAME,
+        CHANGE_UNCHANGED,
+    )
+
     repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
     index = repo.open_index()
 
-    # Build sets of paths for efficient lookup
-    new_paths = {}
-    for entry in iter_tree_contents(repo.object_store, new_tree_id):
-        if entry.path.startswith(b".git") or not validate_path(
-            entry.path, validate_path_element
-        ):
-            continue
-        new_paths[entry.path] = entry
-
-    old_paths = {}
-    if old_tree_id:
-        for entry in iter_tree_contents(repo.object_store, old_tree_id):
-            if not entry.path.startswith(b".git"):
-                old_paths[entry.path] = entry
-
-    # Process all paths
-    all_paths = set(new_paths.keys()) | set(old_paths.keys())
-
-    # Check for paths that need to become directories
-    paths_needing_dir = set()
-    for path in new_paths:
-        parts = path.split(b"/")
-        for i in range(1, len(parts)):
-            parent = b"/".join(parts[:i])
-            if parent in old_paths and parent not in new_paths:
-                paths_needing_dir.add(parent)
+    # Convert iterator to list since we need multiple passes
+    changes = list(change_iterator)
+
+    # Transform case-only renames on case-insensitive filesystems
+    import platform
+
+    default_ignore_case = platform.system() in ("Windows", "Darwin")
+    config = repo.get_config()
+    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
+
+    if ignore_case:
+        config = repo.get_config()
+        changes = detect_case_only_renames(changes, config)
+
+    # Check for path conflicts where files need to become directories
+    paths_becoming_dirs = set()
+    for change in changes:
+        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
+            path = change.new.path
+            if b"/" in path:  # This is a file inside a directory
+                # Check if any parent path exists as a file in the old tree or changes
+                parts = path.split(b"/")
+                for i in range(1, len(parts)):
+                    parent = b"/".join(parts[:i])
+                    # See if this parent path is being deleted (was a file, becoming a dir)
+                    for other_change in changes:
+                        if (
+                            other_change.type == CHANGE_DELETE
+                            and other_change.old
+                            and other_change.old.path == parent
+                        ):
+                            paths_becoming_dirs.add(parent)
 
     # Check if any path that needs to become a directory has been modified
-    current_stat: Optional[os.stat_result]
-    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
-    for path in paths_needing_dir:
+    for path in paths_becoming_dirs:
         full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
         try:
             current_stat = os.lstat(full_path)
         except FileNotFoundError:
-            # File doesn't exist, proceed
-            stat_cache[full_path] = None
-        except PermissionError:
-            # Can't read file, proceed
-            pass
-        else:
-            stat_cache[full_path] = current_stat
-            if stat.S_ISREG(current_stat.st_mode):
+            continue  # File doesn't exist, nothing to check
+        except OSError as e:
+            raise OSError(
+                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
+            ) from e
+
+        if stat.S_ISREG(current_stat.st_mode):
+            # Find the old entry for this path
+            old_change = None
+            for change in changes:
+                if (
+                    change.type == CHANGE_DELETE
+                    and change.old
+                    and change.old.path == path
+                ):
+                    old_change = change
+                    break
+
+            if old_change:
                 # Check if file has been modified
-                old_entry = old_paths[path]
-                if _check_file_matches(
+                file_matches = _check_file_matches(
                     repo.object_store,
                     full_path,
-                    old_entry.sha,
-                    old_entry.mode,
+                    old_change.old.sha,
+                    old_change.old.mode,
                     current_stat,
                     honor_filemode,
                     blob_normalizer,
                     path,
-                ):
-                    # File has been modified, can't replace with directory
+                )
+                if not file_matches:
                     raise OSError(
                         f"Cannot replace modified file with directory: {path!r}"
                     )
 
-    # Process in two passes: deletions first, then additions/updates
-    # This handles case-only renames on case-insensitive filesystems correctly
-    paths_to_remove = []
-    paths_to_update = []
-
-    for path in sorted(all_paths):
-        if path in new_paths:
-            paths_to_update.append(path)
-        else:
-            paths_to_remove.append(path)
+    # Check for uncommitted modifications before making any changes
+    if not allow_overwrite_modified and old_tree_id:
+        for change in changes:
+            # Only check files that are being modified or deleted
+            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
+                path = change.old.path
+                if path.startswith(b".git") or not validate_path(
+                    path, validate_path_element
+                ):
+                    continue
 
-    # First process removals
-    for path in paths_to_remove:
-        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
+                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
+                try:
+                    current_stat = os.lstat(full_path)
+                except FileNotFoundError:
+                    continue  # File doesn't exist, nothing to check
+                except OSError as e:
+                    raise OSError(
+                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
+                    ) from e
+
+                if stat.S_ISREG(current_stat.st_mode):
+                    # Check if working tree file differs from old tree
+                    file_matches = _check_file_matches(
+                        repo.object_store,
+                        full_path,
+                        change.old.sha,
+                        change.old.mode,
+                        current_stat,
+                        honor_filemode,
+                        blob_normalizer,
+                        path,
+                    )
+                    if not file_matches:
+                        from .errors import WorkingTreeModifiedError
+
+                        raise WorkingTreeModifiedError(
+                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
+                            f"would be overwritten by checkout. "
+                            f"Please commit your changes or stash them before you switch branches."
+                        )
+
+    # Apply the changes
+    for change in changes:
+        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
+            # Remove file/directory
+            path = change.old.path
+            if path.startswith(b".git") or not validate_path(
+                path, validate_path_element
+            ):
+                continue
 
-        # Determine current state - use cache if available
-        try:
-            current_stat = stat_cache[full_path]
-        except KeyError:
+            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
             try:
-                current_stat = os.lstat(full_path)
+                delete_stat: Optional[os.stat_result] = os.lstat(full_path)
             except FileNotFoundError:
-                current_stat = None
+                delete_stat = None
+            except OSError as e:
+                raise OSError(
+                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
+                ) from e
 
-        _transition_to_absent(repo, path, full_path, current_stat, index)
+            _transition_to_absent(repo, path, full_path, delete_stat, index)
 
-    # Then process additions/updates
-    for path in paths_to_update:
-        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
+        if change.type in (
+            CHANGE_ADD,
+            CHANGE_MODIFY,
+            CHANGE_UNCHANGED,
+            CHANGE_COPY,
+            CHANGE_RENAME,
+        ):
+            # Add or modify file
+            path = change.new.path
+            if path.startswith(b".git") or not validate_path(
+                path, validate_path_element
+            ):
+                continue
 
-        # Determine current state - use cache if available
-        try:
-            current_stat = stat_cache[full_path]
-        except KeyError:
+            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
             try:
-                current_stat = os.lstat(full_path)
+                modify_stat: Optional[os.stat_result] = os.lstat(full_path)
             except FileNotFoundError:
-                current_stat = None
-
-        new_entry = new_paths[path]
-
-        # Path should exist
-        if S_ISGITLINK(new_entry.mode):
-            _transition_to_submodule(
-                repo, path, full_path, current_stat, new_entry, index
-            )
-        else:
-            _transition_to_file(
-                repo.object_store,
-                path,
-                full_path,
-                current_stat,
-                new_entry,
-                index,
-                honor_filemode,
-                symlink_fn,
-                blob_normalizer,
-                tree_encoding,
-            )
+                modify_stat = None
+            except OSError as e:
+                raise OSError(
+                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
+                ) from e
 
-    # Handle force_remove_untracked
-    if force_remove_untracked:
-        for root, dirs, files in os.walk(repo_path):
-            if b".git" in os.fsencode(root):
-                continue
-            root_bytes = os.fsencode(root)
-            for file in files:
-                full_path = os.path.join(root_bytes, os.fsencode(file))
-                tree_path = os.path.relpath(full_path, repo_path)
-                if os.sep != "/":
-                    tree_path = tree_path.replace(os.sep.encode(), b"/")
-
-                if tree_path not in new_paths:
-                    _remove_file_with_readonly_handling(full_path)
-                    if tree_path in index:
-                        del index[tree_path]
-
-        # Clean up empty directories
-        for root, dirs, files in os.walk(repo_path, topdown=False):
-            root_bytes = os.fsencode(root)
-            if (
-                b".git" not in root_bytes
-                and root_bytes != repo_path
-                and not files
-                and not dirs
-            ):
-                try:
-                    os.rmdir(root)
-                except FileNotFoundError:
-                    # Directory was already removed
-                    pass
-                except OSError as e:
-                    if e.errno != errno.ENOTEMPTY:
-                        # Only ignore "directory not empty" errors
-                        raise
+            if S_ISGITLINK(change.new.mode):
+                _transition_to_submodule(
+                    repo, path, full_path, modify_stat, change.new, index
+                )
+            else:
+                _transition_to_file(
+                    repo.object_store,
+                    path,
+                    full_path,
+                    modify_stat,
+                    change.new,
+                    index,
+                    honor_filemode,
+                    symlink_fn,
+                    blob_normalizer,
+                    tree_encoding,
+                )
 
     index.write()
 

+ 66 - 19
dulwich/porcelain.py

@@ -85,6 +85,7 @@ import stat
 import sys
 import time
 from collections import namedtuple
+from collections.abc import Iterator
 from contextlib import closing, contextmanager
 from dataclasses import dataclass
 from io import BytesIO, RawIOBase
@@ -103,6 +104,8 @@ from .diff_tree import (
     CHANGE_MODIFY,
     CHANGE_RENAME,
     RENAME_CHANGE_TYPES,
+    TreeChange,
+    tree_changes,
 )
 from .errors import SendPackError
 from .graph import can_fast_forward
@@ -1896,13 +1899,6 @@ def reset(repo, mode, treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD") ->
 
         elif mode == "hard":
             # Hard reset: update HEAD, index, and working tree
-            # Get current HEAD tree for comparison
-            try:
-                current_head = r.refs[b"HEAD"]
-                current_tree = r[current_head].tree
-            except KeyError:
-                current_tree = None
-
             # Get configuration for working directory update
             config = r.get_config()
             honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
@@ -1929,15 +1925,28 @@ def reset(repo, mode, treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD") ->
 
             # Update working tree and index
             blob_normalizer = r.get_blob_normalizer()
+            # For reset --hard, use current index tree as old tree to get proper deletions
+            index = r.open_index()
+            if len(index) > 0:
+                index_tree_id = index.commit(r.object_store)
+            else:
+                # Empty index
+                index_tree_id = None
+
+            changes = tree_changes(
+                r.object_store, index_tree_id, tree.id, want_unchanged=True
+            )
             update_working_tree(
                 r,
-                current_tree,
+                index_tree_id,
                 tree.id,
+                change_iterator=changes,
                 honor_filemode=honor_filemode,
                 validate_path_element=validate_path_element,
                 symlink_fn=symlink_fn,
                 force_remove_untracked=True,
                 blob_normalizer=blob_normalizer,
+                allow_overwrite_modified=True,  # Allow overwriting modified files
             )
         else:
             raise Error(f"Invalid reset mode: {mode}")
@@ -2106,6 +2115,8 @@ def pull(
       fast_forward: If True, raise an exception when fast-forward is not possible
       ff_only: If True, only allow fast-forward merges. Raises DivergedBranches
         when branches have diverged rather than performing a merge.
+      force: If True, allow overwriting local changes in the working tree.
+        If False, pull will abort if it would overwrite uncommitted changes.
       filter_spec: A git-rev-list-style object filter spec, as an ASCII string.
         Only used if the server supports the Git protocol-v2 'filter'
         feature, and ignored otherwise.
@@ -2181,8 +2192,14 @@ def pull(
         if not merged and old_tree_id is not None:
             new_tree_id = r[b"HEAD"].tree
             blob_normalizer = r.get_blob_normalizer()
+            changes = tree_changes(r.object_store, old_tree_id, new_tree_id)
             update_working_tree(
-                r, old_tree_id, new_tree_id, blob_normalizer=blob_normalizer
+                r,
+                old_tree_id,
+                new_tree_id,
+                change_iterator=changes,
+                blob_normalizer=blob_normalizer,
+                allow_overwrite_modified=force,
             )
         if remote_name is not None:
             _import_remote_refs(r.refs, remote_name, fetch_result.refs)
@@ -3319,15 +3336,20 @@ def checkout(
         blob_normalizer = r.get_blob_normalizer()
 
         # Update working tree
+        tree_change_iterator: Iterator[TreeChange] = tree_changes(
+            r.object_store, current_tree_id, target_tree_id
+        )
         update_working_tree(
             r,
             current_tree_id,
             target_tree_id,
+            change_iterator=tree_change_iterator,
             honor_filemode=honor_filemode,
             validate_path_element=validate_path_element,
             symlink_fn=symlink_fn,
             force_remove_untracked=force,
             blob_normalizer=blob_normalizer,
+            allow_overwrite_modified=force,
         )
 
         # Update HEAD
@@ -3829,7 +3851,10 @@ def _do_merge(
         # Fast-forward merge
         r.refs[b"HEAD"] = merge_commit_id
         # Update the working directory
-        update_working_tree(r, head_commit.tree, merge_commit.tree)
+        changes = tree_changes(r.object_store, head_commit.tree, merge_commit.tree)
+        update_working_tree(
+            r, head_commit.tree, merge_commit.tree, change_iterator=changes
+        )
         return (merge_commit_id, [])
 
     if base_commit_id == merge_commit_id:
@@ -3848,7 +3873,8 @@ def _do_merge(
     r.object_store.add_object(merged_tree)
 
     # Update index and working directory
-    update_working_tree(r, head_commit.tree, merged_tree.id)
+    changes = tree_changes(r.object_store, head_commit.tree, merged_tree.id)
+    update_working_tree(r, head_commit.tree, merged_tree.id, change_iterator=changes)
 
     if conflicts or no_commit:
         # Don't create a commit if there are conflicts or no_commit is True
@@ -4134,7 +4160,15 @@ def cherry_pick(
         r.reset_index(merged_tree.id)
 
         # Update working tree from the new index
-        update_working_tree(r, head_commit.tree, merged_tree.id)
+        # Allow overwriting because we're applying the merge result
+        changes = tree_changes(r.object_store, head_commit.tree, merged_tree.id)
+        update_working_tree(
+            r,
+            head_commit.tree,
+            merged_tree.id,
+            change_iterator=changes,
+            allow_overwrite_modified=True,
+        )
 
         if conflicts:
             # Save state for later continuation
@@ -4248,7 +4282,10 @@ def revert(
 
             if conflicts:
                 # Update working tree with conflicts
-                update_working_tree(r, current_tree, merged_tree.id)
+                changes = tree_changes(r.object_store, current_tree, merged_tree.id)
+                update_working_tree(
+                    r, current_tree, merged_tree.id, change_iterator=changes
+                )
                 conflicted_paths = [c.decode("utf-8", "replace") for c in conflicts]
                 raise Error(f"Conflicts while reverting: {', '.join(conflicted_paths)}")
 
@@ -4256,7 +4293,10 @@ def revert(
             r.object_store.add_object(merged_tree)
 
             # Update working tree
-            update_working_tree(r, current_tree, merged_tree.id)
+            changes = tree_changes(r.object_store, current_tree, merged_tree.id)
+            update_working_tree(
+                r, current_tree, merged_tree.id, change_iterator=changes
+            )
             current_tree = merged_tree.id
 
             if not no_commit:
@@ -4831,7 +4871,8 @@ def bisect_start(
                 old_tree = r[r.head()].tree if r.head() else None
                 r.refs[b"HEAD"] = next_sha
                 commit = r[next_sha]
-                update_working_tree(r, old_tree, commit.tree)
+                changes = tree_changes(r.object_store, old_tree, commit.tree)
+                update_working_tree(r, old_tree, commit.tree, change_iterator=changes)
             return next_sha
 
 
@@ -4855,7 +4896,8 @@ def bisect_bad(repo=".", rev: Optional[Union[str, bytes, Commit, Tag]] = None):
             old_tree = r[r.head()].tree if r.head() else None
             r.refs[b"HEAD"] = next_sha
             commit = r[next_sha]
-            update_working_tree(r, old_tree, commit.tree)
+            changes = tree_changes(r.object_store, old_tree, commit.tree)
+            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)
 
         return next_sha
 
@@ -4880,7 +4922,8 @@ def bisect_good(repo=".", rev: Optional[Union[str, bytes, Commit, Tag]] = None):
             old_tree = r[r.head()].tree if r.head() else None
             r.refs[b"HEAD"] = next_sha
             commit = r[next_sha]
-            update_working_tree(r, old_tree, commit.tree)
+            changes = tree_changes(r.object_store, old_tree, commit.tree)
+            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)
 
         return next_sha
 
@@ -4918,7 +4961,8 @@ def bisect_skip(
             old_tree = r[r.head()].tree if r.head() else None
             r.refs[b"HEAD"] = next_sha
             commit = r[next_sha]
-            update_working_tree(r, old_tree, commit.tree)
+            changes = tree_changes(r.object_store, old_tree, commit.tree)
+            update_working_tree(r, old_tree, commit.tree, change_iterator=changes)
 
         return next_sha
 
@@ -4946,7 +4990,10 @@ def bisect_reset(repo=".", commit: Optional[Union[str, bytes, Commit, Tag]] = No
             new_head = r.head()
             if new_head:
                 new_commit = r[new_head]
-                update_working_tree(r, old_tree, new_commit.tree)
+                changes = tree_changes(r.object_store, old_tree, new_commit.tree)
+                update_working_tree(
+                    r, old_tree, new_commit.tree, change_iterator=changes
+                )
         except KeyError:
             # No HEAD after reset
             pass

+ 4 - 0
dulwich/stash.py

@@ -25,6 +25,7 @@ import os
 import sys
 from typing import TYPE_CHECKING, Optional, TypedDict
 
+from .diff_tree import tree_changes
 from .file import GitFile
 from .index import (
     IndexEntry,
@@ -317,10 +318,13 @@ class Stash:
         # Update from stash tree to HEAD tree
         # This will remove files that were in stash but not in HEAD,
         # and restore files to their HEAD versions
+        changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
         update_working_tree(
             self._repo,
             old_tree_id=stash_tree_id,
             new_tree_id=head_tree_id,
+            change_iterator=changes,
+            allow_overwrite_modified=True,  # We need to overwrite modified files
         )
 
         return cid

+ 445 - 46
tests/test_index.py

@@ -29,6 +29,16 @@ import sys
 import tempfile
 from io import BytesIO
 
+from dulwich.config import ConfigDict
+from dulwich.diff_tree import (
+    CHANGE_ADD,
+    CHANGE_COPY,
+    CHANGE_DELETE,
+    CHANGE_MODIFY,
+    CHANGE_RENAME,
+    TreeChange,
+    tree_changes,
+)
 from dulwich.index import (
     Index,
     IndexEntry,
@@ -42,6 +52,7 @@ from dulwich.index import (
     build_index_from_tree,
     cleanup_mode,
     commit_tree,
+    detect_case_only_renames,
     get_unstaged_changes,
     index_entry_from_directory,
     index_entry_from_path,
@@ -58,7 +69,7 @@ from dulwich.index import (
     write_index_dict,
 )
 from dulwich.object_store import MemoryObjectStore
-from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
+from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree, TreeEntry
 from dulwich.repo import Repo
 
 from . import TestCase, skipIf
@@ -1731,6 +1742,254 @@ class TestPathPrefixCompression(TestCase):
         self.assertEqual(b"short", decompressed)
 
 
+class TestDetectCaseOnlyRenames(TestCase):
+    """Tests for detect_case_only_renames function."""
+
+    def setUp(self):
+        self.config = ConfigDict()
+
+    def test_no_renames(self):
+        """Test when there are no renames."""
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"file1.txt", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"file2.txt", 0o100644, b"b" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # No case-only renames, so should return original changes
+        self.assertEqual(changes, result)
+
+    def test_simple_case_rename(self):
+        """Test simple case-only rename detection."""
+        # Default config uses case-insensitive comparison
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"README.txt", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"readme.txt", 0o100644, b"a" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # Should return one CHANGE_RENAME instead of ADD/DELETE pair
+        self.assertEqual(1, len(result))
+        self.assertEqual(CHANGE_RENAME, result[0].type)
+        self.assertEqual(b"README.txt", result[0].old.path)
+        self.assertEqual(b"readme.txt", result[0].new.path)
+
+    def test_nested_path_case_rename(self):
+        """Test case-only rename in nested paths."""
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"src/Main.java", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"src/main.java", 0o100644, b"a" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # Should return one CHANGE_RENAME instead of ADD/DELETE pair
+        self.assertEqual(1, len(result))
+        self.assertEqual(CHANGE_RENAME, result[0].type)
+        self.assertEqual(b"src/Main.java", result[0].old.path)
+        self.assertEqual(b"src/main.java", result[0].new.path)
+
+    def test_multiple_case_renames(self):
+        """Test multiple case-only renames."""
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"File1.txt", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"File2.TXT", 0o100644, b"b" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"file1.txt", 0o100644, b"a" * 40),
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"file2.txt", 0o100644, b"b" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # Should return two CHANGE_RENAME instead of ADD/DELETE pairs
+        self.assertEqual(2, len(result))
+        rename_changes = [c for c in result if c.type == CHANGE_RENAME]
+        self.assertEqual(2, len(rename_changes))
+        # Check that the renames are correct (order may vary)
+        rename_map = {c.old.path: c.new.path for c in rename_changes}
+        self.assertEqual(
+            {b"File1.txt": b"file1.txt", b"File2.TXT": b"file2.txt"}, rename_map
+        )
+
+    def test_case_rename_with_modify(self):
+        """Test case rename detection with CHANGE_MODIFY."""
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"README.md", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_MODIFY,
+                TreeEntry(b"readme.md", 0o100644, b"a" * 40),
+                TreeEntry(b"readme.md", 0o100644, b"b" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # Should return one CHANGE_RENAME instead of DELETE/MODIFY pair
+        self.assertEqual(1, len(result))
+        self.assertEqual(CHANGE_RENAME, result[0].type)
+        self.assertEqual(b"README.md", result[0].old.path)
+        self.assertEqual(b"readme.md", result[0].new.path)
+
+    def test_hfs_normalization(self):
+        """Test case rename detection with HFS+ normalization."""
+        # Configure for HFS+ (macOS)
+        self.config.set((b"core",), b"protectHFS", b"true")
+        self.config.set((b"core",), b"protectNTFS", b"false")
+
+        # Test with composed vs decomposed Unicode
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry("café.txt".encode(), 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry("CAFÉ.txt".encode(), 0o100644, b"a" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+
+        # Should return one CHANGE_RENAME for the case-only rename
+        self.assertEqual(1, len(result))
+        self.assertEqual(CHANGE_RENAME, result[0].type)
+        self.assertEqual("café.txt".encode(), result[0].old.path)
+        self.assertEqual("CAFÉ.txt".encode(), result[0].new.path)
+
+    def test_ntfs_normalization(self):
+        """Test case rename detection with NTFS normalization."""
+        # Configure for NTFS (Windows)
+        self.config.set((b"core",), b"protectNTFS", b"true")
+        self.config.set((b"core",), b"protectHFS", b"false")
+
+        # NTFS strips trailing dots and spaces
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"file.txt.", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"FILE.TXT", 0o100644, b"a" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # Should return one CHANGE_RENAME for the case-only rename
+        self.assertEqual(1, len(result))
+        self.assertEqual(CHANGE_RENAME, result[0].type)
+        self.assertEqual(b"file.txt.", result[0].old.path)
+        self.assertEqual(b"FILE.TXT", result[0].new.path)
+
+    def test_invalid_utf8_handling(self):
+        """Test handling of invalid UTF-8 in paths."""
+        # Invalid UTF-8 sequence
+        invalid_path = b"\xff\xfe"
+
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(invalid_path, 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_ADD,
+                None,
+                TreeEntry(b"valid.txt", 0o100644, b"b" * 40),
+            ),
+        ]
+
+        # Should not crash, just skip invalid paths
+        result = detect_case_only_renames(changes, self.config)
+        # No case-only renames detected, returns original changes
+        self.assertEqual(changes, result)
+
+    def test_rename_and_copy_changes(self):
+        """Test case rename detection with CHANGE_RENAME and CHANGE_COPY."""
+        changes = [
+            TreeChange(
+                CHANGE_DELETE,
+                TreeEntry(b"OldFile.txt", 0o100644, b"a" * 40),
+                None,
+            ),
+            TreeChange(
+                CHANGE_RENAME,
+                TreeEntry(b"other.txt", 0o100644, b"b" * 40),
+                TreeEntry(b"oldfile.txt", 0o100644, b"a" * 40),
+            ),
+            TreeChange(
+                CHANGE_COPY,
+                TreeEntry(b"source.txt", 0o100644, b"c" * 40),
+                TreeEntry(b"OLDFILE.TXT", 0o100644, b"a" * 40),
+            ),
+        ]
+
+        result = detect_case_only_renames(changes, self.config)
+        # The DELETE of OldFile.txt and COPY to OLDFILE.TXT are detected as a case-only rename
+        # The original RENAME (other.txt -> oldfile.txt) remains
+        # The COPY is consumed by the case-only rename detection
+        self.assertEqual(2, len(result))
+
+        # Find the changes
+        rename_changes = [c for c in result if c.type == CHANGE_RENAME]
+        self.assertEqual(2, len(rename_changes))
+
+        # Check for the case-only rename
+        case_rename = None
+        for change in rename_changes:
+            if change.old.path == b"OldFile.txt" and change.new.path == b"OLDFILE.TXT":
+                case_rename = change
+                break
+
+        self.assertIsNotNone(case_rename)
+        self.assertEqual(b"OldFile.txt", case_rename.old.path)
+        self.assertEqual(b"OLDFILE.TXT", case_rename.new.path)
+
+
 class TestUpdateWorkingTree(TestCase):
     def setUp(self):
         self.tempdir = tempfile.mkdtemp()
@@ -1776,10 +2035,12 @@ class TestUpdateWorkingTree(TestCase):
 
         # Update working tree with normalizer
         normalizer = TestBlobNormalizer()
+        changes = tree_changes(self.repo.object_store, None, tree.id)
         update_working_tree(
             self.repo,
             None,  # old_tree_id
             tree.id,  # new_tree_id
+            change_iterator=changes,
             blob_normalizer=normalizer,
         )
 
@@ -1806,10 +2067,12 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree)
 
         # Update working tree without normalizer
+        changes = tree_changes(self.repo.object_store, None, tree.id)
         update_working_tree(
             self.repo,
             None,  # old_tree_id
             tree.id,  # new_tree_id
+            change_iterator=changes,
             blob_normalizer=None,
         )
 
@@ -1841,7 +2104,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1 (create directory with files)
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify directory and files exist
         dir_path = os.path.join(self.tempdir, "dir")
@@ -1854,7 +2118,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update to empty tree
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Verify directory was removed
         self.assertFalse(os.path.exists(dir_path))
@@ -1868,7 +2133,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree with submodule
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify submodule directory exists with .git file
         submodule_path = os.path.join(self.tempdir, "submodule")
@@ -1885,7 +2151,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update to tree with file (should remove submodule directory and create file)
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Verify it's now a file
         self.assertTrue(os.path.isfile(submodule_path))
@@ -1904,7 +2171,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify nested structure exists
         path_a = os.path.join(self.tempdir, "a")
@@ -1919,7 +2187,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update to empty tree
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Verify all directories were removed
         self.assertFalse(os.path.exists(path_a))
@@ -1936,7 +2205,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify file exists
         file_path = os.path.join(self.tempdir, "path")
@@ -1953,7 +2223,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should succeed but leave the directory alone
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Directory should still exist with its contents
         self.assertTrue(os.path.isdir(file_path))
@@ -1971,7 +2242,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify file exists
         file_path = os.path.join(self.tempdir, "path")
@@ -1986,7 +2258,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should remove the empty directory
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Directory should be gone
         self.assertFalse(os.path.exists(file_path))
@@ -2007,7 +2280,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree with symlink
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         link_path = os.path.join(self.tempdir, "link")
         self.assertTrue(os.path.islink(link_path))
@@ -2022,7 +2296,8 @@ class TestUpdateWorkingTree(TestCase):
         tree2[b"link"] = (0o100644, blob2.id)
         self.repo.object_store.add_object(tree2)
 
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         self.assertFalse(os.path.islink(link_path))
         self.assertTrue(os.path.isfile(link_path))
@@ -2030,7 +2305,8 @@ class TestUpdateWorkingTree(TestCase):
             self.assertEqual(b"file content", f.read())
 
         # Test 2: Replace file with symlink
-        update_working_tree(self.repo, tree2.id, tree1.id)
+        changes = tree_changes(self.repo.object_store, tree2.id, tree1.id)
+        update_working_tree(self.repo, tree2.id, tree1.id, change_iterator=changes)
 
         self.assertTrue(os.path.islink(link_path))
         self.assertEqual(b"target/path", os.readlink(link_path).encode())
@@ -2044,7 +2320,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree3)
 
         # Should remove empty directory
-        update_working_tree(self.repo, tree1.id, tree3.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree3.id)
+        update_working_tree(self.repo, tree1.id, tree3.id, change_iterator=changes)
         self.assertFalse(os.path.exists(link_path))
 
     def test_update_working_tree_modified_file_to_dir_transition(self):
@@ -2059,7 +2336,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         file_path = os.path.join(self.tempdir, "path")
 
@@ -2078,7 +2356,8 @@ class TestUpdateWorkingTree(TestCase):
 
         # Update should fail because can't create directory where modified file exists
         with self.assertRaises(IOError):
-            update_working_tree(self.repo, tree1.id, tree2.id)
+            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # File should still exist with modifications
         self.assertTrue(os.path.isfile(file_path))
@@ -2101,7 +2380,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         script_path = os.path.join(self.tempdir, "script.sh")
         self.assertTrue(os.path.isfile(script_path))
@@ -2116,7 +2396,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update to tree2
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Check it's now executable
         mode = os.stat(script_path).st_mode
@@ -2133,7 +2414,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree with submodule
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Add untracked file to submodule directory
         submodule_path = os.path.join(self.tempdir, "submodule")
@@ -2146,7 +2428,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should not remove submodule directory with untracked files
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Directory should still exist with untracked file
         self.assertTrue(os.path.isdir(submodule_path))
@@ -2169,7 +2452,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify structure exists
         dir_path = os.path.join(self.tempdir, "dir")
@@ -2191,13 +2475,27 @@ class TestUpdateWorkingTree(TestCase):
 
         # Update should fail because directory is not empty
         with self.assertRaises(IsADirectoryError):
-            update_working_tree(self.repo, tree1.id, tree2.id)
+            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Directory should still exist
         self.assertTrue(os.path.isdir(dir_path))
 
     def test_update_working_tree_case_sensitivity(self):
         """Test handling of case-sensitive filename changes."""
+        # Detect if filesystem is case-insensitive by testing
+        test_file = os.path.join(self.tempdir, "TeSt.tmp")
+        with open(test_file, "w") as f:
+            f.write("test")
+        is_case_insensitive = os.path.exists(os.path.join(self.tempdir, "test.tmp"))
+        os.unlink(test_file)
+
+        # Set core.ignorecase to match actual filesystem behavior
+        # (This ensures test works correctly regardless of platform defaults)
+        config = self.repo.get_config()
+        config.set((b"core",), b"ignorecase", is_case_insensitive)
+        config.write_to_path()
+
         # Create tree with lowercase file
         blob1 = Blob()
         blob1.data = b"lowercase content"
@@ -2208,7 +2506,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Create tree with uppercase file (different content)
         blob2 = Blob()
@@ -2220,18 +2519,101 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update to tree2
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Check what exists (behavior depends on filesystem)
         lowercase_path = os.path.join(self.tempdir, "readme.txt")
         uppercase_path = os.path.join(self.tempdir, "README.txt")
 
-        # On case-insensitive filesystems, one will overwrite the other
-        # On case-sensitive filesystems, both may exist
-        self.assertTrue(
-            os.path.exists(lowercase_path) or os.path.exists(uppercase_path)
+        if is_case_insensitive:
+            # On case-insensitive filesystems, should have one file with new content
+            # The exact case of the filename may vary by OS
+            self.assertTrue(
+                os.path.exists(lowercase_path) or os.path.exists(uppercase_path)
+            )
+            # Verify content is the new content
+            if os.path.exists(lowercase_path):
+                with open(lowercase_path, "rb") as f:
+                    self.assertEqual(b"uppercase content", f.read())
+            else:
+                with open(uppercase_path, "rb") as f:
+                    self.assertEqual(b"uppercase content", f.read())
+        else:
+            # On case-sensitive filesystems, only the uppercase file should exist
+            self.assertFalse(os.path.exists(lowercase_path))
+            self.assertTrue(os.path.exists(uppercase_path))
+            with open(uppercase_path, "rb") as f:
+                self.assertEqual(b"uppercase content", f.read())
+
+    def test_update_working_tree_case_rename_updates_filename(self):
+        """Test that case-only renames update the actual filename on case-insensitive FS."""
+        # Detect if filesystem is case-insensitive by testing
+        test_file = os.path.join(self.tempdir, "TeSt.tmp")
+        with open(test_file, "w") as f:
+            f.write("test")
+        is_case_insensitive = os.path.exists(os.path.join(self.tempdir, "test.tmp"))
+        os.unlink(test_file)
+
+        if not is_case_insensitive:
+            self.skipTest("Test only relevant on case-insensitive filesystems")
+
+        # Set core.ignorecase to match actual filesystem behavior
+        config = self.repo.get_config()
+        config.set((b"core",), b"ignorecase", True)
+        config.write_to_path()
+
+        # Create tree with lowercase file
+        blob1 = Blob()
+        blob1.data = b"same content"  # Using same content to test pure case rename
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"readme.txt"] = (0o100644, blob1.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
+
+        # Verify initial state
+        files = [f for f in os.listdir(self.tempdir) if not f.startswith(".git")]
+        self.assertEqual(["readme.txt"], files)
+
+        # Create tree with uppercase file (same content, same blob)
+        tree2 = Tree()
+        tree2[b"README.txt"] = (0o100644, blob1.id)  # Same blob!
+        self.repo.object_store.add_object(tree2)
+
+        # Update to tree2 (case-only rename)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
+
+        # On case-insensitive filesystems, should have one file with updated case
+        files = [f for f in os.listdir(self.tempdir) if not f.startswith(".git")]
+        self.assertEqual(
+            1, len(files), "Should have exactly one file after case rename"
+        )
+
+        # The file should now have the new case in the directory listing
+        actual_filename = files[0]
+        self.assertEqual(
+            "README.txt",
+            actual_filename,
+            "Filename case should be updated in directory listing",
         )
 
+        # Verify content is preserved
+        file_path = os.path.join(self.tempdir, actual_filename)
+        with open(file_path, "rb") as f:
+            self.assertEqual(b"same content", f.read())
+
+        # Both old and new case should access the same file
+        lowercase_path = os.path.join(self.tempdir, "readme.txt")
+        uppercase_path = os.path.join(self.tempdir, "README.txt")
+        self.assertTrue(os.path.exists(lowercase_path))
+        self.assertTrue(os.path.exists(uppercase_path))
+
     def test_update_working_tree_deeply_nested_removal(self):
         """Test removal of deeply nested directory structures."""
         # Create deeply nested structure
@@ -2246,7 +2628,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Verify deep structure exists
         current_path = self.tempdir
@@ -2259,7 +2642,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should remove all empty directories
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Verify top level directory is gone
         top_level = os.path.join(self.tempdir, "level0")
@@ -2277,7 +2661,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Make file read-only
         file_path = os.path.join(self.tempdir, "readonly.txt")
@@ -2293,7 +2678,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should handle read-only file
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         # Verify content was updated
         with open(file_path, "rb") as f:
@@ -2316,7 +2702,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree)
 
         # Update should skip invalid files based on validation
-        update_working_tree(self.repo, None, tree.id)
+        changes = tree_changes(self.repo.object_store, None, tree.id)
+        update_working_tree(self.repo, None, tree.id, change_iterator=changes)
 
         # Valid file should exist
         self.assertTrue(os.path.exists(os.path.join(self.tempdir, "valid.txt")))
@@ -2342,7 +2729,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         link_path = os.path.join(self.tempdir, "link")
         self.assertTrue(os.path.islink(link_path))
@@ -2357,7 +2745,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree2)
 
         # Update should replace symlink with actual directory
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
 
         self.assertFalse(os.path.islink(link_path))
         self.assertTrue(os.path.isdir(link_path))
@@ -2393,10 +2782,12 @@ class TestUpdateWorkingTree(TestCase):
         tree2[b"item"] = (S_IFGITLINK, submodule_sha)
         self.repo.object_store.add_object(tree2)
 
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
         self.assertTrue(os.path.isfile(os.path.join(self.tempdir, "item")))
 
-        update_working_tree(self.repo, tree1.id, tree2.id)
+        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
         self.assertTrue(os.path.isdir(os.path.join(self.tempdir, "item")))
 
         # Test 2: Submodule → Executable file
@@ -2404,7 +2795,8 @@ class TestUpdateWorkingTree(TestCase):
         tree3[b"item"] = (0o100755, exec_blob.id)
         self.repo.object_store.add_object(tree3)
 
-        update_working_tree(self.repo, tree2.id, tree3.id)
+        changes = tree_changes(self.repo.object_store, tree2.id, tree3.id)
+        update_working_tree(self.repo, tree2.id, tree3.id, change_iterator=changes)
         item_path = os.path.join(self.tempdir, "item")
         self.assertTrue(os.path.isfile(item_path))
         if sys.platform != "win32":
@@ -2415,7 +2807,8 @@ class TestUpdateWorkingTree(TestCase):
         tree4[b"item"] = (0o120000, link_blob.id)
         self.repo.object_store.add_object(tree4)
 
-        update_working_tree(self.repo, tree3.id, tree4.id)
+        changes = tree_changes(self.repo.object_store, tree3.id, tree4.id)
+        update_working_tree(self.repo, tree3.id, tree4.id, change_iterator=changes)
         self.assertTrue(os.path.islink(item_path))
 
         # Test 4: Symlink → Submodule
@@ -2423,14 +2816,16 @@ class TestUpdateWorkingTree(TestCase):
         tree5[b"item"] = (S_IFGITLINK, submodule_sha)
         self.repo.object_store.add_object(tree5)
 
-        update_working_tree(self.repo, tree4.id, tree5.id)
+        changes = tree_changes(self.repo.object_store, tree4.id, tree5.id)
+        update_working_tree(self.repo, tree4.id, tree5.id, change_iterator=changes)
         self.assertTrue(os.path.isdir(item_path))
 
         # Test 5: Clean up - Submodule → absent
         tree6 = Tree()
         self.repo.object_store.add_object(tree6)
 
-        update_working_tree(self.repo, tree5.id, tree6.id)
+        changes = tree_changes(self.repo.object_store, tree5.id, tree6.id)
+        update_working_tree(self.repo, tree5.id, tree6.id, change_iterator=changes)
         self.assertFalse(os.path.exists(item_path))
 
         # Test 6: Symlink → Executable file
@@ -2438,7 +2833,8 @@ class TestUpdateWorkingTree(TestCase):
         tree7[b"item2"] = (0o120000, link_blob.id)
         self.repo.object_store.add_object(tree7)
 
-        update_working_tree(self.repo, tree6.id, tree7.id)
+        changes = tree_changes(self.repo.object_store, tree6.id, tree7.id)
+        update_working_tree(self.repo, tree6.id, tree7.id, change_iterator=changes)
         item2_path = os.path.join(self.tempdir, "item2")
         self.assertTrue(os.path.islink(item2_path))
 
@@ -2446,7 +2842,8 @@ class TestUpdateWorkingTree(TestCase):
         tree8[b"item2"] = (0o100755, exec_blob.id)
         self.repo.object_store.add_object(tree8)
 
-        update_working_tree(self.repo, tree7.id, tree8.id)
+        changes = tree_changes(self.repo.object_store, tree7.id, tree8.id)
+        update_working_tree(self.repo, tree7.id, tree8.id, change_iterator=changes)
         self.assertTrue(os.path.isfile(item2_path))
         if sys.platform != "win32":
             self.assertTrue(os.access(item2_path, os.X_OK))
@@ -2468,7 +2865,8 @@ class TestUpdateWorkingTree(TestCase):
         self.repo.object_store.add_object(tree1)
 
         # Update to tree1
-        update_working_tree(self.repo, None, tree1.id)
+        changes = tree_changes(self.repo.object_store, None, tree1.id)
+        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
 
         # Create a directory where file2.txt is, to cause a conflict
         file2_path = os.path.join(self.tempdir, "file2.txt")
@@ -2494,7 +2892,8 @@ class TestUpdateWorkingTree(TestCase):
 
         # Update should partially succeed - file1 updated, file2 blocked
         try:
-            update_working_tree(self.repo, tree1.id, tree2.id)
+            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
+            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
         except IsADirectoryError:
             # Expected to fail on file2 because it's a directory
             pass

+ 213 - 13
tests/test_porcelain.py

@@ -3540,11 +3540,12 @@ class ResetTests(PorcelainTestCase):
         with open(file2, "w") as f:
             f.write("new content")
 
-        # Reset to commit that has file2 removed - should delete untracked file2
+        # Reset to commit that has file2 removed - untracked file2 should remain
         porcelain.reset(self.repo, "hard", sha2)
 
         self.assertTrue(os.path.exists(file1))
-        self.assertFalse(os.path.exists(file2))
+        # Untracked files are not removed by reset --hard
+        self.assertTrue(os.path.exists(file2))
 
     def test_hard_reset_to_remote_branch(self) -> None:
         """Test reset --hard to remote branch deletes local files not in remote."""
@@ -4024,16 +4025,30 @@ class CheckoutTests(PorcelainTestCase):
             [{"add": [], "delete": [], "modify": [b"nee"]}, [], []], status
         )
 
-        # The new checkout behavior allows switching if the file doesn't exist in target branch
-        # (changes can be preserved)
-        porcelain.checkout(self.repo, b"uni")
-        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+        # Checkout should fail when there are staged changes that would be lost
+        # This matches Git's behavior to prevent data loss
+        from dulwich.errors import WorkingTreeModifiedError
+
+        with self.assertRaises(WorkingTreeModifiedError) as cm:
+            porcelain.checkout(self.repo, b"uni")
+
+        self.assertIn("nee", str(cm.exception))
+
+        # Should still be on master branch
+        self.assertEqual(b"master", porcelain.active_branch(self.repo))
 
-        # The staged changes are lost and the file is removed from working tree
-        # because it doesn't exist in the target branch
+        # The staged changes should still be present
         status = list(porcelain.status(self.repo))
-        # File 'nee' is gone completely
-        self.assertEqual([{"add": [], "delete": [], "modify": []}, [], []], status)
+        self.assertEqual(
+            [{"add": [], "delete": [], "modify": [b"nee"]}, [], []], status
+        )
+        self.assertTrue(os.path.exists(nee_path))
+
+        # Force checkout should work and lose the changes
+        porcelain.checkout(self.repo, b"uni", force=True)
+        self.assertEqual(b"uni", porcelain.active_branch(self.repo))
+
+        # Now the file should be gone
         self.assertFalse(os.path.exists(nee_path))
 
     def test_checkout_to_branch_with_modified_file_not_present_forced(self) -> None:
@@ -4454,7 +4469,7 @@ class GeneralCheckoutTests(PorcelainTestCase):
         self.assertEqual(b"master", porcelain.active_branch(self.repo))
 
     def test_checkout_force(self) -> None:
-        """Test forced checkout discards local changes."""
+        """Test forced checkout discards local changes for files that differ between branches."""
         # Modify a file
         with open(self._foo_path, "w") as f:
             f.write("modified content\n")
@@ -4464,10 +4479,11 @@ class GeneralCheckoutTests(PorcelainTestCase):
 
         self.assertEqual(b"feature", porcelain.active_branch(self.repo))
 
-        # Local changes should be discarded
+        # Since foo has the same content in master and feature branches,
+        # checkout should NOT restore it - the modified content should remain
         with open(self._foo_path) as f:
             content = f.read()
-        self.assertEqual("initial content\n", content)
+        self.assertEqual("modified content\n", content)
 
     def test_checkout_nonexistent_ref(self) -> None:
         """Test checkout of non-existent branch/commit."""
@@ -5294,6 +5310,190 @@ class PullTests(PorcelainTestCase):
         with Repo(self.target_path) as r:
             self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id)
 
+    def test_pull_protects_modified_files(self) -> None:
+        """Test that pull refuses to overwrite uncommitted changes by default."""
+        from dulwich.errors import WorkingTreeModifiedError
+
+        outstream = BytesIO()
+        errstream = BytesIO()
+
+        # Create a file with content in the source repo
+        test_file = os.path.join(self.repo.path, "testfile.txt")
+        with open(test_file, "w") as f:
+            f.write("original content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Add test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Pull this change to target first
+        porcelain.pull(
+            self.target_path,
+            self.repo.path,
+            b"refs/heads/master",
+            outstream=outstream,
+            errstream=errstream,
+        )
+
+        # Now modify the file in source repo
+        with open(test_file, "w") as f:
+            f.write("updated content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Update test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Modify the same file in target working directory (uncommitted)
+        target_file = os.path.join(self.target_path, "testfile.txt")
+        with open(target_file, "w") as f:
+            f.write("local modifications")
+
+        # Pull should fail because of uncommitted changes
+        with self.assertRaises(WorkingTreeModifiedError) as cm:
+            porcelain.pull(
+                self.target_path,
+                self.repo.path,
+                b"refs/heads/master",
+                outstream=outstream,
+                errstream=errstream,
+            )
+
+        self.assertIn("Your local changes", str(cm.exception))
+        self.assertIn("testfile.txt", str(cm.exception))
+
+        # Verify the file still has local modifications
+        with open(target_file) as f:
+            self.assertEqual(f.read(), "local modifications")
+
+    def test_pull_force_overwrites_modified_files(self) -> None:
+        """Test that pull with force=True overwrites uncommitted changes."""
+        outstream = BytesIO()
+        errstream = BytesIO()
+
+        # Create a file with content in the source repo
+        test_file = os.path.join(self.repo.path, "testfile.txt")
+        with open(test_file, "w") as f:
+            f.write("original content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Add test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Pull this change to target first
+        porcelain.pull(
+            self.target_path,
+            self.repo.path,
+            b"refs/heads/master",
+            outstream=outstream,
+            errstream=errstream,
+        )
+
+        # Now modify the file in source repo
+        with open(test_file, "w") as f:
+            f.write("updated content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Update test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Modify the same file in target working directory (uncommitted)
+        target_file = os.path.join(self.target_path, "testfile.txt")
+        with open(target_file, "w") as f:
+            f.write("local modifications")
+
+        # Pull with force=True should succeed and overwrite local changes
+        porcelain.pull(
+            self.target_path,
+            self.repo.path,
+            b"refs/heads/master",
+            outstream=outstream,
+            errstream=errstream,
+            force=True,
+        )
+
+        # Verify the file now has the remote content
+        with open(target_file) as f:
+            self.assertEqual(f.read(), "updated content")
+
+        # Check the HEAD is updated too
+        with Repo(self.target_path) as r:
+            self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id)
+
+    def test_pull_allows_unmodified_files(self) -> None:
+        """Test that pull allows updating files that haven't been modified locally."""
+        outstream = BytesIO()
+        errstream = BytesIO()
+
+        # Create a file with content in the source repo
+        test_file = os.path.join(self.repo.path, "testfile.txt")
+        with open(test_file, "w") as f:
+            f.write("original content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Add test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Pull this change to target first
+        porcelain.pull(
+            self.target_path,
+            self.repo.path,
+            b"refs/heads/master",
+            outstream=outstream,
+            errstream=errstream,
+        )
+
+        # Now modify the file in source repo
+        with open(test_file, "w") as f:
+            f.write("updated content")
+
+        porcelain.add(repo=self.repo.path, paths=[test_file])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Update test file",
+            author=b"test <email>",
+            committer=b"test <email>",
+        )
+
+        # Don't modify the file in target - it should be safe to update
+        target_file = os.path.join(self.target_path, "testfile.txt")
+
+        # Pull should succeed since the file wasn't modified locally
+        porcelain.pull(
+            self.target_path,
+            self.repo.path,
+            b"refs/heads/master",
+            outstream=outstream,
+            errstream=errstream,
+        )
+
+        # Verify the file now has the remote content
+        with open(target_file) as f:
+            self.assertEqual(f.read(), "updated content")
+
+        # Check the HEAD is updated too
+        with Repo(self.target_path) as r:
+            self.assertEqual(r[b"HEAD"].id, self.repo[b"HEAD"].id)
+
 
 class StatusTests(PorcelainTestCase):
     def test_empty(self) -> None: