test_index.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. # -*- coding: utf-8 -*-
  2. # test_index.py -- Tests for the git index
  3. # encoding: utf-8
  4. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
  5. #
  6. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  7. # General Public License as published by the Free Software Foundation; version 2.0
  8. # or (at your option) any later version. You can redistribute it and/or
  9. # modify it under the terms of either of these two licenses.
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #
  17. # You should have received a copy of the licenses; if not, see
  18. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  19. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  20. # License, Version 2.0.
  21. #
  22. """Tests for the index."""
  23. from contextlib import closing
  24. from io import BytesIO
  25. import os
  26. import shutil
  27. import stat
  28. import struct
  29. import sys
  30. import tempfile
  31. from dulwich.index import (
  32. Index,
  33. build_index_from_tree,
  34. cleanup_mode,
  35. commit_tree,
  36. get_unstaged_changes,
  37. index_entry_from_stat,
  38. read_index,
  39. read_index_dict,
  40. validate_path_element_default,
  41. validate_path_element_ntfs,
  42. write_cache_time,
  43. write_index,
  44. write_index_dict,
  45. _tree_to_fs_path,
  46. _fs_to_tree_path,
  47. )
  48. from dulwich.object_store import (
  49. MemoryObjectStore,
  50. )
  51. from dulwich.objects import (
  52. Blob,
  53. Tree,
  54. )
  55. from dulwich.repo import Repo
  56. from dulwich.tests import (
  57. TestCase,
  58. skipIf,
  59. )
  60. class IndexTestCase(TestCase):
  61. datadir = os.path.join(os.path.dirname(__file__), 'data/indexes')
  62. def get_simple_index(self, name):
  63. return Index(os.path.join(self.datadir, name))
  64. class SimpleIndexTestCase(IndexTestCase):
  65. def test_len(self):
  66. self.assertEqual(1, len(self.get_simple_index("index")))
  67. def test_iter(self):
  68. self.assertEqual([b'bla'], list(self.get_simple_index("index")))
  69. def test_getitem(self):
  70. self.assertEqual(((1230680220, 0), (1230680220, 0), 2050, 3761020,
  71. 33188, 1000, 1000, 0,
  72. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0),
  73. self.get_simple_index("index")[b"bla"])
  74. def test_empty(self):
  75. i = self.get_simple_index("notanindex")
  76. self.assertEqual(0, len(i))
  77. self.assertFalse(os.path.exists(i._filename))
  78. def test_against_empty_tree(self):
  79. i = self.get_simple_index("index")
  80. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  81. self.assertEqual(1, len(changes))
  82. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  83. self.assertEqual(b'bla', newname)
  84. self.assertEqual(b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', newsha)
  85. class SimpleIndexWriterTestCase(IndexTestCase):
  86. def setUp(self):
  87. IndexTestCase.setUp(self)
  88. self.tempdir = tempfile.mkdtemp()
  89. def tearDown(self):
  90. IndexTestCase.tearDown(self)
  91. shutil.rmtree(self.tempdir)
  92. def test_simple_write(self):
  93. entries = [(b'barbla', (1230680220, 0), (1230680220, 0), 2050, 3761020,
  94. 33188, 1000, 1000, 0,
  95. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)]
  96. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  97. with open(filename, 'wb+') as x:
  98. write_index(x, entries)
  99. with open(filename, 'rb') as x:
  100. self.assertEqual(entries, list(read_index(x)))
  101. class ReadIndexDictTests(IndexTestCase):
  102. def setUp(self):
  103. IndexTestCase.setUp(self)
  104. self.tempdir = tempfile.mkdtemp()
  105. def tearDown(self):
  106. IndexTestCase.tearDown(self)
  107. shutil.rmtree(self.tempdir)
  108. def test_simple_write(self):
  109. entries = {b'barbla': ((1230680220, 0), (1230680220, 0), 2050, 3761020,
  110. 33188, 1000, 1000, 0,
  111. b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391', 0)}
  112. filename = os.path.join(self.tempdir, 'test-simple-write-index')
  113. with open(filename, 'wb+') as x:
  114. write_index_dict(x, entries)
  115. with open(filename, 'rb') as x:
  116. self.assertEqual(entries, read_index_dict(x))
  117. class CommitTreeTests(TestCase):
  118. def setUp(self):
  119. super(CommitTreeTests, self).setUp()
  120. self.store = MemoryObjectStore()
  121. def test_single_blob(self):
  122. blob = Blob()
  123. blob.data = b"foo"
  124. self.store.add_object(blob)
  125. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  126. rootid = commit_tree(self.store, blobs)
  127. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  128. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  129. self.assertEqual(set([rootid, blob.id]), set(self.store._data.keys()))
  130. def test_nested(self):
  131. blob = Blob()
  132. blob.data = b"foo"
  133. self.store.add_object(blob)
  134. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  135. rootid = commit_tree(self.store, blobs)
  136. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  137. dirid = self.store[rootid][b"bla"][1]
  138. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  139. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  140. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  141. self.assertEqual(set([rootid, dirid, blob.id]),
  142. set(self.store._data.keys()))
  143. class CleanupModeTests(TestCase):
  144. def test_file(self):
  145. self.assertEqual(0o100644, cleanup_mode(0o100000))
  146. def test_executable(self):
  147. self.assertEqual(0o100755, cleanup_mode(0o100711))
  148. def test_symlink(self):
  149. self.assertEqual(0o120000, cleanup_mode(0o120711))
  150. def test_dir(self):
  151. self.assertEqual(0o040000, cleanup_mode(0o40531))
  152. def test_submodule(self):
  153. self.assertEqual(0o160000, cleanup_mode(0o160744))
  154. class WriteCacheTimeTests(TestCase):
  155. def test_write_string(self):
  156. f = BytesIO()
  157. self.assertRaises(TypeError, write_cache_time, f, "foo")
  158. def test_write_int(self):
  159. f = BytesIO()
  160. write_cache_time(f, 434343)
  161. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  162. def test_write_tuple(self):
  163. f = BytesIO()
  164. write_cache_time(f, (434343, 21))
  165. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  166. def test_write_float(self):
  167. f = BytesIO()
  168. write_cache_time(f, 434343.000000021)
  169. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  170. class IndexEntryFromStatTests(TestCase):
  171. def test_simple(self):
  172. st = os.stat_result((16877, 131078, 64769,
  173. 154, 1000, 1000, 12288,
  174. 1323629595, 1324180496, 1324180496))
  175. entry = index_entry_from_stat(st, "22" * 20, 0)
  176. self.assertEqual(entry, (
  177. 1324180496,
  178. 1324180496,
  179. 64769,
  180. 131078,
  181. 16384,
  182. 1000,
  183. 1000,
  184. 12288,
  185. '2222222222222222222222222222222222222222',
  186. 0))
  187. def test_override_mode(self):
  188. st = os.stat_result((stat.S_IFREG + 0o644, 131078, 64769,
  189. 154, 1000, 1000, 12288,
  190. 1323629595, 1324180496, 1324180496))
  191. entry = index_entry_from_stat(st, "22" * 20, 0,
  192. mode=stat.S_IFREG + 0o755)
  193. self.assertEqual(entry, (
  194. 1324180496,
  195. 1324180496,
  196. 64769,
  197. 131078,
  198. 33261,
  199. 1000,
  200. 1000,
  201. 12288,
  202. '2222222222222222222222222222222222222222',
  203. 0))
  204. class BuildIndexTests(TestCase):
  205. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha):
  206. self.assertEqual(index_entry[4], mode) # mode
  207. self.assertEqual(index_entry[7], filesize) # filesize
  208. self.assertEqual(index_entry[8], sha) # sha
  209. def assertFileContents(self, path, contents, symlink=False):
  210. if symlink:
  211. self.assertEqual(os.readlink(path), contents)
  212. else:
  213. with open(path, 'rb') as f:
  214. self.assertEqual(f.read(), contents)
  215. def test_empty(self):
  216. repo_dir = tempfile.mkdtemp()
  217. self.addCleanup(shutil.rmtree, repo_dir)
  218. with closing(Repo.init(repo_dir)) as repo:
  219. tree = Tree()
  220. repo.object_store.add_object(tree)
  221. build_index_from_tree(repo.path, repo.index_path(),
  222. repo.object_store, tree.id)
  223. # Verify index entries
  224. index = repo.open_index()
  225. self.assertEqual(len(index), 0)
  226. # Verify no files
  227. self.assertEqual(['.git'], os.listdir(repo.path))
  228. def test_git_dir(self):
  229. repo_dir = tempfile.mkdtemp()
  230. self.addCleanup(shutil.rmtree, repo_dir)
  231. with closing(Repo.init(repo_dir)) as repo:
  232. # Populate repo
  233. filea = Blob.from_string(b'file a')
  234. filee = Blob.from_string(b'd')
  235. tree = Tree()
  236. tree[b'.git/a'] = (stat.S_IFREG | 0o644, filea.id)
  237. tree[b'c/e'] = (stat.S_IFREG | 0o644, filee.id)
  238. repo.object_store.add_objects([(o, None)
  239. for o in [filea, filee, tree]])
  240. build_index_from_tree(repo.path, repo.index_path(),
  241. repo.object_store, tree.id)
  242. # Verify index entries
  243. index = repo.open_index()
  244. self.assertEqual(len(index), 1)
  245. # filea
  246. apath = os.path.join(repo.path, '.git', 'a')
  247. self.assertFalse(os.path.exists(apath))
  248. # filee
  249. epath = os.path.join(repo.path, 'c', 'e')
  250. self.assertTrue(os.path.exists(epath))
  251. self.assertReasonableIndexEntry(index[b'c/e'],
  252. stat.S_IFREG | 0o644, 1, filee.id)
  253. self.assertFileContents(epath, b'd')
  254. def test_nonempty(self):
  255. repo_dir = tempfile.mkdtemp()
  256. self.addCleanup(shutil.rmtree, repo_dir)
  257. with closing(Repo.init(repo_dir)) as repo:
  258. # Populate repo
  259. filea = Blob.from_string(b'file a')
  260. fileb = Blob.from_string(b'file b')
  261. filed = Blob.from_string(b'file d')
  262. tree = Tree()
  263. tree[b'a'] = (stat.S_IFREG | 0o644, filea.id)
  264. tree[b'b'] = (stat.S_IFREG | 0o644, fileb.id)
  265. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  266. repo.object_store.add_objects([(o, None)
  267. for o in [filea, fileb, filed, tree]])
  268. build_index_from_tree(repo.path, repo.index_path(),
  269. repo.object_store, tree.id)
  270. # Verify index entries
  271. index = repo.open_index()
  272. self.assertEqual(len(index), 3)
  273. # filea
  274. apath = os.path.join(repo.path, 'a')
  275. self.assertTrue(os.path.exists(apath))
  276. self.assertReasonableIndexEntry(index[b'a'],
  277. stat.S_IFREG | 0o644, 6, filea.id)
  278. self.assertFileContents(apath, b'file a')
  279. # fileb
  280. bpath = os.path.join(repo.path, 'b')
  281. self.assertTrue(os.path.exists(bpath))
  282. self.assertReasonableIndexEntry(index[b'b'],
  283. stat.S_IFREG | 0o644, 6, fileb.id)
  284. self.assertFileContents(bpath, b'file b')
  285. # filed
  286. dpath = os.path.join(repo.path, 'c', 'd')
  287. self.assertTrue(os.path.exists(dpath))
  288. self.assertReasonableIndexEntry(index[b'c/d'],
  289. stat.S_IFREG | 0o644, 6, filed.id)
  290. self.assertFileContents(dpath, b'file d')
  291. # Verify no extra files
  292. self.assertEqual(['.git', 'a', 'b', 'c'],
  293. sorted(os.listdir(repo.path)))
  294. self.assertEqual(['d'],
  295. sorted(os.listdir(os.path.join(repo.path, 'c'))))
  296. @skipIf(not getattr(os, 'symlink', None), 'Requires symlink support')
  297. def test_symlink(self):
  298. repo_dir = tempfile.mkdtemp()
  299. self.addCleanup(shutil.rmtree, repo_dir)
  300. with closing(Repo.init(repo_dir)) as repo:
  301. # Populate repo
  302. filed = Blob.from_string(b'file d')
  303. filee = Blob.from_string(b'd')
  304. tree = Tree()
  305. tree[b'c/d'] = (stat.S_IFREG | 0o644, filed.id)
  306. tree[b'c/e'] = (stat.S_IFLNK, filee.id) # symlink
  307. repo.object_store.add_objects([(o, None)
  308. for o in [filed, filee, tree]])
  309. build_index_from_tree(repo.path, repo.index_path(),
  310. repo.object_store, tree.id)
  311. # Verify index entries
  312. index = repo.open_index()
  313. # symlink to d
  314. epath = os.path.join(repo.path, 'c', 'e')
  315. self.assertTrue(os.path.exists(epath))
  316. self.assertReasonableIndexEntry(
  317. index[b'c/e'], stat.S_IFLNK,
  318. 0 if sys.platform == 'win32' else 1,
  319. filee.id)
  320. self.assertFileContents(epath, 'd', symlink=True)
  321. def test_no_decode_encode(self):
  322. repo_dir = tempfile.mkdtemp()
  323. repo_dir_bytes = repo_dir.encode(sys.getfilesystemencoding())
  324. self.addCleanup(shutil.rmtree, repo_dir)
  325. with closing(Repo.init(repo_dir)) as repo:
  326. # Populate repo
  327. file = Blob.from_string(b'foo')
  328. tree = Tree()
  329. latin1_name = u'À'.encode('latin1')
  330. utf8_name = u'À'.encode('utf8')
  331. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  332. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  333. repo.object_store.add_objects(
  334. [(o, None) for o in [file, tree]])
  335. build_index_from_tree(
  336. repo.path, repo.index_path(),
  337. repo.object_store, tree.id)
  338. # Verify index entries
  339. index = repo.open_index()
  340. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  341. self.assertTrue(os.path.exists(latin1_path))
  342. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  343. self.assertTrue(os.path.exists(utf8_path))
  344. class GetUnstagedChangesTests(TestCase):
  345. def test_get_unstaged_changes(self):
  346. """Unit test for get_unstaged_changes."""
  347. repo_dir = tempfile.mkdtemp()
  348. self.addCleanup(shutil.rmtree, repo_dir)
  349. with closing(Repo.init(repo_dir)) as repo:
  350. # Commit a dummy file then modify it
  351. foo1_fullpath = os.path.join(repo_dir, 'foo1')
  352. with open(foo1_fullpath, 'wb') as f:
  353. f.write(b'origstuff')
  354. foo2_fullpath = os.path.join(repo_dir, 'foo2')
  355. with open(foo2_fullpath, 'wb') as f:
  356. f.write(b'origstuff')
  357. repo.stage(['foo1', 'foo2'])
  358. repo.do_commit(b'test status', author=b'', committer=b'')
  359. with open(foo1_fullpath, 'wb') as f:
  360. f.write(b'newstuff')
  361. # modify access and modify time of path
  362. os.utime(foo1_fullpath, (0, 0))
  363. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  364. self.assertEqual(list(changes), [b'foo1'])
  365. class TestValidatePathElement(TestCase):
  366. def test_default(self):
  367. self.assertTrue(validate_path_element_default(b"bla"))
  368. self.assertTrue(validate_path_element_default(b".bla"))
  369. self.assertFalse(validate_path_element_default(b".git"))
  370. self.assertFalse(validate_path_element_default(b".giT"))
  371. self.assertFalse(validate_path_element_default(b".."))
  372. self.assertTrue(validate_path_element_default(b"git~1"))
  373. def test_ntfs(self):
  374. self.assertTrue(validate_path_element_ntfs(b"bla"))
  375. self.assertTrue(validate_path_element_ntfs(b".bla"))
  376. self.assertFalse(validate_path_element_ntfs(b".git"))
  377. self.assertFalse(validate_path_element_ntfs(b".giT"))
  378. self.assertFalse(validate_path_element_ntfs(b".."))
  379. self.assertFalse(validate_path_element_ntfs(b"git~1"))
  380. class TestTreeFSPathConversion(TestCase):
  381. def test_tree_to_fs_path(self):
  382. tree_path = u'délwíçh/foo'.encode('utf8')
  383. fs_path = _tree_to_fs_path(b'/prefix/path', tree_path)
  384. self.assertEqual(
  385. fs_path,
  386. os.path.join(u'/prefix/path', u'délwíçh', u'foo').encode('utf8'))
  387. def test_fs_to_tree_path_str(self):
  388. fs_path = os.path.join(os.path.join(u'délwíçh', u'foo'))
  389. tree_path = _fs_to_tree_path(fs_path, "utf-8")
  390. self.assertEqual(tree_path, u'délwíçh/foo'.encode("utf-8"))
  391. def test_fs_to_tree_path_bytes(self):
  392. fs_path = os.path.join(os.path.join(u'délwíçh', u'foo').encode('utf8'))
  393. tree_path = _fs_to_tree_path(fs_path, "utf-8")
  394. self.assertEqual(tree_path, u'délwíçh/foo'.encode('utf8'))