test_index.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # or (at your option) any later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for the index."""
  19. from contextlib import closing
  20. from io import BytesIO
  21. import os
  22. import shutil
  23. import stat
  24. import struct
  25. import sys
  26. import tempfile
  27. from dulwich.index import (
  28. Index,
  29. build_index_from_tree,
  30. cleanup_mode,
  31. commit_tree,
  32. get_unstaged_changes,
  33. index_entry_from_stat,
  34. read_index,
  35. read_index_dict,
  36. validate_path_element_default,
  37. validate_path_element_ntfs,
  38. write_cache_time,
  39. write_index,
  40. write_index_dict,
  41. )
  42. from dulwich.object_store import (
  43. MemoryObjectStore,
  44. )
  45. from dulwich.objects import (
  46. Blob,
  47. Tree,
  48. )
  49. from dulwich.repo import Repo
  50. from dulwich.tests import (
  51. TestCase,
  52. skipIf,
  53. )
  54. class IndexTestCase(TestCase):
  55. datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
  56. def get_simple_index(self, name):
  57. return Index(os.path.join(self.datadir, name))
  58. class SimpleIndexTestCase(IndexTestCase):
  59. def test_len(self):
  60. self.assertEqual(1, len(self.get_simple_index("index")))
  61. def test_iter(self):
  62. self.assertEqual([b'bla'], list(self.get_simple_index("index")))
  63. def test_getitem(self):
  64. self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
  65. 33188, 1000, 1000, 0,
  66. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
  67. self.get_simple_index("index")[b"bla"])
  68. def test_empty(self):
  69. i = self.get_simple_index("notanindex")
  70. self.assertEqual(0, len(i))
  71. self.assertFalse(os.path.exists(i._filename))
  72. def test_against_empty_tree(self):
  73. i = self.get_simple_index("index")
  74. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  75. self.assertEqual(1, len(changes))
  76. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  77. self.assertEqual(b'bla', newname)
  78. self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
  79. class SimpleIndexWriterTestCase(IndexTestCase):
  80. def setUp(self):
  81. IndexTestCase.setUp(self)
  82. self.tempdir = tempfile.mkdtemp()
  83. def tearDown(self):
  84. IndexTestCase.tearDown(self)
  85. shutil.rmtree(self.tempdir)
  86. def test_simple_write(self):
  87. entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020,
  88. 33188, 1000, 1000, 0,
  89. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)]
  90. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  91. with open(filename, 'wb+') as x:
  92. write_index(x, entries)
  93. with open(filename, 'rb') as x:
  94. self.assertEqual(entries, list(read_index(x)))
  95. class ReadIndexDictTests(IndexTestCase):
  96. def setUp(self):
  97. IndexTestCase.setUp(self)
  98. self.tempdir = tempfile.mkdtemp()
  99. def tearDown(self):
  100. IndexTestCase.tearDown(self)
  101. shutil.rmtree(self.tempdir)
  102. def test_simple_write(self):
  103. entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
  104. 33188, 1000, 1000, 0,
  105. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
  106. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  107. with open(filename, 'wb+') as x:
  108. write_index_dict(x, entries)
  109. with open(filename, 'rb') as x:
  110. self.assertEqual(entries, read_index_dict(x))
  111. class CommitTreeTests(TestCase):
  112. def setUp(self):
  113. super(CommitTreeTests, self).setUp()
  114. self.store = MemoryObjectStore()
  115. def test_single_blob(self):
  116. blob = Blob()
  117. blob.data = b"foo"
  118. self.store.add_object(blob)
  119. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  120. rootid = commit_tree(self.store, blobs)
  121. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  122. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  123. self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
  124. def test_nested(self):
  125. blob = Blob()
  126. blob.data = b"foo"
  127. self.store.add_object(blob)
  128. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  129. rootid = commit_tree(self.store, blobs)
  130. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  131. dirid = self.store[rootid][b"bla"][1]
  132. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  133. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  134. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  135. self.assertEqual(set([rootid, dirid, blob.id]),
  136. set(self.store._data.keys()))
  137. class CleanupModeTests(TestCase):
  138. def test_file(self):
  139. self.assertEqual(0o100644, cleanup_mode(0o100000))
  140. def test_executable(self):
  141. self.assertEqual(0o100755, cleanup_mode(0o100711))
  142. def test_symlink(self):
  143. self.assertEqual(0o120000, cleanup_mode(0o120711))
  144. def test_dir(self):
  145. self.assertEqual(0o040000, cleanup_mode(0o40531))
  146. def test_submodule(self):
  147. self.assertEqual(0o160000, cleanup_mode(0o160744))
  148. class WriteCacheTimeTests(TestCase):
  149. def test_write_string(self):
  150. f = BytesIO()
  151. self.assertRaises(TypeError, write_cache_time, f, "foo")
  152. def test_write_int(self):
  153. f = BytesIO()
  154. write_cache_time(f, 434343)
  155. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  156. def test_write_tuple(self):
  157. f = BytesIO()
  158. write_cache_time(f, (434343, 21))
  159. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  160. def test_write_float(self):
  161. f = BytesIO()
  162. write_cache_time(f, 434343.000000021)
  163. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  164. class IndexEntryFromStatTests(TestCase):
  165. def test_simple(self):
  166. st = os.stat_result((16877, 131078, 64769,
  167. 154, 1000, 1000, 12288,
  168. 1323629595, 1324180496, 1324180496))
  169. entry = index_entry_from_stat(st, "22" * 20, 0)
  170. self.assertEqual(entry, (
  171. 1324180496,
  172. 1324180496,
  173. 64769,
  174. 131078,
  175. 16384,
  176. 1000,
  177. 1000,
  178. 12288,
  179. '2222222222222222222222222222222222222222',
  180. 0))
  181. def test_override_mode(self):
  182. st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
  183. 154, 1000, 1000, 12288,
  184. 1323629595, 1324180496, 1324180496))
  185. entry = index_entry_from_stat(st, "22" * 20, 0,
  186. mode=stat.S_IFREG + 0o755)
  187. self.assertEqual(entry, (
  188. 1324180496,
  189. 1324180496,
  190. 64769,
  191. 131078,
  192. 33261,
  193. 1000,
  194. 1000,
  195. 12288,
  196. '2222222222222222222222222222222222222222',
  197. 0))
  198. class BuildIndexTests(TestCase):
  199. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha):
  200. self.assertEqual(index_entry[4], mode) # mode
  201. self.assertEqual(index_entry[7], filesize) # filesize
  202. self.assertEqual(index_entry[8], sha) # sha
  203. def assertFileContents(self, path, contents, symlink=False):
  204. if symlink:
  205. self.assertEqual(os.readlink(path), contents)
  206. else:
  207. with open(path, 'rb') as f:
  208. self.assertEqual(f.read(), contents)
  209. def test_empty(self):
  210. repo_dir = tempfile.mkdtemp()
  211. self.addCleanup(shutil.rmtree, repo_dir)
  212. with closing(Repo.init(repo_dir)) as repo:
  213. tree = Tree()
  214. repo.object_store.add_object(tree)
  215. build_index_from_tree(repo.path, repo.index_path(),
  216. repo.object_store, tree.id)
  217. # Verify index entries
  218. index = repo.open_index()
  219. self.assertEqual(len(index), 0)
  220. # Verify no files
  221. self.assertEqual(['.git'], os.listdir(repo.path))
  222. def test_git_dir(self):
  223. repo_dir = tempfile.mkdtemp()
  224. self.addCleanup(shutil.rmtree, repo_dir)
  225. with closing(Repo.init(repo_dir)) as repo:
  226. # Populate repo
  227. filea = Blob.from_string(b'file a')
  228. filee = Blob.from_string(b'd')
  229. tree = Tree()
  230. tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
  231. tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
  232. repo.object_store.add_objects([(o, None)
  233. for o in [filea, filee, tree]])
  234. build_index_from_tree(repo.path, repo.index_path(),
  235. repo.object_store, tree.id)
  236. # Verify index entries
  237. index = repo.open_index()
  238. self.assertEqual(len(index), 1)
  239. # filea
  240. apath = os.path.join(repo.path, '.git', 'a')
  241. self.assertFalse(os.path.exists(apath))
  242. # filee
  243. epath = os.path.join(repo.path, 'c', 'e')
  244. self.assertTrue(os.path.exists(epath))
  245. self.assertReasonableIndexEntry(index[b'c/e'],
  246. stat.S_IFREG | 0o644, 1, filee.id)
  247. self.assertFileContents(epath, b'd')
  248. def test_nonempty(self):
  249. repo_dir = tempfile.mkdtemp()
  250. self.addCleanup(shutil.rmtree, repo_dir)
  251. with closing(Repo.init(repo_dir)) as repo:
  252. # Populate repo
  253. filea = Blob.from_string(b'file a')
  254. fileb = Blob.from_string(b'file b')
  255. filed = Blob.from_string(b'file d')
  256. tree = Tree()
  257. tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
  258. tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id)
  259. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  260. repo.object_store.add_objects([(o, None)
  261. for o in [filea, fileb, filed, tree]])
  262. build_index_from_tree(repo.path, repo.index_path(),
  263. repo.object_store, tree.id)
  264. # Verify index entries
  265. index = repo.open_index()
  266. self.assertEqual(len(index), 3)
  267. # filea
  268. apath = os.path.join(repo.path, 'a')
  269. self.assertTrue(os.path.exists(apath))
  270. self.assertReasonableIndexEntry(index[b'a'],
  271. stat.S_IFREG | 0o644, 6, filea.id)
  272. self.assertFileContents(apath, b'file a')
  273. # fileb
  274. bpath = os.path.join(repo.path, 'b')
  275. self.assertTrue(os.path.exists(bpath))
  276. self.assertReasonableIndexEntry(index[b'b'],
  277. stat.S_IFREG | 0o644, 6, fileb.id)
  278. self.assertFileContents(bpath, b'file b')
  279. # filed
  280. dpath = os.path.join(repo.path, 'c', 'd')
  281. self.assertTrue(os.path.exists(dpath))
  282. self.assertReasonableIndexEntry(index[b'c/d'],
  283. stat.S_IFREG | 0o644, 6, filed.id)
  284. self.assertFileContents(dpath, b'file d')
  285. # Verify no extra files
  286. self.assertEqual(['.git', 'a', 'b', 'c'],
  287. sorted(os.listdir(repo.path)))
  288. self.assertEqual(['d'],
  289. sorted(os.listdir(os.path.join(repo.path, 'c'))))
  290. @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
  291. def test_symlink(self):
  292. repo_dir = tempfile.mkdtemp()
  293. self.addCleanup(shutil.rmtree, repo_dir)
  294. with closing(Repo.init(repo_dir)) as repo:
  295. # Populate repo
  296. filed = Blob.from_string(b'file d')
  297. filee = Blob.from_string(b'd')
  298. tree = Tree()
  299. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  300. tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink
  301. repo.object_store.add_objects([(o, None)
  302. for o in [filed, filee, tree]])
  303. build_index_from_tree(repo.path, repo.index_path(),
  304. repo.object_store, tree.id)
  305. # Verify index entries
  306. index = repo.open_index()
  307. # symlink to d
  308. epath = os.path.join(repo.path, 'c', 'e')
  309. self.assertTrue(os.path.exists(epath))
  310. self.assertReasonableIndexEntry(
  311. index[b'c/e'], stat.S_IFLNK,
  312. 0 if sys.platform == 'win32' else 1,
  313. filee.id)
  314. self.assertFileContents(epath, 'd', symlink=True)
  315. class GetUnstagedChangesTests(TestCase):
  316. def test_get_unstaged_changes(self):
  317. """Unit test for get_unstaged_changes."""
  318. repo_dir = tempfile.mkdtemp()
  319. self.addCleanup(shutil.rmtree, repo_dir)
  320. with closing(Repo.init(repo_dir)) as repo:
  321. # Commit a dummy file then modify it
  322. foo1_fullpath = os.path.join(repo_dir, 'foo1')
  323. with open(foo1_fullpath, 'wb') as f:
  324. f.write(b'origstuff')
  325. foo2_fullpath = os.path.join(repo_dir, 'foo2')
  326. with open(foo2_fullpath, 'wb') as f:
  327. f.write(b'origstuff')
  328. repo.stage(['foo1', 'foo2'])
  329. repo.do_commit(b'test status', author=b'', committer=b'')
  330. with open(foo1_fullpath, 'wb') as f:
  331. f.write(b'newstuff')
  332. # modify access and modify time of path
  333. os.utime(foo1_fullpath, (0, 0))
  334. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  335. self.assertEqual(list(changes), [b'foo1'])
  336. class TestValidatePathElement(TestCase):
  337. def test_default(self):
  338. self.assertTrue(validate_path_element_default(b"bla"))
  339. self.assertTrue(validate_path_element_default(b".bla"))
  340. self.assertFalse(validate_path_element_default(b".git"))
  341. self.assertFalse(validate_path_element_default(b".giT"))
  342. self.assertFalse(validate_path_element_default(b".."))
  343. self.assertTrue(validate_path_element_default(b"git~1"))
  344. def test_ntfs(self):
  345. self.assertTrue(validate_path_element_ntfs(b"bla"))
  346. self.assertTrue(validate_path_element_ntfs(b".bla"))
  347. self.assertFalse(validate_path_element_ntfs(b".git"))
  348. self.assertFalse(validate_path_element_ntfs(b".giT"))
  349. self.assertFalse(validate_path_element_ntfs(b".."))
  350. self.assertFalse(validate_path_element_ntfs(b"git~1"))