test_index.py 82 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the index."""
  22. import os
  23. import shutil
  24. import stat
  25. import struct
  26. import sys
  27. import tempfile
  28. from io import BytesIO
  29. from dulwich.index import (
  30. Index,
  31. IndexEntry,
  32. SerializedIndexEntry,
  33. _compress_path,
  34. _decompress_path,
  35. _fs_to_tree_path,
  36. _tree_to_fs_path,
  37. build_index_from_tree,
  38. cleanup_mode,
  39. commit_tree,
  40. get_unstaged_changes,
  41. index_entry_from_directory,
  42. index_entry_from_path,
  43. index_entry_from_stat,
  44. iter_fresh_entries,
  45. read_index,
  46. read_index_dict,
  47. update_working_tree,
  48. validate_path_element_default,
  49. validate_path_element_ntfs,
  50. write_cache_time,
  51. write_index,
  52. write_index_dict,
  53. )
  54. from dulwich.object_store import MemoryObjectStore
  55. from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
  56. from dulwich.repo import Repo
  57. from . import TestCase, skipIf
  58. def can_symlink() -> bool:
  59. """Return whether running process can create symlinks."""
  60. if sys.platform != "win32":
  61. # Platforms other than Windows should allow symlinks without issues.
  62. return True
  63. test_source = tempfile.mkdtemp()
  64. test_target = test_source + "can_symlink"
  65. try:
  66. os.symlink(test_source, test_target)
  67. except (NotImplementedError, OSError):
  68. return False
  69. return True
  70. class IndexTestCase(TestCase):
  71. datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")
  72. def get_simple_index(self, name):
  73. return Index(os.path.join(self.datadir, name))
  74. class SimpleIndexTestCase(IndexTestCase):
  75. def test_len(self) -> None:
  76. self.assertEqual(1, len(self.get_simple_index("index")))
  77. def test_iter(self) -> None:
  78. self.assertEqual([b"bla"], list(self.get_simple_index("index")))
  79. def test_iter_skip_hash(self) -> None:
  80. self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))
  81. def test_iterobjects(self) -> None:
  82. self.assertEqual(
  83. [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
  84. list(self.get_simple_index("index").iterobjects()),
  85. )
  86. def test_getitem(self) -> None:
  87. self.assertEqual(
  88. IndexEntry(
  89. (1230680220, 0),
  90. (1230680220, 0),
  91. 2050,
  92. 3761020,
  93. 33188,
  94. 1000,
  95. 1000,
  96. 0,
  97. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  98. 0,
  99. 0,
  100. ),
  101. self.get_simple_index("index")[b"bla"],
  102. )
  103. def test_empty(self) -> None:
  104. i = self.get_simple_index("notanindex")
  105. self.assertEqual(0, len(i))
  106. self.assertFalse(os.path.exists(i._filename))
  107. def test_against_empty_tree(self) -> None:
  108. i = self.get_simple_index("index")
  109. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  110. self.assertEqual(1, len(changes))
  111. (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
  112. self.assertEqual(b"bla", newname)
  113. self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)
  114. def test_index_pathlib(self) -> None:
  115. import tempfile
  116. from pathlib import Path
  117. # Create a temporary index file
  118. with tempfile.NamedTemporaryFile(suffix=".index", delete=False) as f:
  119. temp_path = f.name
  120. try:
  121. # Test creating Index with pathlib.Path
  122. path_obj = Path(temp_path)
  123. index = Index(path_obj, read=False)
  124. self.assertEqual(str(path_obj), index.path)
  125. # Add an entry and write
  126. index[b"test"] = IndexEntry(
  127. ctime=(0, 0),
  128. mtime=(0, 0),
  129. dev=0,
  130. ino=0,
  131. mode=33188,
  132. uid=0,
  133. gid=0,
  134. size=0,
  135. sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  136. )
  137. index.write()
  138. # Read it back with pathlib.Path
  139. index2 = Index(path_obj)
  140. self.assertIn(b"test", index2)
  141. finally:
  142. # Clean up
  143. os.unlink(temp_path)
  144. class SimpleIndexWriterTestCase(IndexTestCase):
  145. def setUp(self) -> None:
  146. IndexTestCase.setUp(self)
  147. self.tempdir = tempfile.mkdtemp()
  148. def tearDown(self) -> None:
  149. IndexTestCase.tearDown(self)
  150. shutil.rmtree(self.tempdir)
  151. def test_simple_write(self) -> None:
  152. entries = [
  153. (
  154. SerializedIndexEntry(
  155. b"barbla",
  156. (1230680220, 0),
  157. (1230680220, 0),
  158. 2050,
  159. 3761020,
  160. 33188,
  161. 1000,
  162. 1000,
  163. 0,
  164. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  165. 0,
  166. 0,
  167. )
  168. )
  169. ]
  170. filename = os.path.join(self.tempdir, "test-simple-write-index")
  171. with open(filename, "wb+") as x:
  172. write_index(x, entries)
  173. with open(filename, "rb") as x:
  174. self.assertEqual(entries, list(read_index(x)))
  175. class ReadIndexDictTests(IndexTestCase):
  176. def setUp(self) -> None:
  177. IndexTestCase.setUp(self)
  178. self.tempdir = tempfile.mkdtemp()
  179. def tearDown(self) -> None:
  180. IndexTestCase.tearDown(self)
  181. shutil.rmtree(self.tempdir)
  182. def test_simple_write(self) -> None:
  183. entries = {
  184. b"barbla": IndexEntry(
  185. (1230680220, 0),
  186. (1230680220, 0),
  187. 2050,
  188. 3761020,
  189. 33188,
  190. 1000,
  191. 1000,
  192. 0,
  193. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  194. 0,
  195. 0,
  196. )
  197. }
  198. filename = os.path.join(self.tempdir, "test-simple-write-index")
  199. with open(filename, "wb+") as x:
  200. write_index_dict(x, entries)
  201. with open(filename, "rb") as x:
  202. self.assertEqual(entries, read_index_dict(x))
  203. class CommitTreeTests(TestCase):
  204. def setUp(self) -> None:
  205. super().setUp()
  206. self.store = MemoryObjectStore()
  207. def test_single_blob(self) -> None:
  208. blob = Blob()
  209. blob.data = b"foo"
  210. self.store.add_object(blob)
  211. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  212. rootid = commit_tree(self.store, blobs)
  213. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  214. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  215. self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))
  216. def test_nested(self) -> None:
  217. blob = Blob()
  218. blob.data = b"foo"
  219. self.store.add_object(blob)
  220. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  221. rootid = commit_tree(self.store, blobs)
  222. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  223. dirid = self.store[rootid][b"bla"][1]
  224. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  225. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  226. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  227. self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))
  228. class CleanupModeTests(TestCase):
  229. def assertModeEqual(self, expected, got) -> None:
  230. self.assertEqual(expected, got, f"{expected:o} != {got:o}")
  231. def test_file(self) -> None:
  232. self.assertModeEqual(0o100644, cleanup_mode(0o100000))
  233. def test_executable(self) -> None:
  234. self.assertModeEqual(0o100755, cleanup_mode(0o100711))
  235. self.assertModeEqual(0o100755, cleanup_mode(0o100700))
  236. def test_symlink(self) -> None:
  237. self.assertModeEqual(0o120000, cleanup_mode(0o120711))
  238. def test_dir(self) -> None:
  239. self.assertModeEqual(0o040000, cleanup_mode(0o40531))
  240. def test_submodule(self) -> None:
  241. self.assertModeEqual(0o160000, cleanup_mode(0o160744))
  242. class WriteCacheTimeTests(TestCase):
  243. def test_write_string(self) -> None:
  244. f = BytesIO()
  245. self.assertRaises(TypeError, write_cache_time, f, "foo")
  246. def test_write_int(self) -> None:
  247. f = BytesIO()
  248. write_cache_time(f, 434343)
  249. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  250. def test_write_tuple(self) -> None:
  251. f = BytesIO()
  252. write_cache_time(f, (434343, 21))
  253. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  254. def test_write_float(self) -> None:
  255. f = BytesIO()
  256. write_cache_time(f, 434343.000000021)
  257. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  258. class IndexEntryFromStatTests(TestCase):
  259. def test_simple(self) -> None:
  260. st = os.stat_result(
  261. (
  262. 16877,
  263. 131078,
  264. 64769,
  265. 154,
  266. 1000,
  267. 1000,
  268. 12288,
  269. 1323629595,
  270. 1324180496,
  271. 1324180496,
  272. )
  273. )
  274. entry = index_entry_from_stat(st, b"22" * 20)
  275. self.assertEqual(
  276. entry,
  277. IndexEntry(
  278. 1324180496,
  279. 1324180496,
  280. 64769,
  281. 131078,
  282. 16384,
  283. 1000,
  284. 1000,
  285. 12288,
  286. b"2222222222222222222222222222222222222222",
  287. 0,
  288. 0,
  289. ),
  290. )
  291. def test_override_mode(self) -> None:
  292. st = os.stat_result(
  293. (
  294. stat.S_IFREG + 0o644,
  295. 131078,
  296. 64769,
  297. 154,
  298. 1000,
  299. 1000,
  300. 12288,
  301. 1323629595,
  302. 1324180496,
  303. 1324180496,
  304. )
  305. )
  306. entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
  307. self.assertEqual(
  308. entry,
  309. IndexEntry(
  310. 1324180496,
  311. 1324180496,
  312. 64769,
  313. 131078,
  314. 33261,
  315. 1000,
  316. 1000,
  317. 12288,
  318. b"2222222222222222222222222222222222222222",
  319. 0,
  320. 0,
  321. ),
  322. )
  323. class BuildIndexTests(TestCase):
  324. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
  325. self.assertEqual(index_entry.mode, mode) # mode
  326. self.assertEqual(index_entry.size, filesize) # filesize
  327. self.assertEqual(index_entry.sha, sha) # sha
  328. def assertFileContents(self, path, contents, symlink=False) -> None:
  329. if symlink:
  330. self.assertEqual(os.readlink(path), contents)
  331. else:
  332. with open(path, "rb") as f:
  333. self.assertEqual(f.read(), contents)
  334. def test_empty(self) -> None:
  335. repo_dir = tempfile.mkdtemp()
  336. self.addCleanup(shutil.rmtree, repo_dir)
  337. with Repo.init(repo_dir) as repo:
  338. tree = Tree()
  339. repo.object_store.add_object(tree)
  340. build_index_from_tree(
  341. repo.path, repo.index_path(), repo.object_store, tree.id
  342. )
  343. # Verify index entries
  344. index = repo.open_index()
  345. self.assertEqual(len(index), 0)
  346. # Verify no files
  347. self.assertEqual([".git"], os.listdir(repo.path))
  348. def test_git_dir(self) -> None:
  349. repo_dir = tempfile.mkdtemp()
  350. self.addCleanup(shutil.rmtree, repo_dir)
  351. with Repo.init(repo_dir) as repo:
  352. # Populate repo
  353. filea = Blob.from_string(b"file a")
  354. filee = Blob.from_string(b"d")
  355. tree = Tree()
  356. tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
  357. tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)
  358. repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])
  359. build_index_from_tree(
  360. repo.path, repo.index_path(), repo.object_store, tree.id
  361. )
  362. # Verify index entries
  363. index = repo.open_index()
  364. self.assertEqual(len(index), 1)
  365. # filea
  366. apath = os.path.join(repo.path, ".git", "a")
  367. self.assertFalse(os.path.exists(apath))
  368. # filee
  369. epath = os.path.join(repo.path, "c", "e")
  370. self.assertTrue(os.path.exists(epath))
  371. self.assertReasonableIndexEntry(
  372. index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
  373. )
  374. self.assertFileContents(epath, b"d")
  375. def test_nonempty(self) -> None:
  376. repo_dir = tempfile.mkdtemp()
  377. self.addCleanup(shutil.rmtree, repo_dir)
  378. with Repo.init(repo_dir) as repo:
  379. # Populate repo
  380. filea = Blob.from_string(b"file a")
  381. fileb = Blob.from_string(b"file b")
  382. filed = Blob.from_string(b"file d")
  383. tree = Tree()
  384. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  385. tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
  386. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  387. repo.object_store.add_objects(
  388. [(o, None) for o in [filea, fileb, filed, tree]]
  389. )
  390. build_index_from_tree(
  391. repo.path, repo.index_path(), repo.object_store, tree.id
  392. )
  393. # Verify index entries
  394. index = repo.open_index()
  395. self.assertEqual(len(index), 3)
  396. # filea
  397. apath = os.path.join(repo.path, "a")
  398. self.assertTrue(os.path.exists(apath))
  399. self.assertReasonableIndexEntry(
  400. index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
  401. )
  402. self.assertFileContents(apath, b"file a")
  403. # fileb
  404. bpath = os.path.join(repo.path, "b")
  405. self.assertTrue(os.path.exists(bpath))
  406. self.assertReasonableIndexEntry(
  407. index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
  408. )
  409. self.assertFileContents(bpath, b"file b")
  410. # filed
  411. dpath = os.path.join(repo.path, "c", "d")
  412. self.assertTrue(os.path.exists(dpath))
  413. self.assertReasonableIndexEntry(
  414. index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
  415. )
  416. self.assertFileContents(dpath, b"file d")
  417. # Verify no extra files
  418. self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
  419. self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))
  420. @skipIf(not getattr(os, "sync", None), "Requires sync support")
  421. def test_norewrite(self) -> None:
  422. repo_dir = tempfile.mkdtemp()
  423. self.addCleanup(shutil.rmtree, repo_dir)
  424. with Repo.init(repo_dir) as repo:
  425. # Populate repo
  426. filea = Blob.from_string(b"file a")
  427. filea_path = os.path.join(repo_dir, "a")
  428. tree = Tree()
  429. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  430. repo.object_store.add_objects([(o, None) for o in [filea, tree]])
  431. # First Write
  432. build_index_from_tree(
  433. repo.path, repo.index_path(), repo.object_store, tree.id
  434. )
  435. # Use sync as metadata can be cached on some FS
  436. os.sync()
  437. mtime = os.stat(filea_path).st_mtime
  438. # Test Rewrite
  439. build_index_from_tree(
  440. repo.path, repo.index_path(), repo.object_store, tree.id
  441. )
  442. os.sync()
  443. self.assertEqual(mtime, os.stat(filea_path).st_mtime)
  444. # Modify content
  445. with open(filea_path, "wb") as fh:
  446. fh.write(b"test a")
  447. os.sync()
  448. mtime = os.stat(filea_path).st_mtime
  449. # Test rewrite
  450. build_index_from_tree(
  451. repo.path, repo.index_path(), repo.object_store, tree.id
  452. )
  453. os.sync()
  454. with open(filea_path, "rb") as fh:
  455. self.assertEqual(b"file a", fh.read())
  456. @skipIf(not can_symlink(), "Requires symlink support")
  457. def test_symlink(self) -> None:
  458. repo_dir = tempfile.mkdtemp()
  459. self.addCleanup(shutil.rmtree, repo_dir)
  460. with Repo.init(repo_dir) as repo:
  461. # Populate repo
  462. filed = Blob.from_string(b"file d")
  463. filee = Blob.from_string(b"d")
  464. tree = Tree()
  465. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  466. tree[b"c/e"] = (stat.S_IFLNK, filee.id) # symlink
  467. repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])
  468. build_index_from_tree(
  469. repo.path, repo.index_path(), repo.object_store, tree.id
  470. )
  471. # Verify index entries
  472. index = repo.open_index()
  473. # symlink to d
  474. epath = os.path.join(repo.path, "c", "e")
  475. self.assertTrue(os.path.exists(epath))
  476. self.assertReasonableIndexEntry(
  477. index[b"c/e"],
  478. stat.S_IFLNK,
  479. 0 if sys.platform == "win32" else 1,
  480. filee.id,
  481. )
  482. self.assertFileContents(epath, "d", symlink=True)
  483. def test_no_decode_encode(self) -> None:
  484. repo_dir = tempfile.mkdtemp()
  485. repo_dir_bytes = os.fsencode(repo_dir)
  486. self.addCleanup(shutil.rmtree, repo_dir)
  487. with Repo.init(repo_dir) as repo:
  488. # Populate repo
  489. file = Blob.from_string(b"foo")
  490. tree = Tree()
  491. latin1_name = "À".encode("latin1")
  492. try:
  493. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  494. except UnicodeDecodeError:
  495. self.skipTest("can not decode as latin1")
  496. utf8_name = "À".encode()
  497. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  498. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  499. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  500. repo.object_store.add_objects([(o, None) for o in [file, tree]])
  501. try:
  502. build_index_from_tree(
  503. repo.path, repo.index_path(), repo.object_store, tree.id
  504. )
  505. except OSError as e:
  506. if e.errno == 92 and sys.platform == "darwin":
  507. # Our filename isn't supported by the platform :(
  508. self.skipTest(f"can not write filename {e.filename!r}")
  509. else:
  510. raise
  511. except UnicodeDecodeError:
  512. # This happens e.g. with python3.6 on Windows.
  513. # It implicitly decodes using utf8, which doesn't work.
  514. self.skipTest("can not implicitly convert as utf8")
  515. # Verify index entries
  516. index = repo.open_index()
  517. self.assertIn(latin1_name, index)
  518. self.assertIn(utf8_name, index)
  519. self.assertTrue(os.path.exists(latin1_path))
  520. self.assertTrue(os.path.exists(utf8_path))
  521. def test_git_submodule(self) -> None:
  522. repo_dir = tempfile.mkdtemp()
  523. self.addCleanup(shutil.rmtree, repo_dir)
  524. with Repo.init(repo_dir) as repo:
  525. filea = Blob.from_string(b"file alalala")
  526. subtree = Tree()
  527. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  528. c = Commit()
  529. c.tree = subtree.id
  530. c.committer = c.author = b"Somebody <somebody@example.com>"
  531. c.commit_time = c.author_time = 42342
  532. c.commit_timezone = c.author_timezone = 0
  533. c.parents = []
  534. c.message = b"Subcommit"
  535. tree = Tree()
  536. tree[b"c"] = (S_IFGITLINK, c.id)
  537. repo.object_store.add_objects([(o, None) for o in [tree]])
  538. build_index_from_tree(
  539. repo.path, repo.index_path(), repo.object_store, tree.id
  540. )
  541. # Verify index entries
  542. index = repo.open_index()
  543. self.assertEqual(len(index), 1)
  544. # filea
  545. apath = os.path.join(repo.path, "c/a")
  546. self.assertFalse(os.path.exists(apath))
  547. # dir c
  548. cpath = os.path.join(repo.path, "c")
  549. self.assertTrue(os.path.isdir(cpath))
  550. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  551. self.assertEqual(index[b"c"].sha, c.id) # sha
  552. def test_git_submodule_exists(self) -> None:
  553. repo_dir = tempfile.mkdtemp()
  554. self.addCleanup(shutil.rmtree, repo_dir)
  555. with Repo.init(repo_dir) as repo:
  556. filea = Blob.from_string(b"file alalala")
  557. subtree = Tree()
  558. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  559. c = Commit()
  560. c.tree = subtree.id
  561. c.committer = c.author = b"Somebody <somebody@example.com>"
  562. c.commit_time = c.author_time = 42342
  563. c.commit_timezone = c.author_timezone = 0
  564. c.parents = []
  565. c.message = b"Subcommit"
  566. tree = Tree()
  567. tree[b"c"] = (S_IFGITLINK, c.id)
  568. os.mkdir(os.path.join(repo_dir, "c"))
  569. repo.object_store.add_objects([(o, None) for o in [tree]])
  570. build_index_from_tree(
  571. repo.path, repo.index_path(), repo.object_store, tree.id
  572. )
  573. # Verify index entries
  574. index = repo.open_index()
  575. self.assertEqual(len(index), 1)
  576. # filea
  577. apath = os.path.join(repo.path, "c/a")
  578. self.assertFalse(os.path.exists(apath))
  579. # dir c
  580. cpath = os.path.join(repo.path, "c")
  581. self.assertTrue(os.path.isdir(cpath))
  582. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  583. self.assertEqual(index[b"c"].sha, c.id) # sha
  584. def test_with_line_ending_normalization(self) -> None:
  585. """Test that build_index_from_tree applies line-ending normalization."""
  586. repo_dir = tempfile.mkdtemp()
  587. self.addCleanup(shutil.rmtree, repo_dir)
  588. from dulwich.line_ending import BlobNormalizer
  589. with Repo.init(repo_dir) as repo:
  590. # Set up autocrlf config
  591. config = repo.get_config()
  592. config.set((b"core",), b"autocrlf", b"true")
  593. config.write_to_path()
  594. # Create blob with LF line endings
  595. content_lf = b"line1\nline2\nline3\n"
  596. blob = Blob.from_string(content_lf)
  597. tree = Tree()
  598. tree[b"test.txt"] = (stat.S_IFREG | 0o644, blob.id)
  599. repo.object_store.add_objects([(blob, None), (tree, None)])
  600. # Create blob normalizer
  601. blob_normalizer = BlobNormalizer(config, {})
  602. # Build index with normalization
  603. build_index_from_tree(
  604. repo.path,
  605. repo.index_path(),
  606. repo.object_store,
  607. tree.id,
  608. blob_normalizer=blob_normalizer,
  609. )
  610. # On Windows with autocrlf=true, file should have CRLF line endings
  611. test_file = os.path.join(repo.path, "test.txt")
  612. with open(test_file, "rb") as f:
  613. content = f.read()
  614. # autocrlf=true means LF -> CRLF on checkout (on all platforms for testing)
  615. expected_content = b"line1\r\nline2\r\nline3\r\n"
  616. self.assertEqual(content, expected_content)
  617. class GetUnstagedChangesTests(TestCase):
  618. def test_get_unstaged_changes(self) -> None:
  619. """Unit test for get_unstaged_changes."""
  620. repo_dir = tempfile.mkdtemp()
  621. self.addCleanup(shutil.rmtree, repo_dir)
  622. with Repo.init(repo_dir) as repo:
  623. # Commit a dummy file then modify it
  624. foo1_fullpath = os.path.join(repo_dir, "foo1")
  625. with open(foo1_fullpath, "wb") as f:
  626. f.write(b"origstuff")
  627. foo2_fullpath = os.path.join(repo_dir, "foo2")
  628. with open(foo2_fullpath, "wb") as f:
  629. f.write(b"origstuff")
  630. repo.stage(["foo1", "foo2"])
  631. repo.do_commit(
  632. b"test status",
  633. author=b"author <email>",
  634. committer=b"committer <email>",
  635. )
  636. with open(foo1_fullpath, "wb") as f:
  637. f.write(b"newstuff")
  638. # modify access and modify time of path
  639. os.utime(foo1_fullpath, (0, 0))
  640. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  641. self.assertEqual(list(changes), [b"foo1"])
  642. def test_get_unstaged_deleted_changes(self) -> None:
  643. """Unit test for get_unstaged_changes."""
  644. repo_dir = tempfile.mkdtemp()
  645. self.addCleanup(shutil.rmtree, repo_dir)
  646. with Repo.init(repo_dir) as repo:
  647. # Commit a dummy file then remove it
  648. foo1_fullpath = os.path.join(repo_dir, "foo1")
  649. with open(foo1_fullpath, "wb") as f:
  650. f.write(b"origstuff")
  651. repo.stage(["foo1"])
  652. repo.do_commit(
  653. b"test status",
  654. author=b"author <email>",
  655. committer=b"committer <email>",
  656. )
  657. os.unlink(foo1_fullpath)
  658. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  659. self.assertEqual(list(changes), [b"foo1"])
  660. def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
  661. """Unit test for get_unstaged_changes."""
  662. repo_dir = tempfile.mkdtemp()
  663. self.addCleanup(shutil.rmtree, repo_dir)
  664. with Repo.init(repo_dir) as repo:
  665. # Commit a dummy file then modify it
  666. foo1_fullpath = os.path.join(repo_dir, "foo1")
  667. with open(foo1_fullpath, "wb") as f:
  668. f.write(b"origstuff")
  669. repo.stage(["foo1"])
  670. repo.do_commit(
  671. b"test status",
  672. author=b"author <email>",
  673. committer=b"committer <email>",
  674. )
  675. os.remove(foo1_fullpath)
  676. os.mkdir(foo1_fullpath)
  677. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  678. self.assertEqual(list(changes), [b"foo1"])
  679. @skipIf(not can_symlink(), "Requires symlink support")
  680. def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
  681. """Unit test for get_unstaged_changes."""
  682. repo_dir = tempfile.mkdtemp()
  683. self.addCleanup(shutil.rmtree, repo_dir)
  684. with Repo.init(repo_dir) as repo:
  685. # Commit a dummy file then modify it
  686. foo1_fullpath = os.path.join(repo_dir, "foo1")
  687. with open(foo1_fullpath, "wb") as f:
  688. f.write(b"origstuff")
  689. repo.stage(["foo1"])
  690. repo.do_commit(
  691. b"test status",
  692. author=b"author <email>",
  693. committer=b"committer <email>",
  694. )
  695. os.remove(foo1_fullpath)
  696. os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)
  697. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  698. self.assertEqual(list(changes), [b"foo1"])
  699. class TestValidatePathElement(TestCase):
  700. def test_default(self) -> None:
  701. self.assertTrue(validate_path_element_default(b"bla"))
  702. self.assertTrue(validate_path_element_default(b".bla"))
  703. self.assertFalse(validate_path_element_default(b".git"))
  704. self.assertFalse(validate_path_element_default(b".giT"))
  705. self.assertFalse(validate_path_element_default(b".."))
  706. self.assertTrue(validate_path_element_default(b"git~1"))
  707. def test_ntfs(self) -> None:
  708. self.assertTrue(validate_path_element_ntfs(b"bla"))
  709. self.assertTrue(validate_path_element_ntfs(b".bla"))
  710. self.assertFalse(validate_path_element_ntfs(b".git"))
  711. self.assertFalse(validate_path_element_ntfs(b".giT"))
  712. self.assertFalse(validate_path_element_ntfs(b".."))
  713. self.assertFalse(validate_path_element_ntfs(b"git~1"))
  714. class TestTreeFSPathConversion(TestCase):
  715. def test_tree_to_fs_path(self) -> None:
  716. tree_path = "délwíçh/foo".encode()
  717. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  718. self.assertEqual(
  719. fs_path,
  720. os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
  721. )
  722. def test_tree_to_fs_path_windows_separator(self) -> None:
  723. tree_path = b"path/with/slash"
  724. original_sep = os.sep.encode("ascii")
  725. try:
  726. # Temporarily modify os_sep_bytes to test Windows path conversion
  727. # This simulates Windows behavior on all platforms for testing
  728. import dulwich.index
  729. dulwich.index.os_sep_bytes = b"\\"
  730. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  731. # The function should join the prefix path with the converted tree path
  732. # The expected behavior is that the path separators in the tree_path are
  733. # converted to the platform-specific separator (which we've set to backslash)
  734. expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
  735. self.assertEqual(fs_path, expected_path)
  736. finally:
  737. # Restore original value
  738. dulwich.index.os_sep_bytes = original_sep
  739. def test_fs_to_tree_path_str(self) -> None:
  740. fs_path = os.path.join(os.path.join("délwíçh", "foo"))
  741. tree_path = _fs_to_tree_path(fs_path)
  742. self.assertEqual(tree_path, "délwíçh/foo".encode())
  743. def test_fs_to_tree_path_bytes(self) -> None:
  744. fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
  745. tree_path = _fs_to_tree_path(fs_path)
  746. self.assertEqual(tree_path, "délwíçh/foo".encode())
  747. def test_fs_to_tree_path_windows_separator(self) -> None:
  748. # Test conversion of Windows paths to tree paths
  749. fs_path = b"path\\with\\backslash"
  750. original_sep = os.sep.encode("ascii")
  751. try:
  752. # Temporarily modify os_sep_bytes to test Windows path conversion
  753. import dulwich.index
  754. dulwich.index.os_sep_bytes = b"\\"
  755. tree_path = _fs_to_tree_path(fs_path)
  756. self.assertEqual(tree_path, b"path/with/backslash")
  757. finally:
  758. # Restore original value
  759. dulwich.index.os_sep_bytes = original_sep
  760. class TestIndexEntryFromPath(TestCase):
  761. def setUp(self):
  762. self.tempdir = tempfile.mkdtemp()
  763. self.addCleanup(shutil.rmtree, self.tempdir)
  764. def test_index_entry_from_path_file(self) -> None:
  765. """Test creating index entry from a regular file."""
  766. # Create a test file
  767. test_file = os.path.join(self.tempdir, "testfile")
  768. with open(test_file, "wb") as f:
  769. f.write(b"test content")
  770. # Get the index entry
  771. entry = index_entry_from_path(os.fsencode(test_file))
  772. # Verify the entry was created with the right mode
  773. self.assertIsNotNone(entry)
  774. self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)
  775. @skipIf(not can_symlink(), "Requires symlink support")
  776. def test_index_entry_from_path_symlink(self) -> None:
  777. """Test creating index entry from a symlink."""
  778. # Create a target file
  779. target_file = os.path.join(self.tempdir, "target")
  780. with open(target_file, "wb") as f:
  781. f.write(b"target content")
  782. # Create a symlink
  783. link_file = os.path.join(self.tempdir, "symlink")
  784. os.symlink(target_file, link_file)
  785. # Get the index entry
  786. entry = index_entry_from_path(os.fsencode(link_file))
  787. # Verify the entry was created with the right mode
  788. self.assertIsNotNone(entry)
  789. self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)
  790. def test_index_entry_from_path_directory(self) -> None:
  791. """Test creating index entry from a directory (should return None)."""
  792. # Create a directory
  793. test_dir = os.path.join(self.tempdir, "testdir")
  794. os.mkdir(test_dir)
  795. # Get the index entry for a directory
  796. entry = index_entry_from_path(os.fsencode(test_dir))
  797. # Should return None for regular directories
  798. self.assertIsNone(entry)
  799. def test_index_entry_from_directory_regular(self) -> None:
  800. """Test index_entry_from_directory with a regular directory."""
  801. # Create a directory
  802. test_dir = os.path.join(self.tempdir, "testdir")
  803. os.mkdir(test_dir)
  804. # Get stat for the directory
  805. st = os.lstat(test_dir)
  806. # Get the index entry for a regular directory
  807. entry = index_entry_from_directory(st, os.fsencode(test_dir))
  808. # Should return None for regular directories
  809. self.assertIsNone(entry)
  810. def test_index_entry_from_directory_git_submodule(self) -> None:
  811. """Test index_entry_from_directory with a Git submodule."""
  812. # Create a git repository that will be a submodule
  813. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  814. os.mkdir(sub_repo_dir)
  815. # Create the .git directory to make it look like a git repo
  816. git_dir = os.path.join(sub_repo_dir, ".git")
  817. os.mkdir(git_dir)
  818. # Create HEAD file with a fake commit SHA
  819. head_sha = b"1234567890" * 4 # 40-char fake SHA
  820. with open(os.path.join(git_dir, "HEAD"), "wb") as f:
  821. f.write(head_sha)
  822. # Get stat for the submodule directory
  823. st = os.lstat(sub_repo_dir)
  824. # Get the index entry for a git submodule directory
  825. entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))
  826. # Since we don't have a proper git setup, this might still return None
  827. # This test just ensures the code path is executed
  828. if entry is not None:
  829. # If an entry is returned, it should have the gitlink mode
  830. self.assertEqual(entry.mode, S_IFGITLINK)
  831. def test_index_entry_from_path_with_object_store(self) -> None:
  832. """Test creating index entry with object store."""
  833. # Create a test file
  834. test_file = os.path.join(self.tempdir, "testfile")
  835. with open(test_file, "wb") as f:
  836. f.write(b"test content")
  837. # Create a memory object store
  838. object_store = MemoryObjectStore()
  839. # Get the index entry and add to object store
  840. entry = index_entry_from_path(os.fsencode(test_file), object_store)
  841. # Verify we can access the blob from the object store
  842. self.assertIsNotNone(entry)
  843. blob = object_store[entry.sha]
  844. self.assertEqual(b"test content", blob.data)
  845. def test_iter_fresh_entries(self) -> None:
  846. """Test iterating over fresh entries."""
  847. # Create some test files
  848. file1 = os.path.join(self.tempdir, "file1")
  849. with open(file1, "wb") as f:
  850. f.write(b"file1 content")
  851. file2 = os.path.join(self.tempdir, "file2")
  852. with open(file2, "wb") as f:
  853. f.write(b"file2 content")
  854. # Create a memory object store
  855. object_store = MemoryObjectStore()
  856. # Get fresh entries
  857. paths = [b"file1", b"file2", b"nonexistent"]
  858. entries = dict(
  859. iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
  860. )
  861. # Verify both files got entries but nonexistent file is None
  862. self.assertIn(b"file1", entries)
  863. self.assertIn(b"file2", entries)
  864. self.assertIn(b"nonexistent", entries)
  865. self.assertIsNotNone(entries[b"file1"])
  866. self.assertIsNotNone(entries[b"file2"])
  867. self.assertIsNone(entries[b"nonexistent"])
  868. # Check that blobs were added to object store
  869. blob1 = object_store[entries[b"file1"].sha]
  870. self.assertEqual(b"file1 content", blob1.data)
  871. blob2 = object_store[entries[b"file2"].sha]
  872. self.assertEqual(b"file2 content", blob2.data)
  873. def test_read_submodule_head(self) -> None:
  874. """Test reading the HEAD of a submodule."""
  875. from dulwich.index import read_submodule_head
  876. # Create a test repo that will be our "submodule"
  877. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  878. os.mkdir(sub_repo_dir)
  879. submodule_repo = Repo.init(sub_repo_dir)
  880. # Create a file and commit it to establish a HEAD
  881. test_file = os.path.join(sub_repo_dir, "testfile")
  882. with open(test_file, "wb") as f:
  883. f.write(b"test content")
  884. submodule_repo.stage(["testfile"])
  885. commit_id = submodule_repo.do_commit(b"Test commit for submodule")
  886. # Test reading the HEAD
  887. head_sha = read_submodule_head(sub_repo_dir)
  888. self.assertEqual(commit_id, head_sha)
  889. # Test with bytes path
  890. head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
  891. self.assertEqual(commit_id, head_sha_bytes)
  892. # Test with non-existent path
  893. non_repo_dir = os.path.join(self.tempdir, "nonrepo")
  894. os.mkdir(non_repo_dir)
  895. self.assertIsNone(read_submodule_head(non_repo_dir))
  896. # Test with path that doesn't have a .git directory
  897. not_git_dir = os.path.join(self.tempdir, "notgit")
  898. os.mkdir(not_git_dir)
  899. self.assertIsNone(read_submodule_head(not_git_dir))
  900. def test_has_directory_changed(self) -> None:
  901. """Test checking if a directory has changed."""
  902. from dulwich.index import IndexEntry, _has_directory_changed
  903. # Setup mock IndexEntry
  904. mock_entry = IndexEntry(
  905. (1230680220, 0),
  906. (1230680220, 0),
  907. 2050,
  908. 3761020,
  909. 33188,
  910. 1000,
  911. 1000,
  912. 0,
  913. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  914. 0,
  915. 0,
  916. )
  917. # Test with a regular directory (not a submodule)
  918. reg_dir = os.path.join(self.tempdir, "regular_dir")
  919. os.mkdir(reg_dir)
  920. # Should return True for regular directory
  921. self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))
  922. # Create a git repository to test submodule scenarios
  923. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  924. os.mkdir(sub_repo_dir)
  925. submodule_repo = Repo.init(sub_repo_dir)
  926. # Create a file and commit it to establish a HEAD
  927. test_file = os.path.join(sub_repo_dir, "testfile")
  928. with open(test_file, "wb") as f:
  929. f.write(b"test content")
  930. submodule_repo.stage(["testfile"])
  931. commit_id = submodule_repo.do_commit(b"Test commit for submodule")
  932. # Create an entry with the correct commit SHA
  933. correct_entry = IndexEntry(
  934. (1230680220, 0),
  935. (1230680220, 0),
  936. 2050,
  937. 3761020,
  938. 33188,
  939. 1000,
  940. 1000,
  941. 0,
  942. commit_id,
  943. 0,
  944. 0,
  945. )
  946. # Create an entry with an incorrect commit SHA
  947. incorrect_entry = IndexEntry(
  948. (1230680220, 0),
  949. (1230680220, 0),
  950. 2050,
  951. 3761020,
  952. 33188,
  953. 1000,
  954. 1000,
  955. 0,
  956. b"0000000000000000000000000000000000000000",
  957. 0,
  958. 0,
  959. )
  960. # Should return False for submodule with correct SHA
  961. self.assertFalse(
  962. _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
  963. )
  964. # Should return True for submodule with incorrect SHA
  965. self.assertTrue(
  966. _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
  967. )
  968. def test_get_unstaged_changes(self) -> None:
  969. """Test detecting unstaged changes in a working tree."""
  970. from dulwich.index import (
  971. ConflictedIndexEntry,
  972. Index,
  973. IndexEntry,
  974. get_unstaged_changes,
  975. )
  976. # Create a test repo
  977. repo_dir = tempfile.mkdtemp()
  978. self.addCleanup(shutil.rmtree, repo_dir)
  979. # Create test index
  980. index = Index(os.path.join(repo_dir, "index"))
  981. # Create an actual hash of our test content
  982. from dulwich.objects import Blob
  983. test_blob = Blob()
  984. test_blob.data = b"initial content"
  985. # Create some test files with known contents
  986. file1_path = os.path.join(repo_dir, "file1")
  987. with open(file1_path, "wb") as f:
  988. f.write(b"initial content")
  989. file2_path = os.path.join(repo_dir, "file2")
  990. with open(file2_path, "wb") as f:
  991. f.write(b"initial content")
  992. # Add them to index
  993. entry1 = IndexEntry(
  994. (1230680220, 0),
  995. (1230680220, 0),
  996. 2050,
  997. 3761020,
  998. 33188,
  999. 1000,
  1000. 1000,
  1001. 0,
  1002. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", # Not matching actual content
  1003. 0,
  1004. 0,
  1005. )
  1006. entry2 = IndexEntry(
  1007. (1230680220, 0),
  1008. (1230680220, 0),
  1009. 2050,
  1010. 3761020,
  1011. 33188,
  1012. 1000,
  1013. 1000,
  1014. 0,
  1015. test_blob.id, # Will be content's real hash
  1016. 0,
  1017. 0,
  1018. )
  1019. # Add a file that has a conflict
  1020. entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})
  1021. index._byname = {
  1022. b"file1": entry1,
  1023. b"file2": entry2,
  1024. b"file3": IndexEntry(
  1025. (1230680220, 0),
  1026. (1230680220, 0),
  1027. 2050,
  1028. 3761020,
  1029. 33188,
  1030. 1000,
  1031. 1000,
  1032. 0,
  1033. b"0000000000000000000000000000000000000000",
  1034. 0,
  1035. 0,
  1036. ),
  1037. b"conflict": entry_conflict,
  1038. }
  1039. # Get unstaged changes
  1040. changes = list(get_unstaged_changes(index, repo_dir))
  1041. # File1 should be unstaged (content doesn't match hash)
  1042. # File3 doesn't exist (deleted)
  1043. # Conflict is always unstaged
  1044. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])
  1045. # Create directory where there should be a file
  1046. os.mkdir(os.path.join(repo_dir, "file4"))
  1047. index._byname[b"file4"] = entry1
  1048. # Get unstaged changes again
  1049. changes = list(get_unstaged_changes(index, repo_dir))
  1050. # Now file4 should also be unstaged because it's a directory instead of a file
  1051. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])
  1052. # Create a custom blob filter function
  1053. def filter_blob_callback(blob, path):
  1054. # Modify blob to make it look changed
  1055. blob.data = b"modified " + blob.data
  1056. return blob
  1057. # Get unstaged changes with blob filter
  1058. changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))
  1059. # Now both file1 and file2 should be unstaged due to the filter
  1060. self.assertEqual(
  1061. sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
  1062. )
  1063. class TestManyFilesFeature(TestCase):
  1064. """Tests for the manyFiles feature (index version 4 and skipHash)."""
  1065. def setUp(self):
  1066. self.tempdir = tempfile.mkdtemp()
  1067. self.addCleanup(shutil.rmtree, self.tempdir)
  1068. def test_index_version_4_parsing(self):
  1069. """Test that index version 4 files can be parsed."""
  1070. index_path = os.path.join(self.tempdir, "index")
  1071. # Create an index with version 4
  1072. index = Index(index_path, read=False, version=4)
  1073. # Add some entries
  1074. entry = IndexEntry(
  1075. ctime=(1234567890, 0),
  1076. mtime=(1234567890, 0),
  1077. dev=1,
  1078. ino=1,
  1079. mode=0o100644,
  1080. uid=1000,
  1081. gid=1000,
  1082. size=5,
  1083. sha=b"0" * 40,
  1084. )
  1085. index[b"test.txt"] = entry
  1086. # Write and read back
  1087. index.write()
  1088. # Read the index back
  1089. index2 = Index(index_path)
  1090. self.assertEqual(index2._version, 4)
  1091. self.assertIn(b"test.txt", index2)
  1092. def test_skip_hash_feature(self):
  1093. """Test that skipHash feature works correctly."""
  1094. index_path = os.path.join(self.tempdir, "index")
  1095. # Create an index with skipHash enabled
  1096. index = Index(index_path, read=False, skip_hash=True)
  1097. # Add some entries
  1098. entry = IndexEntry(
  1099. ctime=(1234567890, 0),
  1100. mtime=(1234567890, 0),
  1101. dev=1,
  1102. ino=1,
  1103. mode=0o100644,
  1104. uid=1000,
  1105. gid=1000,
  1106. size=5,
  1107. sha=b"0" * 40,
  1108. )
  1109. index[b"test.txt"] = entry
  1110. # Write the index
  1111. index.write()
  1112. # Verify the file was written with zero hash
  1113. with open(index_path, "rb") as f:
  1114. f.seek(-20, 2) # Seek to last 20 bytes
  1115. trailing_hash = f.read(20)
  1116. self.assertEqual(trailing_hash, b"\x00" * 20)
  1117. # Verify we can still read it back
  1118. index2 = Index(index_path)
  1119. self.assertIn(b"test.txt", index2)
  1120. def test_version_4_no_padding(self):
  1121. """Test that version 4 entries have no padding."""
  1122. # Create entries with names that would show compression benefits
  1123. entries = [
  1124. SerializedIndexEntry(
  1125. name=b"src/main/java/com/example/Service.java",
  1126. ctime=(1234567890, 0),
  1127. mtime=(1234567890, 0),
  1128. dev=1,
  1129. ino=1,
  1130. mode=0o100644,
  1131. uid=1000,
  1132. gid=1000,
  1133. size=5,
  1134. sha=b"0" * 40,
  1135. flags=0,
  1136. extended_flags=0,
  1137. ),
  1138. SerializedIndexEntry(
  1139. name=b"src/main/java/com/example/Controller.java",
  1140. ctime=(1234567890, 0),
  1141. mtime=(1234567890, 0),
  1142. dev=1,
  1143. ino=2,
  1144. mode=0o100644,
  1145. uid=1000,
  1146. gid=1000,
  1147. size=5,
  1148. sha=b"1" * 40,
  1149. flags=0,
  1150. extended_flags=0,
  1151. ),
  1152. ]
  1153. # Test version 2 (with padding, full paths)
  1154. buf_v2 = BytesIO()
  1155. from dulwich.index import write_cache_entry
  1156. previous_path = b""
  1157. for entry in entries:
  1158. # Set proper flags for v2
  1159. entry_v2 = SerializedIndexEntry(
  1160. entry.name,
  1161. entry.ctime,
  1162. entry.mtime,
  1163. entry.dev,
  1164. entry.ino,
  1165. entry.mode,
  1166. entry.uid,
  1167. entry.gid,
  1168. entry.size,
  1169. entry.sha,
  1170. len(entry.name),
  1171. entry.extended_flags,
  1172. )
  1173. write_cache_entry(buf_v2, entry_v2, version=2, previous_path=previous_path)
  1174. previous_path = entry.name
  1175. v2_data = buf_v2.getvalue()
  1176. # Test version 4 (path compression, no padding)
  1177. buf_v4 = BytesIO()
  1178. previous_path = b""
  1179. for entry in entries:
  1180. write_cache_entry(buf_v4, entry, version=4, previous_path=previous_path)
  1181. previous_path = entry.name
  1182. v4_data = buf_v4.getvalue()
  1183. # Version 4 should be shorter due to compression and no padding
  1184. self.assertLess(len(v4_data), len(v2_data))
  1185. # Both should parse correctly
  1186. buf_v2.seek(0)
  1187. from dulwich.index import read_cache_entry
  1188. previous_path = b""
  1189. parsed_v2_entries = []
  1190. for _ in entries:
  1191. parsed = read_cache_entry(buf_v2, version=2, previous_path=previous_path)
  1192. parsed_v2_entries.append(parsed)
  1193. previous_path = parsed.name
  1194. buf_v4.seek(0)
  1195. previous_path = b""
  1196. parsed_v4_entries = []
  1197. for _ in entries:
  1198. parsed = read_cache_entry(buf_v4, version=4, previous_path=previous_path)
  1199. parsed_v4_entries.append(parsed)
  1200. previous_path = parsed.name
  1201. # Both should have the same paths
  1202. for v2_entry, v4_entry in zip(parsed_v2_entries, parsed_v4_entries):
  1203. self.assertEqual(v2_entry.name, v4_entry.name)
  1204. self.assertEqual(v2_entry.sha, v4_entry.sha)
  1205. class TestManyFilesRepoIntegration(TestCase):
  1206. """Tests for manyFiles feature integration with Repo."""
  1207. def setUp(self):
  1208. self.tempdir = tempfile.mkdtemp()
  1209. self.addCleanup(shutil.rmtree, self.tempdir)
  1210. def test_repo_with_manyfiles_config(self):
  1211. """Test that a repository with feature.manyFiles=true uses the right settings."""
  1212. # Create a new repository
  1213. repo = Repo.init(self.tempdir)
  1214. # Set feature.manyFiles=true in config
  1215. config = repo.get_config()
  1216. config.set(b"feature", b"manyFiles", b"true")
  1217. config.write_to_path()
  1218. # Open the index - should have skipHash enabled and version 4
  1219. index = repo.open_index()
  1220. self.assertTrue(index._skip_hash)
  1221. self.assertEqual(index._version, 4)
  1222. def test_repo_with_explicit_index_settings(self):
  1223. """Test that explicit index.version and index.skipHash work."""
  1224. # Create a new repository
  1225. repo = Repo.init(self.tempdir)
  1226. # Set explicit index settings
  1227. config = repo.get_config()
  1228. config.set(b"index", b"version", b"3")
  1229. config.set(b"index", b"skipHash", b"false")
  1230. config.write_to_path()
  1231. # Open the index - should respect explicit settings
  1232. index = repo.open_index()
  1233. self.assertFalse(index._skip_hash)
  1234. self.assertEqual(index._version, 3)
  1235. class TestPathPrefixCompression(TestCase):
  1236. """Tests for index version 4 path prefix compression."""
  1237. def setUp(self):
  1238. self.tempdir = tempfile.mkdtemp()
  1239. self.addCleanup(shutil.rmtree, self.tempdir)
  1240. def test_varint_encoding_decoding(self):
  1241. """Test variable-width integer encoding and decoding."""
  1242. from dulwich.index import _decode_varint, _encode_varint
  1243. test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]
  1244. for value in test_values:
  1245. encoded = _encode_varint(value)
  1246. decoded, _ = _decode_varint(encoded, 0)
  1247. self.assertEqual(value, decoded, f"Failed for value {value}")
  1248. def test_path_compression_simple(self):
  1249. """Test simple path compression cases."""
  1250. # Test case 1: No common prefix
  1251. compressed = _compress_path(b"file1.txt", b"")
  1252. decompressed, _ = _decompress_path(compressed, 0, b"")
  1253. self.assertEqual(b"file1.txt", decompressed)
  1254. # Test case 2: Common prefix
  1255. compressed = _compress_path(b"src/file2.txt", b"src/file1.txt")
  1256. decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
  1257. self.assertEqual(b"src/file2.txt", decompressed)
  1258. # Test case 3: Completely different paths
  1259. compressed = _compress_path(b"docs/readme.md", b"src/file1.txt")
  1260. decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
  1261. self.assertEqual(b"docs/readme.md", decompressed)
  1262. def test_path_compression_deep_directories(self):
  1263. """Test compression with deep directory structures."""
  1264. path1 = b"src/main/java/com/example/service/UserService.java"
  1265. path2 = b"src/main/java/com/example/service/OrderService.java"
  1266. path3 = b"src/main/java/com/example/model/User.java"
  1267. # Compress path2 relative to path1
  1268. compressed = _compress_path(path2, path1)
  1269. decompressed, _ = _decompress_path(compressed, 0, path1)
  1270. self.assertEqual(path2, decompressed)
  1271. # Compress path3 relative to path2
  1272. compressed = _compress_path(path3, path2)
  1273. decompressed, _ = _decompress_path(compressed, 0, path2)
  1274. self.assertEqual(path3, decompressed)
  1275. def test_index_version_4_with_compression(self):
  1276. """Test full index version 4 write/read with path compression."""
  1277. index_path = os.path.join(self.tempdir, "index")
  1278. # Create an index with version 4
  1279. index = Index(index_path, read=False, version=4)
  1280. # Add multiple entries with common prefixes
  1281. paths = [
  1282. b"src/main/java/App.java",
  1283. b"src/main/java/Utils.java",
  1284. b"src/main/resources/config.properties",
  1285. b"src/test/java/AppTest.java",
  1286. b"docs/README.md",
  1287. b"docs/INSTALL.md",
  1288. ]
  1289. for i, path in enumerate(paths):
  1290. entry = IndexEntry(
  1291. ctime=(1234567890, 0),
  1292. mtime=(1234567890, 0),
  1293. dev=1,
  1294. ino=i + 1,
  1295. mode=0o100644,
  1296. uid=1000,
  1297. gid=1000,
  1298. size=10,
  1299. sha=f"{i:040d}".encode(),
  1300. )
  1301. index[path] = entry
  1302. # Write and read back
  1303. index.write()
  1304. # Read the index back
  1305. index2 = Index(index_path)
  1306. self.assertEqual(index2._version, 4)
  1307. # Verify all paths were preserved correctly
  1308. for path in paths:
  1309. self.assertIn(path, index2)
  1310. # Verify the index file is smaller than version 2 would be
  1311. with open(index_path, "rb") as f:
  1312. v4_size = len(f.read())
  1313. # Create equivalent version 2 index for comparison
  1314. index_v2_path = os.path.join(self.tempdir, "index_v2")
  1315. index_v2 = Index(index_v2_path, read=False, version=2)
  1316. for path in paths:
  1317. entry = IndexEntry(
  1318. ctime=(1234567890, 0),
  1319. mtime=(1234567890, 0),
  1320. dev=1,
  1321. ino=1,
  1322. mode=0o100644,
  1323. uid=1000,
  1324. gid=1000,
  1325. size=10,
  1326. sha=b"0" * 40,
  1327. )
  1328. index_v2[path] = entry
  1329. index_v2.write()
  1330. with open(index_v2_path, "rb") as f:
  1331. v2_size = len(f.read())
  1332. # Version 4 should be smaller due to compression
  1333. self.assertLess(
  1334. v4_size, v2_size, "Version 4 index should be smaller than version 2"
  1335. )
  1336. def test_path_compression_edge_cases(self):
  1337. """Test edge cases in path compression."""
  1338. # Empty paths
  1339. compressed = _compress_path(b"", b"")
  1340. decompressed, _ = _decompress_path(compressed, 0, b"")
  1341. self.assertEqual(b"", decompressed)
  1342. # Path identical to previous
  1343. compressed = _compress_path(b"same.txt", b"same.txt")
  1344. decompressed, _ = _decompress_path(compressed, 0, b"same.txt")
  1345. self.assertEqual(b"same.txt", decompressed)
  1346. # Path shorter than previous
  1347. compressed = _compress_path(b"short", b"very/long/path/file.txt")
  1348. decompressed, _ = _decompress_path(compressed, 0, b"very/long/path/file.txt")
  1349. self.assertEqual(b"short", decompressed)
  1350. class TestUpdateWorkingTree(TestCase):
  1351. def setUp(self):
  1352. self.tempdir = tempfile.mkdtemp()
  1353. def cleanup_tempdir():
  1354. """Remove tempdir, handling read-only files on Windows."""
  1355. def remove_readonly(func, path, excinfo):
  1356. """Error handler for Windows read-only files."""
  1357. import stat
  1358. if sys.platform == "win32" and excinfo[0] is PermissionError:
  1359. os.chmod(path, stat.S_IWRITE)
  1360. func(path)
  1361. else:
  1362. raise
  1363. shutil.rmtree(self.tempdir, onerror=remove_readonly)
  1364. self.addCleanup(cleanup_tempdir)
  1365. self.repo = Repo.init(self.tempdir)
  1366. def test_update_working_tree_with_blob_normalizer(self):
  1367. """Test update_working_tree with a blob normalizer."""
  1368. # Create a simple blob normalizer that converts CRLF to LF
  1369. class TestBlobNormalizer:
  1370. def checkout_normalize(self, blob, path):
  1371. # Convert CRLF to LF during checkout
  1372. new_blob = Blob()
  1373. new_blob.data = blob.data.replace(b"\r\n", b"\n")
  1374. return new_blob
  1375. # Create a tree with a file containing CRLF
  1376. blob = Blob()
  1377. blob.data = b"Hello\r\nWorld\r\n"
  1378. self.repo.object_store.add_object(blob)
  1379. tree = Tree()
  1380. tree[b"test.txt"] = (0o100644, blob.id)
  1381. self.repo.object_store.add_object(tree)
  1382. # Update working tree with normalizer
  1383. normalizer = TestBlobNormalizer()
  1384. update_working_tree(
  1385. self.repo,
  1386. None, # old_tree_id
  1387. tree.id, # new_tree_id
  1388. blob_normalizer=normalizer,
  1389. )
  1390. # Check that the file was written with LF line endings
  1391. test_file = os.path.join(self.tempdir, "test.txt")
  1392. with open(test_file, "rb") as f:
  1393. content = f.read()
  1394. self.assertEqual(b"Hello\nWorld\n", content)
  1395. # Check that the index has the original blob SHA
  1396. index = self.repo.open_index()
  1397. self.assertEqual(blob.id, index[b"test.txt"].sha)
  1398. def test_update_working_tree_without_blob_normalizer(self):
  1399. """Test update_working_tree without a blob normalizer."""
  1400. # Create a tree with a file containing CRLF
  1401. blob = Blob()
  1402. blob.data = b"Hello\r\nWorld\r\n"
  1403. self.repo.object_store.add_object(blob)
  1404. tree = Tree()
  1405. tree[b"test.txt"] = (0o100644, blob.id)
  1406. self.repo.object_store.add_object(tree)
  1407. # Update working tree without normalizer
  1408. update_working_tree(
  1409. self.repo,
  1410. None, # old_tree_id
  1411. tree.id, # new_tree_id
  1412. blob_normalizer=None,
  1413. )
  1414. # Check that the file was written with original CRLF line endings
  1415. test_file = os.path.join(self.tempdir, "test.txt")
  1416. with open(test_file, "rb") as f:
  1417. content = f.read()
  1418. self.assertEqual(b"Hello\r\nWorld\r\n", content)
  1419. # Check that the index has the blob SHA
  1420. index = self.repo.open_index()
  1421. self.assertEqual(blob.id, index[b"test.txt"].sha)
  def test_update_working_tree_remove_directory(self):
      """Moving to an empty tree removes a tracked directory and its files."""
      # Two blobs that are placed under dir/ in the first tree.
      first_blob = Blob()
      first_blob.data = b"content1"
      self.repo.object_store.add_object(first_blob)
      second_blob = Blob()
      second_blob.data = b"content2"
      self.repo.object_store.add_object(second_blob)

      populated = Tree()
      populated[b"dir/file1.txt"] = (0o100644, first_blob.id)
      populated[b"dir/file2.txt"] = (0o100644, second_blob.id)
      self.repo.object_store.add_object(populated)

      # Initial checkout: dir/ and both files must appear on disk.
      update_working_tree(self.repo, None, populated.id)
      tracked_dir = os.path.join(self.tempdir, "dir")
      self.assertTrue(os.path.isdir(tracked_dir))
      for leaf in ("file1.txt", "file2.txt"):
          self.assertTrue(os.path.exists(os.path.join(tracked_dir, leaf)))

      # Switch to a tree that has no entries at all.
      empty = Tree()
      self.repo.object_store.add_object(empty)
      update_working_tree(self.repo, populated.id, empty.id)

      # Nothing may remain at the directory's path.
      self.assertFalse(os.path.exists(tracked_dir))
  def test_update_working_tree_submodule_to_file(self):
      """A gitlink entry replaced by a blob becomes a regular file on disk."""
      gitlink_sha = b"a" * 40
      target = os.path.join(self.tempdir, "submodule")

      # First tree: a single gitlink entry named "submodule".
      with_gitlink = Tree()
      with_gitlink[b"submodule"] = (S_IFGITLINK, gitlink_sha)
      self.repo.object_store.add_object(with_gitlink)
      update_working_tree(self.repo, None, with_gitlink.id)

      # The checkout must produce a directory containing a ".git" entry.
      self.assertTrue(os.path.isdir(target))
      self.assertTrue(os.path.exists(os.path.join(target, ".git")))

      # Second tree: the same name now refers to a regular-file blob.
      replacement = Blob()
      replacement.data = b"file content"
      self.repo.object_store.add_object(replacement)
      with_file = Tree()
      with_file[b"submodule"] = (0o100644, replacement.id)
      self.repo.object_store.add_object(with_file)

      # The update has to drop the directory and write the file in its place.
      update_working_tree(self.repo, with_gitlink.id, with_file.id)
      self.assertTrue(os.path.isfile(target))
      with open(target, "rb") as handle:
          self.assertEqual(b"file content", handle.read())
  def test_update_working_tree_directory_with_nested_subdir(self):
      """Emptying the tree removes a multi-level directory chain from disk."""
      leaf_blob = Blob()
      leaf_blob.data = b"deep content"
      self.repo.object_store.add_object(leaf_blob)

      # One file three directories deep: a/b/c/file.txt.
      nested = Tree()
      nested[b"a/b/c/file.txt"] = (0o100644, leaf_blob.id)
      self.repo.object_store.add_object(nested)

      update_working_tree(self.repo, None, nested.id)

      # The leaf file existing implies the whole chain was created.
      top_dir = os.path.join(self.tempdir, "a")
      leaf_file = os.path.join(top_dir, "b", "c", "file.txt")
      self.assertTrue(os.path.exists(leaf_file))

      # Switch to a tree with no entries.
      empty = Tree()
      self.repo.object_store.add_object(empty)
      update_working_tree(self.repo, nested.id, empty.id)

      # With the top-level directory gone, everything beneath it is gone too.
      self.assertFalse(os.path.exists(top_dir))
  def test_update_working_tree_file_replaced_by_dir_not_removed(self):
      """A non-empty directory found where a tracked file was is preserved."""
      tracked_blob = Blob()
      tracked_blob.data = b"file content"
      self.repo.object_store.add_object(tracked_blob)
      with_file = Tree()
      with_file[b"path"] = (0o100644, tracked_blob.id)
      self.repo.object_store.add_object(with_file)

      # Check the file out and confirm it landed as a regular file.
      update_working_tree(self.repo, None, with_file.id)
      target = os.path.join(self.tempdir, "path")
      self.assertTrue(os.path.isfile(target))

      # By hand, swap the file for a directory holding one untracked file.
      os.remove(target)
      os.mkdir(target)
      stray = os.path.join(target, "untracked.txt")
      with open(stray, "w") as handle:
          handle.write("untracked content")

      empty = Tree()
      self.repo.object_store.add_object(empty)

      # The call must complete without raising and leave the directory intact.
      update_working_tree(self.repo, with_file.id, empty.id)
      self.assertTrue(os.path.isdir(target))
      self.assertTrue(os.path.exists(stray))
  def test_update_working_tree_file_replaced_by_empty_dir_removed(self):
      """An empty directory found where a tracked file was gets removed."""
      tracked_blob = Blob()
      tracked_blob.data = b"file content"
      self.repo.object_store.add_object(tracked_blob)
      with_file = Tree()
      with_file[b"path"] = (0o100644, tracked_blob.id)
      self.repo.object_store.add_object(with_file)

      # Check the file out and confirm it landed as a regular file.
      update_working_tree(self.repo, None, with_file.id)
      target = os.path.join(self.tempdir, "path")
      self.assertTrue(os.path.isfile(target))

      # By hand, swap the file for a directory with nothing inside.
      os.remove(target)
      os.mkdir(target)

      empty = Tree()
      self.repo.object_store.add_object(empty)

      # Dropping the entry from the tree must also clear the empty directory.
      update_working_tree(self.repo, with_file.id, empty.id)
      self.assertFalse(os.path.exists(target))
  def test_update_working_tree_symlink_transitions(self):
      """Exercise symlink -> file, file -> symlink, and symlink -> removed."""
      # Not run on Windows, where symlink support may be missing.
      if sys.platform == "win32":
          self.skipTest("Symlinks not fully supported on Windows")

      link_target = b"target/path"
      link_path = os.path.join(self.tempdir, "link")

      # Tree whose only entry is a symlink (mode 0o120000).
      target_blob = Blob()
      target_blob.data = link_target
      self.repo.object_store.add_object(target_blob)
      symlink_tree = Tree()
      symlink_tree[b"link"] = (0o120000, target_blob.id)  # Symlink mode
      self.repo.object_store.add_object(symlink_tree)

      update_working_tree(self.repo, None, symlink_tree.id)
      self.assertTrue(os.path.islink(link_path))
      self.assertEqual(link_target, os.readlink(link_path).encode())

      # Step 1: the symlink becomes a regular file.
      file_blob = Blob()
      file_blob.data = b"file content"
      self.repo.object_store.add_object(file_blob)
      file_tree = Tree()
      file_tree[b"link"] = (0o100644, file_blob.id)
      self.repo.object_store.add_object(file_tree)

      update_working_tree(self.repo, symlink_tree.id, file_tree.id)
      self.assertFalse(os.path.islink(link_path))
      self.assertTrue(os.path.isfile(link_path))
      with open(link_path, "rb") as handle:
          self.assertEqual(b"file content", handle.read())

      # Step 2: the regular file goes back to being a symlink.
      update_working_tree(self.repo, file_tree.id, symlink_tree.id)
      self.assertTrue(os.path.islink(link_path))
      self.assertEqual(link_target, os.readlink(link_path).encode())

      # Step 3: replace the symlink with an empty directory by hand, then
      # move to an empty tree; the empty directory must be cleaned up.
      os.unlink(link_path)
      os.mkdir(link_path)
      empty = Tree()
      self.repo.object_store.add_object(empty)
      update_working_tree(self.repo, symlink_tree.id, empty.id)
      self.assertFalse(os.path.exists(link_path))
  def test_update_working_tree_modified_file_to_dir_transition(self):
      """A locally modified file blocks an update that needs a directory there.

      The update is expected to raise ``IOError`` and the file must keep its
      local modifications.
      """
      original_blob = Blob()
      original_blob.data = b"original content"
      self.repo.object_store.add_object(original_blob)
      file_tree = Tree()
      file_tree[b"path"] = (0o100644, original_blob.id)
      self.repo.object_store.add_object(file_tree)

      update_working_tree(self.repo, None, file_tree.id)

      # Overwrite the checked-out file so it no longer matches the blob.
      target = os.path.join(self.tempdir, "path")
      with open(target, "w") as handle:
          handle.write("modified content")

      # In the next tree, "path" is a directory containing "subfile".
      sub_blob = Blob()
      sub_blob.data = b"subfile content"
      self.repo.object_store.add_object(sub_blob)
      dir_tree = Tree()
      dir_tree[b"path/subfile"] = (0o100644, sub_blob.id)
      self.repo.object_store.add_object(dir_tree)

      # The directory cannot be created while the modified file is in the way.
      with self.assertRaises(IOError):
          update_working_tree(self.repo, file_tree.id, dir_tree.id)

      # The local edit must have survived the failed update.
      self.assertTrue(os.path.isfile(target))
      with open(target) as handle:
          self.assertEqual("modified content", handle.read())
  def test_update_working_tree_executable_transitions(self):
      """Changing only the entry mode from 0o100644 to 0o100755 sets the user x bit."""
      # Not run on Windows, which has no executable bit.
      if sys.platform == "win32":
          self.skipTest("Executable bit not supported on Windows")

      script_blob = Blob()
      script_blob.data = b"#!/bin/sh\necho hello"
      self.repo.object_store.add_object(script_blob)

      # Start with the script stored as a non-executable entry.
      plain_tree = Tree()
      plain_tree[b"script.sh"] = (0o100644, script_blob.id)  # Non-executable
      self.repo.object_store.add_object(plain_tree)
      update_working_tree(self.repo, None, plain_tree.id)

      script_path = os.path.join(self.tempdir, "script.sh")
      self.assertTrue(os.path.isfile(script_path))
      # The owner-execute bit must be clear after the first checkout.
      self.assertFalse(os.stat(script_path).st_mode & stat.S_IXUSR)

      # Same blob, but the entry mode is now executable.
      exec_tree = Tree()
      exec_tree[b"script.sh"] = (0o100755, script_blob.id)  # Executable
      self.repo.object_store.add_object(exec_tree)
      update_working_tree(self.repo, plain_tree.id, exec_tree.id)

      # The owner-execute bit must now be set.
      self.assertTrue(os.stat(script_path).st_mode & stat.S_IXUSR)
  def test_update_working_tree_submodule_with_untracked_files(self):
      """Test that submodules with untracked files are not removed.

      A gitlink entry is checked out, an untracked file is written into
      the resulting directory, and the tree is then switched to an empty
      one.  The directory and the untracked file must both still exist.
      """
      # S_IFGITLINK and Tree come from the module-level imports, exactly as
      # in the sibling submodule tests; no function-scope import is needed.

      # Create tree with submodule
      submodule_sha = b"a" * 40
      tree1 = Tree()
      tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
      self.repo.object_store.add_object(tree1)

      # Update to tree with submodule
      update_working_tree(self.repo, None, tree1.id)

      # Add untracked file to submodule directory
      submodule_path = os.path.join(self.tempdir, "submodule")
      untracked_path = os.path.join(submodule_path, "untracked.txt")
      with open(untracked_path, "w") as f:
          f.write("untracked content")

      # Create empty tree
      tree2 = Tree()
      self.repo.object_store.add_object(tree2)

      # Update should not remove submodule directory with untracked files
      update_working_tree(self.repo, tree1.id, tree2.id)

      # Directory should still exist with untracked file
      self.assertTrue(os.path.isdir(submodule_path))
      self.assertTrue(os.path.exists(untracked_path))
  def test_update_working_tree_dir_to_file_with_subdir(self):
      """Replacing a directory that holds an untracked file with a file fails.

      The update is expected to raise ``IsADirectoryError`` and the
      directory must remain in place.
      """
      # Two tracked files under dir/subdir/.
      first_blob = Blob()
      first_blob.data = b"content1"
      self.repo.object_store.add_object(first_blob)
      second_blob = Blob()
      second_blob.data = b"content2"
      self.repo.object_store.add_object(second_blob)

      dir_tree = Tree()
      dir_tree[b"dir/subdir/file1"] = (0o100644, first_blob.id)
      dir_tree[b"dir/subdir/file2"] = (0o100644, second_blob.id)
      self.repo.object_store.add_object(dir_tree)

      update_working_tree(self.repo, None, dir_tree.id)
      tracked_dir = os.path.join(self.tempdir, "dir")
      self.assertTrue(os.path.isdir(tracked_dir))

      # Drop an untracked file into dir/ so it stays non-empty even after
      # the tracked content is gone.
      stray = os.path.join(tracked_dir, "untracked.txt")
      with open(stray, "w") as handle:
          handle.write("untracked content")

      # Next tree: "dir" is a plain file.
      file_blob = Blob()
      file_blob.data = b"replacement file"
      self.repo.object_store.add_object(file_blob)
      file_tree = Tree()
      file_tree[b"dir"] = (0o100644, file_blob.id)
      self.repo.object_store.add_object(file_tree)

      # The non-empty directory must block the replacement.
      with self.assertRaises(IsADirectoryError):
          update_working_tree(self.repo, dir_tree.id, file_tree.id)

      self.assertTrue(os.path.isdir(tracked_dir))
  def test_update_working_tree_case_sensitivity(self):
      """Rename readme.txt to README.txt (new content) between two trees.

      Only asserts that at least one of the two spellings exists afterwards,
      because the outcome depends on the filesystem's case handling.
      """
      lower_blob = Blob()
      lower_blob.data = b"lowercase content"
      self.repo.object_store.add_object(lower_blob)
      lower_tree = Tree()
      lower_tree[b"readme.txt"] = (0o100644, lower_blob.id)
      self.repo.object_store.add_object(lower_tree)

      update_working_tree(self.repo, None, lower_tree.id)

      # Second tree uses the upper-case spelling and a different blob.
      upper_blob = Blob()
      upper_blob.data = b"uppercase content"
      self.repo.object_store.add_object(upper_blob)
      upper_tree = Tree()
      upper_tree[b"README.txt"] = (0o100644, upper_blob.id)
      self.repo.object_store.add_object(upper_tree)

      update_working_tree(self.repo, lower_tree.id, upper_tree.id)

      # Case-insensitive filesystems: one name overwrites the other.
      # Case-sensitive filesystems: both names may be present.
      lower_path = os.path.join(self.tempdir, "readme.txt")
      upper_path = os.path.join(self.tempdir, "README.txt")
      self.assertTrue(os.path.exists(lower_path) or os.path.exists(upper_path))
  def test_update_working_tree_deeply_nested_removal(self):
      """A ten-level directory chain is removed when the tree becomes empty."""
      depth = 10

      leaf_blob = Blob()
      leaf_blob.data = b"deep content"
      self.repo.object_store.add_object(leaf_blob)

      # level0/level1/.../level9/file.txt
      deep_tree = Tree()
      segments = [b"level%d" % level for level in range(depth)]
      deep_tree[b"/".join(segments) + b"/file.txt"] = (0o100644, leaf_blob.id)
      self.repo.object_store.add_object(deep_tree)

      update_working_tree(self.repo, None, deep_tree.id)

      # Walk down from the working-tree root, checking each level exists.
      walked = self.tempdir
      for level in range(depth):
          walked = os.path.join(walked, f"level{level}")
          self.assertTrue(os.path.isdir(walked))

      empty = Tree()
      self.repo.object_store.add_object(empty)

      # Removing the only file must clear every now-empty ancestor.
      update_working_tree(self.repo, deep_tree.id, empty.id)

      # If level0 is gone, so is everything beneath it.
      self.assertFalse(os.path.exists(os.path.join(self.tempdir, "level0")))
  def test_update_working_tree_read_only_files(self):
      """A file chmod-ed to 0o444 is still replaced by the next tree's content."""
      old_blob = Blob()
      old_blob.data = b"original content"
      self.repo.object_store.add_object(old_blob)
      old_tree = Tree()
      old_tree[b"readonly.txt"] = (0o100644, old_blob.id)
      self.repo.object_store.add_object(old_tree)

      update_working_tree(self.repo, None, old_tree.id)

      # Strip write permission from the checked-out file.
      target = os.path.join(self.tempdir, "readonly.txt")
      os.chmod(target, 0o444)  # Read-only

      # Next tree: same name, different blob.
      new_blob = Blob()
      new_blob.data = b"new content"
      self.repo.object_store.add_object(new_blob)
      new_tree = Tree()
      new_tree[b"readonly.txt"] = (0o100644, new_blob.id)
      self.repo.object_store.add_object(new_tree)

      # The update must cope with the read-only file.
      update_working_tree(self.repo, old_tree.id, new_tree.id)

      with open(target, "rb") as handle:
          self.assertEqual(b"new content", handle.read())
  def test_update_working_tree_invalid_filenames(self):
      """Check out a tree containing names that are not valid on every platform.

      Only the presence of ``valid.txt`` is asserted.
      """
      payload = Blob()
      payload.data = b"content"
      self.repo.object_store.add_object(payload)

      # Every entry shares the same mode and blob.
      regular_entry = (0o100644, payload.id)

      names = Tree()
      names[b"valid.txt"] = regular_entry
      if sys.platform != "win32":
          # Fine on Unix, rejected by Windows.
          names[b"file:with:colons.txt"] = regular_entry
          names[b"file<with>brackets.txt"] = regular_entry
      self.repo.object_store.add_object(names)

      # Names that fail validation are expected to be skipped by the update.
      update_working_tree(self.repo, None, names.id)

      valid_path = os.path.join(self.tempdir, "valid.txt")
      self.assertTrue(os.path.exists(valid_path))
  def test_update_working_tree_symlink_to_directory(self):
      """A symlink that points at a directory is replaced by a real directory."""
      if sys.platform == "win32":
          self.skipTest("Symlinks not fully supported on Windows")

      # An untracked directory for the symlink to point at.
      pointed_at = os.path.join(self.tempdir, "target")
      os.mkdir(pointed_at)
      with open(os.path.join(pointed_at, "file.txt"), "w") as handle:
          handle.write("target file")

      # First tree: "link" is a symlink whose target is the relative
      # path "target".
      link_blob = Blob()
      link_blob.data = b"target"  # Relative path to target directory
      self.repo.object_store.add_object(link_blob)
      link_tree = Tree()
      link_tree[b"link"] = (0o120000, link_blob.id)
      self.repo.object_store.add_object(link_tree)

      update_working_tree(self.repo, None, link_tree.id)
      link_path = os.path.join(self.tempdir, "link")
      self.assertTrue(os.path.islink(link_path))

      # Second tree: "link" is a directory holding newfile.txt.
      file_blob = Blob()
      file_blob.data = b"new file content"
      self.repo.object_store.add_object(file_blob)
      dir_tree = Tree()
      dir_tree[b"link/newfile.txt"] = (0o100644, file_blob.id)
      self.repo.object_store.add_object(dir_tree)

      # The symlink must give way to an actual directory with the new file.
      update_working_tree(self.repo, link_tree.id, dir_tree.id)
      self.assertFalse(os.path.islink(link_path))
      self.assertTrue(os.path.isdir(link_path))
      self.assertTrue(os.path.exists(os.path.join(link_path, "newfile.txt")))
  def test_update_working_tree_comprehensive_transitions(self):
      """Test a chain of entry-type transitions on a single path.

      ``item`` goes regular file -> submodule -> executable file ->
      symlink -> submodule -> absent; ``item2`` then goes symlink ->
      executable file.  After every step the on-disk type is checked.
      Skipped entirely on Windows.
      """
      # Skip on Windows where symlinks might not be supported.  skipTest
      # aborts the method, so nothing below runs on win32 and no further
      # platform checks are needed.
      if sys.platform == "win32":
          self.skipTest("Symlinks not fully supported on Windows")

      # Create blobs for different file types
      file_blob = Blob()
      file_blob.data = b"regular file content"
      self.repo.object_store.add_object(file_blob)

      exec_blob = Blob()
      exec_blob.data = b"#!/bin/sh\necho executable"
      self.repo.object_store.add_object(exec_blob)

      link_blob = Blob()
      link_blob.data = b"target/path"
      self.repo.object_store.add_object(link_blob)

      submodule_sha = b"a" * 40
      item_path = os.path.join(self.tempdir, "item")

      # Test 1: Regular file -> Submodule
      tree1 = Tree()
      tree1[b"item"] = (0o100644, file_blob.id)
      self.repo.object_store.add_object(tree1)

      tree2 = Tree()
      tree2[b"item"] = (S_IFGITLINK, submodule_sha)
      self.repo.object_store.add_object(tree2)

      update_working_tree(self.repo, None, tree1.id)
      self.assertTrue(os.path.isfile(item_path))

      update_working_tree(self.repo, tree1.id, tree2.id)
      self.assertTrue(os.path.isdir(item_path))

      # Test 2: Submodule -> Executable file
      tree3 = Tree()
      tree3[b"item"] = (0o100755, exec_blob.id)
      self.repo.object_store.add_object(tree3)

      update_working_tree(self.repo, tree2.id, tree3.id)
      self.assertTrue(os.path.isfile(item_path))
      self.assertTrue(os.access(item_path, os.X_OK))

      # Test 3: Executable file -> Symlink
      tree4 = Tree()
      tree4[b"item"] = (0o120000, link_blob.id)
      self.repo.object_store.add_object(tree4)

      update_working_tree(self.repo, tree3.id, tree4.id)
      self.assertTrue(os.path.islink(item_path))

      # Test 4: Symlink -> Submodule
      tree5 = Tree()
      tree5[b"item"] = (S_IFGITLINK, submodule_sha)
      self.repo.object_store.add_object(tree5)

      update_working_tree(self.repo, tree4.id, tree5.id)
      self.assertTrue(os.path.isdir(item_path))

      # Test 5: Clean up - Submodule -> absent
      tree6 = Tree()
      self.repo.object_store.add_object(tree6)

      update_working_tree(self.repo, tree5.id, tree6.id)
      self.assertFalse(os.path.exists(item_path))

      # Test 6: Symlink -> Executable file
      tree7 = Tree()
      tree7[b"item2"] = (0o120000, link_blob.id)
      self.repo.object_store.add_object(tree7)

      update_working_tree(self.repo, tree6.id, tree7.id)
      item2_path = os.path.join(self.tempdir, "item2")
      self.assertTrue(os.path.islink(item2_path))

      tree8 = Tree()
      tree8[b"item2"] = (0o100755, exec_blob.id)
      self.repo.object_store.add_object(tree8)

      update_working_tree(self.repo, tree7.id, tree8.id)
      self.assertTrue(os.path.isfile(item2_path))
      self.assertTrue(os.access(item2_path, os.X_OK))
  def test_update_working_tree_partial_update_failure(self):
      """An update blocked on one path still applies the change to another.

      ``file2.txt`` is replaced by hand with a non-empty directory before
      the update.  An ``IsADirectoryError`` from the update is tolerated;
      afterwards ``file1.txt`` must hold the new content and ``file2.txt``
      must still be a directory.
      """
      # Starting point: two tracked files.
      first_old = Blob()
      first_old.data = b"file1 content"
      self.repo.object_store.add_object(first_old)
      second_old = Blob()
      second_old.data = b"file2 content"
      self.repo.object_store.add_object(second_old)

      old_tree = Tree()
      old_tree[b"file1.txt"] = (0o100644, first_old.id)
      old_tree[b"file2.txt"] = (0o100644, second_old.id)
      self.repo.object_store.add_object(old_tree)

      update_working_tree(self.repo, None, old_tree.id)

      # Turn file2.txt into a directory by hand to provoke a conflict...
      blocked_path = os.path.join(self.tempdir, "file2.txt")
      os.remove(blocked_path)
      os.mkdir(blocked_path)
      # ...and put a file inside so the directory cannot simply be removed.
      with open(os.path.join(blocked_path, "blocker.txt"), "w") as handle:
          handle.write("blocking content")

      # Target tree changes the content of both files.
      first_new = Blob()
      first_new.data = b"file1 updated"
      self.repo.object_store.add_object(first_new)
      second_new = Blob()
      second_new.data = b"file2 updated"
      self.repo.object_store.add_object(second_new)

      new_tree = Tree()
      new_tree[b"file1.txt"] = (0o100644, first_new.id)
      new_tree[b"file2.txt"] = (0o100644, second_new.id)
      self.repo.object_store.add_object(new_tree)

      # Partial success is acceptable: file1 gets through, file2 is blocked.
      try:
          update_working_tree(self.repo, old_tree.id, new_tree.id)
      except IsADirectoryError:
          # Raised for file2.txt, which is currently a directory.
          pass

      # file1.txt carries the new content.
      with open(os.path.join(self.tempdir, "file1.txt"), "rb") as handle:
          self.assertEqual(b"file1 updated", handle.read())

      # file2.txt is still the hand-made directory.
      self.assertTrue(os.path.isdir(blocked_path))