|
@@ -23,7 +23,6 @@
|
|
|
"""Tests for the index."""
|
|
|
|
|
|
|
|
|
-from contextlib import closing
|
|
|
from io import BytesIO
|
|
|
import os
|
|
|
import shutil
|
|
@@ -54,7 +53,9 @@ from dulwich.object_store import (
|
|
|
)
|
|
|
from dulwich.objects import (
|
|
|
Blob,
|
|
|
+ Commit,
|
|
|
Tree,
|
|
|
+ S_IFGITLINK,
|
|
|
)
|
|
|
from dulwich.repo import Repo
|
|
|
from dulwich.tests import (
|
|
@@ -267,7 +268,7 @@ class BuildIndexTests(TestCase):
|
|
|
def test_empty(self):
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
tree = Tree()
|
|
|
repo.object_store.add_object(tree)
|
|
|
|
|
@@ -284,7 +285,7 @@ class BuildIndexTests(TestCase):
|
|
|
def test_git_dir(self):
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
|
|
|
# Populate repo
|
|
|
filea = Blob.from_string(b'file a')
|
|
@@ -318,7 +319,7 @@ class BuildIndexTests(TestCase):
|
|
|
def test_nonempty(self):
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
|
|
|
# Populate repo
|
|
|
filea = Blob.from_string(b'file a')
|
|
@@ -367,11 +368,51 @@ class BuildIndexTests(TestCase):
|
|
|
self.assertEqual(['d'],
|
|
|
sorted(os.listdir(os.path.join(repo.path, 'c'))))
|
|
|
|
|
|
+ def test_norewrite(self):
|
|
|
+ sync = getattr(os, 'sync', lambda: os.system('sync'))
|
|
|
+ repo_dir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
+ # Populate repo
|
|
|
+ filea = Blob.from_string(b'file a')
|
|
|
+ filea_path = os.path.join(repo_dir, 'a')
|
|
|
+ tree = Tree()
|
|
|
+ tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
|
|
|
+
|
|
|
+ repo.object_store.add_objects([(o, None)
|
|
|
+ for o in [filea, tree]])
|
|
|
+
|
|
|
+ # First Write
|
|
|
+ build_index_from_tree(repo.path, repo.index_path(),
|
|
|
+ repo.object_store, tree.id)
|
|
|
+ # Use sync as metadata can be cached on some FS
|
|
|
+ sync()
|
|
|
+ mtime = os.stat(filea_path).st_mtime
|
|
|
+
|
|
|
+ # Test Rewrite
|
|
|
+ build_index_from_tree(repo.path, repo.index_path(),
|
|
|
+ repo.object_store, tree.id)
|
|
|
+ sync()
|
|
|
+ self.assertEqual(mtime, os.stat(filea_path).st_mtime)
|
|
|
+
|
|
|
+ # Modify content
|
|
|
+ with open(filea_path, 'wb') as fh:
|
|
|
+ fh.write(b'test a')
|
|
|
+ sync()
|
|
|
+ mtime = os.stat(filea_path).st_mtime
|
|
|
+
|
|
|
+ # Test rewrite
|
|
|
+ build_index_from_tree(repo.path, repo.index_path(),
|
|
|
+ repo.object_store, tree.id)
|
|
|
+ sync()
|
|
|
+ self.assertNotEqual(mtime, os.stat(filea_path).st_mtime)
|
|
|
+
|
|
|
+
|
|
|
@skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
|
|
|
def test_symlink(self):
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
|
|
|
# Populate repo
|
|
|
filed = Blob.from_string(b'file d')
|
|
@@ -403,7 +444,7 @@ class BuildIndexTests(TestCase):
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
repo_dir_bytes = repo_dir.encode(sys.getfilesystemencoding())
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
|
|
|
# Populate repo
|
|
|
file = Blob.from_string(b'foo')
|
|
@@ -430,6 +471,46 @@ class BuildIndexTests(TestCase):
|
|
|
utf8_path = os.path.join(repo_dir_bytes, utf8_name)
|
|
|
self.assertTrue(os.path.exists(utf8_path))
|
|
|
|
|
|
+ def test_git_submodule(self):
|
|
|
+ repo_dir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
+ filea = Blob.from_string(b'file alalala')
|
|
|
+
|
|
|
+ subtree = Tree()
|
|
|
+ subtree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
|
|
|
+
|
|
|
+ c = Commit()
|
|
|
+ c.tree = subtree.id
|
|
|
+ c.committer = c.author = b'Somebody <somebody@example.com>'
|
|
|
+ c.commit_time = c.author_time = 42342
|
|
|
+ c.commit_timezone = c.author_timezone = 0
|
|
|
+ c.parents = []
|
|
|
+ c.message = b'Subcommit'
|
|
|
+
|
|
|
+ tree = Tree()
|
|
|
+ tree[b'c'] = (S_IFGITLINK, c.id)
|
|
|
+
|
|
|
+ repo.object_store.add_objects(
|
|
|
+ [(o, None) for o in [tree]])
|
|
|
+
|
|
|
+ build_index_from_tree(repo.path, repo.index_path(),
|
|
|
+ repo.object_store, tree.id)
|
|
|
+
|
|
|
+ # Verify index entries
|
|
|
+ index = repo.open_index()
|
|
|
+ self.assertEqual(len(index), 1)
|
|
|
+
|
|
|
+ # filea
|
|
|
+ apath = os.path.join(repo.path, 'c/a')
|
|
|
+ self.assertFalse(os.path.exists(apath))
|
|
|
+
|
|
|
+ # dir c
|
|
|
+ cpath = os.path.join(repo.path, 'c')
|
|
|
+ self.assertTrue(os.path.isdir(cpath))
|
|
|
+ self.assertEqual(index[b'c'][4], S_IFGITLINK) # mode
|
|
|
+ self.assertEqual(index[b'c'][8], c.id) # sha
|
|
|
+
|
|
|
|
|
|
class GetUnstagedChangesTests(TestCase):
|
|
|
|
|
@@ -438,7 +519,7 @@ class GetUnstagedChangesTests(TestCase):
|
|
|
|
|
|
repo_dir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
- with closing(Repo.init(repo_dir)) as repo:
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
|
|
|
# Commit a dummy file then modify it
|
|
|
foo1_fullpath = os.path.join(repo_dir, 'foo1')
|
|
@@ -462,6 +543,27 @@ class GetUnstagedChangesTests(TestCase):
|
|
|
|
|
|
self.assertEqual(list(changes), [b'foo1'])
|
|
|
|
|
|
+ def test_get_unstaged_deleted_changes(self):
|
|
|
+ """Unit test for get_unstaged_changes."""
|
|
|
+
|
|
|
+ repo_dir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(shutil.rmtree, repo_dir)
|
|
|
+ with Repo.init(repo_dir) as repo:
|
|
|
+
|
|
|
+ # Commit a dummy file then remove it
|
|
|
+ foo1_fullpath = os.path.join(repo_dir, 'foo1')
|
|
|
+ with open(foo1_fullpath, 'wb') as f:
|
|
|
+ f.write(b'origstuff')
|
|
|
+
|
|
|
+ repo.stage(['foo1'])
|
|
|
+ repo.do_commit(b'test status', author=b'', committer=b'')
|
|
|
+
|
|
|
+ os.unlink(foo1_fullpath)
|
|
|
+
|
|
|
+ changes = get_unstaged_changes(repo.open_index(), repo_dir)
|
|
|
+
|
|
|
+ self.assertEqual(list(changes), [b'foo1'])
|
|
|
+
|
|
|
|
|
|
class TestValidatePathElement(TestCase):
|
|
|
|