|
@@ -39,7 +39,10 @@ from dulwich.index import (
|
|
|
cleanup_mode,
|
|
|
commit_tree,
|
|
|
get_unstaged_changes,
|
|
|
+ index_entry_from_directory,
|
|
|
+ index_entry_from_path,
|
|
|
index_entry_from_stat,
|
|
|
+ iter_fresh_entries,
|
|
|
read_index,
|
|
|
read_index_dict,
|
|
|
validate_path_element_default,
|
|
@@ -786,6 +789,27 @@ class TestTreeFSPathConversion(TestCase):
|
|
|
os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
|
|
|
)
|
|
|
|
|
|
+ def test_tree_to_fs_path_windows_separator(self) -> None:
|
|
|
+ tree_path = b"path/with/slash"
|
|
|
+ original_sep = os.sep.encode("ascii")
|
|
|
+ try:
|
|
|
+ # Temporarily modify os_sep_bytes to test Windows path conversion
|
|
|
+ # This simulates Windows behavior on all platforms for testing
|
|
|
+ import dulwich.index
|
|
|
+
|
|
|
+ dulwich.index.os_sep_bytes = b"\\"
|
|
|
+
|
|
|
+ fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
|
|
|
+
|
|
|
+ # The function should join the prefix path with the converted tree path
|
|
|
+ # The expected behavior is that the path separators in the tree_path are
|
|
|
+ # converted to the platform-specific separator (which we've set to backslash)
|
|
|
+ expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
|
|
|
+ self.assertEqual(fs_path, expected_path)
|
|
|
+ finally:
|
|
|
+ # Restore original value
|
|
|
+ dulwich.index.os_sep_bytes = original_sep
|
|
|
+
|
|
|
def test_fs_to_tree_path_str(self) -> None:
|
|
|
fs_path = os.path.join(os.path.join("délwíçh", "foo"))
|
|
|
tree_path = _fs_to_tree_path(fs_path)
|
|
@@ -795,3 +819,395 @@ class TestTreeFSPathConversion(TestCase):
|
|
|
fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
|
|
|
tree_path = _fs_to_tree_path(fs_path)
|
|
|
self.assertEqual(tree_path, "délwíçh/foo".encode())
|
|
|
+
|
|
|
+ def test_fs_to_tree_path_windows_separator(self) -> None:
|
|
|
+ # Test conversion of Windows paths to tree paths
|
|
|
+ fs_path = b"path\\with\\backslash"
|
|
|
+ original_sep = os.sep.encode("ascii")
|
|
|
+ try:
|
|
|
+ # Temporarily modify os_sep_bytes to test Windows path conversion
|
|
|
+ import dulwich.index
|
|
|
+
|
|
|
+ dulwich.index.os_sep_bytes = b"\\"
|
|
|
+
|
|
|
+ tree_path = _fs_to_tree_path(fs_path)
|
|
|
+ self.assertEqual(tree_path, b"path/with/backslash")
|
|
|
+ finally:
|
|
|
+ # Restore original value
|
|
|
+ dulwich.index.os_sep_bytes = original_sep
|
|
|
+
|
|
|
+
|
|
|
+class TestIndexEntryFromPath(TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ self.tempdir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(shutil.rmtree, self.tempdir)
|
|
|
+
|
|
|
+ def test_index_entry_from_path_file(self) -> None:
|
|
|
+ """Test creating index entry from a regular file."""
|
|
|
+ # Create a test file
|
|
|
+ test_file = os.path.join(self.tempdir, "testfile")
|
|
|
+ with open(test_file, "wb") as f:
|
|
|
+ f.write(b"test content")
|
|
|
+
|
|
|
+ # Get the index entry
|
|
|
+ entry = index_entry_from_path(os.fsencode(test_file))
|
|
|
+
|
|
|
+ # Verify the entry was created with the right mode
|
|
|
+ self.assertIsNotNone(entry)
|
|
|
+ self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)
|
|
|
+
|
|
|
+ @skipIf(not can_symlink(), "Requires symlink support")
|
|
|
+ def test_index_entry_from_path_symlink(self) -> None:
|
|
|
+ """Test creating index entry from a symlink."""
|
|
|
+ # Create a target file
|
|
|
+ target_file = os.path.join(self.tempdir, "target")
|
|
|
+ with open(target_file, "wb") as f:
|
|
|
+ f.write(b"target content")
|
|
|
+
|
|
|
+ # Create a symlink
|
|
|
+ link_file = os.path.join(self.tempdir, "symlink")
|
|
|
+ os.symlink(target_file, link_file)
|
|
|
+
|
|
|
+ # Get the index entry
|
|
|
+ entry = index_entry_from_path(os.fsencode(link_file))
|
|
|
+
|
|
|
+ # Verify the entry was created with the right mode
|
|
|
+ self.assertIsNotNone(entry)
|
|
|
+ self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)
|
|
|
+
|
|
|
+ def test_index_entry_from_path_directory(self) -> None:
|
|
|
+ """Test creating index entry from a directory (should return None)."""
|
|
|
+ # Create a directory
|
|
|
+ test_dir = os.path.join(self.tempdir, "testdir")
|
|
|
+ os.mkdir(test_dir)
|
|
|
+
|
|
|
+ # Get the index entry for a directory
|
|
|
+ entry = index_entry_from_path(os.fsencode(test_dir))
|
|
|
+
|
|
|
+ # Should return None for regular directories
|
|
|
+ self.assertIsNone(entry)
|
|
|
+
|
|
|
+ def test_index_entry_from_directory_regular(self) -> None:
|
|
|
+ """Test index_entry_from_directory with a regular directory."""
|
|
|
+ # Create a directory
|
|
|
+ test_dir = os.path.join(self.tempdir, "testdir")
|
|
|
+ os.mkdir(test_dir)
|
|
|
+
|
|
|
+ # Get stat for the directory
|
|
|
+ st = os.lstat(test_dir)
|
|
|
+
|
|
|
+ # Get the index entry for a regular directory
|
|
|
+ entry = index_entry_from_directory(st, os.fsencode(test_dir))
|
|
|
+
|
|
|
+ # Should return None for regular directories
|
|
|
+ self.assertIsNone(entry)
|
|
|
+
|
|
|
+ def test_index_entry_from_directory_git_submodule(self) -> None:
|
|
|
+ """Test index_entry_from_directory with a Git submodule."""
|
|
|
+ # Create a git repository that will be a submodule
|
|
|
+ sub_repo_dir = os.path.join(self.tempdir, "subrepo")
|
|
|
+ os.mkdir(sub_repo_dir)
|
|
|
+
|
|
|
+ # Create the .git directory to make it look like a git repo
|
|
|
+ git_dir = os.path.join(sub_repo_dir, ".git")
|
|
|
+ os.mkdir(git_dir)
|
|
|
+
|
|
|
+ # Create HEAD file with a fake commit SHA
|
|
|
+ head_sha = b"1234567890" * 4 # 40-char fake SHA
|
|
|
+ with open(os.path.join(git_dir, "HEAD"), "wb") as f:
|
|
|
+ f.write(head_sha)
|
|
|
+
|
|
|
+ # Get stat for the submodule directory
|
|
|
+ st = os.lstat(sub_repo_dir)
|
|
|
+
|
|
|
+ # Get the index entry for a git submodule directory
|
|
|
+ entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))
|
|
|
+
|
|
|
+ # Since we don't have a proper git setup, this might still return None
|
|
|
+ # This test just ensures the code path is executed
|
|
|
+ if entry is not None:
|
|
|
+ # If an entry is returned, it should have the gitlink mode
|
|
|
+ self.assertEqual(entry.mode, S_IFGITLINK)
|
|
|
+
|
|
|
+ def test_index_entry_from_path_with_object_store(self) -> None:
|
|
|
+ """Test creating index entry with object store."""
|
|
|
+ # Create a test file
|
|
|
+ test_file = os.path.join(self.tempdir, "testfile")
|
|
|
+ with open(test_file, "wb") as f:
|
|
|
+ f.write(b"test content")
|
|
|
+
|
|
|
+ # Create a memory object store
|
|
|
+ object_store = MemoryObjectStore()
|
|
|
+
|
|
|
+ # Get the index entry and add to object store
|
|
|
+ entry = index_entry_from_path(os.fsencode(test_file), object_store)
|
|
|
+
|
|
|
+ # Verify we can access the blob from the object store
|
|
|
+ self.assertIsNotNone(entry)
|
|
|
+ blob = object_store[entry.sha]
|
|
|
+ self.assertEqual(b"test content", blob.data)
|
|
|
+
|
|
|
+ def test_iter_fresh_entries(self) -> None:
|
|
|
+ """Test iterating over fresh entries."""
|
|
|
+ # Create some test files
|
|
|
+ file1 = os.path.join(self.tempdir, "file1")
|
|
|
+ with open(file1, "wb") as f:
|
|
|
+ f.write(b"file1 content")
|
|
|
+
|
|
|
+ file2 = os.path.join(self.tempdir, "file2")
|
|
|
+ with open(file2, "wb") as f:
|
|
|
+ f.write(b"file2 content")
|
|
|
+
|
|
|
+ # Create a memory object store
|
|
|
+ object_store = MemoryObjectStore()
|
|
|
+
|
|
|
+ # Get fresh entries
|
|
|
+ paths = [b"file1", b"file2", b"nonexistent"]
|
|
|
+ entries = dict(
|
|
|
+ iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
|
|
|
+ )
|
|
|
+
|
|
|
+ # Verify both files got entries but nonexistent file is None
|
|
|
+ self.assertIn(b"file1", entries)
|
|
|
+ self.assertIn(b"file2", entries)
|
|
|
+ self.assertIn(b"nonexistent", entries)
|
|
|
+ self.assertIsNotNone(entries[b"file1"])
|
|
|
+ self.assertIsNotNone(entries[b"file2"])
|
|
|
+ self.assertIsNone(entries[b"nonexistent"])
|
|
|
+
|
|
|
+ # Check that blobs were added to object store
|
|
|
+ blob1 = object_store[entries[b"file1"].sha]
|
|
|
+ self.assertEqual(b"file1 content", blob1.data)
|
|
|
+
|
|
|
+ blob2 = object_store[entries[b"file2"].sha]
|
|
|
+ self.assertEqual(b"file2 content", blob2.data)
|
|
|
+
|
|
|
+ def test_read_submodule_head(self) -> None:
|
|
|
+ """Test reading the HEAD of a submodule."""
|
|
|
+ from dulwich.index import read_submodule_head
|
|
|
+ from dulwich.repo import Repo
|
|
|
+
|
|
|
+ # Create a test repo that will be our "submodule"
|
|
|
+ sub_repo_dir = os.path.join(self.tempdir, "subrepo")
|
|
|
+ os.mkdir(sub_repo_dir)
|
|
|
+ submodule_repo = Repo.init(sub_repo_dir)
|
|
|
+
|
|
|
+ # Create a file and commit it to establish a HEAD
|
|
|
+ test_file = os.path.join(sub_repo_dir, "testfile")
|
|
|
+ with open(test_file, "wb") as f:
|
|
|
+ f.write(b"test content")
|
|
|
+
|
|
|
+ submodule_repo.stage(["testfile"])
|
|
|
+ commit_id = submodule_repo.do_commit(b"Test commit for submodule")
|
|
|
+
|
|
|
+ # Test reading the HEAD
|
|
|
+ head_sha = read_submodule_head(sub_repo_dir)
|
|
|
+ self.assertEqual(commit_id, head_sha)
|
|
|
+
|
|
|
+ # Test with bytes path
|
|
|
+ head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
|
|
|
+ self.assertEqual(commit_id, head_sha_bytes)
|
|
|
+
|
|
|
+ # Test with non-existent path
|
|
|
+ non_repo_dir = os.path.join(self.tempdir, "nonrepo")
|
|
|
+ os.mkdir(non_repo_dir)
|
|
|
+ self.assertIsNone(read_submodule_head(non_repo_dir))
|
|
|
+
|
|
|
+ # Test with path that doesn't have a .git directory
|
|
|
+ not_git_dir = os.path.join(self.tempdir, "notgit")
|
|
|
+ os.mkdir(not_git_dir)
|
|
|
+ self.assertIsNone(read_submodule_head(not_git_dir))
|
|
|
+
|
|
|
+ def test_has_directory_changed(self) -> None:
|
|
|
+ """Test checking if a directory has changed."""
|
|
|
+ from dulwich.index import IndexEntry, _has_directory_changed
|
|
|
+ from dulwich.repo import Repo
|
|
|
+
|
|
|
+ # Setup mock IndexEntry
|
|
|
+ mock_entry = IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Test with a regular directory (not a submodule)
|
|
|
+ reg_dir = os.path.join(self.tempdir, "regular_dir")
|
|
|
+ os.mkdir(reg_dir)
|
|
|
+
|
|
|
+ # Should return True for regular directory
|
|
|
+ self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))
|
|
|
+
|
|
|
+ # Create a git repository to test submodule scenarios
|
|
|
+ sub_repo_dir = os.path.join(self.tempdir, "subrepo")
|
|
|
+ os.mkdir(sub_repo_dir)
|
|
|
+ submodule_repo = Repo.init(sub_repo_dir)
|
|
|
+
|
|
|
+ # Create a file and commit it to establish a HEAD
|
|
|
+ test_file = os.path.join(sub_repo_dir, "testfile")
|
|
|
+ with open(test_file, "wb") as f:
|
|
|
+ f.write(b"test content")
|
|
|
+
|
|
|
+ submodule_repo.stage(["testfile"])
|
|
|
+ commit_id = submodule_repo.do_commit(b"Test commit for submodule")
|
|
|
+
|
|
|
+ # Create an entry with the correct commit SHA
|
|
|
+ correct_entry = IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ commit_id,
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Create an entry with an incorrect commit SHA
|
|
|
+ incorrect_entry = IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ b"0000000000000000000000000000000000000000",
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Should return False for submodule with correct SHA
|
|
|
+ self.assertFalse(
|
|
|
+ _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
|
|
|
+ )
|
|
|
+
|
|
|
+ # Should return True for submodule with incorrect SHA
|
|
|
+ self.assertTrue(
|
|
|
+ _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_get_unstaged_changes(self) -> None:
|
|
|
+ """Test detecting unstaged changes in a working tree."""
|
|
|
+ from dulwich.index import (
|
|
|
+ ConflictedIndexEntry,
|
|
|
+ Index,
|
|
|
+ IndexEntry,
|
|
|
+ get_unstaged_changes,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Create a test repo
|
|
|
+ repo_dir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
+
|
|
|
+ # Create test index
|
|
|
+ index = Index(os.path.join(repo_dir, "index"))
|
|
|
+
|
|
|
+ # Create an actual hash of our test content
|
|
|
+ from dulwich.objects import Blob
|
|
|
+
|
|
|
+ test_blob = Blob()
|
|
|
+ test_blob.data = b"initial content"
|
|
|
+
|
|
|
+ # Create some test files with known contents
|
|
|
+ file1_path = os.path.join(repo_dir, "file1")
|
|
|
+ with open(file1_path, "wb") as f:
|
|
|
+ f.write(b"initial content")
|
|
|
+
|
|
|
+ file2_path = os.path.join(repo_dir, "file2")
|
|
|
+ with open(file2_path, "wb") as f:
|
|
|
+ f.write(b"initial content")
|
|
|
+
|
|
|
+ # Add them to index
|
|
|
+ entry1 = IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", # Not matching actual content
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+
|
|
|
+ entry2 = IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ test_blob.id, # Will be content's real hash
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Add a file that has a conflict
|
|
|
+ entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})
|
|
|
+
|
|
|
+ index._byname = {
|
|
|
+ b"file1": entry1,
|
|
|
+ b"file2": entry2,
|
|
|
+ b"file3": IndexEntry(
|
|
|
+ (1230680220, 0),
|
|
|
+ (1230680220, 0),
|
|
|
+ 2050,
|
|
|
+ 3761020,
|
|
|
+ 33188,
|
|
|
+ 1000,
|
|
|
+ 1000,
|
|
|
+ 0,
|
|
|
+ b"0000000000000000000000000000000000000000",
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ ),
|
|
|
+ b"conflict": entry_conflict,
|
|
|
+ }
|
|
|
+
|
|
|
+ # Get unstaged changes
|
|
|
+ changes = list(get_unstaged_changes(index, repo_dir))
|
|
|
+
|
|
|
+ # File1 should be unstaged (content doesn't match hash)
|
|
|
+ # File3 doesn't exist (deleted)
|
|
|
+ # Conflict is always unstaged
|
|
|
+ self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])
|
|
|
+
|
|
|
+ # Create directory where there should be a file
|
|
|
+ os.mkdir(os.path.join(repo_dir, "file4"))
|
|
|
+ index._byname[b"file4"] = entry1
|
|
|
+
|
|
|
+ # Get unstaged changes again
|
|
|
+ changes = list(get_unstaged_changes(index, repo_dir))
|
|
|
+
|
|
|
+ # Now file4 should also be unstaged because it's a directory instead of a file
|
|
|
+ self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])
|
|
|
+
|
|
|
+ # Create a custom blob filter function
|
|
|
+ def filter_blob_callback(blob, path):
|
|
|
+ # Modify blob to make it look changed
|
|
|
+ blob.data = b"modified " + blob.data
|
|
|
+ return blob
|
|
|
+
|
|
|
+ # Get unstaged changes with blob filter
|
|
|
+ changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))
|
|
|
+
|
|
|
+ # Now both file1 and file2 should be unstaged due to the filter
|
|
|
+ self.assertEqual(
|
|
|
+ sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
|
|
|
+ )
|