2
0
Jelmer Vernooij 1 сар өмнө
parent
commit
84b5bd6a97
5 өөрчлөгдсөн 1174 нэмэгдсэн , 133 устгасан
  1. 3 0
      NEWS
  2. 393 112
      dulwich/index.py
  3. 45 1
      dulwich/submodule.py
  4. 702 19
      tests/test_index.py
  5. 31 1
      tests/test_submodule.py

+ 3 - 0
NEWS

@@ -9,6 +9,9 @@
    fail with a KeyError.
    (Jelmer Vernooij, #1638)
 
+ * Handle different file type transitions properly in ``update_working_tree``
+   (Jelmer Vernooij, #1638)
+
  * Fix KeyError when pulling from a shallow clone. Handle missing commits
    gracefully in graph traversal operations for shallow repositories.
    (Jelmer Vernooij, #813)

+ 393 - 112
dulwich/index.py

@@ -21,7 +21,9 @@
 
 """Parser for the git index file format."""
 
+import errno
 import os
+import shutil
 import stat
 import struct
 import sys
@@ -42,7 +44,7 @@ from typing import (
 if TYPE_CHECKING:
     from .file import _GitFile
     from .line_ending import BlobNormalizer
-    from .repo import BaseRepo
+    from .repo import Repo
 
 from .file import GitFile
 from .object_store import iter_tree_contents
@@ -1261,7 +1263,7 @@ def build_file_from_blob(
     contents = blob.as_raw_string()
     if stat.S_ISLNK(mode):
         if oldstat:
-            os.unlink(target_path)
+            _remove_file_with_readonly_handling(target_path)
         if sys.platform == "win32":
             # os.readlink on Python3 on Windows requires a unicode string.
             contents_str = contents.decode(tree_encoding)
@@ -1484,8 +1486,265 @@ def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
     return False
 
 
+os_sep_bytes = os.sep.encode("ascii")
+
+
+def _ensure_parent_dir_exists(full_path: bytes) -> None:
+    """Ensure parent directory exists, checking no parent is a file."""
+    parent_dir = os.path.dirname(full_path)
+    if parent_dir and not os.path.exists(parent_dir):
+        # Check if any parent in the path is a file
+        parts = parent_dir.split(os_sep_bytes)
+        for i in range(len(parts)):
+            partial_path = os_sep_bytes.join(parts[: i + 1])
+            if (
+                partial_path
+                and os.path.exists(partial_path)
+                and not os.path.isdir(partial_path)
+            ):
+                # Parent path is a file, this is an error
+                raise OSError(
+                    f"Cannot create directory, parent path is a file: {partial_path!r}"
+                )
+        os.makedirs(parent_dir)
+
+
+def _remove_file_with_readonly_handling(path: bytes) -> None:
+    """Remove a file, handling read-only files on Windows.
+
+    Args:
+      path: Path to the file to remove
+    """
+    try:
+        os.unlink(path)
+    except PermissionError:
+        # On Windows, remove read-only attribute and retry
+        if sys.platform == "win32":
+            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
+            os.unlink(path)
+        else:
+            raise
+
+
+def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
+    """Remove empty parent directories up to stop_at."""
+    parent = os.path.dirname(path)
+    while parent and parent != stop_at:
+        try:
+            os.rmdir(parent)
+            parent = os.path.dirname(parent)
+        except FileNotFoundError:
+            # Directory doesn't exist - stop trying
+            break
+        except OSError as e:
+            if e.errno == errno.ENOTEMPTY:
+                # Directory not empty - stop trying
+                break
+            raise
+
+
+def _check_symlink_matches(
+    full_path: bytes, repo_object_store, entry_sha: bytes
+) -> bool:
+    """Check if symlink target matches expected target.
+
+    Returns True if symlink needs to be written, False if it matches.
+    """
+    try:
+        current_target = os.readlink(full_path)
+        blob_obj = repo_object_store[entry_sha]
+        expected_target = blob_obj.as_raw_string()
+        if isinstance(current_target, str):
+            current_target = current_target.encode()
+        return current_target != expected_target
+    except FileNotFoundError:
+        # Symlink doesn't exist
+        return True
+    except OSError as e:
+        if e.errno == errno.EINVAL:
+            # Not a symlink
+            return True
+        raise
+
+
+def _check_file_matches(
+    repo_object_store,
+    full_path: bytes,
+    entry_sha: bytes,
+    entry_mode: int,
+    current_stat: os.stat_result,
+    honor_filemode: bool,
+    blob_normalizer: Optional["BlobNormalizer"] = None,
+    tree_path: Optional[bytes] = None,
+) -> bool:
+    """Check if a file on disk matches the expected git object.
+
+    Returns True if file needs to be written, False if it matches.
+    """
+    # Check mode first (if honor_filemode is True)
+    if honor_filemode:
+        current_mode = stat.S_IMODE(current_stat.st_mode)
+        expected_mode = stat.S_IMODE(entry_mode)
+        if current_mode != expected_mode:
+            return True
+
+    # If mode matches (or we don't care), check content via size first
+    blob_obj = repo_object_store[entry_sha]
+    if current_stat.st_size != blob_obj.raw_length():
+        return True
+
+    # Size matches, check actual content
+    try:
+        with open(full_path, "rb") as f:
+            current_content = f.read()
+            expected_content = blob_obj.as_raw_string()
+            if blob_normalizer and tree_path is not None:
+                normalized_blob = blob_normalizer.checkout_normalize(
+                    blob_obj, tree_path
+                )
+                expected_content = normalized_blob.as_raw_string()
+            return current_content != expected_content
+    except (FileNotFoundError, PermissionError, IsADirectoryError):
+        return True
+
+
+def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
+    """Transition any type to submodule."""
+    from .submodule import ensure_submodule_placeholder
+
+    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
+        # Already a directory, just ensure .git file exists
+        ensure_submodule_placeholder(repo, path)
+    else:
+        # Remove whatever is there and create submodule
+        if current_stat is not None:
+            _remove_file_with_readonly_handling(full_path)
+        ensure_submodule_placeholder(repo, path)
+
+    st = os.lstat(full_path)
+    index[path] = index_entry_from_stat(st, entry.sha)
+
+
+def _transition_to_file(
+    object_store,
+    path,
+    full_path,
+    current_stat,
+    entry,
+    index,
+    honor_filemode,
+    symlink_fn,
+    blob_normalizer,
+):
+    """Transition any type to regular file or symlink."""
+    # Check if we need to update
+    if (
+        current_stat is not None
+        and stat.S_ISREG(current_stat.st_mode)
+        and not stat.S_ISLNK(entry.mode)
+    ):
+        # File to file - check if update needed
+        needs_update = _check_file_matches(
+            object_store,
+            full_path,
+            entry.sha,
+            entry.mode,
+            current_stat,
+            honor_filemode,
+            blob_normalizer,
+            path,
+        )
+    elif (
+        current_stat is not None
+        and stat.S_ISLNK(current_stat.st_mode)
+        and stat.S_ISLNK(entry.mode)
+    ):
+        # Symlink to symlink - check if update needed
+        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
+    else:
+        needs_update = True
+
+    if not needs_update:
+        # Just update index - current_stat should always be valid here since we're not updating
+        index[path] = index_entry_from_stat(current_stat, entry.sha)
+        return
+
+    # Remove existing entry if needed
+    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
+        # Remove directory
+        dir_contents = set(os.listdir(full_path))
+        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
+
+        if git_file_name in dir_contents:
+            if dir_contents != {git_file_name}:
+                raise IsADirectoryError(
+                    f"Cannot replace submodule with untracked files: {full_path!r}"
+                )
+            shutil.rmtree(full_path)
+        else:
+            try:
+                os.rmdir(full_path)
+            except OSError as e:
+                if e.errno == errno.ENOTEMPTY:
+                    raise IsADirectoryError(
+                        f"Cannot replace non-empty directory with file: {full_path!r}"
+                    )
+                raise
+    elif current_stat is not None:
+        _remove_file_with_readonly_handling(full_path)
+
+    # Ensure parent directory exists
+    _ensure_parent_dir_exists(full_path)
+
+    # Write the file
+    blob_obj = object_store[entry.sha]
+    assert isinstance(blob_obj, Blob)
+    if blob_normalizer:
+        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
+    st = build_file_from_blob(
+        blob_obj,
+        entry.mode,
+        full_path,
+        honor_filemode=honor_filemode,
+        symlink_fn=symlink_fn,
+    )
+    index[path] = index_entry_from_stat(st, entry.sha)
+
+
+def _transition_to_absent(repo, path, full_path, current_stat, index):
+    """Remove any type of entry."""
+    if current_stat is None:
+        return
+
+    if stat.S_ISDIR(current_stat.st_mode):
+        # Check if it's a submodule directory
+        dir_contents = set(os.listdir(full_path))
+        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
+
+        if git_file_name in dir_contents and dir_contents == {git_file_name}:
+            shutil.rmtree(full_path)
+        else:
+            try:
+                os.rmdir(full_path)
+            except OSError as e:
+                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
+                    raise
+    else:
+        _remove_file_with_readonly_handling(full_path)
+
+    try:
+        del index[path]
+    except KeyError:
+        pass
+
+    # Try to remove empty parent directories
+    _remove_empty_parents(
+        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
+    )
+
+
 def update_working_tree(
-    repo: "BaseRepo",
+    repo: "Repo",
     old_tree_id: Optional[bytes],
     new_tree_id: bytes,
     honor_filemode: bool = True,
@@ -1514,137 +1773,162 @@ def update_working_tree(
       blob_normalizer: An optional BlobNormalizer to use for converting line
         endings when writing blobs to the working directory.
     """
-    import os
-
-    # Set default validate_path_element if not provided
     if validate_path_element is None:
         validate_path_element = validate_path_element_default
 
-    # Get the trees
-    old_tree = repo[old_tree_id] if old_tree_id else None
-    repo[new_tree_id]
-
-    # Open the index
+    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
     index = repo.open_index()
 
-    # Track which paths we've dealt with
-    handled_paths = set()
-
-    # Get repo path as string for comparisons
-    if not hasattr(repo, "path"):
-        raise ValueError("Repository must have a path attribute")
-    repo_path_str = repo.path if isinstance(repo.path, str) else repo.path.decode()
-
-    # First, update/add all files in the new tree
+    # Build sets of paths for efficient lookup
+    new_paths = {}
     for entry in iter_tree_contents(repo.object_store, new_tree_id):
-        handled_paths.add(entry.path)
-
-        # Skip .git directory
-        if entry.path.startswith(b".git"):
+        if entry.path.startswith(b".git") or not validate_path(
+            entry.path, validate_path_element
+        ):
             continue
+        new_paths[entry.path] = entry
 
-        # Validate path element
-        if not validate_path(entry.path, validate_path_element):
-            continue
+    old_paths = {}
+    if old_tree_id:
+        for entry in iter_tree_contents(repo.object_store, old_tree_id):
+            if not entry.path.startswith(b".git"):
+                old_paths[entry.path] = entry
+
+    # Process all paths
+    all_paths = set(new_paths.keys()) | set(old_paths.keys())
+
+    # Check for paths that need to become directories
+    paths_needing_dir = set()
+    for path in new_paths:
+        parts = path.split(b"/")
+        for i in range(1, len(parts)):
+            parent = b"/".join(parts[:i])
+            if parent in old_paths and parent not in new_paths:
+                paths_needing_dir.add(parent)
+
+    # Check if any path that needs to become a directory has been modified
+    current_stat: Optional[os.stat_result]
+    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
+    for path in paths_needing_dir:
+        full_path = _tree_to_fs_path(repo_path, path)
+        try:
+            current_stat = os.lstat(full_path)
+        except FileNotFoundError:
+            # File doesn't exist, proceed
+            stat_cache[full_path] = None
+        except PermissionError:
+            # Can't read file, proceed
+            pass
+        else:
+            stat_cache[full_path] = current_stat
+            if stat.S_ISREG(current_stat.st_mode):
+                # Check if file has been modified
+                old_entry = old_paths[path]
+                if _check_file_matches(
+                    repo.object_store,
+                    full_path,
+                    old_entry.sha,
+                    old_entry.mode,
+                    current_stat,
+                    honor_filemode,
+                    blob_normalizer,
+                    path,
+                ):
+                    # File has been modified, can't replace with directory
+                    raise OSError(
+                        f"Cannot replace modified file with directory: {path!r}"
+                    )
+
+    # Process in two passes: deletions first, then additions/updates
+    # This handles case-only renames on case-insensitive filesystems correctly
+    paths_to_remove = []
+    paths_to_update = []
+
+    for path in sorted(all_paths):
+        if path in new_paths:
+            paths_to_update.append(path)
+        else:
+            paths_to_remove.append(path)
 
-        # Build full path
-        full_path = os.path.join(repo_path_str, entry.path.decode())
+    # First process removals
+    for path in paths_to_remove:
+        full_path = _tree_to_fs_path(repo_path, path)
 
-        # Get the blob
-        blob_obj = repo.object_store[entry.sha]
-        if not isinstance(blob_obj, Blob):
-            raise ValueError(f"Object {entry.sha!r} is not a blob")
+        # Determine current state - use cache if available
+        try:
+            current_stat = stat_cache[full_path]
+        except KeyError:
+            try:
+                current_stat = os.lstat(full_path)
+            except FileNotFoundError:
+                current_stat = None
 
-        # Apply blob normalization for checkout if normalizer is provided
-        if blob_normalizer is not None:
-            blob_obj = blob_normalizer.checkout_normalize(blob_obj, entry.path)
+        _transition_to_absent(repo, path, full_path, current_stat, index)
 
-        # Ensure parent directory exists
-        parent_dir = os.path.dirname(full_path)
-        if parent_dir and not os.path.exists(parent_dir):
-            os.makedirs(parent_dir)
+    # Then process additions/updates
+    for path in paths_to_update:
+        full_path = _tree_to_fs_path(repo_path, path)
 
-        # Write the file
-        st = build_file_from_blob(
-            blob_obj,
-            entry.mode,
-            full_path.encode(),
-            honor_filemode=honor_filemode,
-            symlink_fn=symlink_fn,
-        )
+        # Determine current state - use cache if available
+        try:
+            current_stat = stat_cache[full_path]
+        except KeyError:
+            try:
+                current_stat = os.lstat(full_path)
+            except FileNotFoundError:
+                current_stat = None
 
-        # Update index
-        index[entry.path] = index_entry_from_stat(st, entry.sha)
+        new_entry = new_paths[path]
 
-    # Remove files that existed in old tree but not in new tree
-    if old_tree:
-        for entry in iter_tree_contents(repo.object_store, old_tree_id):
-            if entry.path not in handled_paths:
-                # Skip .git directory
-                if entry.path.startswith(b".git"):
-                    continue
-
-                # File was deleted
-                full_path = os.path.join(repo_path_str, entry.path.decode())
-
-                # Remove from working tree
-                if os.path.exists(full_path):
-                    os.remove(full_path)
-
-                # Remove from index
-                if entry.path in index:
-                    del index[entry.path]
-
-                # Clean up empty directories
-                dir_path = os.path.dirname(full_path)
-                while (
-                    dir_path and dir_path != repo_path_str and os.path.exists(dir_path)
-                ):
-                    try:
-                        if not os.listdir(dir_path):
-                            os.rmdir(dir_path)
-                            dir_path = os.path.dirname(dir_path)
-                        else:
-                            break
-                    except OSError:
-                        break
-
-    # If force_remove_untracked is True, remove any files in working directory
-    # that are not in the target tree (useful for reset --hard)
-    if force_remove_untracked:
-        # Walk through all files in the working directory
-        for root, dirs, files in os.walk(repo_path_str):
-            # Skip .git directory
-            if ".git" in dirs:
-                dirs.remove(".git")
+        # Path should exist
+        if S_ISGITLINK(new_entry.mode):
+            _transition_to_submodule(
+                repo, path, full_path, current_stat, new_entry, index
+            )
+        else:
+            _transition_to_file(
+                repo.object_store,
+                path,
+                full_path,
+                current_stat,
+                new_entry,
+                index,
+                honor_filemode,
+                symlink_fn,
+                blob_normalizer,
+            )
 
+    # Handle force_remove_untracked
+    if force_remove_untracked:
+        for root, dirs, files in os.walk(repo_path):
+            if b".git" in os.fsencode(root):
+                continue
+            root_bytes = os.fsencode(root)
             for file in files:
-                full_path = os.path.join(root, file)
-                # Get relative path from repo root
-                rel_path = os.path.relpath(full_path, repo_path_str)
-                # Normalize to use forward slashes like Git does internally
-                rel_path = rel_path.replace(os.sep, "/")
-                rel_path_bytes = rel_path.encode()
-
-                # If this file is not in the target tree, remove it
-                if rel_path_bytes not in handled_paths:
-                    os.remove(full_path)
+                full_path = os.path.join(root_bytes, os.fsencode(file))
+                tree_path = os.path.relpath(full_path, repo_path)
+                if os.sep != "/":
+                    tree_path = tree_path.replace(os.sep.encode(), b"/")
 
-                    # Remove from index if present
-                    if rel_path_bytes in index:
-                        del index[rel_path_bytes]
+                if tree_path not in new_paths:
+                    _remove_file_with_readonly_handling(full_path)
+                    if tree_path in index:
+                        del index[tree_path]
 
         # Clean up empty directories
-        for root, dirs, files in os.walk(repo_path_str, topdown=False):
-            if ".git" in root:
-                continue
-            if root != repo_path_str and not files and not dirs:
+        for root, dirs, files in os.walk(repo_path, topdown=False):
+            root_bytes = os.fsencode(root)
+            if (
+                b".git" not in root_bytes
+                and root_bytes != repo_path
+                and not files
+                and not dirs
+            ):
                 try:
                     os.rmdir(root)
                 except OSError:
                     pass
 
-    # Write the updated index
     index.write()
 
 
@@ -1694,9 +1978,6 @@ def get_unstaged_changes(
                 yield tree_path
 
 
-os_sep_bytes = os.sep.encode("ascii")
-
-
 def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes:
     """Convert a git tree path to a file system path.
 

+ 45 - 1
dulwich/submodule.py

@@ -21,14 +21,16 @@
 
 """Working with Git submodules."""
 
+import os
 from collections.abc import Iterator
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Union
 
 from .object_store import iter_tree_contents
 from .objects import S_ISGITLINK
 
 if TYPE_CHECKING:
     from .object_store import ObjectContainer
+    from .repo import Repo
 
 
 def iter_cached_submodules(
@@ -46,3 +48,45 @@ def iter_cached_submodules(
     for entry in iter_tree_contents(store, root_tree_id):
         if S_ISGITLINK(entry.mode):
             yield entry.path, entry.sha
+
+
+def ensure_submodule_placeholder(
+    repo: "Repo",
+    submodule_path: Union[str, bytes],
+) -> None:
+    """Create a submodule placeholder directory with .git file.
+
+    This creates the minimal structure needed for a submodule:
+    - The submodule directory
+    - A .git file pointing to the submodule's git directory
+
+    Args:
+      repo: Parent repository
+      submodule_path: Path to the submodule relative to repo root
+    """
+    # Ensure path is bytes
+    if isinstance(submodule_path, str):
+        submodule_path = submodule_path.encode()
+
+    # Get repo path as bytes
+    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
+
+    # Create full path to submodule
+    full_path = os.path.join(repo_path, submodule_path)
+
+    # Create submodule directory if it doesn't exist
+    if not os.path.exists(full_path):
+        os.makedirs(full_path)
+
+    # Create .git file pointing to the submodule's git directory
+    git_filename = b".git" if isinstance(full_path, bytes) else ".git"
+    git_file_path = os.path.join(full_path, git_filename)
+    if not os.path.exists(git_file_path):
+        # Submodule git directories are typically stored in .git/modules/<name>
+        # The relative path from the submodule to the parent's .git directory
+        # depends on the submodule's depth
+        depth = submodule_path.count(b"/") + 1
+        relative_git_dir = b"../" * depth + b".git/modules/" + submodule_path
+
+        with open(git_file_path, "wb") as f:
+            f.write(b"gitdir: " + relative_git_dir + b"\n")

+ 702 - 19
tests/test_index.py

@@ -33,6 +33,8 @@ from dulwich.index import (
     Index,
     IndexEntry,
     SerializedIndexEntry,
+    _compress_path,
+    _decompress_path,
     _fs_to_tree_path,
     _tree_to_fs_path,
     build_index_from_tree,
@@ -45,6 +47,7 @@ from dulwich.index import (
     iter_fresh_entries,
     read_index,
     read_index_dict,
+    update_working_tree,
     validate_path_element_default,
     validate_path_element_ntfs,
     write_cache_time,
@@ -1063,7 +1066,6 @@ class TestIndexEntryFromPath(TestCase):
     def test_read_submodule_head(self) -> None:
         """Test reading the HEAD of a submodule."""
         from dulwich.index import read_submodule_head
-        from dulwich.repo import Repo
 
         # Create a test repo that will be our "submodule"
         sub_repo_dir = os.path.join(self.tempdir, "subrepo")
@@ -1099,7 +1101,6 @@ class TestIndexEntryFromPath(TestCase):
     def test_has_directory_changed(self) -> None:
         """Test checking if a directory has changed."""
         from dulwich.index import IndexEntry, _has_directory_changed
-        from dulwich.repo import Repo
 
         # Setup mock IndexEntry
         mock_entry = IndexEntry(
@@ -1465,8 +1466,6 @@ class TestManyFilesRepoIntegration(TestCase):
 
     def test_repo_with_manyfiles_config(self):
         """Test that a repository with feature.manyFiles=true uses the right settings."""
-        from dulwich.repo import Repo
-
         # Create a new repository
         repo = Repo.init(self.tempdir)
 
@@ -1482,8 +1481,6 @@ class TestManyFilesRepoIntegration(TestCase):
 
     def test_repo_with_explicit_index_settings(self):
         """Test that explicit index.version and index.skipHash work."""
-        from dulwich.repo import Repo
-
         # Create a new repository
         repo = Repo.init(self.tempdir)
 
@@ -1519,8 +1516,6 @@ class TestPathPrefixCompression(TestCase):
 
     def test_path_compression_simple(self):
         """Test simple path compression cases."""
-        from dulwich.index import _compress_path, _decompress_path
-
         # Test case 1: No common prefix
         compressed = _compress_path(b"file1.txt", b"")
         decompressed, _ = _decompress_path(compressed, 0, b"")
@@ -1538,8 +1533,6 @@ class TestPathPrefixCompression(TestCase):
 
     def test_path_compression_deep_directories(self):
         """Test compression with deep directory structures."""
-        from dulwich.index import _compress_path, _decompress_path
-
         path1 = b"src/main/java/com/example/service/UserService.java"
         path2 = b"src/main/java/com/example/service/OrderService.java"
         path3 = b"src/main/java/com/example/model/User.java"
@@ -1628,8 +1621,6 @@ class TestPathPrefixCompression(TestCase):
 
     def test_path_compression_edge_cases(self):
         """Test edge cases in path compression."""
-        from dulwich.index import _compress_path, _decompress_path
-
         # Empty paths
         compressed = _compress_path(b"", b"")
         decompressed, _ = _decompress_path(compressed, 0, b"")
@@ -1649,15 +1640,28 @@ class TestPathPrefixCompression(TestCase):
 class TestUpdateWorkingTree(TestCase):
     def setUp(self):
         self.tempdir = tempfile.mkdtemp()
-        self.addCleanup(shutil.rmtree, self.tempdir)
-        from dulwich.repo import Repo
+
+        def cleanup_tempdir():
+            """Remove tempdir, handling read-only files on Windows."""
+
+            def remove_readonly(func, path, excinfo):
+                """Error handler for Windows read-only files."""
+                import stat
+
+                if sys.platform == "win32" and excinfo[0] is PermissionError:
+                    os.chmod(path, stat.S_IWRITE)
+                    func(path)
+                else:
+                    raise
+
+            shutil.rmtree(self.tempdir, onerror=remove_readonly)
+
+        self.addCleanup(cleanup_tempdir)
 
         self.repo = Repo.init(self.tempdir)
 
     def test_update_working_tree_with_blob_normalizer(self):
         """Test update_working_tree with a blob normalizer."""
-        from dulwich.index import update_working_tree
-        from dulwich.objects import Blob, Tree
 
         # Create a simple blob normalizer that converts CRLF to LF
         class TestBlobNormalizer:
@@ -1698,9 +1702,6 @@ class TestUpdateWorkingTree(TestCase):
 
     def test_update_working_tree_without_blob_normalizer(self):
         """Test update_working_tree without a blob normalizer."""
-        from dulwich.index import update_working_tree
-        from dulwich.objects import Blob, Tree
-
         # Create a tree with a file containing CRLF
         blob = Blob()
         blob.data = b"Hello\r\nWorld\r\n"
@@ -1728,3 +1729,685 @@ class TestUpdateWorkingTree(TestCase):
         # Check that the index has the blob SHA
         index = self.repo.open_index()
         self.assertEqual(blob.id, index[b"test.txt"].sha)
+
+    def test_update_working_tree_remove_directory(self):
+        """Test that update_working_tree properly removes directories."""
+        # Create initial tree with a directory containing files
+        blob1 = Blob()
+        blob1.data = b"content1"
+        self.repo.object_store.add_object(blob1)
+
+        blob2 = Blob()
+        blob2.data = b"content2"
+        self.repo.object_store.add_object(blob2)
+
+        tree1 = Tree()
+        tree1[b"dir/file1.txt"] = (0o100644, blob1.id)
+        tree1[b"dir/file2.txt"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1 (create directory with files)
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify directory and files exist
+        dir_path = os.path.join(self.tempdir, "dir")
+        self.assertTrue(os.path.isdir(dir_path))
+        self.assertTrue(os.path.exists(os.path.join(dir_path, "file1.txt")))
+        self.assertTrue(os.path.exists(os.path.join(dir_path, "file2.txt")))
+
+        # Create empty tree (remove everything)
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update to empty tree
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Verify directory was removed
+        self.assertFalse(os.path.exists(dir_path))
+
+    def test_update_working_tree_submodule_to_file(self):
+        """Test replacing a submodule directory with a file."""
+        # Create tree with submodule
+        submodule_sha = b"a" * 40
+        tree1 = Tree()
+        tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree with submodule
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify submodule directory exists with .git file
+        submodule_path = os.path.join(self.tempdir, "submodule")
+        self.assertTrue(os.path.isdir(submodule_path))
+        self.assertTrue(os.path.exists(os.path.join(submodule_path, ".git")))
+
+        # Create tree with file at same path
+        blob = Blob()
+        blob.data = b"file content"
+        self.repo.object_store.add_object(blob)
+
+        tree2 = Tree()
+        tree2[b"submodule"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update to tree with file (should remove submodule directory and create file)
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Verify it's now a file
+        self.assertTrue(os.path.isfile(submodule_path))
+        with open(submodule_path, "rb") as f:
+            self.assertEqual(b"file content", f.read())
+
+    def test_update_working_tree_directory_with_nested_subdir(self):
+        """Test removing directory with nested subdirectories."""
+        # Create tree with nested directories
+        blob = Blob()
+        blob.data = b"deep content"
+        self.repo.object_store.add_object(blob)
+
+        tree1 = Tree()
+        tree1[b"a/b/c/file.txt"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify nested structure exists
+        path_a = os.path.join(self.tempdir, "a")
+        path_b = os.path.join(path_a, "b")
+        path_c = os.path.join(path_b, "c")
+        file_path = os.path.join(path_c, "file.txt")
+
+        self.assertTrue(os.path.exists(file_path))
+
+        # Create empty tree
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update to empty tree
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Verify all directories were removed
+        self.assertFalse(os.path.exists(path_a))
+
+    def test_update_working_tree_file_replaced_by_dir_not_removed(self):
+        """Test that a directory replacing a git file is left alone if not empty."""
+        # Create tree with a file
+        blob = Blob()
+        blob.data = b"file content"
+        self.repo.object_store.add_object(blob)
+
+        tree1 = Tree()
+        tree1[b"path"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify file exists
+        file_path = os.path.join(self.tempdir, "path")
+        self.assertTrue(os.path.isfile(file_path))
+
+        # Manually replace file with directory containing untracked file
+        os.remove(file_path)
+        os.mkdir(file_path)
+        with open(os.path.join(file_path, "untracked.txt"), "w") as f:
+            f.write("untracked content")
+
+        # Create empty tree
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update should succeed but leave the directory alone
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Directory should still exist with its contents
+        self.assertTrue(os.path.isdir(file_path))
+        self.assertTrue(os.path.exists(os.path.join(file_path, "untracked.txt")))
+
+    def test_update_working_tree_file_replaced_by_empty_dir_removed(self):
+        """Test that an empty directory replacing a git file is removed."""
+        # Create tree with a file
+        blob = Blob()
+        blob.data = b"file content"
+        self.repo.object_store.add_object(blob)
+
+        tree1 = Tree()
+        tree1[b"path"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify file exists
+        file_path = os.path.join(self.tempdir, "path")
+        self.assertTrue(os.path.isfile(file_path))
+
+        # Manually replace file with empty directory
+        os.remove(file_path)
+        os.mkdir(file_path)
+
+        # Create empty tree
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update should remove the empty directory
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Directory should be gone
+        self.assertFalse(os.path.exists(file_path))
+
+    def test_update_working_tree_symlink_transitions(self):
+        """Test transitions involving symlinks."""
+        # Skip on Windows where symlinks might not be supported
+        if sys.platform == "win32":
+            self.skipTest("Symlinks not fully supported on Windows")
+
+        # Create tree with symlink
+        blob1 = Blob()
+        blob1.data = b"target/path"
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"link"] = (0o120000, blob1.id)  # Symlink mode
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree with symlink
+        update_working_tree(self.repo, None, tree1.id)
+
+        link_path = os.path.join(self.tempdir, "link")
+        self.assertTrue(os.path.islink(link_path))
+        self.assertEqual(b"target/path", os.readlink(link_path).encode())
+
+        # Test 1: Replace symlink with regular file
+        blob2 = Blob()
+        blob2.data = b"file content"
+        self.repo.object_store.add_object(blob2)
+
+        tree2 = Tree()
+        tree2[b"link"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree2)
+
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        self.assertFalse(os.path.islink(link_path))
+        self.assertTrue(os.path.isfile(link_path))
+        with open(link_path, "rb") as f:
+            self.assertEqual(b"file content", f.read())
+
+        # Test 2: Replace file with symlink
+        update_working_tree(self.repo, tree2.id, tree1.id)
+
+        self.assertTrue(os.path.islink(link_path))
+        self.assertEqual(b"target/path", os.readlink(link_path).encode())
+
+        # Test 3: Replace symlink with directory (manually)
+        os.unlink(link_path)
+        os.mkdir(link_path)
+
+        # Create empty tree
+        tree3 = Tree()
+        self.repo.object_store.add_object(tree3)
+
+        # Should remove empty directory
+        update_working_tree(self.repo, tree1.id, tree3.id)
+        self.assertFalse(os.path.exists(link_path))
+
+    def test_update_working_tree_modified_file_to_dir_transition(self):
+        """Test that modified files are not removed when they should be directories."""
+        # Create tree with file
+        blob1 = Blob()
+        blob1.data = b"original content"
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"path"] = (0o100644, blob1.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        file_path = os.path.join(self.tempdir, "path")
+
+        # Modify the file locally
+        with open(file_path, "w") as f:
+            f.write("modified content")
+
+        # Create tree where path is a directory with file
+        blob2 = Blob()
+        blob2.data = b"subfile content"
+        self.repo.object_store.add_object(blob2)
+
+        tree2 = Tree()
+        tree2[b"path/subfile"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update should fail because can't create directory where modified file exists
+        with self.assertRaises(IOError):
+            update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # File should still exist with modifications
+        self.assertTrue(os.path.isfile(file_path))
+        with open(file_path) as f:
+            self.assertEqual("modified content", f.read())
+
+    def test_update_working_tree_executable_transitions(self):
+        """Test transitions involving executable bit changes."""
+        # Skip on Windows where executable bit is not supported
+        if sys.platform == "win32":
+            self.skipTest("Executable bit not supported on Windows")
+
+        # Create tree with non-executable file
+        blob = Blob()
+        blob.data = b"#!/bin/sh\necho hello"
+        self.repo.object_store.add_object(blob)
+
+        tree1 = Tree()
+        tree1[b"script.sh"] = (0o100644, blob.id)  # Non-executable
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        script_path = os.path.join(self.tempdir, "script.sh")
+        self.assertTrue(os.path.isfile(script_path))
+
+        # Check it's not executable
+        mode = os.stat(script_path).st_mode
+        self.assertFalse(mode & stat.S_IXUSR)
+
+        # Create tree with executable file (same content)
+        tree2 = Tree()
+        tree2[b"script.sh"] = (0o100755, blob.id)  # Executable
+        self.repo.object_store.add_object(tree2)
+
+        # Update to tree2
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Check it's now executable
+        mode = os.stat(script_path).st_mode
+        self.assertTrue(mode & stat.S_IXUSR)
+
+    def test_update_working_tree_submodule_with_untracked_files(self):
+        """Test that submodules with untracked files are not removed."""
+        from dulwich.objects import S_IFGITLINK, Tree
+
+        # Create tree with submodule
+        submodule_sha = b"a" * 40
+        tree1 = Tree()
+        tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree with submodule
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Add untracked file to submodule directory
+        submodule_path = os.path.join(self.tempdir, "submodule")
+        untracked_path = os.path.join(submodule_path, "untracked.txt")
+        with open(untracked_path, "w") as f:
+            f.write("untracked content")
+
+        # Create empty tree
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update should not remove submodule directory with untracked files
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Directory should still exist with untracked file
+        self.assertTrue(os.path.isdir(submodule_path))
+        self.assertTrue(os.path.exists(untracked_path))
+
+    def test_update_working_tree_dir_to_file_with_subdir(self):
+        """Test replacing directory structure with a file."""
+        # Create tree with nested directory structure
+        blob1 = Blob()
+        blob1.data = b"content1"
+        self.repo.object_store.add_object(blob1)
+
+        blob2 = Blob()
+        blob2.data = b"content2"
+        self.repo.object_store.add_object(blob2)
+
+        tree1 = Tree()
+        tree1[b"dir/subdir/file1"] = (0o100644, blob1.id)
+        tree1[b"dir/subdir/file2"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify structure exists
+        dir_path = os.path.join(self.tempdir, "dir")
+        self.assertTrue(os.path.isdir(dir_path))
+
+        # Add an untracked file to make directory truly non-empty
+        untracked_path = os.path.join(dir_path, "untracked.txt")
+        with open(untracked_path, "w") as f:
+            f.write("untracked content")
+
+        # Create tree with file at "dir" path
+        blob3 = Blob()
+        blob3.data = b"replacement file"
+        self.repo.object_store.add_object(blob3)
+
+        tree2 = Tree()
+        tree2[b"dir"] = (0o100644, blob3.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update should fail because directory is not empty
+        with self.assertRaises(IsADirectoryError):
+            update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Directory should still exist
+        self.assertTrue(os.path.isdir(dir_path))
+
+    def test_update_working_tree_case_sensitivity(self):
+        """Test handling of case-sensitive filename changes."""
+        # Create tree with lowercase file
+        blob1 = Blob()
+        blob1.data = b"lowercase content"
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"readme.txt"] = (0o100644, blob1.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Create tree with uppercase file (different content)
+        blob2 = Blob()
+        blob2.data = b"uppercase content"
+        self.repo.object_store.add_object(blob2)
+
+        tree2 = Tree()
+        tree2[b"README.txt"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update to tree2
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Check what exists (behavior depends on filesystem)
+        lowercase_path = os.path.join(self.tempdir, "readme.txt")
+        uppercase_path = os.path.join(self.tempdir, "README.txt")
+
+        # On case-insensitive filesystems, one will overwrite the other
+        # On case-sensitive filesystems, both may exist
+        self.assertTrue(
+            os.path.exists(lowercase_path) or os.path.exists(uppercase_path)
+        )
+
+    def test_update_working_tree_deeply_nested_removal(self):
+        """Test removal of deeply nested directory structures."""
+        # Create deeply nested structure
+        blob = Blob()
+        blob.data = b"deep content"
+        self.repo.object_store.add_object(blob)
+
+        tree1 = Tree()
+        # Create a very deep path
+        deep_path = b"/".join([b"level%d" % i for i in range(10)])
+        tree1[deep_path + b"/file.txt"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Verify deep structure exists
+        current_path = self.tempdir
+        for i in range(10):
+            current_path = os.path.join(current_path, f"level{i}")
+            self.assertTrue(os.path.isdir(current_path))
+
+        # Create empty tree
+        tree2 = Tree()
+        self.repo.object_store.add_object(tree2)
+
+        # Update should remove all empty directories
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Verify top level directory is gone
+        top_level = os.path.join(self.tempdir, "level0")
+        self.assertFalse(os.path.exists(top_level))
+
+    def test_update_working_tree_read_only_files(self):
+        """Test handling of read-only files during updates."""
+        # Create tree with file
+        blob1 = Blob()
+        blob1.data = b"original content"
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"readonly.txt"] = (0o100644, blob1.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Make file read-only
+        file_path = os.path.join(self.tempdir, "readonly.txt")
+        os.chmod(file_path, 0o444)  # Read-only
+
+        # Create tree with modified file
+        blob2 = Blob()
+        blob2.data = b"new content"
+        self.repo.object_store.add_object(blob2)
+
+        tree2 = Tree()
+        tree2[b"readonly.txt"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update should handle read-only file
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        # Verify content was updated
+        with open(file_path, "rb") as f:
+            self.assertEqual(b"new content", f.read())
+
+    def test_update_working_tree_invalid_filenames(self):
+        """Test handling of invalid filenames for the platform."""
+        # Create tree with potentially problematic filenames
+        blob = Blob()
+        blob.data = b"content"
+        self.repo.object_store.add_object(blob)
+
+        tree = Tree()
+        # Add files with names that might be invalid on some platforms
+        tree[b"valid.txt"] = (0o100644, blob.id)
+        if sys.platform != "win32":
+            # These are invalid on Windows but valid on Unix
+            tree[b"file:with:colons.txt"] = (0o100644, blob.id)
+            tree[b"file<with>brackets.txt"] = (0o100644, blob.id)
+        self.repo.object_store.add_object(tree)
+
+        # Update should skip invalid files based on validation
+        update_working_tree(self.repo, None, tree.id)
+
+        # Valid file should exist
+        self.assertTrue(os.path.exists(os.path.join(self.tempdir, "valid.txt")))
+
+    def test_update_working_tree_symlink_to_directory(self):
+        """Test replacing a symlink pointing to a directory with a real directory."""
+        if sys.platform == "win32":
+            self.skipTest("Symlinks not fully supported on Windows")
+
+        # Create a target directory
+        target_dir = os.path.join(self.tempdir, "target")
+        os.mkdir(target_dir)
+        with open(os.path.join(target_dir, "file.txt"), "w") as f:
+            f.write("target file")
+
+        # Create tree with symlink pointing to directory
+        blob1 = Blob()
+        blob1.data = b"target"  # Relative path to target directory
+        self.repo.object_store.add_object(blob1)
+
+        tree1 = Tree()
+        tree1[b"link"] = (0o120000, blob1.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        link_path = os.path.join(self.tempdir, "link")
+        self.assertTrue(os.path.islink(link_path))
+
+        # Create tree with actual directory at same path
+        blob2 = Blob()
+        blob2.data = b"new file content"
+        self.repo.object_store.add_object(blob2)
+
+        tree2 = Tree()
+        tree2[b"link/newfile.txt"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update should replace symlink with actual directory
+        update_working_tree(self.repo, tree1.id, tree2.id)
+
+        self.assertFalse(os.path.islink(link_path))
+        self.assertTrue(os.path.isdir(link_path))
+        self.assertTrue(os.path.exists(os.path.join(link_path, "newfile.txt")))
+
+    def test_update_working_tree_comprehensive_transitions(self):
+        """Test all possible file type transitions comprehensively."""
+        # Skip on Windows where symlinks might not be supported
+        if sys.platform == "win32":
+            self.skipTest("Symlinks not fully supported on Windows")
+
+        # Create blobs for different file types
+        file_blob = Blob()
+        file_blob.data = b"regular file content"
+        self.repo.object_store.add_object(file_blob)
+
+        exec_blob = Blob()
+        exec_blob.data = b"#!/bin/sh\necho executable"
+        self.repo.object_store.add_object(exec_blob)
+
+        link_blob = Blob()
+        link_blob.data = b"target/path"
+        self.repo.object_store.add_object(link_blob)
+
+        submodule_sha = b"a" * 40
+
+        # Test 1: Regular file → Submodule
+        tree1 = Tree()
+        tree1[b"item"] = (0o100644, file_blob.id)
+        self.repo.object_store.add_object(tree1)
+
+        tree2 = Tree()
+        tree2[b"item"] = (S_IFGITLINK, submodule_sha)
+        self.repo.object_store.add_object(tree2)
+
+        update_working_tree(self.repo, None, tree1.id)
+        self.assertTrue(os.path.isfile(os.path.join(self.tempdir, "item")))
+
+        update_working_tree(self.repo, tree1.id, tree2.id)
+        self.assertTrue(os.path.isdir(os.path.join(self.tempdir, "item")))
+
+        # Test 2: Submodule → Executable file
+        tree3 = Tree()
+        tree3[b"item"] = (0o100755, exec_blob.id)
+        self.repo.object_store.add_object(tree3)
+
+        update_working_tree(self.repo, tree2.id, tree3.id)
+        item_path = os.path.join(self.tempdir, "item")
+        self.assertTrue(os.path.isfile(item_path))
+        if sys.platform != "win32":
+            self.assertTrue(os.access(item_path, os.X_OK))
+
+        # Test 3: Executable file → Symlink
+        tree4 = Tree()
+        tree4[b"item"] = (0o120000, link_blob.id)
+        self.repo.object_store.add_object(tree4)
+
+        update_working_tree(self.repo, tree3.id, tree4.id)
+        self.assertTrue(os.path.islink(item_path))
+
+        # Test 4: Symlink → Submodule
+        tree5 = Tree()
+        tree5[b"item"] = (S_IFGITLINK, submodule_sha)
+        self.repo.object_store.add_object(tree5)
+
+        update_working_tree(self.repo, tree4.id, tree5.id)
+        self.assertTrue(os.path.isdir(item_path))
+
+        # Test 5: Clean up - Submodule → absent
+        tree6 = Tree()
+        self.repo.object_store.add_object(tree6)
+
+        update_working_tree(self.repo, tree5.id, tree6.id)
+        self.assertFalse(os.path.exists(item_path))
+
+        # Test 6: Symlink → Executable file
+        tree7 = Tree()
+        tree7[b"item2"] = (0o120000, link_blob.id)
+        self.repo.object_store.add_object(tree7)
+
+        update_working_tree(self.repo, tree6.id, tree7.id)
+        item2_path = os.path.join(self.tempdir, "item2")
+        self.assertTrue(os.path.islink(item2_path))
+
+        tree8 = Tree()
+        tree8[b"item2"] = (0o100755, exec_blob.id)
+        self.repo.object_store.add_object(tree8)
+
+        update_working_tree(self.repo, tree7.id, tree8.id)
+        self.assertTrue(os.path.isfile(item2_path))
+        if sys.platform != "win32":
+            self.assertTrue(os.access(item2_path, os.X_OK))
+
+    def test_update_working_tree_partial_update_failure(self):
+        """Test handling when update fails partway through."""
+        # Create initial tree
+        blob1 = Blob()
+        blob1.data = b"file1 content"
+        self.repo.object_store.add_object(blob1)
+
+        blob2 = Blob()
+        blob2.data = b"file2 content"
+        self.repo.object_store.add_object(blob2)
+
+        tree1 = Tree()
+        tree1[b"file1.txt"] = (0o100644, blob1.id)
+        tree1[b"file2.txt"] = (0o100644, blob2.id)
+        self.repo.object_store.add_object(tree1)
+
+        # Update to tree1
+        update_working_tree(self.repo, None, tree1.id)
+
+        # Create a directory where file2.txt is, to cause a conflict
+        file2_path = os.path.join(self.tempdir, "file2.txt")
+        os.remove(file2_path)
+        os.mkdir(file2_path)
+        # Add untracked file to prevent removal
+        with open(os.path.join(file2_path, "blocker.txt"), "w") as f:
+            f.write("blocking content")
+
+        # Create tree with updates to both files
+        blob3 = Blob()
+        blob3.data = b"file1 updated"
+        self.repo.object_store.add_object(blob3)
+
+        blob4 = Blob()
+        blob4.data = b"file2 updated"
+        self.repo.object_store.add_object(blob4)
+
+        tree2 = Tree()
+        tree2[b"file1.txt"] = (0o100644, blob3.id)
+        tree2[b"file2.txt"] = (0o100644, blob4.id)
+        self.repo.object_store.add_object(tree2)
+
+        # Update should partially succeed - file1 updated, file2 blocked
+        try:
+            update_working_tree(self.repo, tree1.id, tree2.id)
+        except IsADirectoryError:
+            # Expected to fail on file2 because it's a directory
+            pass
+
+        # file1 should be updated
+        with open(os.path.join(self.tempdir, "file1.txt"), "rb") as f:
+            self.assertEqual(b"file1 updated", f.read())
+
+        # file2 should still be a directory
+        self.assertTrue(os.path.isdir(file2_path))

+ 31 - 1
tests/test_submodule.py

@@ -30,7 +30,7 @@ from dulwich.objects import (
     Tree,
 )
 from dulwich.repo import Repo
-from dulwich.submodule import iter_cached_submodules
+from dulwich.submodule import ensure_submodule_placeholder, iter_cached_submodules
 
 from . import TestCase
 
@@ -115,3 +115,33 @@ class SubmoduleTests(TestCase):
         path, sha = submodules[0]
         self.assertEqual(b"submodule", path)
         self.assertEqual(submodule_sha, sha)
+
+    def test_ensure_submodule_placeholder(self) -> None:
+        """Test creating submodule placeholder directories."""
+        # Create a repository
+        repo_path = os.path.join(self.test_dir, "testrepo")
+        repo = Repo.init(repo_path, mkdir=True)
+
+        # Test creating a simple submodule placeholder
+        ensure_submodule_placeholder(repo, b"libs/mylib")
+
+        # Check that the directory was created
+        submodule_path = os.path.join(repo_path, "libs", "mylib")
+        self.assertTrue(os.path.isdir(submodule_path))
+
+        # Check that the .git file was created
+        git_file_path = os.path.join(submodule_path, ".git")
+        self.assertTrue(os.path.isfile(git_file_path))
+
+        # Check the content of the .git file
+        with open(git_file_path, "rb") as f:
+            content = f.read()
+        self.assertEqual(b"gitdir: ../../.git/modules/libs/mylib\n", content)
+
+        # Test with string path
+        ensure_submodule_placeholder(repo, "libs/another")
+        another_path = os.path.join(repo_path, "libs", "another")
+        self.assertTrue(os.path.isdir(another_path))
+
+        # Test idempotency - calling again should not fail
+        ensure_submodule_placeholder(repo, b"libs/mylib")