test_index.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. # test_index.py -- Tests for the git index
  2. # encoding: utf-8
  3. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; version 2
  8. # or (at your option) any later version of the License.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for the index."""
  20. from contextlib import closing
  21. from io import BytesIO
  22. import os
  23. import shutil
  24. import stat
  25. import struct
  26. import sys
  27. import tempfile
  28. from dulwich.index import (
  29. Index,
  30. build_index_from_tree,
  31. cleanup_mode,
  32. commit_tree,
  33. get_unstaged_changes,
  34. index_entry_from_stat,
  35. read_index,
  36. read_index_dict,
  37. validate_path_element_default,
  38. validate_path_element_ntfs,
  39. write_cache_time,
  40. write_index,
  41. write_index_dict,
  42. )
  43. from dulwich.object_store import (
  44. MemoryObjectStore,
  45. )
  46. from dulwich.objects import (
  47. Blob,
  48. Tree,
  49. )
  50. from dulwich.repo import Repo
  51. from dulwich.tests import (
  52. expectedFailure,
  53. TestCase,
  54. skipIf,
  55. )
  56. class IndexTestCase(TestCase):
  57. datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
  58. def get_simple_index(self, name):
  59. return Index(os.path.join(self.datadir, name))
  60. class SimpleIndexTestCase(IndexTestCase):
  61. def test_len(self):
  62. self.assertEqual(1, len(self.get_simple_index("index")))
  63. def test_iter(self):
  64. self.assertEqual([b'bla'], list(self.get_simple_index("index")))
  65. def test_getitem(self):
  66. self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
  67. 33188, 1000, 1000, 0,
  68. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
  69. self.get_simple_index("index")[b"bla"])
  70. def test_empty(self):
  71. i = self.get_simple_index("notanindex")
  72. self.assertEqual(0, len(i))
  73. self.assertFalse(os.path.exists(i._filename))
  74. def test_against_empty_tree(self):
  75. i = self.get_simple_index("index")
  76. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  77. self.assertEqual(1, len(changes))
  78. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  79. self.assertEqual(b'bla', newname)
  80. self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
  81. class SimpleIndexWriterTestCase(IndexTestCase):
  82. def setUp(self):
  83. IndexTestCase.setUp(self)
  84. self.tempdir = tempfile.mkdtemp()
  85. def tearDown(self):
  86. IndexTestCase.tearDown(self)
  87. shutil.rmtree(self.tempdir)
  88. def test_simple_write(self):
  89. entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020,
  90. 33188, 1000, 1000, 0,
  91. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)]
  92. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  93. with open(filename, 'wb+') as x:
  94. write_index(x, entries)
  95. with open(filename, 'rb') as x:
  96. self.assertEqual(entries, list(read_index(x)))
  97. class ReadIndexDictTests(IndexTestCase):
  98. def setUp(self):
  99. IndexTestCase.setUp(self)
  100. self.tempdir = tempfile.mkdtemp()
  101. def tearDown(self):
  102. IndexTestCase.tearDown(self)
  103. shutil.rmtree(self.tempdir)
  104. def test_simple_write(self):
  105. entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
  106. 33188, 1000, 1000, 0,
  107. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
  108. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  109. with open(filename, 'wb+') as x:
  110. write_index_dict(x, entries)
  111. with open(filename, 'rb') as x:
  112. self.assertEqual(entries, read_index_dict(x))
  113. class CommitTreeTests(TestCase):
  114. def setUp(self):
  115. super(CommitTreeTests, self).setUp()
  116. self.store = MemoryObjectStore()
  117. def test_single_blob(self):
  118. blob = Blob()
  119. blob.data = b"foo"
  120. self.store.add_object(blob)
  121. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  122. rootid = commit_tree(self.store, blobs)
  123. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  124. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  125. self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
  126. def test_nested(self):
  127. blob = Blob()
  128. blob.data = b"foo"
  129. self.store.add_object(blob)
  130. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  131. rootid = commit_tree(self.store, blobs)
  132. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  133. dirid = self.store[rootid][b"bla"][1]
  134. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  135. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  136. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  137. self.assertEqual(set([rootid, dirid, blob.id]),
  138. set(self.store._data.keys()))
  139. class CleanupModeTests(TestCase):
  140. def test_file(self):
  141. self.assertEqual(0o100644, cleanup_mode(0o100000))
  142. def test_executable(self):
  143. self.assertEqual(0o100755, cleanup_mode(0o100711))
  144. def test_symlink(self):
  145. self.assertEqual(0o120000, cleanup_mode(0o120711))
  146. def test_dir(self):
  147. self.assertEqual(0o040000, cleanup_mode(0o40531))
  148. def test_submodule(self):
  149. self.assertEqual(0o160000, cleanup_mode(0o160744))
  150. class WriteCacheTimeTests(TestCase):
  151. def test_write_string(self):
  152. f = BytesIO()
  153. self.assertRaises(TypeError, write_cache_time, f, "foo")
  154. def test_write_int(self):
  155. f = BytesIO()
  156. write_cache_time(f, 434343)
  157. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  158. def test_write_tuple(self):
  159. f = BytesIO()
  160. write_cache_time(f, (434343, 21))
  161. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  162. def test_write_float(self):
  163. f = BytesIO()
  164. write_cache_time(f, 434343.000000021)
  165. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  166. class IndexEntryFromStatTests(TestCase):
  167. def test_simple(self):
  168. st = os.stat_result((16877, 131078, 64769,
  169. 154, 1000, 1000, 12288,
  170. 1323629595, 1324180496, 1324180496))
  171. entry = index_entry_from_stat(st, "22" * 20, 0)
  172. self.assertEqual(entry, (
  173. 1324180496,
  174. 1324180496,
  175. 64769,
  176. 131078,
  177. 16384,
  178. 1000,
  179. 1000,
  180. 12288,
  181. '2222222222222222222222222222222222222222',
  182. 0))
  183. def test_override_mode(self):
  184. st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
  185. 154, 1000, 1000, 12288,
  186. 1323629595, 1324180496, 1324180496))
  187. entry = index_entry_from_stat(st, "22" * 20, 0,
  188. mode=stat.S_IFREG + 0o755)
  189. self.assertEqual(entry, (
  190. 1324180496,
  191. 1324180496,
  192. 64769,
  193. 131078,
  194. 33261,
  195. 1000,
  196. 1000,
  197. 12288,
  198. '2222222222222222222222222222222222222222',
  199. 0))
  200. class BuildIndexTests(TestCase):
  201. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha):
  202. self.assertEqual(index_entry[4], mode) # mode
  203. self.assertEqual(index_entry[7], filesize) # filesize
  204. self.assertEqual(index_entry[8], sha) # sha
  205. def assertFileContents(self, path, contents, symlink=False):
  206. if symlink:
  207. self.assertEqual(os.readlink(path), contents)
  208. else:
  209. with open(path, 'rb') as f:
  210. self.assertEqual(f.read(), contents)
  211. def test_empty(self):
  212. repo_dir = tempfile.mkdtemp()
  213. self.addCleanup(shutil.rmtree, repo_dir)
  214. with closing(Repo.init(repo_dir)) as repo:
  215. tree = Tree()
  216. repo.object_store.add_object(tree)
  217. build_index_from_tree(repo.path, repo.index_path(),
  218. repo.object_store, tree.id)
  219. # Verify index entries
  220. index = repo.open_index()
  221. self.assertEqual(len(index), 0)
  222. # Verify no files
  223. self.assertEqual(['.git'], os.listdir(repo.path))
  224. def test_git_dir(self):
  225. repo_dir = tempfile.mkdtemp()
  226. self.addCleanup(shutil.rmtree, repo_dir)
  227. with closing(Repo.init(repo_dir)) as repo:
  228. # Populate repo
  229. filea = Blob.from_string(b'file a')
  230. filee = Blob.from_string(b'd')
  231. tree = Tree()
  232. tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
  233. tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
  234. repo.object_store.add_objects([(o, None)
  235. for o in [filea, filee, tree]])
  236. build_index_from_tree(repo.path, repo.index_path(),
  237. repo.object_store, tree.id)
  238. # Verify index entries
  239. index = repo.open_index()
  240. self.assertEqual(len(index), 1)
  241. # filea
  242. apath = os.path.join(repo.path, '.git', 'a')
  243. self.assertFalse(os.path.exists(apath))
  244. # filee
  245. epath = os.path.join(repo.path, 'c', 'e')
  246. self.assertTrue(os.path.exists(epath))
  247. self.assertReasonableIndexEntry(index[b'c/e'],
  248. stat.S_IFREG | 0o644, 1, filee.id)
  249. self.assertFileContents(epath, b'd')
  250. def test_nonempty(self):
  251. repo_dir = tempfile.mkdtemp()
  252. self.addCleanup(shutil.rmtree, repo_dir)
  253. with closing(Repo.init(repo_dir)) as repo:
  254. # Populate repo
  255. filea = Blob.from_string(b'file a')
  256. fileb = Blob.from_string(b'file b')
  257. filed = Blob.from_string(b'file d')
  258. tree = Tree()
  259. tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
  260. tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id)
  261. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  262. repo.object_store.add_objects([(o, None)
  263. for o in [filea, fileb, filed, tree]])
  264. build_index_from_tree(repo.path, repo.index_path(),
  265. repo.object_store, tree.id)
  266. # Verify index entries
  267. index = repo.open_index()
  268. self.assertEqual(len(index), 3)
  269. # filea
  270. apath = os.path.join(repo.path, 'a')
  271. self.assertTrue(os.path.exists(apath))
  272. self.assertReasonableIndexEntry(index[b'a'],
  273. stat.S_IFREG | 0o644, 6, filea.id)
  274. self.assertFileContents(apath, b'file a')
  275. # fileb
  276. bpath = os.path.join(repo.path, 'b')
  277. self.assertTrue(os.path.exists(bpath))
  278. self.assertReasonableIndexEntry(index[b'b'],
  279. stat.S_IFREG | 0o644, 6, fileb.id)
  280. self.assertFileContents(bpath, b'file b')
  281. # filed
  282. dpath = os.path.join(repo.path, 'c', 'd')
  283. self.assertTrue(os.path.exists(dpath))
  284. self.assertReasonableIndexEntry(index[b'c/d'],
  285. stat.S_IFREG | 0o644, 6, filed.id)
  286. self.assertFileContents(dpath, b'file d')
  287. # Verify no extra files
  288. self.assertEqual(['.git', 'a', 'b', 'c'],
  289. sorted(os.listdir(repo.path)))
  290. self.assertEqual(['d'],
  291. sorted(os.listdir(os.path.join(repo.path, 'c'))))
  292. @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
  293. def test_symlink(self):
  294. repo_dir = tempfile.mkdtemp()
  295. self.addCleanup(shutil.rmtree, repo_dir)
  296. with closing(Repo.init(repo_dir)) as repo:
  297. # Populate repo
  298. filed = Blob.from_string(b'file d')
  299. filee = Blob.from_string(b'd')
  300. tree = Tree()
  301. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  302. tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink
  303. repo.object_store.add_objects([(o, None)
  304. for o in [filed, filee, tree]])
  305. build_index_from_tree(repo.path, repo.index_path(),
  306. repo.object_store, tree.id)
  307. # Verify index entries
  308. index = repo.open_index()
  309. # symlink to d
  310. epath = os.path.join(repo.path, 'c', 'e')
  311. self.assertTrue(os.path.exists(epath))
  312. self.assertReasonableIndexEntry(
  313. index[b'c/e'], stat.S_IFLNK,
  314. 0 if sys.platform == 'win32' else 1,
  315. filee.id)
  316. self.assertFileContents(epath, 'd', symlink=True)
  317. @expectedFailure
  318. def test_no_decode_encode(self):
  319. repo_dir = tempfile.mkdtemp()
  320. repo_dir_bytes = repo_dir.encode(sys.getfilesystemencoding())
  321. self.addCleanup(shutil.rmtree, repo_dir)
  322. with closing(Repo.init(repo_dir)) as repo:
  323. # Populate repo
  324. file = Blob.from_string(b'foo')
  325. tree = Tree()
  326. latin1_name = u'À'.encode('latin1')
  327. utf8_name = u'À'.encode('utf8')
  328. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  329. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  330. repo.object_store.add_objects(
  331. [(o, None) for o in [file, tree]])
  332. build_index_from_tree(
  333. repo.path, repo.index_path(),
  334. repo.object_store, tree.id)
  335. # Verify index entries
  336. index = repo.open_index()
  337. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  338. self.assertTrue(os.path.exists(latin1_path))
  339. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  340. self.assertTrue(os.path.exists(utf8_path))
  341. class GetUnstagedChangesTests(TestCase):
  342. def test_get_unstaged_changes(self):
  343. """Unit test for get_unstaged_changes."""
  344. repo_dir = tempfile.mkdtemp()
  345. self.addCleanup(shutil.rmtree, repo_dir)
  346. with closing(Repo.init(repo_dir)) as repo:
  347. # Commit a dummy file then modify it
  348. foo1_fullpath = os.path.join(repo_dir, 'foo1')
  349. with open(foo1_fullpath, 'wb') as f:
  350. f.write(b'origstuff')
  351. foo2_fullpath = os.path.join(repo_dir, 'foo2')
  352. with open(foo2_fullpath, 'wb') as f:
  353. f.write(b'origstuff')
  354. repo.stage(['foo1', 'foo2'])
  355. repo.do_commit(b'test status', author=b'', committer=b'')
  356. with open(foo1_fullpath, 'wb') as f:
  357. f.write(b'newstuff')
  358. # modify access and modify time of path
  359. os.utime(foo1_fullpath, (0, 0))
  360. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  361. self.assertEqual(list(changes), [b'foo1'])
  362. class TestValidatePathElement(TestCase):
  363. def test_default(self):
  364. self.assertTrue(validate_path_element_default(b"bla"))
  365. self.assertTrue(validate_path_element_default(b".bla"))
  366. self.assertFalse(validate_path_element_default(b".git"))
  367. self.assertFalse(validate_path_element_default(b".giT"))
  368. self.assertFalse(validate_path_element_default(b".."))
  369. self.assertTrue(validate_path_element_default(b"git~1"))
  370. def test_ntfs(self):
  371. self.assertTrue(validate_path_element_ntfs(b"bla"))
  372. self.assertTrue(validate_path_element_ntfs(b".bla"))
  373. self.assertFalse(validate_path_element_ntfs(b".git"))
  374. self.assertFalse(validate_path_element_ntfs(b".giT"))
  375. self.assertFalse(validate_path_element_ntfs(b".."))
  376. self.assertFalse(validate_path_element_ntfs(b"git~1"))