test_index.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # or (at your option) any later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for the index."""
  19. from contextlib import closing
  20. from io import BytesIO
  21. import os
  22. import shutil
  23. import stat
  24. import struct
  25. import tempfile
  26. from dulwich.index import (
  27. Index,
  28. build_index_from_tree,
  29. cleanup_mode,
  30. commit_tree,
  31. get_unstaged_changes,
  32. index_entry_from_stat,
  33. read_index,
  34. read_index_dict,
  35. validate_path_element_default,
  36. validate_path_element_ntfs,
  37. write_cache_time,
  38. write_index,
  39. write_index_dict,
  40. )
  41. from dulwich.object_store import (
  42. MemoryObjectStore,
  43. )
  44. from dulwich.objects import (
  45. Blob,
  46. Tree,
  47. )
  48. from dulwich.repo import Repo
  49. from dulwich.tests import (
  50. TestCase,
  51. skipIf,
  52. )
  53. class IndexTestCase(TestCase):
  54. datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
  55. def get_simple_index(self, name):
  56. return Index(os.path.join(self.datadir, name))
  57. class SimpleIndexTestCase(IndexTestCase):
  58. def test_len(self):
  59. self.assertEqual(1, len(self.get_simple_index("index")))
  60. def test_iter(self):
  61. self.assertEqual([b'bla'], list(self.get_simple_index("index")))
  62. def test_getitem(self):
  63. self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
  64. 33188, 1000, 1000, 0,
  65. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
  66. self.get_simple_index("index")[b"bla"])
  67. def test_empty(self):
  68. i = self.get_simple_index("notanindex")
  69. self.assertEqual(0, len(i))
  70. self.assertFalse(os.path.exists(i._filename))
  71. def test_against_empty_tree(self):
  72. i = self.get_simple_index("index")
  73. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  74. self.assertEqual(1, len(changes))
  75. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  76. self.assertEqual(b'bla', newname)
  77. self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
  78. class SimpleIndexWriterTestCase(IndexTestCase):
  79. def setUp(self):
  80. IndexTestCase.setUp(self)
  81. self.tempdir = tempfile.mkdtemp()
  82. def tearDown(self):
  83. IndexTestCase.tearDown(self)
  84. shutil.rmtree(self.tempdir)
  85. def test_simple_write(self):
  86. entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020,
  87. 33188, 1000, 1000, 0,
  88. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)]
  89. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  90. with open(filename, 'wb+') as x:
  91. write_index(x, entries)
  92. with open(filename, 'rb') as x:
  93. self.assertEqual(entries, list(read_index(x)))
  94. class ReadIndexDictTests(IndexTestCase):
  95. def setUp(self):
  96. IndexTestCase.setUp(self)
  97. self.tempdir = tempfile.mkdtemp()
  98. def tearDown(self):
  99. IndexTestCase.tearDown(self)
  100. shutil.rmtree(self.tempdir)
  101. def test_simple_write(self):
  102. entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
  103. 33188, 1000, 1000, 0,
  104. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
  105. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  106. with open(filename, 'wb+') as x:
  107. write_index_dict(x, entries)
  108. with open(filename, 'rb') as x:
  109. self.assertEqual(entries, read_index_dict(x))
  110. class CommitTreeTests(TestCase):
  111. def setUp(self):
  112. super(CommitTreeTests, self).setUp()
  113. self.store = MemoryObjectStore()
  114. def test_single_blob(self):
  115. blob = Blob()
  116. blob.data = b"foo"
  117. self.store.add_object(blob)
  118. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  119. rootid = commit_tree(self.store, blobs)
  120. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  121. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  122. self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
  123. def test_nested(self):
  124. blob = Blob()
  125. blob.data = b"foo"
  126. self.store.add_object(blob)
  127. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  128. rootid = commit_tree(self.store, blobs)
  129. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  130. dirid = self.store[rootid][b"bla"][1]
  131. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  132. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  133. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  134. self.assertEqual(set([rootid, dirid, blob.id]),
  135. set(self.store._data.keys()))
  136. class CleanupModeTests(TestCase):
  137. def test_file(self):
  138. self.assertEqual(0o100644, cleanup_mode(0o100000))
  139. def test_executable(self):
  140. self.assertEqual(0o100755, cleanup_mode(0o100711))
  141. def test_symlink(self):
  142. self.assertEqual(0o120000, cleanup_mode(0o120711))
  143. def test_dir(self):
  144. self.assertEqual(0o040000, cleanup_mode(0o40531))
  145. def test_submodule(self):
  146. self.assertEqual(0o160000, cleanup_mode(0o160744))
  147. class WriteCacheTimeTests(TestCase):
  148. def test_write_string(self):
  149. f = BytesIO()
  150. self.assertRaises(TypeError, write_cache_time, f, "foo")
  151. def test_write_int(self):
  152. f = BytesIO()
  153. write_cache_time(f, 434343)
  154. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  155. def test_write_tuple(self):
  156. f = BytesIO()
  157. write_cache_time(f, (434343, 21))
  158. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  159. def test_write_float(self):
  160. f = BytesIO()
  161. write_cache_time(f, 434343.000000021)
  162. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  163. class IndexEntryFromStatTests(TestCase):
  164. def test_simple(self):
  165. st = os.stat_result((16877, 131078, 64769,
  166. 154, 1000, 1000, 12288,
  167. 1323629595, 1324180496, 1324180496))
  168. entry = index_entry_from_stat(st, "22" * 20, 0)
  169. self.assertEqual(entry, (
  170. 1324180496,
  171. 1324180496,
  172. 64769,
  173. 131078,
  174. 16384,
  175. 1000,
  176. 1000,
  177. 12288,
  178. '2222222222222222222222222222222222222222',
  179. 0))
  180. def test_override_mode(self):
  181. st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
  182. 154, 1000, 1000, 12288,
  183. 1323629595, 1324180496, 1324180496))
  184. entry = index_entry_from_stat(st, "22" * 20, 0,
  185. mode=stat.S_IFREG + 0o755)
  186. self.assertEqual(entry, (
  187. 1324180496,
  188. 1324180496,
  189. 64769,
  190. 131078,
  191. 33261,
  192. 1000,
  193. 1000,
  194. 12288,
  195. '2222222222222222222222222222222222222222',
  196. 0))
  197. class BuildIndexTests(TestCase):
  198. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha):
  199. self.assertEqual(index_entry[4], mode) # mode
  200. self.assertEqual(index_entry[7], filesize) # filesize
  201. self.assertEqual(index_entry[8], sha) # sha
  202. def assertFileContents(self, path, contents, symlink=False):
  203. if symlink:
  204. self.assertEqual(os.readlink(path), contents)
  205. else:
  206. with open(path, 'rb') as f:
  207. self.assertEqual(f.read(), contents)
  208. def test_empty(self):
  209. repo_dir = tempfile.mkdtemp()
  210. self.addCleanup(shutil.rmtree, repo_dir)
  211. with closing(Repo.init(repo_dir)) as repo:
  212. tree = Tree()
  213. repo.object_store.add_object(tree)
  214. build_index_from_tree(repo.path, repo.index_path(),
  215. repo.object_store, tree.id)
  216. # Verify index entries
  217. index = repo.open_index()
  218. self.assertEqual(len(index), 0)
  219. # Verify no files
  220. self.assertEqual(['.git'], os.listdir(repo.path))
  221. @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
  222. def test_git_dir(self):
  223. repo_dir = tempfile.mkdtemp()
  224. self.addCleanup(shutil.rmtree, repo_dir)
  225. with closing(Repo.init(repo_dir)) as repo:
  226. # Populate repo
  227. filea = Blob.from_string(b'file a')
  228. filee = Blob.from_string(b'd')
  229. tree = Tree()
  230. tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
  231. tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
  232. repo.object_store.add_objects([(o, None)
  233. for o in [filea, filee, tree]])
  234. build_index_from_tree(repo.path, repo.index_path(),
  235. repo.object_store, tree.id)
  236. # Verify index entries
  237. index = repo.open_index()
  238. self.assertEqual(len(index), 1)
  239. # filea
  240. apath = os.path.join(repo.path, '.git', 'a')
  241. self.assertFalse(os.path.exists(apath))
  242. # filee
  243. epath = os.path.join(repo.path, 'c', 'e')
  244. self.assertTrue(os.path.exists(epath))
  245. self.assertReasonableIndexEntry(index[b'c/e'],
  246. stat.S_IFREG | 0o644, 1, filee.id)
  247. self.assertFileContents(epath, b'd')
  248. @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
  249. def test_nonempty(self):
  250. repo_dir = tempfile.mkdtemp()
  251. self.addCleanup(shutil.rmtree, repo_dir)
  252. with closing(Repo.init(repo_dir)) as repo:
  253. # Populate repo
  254. filea = Blob.from_string(b'file a')
  255. fileb = Blob.from_string(b'file b')
  256. filed = Blob.from_string(b'file d')
  257. filee = Blob.from_string(b'd')
  258. tree = Tree()
  259. tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
  260. tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id)
  261. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  262. tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink
  263. repo.object_store.add_objects([(o, None)
  264. for o in [filea, fileb, filed, filee, tree]])
  265. build_index_from_tree(repo.path, repo.index_path(),
  266. repo.object_store, tree.id)
  267. # Verify index entries
  268. index = repo.open_index()
  269. self.assertEqual(len(index), 4)
  270. # filea
  271. apath = os.path.join(repo.path, 'a')
  272. self.assertTrue(os.path.exists(apath))
  273. self.assertReasonableIndexEntry(index[b'a'],
  274. stat.S_IFREG | 0o644, 6, filea.id)
  275. self.assertFileContents(apath, b'file a')
  276. # fileb
  277. bpath = os.path.join(repo.path, 'b')
  278. self.assertTrue(os.path.exists(bpath))
  279. self.assertReasonableIndexEntry(index[b'b'],
  280. stat.S_IFREG | 0o644, 6, fileb.id)
  281. self.assertFileContents(bpath, b'file b')
  282. # filed
  283. dpath = os.path.join(repo.path, 'c', 'd')
  284. self.assertTrue(os.path.exists(dpath))
  285. self.assertReasonableIndexEntry(index[b'c/d'],
  286. stat.S_IFREG | 0o644, 6, filed.id)
  287. self.assertFileContents(dpath, b'file d')
  288. # symlink to d
  289. epath = os.path.join(repo.path, 'c', 'e')
  290. self.assertTrue(os.path.exists(epath))
  291. self.assertReasonableIndexEntry(index[b'c/e'],
  292. stat.S_IFLNK, 1, filee.id)
  293. self.assertFileContents(epath, 'd', symlink=True)
  294. # Verify no extra files
  295. self.assertEqual(['.git', 'a', 'b', 'c'],
  296. sorted(os.listdir(repo.path)))
  297. self.assertEqual(['d', 'e'],
  298. sorted(os.listdir(os.path.join(repo.path, 'c'))))
  299. class GetUnstagedChangesTests(TestCase):
  300. def test_get_unstaged_changes(self):
  301. """Unit test for get_unstaged_changes."""
  302. repo_dir = tempfile.mkdtemp()
  303. self.addCleanup(shutil.rmtree, repo_dir)
  304. with closing(Repo.init(repo_dir)) as repo:
  305. # Commit a dummy file then modify it
  306. foo1_fullpath = os.path.join(repo_dir, 'foo1')
  307. with open(foo1_fullpath, 'wb') as f:
  308. f.write(b'origstuff')
  309. foo2_fullpath = os.path.join(repo_dir, 'foo2')
  310. with open(foo2_fullpath, 'wb') as f:
  311. f.write(b'origstuff')
  312. repo.stage(['foo1', 'foo2'])
  313. repo.do_commit(b'test status', author=b'', committer=b'')
  314. with open(foo1_fullpath, 'wb') as f:
  315. f.write(b'newstuff')
  316. # modify access and modify time of path
  317. os.utime(foo1_fullpath, (0, 0))
  318. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  319. self.assertEqual(list(changes), [b'foo1'])
  320. class TestValidatePathElement(TestCase):
  321. def test_default(self):
  322. self.assertTrue(validate_path_element_default(b"bla"))
  323. self.assertTrue(validate_path_element_default(b".bla"))
  324. self.assertFalse(validate_path_element_default(b".git"))
  325. self.assertFalse(validate_path_element_default(b".giT"))
  326. self.assertFalse(validate_path_element_default(b".."))
  327. self.assertTrue(validate_path_element_default(b"git~1"))
  328. def test_ntfs(self):
  329. self.assertTrue(validate_path_element_ntfs(b"bla"))
  330. self.assertTrue(validate_path_element_ntfs(b".bla"))
  331. self.assertFalse(validate_path_element_ntfs(b".git"))
  332. self.assertFalse(validate_path_element_ntfs(b".giT"))
  333. self.assertFalse(validate_path_element_ntfs(b".."))
  334. self.assertFalse(validate_path_element_ntfs(b"git~1"))