# test_index.py -- Tests for the git index
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for the index."""

import os
import shutil
import stat
import struct
import sys
import tempfile
from io import BytesIO

from dulwich.index import (
    Index,
    IndexEntry,
    SerializedIndexEntry,
    _fs_to_tree_path,
    _tree_to_fs_path,
    build_index_from_tree,
    cleanup_mode,
    commit_tree,
    get_unstaged_changes,
    index_entry_from_directory,
    index_entry_from_path,
    index_entry_from_stat,
    iter_fresh_entries,
    read_index,
    read_index_dict,
    validate_path_element_default,
    validate_path_element_ntfs,
    write_cache_time,
    write_index,
    write_index_dict,
)
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
from dulwich.repo import Repo

from . import TestCase, skipIf


def can_symlink() -> bool:
    """Return whether running process can create symlinks."""
    if sys.platform != "win32":
        # Platforms other than Windows should allow symlinks without issues.
        return True

    test_source = tempfile.mkdtemp()
    test_target = test_source + "can_symlink"
    try:
        os.symlink(test_source, test_target)
    except (NotImplementedError, OSError):
        return False
    return True


class IndexTestCase(TestCase):
    datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")

    def get_simple_index(self, name):
        return Index(os.path.join(self.datadir, name))


class SimpleIndexTestCase(IndexTestCase):
    def test_len(self) -> None:
        self.assertEqual(1, len(self.get_simple_index("index")))

    def test_iter(self) -> None:
        self.assertEqual([b"bla"], list(self.get_simple_index("index")))

    def test_iter_skip_hash(self) -> None:
        self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))

    def test_iterobjects(self) -> None:
        self.assertEqual(
            [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
            list(self.get_simple_index("index").iterobjects()),
        )

    def test_getitem(self) -> None:
        self.assertEqual(
            IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                0,
                0,
            ),
            self.get_simple_index("index")[b"bla"],
        )

    def test_empty(self) -> None:
        i = self.get_simple_index("notanindex")
        self.assertEqual(0, len(i))
        self.assertFalse(os.path.exists(i._filename))

    def test_against_empty_tree(self) -> None:
        i = self.get_simple_index("index")
        changes = list(i.changes_from_tree(MemoryObjectStore(), None))
        self.assertEqual(1, len(changes))
        (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
        self.assertEqual(b"bla", newname)
        self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)

    def test_index_pathlib(self) -> None:
        import tempfile
        from pathlib import Path

        # Create a temporary index file
        with tempfile.NamedTemporaryFile(suffix=".index", delete=False) as f:
            temp_path = f.name

        try:
            # Test creating Index with pathlib.Path
            path_obj = Path(temp_path)
            index = Index(path_obj, read=False)
            self.assertEqual(str(path_obj), index.path)

            # Add an entry and write
            index[b"test"] = IndexEntry(
                ctime=(0, 0),
                mtime=(0, 0),
                dev=0,
                ino=0,
                mode=33188,
                uid=0,
                gid=0,
                size=0,
                sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
            )
            index.write()

            # Read it back with pathlib.Path
            index2 = Index(path_obj)
            self.assertIn(b"test", index2)
        finally:
            # Clean up
            os.unlink(temp_path)

class SimpleIndexWriterTestCase(IndexTestCase):
    def setUp(self) -> None:
        IndexTestCase.setUp(self)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self) -> None:
        IndexTestCase.tearDown(self)
        shutil.rmtree(self.tempdir)

    def test_simple_write(self) -> None:
        entries = [
            (
                SerializedIndexEntry(
                    b"barbla",
                    (1230680220, 0),
                    (1230680220, 0),
                    2050,
                    3761020,
                    33188,
                    1000,
                    1000,
                    0,
                    b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                    0,
                    0,
                )
            )
        ]
        filename = os.path.join(self.tempdir, "test-simple-write-index")
        with open(filename, "wb+") as x:
            write_index(x, entries)

        with open(filename, "rb") as x:
            self.assertEqual(entries, list(read_index(x)))


class ReadIndexDictTests(IndexTestCase):
    def setUp(self) -> None:
        IndexTestCase.setUp(self)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self) -> None:
        IndexTestCase.tearDown(self)
        shutil.rmtree(self.tempdir)

    def test_simple_write(self) -> None:
        entries = {
            b"barbla": IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                0,
                0,
            )
        }
        filename = os.path.join(self.tempdir, "test-simple-write-index")
        with open(filename, "wb+") as x:
            write_index_dict(x, entries)

        with open(filename, "rb") as x:
            self.assertEqual(entries, read_index_dict(x))


class CommitTreeTests(TestCase):
    def setUp(self) -> None:
        super().setUp()
        self.store = MemoryObjectStore()

    def test_single_blob(self) -> None:
        blob = Blob()
        blob.data = b"foo"
        self.store.add_object(blob)
        blobs = [(b"bla", blob.id, stat.S_IFREG)]
        rootid = commit_tree(self.store, blobs)
        self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
        self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
        self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))

    def test_nested(self) -> None:
        blob = Blob()
        blob.data = b"foo"
        self.store.add_object(blob)
        blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
        rootid = commit_tree(self.store, blobs)
        self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
        dirid = self.store[rootid][b"bla"][1]
        self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
        self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
        self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
        self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))


class CleanupModeTests(TestCase):
    def assertModeEqual(self, expected, got) -> None:
        self.assertEqual(expected, got, f"{expected:o} != {got:o}")

    def test_file(self) -> None:
        self.assertModeEqual(0o100644, cleanup_mode(0o100000))

    def test_executable(self) -> None:
        self.assertModeEqual(0o100755, cleanup_mode(0o100711))
        self.assertModeEqual(0o100755, cleanup_mode(0o100700))

    def test_symlink(self) -> None:
        self.assertModeEqual(0o120000, cleanup_mode(0o120711))

    def test_dir(self) -> None:
        self.assertModeEqual(0o040000, cleanup_mode(0o40531))

    def test_submodule(self) -> None:
        self.assertModeEqual(0o160000, cleanup_mode(0o160744))


class WriteCacheTimeTests(TestCase):
    def test_write_string(self) -> None:
        f = BytesIO()
        self.assertRaises(TypeError, write_cache_time, f, "foo")

    def test_write_int(self) -> None:
        f = BytesIO()
        write_cache_time(f, 434343)
        self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())

    def test_write_tuple(self) -> None:
        f = BytesIO()
        write_cache_time(f, (434343, 21))
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())

    def test_write_float(self) -> None:
        f = BytesIO()
        write_cache_time(f, 434343.000000021)
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())

class IndexEntryFromStatTests(TestCase):
    def test_simple(self) -> None:
        st = os.stat_result(
            (
                16877,
                131078,
                64769,
                154,
                1000,
                1000,
                12288,
                1323629595,
                1324180496,
                1324180496,
            )
        )
        entry = index_entry_from_stat(st, b"22" * 20)
        self.assertEqual(
            entry,
            IndexEntry(
                1324180496,
                1324180496,
                64769,
                131078,
                16384,
                1000,
                1000,
                12288,
                b"2222222222222222222222222222222222222222",
                0,
                0,
            ),
        )

    def test_override_mode(self) -> None:
        st = os.stat_result(
            (
                stat.S_IFREG + 0o644,
                131078,
                64769,
                154,
                1000,
                1000,
                12288,
                1323629595,
                1324180496,
                1324180496,
            )
        )
        entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
        self.assertEqual(
            entry,
            IndexEntry(
                1324180496,
                1324180496,
                64769,
                131078,
                33261,
                1000,
                1000,
                12288,
                b"2222222222222222222222222222222222222222",
                0,
                0,
            ),
        )

class BuildIndexTests(TestCase):
    def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
        self.assertEqual(index_entry.mode, mode)  # mode
        self.assertEqual(index_entry.size, filesize)  # filesize
        self.assertEqual(index_entry.sha, sha)  # sha

    def assertFileContents(self, path, contents, symlink=False) -> None:
        if symlink:
            self.assertEqual(os.readlink(path), contents)
        else:
            with open(path, "rb") as f:
                self.assertEqual(f.read(), contents)

    def test_empty(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            tree = Tree()
            repo.object_store.add_object(tree)
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 0)

            # Verify no files
            self.assertEqual([".git"], os.listdir(repo.path))

    def test_git_dir(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            filee = Blob.from_string(b"d")
            tree = Tree()
            tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
            tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)
            repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, ".git", "a")
            self.assertFalse(os.path.exists(apath))

            # filee
            epath = os.path.join(repo.path, "c", "e")
            self.assertTrue(os.path.exists(epath))
            self.assertReasonableIndexEntry(
                index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
            )
            self.assertFileContents(epath, b"d")

    def test_nonempty(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            fileb = Blob.from_string(b"file b")
            filed = Blob.from_string(b"file d")
            tree = Tree()
            tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
            tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
            tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
            repo.object_store.add_objects(
                [(o, None) for o in [filea, fileb, filed, tree]]
            )
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 3)

            # filea
            apath = os.path.join(repo.path, "a")
            self.assertTrue(os.path.exists(apath))
            self.assertReasonableIndexEntry(
                index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
            )
            self.assertFileContents(apath, b"file a")

            # fileb
            bpath = os.path.join(repo.path, "b")
            self.assertTrue(os.path.exists(bpath))
            self.assertReasonableIndexEntry(
                index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
            )
            self.assertFileContents(bpath, b"file b")

            # filed
            dpath = os.path.join(repo.path, "c", "d")
            self.assertTrue(os.path.exists(dpath))
            self.assertReasonableIndexEntry(
                index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
            )
            self.assertFileContents(dpath, b"file d")

            # Verify no extra files
            self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
            self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))

    @skipIf(not getattr(os, "sync", None), "Requires sync support")
    def test_norewrite(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            filea_path = os.path.join(repo_dir, "a")
            tree = Tree()
            tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
            repo.object_store.add_objects([(o, None) for o in [filea, tree]])

            # First Write
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            # Use sync as metadata can be cached on some FS
            os.sync()
            mtime = os.stat(filea_path).st_mtime

            # Test Rewrite
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            os.sync()
            self.assertEqual(mtime, os.stat(filea_path).st_mtime)

            # Modify content
            with open(filea_path, "wb") as fh:
                fh.write(b"test a")
            os.sync()
            mtime = os.stat(filea_path).st_mtime

            # Test rewrite
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            os.sync()
            with open(filea_path, "rb") as fh:
                self.assertEqual(b"file a", fh.read())

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_symlink(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filed = Blob.from_string(b"file d")
            filee = Blob.from_string(b"d")
            tree = Tree()
            tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
            tree[b"c/e"] = (stat.S_IFLNK, filee.id)  # symlink
            repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()

            # symlink to d
            epath = os.path.join(repo.path, "c", "e")
            self.assertTrue(os.path.exists(epath))
            self.assertReasonableIndexEntry(
                index[b"c/e"],
                stat.S_IFLNK,
                0 if sys.platform == "win32" else 1,
                filee.id,
            )
            self.assertFileContents(epath, "d", symlink=True)

    def test_no_decode_encode(self) -> None:
        repo_dir = tempfile.mkdtemp()
        repo_dir_bytes = os.fsencode(repo_dir)
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            file = Blob.from_string(b"foo")
            tree = Tree()
            latin1_name = "À".encode("latin1")
            try:
                latin1_path = os.path.join(repo_dir_bytes, latin1_name)
            except UnicodeDecodeError:
                self.skipTest("can not decode as latin1")
            utf8_name = "À".encode()
            utf8_path = os.path.join(repo_dir_bytes, utf8_name)
            tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
            tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
            repo.object_store.add_objects([(o, None) for o in [file, tree]])
            try:
                build_index_from_tree(
                    repo.path, repo.index_path(), repo.object_store, tree.id
                )
            except OSError as e:
                if e.errno == 92 and sys.platform == "darwin":
                    # Our filename isn't supported by the platform :(
                    self.skipTest(f"can not write filename {e.filename!r}")
                else:
                    raise
            except UnicodeDecodeError:
                # This happens e.g. with python3.6 on Windows.
                # It implicitly decodes using utf8, which doesn't work.
                self.skipTest("can not implicitly convert as utf8")

            # Verify index entries
            index = repo.open_index()
            self.assertIn(latin1_name, index)
            self.assertIn(utf8_name, index)
            self.assertTrue(os.path.exists(latin1_path))
            self.assertTrue(os.path.exists(utf8_path))

    def test_git_submodule(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            filea = Blob.from_string(b"file alalala")

            subtree = Tree()
            subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)

            c = Commit()
            c.tree = subtree.id
            c.committer = c.author = b"Somebody <somebody@example.com>"
            c.commit_time = c.author_time = 42342
            c.commit_timezone = c.author_timezone = 0
            c.parents = []
            c.message = b"Subcommit"

            tree = Tree()
            tree[b"c"] = (S_IFGITLINK, c.id)

            repo.object_store.add_objects([(o, None) for o in [tree]])
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, "c/a")
            self.assertFalse(os.path.exists(apath))

            # dir c
            cpath = os.path.join(repo.path, "c")
            self.assertTrue(os.path.isdir(cpath))
            self.assertEqual(index[b"c"].mode, S_IFGITLINK)  # mode
            self.assertEqual(index[b"c"].sha, c.id)  # sha

    def test_git_submodule_exists(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            filea = Blob.from_string(b"file alalala")

            subtree = Tree()
            subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)

            c = Commit()
            c.tree = subtree.id
            c.committer = c.author = b"Somebody <somebody@example.com>"
            c.commit_time = c.author_time = 42342
            c.commit_timezone = c.author_timezone = 0
            c.parents = []
            c.message = b"Subcommit"

            tree = Tree()
            tree[b"c"] = (S_IFGITLINK, c.id)

            os.mkdir(os.path.join(repo_dir, "c"))
            repo.object_store.add_objects([(o, None) for o in [tree]])
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, "c/a")
            self.assertFalse(os.path.exists(apath))

            # dir c
            cpath = os.path.join(repo.path, "c")
            self.assertTrue(os.path.isdir(cpath))
            self.assertEqual(index[b"c"].mode, S_IFGITLINK)  # mode
            self.assertEqual(index[b"c"].sha, c.id)  # sha

    def test_with_line_ending_normalization(self) -> None:
        """Test that build_index_from_tree applies line-ending normalization."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        from dulwich.line_ending import BlobNormalizer

        with Repo.init(repo_dir) as repo:
            # Set up autocrlf config
            config = repo.get_config()
            config.set((b"core",), b"autocrlf", b"true")
            config.write_to_path()

            # Create blob with LF line endings
            content_lf = b"line1\nline2\nline3\n"
            blob = Blob.from_string(content_lf)
            tree = Tree()
            tree[b"test.txt"] = (stat.S_IFREG | 0o644, blob.id)
            repo.object_store.add_objects([(blob, None), (tree, None)])

            # Create blob normalizer
            blob_normalizer = BlobNormalizer(config, {})

            # Build index with normalization
            build_index_from_tree(
                repo.path,
                repo.index_path(),
                repo.object_store,
                tree.id,
                blob_normalizer=blob_normalizer,
            )

            # On Windows with autocrlf=true, file should have CRLF line endings
            test_file = os.path.join(repo.path, "test.txt")
            with open(test_file, "rb") as f:
                content = f.read()

            # autocrlf=true means LF -> CRLF on checkout (on all platforms for testing)
            expected_content = b"line1\r\nline2\r\nline3\r\n"
            self.assertEqual(content, expected_content)

class GetUnstagedChangesTests(TestCase):
    def test_get_unstaged_changes(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            foo2_fullpath = os.path.join(repo_dir, "foo2")
            with open(foo2_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1", "foo2"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            with open(foo1_fullpath, "wb") as f:
                f.write(b"newstuff")

            # modify access and modify time of path
            os.utime(foo1_fullpath, (0, 0))

            changes = get_unstaged_changes(repo.open_index(), repo_dir)
            self.assertEqual(list(changes), [b"foo1"])

    def test_get_unstaged_deleted_changes(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then remove it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.unlink(foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)
            self.assertEqual(list(changes), [b"foo1"])

    def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.remove(foo1_fullpath)
            os.mkdir(foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)
            self.assertEqual(list(changes), [b"foo1"])

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.remove(foo1_fullpath)
            os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)
            self.assertEqual(list(changes), [b"foo1"])

class TestValidatePathElement(TestCase):
    def test_default(self) -> None:
        self.assertTrue(validate_path_element_default(b"bla"))
        self.assertTrue(validate_path_element_default(b".bla"))
        self.assertFalse(validate_path_element_default(b".git"))
        self.assertFalse(validate_path_element_default(b".giT"))
        self.assertFalse(validate_path_element_default(b".."))
        self.assertTrue(validate_path_element_default(b"git~1"))

    def test_ntfs(self) -> None:
        self.assertTrue(validate_path_element_ntfs(b"bla"))
        self.assertTrue(validate_path_element_ntfs(b".bla"))
        self.assertFalse(validate_path_element_ntfs(b".git"))
        self.assertFalse(validate_path_element_ntfs(b".giT"))
        self.assertFalse(validate_path_element_ntfs(b".."))
        self.assertFalse(validate_path_element_ntfs(b"git~1"))


class TestTreeFSPathConversion(TestCase):
    def test_tree_to_fs_path(self) -> None:
        tree_path = "délwíçh/foo".encode()
        fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
        self.assertEqual(
            fs_path,
            os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
        )

    def test_tree_to_fs_path_windows_separator(self) -> None:
        tree_path = b"path/with/slash"
        original_sep = os.sep.encode("ascii")
        try:
            # Temporarily modify os_sep_bytes to test Windows path conversion
            # This simulates Windows behavior on all platforms for testing
            import dulwich.index

            dulwich.index.os_sep_bytes = b"\\"

            fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)

            # The function should join the prefix path with the converted tree path
            # The expected behavior is that the path separators in the tree_path are
            # converted to the platform-specific separator (which we've set to backslash)
            expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
            self.assertEqual(fs_path, expected_path)
        finally:
            # Restore original value
            dulwich.index.os_sep_bytes = original_sep

    def test_fs_to_tree_path_str(self) -> None:
        fs_path = os.path.join(os.path.join("délwíçh", "foo"))
        tree_path = _fs_to_tree_path(fs_path)
        self.assertEqual(tree_path, "délwíçh/foo".encode())

    def test_fs_to_tree_path_bytes(self) -> None:
        fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
        tree_path = _fs_to_tree_path(fs_path)
        self.assertEqual(tree_path, "délwíçh/foo".encode())

    def test_fs_to_tree_path_windows_separator(self) -> None:
        # Test conversion of Windows paths to tree paths
        fs_path = b"path\\with\\backslash"
        original_sep = os.sep.encode("ascii")
        try:
            # Temporarily modify os_sep_bytes to test Windows path conversion
            import dulwich.index

            dulwich.index.os_sep_bytes = b"\\"

            tree_path = _fs_to_tree_path(fs_path)
            self.assertEqual(tree_path, b"path/with/backslash")
        finally:
            # Restore original value
            dulwich.index.os_sep_bytes = original_sep

class TestIndexEntryFromPath(TestCase):
    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_index_entry_from_path_file(self) -> None:
        """Test creating index entry from a regular file."""
        # Create a test file
        test_file = os.path.join(self.tempdir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        # Get the index entry
        entry = index_entry_from_path(os.fsencode(test_file))

        # Verify the entry was created with the right mode
        self.assertIsNotNone(entry)
        self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_index_entry_from_path_symlink(self) -> None:
        """Test creating index entry from a symlink."""
        # Create a target file
        target_file = os.path.join(self.tempdir, "target")
        with open(target_file, "wb") as f:
            f.write(b"target content")

        # Create a symlink
        link_file = os.path.join(self.tempdir, "symlink")
        os.symlink(target_file, link_file)

        # Get the index entry
        entry = index_entry_from_path(os.fsencode(link_file))

        # Verify the entry was created with the right mode
        self.assertIsNotNone(entry)
        self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)

    def test_index_entry_from_path_directory(self) -> None:
        """Test creating index entry from a directory (should return None)."""
        # Create a directory
        test_dir = os.path.join(self.tempdir, "testdir")
        os.mkdir(test_dir)

        # Get the index entry for a directory
        entry = index_entry_from_path(os.fsencode(test_dir))

        # Should return None for regular directories
        self.assertIsNone(entry)

    def test_index_entry_from_directory_regular(self) -> None:
        """Test index_entry_from_directory with a regular directory."""
        # Create a directory
        test_dir = os.path.join(self.tempdir, "testdir")
        os.mkdir(test_dir)

        # Get stat for the directory
        st = os.lstat(test_dir)

        # Get the index entry for a regular directory
        entry = index_entry_from_directory(st, os.fsencode(test_dir))

        # Should return None for regular directories
        self.assertIsNone(entry)

    def test_index_entry_from_directory_git_submodule(self) -> None:
        """Test index_entry_from_directory with a Git submodule."""
        # Create a git repository that will be a submodule
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)

        # Create the .git directory to make it look like a git repo
        git_dir = os.path.join(sub_repo_dir, ".git")
        os.mkdir(git_dir)

        # Create HEAD file with a fake commit SHA
        head_sha = b"1234567890" * 4  # 40-char fake SHA
        with open(os.path.join(git_dir, "HEAD"), "wb") as f:
            f.write(head_sha)

        # Get stat for the submodule directory
        st = os.lstat(sub_repo_dir)

        # Get the index entry for a git submodule directory
        entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))

        # Since we don't have a proper git setup, this might still return None
        # This test just ensures the code path is executed
        if entry is not None:
            # If an entry is returned, it should have the gitlink mode
            self.assertEqual(entry.mode, S_IFGITLINK)

    def test_index_entry_from_path_with_object_store(self) -> None:
        """Test creating index entry with object store."""
        # Create a test file
        test_file = os.path.join(self.tempdir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        # Create a memory object store
        object_store = MemoryObjectStore()

        # Get the index entry and add to object store
        entry = index_entry_from_path(os.fsencode(test_file), object_store)

        # Verify we can access the blob from the object store
        self.assertIsNotNone(entry)
        blob = object_store[entry.sha]
        self.assertEqual(b"test content", blob.data)

    def test_iter_fresh_entries(self) -> None:
        """Test iterating over fresh entries."""
        # Create some test files
        file1 = os.path.join(self.tempdir, "file1")
        with open(file1, "wb") as f:
            f.write(b"file1 content")

        file2 = os.path.join(self.tempdir, "file2")
        with open(file2, "wb") as f:
            f.write(b"file2 content")

        # Create a memory object store
        object_store = MemoryObjectStore()

        # Get fresh entries
        paths = [b"file1", b"file2", b"nonexistent"]
        entries = dict(
            iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
        )

        # Verify both files got entries but nonexistent file is None
        self.assertIn(b"file1", entries)
        self.assertIn(b"file2", entries)
        self.assertIn(b"nonexistent", entries)
        self.assertIsNotNone(entries[b"file1"])
        self.assertIsNotNone(entries[b"file2"])
        self.assertIsNone(entries[b"nonexistent"])

        # Check that blobs were added to object store
        blob1 = object_store[entries[b"file1"].sha]
        self.assertEqual(b"file1 content", blob1.data)

        blob2 = object_store[entries[b"file2"].sha]
        self.assertEqual(b"file2 content", blob2.data)

    def test_read_submodule_head(self) -> None:
        """Test reading the HEAD of a submodule."""
        from dulwich.index import read_submodule_head
        from dulwich.repo import Repo

        # Create a test repo that will be our "submodule"
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)
        submodule_repo = Repo.init(sub_repo_dir)

        # Create a file and commit it to establish a HEAD
        test_file = os.path.join(sub_repo_dir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        submodule_repo.stage(["testfile"])
        commit_id = submodule_repo.do_commit(b"Test commit for submodule")

        # Test reading the HEAD
        head_sha = read_submodule_head(sub_repo_dir)
        self.assertEqual(commit_id, head_sha)

        # Test with bytes path
        head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
        self.assertEqual(commit_id, head_sha_bytes)

        # Test with non-existent path
        non_repo_dir = os.path.join(self.tempdir, "nonrepo")
        os.mkdir(non_repo_dir)
        self.assertIsNone(read_submodule_head(non_repo_dir))

        # Test with path that doesn't have a .git directory
        not_git_dir = os.path.join(self.tempdir, "notgit")
        os.mkdir(not_git_dir)
        self.assertIsNone(read_submodule_head(not_git_dir))

    def test_has_directory_changed(self) -> None:
        """Test checking if a directory has changed."""
        from dulwich.index import IndexEntry, _has_directory_changed
        from dulwich.repo import Repo

        # Setup mock IndexEntry
        mock_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
            0,
            0,
        )

        # Test with a regular directory (not a submodule)
        reg_dir = os.path.join(self.tempdir, "regular_dir")
        os.mkdir(reg_dir)

        # Should return True for regular directory
        self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))

        # Create a git repository to test submodule scenarios
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)
        submodule_repo = Repo.init(sub_repo_dir)

        # Create a file and commit it to establish a HEAD
        test_file = os.path.join(sub_repo_dir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        submodule_repo.stage(["testfile"])
        commit_id = submodule_repo.do_commit(b"Test commit for submodule")

        # Create an entry with the correct commit SHA
        correct_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            commit_id,
            0,
            0,
        )

        # Create an entry with an incorrect commit SHA
        incorrect_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"0000000000000000000000000000000000000000",
            0,
            0,
        )

        # Should return False for submodule with correct SHA
        self.assertFalse(
            _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
        )

        # Should return True for submodule with incorrect SHA
        self.assertTrue(
            _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
        )

    def test_get_unstaged_changes(self) -> None:
        """Test detecting unstaged changes in a working tree."""
        from dulwich.index import (
            ConflictedIndexEntry,
            Index,
            IndexEntry,
            get_unstaged_changes,
        )

        # Create a test repo
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)

        # Create test index
        index = Index(os.path.join(repo_dir, "index"))

        # Create an actual hash of our test content
        from dulwich.objects import Blob

        test_blob = Blob()
        test_blob.data = b"initial content"

        # Create some test files with known contents
        file1_path = os.path.join(repo_dir, "file1")
        with open(file1_path, "wb") as f:
            f.write(b"initial content")

        file2_path = os.path.join(repo_dir, "file2")
        with open(file2_path, "wb") as f:
            f.write(b"initial content")

        # Add them to index
        entry1 = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",  # Not matching actual content
            0,
            0,
        )

        entry2 = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            test_blob.id,  # Will be content's real hash
            0,
            0,
        )

        # Add a file that has a conflict
        entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})

        index._byname = {
            b"file1": entry1,
            b"file2": entry2,
            b"file3": IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"0000000000000000000000000000000000000000",
                0,
                0,
            ),
            b"conflict": entry_conflict,
        }

        # Get unstaged changes
        changes = list(get_unstaged_changes(index, repo_dir))

        # File1 should be unstaged (content doesn't match hash)
        # File3 doesn't exist (deleted)
        # Conflict is always unstaged
        self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])

        # Create directory where there should be a file
        os.mkdir(os.path.join(repo_dir, "file4"))
        index._byname[b"file4"] = entry1

        # Get unstaged changes again
        changes = list(get_unstaged_changes(index, repo_dir))

        # Now file4 should also be unstaged because it's a directory instead of a file
        self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])

        # Create a custom blob filter function
        def filter_blob_callback(blob, path):
            # Modify blob to make it look changed
            blob.data = b"modified " + blob.data
            return blob

        # Get unstaged changes with blob filter
        changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))

        # Now both file1 and file2 should be unstaged due to the filter
        self.assertEqual(
            sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
        )

class TestManyFilesFeature(TestCase):
    """Tests for the manyFiles feature (index version 4 and skipHash)."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_index_version_4_parsing(self):
        """Test that index version 4 files can be parsed."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with version 4
        index = Index(index_path, read=False, version=4)

        # Add some entries
        entry = IndexEntry(
            ctime=(1234567890, 0),
            mtime=(1234567890, 0),
            dev=1,
            ino=1,
            mode=0o100644,
            uid=1000,
            gid=1000,
            size=5,
            sha=b"0" * 40,
        )
        index[b"test.txt"] = entry

        # Write and read back
        index.write()

        # Read the index back
        index2 = Index(index_path)
        self.assertEqual(index2._version, 4)
        self.assertIn(b"test.txt", index2)

    def test_skip_hash_feature(self):
        """Test that skipHash feature works correctly."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with skipHash enabled
        index = Index(index_path, read=False, skip_hash=True)

        # Add some entries
        entry = IndexEntry(
            ctime=(1234567890, 0),
            mtime=(1234567890, 0),
            dev=1,
            ino=1,
            mode=0o100644,
            uid=1000,
            gid=1000,
            size=5,
            sha=b"0" * 40,
        )
        index[b"test.txt"] = entry

        # Write the index
        index.write()

        # Verify the file was written with zero hash
        with open(index_path, "rb") as f:
            f.seek(-20, 2)  # Seek to last 20 bytes
            trailing_hash = f.read(20)
            self.assertEqual(trailing_hash, b"\x00" * 20)

        # Verify we can still read it back
        index2 = Index(index_path)
        self.assertIn(b"test.txt", index2)

    def test_version_4_no_padding(self):
        """Test that version 4 entries have no padding."""
        # Create entries with names that would show compression benefits
        entries = [
            SerializedIndexEntry(
                name=b"src/main/java/com/example/Service.java",
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=5,
                sha=b"0" * 40,
                flags=0,
                extended_flags=0,
            ),
            SerializedIndexEntry(
                name=b"src/main/java/com/example/Controller.java",
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=2,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=5,
                sha=b"1" * 40,
                flags=0,
                extended_flags=0,
            ),
        ]

        # Test version 2 (with padding, full paths)
        buf_v2 = BytesIO()
        from dulwich.index import write_cache_entry

        previous_path = b""
        for entry in entries:
            # Set proper flags for v2
            entry_v2 = SerializedIndexEntry(
                entry.name,
                entry.ctime,
                entry.mtime,
                entry.dev,
                entry.ino,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.sha,
                len(entry.name),
                entry.extended_flags,
            )
            write_cache_entry(buf_v2, entry_v2, version=2, previous_path=previous_path)
            previous_path = entry.name

        v2_data = buf_v2.getvalue()

        # Test version 4 (path compression, no padding)
        buf_v4 = BytesIO()
        previous_path = b""
        for entry in entries:
            write_cache_entry(buf_v4, entry, version=4, previous_path=previous_path)
            previous_path = entry.name

        v4_data = buf_v4.getvalue()

        # Version 4 should be shorter due to compression and no padding
        self.assertLess(len(v4_data), len(v2_data))

        # Both should parse correctly
        buf_v2.seek(0)
        from dulwich.index import read_cache_entry

        previous_path = b""
        parsed_v2_entries = []
        for _ in entries:
            parsed = read_cache_entry(buf_v2, version=2, previous_path=previous_path)
            parsed_v2_entries.append(parsed)
            previous_path = parsed.name

        buf_v4.seek(0)
        previous_path = b""
        parsed_v4_entries = []
        for _ in entries:
            parsed = read_cache_entry(buf_v4, version=4, previous_path=previous_path)
            parsed_v4_entries.append(parsed)
            previous_path = parsed.name

        # Both should have the same paths
        for v2_entry, v4_entry in zip(parsed_v2_entries, parsed_v4_entries):
            self.assertEqual(v2_entry.name, v4_entry.name)
            self.assertEqual(v2_entry.sha, v4_entry.sha)

class TestManyFilesRepoIntegration(TestCase):
    """Tests for manyFiles feature integration with Repo."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_repo_with_manyfiles_config(self):
        """Test that a repository with feature.manyFiles=true uses the right settings."""
        from dulwich.repo import Repo

        # Create a new repository
        repo = Repo.init(self.tempdir)

        # Set feature.manyFiles=true in config
        config = repo.get_config()
        config.set(b"feature", b"manyFiles", b"true")
        config.write_to_path()

        # Open the index - should have skipHash enabled and version 4
        index = repo.open_index()
        self.assertTrue(index._skip_hash)
        self.assertEqual(index._version, 4)

    def test_repo_with_explicit_index_settings(self):
        """Test that explicit index.version and index.skipHash work."""
        from dulwich.repo import Repo

        # Create a new repository
        repo = Repo.init(self.tempdir)

        # Set explicit index settings
        config = repo.get_config()
        config.set(b"index", b"version", b"3")
        config.set(b"index", b"skipHash", b"false")
        config.write_to_path()

        # Open the index - should respect explicit settings
        index = repo.open_index()
        self.assertFalse(index._skip_hash)
        self.assertEqual(index._version, 3)

class TestPathPrefixCompression(TestCase):
    """Tests for index version 4 path prefix compression."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_varint_encoding_decoding(self):
        """Test variable-width integer encoding and decoding."""
        from dulwich.index import _decode_varint, _encode_varint

        test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]

        for value in test_values:
            encoded = _encode_varint(value)
            decoded, _ = _decode_varint(encoded, 0)
            self.assertEqual(value, decoded, f"Failed for value {value}")

    def test_path_compression_simple(self):
        """Test simple path compression cases."""
        from dulwich.index import _compress_path, _decompress_path

        # Test case 1: No common prefix
        compressed = _compress_path(b"file1.txt", b"")
        decompressed, _ = _decompress_path(compressed, 0, b"")
        self.assertEqual(b"file1.txt", decompressed)

        # Test case 2: Common prefix
        compressed = _compress_path(b"src/file2.txt", b"src/file1.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
        self.assertEqual(b"src/file2.txt", decompressed)

        # Test case 3: Completely different paths
        compressed = _compress_path(b"docs/readme.md", b"src/file1.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
        self.assertEqual(b"docs/readme.md", decompressed)

    def test_path_compression_deep_directories(self):
        """Test compression with deep directory structures."""
        from dulwich.index import _compress_path, _decompress_path

        path1 = b"src/main/java/com/example/service/UserService.java"
        path2 = b"src/main/java/com/example/service/OrderService.java"
        path3 = b"src/main/java/com/example/model/User.java"

        # Compress path2 relative to path1
        compressed = _compress_path(path2, path1)
        decompressed, _ = _decompress_path(compressed, 0, path1)
        self.assertEqual(path2, decompressed)

        # Compress path3 relative to path2
        compressed = _compress_path(path3, path2)
        decompressed, _ = _decompress_path(compressed, 0, path2)
        self.assertEqual(path3, decompressed)

    def test_index_version_4_with_compression(self):
        """Test full index version 4 write/read with path compression."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with version 4
        index = Index(index_path, read=False, version=4)

        # Add multiple entries with common prefixes
        paths = [
            b"src/main/java/App.java",
            b"src/main/java/Utils.java",
            b"src/main/resources/config.properties",
            b"src/test/java/AppTest.java",
            b"docs/README.md",
            b"docs/INSTALL.md",
        ]

        for i, path in enumerate(paths):
            entry = IndexEntry(
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=i + 1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=10,
                sha=f"{i:040d}".encode(),
            )
            index[path] = entry

        # Write and read back
        index.write()

        # Read the index back
        index2 = Index(index_path)
        self.assertEqual(index2._version, 4)

        # Verify all paths were preserved correctly
        for path in paths:
            self.assertIn(path, index2)

        # Verify the index file is smaller than version 2 would be
        with open(index_path, "rb") as f:
            v4_size = len(f.read())

        # Create equivalent version 2 index for comparison
        index_v2_path = os.path.join(self.tempdir, "index_v2")
        index_v2 = Index(index_v2_path, read=False, version=2)
        for path in paths:
            entry = IndexEntry(
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=10,
                sha=b"0" * 40,
            )
            index_v2[path] = entry
        index_v2.write()

        with open(index_v2_path, "rb") as f:
            v2_size = len(f.read())

        # Version 4 should be smaller due to compression
        self.assertLess(
            v4_size, v2_size, "Version 4 index should be smaller than version 2"
        )

    def test_path_compression_edge_cases(self):
        """Test edge cases in path compression."""
        from dulwich.index import _compress_path, _decompress_path

        # Empty paths
        compressed = _compress_path(b"", b"")
        decompressed, _ = _decompress_path(compressed, 0, b"")
        self.assertEqual(b"", decompressed)

        # Path identical to previous
        compressed = _compress_path(b"same.txt", b"same.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"same.txt")
        self.assertEqual(b"same.txt", decompressed)

        # Path shorter than previous
        compressed = _compress_path(b"short", b"very/long/path/file.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"very/long/path/file.txt")
        self.assertEqual(b"short", decompressed)