test_index.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the index."""
  22. import os
  23. import shutil
  24. import stat
  25. import struct
  26. import sys
  27. import tempfile
  28. from io import BytesIO
  29. from dulwich.index import (
  30. Index,
  31. IndexEntry,
  32. SerializedIndexEntry,
  33. _fs_to_tree_path,
  34. _tree_to_fs_path,
  35. build_index_from_tree,
  36. cleanup_mode,
  37. commit_tree,
  38. get_unstaged_changes,
  39. index_entry_from_directory,
  40. index_entry_from_path,
  41. index_entry_from_stat,
  42. iter_fresh_entries,
  43. read_index,
  44. read_index_dict,
  45. validate_path_element_default,
  46. validate_path_element_ntfs,
  47. write_cache_time,
  48. write_index,
  49. write_index_dict,
  50. )
  51. from dulwich.object_store import MemoryObjectStore
  52. from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
  53. from dulwich.repo import Repo
  54. from . import TestCase, skipIf
  55. def can_symlink() -> bool:
  56. """Return whether running process can create symlinks."""
  57. if sys.platform != "win32":
  58. # Platforms other than Windows should allow symlinks without issues.
  59. return True
  60. test_source = tempfile.mkdtemp()
  61. test_target = test_source + "can_symlink"
  62. try:
  63. os.symlink(test_source, test_target)
  64. except (NotImplementedError, OSError):
  65. return False
  66. return True
  67. class IndexTestCase(TestCase):
  68. datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")
  69. def get_simple_index(self, name):
  70. return Index(os.path.join(self.datadir, name))
  71. class SimpleIndexTestCase(IndexTestCase):
  72. def test_len(self) -> None:
  73. self.assertEqual(1, len(self.get_simple_index("index")))
  74. def test_iter(self) -> None:
  75. self.assertEqual([b"bla"], list(self.get_simple_index("index")))
  76. def test_iter_skip_hash(self) -> None:
  77. self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))
  78. def test_iterobjects(self) -> None:
  79. self.assertEqual(
  80. [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
  81. list(self.get_simple_index("index").iterobjects()),
  82. )
  83. def test_getitem(self) -> None:
  84. self.assertEqual(
  85. IndexEntry(
  86. (1230680220, 0),
  87. (1230680220, 0),
  88. 2050,
  89. 3761020,
  90. 33188,
  91. 1000,
  92. 1000,
  93. 0,
  94. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  95. 0,
  96. 0,
  97. ),
  98. self.get_simple_index("index")[b"bla"],
  99. )
  100. def test_empty(self) -> None:
  101. i = self.get_simple_index("notanindex")
  102. self.assertEqual(0, len(i))
  103. self.assertFalse(os.path.exists(i._filename))
  104. def test_against_empty_tree(self) -> None:
  105. i = self.get_simple_index("index")
  106. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  107. self.assertEqual(1, len(changes))
  108. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  109. self.assertEqual(b"bla", newname)
  110. self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)
  111. class SimpleIndexWriterTestCase(IndexTestCase):
  112. def setUp(self) -> None:
  113. IndexTestCase.setUp(self)
  114. self.tempdir = tempfile.mkdtemp()
  115. def tearDown(self) -> None:
  116. IndexTestCase.tearDown(self)
  117. shutil.rmtree(self.tempdir)
  118. def test_simple_write(self) -> None:
  119. entries = [
  120. (
  121. SerializedIndexEntry(
  122. b"barbla",
  123. (1230680220, 0),
  124. (1230680220, 0),
  125. 2050,
  126. 3761020,
  127. 33188,
  128. 1000,
  129. 1000,
  130. 0,
  131. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  132. 0,
  133. 0,
  134. )
  135. )
  136. ]
  137. filename = os.path.join(self.tempdir, "test-simple-write-index")
  138. with open(filename, "wb+") as x:
  139. write_index(x, entries)
  140. with open(filename, "rb") as x:
  141. self.assertEqual(entries, list(read_index(x)))
  142. class ReadIndexDictTests(IndexTestCase):
  143. def setUp(self) -> None:
  144. IndexTestCase.setUp(self)
  145. self.tempdir = tempfile.mkdtemp()
  146. def tearDown(self) -> None:
  147. IndexTestCase.tearDown(self)
  148. shutil.rmtree(self.tempdir)
  149. def test_simple_write(self) -> None:
  150. entries = {
  151. b"barbla": IndexEntry(
  152. (1230680220, 0),
  153. (1230680220, 0),
  154. 2050,
  155. 3761020,
  156. 33188,
  157. 1000,
  158. 1000,
  159. 0,
  160. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  161. 0,
  162. 0,
  163. )
  164. }
  165. filename = os.path.join(self.tempdir, "test-simple-write-index")
  166. with open(filename, "wb+") as x:
  167. write_index_dict(x, entries)
  168. with open(filename, "rb") as x:
  169. self.assertEqual(entries, read_index_dict(x))
  170. class CommitTreeTests(TestCase):
  171. def setUp(self) -> None:
  172. super().setUp()
  173. self.store = MemoryObjectStore()
  174. def test_single_blob(self) -> None:
  175. blob = Blob()
  176. blob.data = b"foo"
  177. self.store.add_object(blob)
  178. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  179. rootid = commit_tree(self.store, blobs)
  180. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  181. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  182. self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))
  183. def test_nested(self) -> None:
  184. blob = Blob()
  185. blob.data = b"foo"
  186. self.store.add_object(blob)
  187. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  188. rootid = commit_tree(self.store, blobs)
  189. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  190. dirid = self.store[rootid][b"bla"][1]
  191. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  192. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  193. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  194. self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))
  195. class CleanupModeTests(TestCase):
  196. def assertModeEqual(self, expected, got) -> None:
  197. self.assertEqual(expected, got, f"{expected:o} != {got:o}")
  198. def test_file(self) -> None:
  199. self.assertModeEqual(0o100644, cleanup_mode(0o100000))
  200. def test_executable(self) -> None:
  201. self.assertModeEqual(0o100755, cleanup_mode(0o100711))
  202. self.assertModeEqual(0o100755, cleanup_mode(0o100700))
  203. def test_symlink(self) -> None:
  204. self.assertModeEqual(0o120000, cleanup_mode(0o120711))
  205. def test_dir(self) -> None:
  206. self.assertModeEqual(0o040000, cleanup_mode(0o40531))
  207. def test_submodule(self) -> None:
  208. self.assertModeEqual(0o160000, cleanup_mode(0o160744))
  209. class WriteCacheTimeTests(TestCase):
  210. def test_write_string(self) -> None:
  211. f = BytesIO()
  212. self.assertRaises(TypeError, write_cache_time, f, "foo")
  213. def test_write_int(self) -> None:
  214. f = BytesIO()
  215. write_cache_time(f, 434343)
  216. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  217. def test_write_tuple(self) -> None:
  218. f = BytesIO()
  219. write_cache_time(f, (434343, 21))
  220. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  221. def test_write_float(self) -> None:
  222. f = BytesIO()
  223. write_cache_time(f, 434343.000000021)
  224. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  225. class IndexEntryFromStatTests(TestCase):
  226. def test_simple(self) -> None:
  227. st = os.stat_result(
  228. (
  229. 16877,
  230. 131078,
  231. 64769,
  232. 154,
  233. 1000,
  234. 1000,
  235. 12288,
  236. 1323629595,
  237. 1324180496,
  238. 1324180496,
  239. )
  240. )
  241. entry = index_entry_from_stat(st, b"22" * 20)
  242. self.assertEqual(
  243. entry,
  244. IndexEntry(
  245. 1324180496,
  246. 1324180496,
  247. 64769,
  248. 131078,
  249. 16384,
  250. 1000,
  251. 1000,
  252. 12288,
  253. b"2222222222222222222222222222222222222222",
  254. 0,
  255. 0,
  256. ),
  257. )
  258. def test_override_mode(self) -> None:
  259. st = os.stat_result(
  260. (
  261. stat.S_IFREG + 0o644,
  262. 131078,
  263. 64769,
  264. 154,
  265. 1000,
  266. 1000,
  267. 12288,
  268. 1323629595,
  269. 1324180496,
  270. 1324180496,
  271. )
  272. )
  273. entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
  274. self.assertEqual(
  275. entry,
  276. IndexEntry(
  277. 1324180496,
  278. 1324180496,
  279. 64769,
  280. 131078,
  281. 33261,
  282. 1000,
  283. 1000,
  284. 12288,
  285. b"2222222222222222222222222222222222222222",
  286. 0,
  287. 0,
  288. ),
  289. )
  290. class BuildIndexTests(TestCase):
  291. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
  292. self.assertEqual(index_entry.mode, mode) # mode
  293. self.assertEqual(index_entry.size, filesize) # filesize
  294. self.assertEqual(index_entry.sha, sha) # sha
  295. def assertFileContents(self, path, contents, symlink=False) -> None:
  296. if symlink:
  297. self.assertEqual(os.readlink(path), contents)
  298. else:
  299. with open(path, "rb") as f:
  300. self.assertEqual(f.read(), contents)
  301. def test_empty(self) -> None:
  302. repo_dir = tempfile.mkdtemp()
  303. self.addCleanup(shutil.rmtree, repo_dir)
  304. with Repo.init(repo_dir) as repo:
  305. tree = Tree()
  306. repo.object_store.add_object(tree)
  307. build_index_from_tree(
  308. repo.path, repo.index_path(), repo.object_store, tree.id
  309. )
  310. # Verify index entries
  311. index = repo.open_index()
  312. self.assertEqual(len(index), 0)
  313. # Verify no files
  314. self.assertEqual([".git"], os.listdir(repo.path))
  315. def test_git_dir(self) -> None:
  316. repo_dir = tempfile.mkdtemp()
  317. self.addCleanup(shutil.rmtree, repo_dir)
  318. with Repo.init(repo_dir) as repo:
  319. # Populate repo
  320. filea = Blob.from_string(b"file a")
  321. filee = Blob.from_string(b"d")
  322. tree = Tree()
  323. tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
  324. tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)
  325. repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])
  326. build_index_from_tree(
  327. repo.path, repo.index_path(), repo.object_store, tree.id
  328. )
  329. # Verify index entries
  330. index = repo.open_index()
  331. self.assertEqual(len(index), 1)
  332. # filea
  333. apath = os.path.join(repo.path, ".git", "a")
  334. self.assertFalse(os.path.exists(apath))
  335. # filee
  336. epath = os.path.join(repo.path, "c", "e")
  337. self.assertTrue(os.path.exists(epath))
  338. self.assertReasonableIndexEntry(
  339. index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
  340. )
  341. self.assertFileContents(epath, b"d")
  342. def test_nonempty(self) -> None:
  343. repo_dir = tempfile.mkdtemp()
  344. self.addCleanup(shutil.rmtree, repo_dir)
  345. with Repo.init(repo_dir) as repo:
  346. # Populate repo
  347. filea = Blob.from_string(b"file a")
  348. fileb = Blob.from_string(b"file b")
  349. filed = Blob.from_string(b"file d")
  350. tree = Tree()
  351. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  352. tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
  353. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  354. repo.object_store.add_objects(
  355. [(o, None) for o in [filea, fileb, filed, tree]]
  356. )
  357. build_index_from_tree(
  358. repo.path, repo.index_path(), repo.object_store, tree.id
  359. )
  360. # Verify index entries
  361. index = repo.open_index()
  362. self.assertEqual(len(index), 3)
  363. # filea
  364. apath = os.path.join(repo.path, "a")
  365. self.assertTrue(os.path.exists(apath))
  366. self.assertReasonableIndexEntry(
  367. index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
  368. )
  369. self.assertFileContents(apath, b"file a")
  370. # fileb
  371. bpath = os.path.join(repo.path, "b")
  372. self.assertTrue(os.path.exists(bpath))
  373. self.assertReasonableIndexEntry(
  374. index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
  375. )
  376. self.assertFileContents(bpath, b"file b")
  377. # filed
  378. dpath = os.path.join(repo.path, "c", "d")
  379. self.assertTrue(os.path.exists(dpath))
  380. self.assertReasonableIndexEntry(
  381. index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
  382. )
  383. self.assertFileContents(dpath, b"file d")
  384. # Verify no extra files
  385. self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
  386. self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))
  387. @skipIf(not getattr(os, "sync", None), "Requires sync support")
  388. def test_norewrite(self) -> None:
  389. repo_dir = tempfile.mkdtemp()
  390. self.addCleanup(shutil.rmtree, repo_dir)
  391. with Repo.init(repo_dir) as repo:
  392. # Populate repo
  393. filea = Blob.from_string(b"file a")
  394. filea_path = os.path.join(repo_dir, "a")
  395. tree = Tree()
  396. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  397. repo.object_store.add_objects([(o, None) for o in [filea, tree]])
  398. # First Write
  399. build_index_from_tree(
  400. repo.path, repo.index_path(), repo.object_store, tree.id
  401. )
  402. # Use sync as metadata can be cached on some FS
  403. os.sync()
  404. mtime = os.stat(filea_path).st_mtime
  405. # Test Rewrite
  406. build_index_from_tree(
  407. repo.path, repo.index_path(), repo.object_store, tree.id
  408. )
  409. os.sync()
  410. self.assertEqual(mtime, os.stat(filea_path).st_mtime)
  411. # Modify content
  412. with open(filea_path, "wb") as fh:
  413. fh.write(b"test a")
  414. os.sync()
  415. mtime = os.stat(filea_path).st_mtime
  416. # Test rewrite
  417. build_index_from_tree(
  418. repo.path, repo.index_path(), repo.object_store, tree.id
  419. )
  420. os.sync()
  421. with open(filea_path, "rb") as fh:
  422. self.assertEqual(b"file a", fh.read())
  423. @skipIf(not can_symlink(), "Requires symlink support")
  424. def test_symlink(self) -> None:
  425. repo_dir = tempfile.mkdtemp()
  426. self.addCleanup(shutil.rmtree, repo_dir)
  427. with Repo.init(repo_dir) as repo:
  428. # Populate repo
  429. filed = Blob.from_string(b"file d")
  430. filee = Blob.from_string(b"d")
  431. tree = Tree()
  432. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  433. tree[b"c/e"] = (stat.S_IFLNK, filee.id) # symlink
  434. repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])
  435. build_index_from_tree(
  436. repo.path, repo.index_path(), repo.object_store, tree.id
  437. )
  438. # Verify index entries
  439. index = repo.open_index()
  440. # symlink to d
  441. epath = os.path.join(repo.path, "c", "e")
  442. self.assertTrue(os.path.exists(epath))
  443. self.assertReasonableIndexEntry(
  444. index[b"c/e"],
  445. stat.S_IFLNK,
  446. 0 if sys.platform == "win32" else 1,
  447. filee.id,
  448. )
  449. self.assertFileContents(epath, "d", symlink=True)
  450. def test_no_decode_encode(self) -> None:
  451. repo_dir = tempfile.mkdtemp()
  452. repo_dir_bytes = os.fsencode(repo_dir)
  453. self.addCleanup(shutil.rmtree, repo_dir)
  454. with Repo.init(repo_dir) as repo:
  455. # Populate repo
  456. file = Blob.from_string(b"foo")
  457. tree = Tree()
  458. latin1_name = "À".encode("latin1")
  459. try:
  460. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  461. except UnicodeDecodeError:
  462. self.skipTest("can not decode as latin1")
  463. utf8_name = "À".encode()
  464. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  465. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  466. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  467. repo.object_store.add_objects([(o, None) for o in [file, tree]])
  468. try:
  469. build_index_from_tree(
  470. repo.path, repo.index_path(), repo.object_store, tree.id
  471. )
  472. except OSError as e:
  473. if e.errno == 92 and sys.platform == "darwin":
  474. # Our filename isn't supported by the platform :(
  475. self.skipTest(f"can not write filename {e.filename!r}")
  476. else:
  477. raise
  478. except UnicodeDecodeError:
  479. # This happens e.g. with python3.6 on Windows.
  480. # It implicitly decodes using utf8, which doesn't work.
  481. self.skipTest("can not implicitly convert as utf8")
  482. # Verify index entries
  483. index = repo.open_index()
  484. self.assertIn(latin1_name, index)
  485. self.assertIn(utf8_name, index)
  486. self.assertTrue(os.path.exists(latin1_path))
  487. self.assertTrue(os.path.exists(utf8_path))
  488. def test_git_submodule(self) -> None:
  489. repo_dir = tempfile.mkdtemp()
  490. self.addCleanup(shutil.rmtree, repo_dir)
  491. with Repo.init(repo_dir) as repo:
  492. filea = Blob.from_string(b"file alalala")
  493. subtree = Tree()
  494. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  495. c = Commit()
  496. c.tree = subtree.id
  497. c.committer = c.author = b"Somebody <somebody@example.com>"
  498. c.commit_time = c.author_time = 42342
  499. c.commit_timezone = c.author_timezone = 0
  500. c.parents = []
  501. c.message = b"Subcommit"
  502. tree = Tree()
  503. tree[b"c"] = (S_IFGITLINK, c.id)
  504. repo.object_store.add_objects([(o, None) for o in [tree]])
  505. build_index_from_tree(
  506. repo.path, repo.index_path(), repo.object_store, tree.id
  507. )
  508. # Verify index entries
  509. index = repo.open_index()
  510. self.assertEqual(len(index), 1)
  511. # filea
  512. apath = os.path.join(repo.path, "c/a")
  513. self.assertFalse(os.path.exists(apath))
  514. # dir c
  515. cpath = os.path.join(repo.path, "c")
  516. self.assertTrue(os.path.isdir(cpath))
  517. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  518. self.assertEqual(index[b"c"].sha, c.id) # sha
  519. def test_git_submodule_exists(self) -> None:
  520. repo_dir = tempfile.mkdtemp()
  521. self.addCleanup(shutil.rmtree, repo_dir)
  522. with Repo.init(repo_dir) as repo:
  523. filea = Blob.from_string(b"file alalala")
  524. subtree = Tree()
  525. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  526. c = Commit()
  527. c.tree = subtree.id
  528. c.committer = c.author = b"Somebody <somebody@example.com>"
  529. c.commit_time = c.author_time = 42342
  530. c.commit_timezone = c.author_timezone = 0
  531. c.parents = []
  532. c.message = b"Subcommit"
  533. tree = Tree()
  534. tree[b"c"] = (S_IFGITLINK, c.id)
  535. os.mkdir(os.path.join(repo_dir, "c"))
  536. repo.object_store.add_objects([(o, None) for o in [tree]])
  537. build_index_from_tree(
  538. repo.path, repo.index_path(), repo.object_store, tree.id
  539. )
  540. # Verify index entries
  541. index = repo.open_index()
  542. self.assertEqual(len(index), 1)
  543. # filea
  544. apath = os.path.join(repo.path, "c/a")
  545. self.assertFalse(os.path.exists(apath))
  546. # dir c
  547. cpath = os.path.join(repo.path, "c")
  548. self.assertTrue(os.path.isdir(cpath))
  549. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  550. self.assertEqual(index[b"c"].sha, c.id) # sha
  551. class GetUnstagedChangesTests(TestCase):
  552. def test_get_unstaged_changes(self) -> None:
  553. """Unit test for get_unstaged_changes."""
  554. repo_dir = tempfile.mkdtemp()
  555. self.addCleanup(shutil.rmtree, repo_dir)
  556. with Repo.init(repo_dir) as repo:
  557. # Commit a dummy file then modify it
  558. foo1_fullpath = os.path.join(repo_dir, "foo1")
  559. with open(foo1_fullpath, "wb") as f:
  560. f.write(b"origstuff")
  561. foo2_fullpath = os.path.join(repo_dir, "foo2")
  562. with open(foo2_fullpath, "wb") as f:
  563. f.write(b"origstuff")
  564. repo.stage(["foo1", "foo2"])
  565. repo.do_commit(
  566. b"test status",
  567. author=b"author <email>",
  568. committer=b"committer <email>",
  569. )
  570. with open(foo1_fullpath, "wb") as f:
  571. f.write(b"newstuff")
  572. # modify access and modify time of path
  573. os.utime(foo1_fullpath, (0, 0))
  574. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  575. self.assertEqual(list(changes), [b"foo1"])
  576. def test_get_unstaged_deleted_changes(self) -> None:
  577. """Unit test for get_unstaged_changes."""
  578. repo_dir = tempfile.mkdtemp()
  579. self.addCleanup(shutil.rmtree, repo_dir)
  580. with Repo.init(repo_dir) as repo:
  581. # Commit a dummy file then remove it
  582. foo1_fullpath = os.path.join(repo_dir, "foo1")
  583. with open(foo1_fullpath, "wb") as f:
  584. f.write(b"origstuff")
  585. repo.stage(["foo1"])
  586. repo.do_commit(
  587. b"test status",
  588. author=b"author <email>",
  589. committer=b"committer <email>",
  590. )
  591. os.unlink(foo1_fullpath)
  592. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  593. self.assertEqual(list(changes), [b"foo1"])
  594. def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
  595. """Unit test for get_unstaged_changes."""
  596. repo_dir = tempfile.mkdtemp()
  597. self.addCleanup(shutil.rmtree, repo_dir)
  598. with Repo.init(repo_dir) as repo:
  599. # Commit a dummy file then modify it
  600. foo1_fullpath = os.path.join(repo_dir, "foo1")
  601. with open(foo1_fullpath, "wb") as f:
  602. f.write(b"origstuff")
  603. repo.stage(["foo1"])
  604. repo.do_commit(
  605. b"test status",
  606. author=b"author <email>",
  607. committer=b"committer <email>",
  608. )
  609. os.remove(foo1_fullpath)
  610. os.mkdir(foo1_fullpath)
  611. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  612. self.assertEqual(list(changes), [b"foo1"])
  613. @skipIf(not can_symlink(), "Requires symlink support")
  614. def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
  615. """Unit test for get_unstaged_changes."""
  616. repo_dir = tempfile.mkdtemp()
  617. self.addCleanup(shutil.rmtree, repo_dir)
  618. with Repo.init(repo_dir) as repo:
  619. # Commit a dummy file then modify it
  620. foo1_fullpath = os.path.join(repo_dir, "foo1")
  621. with open(foo1_fullpath, "wb") as f:
  622. f.write(b"origstuff")
  623. repo.stage(["foo1"])
  624. repo.do_commit(
  625. b"test status",
  626. author=b"author <email>",
  627. committer=b"committer <email>",
  628. )
  629. os.remove(foo1_fullpath)
  630. os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)
  631. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  632. self.assertEqual(list(changes), [b"foo1"])
  633. class TestValidatePathElement(TestCase):
  634. def test_default(self) -> None:
  635. self.assertTrue(validate_path_element_default(b"bla"))
  636. self.assertTrue(validate_path_element_default(b".bla"))
  637. self.assertFalse(validate_path_element_default(b".git"))
  638. self.assertFalse(validate_path_element_default(b".giT"))
  639. self.assertFalse(validate_path_element_default(b".."))
  640. self.assertTrue(validate_path_element_default(b"git~1"))
  641. def test_ntfs(self) -> None:
  642. self.assertTrue(validate_path_element_ntfs(b"bla"))
  643. self.assertTrue(validate_path_element_ntfs(b".bla"))
  644. self.assertFalse(validate_path_element_ntfs(b".git"))
  645. self.assertFalse(validate_path_element_ntfs(b".giT"))
  646. self.assertFalse(validate_path_element_ntfs(b".."))
  647. self.assertFalse(validate_path_element_ntfs(b"git~1"))
  648. class TestTreeFSPathConversion(TestCase):
  649. def test_tree_to_fs_path(self) -> None:
  650. tree_path = "délwíçh/foo".encode()
  651. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  652. self.assertEqual(
  653. fs_path,
  654. os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
  655. )
  656. def test_tree_to_fs_path_windows_separator(self) -> None:
  657. tree_path = b"path/with/slash"
  658. original_sep = os.sep.encode("ascii")
  659. try:
  660. # Temporarily modify os_sep_bytes to test Windows path conversion
  661. # This simulates Windows behavior on all platforms for testing
  662. import dulwich.index
  663. dulwich.index.os_sep_bytes = b"\\"
  664. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  665. # The function should join the prefix path with the converted tree path
  666. # The expected behavior is that the path separators in the tree_path are
  667. # converted to the platform-specific separator (which we've set to backslash)
  668. expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
  669. self.assertEqual(fs_path, expected_path)
  670. finally:
  671. # Restore original value
  672. dulwich.index.os_sep_bytes = original_sep
  673. def test_fs_to_tree_path_str(self) -> None:
  674. fs_path = os.path.join(os.path.join("délwíçh", "foo"))
  675. tree_path = _fs_to_tree_path(fs_path)
  676. self.assertEqual(tree_path, "délwíçh/foo".encode())
  677. def test_fs_to_tree_path_bytes(self) -> None:
  678. fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
  679. tree_path = _fs_to_tree_path(fs_path)
  680. self.assertEqual(tree_path, "délwíçh/foo".encode())
  681. def test_fs_to_tree_path_windows_separator(self) -> None:
  682. # Test conversion of Windows paths to tree paths
  683. fs_path = b"path\\with\\backslash"
  684. original_sep = os.sep.encode("ascii")
  685. try:
  686. # Temporarily modify os_sep_bytes to test Windows path conversion
  687. import dulwich.index
  688. dulwich.index.os_sep_bytes = b"\\"
  689. tree_path = _fs_to_tree_path(fs_path)
  690. self.assertEqual(tree_path, b"path/with/backslash")
  691. finally:
  692. # Restore original value
  693. dulwich.index.os_sep_bytes = original_sep
  694. class TestIndexEntryFromPath(TestCase):
  695. def setUp(self):
  696. self.tempdir = tempfile.mkdtemp()
  697. self.addCleanup(shutil.rmtree, self.tempdir)
  698. def test_index_entry_from_path_file(self) -> None:
  699. """Test creating index entry from a regular file."""
  700. # Create a test file
  701. test_file = os.path.join(self.tempdir, "testfile")
  702. with open(test_file, "wb") as f:
  703. f.write(b"test content")
  704. # Get the index entry
  705. entry = index_entry_from_path(os.fsencode(test_file))
  706. # Verify the entry was created with the right mode
  707. self.assertIsNotNone(entry)
  708. self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)
  709. @skipIf(not can_symlink(), "Requires symlink support")
  710. def test_index_entry_from_path_symlink(self) -> None:
  711. """Test creating index entry from a symlink."""
  712. # Create a target file
  713. target_file = os.path.join(self.tempdir, "target")
  714. with open(target_file, "wb") as f:
  715. f.write(b"target content")
  716. # Create a symlink
  717. link_file = os.path.join(self.tempdir, "symlink")
  718. os.symlink(target_file, link_file)
  719. # Get the index entry
  720. entry = index_entry_from_path(os.fsencode(link_file))
  721. # Verify the entry was created with the right mode
  722. self.assertIsNotNone(entry)
  723. self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)
  724. def test_index_entry_from_path_directory(self) -> None:
  725. """Test creating index entry from a directory (should return None)."""
  726. # Create a directory
  727. test_dir = os.path.join(self.tempdir, "testdir")
  728. os.mkdir(test_dir)
  729. # Get the index entry for a directory
  730. entry = index_entry_from_path(os.fsencode(test_dir))
  731. # Should return None for regular directories
  732. self.assertIsNone(entry)
  733. def test_index_entry_from_directory_regular(self) -> None:
  734. """Test index_entry_from_directory with a regular directory."""
  735. # Create a directory
  736. test_dir = os.path.join(self.tempdir, "testdir")
  737. os.mkdir(test_dir)
  738. # Get stat for the directory
  739. st = os.lstat(test_dir)
  740. # Get the index entry for a regular directory
  741. entry = index_entry_from_directory(st, os.fsencode(test_dir))
  742. # Should return None for regular directories
  743. self.assertIsNone(entry)
  744. def test_index_entry_from_directory_git_submodule(self) -> None:
  745. """Test index_entry_from_directory with a Git submodule."""
  746. # Create a git repository that will be a submodule
  747. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  748. os.mkdir(sub_repo_dir)
  749. # Create the .git directory to make it look like a git repo
  750. git_dir = os.path.join(sub_repo_dir, ".git")
  751. os.mkdir(git_dir)
  752. # Create HEAD file with a fake commit SHA
  753. head_sha = b"1234567890" * 4 # 40-char fake SHA
  754. with open(os.path.join(git_dir, "HEAD"), "wb") as f:
  755. f.write(head_sha)
  756. # Get stat for the submodule directory
  757. st = os.lstat(sub_repo_dir)
  758. # Get the index entry for a git submodule directory
  759. entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))
  760. # Since we don't have a proper git setup, this might still return None
  761. # This test just ensures the code path is executed
  762. if entry is not None:
  763. # If an entry is returned, it should have the gitlink mode
  764. self.assertEqual(entry.mode, S_IFGITLINK)
  765. def test_index_entry_from_path_with_object_store(self) -> None:
  766. """Test creating index entry with object store."""
  767. # Create a test file
  768. test_file = os.path.join(self.tempdir, "testfile")
  769. with open(test_file, "wb") as f:
  770. f.write(b"test content")
  771. # Create a memory object store
  772. object_store = MemoryObjectStore()
  773. # Get the index entry and add to object store
  774. entry = index_entry_from_path(os.fsencode(test_file), object_store)
  775. # Verify we can access the blob from the object store
  776. self.assertIsNotNone(entry)
  777. blob = object_store[entry.sha]
  778. self.assertEqual(b"test content", blob.data)
  779. def test_iter_fresh_entries(self) -> None:
  780. """Test iterating over fresh entries."""
  781. # Create some test files
  782. file1 = os.path.join(self.tempdir, "file1")
  783. with open(file1, "wb") as f:
  784. f.write(b"file1 content")
  785. file2 = os.path.join(self.tempdir, "file2")
  786. with open(file2, "wb") as f:
  787. f.write(b"file2 content")
  788. # Create a memory object store
  789. object_store = MemoryObjectStore()
  790. # Get fresh entries
  791. paths = [b"file1", b"file2", b"nonexistent"]
  792. entries = dict(
  793. iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
  794. )
  795. # Verify both files got entries but nonexistent file is None
  796. self.assertIn(b"file1", entries)
  797. self.assertIn(b"file2", entries)
  798. self.assertIn(b"nonexistent", entries)
  799. self.assertIsNotNone(entries[b"file1"])
  800. self.assertIsNotNone(entries[b"file2"])
  801. self.assertIsNone(entries[b"nonexistent"])
  802. # Check that blobs were added to object store
  803. blob1 = object_store[entries[b"file1"].sha]
  804. self.assertEqual(b"file1 content", blob1.data)
  805. blob2 = object_store[entries[b"file2"].sha]
  806. self.assertEqual(b"file2 content", blob2.data)
  807. def test_read_submodule_head(self) -> None:
  808. """Test reading the HEAD of a submodule."""
  809. from dulwich.index import read_submodule_head
  810. from dulwich.repo import Repo
  811. # Create a test repo that will be our "submodule"
  812. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  813. os.mkdir(sub_repo_dir)
  814. submodule_repo = Repo.init(sub_repo_dir)
  815. # Create a file and commit it to establish a HEAD
  816. test_file = os.path.join(sub_repo_dir, "testfile")
  817. with open(test_file, "wb") as f:
  818. f.write(b"test content")
  819. submodule_repo.stage(["testfile"])
  820. commit_id = submodule_repo.do_commit(b"Test commit for submodule")
  821. # Test reading the HEAD
  822. head_sha = read_submodule_head(sub_repo_dir)
  823. self.assertEqual(commit_id, head_sha)
  824. # Test with bytes path
  825. head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
  826. self.assertEqual(commit_id, head_sha_bytes)
  827. # Test with non-existent path
  828. non_repo_dir = os.path.join(self.tempdir, "nonrepo")
  829. os.mkdir(non_repo_dir)
  830. self.assertIsNone(read_submodule_head(non_repo_dir))
  831. # Test with path that doesn't have a .git directory
  832. not_git_dir = os.path.join(self.tempdir, "notgit")
  833. os.mkdir(not_git_dir)
  834. self.assertIsNone(read_submodule_head(not_git_dir))
  835. def test_has_directory_changed(self) -> None:
  836. """Test checking if a directory has changed."""
  837. from dulwich.index import IndexEntry, _has_directory_changed
  838. from dulwich.repo import Repo
  839. # Setup mock IndexEntry
  840. mock_entry = IndexEntry(
  841. (1230680220, 0),
  842. (1230680220, 0),
  843. 2050,
  844. 3761020,
  845. 33188,
  846. 1000,
  847. 1000,
  848. 0,
  849. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  850. 0,
  851. 0,
  852. )
  853. # Test with a regular directory (not a submodule)
  854. reg_dir = os.path.join(self.tempdir, "regular_dir")
  855. os.mkdir(reg_dir)
  856. # Should return True for regular directory
  857. self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))
  858. # Create a git repository to test submodule scenarios
  859. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  860. os.mkdir(sub_repo_dir)
  861. submodule_repo = Repo.init(sub_repo_dir)
  862. # Create a file and commit it to establish a HEAD
  863. test_file = os.path.join(sub_repo_dir, "testfile")
  864. with open(test_file, "wb") as f:
  865. f.write(b"test content")
  866. submodule_repo.stage(["testfile"])
  867. commit_id = submodule_repo.do_commit(b"Test commit for submodule")
  868. # Create an entry with the correct commit SHA
  869. correct_entry = IndexEntry(
  870. (1230680220, 0),
  871. (1230680220, 0),
  872. 2050,
  873. 3761020,
  874. 33188,
  875. 1000,
  876. 1000,
  877. 0,
  878. commit_id,
  879. 0,
  880. 0,
  881. )
  882. # Create an entry with an incorrect commit SHA
  883. incorrect_entry = IndexEntry(
  884. (1230680220, 0),
  885. (1230680220, 0),
  886. 2050,
  887. 3761020,
  888. 33188,
  889. 1000,
  890. 1000,
  891. 0,
  892. b"0000000000000000000000000000000000000000",
  893. 0,
  894. 0,
  895. )
  896. # Should return False for submodule with correct SHA
  897. self.assertFalse(
  898. _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
  899. )
  900. # Should return True for submodule with incorrect SHA
  901. self.assertTrue(
  902. _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
  903. )
  904. def test_get_unstaged_changes(self) -> None:
  905. """Test detecting unstaged changes in a working tree."""
  906. from dulwich.index import (
  907. ConflictedIndexEntry,
  908. Index,
  909. IndexEntry,
  910. get_unstaged_changes,
  911. )
  912. # Create a test repo
  913. repo_dir = tempfile.mkdtemp()
  914. self.addCleanup(shutil.rmtree, repo_dir)
  915. # Create test index
  916. index = Index(os.path.join(repo_dir, "index"))
  917. # Create an actual hash of our test content
  918. from dulwich.objects import Blob
  919. test_blob = Blob()
  920. test_blob.data = b"initial content"
  921. # Create some test files with known contents
  922. file1_path = os.path.join(repo_dir, "file1")
  923. with open(file1_path, "wb") as f:
  924. f.write(b"initial content")
  925. file2_path = os.path.join(repo_dir, "file2")
  926. with open(file2_path, "wb") as f:
  927. f.write(b"initial content")
  928. # Add them to index
  929. entry1 = IndexEntry(
  930. (1230680220, 0),
  931. (1230680220, 0),
  932. 2050,
  933. 3761020,
  934. 33188,
  935. 1000,
  936. 1000,
  937. 0,
  938. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", # Not matching actual content
  939. 0,
  940. 0,
  941. )
  942. entry2 = IndexEntry(
  943. (1230680220, 0),
  944. (1230680220, 0),
  945. 2050,
  946. 3761020,
  947. 33188,
  948. 1000,
  949. 1000,
  950. 0,
  951. test_blob.id, # Will be content's real hash
  952. 0,
  953. 0,
  954. )
  955. # Add a file that has a conflict
  956. entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})
  957. index._byname = {
  958. b"file1": entry1,
  959. b"file2": entry2,
  960. b"file3": IndexEntry(
  961. (1230680220, 0),
  962. (1230680220, 0),
  963. 2050,
  964. 3761020,
  965. 33188,
  966. 1000,
  967. 1000,
  968. 0,
  969. b"0000000000000000000000000000000000000000",
  970. 0,
  971. 0,
  972. ),
  973. b"conflict": entry_conflict,
  974. }
  975. # Get unstaged changes
  976. changes = list(get_unstaged_changes(index, repo_dir))
  977. # File1 should be unstaged (content doesn't match hash)
  978. # File3 doesn't exist (deleted)
  979. # Conflict is always unstaged
  980. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])
  981. # Create directory where there should be a file
  982. os.mkdir(os.path.join(repo_dir, "file4"))
  983. index._byname[b"file4"] = entry1
  984. # Get unstaged changes again
  985. changes = list(get_unstaged_changes(index, repo_dir))
  986. # Now file4 should also be unstaged because it's a directory instead of a file
  987. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])
  988. # Create a custom blob filter function
  989. def filter_blob_callback(blob, path):
  990. # Modify blob to make it look changed
  991. blob.data = b"modified " + blob.data
  992. return blob
  993. # Get unstaged changes with blob filter
  994. changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))
  995. # Now both file1 and file2 should be unstaged due to the filter
  996. self.assertEqual(
  997. sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
  998. )