# test_index.py -- Tests for the git index
# Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for the index."""

import os
import shutil
import stat
import struct
import sys
import tempfile
from io import BytesIO

from dulwich.diff_tree import tree_changes
from dulwich.index import (
    Index,
    IndexEntry,
    SerializedIndexEntry,
    _compress_path,
    _decode_varint,
    _decompress_path,
    _encode_varint,
    _fs_to_tree_path,
    _tree_to_fs_path,
    build_index_from_tree,
    cleanup_mode,
    commit_tree,
    get_unstaged_changes,
    index_entry_from_directory,
    index_entry_from_path,
    index_entry_from_stat,
    iter_fresh_entries,
    read_index,
    read_index_dict,
    update_working_tree,
    validate_path_element_default,
    validate_path_element_hfs,
    validate_path_element_ntfs,
    write_cache_time,
    write_index,
    write_index_dict,
)
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
from dulwich.repo import Repo

from . import TestCase, skipIf


def can_symlink() -> bool:
    """Return whether running process can create symlinks."""
    if sys.platform != "win32":
        # Platforms other than Windows should allow symlinks without issues.
        return True

    test_source = tempfile.mkdtemp()
    test_target = test_source + "can_symlink"
    try:
        os.symlink(test_source, test_target)
    except (NotImplementedError, OSError):
        return False
    return True


class IndexTestCase(TestCase):
    datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")

    def get_simple_index(self, name):
        return Index(os.path.join(self.datadir, name))


class SimpleIndexTestCase(IndexTestCase):
    def test_len(self) -> None:
        self.assertEqual(1, len(self.get_simple_index("index")))

    def test_iter(self) -> None:
        self.assertEqual([b"bla"], list(self.get_simple_index("index")))

    def test_iter_skip_hash(self) -> None:
        self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))

    def test_iterobjects(self) -> None:
        self.assertEqual(
            [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
            list(self.get_simple_index("index").iterobjects()),
        )

    def test_getitem(self) -> None:
        self.assertEqual(
            IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                0,
                0,
            ),
            self.get_simple_index("index")[b"bla"],
        )

    def test_empty(self) -> None:
        i = self.get_simple_index("notanindex")
        self.assertEqual(0, len(i))
        self.assertFalse(os.path.exists(i._filename))

    def test_against_empty_tree(self) -> None:
        i = self.get_simple_index("index")
        changes = list(i.changes_from_tree(MemoryObjectStore(), None))
        self.assertEqual(1, len(changes))
        (oldname, newname), (oldmode, newmode), (oldsha, newsha) = changes[0]
        self.assertEqual(b"bla", newname)
        self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)

    def test_index_pathlib(self) -> None:
        import tempfile
        from pathlib import Path

        # Create a temporary index file
        with tempfile.NamedTemporaryFile(suffix=".index", delete=False) as f:
            temp_path = f.name
        self.addCleanup(os.unlink, temp_path)

        # Test creating Index with pathlib.Path
        path_obj = Path(temp_path)
        index = Index(path_obj, read=False)
        self.assertEqual(str(path_obj), index.path)

        # Add an entry and write
        index[b"test"] = IndexEntry(
            ctime=(0, 0),
            mtime=(0, 0),
            dev=0,
            ino=0,
            mode=33188,
            uid=0,
            gid=0,
            size=0,
            sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
        )
        index.write()

        # Read it back with pathlib.Path
        index2 = Index(path_obj)
        self.assertIn(b"test", index2)


class SimpleIndexWriterTestCase(IndexTestCase):
    def setUp(self) -> None:
        IndexTestCase.setUp(self)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self) -> None:
        IndexTestCase.tearDown(self)
        shutil.rmtree(self.tempdir)

    def test_simple_write(self) -> None:
        entries = [
            (
                SerializedIndexEntry(
                    b"barbla",
                    (1230680220, 0),
                    (1230680220, 0),
                    2050,
                    3761020,
                    33188,
                    1000,
                    1000,
                    0,
                    b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                    0,
                    0,
                )
            )
        ]
        filename = os.path.join(self.tempdir, "test-simple-write-index")
        with open(filename, "wb+") as x:
            write_index(x, entries)
        with open(filename, "rb") as x:
            self.assertEqual(entries, list(read_index(x)))


class ReadIndexDictTests(IndexTestCase):
    def setUp(self) -> None:
        IndexTestCase.setUp(self)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self) -> None:
        IndexTestCase.tearDown(self)
        shutil.rmtree(self.tempdir)

    def test_simple_write(self) -> None:
        entries = {
            b"barbla": IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
                0,
                0,
            )
        }
        filename = os.path.join(self.tempdir, "test-simple-write-index")
        with open(filename, "wb+") as x:
            write_index_dict(x, entries)
        with open(filename, "rb") as x:
            self.assertEqual(entries, read_index_dict(x))


class CommitTreeTests(TestCase):
    def setUp(self) -> None:
        super().setUp()
        self.store = MemoryObjectStore()

    def test_single_blob(self) -> None:
        blob = Blob()
        blob.data = b"foo"
        self.store.add_object(blob)
        blobs = [(b"bla", blob.id, stat.S_IFREG)]
        rootid = commit_tree(self.store, blobs)
        self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
        self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
        self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))

    def test_nested(self) -> None:
        blob = Blob()
        blob.data = b"foo"
        self.store.add_object(blob)
        blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
        rootid = commit_tree(self.store, blobs)
        self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
        dirid = self.store[rootid][b"bla"][1]
        self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
        self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
        self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
        self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))


class CleanupModeTests(TestCase):
    def assertModeEqual(self, expected, got) -> None:
        self.assertEqual(expected, got, f"{expected:o} != {got:o}")

    def test_file(self) -> None:
        self.assertModeEqual(0o100644, cleanup_mode(0o100000))

    def test_executable(self) -> None:
        self.assertModeEqual(0o100755, cleanup_mode(0o100711))
        self.assertModeEqual(0o100755, cleanup_mode(0o100700))

    def test_symlink(self) -> None:
        self.assertModeEqual(0o120000, cleanup_mode(0o120711))

    def test_dir(self) -> None:
        self.assertModeEqual(0o040000, cleanup_mode(0o40531))

    def test_submodule(self) -> None:
        self.assertModeEqual(0o160000, cleanup_mode(0o160744))


class WriteCacheTimeTests(TestCase):
    def test_write_string(self) -> None:
        f = BytesIO()
        self.assertRaises(TypeError, write_cache_time, f, "foo")

    def test_write_int(self) -> None:
        f = BytesIO()
        write_cache_time(f, 434343)
        self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())

    def test_write_tuple(self) -> None:
        f = BytesIO()
        write_cache_time(f, (434343, 21))
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())

    def test_write_float(self) -> None:
        f = BytesIO()
        write_cache_time(f, 434343.000000021)
        self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())


class IndexEntryFromStatTests(TestCase):
    def test_simple(self) -> None:
        st = os.stat_result(
            (
                16877,
                131078,
                64769,
                154,
                1000,
                1000,
                12288,
                1323629595,
                1324180496,
                1324180496,
            )
        )
        entry = index_entry_from_stat(st, b"22" * 20)
        self.assertEqual(
            entry,
            IndexEntry(
                1324180496,
                1324180496,
                64769,
                131078,
                16384,
                1000,
                1000,
                12288,
                b"2222222222222222222222222222222222222222",
                0,
                0,
            ),
        )

    def test_override_mode(self) -> None:
        st = os.stat_result(
            (
                stat.S_IFREG + 0o644,
                131078,
                64769,
                154,
                1000,
                1000,
                12288,
                1323629595,
                1324180496,
                1324180496,
            )
        )
        entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
        self.assertEqual(
            entry,
            IndexEntry(
                1324180496,
                1324180496,
                64769,
                131078,
                33261,
                1000,
                1000,
                12288,
                b"2222222222222222222222222222222222222222",
                0,
                0,
            ),
        )


class BuildIndexTests(TestCase):
    def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
        self.assertEqual(index_entry.mode, mode)  # mode
        self.assertEqual(index_entry.size, filesize)  # filesize
        self.assertEqual(index_entry.sha, sha)  # sha

    def assertFileContents(self, path, contents, symlink=False) -> None:
        if symlink:
            self.assertEqual(os.readlink(path), contents)
        else:
            with open(path, "rb") as f:
                self.assertEqual(f.read(), contents)

    def test_empty(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            tree = Tree()
            repo.object_store.add_object(tree)

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 0)

            # Verify no files
            self.assertEqual([".git"], os.listdir(repo.path))

    def test_git_dir(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            filee = Blob.from_string(b"d")

            tree = Tree()
            tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
            tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)

            repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, ".git", "a")
            self.assertFalse(os.path.exists(apath))

            # filee
            epath = os.path.join(repo.path, "c", "e")
            self.assertTrue(os.path.exists(epath))
            self.assertReasonableIndexEntry(
                index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
            )
            self.assertFileContents(epath, b"d")

    def test_nonempty(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            fileb = Blob.from_string(b"file b")
            filed = Blob.from_string(b"file d")

            tree = Tree()
            tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
            tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
            tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)

            repo.object_store.add_objects(
                [(o, None) for o in [filea, fileb, filed, tree]]
            )

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 3)

            # filea
            apath = os.path.join(repo.path, "a")
            self.assertTrue(os.path.exists(apath))
            self.assertReasonableIndexEntry(
                index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
            )
            self.assertFileContents(apath, b"file a")

            # fileb
            bpath = os.path.join(repo.path, "b")
            self.assertTrue(os.path.exists(bpath))
            self.assertReasonableIndexEntry(
                index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
            )
            self.assertFileContents(bpath, b"file b")

            # filed
            dpath = os.path.join(repo.path, "c", "d")
            self.assertTrue(os.path.exists(dpath))
            self.assertReasonableIndexEntry(
                index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
            )
            self.assertFileContents(dpath, b"file d")

            # Verify no extra files
            self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
            self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))

    @skipIf(not getattr(os, "sync", None), "Requires sync support")
    def test_norewrite(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filea = Blob.from_string(b"file a")
            filea_path = os.path.join(repo_dir, "a")
            tree = Tree()
            tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)

            repo.object_store.add_objects([(o, None) for o in [filea, tree]])

            # First Write
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            # Use sync as metadata can be cached on some FS
            os.sync()
            mtime = os.stat(filea_path).st_mtime

            # Test Rewrite
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            os.sync()
            self.assertEqual(mtime, os.stat(filea_path).st_mtime)

            # Modify content
            with open(filea_path, "wb") as fh:
                fh.write(b"test a")
            os.sync()
            mtime = os.stat(filea_path).st_mtime

            # Test rewrite
            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )
            os.sync()
            with open(filea_path, "rb") as fh:
                self.assertEqual(b"file a", fh.read())

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_symlink(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            filed = Blob.from_string(b"file d")
            filee = Blob.from_string(b"d")

            tree = Tree()
            tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
            tree[b"c/e"] = (stat.S_IFLNK, filee.id)  # symlink

            repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()

            # symlink to d
            epath = os.path.join(repo.path, "c", "e")
            self.assertTrue(os.path.exists(epath))
            self.assertReasonableIndexEntry(
                index[b"c/e"],
                stat.S_IFLNK,
                0 if sys.platform == "win32" else 1,
                filee.id,
            )
            self.assertFileContents(epath, "d", symlink=True)

    def test_no_decode_encode(self) -> None:
        repo_dir = tempfile.mkdtemp()
        repo_dir_bytes = os.fsencode(repo_dir)
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Populate repo
            file = Blob.from_string(b"foo")

            tree = Tree()
            latin1_name = "À".encode("latin1")
            try:
                latin1_path = os.path.join(repo_dir_bytes, latin1_name)
            except UnicodeDecodeError:
                self.skipTest("can not decode as latin1")
            utf8_name = "À".encode()
            utf8_path = os.path.join(repo_dir_bytes, utf8_name)
            tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
            tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)

            repo.object_store.add_objects([(o, None) for o in [file, tree]])

            try:
                build_index_from_tree(
                    repo.path, repo.index_path(), repo.object_store, tree.id
                )
            except OSError as e:
                if e.errno == 92 and sys.platform == "darwin":
                    # Our filename isn't supported by the platform :(
                    self.skipTest(f"can not write filename {e.filename!r}")
                else:
                    raise
            except UnicodeDecodeError:
                # This happens e.g. with python3.6 on Windows.
                # It implicitly decodes using utf8, which doesn't work.
                self.skipTest("can not implicitly convert as utf8")

            # Verify index entries
            index = repo.open_index()
            self.assertIn(latin1_name, index)
            self.assertIn(utf8_name, index)
            self.assertTrue(os.path.exists(latin1_path))
            self.assertTrue(os.path.exists(utf8_path))

    def test_windows_unicode_filename_encoding(self) -> None:
        """Test that Unicode filenames are handled correctly on Windows.

        This test verifies the fix for GitHub issue #203, where filenames
        containing Unicode characters like 'À' were incorrectly encoded/decoded
        on Windows, resulting in mojibake such as 'À' becoming 'Ã€'.
        """
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)

        with Repo.init(repo_dir) as repo:
            # Create a blob
            file_content = b"test file content"
            blob = Blob.from_string(file_content)

            # Create a tree with a Unicode filename
            tree = Tree()
            unicode_filename = "À"  # This is the character from GitHub issue #203
            utf8_filename_bytes = unicode_filename.encode(
                "utf-8"
            )  # This is how it's stored in git trees
            tree[utf8_filename_bytes] = (stat.S_IFREG | 0o644, blob.id)

            repo.object_store.add_objects([(blob, None), (tree, None)])

            # Build index from tree (this is what happens during checkout/clone)
            try:
                build_index_from_tree(
                    repo.path, repo.index_path(), repo.object_store, tree.id
                )
            except (OSError, UnicodeError) as e:
                if sys.platform == "win32" and "cannot" in str(e).lower():
                    self.skipTest(f"Platform doesn't support filename: {e}")
                raise

            # Check that the file was created correctly
            expected_file_path = os.path.join(repo.path, unicode_filename)
            self.assertTrue(
                os.path.exists(expected_file_path),
                f"File should exist at {expected_file_path}",
            )

            # Verify the file content is correct
            with open(expected_file_path, "rb") as f:
                actual_content = f.read()
            self.assertEqual(actual_content, file_content)

            # Test the reverse: adding a Unicode filename to the index
            if sys.platform == "win32":
                # On Windows, test that _tree_to_fs_path and _fs_to_tree_path
                # handle UTF-8 encoded tree paths correctly
                from dulwich.index import _fs_to_tree_path, _tree_to_fs_path

                repo_path_bytes = os.fsencode(repo.path)

                # Test tree path to filesystem path conversion
                fs_path = _tree_to_fs_path(repo_path_bytes, utf8_filename_bytes)
                expected_fs_path = os.path.join(
                    repo_path_bytes, os.fsencode(unicode_filename)
                )
                self.assertEqual(fs_path, expected_fs_path)

                # Test filesystem path to tree path conversion
                # _fs_to_tree_path expects relative paths, not absolute paths
                # Extract just the filename from the full path
                filename_only = os.path.basename(fs_path)
                reconstructed_tree_path = _fs_to_tree_path(
                    filename_only, tree_encoding="utf-8"
                )
                self.assertEqual(reconstructed_tree_path, utf8_filename_bytes)

    def test_git_submodule(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            filea = Blob.from_string(b"file alalala")

            subtree = Tree()
            subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)

            c = Commit()
            c.tree = subtree.id
            c.committer = c.author = b"Somebody <somebody@example.com>"
            c.commit_time = c.author_time = 42342
            c.commit_timezone = c.author_timezone = 0
            c.parents = []
            c.message = b"Subcommit"

            tree = Tree()
            tree[b"c"] = (S_IFGITLINK, c.id)

            repo.object_store.add_objects([(o, None) for o in [tree]])

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, "c/a")
            self.assertFalse(os.path.exists(apath))

            # dir c
            cpath = os.path.join(repo.path, "c")
            self.assertTrue(os.path.isdir(cpath))
            self.assertEqual(index[b"c"].mode, S_IFGITLINK)  # mode
            self.assertEqual(index[b"c"].sha, c.id)  # sha

    def test_git_submodule_exists(self) -> None:
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            filea = Blob.from_string(b"file alalala")

            subtree = Tree()
            subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)

            c = Commit()
            c.tree = subtree.id
            c.committer = c.author = b"Somebody <somebody@example.com>"
            c.commit_time = c.author_time = 42342
            c.commit_timezone = c.author_timezone = 0
            c.parents = []
            c.message = b"Subcommit"

            tree = Tree()
            tree[b"c"] = (S_IFGITLINK, c.id)

            os.mkdir(os.path.join(repo_dir, "c"))
            repo.object_store.add_objects([(o, None) for o in [tree]])

            build_index_from_tree(
                repo.path, repo.index_path(), repo.object_store, tree.id
            )

            # Verify index entries
            index = repo.open_index()
            self.assertEqual(len(index), 1)

            # filea
            apath = os.path.join(repo.path, "c/a")
            self.assertFalse(os.path.exists(apath))

            # dir c
            cpath = os.path.join(repo.path, "c")
            self.assertTrue(os.path.isdir(cpath))
            self.assertEqual(index[b"c"].mode, S_IFGITLINK)  # mode
            self.assertEqual(index[b"c"].sha, c.id)  # sha

    def test_with_line_ending_normalization(self) -> None:
        """Test that build_index_from_tree applies line-ending normalization."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        from dulwich.line_ending import BlobNormalizer

        with Repo.init(repo_dir) as repo:
            # Set up autocrlf config
            config = repo.get_config()
            config.set((b"core",), b"autocrlf", b"true")
            config.write_to_path()

            # Create blob with LF line endings
            content_lf = b"line1\nline2\nline3\n"
            blob = Blob.from_string(content_lf)

            tree = Tree()
            tree[b"test.txt"] = (stat.S_IFREG | 0o644, blob.id)

            repo.object_store.add_objects([(blob, None), (tree, None)])

            # Create blob normalizer
            autocrlf = config.get((b"core",), b"autocrlf")
            blob_normalizer = BlobNormalizer(config, {}, autocrlf=autocrlf)

            # Build index with normalization
            build_index_from_tree(
                repo.path,
                repo.index_path(),
                repo.object_store,
                tree.id,
                blob_normalizer=blob_normalizer,
            )

            # On Windows with autocrlf=true, file should have CRLF line endings
            test_file = os.path.join(repo.path, "test.txt")
            with open(test_file, "rb") as f:
                content = f.read()

            # autocrlf=true means LF -> CRLF on checkout (on all platforms for testing)
            expected_content = b"line1\r\nline2\r\nline3\r\n"
            self.assertEqual(content, expected_content)


class GetUnstagedChangesTests(TestCase):
    def test_get_unstaged_changes(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            foo2_fullpath = os.path.join(repo_dir, "foo2")
            with open(foo2_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1", "foo2"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            with open(foo1_fullpath, "wb") as f:
                f.write(b"newstuff")

            # modify access and modify time of path
            os.utime(foo1_fullpath, (0, 0))

            changes = get_unstaged_changes(repo.open_index(), repo_dir)

            self.assertEqual(list(changes), [b"foo1"])

    def test_get_unstaged_deleted_changes(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then remove it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.unlink(foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)

            self.assertEqual(list(changes), [b"foo1"])

    def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.remove(foo1_fullpath)
            os.mkdir(foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)

            self.assertEqual(list(changes), [b"foo1"])

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
        """Unit test for get_unstaged_changes."""
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)
        with Repo.init(repo_dir) as repo:
            # Commit a dummy file then modify it
            foo1_fullpath = os.path.join(repo_dir, "foo1")
            with open(foo1_fullpath, "wb") as f:
                f.write(b"origstuff")

            repo.stage(["foo1"])
            repo.do_commit(
                b"test status",
                author=b"author <email>",
                committer=b"committer <email>",
            )

            os.remove(foo1_fullpath)
            os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)

            changes = get_unstaged_changes(repo.open_index(), repo_dir)

            self.assertEqual(list(changes), [b"foo1"])


class TestValidatePathElement(TestCase):
    def test_default(self) -> None:
        self.assertTrue(validate_path_element_default(b"bla"))
        self.assertTrue(validate_path_element_default(b".bla"))
        self.assertFalse(validate_path_element_default(b".git"))
        self.assertFalse(validate_path_element_default(b".giT"))
        self.assertFalse(validate_path_element_default(b".."))
        self.assertTrue(validate_path_element_default(b"git~1"))

    def test_ntfs(self) -> None:
        self.assertTrue(validate_path_element_ntfs(b"bla"))
        self.assertTrue(validate_path_element_ntfs(b".bla"))
        self.assertFalse(validate_path_element_ntfs(b".git"))
        self.assertFalse(validate_path_element_ntfs(b".giT"))
        self.assertFalse(validate_path_element_ntfs(b".."))
        self.assertFalse(validate_path_element_ntfs(b"git~1"))

    def test_hfs(self) -> None:
        # Normal paths should pass
        self.assertTrue(validate_path_element_hfs(b"bla"))
        self.assertTrue(validate_path_element_hfs(b".bla"))

        # Basic .git variations should fail
        self.assertFalse(validate_path_element_hfs(b".git"))
        self.assertFalse(validate_path_element_hfs(b".giT"))
        self.assertFalse(validate_path_element_hfs(b".GIT"))
        self.assertFalse(validate_path_element_hfs(b".."))

        # git~1 should also fail on HFS+
        self.assertFalse(validate_path_element_hfs(b"git~1"))

        # Test HFS+ Unicode normalization attacks
        # .g\u200cit (zero-width non-joiner)
        self.assertFalse(validate_path_element_hfs(b".g\xe2\x80\x8cit"))

        # .gi\u200dt (zero-width joiner)
        self.assertFalse(validate_path_element_hfs(b".gi\xe2\x80\x8dt"))

        # Test other ignorable characters
        # .g\ufeffit (zero-width no-break space)
        self.assertFalse(validate_path_element_hfs(b".g\xef\xbb\xbfit"))

        # Valid Unicode that shouldn't be confused with .git
        self.assertTrue(validate_path_element_hfs(b".g\xc3\xaft"))  # .gït
        self.assertTrue(validate_path_element_hfs(b"git"))  # git without dot


class TestTreeFSPathConversion(TestCase):
    def test_tree_to_fs_path(self) -> None:
        tree_path = "délwíçh/foo".encode()
        fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
        self.assertEqual(
            fs_path,
            os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
        )

    def test_tree_to_fs_path_windows_separator(self) -> None:
        tree_path = b"path/with/slash"
        original_sep = os.sep.encode("ascii")

        # Temporarily modify os_sep_bytes to test Windows path conversion
        # This simulates Windows behavior on all platforms for testing
        import dulwich.index

        dulwich.index.os_sep_bytes = b"\\"
        self.addCleanup(setattr, dulwich.index, "os_sep_bytes", original_sep)

        fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)

        # The function should join the prefix path with the converted tree path
        # The expected behavior is that the path separators in the tree_path are
        # converted to the platform-specific separator (which we've set to backslash)
        expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
        self.assertEqual(fs_path, expected_path)

    def test_fs_to_tree_path_str(self) -> None:
        fs_path = os.path.join(os.path.join("délwíçh", "foo"))
        tree_path = _fs_to_tree_path(fs_path)
        self.assertEqual(tree_path, "délwíçh/foo".encode())

    def test_fs_to_tree_path_bytes(self) -> None:
        fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
        tree_path = _fs_to_tree_path(fs_path)
        self.assertEqual(tree_path, "délwíçh/foo".encode())

    def test_fs_to_tree_path_windows_separator(self) -> None:
        # Test conversion of Windows paths to tree paths
        fs_path = b"path\\with\\backslash"
        original_sep = os.sep.encode("ascii")

        # Temporarily modify os_sep_bytes to test Windows path conversion
        import dulwich.index

        dulwich.index.os_sep_bytes = b"\\"
        self.addCleanup(setattr, dulwich.index, "os_sep_bytes", original_sep)

        tree_path = _fs_to_tree_path(fs_path)
        self.assertEqual(tree_path, b"path/with/backslash")


class TestIndexEntryFromPath(TestCase):
    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_index_entry_from_path_file(self) -> None:
        """Test creating index entry from a regular file."""
        # Create a test file
        test_file = os.path.join(self.tempdir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        # Get the index entry
        entry = index_entry_from_path(os.fsencode(test_file))

        # Verify the entry was created with the right mode
        self.assertIsNotNone(entry)
        self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)

    @skipIf(not can_symlink(), "Requires symlink support")
    def test_index_entry_from_path_symlink(self) -> None:
        """Test creating index entry from a symlink."""
        # Create a target file
        target_file = os.path.join(self.tempdir, "target")
        with open(target_file, "wb") as f:
            f.write(b"target content")

        # Create a symlink
        link_file = os.path.join(self.tempdir, "symlink")
        os.symlink(target_file, link_file)

        # Get the index entry
        entry = index_entry_from_path(os.fsencode(link_file))

        # Verify the entry was created with the right mode
        self.assertIsNotNone(entry)
        self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)

    def test_index_entry_from_path_directory(self) -> None:
        """Test creating index entry from a directory (should return None)."""
        # Create a directory
        test_dir = os.path.join(self.tempdir, "testdir")
        os.mkdir(test_dir)

        # Get the index entry for a directory
        entry = index_entry_from_path(os.fsencode(test_dir))

        # Should return None for regular directories
        self.assertIsNone(entry)

    def test_index_entry_from_directory_regular(self) -> None:
        """Test index_entry_from_directory with a regular directory."""
        # Create a directory
        test_dir = os.path.join(self.tempdir, "testdir")
        os.mkdir(test_dir)

        # Get stat for the directory
        st = os.lstat(test_dir)

        # Get the index entry for a regular directory
        entry = index_entry_from_directory(st, os.fsencode(test_dir))

        # Should return None for regular directories
        self.assertIsNone(entry)

    def test_index_entry_from_directory_git_submodule(self) -> None:
        """Test index_entry_from_directory with a Git submodule."""
        # Create a git repository that will be a submodule
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)

        # Create the .git directory to make it look like a git repo
        git_dir = os.path.join(sub_repo_dir, ".git")
        os.mkdir(git_dir)

        # Create HEAD file with a fake commit SHA
        head_sha = b"1234567890" * 4  # 40-char fake SHA
        with open(os.path.join(git_dir, "HEAD"), "wb") as f:
            f.write(head_sha)

        # Get stat for the submodule directory
        st = os.lstat(sub_repo_dir)

        # Get the index entry for a git submodule directory
        entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))

        # Since we don't have a proper git setup, this might still return None
        # This test just ensures the code path is executed
        if entry is not None:
            # If an entry is returned, it should have the gitlink mode
            self.assertEqual(entry.mode, S_IFGITLINK)

    def test_index_entry_from_path_with_object_store(self) -> None:
        """Test creating index entry with object store."""
        # Create a test file
        test_file = os.path.join(self.tempdir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        # Create a memory object store
        object_store = MemoryObjectStore()

        # Get the index entry and add to object store
        entry = index_entry_from_path(os.fsencode(test_file), object_store)

        # Verify we can access the blob from the object store
        self.assertIsNotNone(entry)
        blob = object_store[entry.sha]
        self.assertEqual(b"test content", blob.data)

    def test_iter_fresh_entries(self) -> None:
        """Test iterating over fresh entries."""
        # Create some test files
        file1 = os.path.join(self.tempdir, "file1")
        with open(file1, "wb") as f:
            f.write(b"file1 content")

        file2 = os.path.join(self.tempdir, "file2")
        with open(file2, "wb") as f:
            f.write(b"file2 content")

        # Create a memory object store
        object_store = MemoryObjectStore()

        # Get fresh entries
        paths = [b"file1", b"file2", b"nonexistent"]
        entries = dict(
            iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
        )

        # Verify both files got entries but nonexistent file is None
        self.assertIn(b"file1", entries)
        self.assertIn(b"file2", entries)
        self.assertIn(b"nonexistent", entries)
        self.assertIsNotNone(entries[b"file1"])
        self.assertIsNotNone(entries[b"file2"])
        self.assertIsNone(entries[b"nonexistent"])

        # Check that blobs were added to object store
        blob1 = object_store[entries[b"file1"].sha]
        self.assertEqual(b"file1 content", blob1.data)
        blob2 = object_store[entries[b"file2"].sha]
        self.assertEqual(b"file2 content", blob2.data)

    def test_read_submodule_head(self) -> None:
        """Test reading the HEAD of a submodule."""
        from dulwich.index import read_submodule_head

        # Create a test repo that will be our "submodule"
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)
        submodule_repo = Repo.init(sub_repo_dir)

        # Create a file and commit it to establish a HEAD
        test_file = os.path.join(sub_repo_dir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        submodule_repo.stage(["testfile"])
        commit_id = submodule_repo.do_commit(b"Test commit for submodule")

        # Test reading the HEAD
        head_sha = read_submodule_head(sub_repo_dir)
        self.assertEqual(commit_id, head_sha)

        # Test with bytes path
        head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
        self.assertEqual(commit_id, head_sha_bytes)

        # Test with non-existent path
        non_repo_dir = os.path.join(self.tempdir, "nonrepo")
        os.mkdir(non_repo_dir)
        self.assertIsNone(read_submodule_head(non_repo_dir))

        # Test with path that doesn't have a .git directory
        not_git_dir = os.path.join(self.tempdir, "notgit")
        os.mkdir(not_git_dir)
        self.assertIsNone(read_submodule_head(not_git_dir))

    def test_has_directory_changed(self) -> None:
        """Test checking if a directory has changed."""
        from dulwich.index import IndexEntry, _has_directory_changed

        # Setup mock IndexEntry
        mock_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
            0,
            0,
        )

        # Test with a regular directory (not a submodule)
        reg_dir = os.path.join(self.tempdir, "regular_dir")
        os.mkdir(reg_dir)

        # Should return True for regular directory
        self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))

        # Create a git repository to test submodule scenarios
        sub_repo_dir = os.path.join(self.tempdir, "subrepo")
        os.mkdir(sub_repo_dir)
        submodule_repo = Repo.init(sub_repo_dir)

        # Create a file and commit it to establish a HEAD
        test_file = os.path.join(sub_repo_dir, "testfile")
        with open(test_file, "wb") as f:
            f.write(b"test content")

        submodule_repo.stage(["testfile"])
        commit_id = submodule_repo.do_commit(b"Test commit for submodule")

        # Create an entry with the correct commit SHA
        correct_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            commit_id,
            0,
            0,
        )

        # Create an entry with an incorrect commit SHA
        incorrect_entry = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"0000000000000000000000000000000000000000",
            0,
            0,
        )

        # Should return False for submodule with correct SHA
        self.assertFalse(
            _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
        )

        # Should return True for submodule with incorrect SHA
        self.assertTrue(
            _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
        )

    def test_get_unstaged_changes(self) -> None:
        """Test detecting unstaged changes in a working tree."""
        from dulwich.index import (
            ConflictedIndexEntry,
            Index,
            IndexEntry,
            get_unstaged_changes,
        )

        # Create a test repo
        repo_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, repo_dir)

        # Create test index
        index = Index(os.path.join(repo_dir, "index"))

        # Create an actual hash of our test content
        from dulwich.objects import Blob

        test_blob = Blob()
        test_blob.data = b"initial content"

        # Create some test files with known contents
        file1_path = os.path.join(repo_dir, "file1")
        with open(file1_path, "wb") as f:
            f.write(b"initial content")

        file2_path = os.path.join(repo_dir, "file2")
        with open(file2_path, "wb") as f:
            f.write(b"initial content")

        # Add them to index
        entry1 = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",  # Not matching actual content
            0,
            0,
        )
        entry2 = IndexEntry(
            (1230680220, 0),
            (1230680220, 0),
            2050,
            3761020,
            33188,
            1000,
            1000,
            0,
            test_blob.id,  # Will be content's real hash
            0,
            0,
        )

        # Add a file that has a conflict
        entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})

        index._byname = {
            b"file1": entry1,
            b"file2": entry2,
            b"file3": IndexEntry(
                (1230680220, 0),
                (1230680220, 0),
                2050,
                3761020,
                33188,
                1000,
                1000,
                0,
                b"0000000000000000000000000000000000000000",
                0,
                0,
            ),
            b"conflict": entry_conflict,
        }

        # Get unstaged changes
        changes = list(get_unstaged_changes(index, repo_dir))

        # File1 should be unstaged (content doesn't match hash)
        # File3 doesn't exist (deleted)
        # Conflict is always unstaged
        self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])

        # Create directory where there should be a file
        os.mkdir(os.path.join(repo_dir, "file4"))
        index._byname[b"file4"] = entry1

        # Get unstaged changes again
        changes = list(get_unstaged_changes(index, repo_dir))

        # Now file4 should also be unstaged because it's a directory instead of a file
        self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])

        # Create a custom blob filter function
        def filter_blob_callback(blob, path):
            # Modify blob to make it look changed
            blob.data = b"modified " + blob.data
            return blob

        # Get unstaged changes with blob filter
        changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))

        # Now both file1 and file2 should be unstaged due to the filter
        self.assertEqual(
            sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
        )


class TestManyFilesFeature(TestCase):
    """Tests for the manyFiles feature (index version 4 and skipHash)."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_index_version_4_parsing(self):
        """Test that index version 4 files can be parsed."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with version 4
        index = Index(index_path, read=False, version=4)

        # Add some entries
        entry = IndexEntry(
            ctime=(1234567890, 0),
            mtime=(1234567890, 0),
            dev=1,
            ino=1,
            mode=0o100644,
            uid=1000,
            gid=1000,
            size=5,
            sha=b"0" * 40,
        )
        index[b"test.txt"] = entry

        # Write and read back
        index.write()

        # Read the index back
        index2 = Index(index_path)
        self.assertEqual(index2._version, 4)
        self.assertIn(b"test.txt", index2)

    def test_skip_hash_feature(self):
        """Test that skipHash feature works correctly."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with skipHash enabled
        index = Index(index_path, read=False, skip_hash=True)

        # Add some entries
        entry = IndexEntry(
            ctime=(1234567890, 0),
            mtime=(1234567890, 0),
            dev=1,
            ino=1,
            mode=0o100644,
            uid=1000,
            gid=1000,
            size=5,
            sha=b"0" * 40,
        )
        index[b"test.txt"] = entry

        # Write the index
        index.write()

        # Verify the file was written with zero hash
        with open(index_path, "rb") as f:
            f.seek(-20, 2)  # Seek to last 20 bytes
            trailing_hash = f.read(20)
            self.assertEqual(trailing_hash, b"\x00" * 20)

        # Verify we can still read it back
        index2 = Index(index_path)
        self.assertIn(b"test.txt", index2)

    def test_version_4_no_padding(self):
        """Test that version 4 entries have no padding."""
        # Create entries with names that would show compression benefits
        entries = [
            SerializedIndexEntry(
                name=b"src/main/java/com/example/Service.java",
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=5,
                sha=b"0" * 40,
                flags=0,
                extended_flags=0,
            ),
            SerializedIndexEntry(
                name=b"src/main/java/com/example/Controller.java",
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=2,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=5,
                sha=b"1" * 40,
                flags=0,
                extended_flags=0,
            ),
        ]

        # Test version 2 (with padding, full paths)
        buf_v2 = BytesIO()
        from dulwich.index import write_cache_entry

        previous_path = b""
        for entry in entries:
            # Set proper flags for v2
            entry_v2 = SerializedIndexEntry(
                entry.name,
                entry.ctime,
                entry.mtime,
                entry.dev,
                entry.ino,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.sha,
                len(entry.name),
                entry.extended_flags,
            )
            write_cache_entry(buf_v2, entry_v2, version=2, previous_path=previous_path)
            previous_path = entry.name
        v2_data = buf_v2.getvalue()

        # Test version 4 (path compression, no padding)
        buf_v4 = BytesIO()
        previous_path = b""
        for entry in entries:
            write_cache_entry(buf_v4, entry, version=4, previous_path=previous_path)
            previous_path = entry.name
        v4_data = buf_v4.getvalue()

        # Version 4 should be shorter due to compression and no padding
        self.assertLess(len(v4_data), len(v2_data))

        # Both should parse correctly
        buf_v2.seek(0)
        from dulwich.index import read_cache_entry

        previous_path = b""
        parsed_v2_entries = []
        for _ in entries:
            parsed = read_cache_entry(buf_v2, version=2, previous_path=previous_path)
            parsed_v2_entries.append(parsed)
            previous_path = parsed.name

        buf_v4.seek(0)
        previous_path = b""
        parsed_v4_entries = []
        for _ in entries:
            parsed = read_cache_entry(buf_v4, version=4, previous_path=previous_path)
            parsed_v4_entries.append(parsed)
            previous_path = parsed.name

        # Both should have the same paths
        for v2_entry, v4_entry in zip(parsed_v2_entries, parsed_v4_entries):
            self.assertEqual(v2_entry.name, v4_entry.name)
            self.assertEqual(v2_entry.sha, v4_entry.sha)

class TestManyFilesRepoIntegration(TestCase):
    """Tests for manyFiles feature integration with Repo."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_repo_with_manyfiles_config(self):
        """Test that a repository with feature.manyFiles=true uses the right settings."""
        # Create a new repository
        repo = Repo.init(self.tempdir)

        # Set feature.manyFiles=true in config
        config = repo.get_config()
        config.set(b"feature", b"manyFiles", b"true")
        config.write_to_path()

        # Open the index - should have skipHash enabled and version 4
        index = repo.open_index()
        self.assertTrue(index._skip_hash)
        self.assertEqual(index._version, 4)
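    # feature.manyFiles is a convenience preset that turns on index version 4
    # and skipHash together; the same settings can also be configured
    # individually via index.version and index.skipHash, which is what the
    # next test exercises.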
    def test_repo_with_explicit_index_settings(self):
        """Test that explicit index.version and index.skipHash work."""
        # Create a new repository
        repo = Repo.init(self.tempdir)

        # Set explicit index settings
        config = repo.get_config()
        config.set(b"index", b"version", b"3")
        config.set(b"index", b"skipHash", b"false")
        config.write_to_path()

        # Open the index - should respect explicit settings
        index = repo.open_index()
        self.assertFalse(index._skip_hash)
        self.assertEqual(index._version, 3)

class TestPathPrefixCompression(TestCase):
    """Tests for index version 4 path prefix compression."""

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)

    def test_varint_encoding_decoding(self):
        """Test variable-width integer encoding and decoding."""
        test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536]
        for value in test_values:
            encoded = _encode_varint(value)
            decoded, _ = _decode_varint(encoded, 0)
            self.assertEqual(value, decoded, f"Failed for value {value}")
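    # Rough idea of the varint round-trip exercised above: small values fit
    # in a single byte and larger values spill into continuation bytes (e.g.
    # in a 7-bits-per-byte scheme 127 takes one byte and 128 takes two). The
    # exact byte layout is an implementation detail of _encode_varint /
    # _decode_varint and is not assumed here.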
    def test_path_compression_simple(self):
        """Test simple path compression cases."""
        # Test case 1: No common prefix
        compressed = _compress_path(b"file1.txt", b"")
        decompressed, _ = _decompress_path(compressed, 0, b"")
        self.assertEqual(b"file1.txt", decompressed)

        # Test case 2: Common prefix
        compressed = _compress_path(b"src/file2.txt", b"src/file1.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
        self.assertEqual(b"src/file2.txt", decompressed)

        # Test case 3: Completely different paths
        compressed = _compress_path(b"docs/readme.md", b"src/file1.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"src/file1.txt")
        self.assertEqual(b"docs/readme.md", decompressed)
    def test_path_compression_deep_directories(self):
        """Test compression with deep directory structures."""
        path1 = b"src/main/java/com/example/service/UserService.java"
        path2 = b"src/main/java/com/example/service/OrderService.java"
        path3 = b"src/main/java/com/example/model/User.java"

        # Compress path2 relative to path1
        compressed = _compress_path(path2, path1)
        decompressed, _ = _decompress_path(compressed, 0, path1)
        self.assertEqual(path2, decompressed)

        # Compress path3 relative to path2
        compressed = _compress_path(path3, path2)
        decompressed, _ = _decompress_path(compressed, 0, path2)
        self.assertEqual(path3, decompressed)

    def test_index_version_4_with_compression(self):
        """Test full index version 4 write/read with path compression."""
        index_path = os.path.join(self.tempdir, "index")

        # Create an index with version 4
        index = Index(index_path, read=False, version=4)

        # Add multiple entries with common prefixes
        paths = [
            b"src/main/java/App.java",
            b"src/main/java/Utils.java",
            b"src/main/resources/config.properties",
            b"src/test/java/AppTest.java",
            b"docs/README.md",
            b"docs/INSTALL.md",
        ]
        for i, path in enumerate(paths):
            entry = IndexEntry(
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=i + 1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=10,
                sha=f"{i:040d}".encode(),
            )
            index[path] = entry

        # Write and read back
        index.write()

        # Read the index back
        index2 = Index(index_path)
        self.assertEqual(index2._version, 4)

        # Verify all paths were preserved correctly
        for path in paths:
            self.assertIn(path, index2)

        # Verify the index file is smaller than version 2 would be
        with open(index_path, "rb") as f:
            v4_size = len(f.read())

        # Create equivalent version 2 index for comparison
        index_v2_path = os.path.join(self.tempdir, "index_v2")
        index_v2 = Index(index_v2_path, read=False, version=2)
        for path in paths:
            entry = IndexEntry(
                ctime=(1234567890, 0),
                mtime=(1234567890, 0),
                dev=1,
                ino=1,
                mode=0o100644,
                uid=1000,
                gid=1000,
                size=10,
                sha=b"0" * 40,
            )
            index_v2[path] = entry
        index_v2.write()
        with open(index_v2_path, "rb") as f:
            v2_size = len(f.read())

        # Version 4 should be smaller due to compression
        self.assertLess(
            v4_size, v2_size, "Version 4 index should be smaller than version 2"
        )

    def test_path_compression_edge_cases(self):
        """Test edge cases in path compression."""
        # Empty paths
        compressed = _compress_path(b"", b"")
        decompressed, _ = _decompress_path(compressed, 0, b"")
        self.assertEqual(b"", decompressed)

        # Path identical to previous
        compressed = _compress_path(b"same.txt", b"same.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"same.txt")
        self.assertEqual(b"same.txt", decompressed)

        # Path shorter than previous
        compressed = _compress_path(b"short", b"very/long/path/file.txt")
        decompressed, _ = _decompress_path(compressed, 0, b"very/long/path/file.txt")
        self.assertEqual(b"short", decompressed)

class TestUpdateWorkingTree(TestCase):
    def setUp(self):
        self.tempdir = tempfile.mkdtemp()

        def cleanup_tempdir():
            """Remove tempdir, handling read-only files on Windows."""

            def remove_readonly(func, path, excinfo):
                """Error handler for Windows read-only files."""
                import stat

                if sys.platform == "win32" and excinfo[0] is PermissionError:
                    os.chmod(path, stat.S_IWRITE)
                    func(path)
                else:
                    raise

            shutil.rmtree(self.tempdir, onerror=remove_readonly)

        self.addCleanup(cleanup_tempdir)
        self.repo = Repo.init(self.tempdir)
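    # The tests below all follow the same pattern: build Blob/Tree objects in
    # the object store, then drive update_working_tree() with a change
    # iterator produced by tree_changes(object_store, old_tree_id,
    # new_tree_id), i.e. the tree diff a checkout would apply.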
    def test_update_working_tree_with_blob_normalizer(self):
        """Test update_working_tree with a blob normalizer."""

        # Create a simple blob normalizer that converts CRLF to LF
        class TestBlobNormalizer:
            def checkout_normalize(self, blob, path):
                # Convert CRLF to LF during checkout
                new_blob = Blob()
                new_blob.data = blob.data.replace(b"\r\n", b"\n")
                return new_blob

        # Create a tree with a file containing CRLF
        blob = Blob()
        blob.data = b"Hello\r\nWorld\r\n"
        self.repo.object_store.add_object(blob)

        tree = Tree()
        tree[b"test.txt"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree)

        # Update working tree with normalizer
        normalizer = TestBlobNormalizer()
        changes = tree_changes(self.repo.object_store, None, tree.id)
        update_working_tree(
            self.repo,
            None,  # old_tree_id
            tree.id,  # new_tree_id
            change_iterator=changes,
            blob_normalizer=normalizer,
        )

        # Check that the file was written with LF line endings
        test_file = os.path.join(self.tempdir, "test.txt")
        with open(test_file, "rb") as f:
            content = f.read()
        self.assertEqual(b"Hello\nWorld\n", content)

        # Check that the index has the original blob SHA
        index = self.repo.open_index()
        self.assertEqual(blob.id, index[b"test.txt"].sha)
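    # Note: checkout normalization only changes the bytes written to the
    # working tree; the index keeps the SHA of the original blob (as asserted
    # above), so the freshly checked-out file is not reported as modified.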
    def test_update_working_tree_without_blob_normalizer(self):
        """Test update_working_tree without a blob normalizer."""
        # Create a tree with a file containing CRLF
        blob = Blob()
        blob.data = b"Hello\r\nWorld\r\n"
        self.repo.object_store.add_object(blob)

        tree = Tree()
        tree[b"test.txt"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree)

        # Update working tree without normalizer
        changes = tree_changes(self.repo.object_store, None, tree.id)
        update_working_tree(
            self.repo,
            None,  # old_tree_id
            tree.id,  # new_tree_id
            change_iterator=changes,
            blob_normalizer=None,
        )

        # Check that the file was written with original CRLF line endings
        test_file = os.path.join(self.tempdir, "test.txt")
        with open(test_file, "rb") as f:
            content = f.read()
        self.assertEqual(b"Hello\r\nWorld\r\n", content)

        # Check that the index has the blob SHA
        index = self.repo.open_index()
        self.assertEqual(blob.id, index[b"test.txt"].sha)

    def test_update_working_tree_remove_directory(self):
        """Test that update_working_tree properly removes directories."""
        # Create initial tree with a directory containing files
        blob1 = Blob()
        blob1.data = b"content1"
        self.repo.object_store.add_object(blob1)
        blob2 = Blob()
        blob2.data = b"content2"
        self.repo.object_store.add_object(blob2)

        tree1 = Tree()
        tree1[b"dir/file1.txt"] = (0o100644, blob1.id)
        tree1[b"dir/file2.txt"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1 (create directory with files)
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify directory and files exist
        dir_path = os.path.join(self.tempdir, "dir")
        self.assertTrue(os.path.isdir(dir_path))
        self.assertTrue(os.path.exists(os.path.join(dir_path, "file1.txt")))
        self.assertTrue(os.path.exists(os.path.join(dir_path, "file2.txt")))

        # Create empty tree (remove everything)
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update to empty tree
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Verify directory was removed
        self.assertFalse(os.path.exists(dir_path))

    def test_update_working_tree_submodule_to_file(self):
        """Test replacing a submodule directory with a file."""
        # Create tree with submodule
        submodule_sha = b"a" * 40
        tree1 = Tree()
        tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
        self.repo.object_store.add_object(tree1)

        # Update to tree with submodule
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify submodule directory exists with .git file
        submodule_path = os.path.join(self.tempdir, "submodule")
        self.assertTrue(os.path.isdir(submodule_path))
        self.assertTrue(os.path.exists(os.path.join(submodule_path, ".git")))

        # Create tree with file at same path
        blob = Blob()
        blob.data = b"file content"
        self.repo.object_store.add_object(blob)

        tree2 = Tree()
        tree2[b"submodule"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree2)

        # Update to tree with file (should remove submodule directory and create file)
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Verify it's now a file
        self.assertTrue(os.path.isfile(submodule_path))
        with open(submodule_path, "rb") as f:
            self.assertEqual(b"file content", f.read())
    def test_update_working_tree_directory_with_nested_subdir(self):
        """Test removing directory with nested subdirectories."""
        # Create tree with nested directories
        blob = Blob()
        blob.data = b"deep content"
        self.repo.object_store.add_object(blob)

        tree1 = Tree()
        tree1[b"a/b/c/file.txt"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify nested structure exists
        path_a = os.path.join(self.tempdir, "a")
        path_b = os.path.join(path_a, "b")
        path_c = os.path.join(path_b, "c")
        file_path = os.path.join(path_c, "file.txt")
        self.assertTrue(os.path.exists(file_path))

        # Create empty tree
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update to empty tree
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Verify all directories were removed
        self.assertFalse(os.path.exists(path_a))

    def test_update_working_tree_file_replaced_by_dir_not_removed(self):
        """Test that a directory replacing a git file is left alone if not empty."""
        # Create tree with a file
        blob = Blob()
        blob.data = b"file content"
        self.repo.object_store.add_object(blob)

        tree1 = Tree()
        tree1[b"path"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify file exists
        file_path = os.path.join(self.tempdir, "path")
        self.assertTrue(os.path.isfile(file_path))

        # Manually replace file with directory containing untracked file
        os.remove(file_path)
        os.mkdir(file_path)
        with open(os.path.join(file_path, "untracked.txt"), "w") as f:
            f.write("untracked content")

        # Create empty tree
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update should succeed but leave the directory alone
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Directory should still exist with its contents
        self.assertTrue(os.path.isdir(file_path))
        self.assertTrue(os.path.exists(os.path.join(file_path, "untracked.txt")))
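    # The inverse case follows: when the directory that replaced a tracked
    # file is empty, there is nothing worth preserving, so it can be removed.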
    def test_update_working_tree_file_replaced_by_empty_dir_removed(self):
        """Test that an empty directory replacing a git file is removed."""
        # Create tree with a file
        blob = Blob()
        blob.data = b"file content"
        self.repo.object_store.add_object(blob)

        tree1 = Tree()
        tree1[b"path"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify file exists
        file_path = os.path.join(self.tempdir, "path")
        self.assertTrue(os.path.isfile(file_path))

        # Manually replace file with empty directory
        os.remove(file_path)
        os.mkdir(file_path)

        # Create empty tree
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update should remove the empty directory
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Directory should be gone
        self.assertFalse(os.path.exists(file_path))

    def test_update_working_tree_symlink_transitions(self):
        """Test transitions involving symlinks."""
        # Skip on Windows where symlinks might not be supported
        if sys.platform == "win32":
            self.skipTest("Symlinks not fully supported on Windows")

        # Create tree with symlink
        blob1 = Blob()
        blob1.data = b"target/path"
        self.repo.object_store.add_object(blob1)

        tree1 = Tree()
        tree1[b"link"] = (0o120000, blob1.id)  # Symlink mode
        self.repo.object_store.add_object(tree1)

        # Update to tree with symlink
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        link_path = os.path.join(self.tempdir, "link")
        self.assertTrue(os.path.islink(link_path))
        self.assertEqual(b"target/path", os.readlink(link_path).encode())

        # Test 1: Replace symlink with regular file
        blob2 = Blob()
        blob2.data = b"file content"
        self.repo.object_store.add_object(blob2)

        tree2 = Tree()
        tree2[b"link"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree2)

        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        self.assertFalse(os.path.islink(link_path))
        self.assertTrue(os.path.isfile(link_path))
        with open(link_path, "rb") as f:
            self.assertEqual(b"file content", f.read())

        # Test 2: Replace file with symlink
        changes = tree_changes(self.repo.object_store, tree2.id, tree1.id)
        update_working_tree(self.repo, tree2.id, tree1.id, change_iterator=changes)

        self.assertTrue(os.path.islink(link_path))
        self.assertEqual(b"target/path", os.readlink(link_path).encode())

        # Test 3: Replace symlink with directory (manually)
        os.unlink(link_path)
        os.mkdir(link_path)

        # Create empty tree
        tree3 = Tree()
        self.repo.object_store.add_object(tree3)

        # Should remove empty directory
        changes = tree_changes(self.repo.object_store, tree1.id, tree3.id)
        update_working_tree(self.repo, tree1.id, tree3.id, change_iterator=changes)

        self.assertFalse(os.path.exists(link_path))
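    # Note: symlink entries use mode 0o120000 and store the link target as
    # the blob contents, which is why os.readlink() above returns
    # b"target/path".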
    def test_update_working_tree_modified_file_to_dir_transition(self):
        """Test that modified files are not removed when they should be directories."""
        # Create tree with file
        blob1 = Blob()
        blob1.data = b"original content"
        self.repo.object_store.add_object(blob1)

        tree1 = Tree()
        tree1[b"path"] = (0o100644, blob1.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        file_path = os.path.join(self.tempdir, "path")

        # Modify the file locally
        with open(file_path, "w") as f:
            f.write("modified content")

        # Create tree where path is a directory with file
        blob2 = Blob()
        blob2.data = b"subfile content"
        self.repo.object_store.add_object(blob2)

        tree2 = Tree()
        tree2[b"path/subfile"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree2)

        # Update should fail because a directory can't be created where a
        # locally modified file exists
        with self.assertRaises(IOError):
            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # File should still exist with modifications
        self.assertTrue(os.path.isfile(file_path))
        with open(file_path) as f:
            self.assertEqual("modified content", f.read())

    def test_update_working_tree_executable_transitions(self):
        """Test transitions involving executable bit changes."""
        # Skip on Windows where the executable bit is not supported
        if sys.platform == "win32":
            self.skipTest("Executable bit not supported on Windows")

        # Create tree with non-executable file
        blob = Blob()
        blob.data = b"#!/bin/sh\necho hello"
        self.repo.object_store.add_object(blob)

        tree1 = Tree()
        tree1[b"script.sh"] = (0o100644, blob.id)  # Non-executable
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        script_path = os.path.join(self.tempdir, "script.sh")
        self.assertTrue(os.path.isfile(script_path))

        # Check it's not executable
        mode = os.stat(script_path).st_mode
        self.assertFalse(mode & stat.S_IXUSR)

        # Create tree with executable file (same content)
        tree2 = Tree()
        tree2[b"script.sh"] = (0o100755, blob.id)  # Executable
        self.repo.object_store.add_object(tree2)

        # Update to tree2
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Check it's now executable
        mode = os.stat(script_path).st_mode
        self.assertTrue(mode & stat.S_IXUSR)
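    # Note: in tree entries the executable bit is part of the mode (0o100644
    # vs 0o100755); the blob is identical in both trees above, so only the
    # working-tree permissions change.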
    def test_update_working_tree_submodule_with_untracked_files(self):
        """Test that submodules with untracked files are not removed."""
        from dulwich.objects import S_IFGITLINK, Tree

        # Create tree with submodule
        submodule_sha = b"a" * 40
        tree1 = Tree()
        tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
        self.repo.object_store.add_object(tree1)

        # Update to tree with submodule
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Add untracked file to submodule directory
        submodule_path = os.path.join(self.tempdir, "submodule")
        untracked_path = os.path.join(submodule_path, "untracked.txt")
        with open(untracked_path, "w") as f:
            f.write("untracked content")

        # Create empty tree
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update should not remove submodule directory with untracked files
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Directory should still exist with untracked file
        self.assertTrue(os.path.isdir(submodule_path))
        self.assertTrue(os.path.exists(untracked_path))

    def test_update_working_tree_dir_to_file_with_subdir(self):
        """Test replacing directory structure with a file."""
        # Create tree with nested directory structure
        blob1 = Blob()
        blob1.data = b"content1"
        self.repo.object_store.add_object(blob1)
        blob2 = Blob()
        blob2.data = b"content2"
        self.repo.object_store.add_object(blob2)

        tree1 = Tree()
        tree1[b"dir/subdir/file1"] = (0o100644, blob1.id)
        tree1[b"dir/subdir/file2"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify structure exists
        dir_path = os.path.join(self.tempdir, "dir")
        self.assertTrue(os.path.isdir(dir_path))

        # Add an untracked file to make directory truly non-empty
        untracked_path = os.path.join(dir_path, "untracked.txt")
        with open(untracked_path, "w") as f:
            f.write("untracked content")

        # Create tree with file at "dir" path
        blob3 = Blob()
        blob3.data = b"replacement file"
        self.repo.object_store.add_object(blob3)

        tree2 = Tree()
        tree2[b"dir"] = (0o100644, blob3.id)
        self.repo.object_store.add_object(tree2)

        # Update should fail because directory is not empty
        with self.assertRaises(IsADirectoryError):
            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Directory should still exist
        self.assertTrue(os.path.isdir(dir_path))
    def test_update_working_tree_case_sensitivity(self):
        """Test handling of case-sensitive filename changes."""
        # Create tree with lowercase file
        blob1 = Blob()
        blob1.data = b"lowercase content"
        self.repo.object_store.add_object(blob1)

        tree1 = Tree()
        tree1[b"readme.txt"] = (0o100644, blob1.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Create tree with uppercase file (different content)
        blob2 = Blob()
        blob2.data = b"uppercase content"
        self.repo.object_store.add_object(blob2)

        tree2 = Tree()
        tree2[b"README.txt"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree2)

        # Update to tree2
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Check what exists (behavior depends on filesystem)
        lowercase_path = os.path.join(self.tempdir, "readme.txt")
        uppercase_path = os.path.join(self.tempdir, "README.txt")

        # On case-insensitive filesystems, one will overwrite the other;
        # on case-sensitive filesystems, both may exist
        self.assertTrue(
            os.path.exists(lowercase_path) or os.path.exists(uppercase_path)
        )
    def test_update_working_tree_deeply_nested_removal(self):
        """Test removal of deeply nested directory structures."""
        # Create deeply nested structure
        blob = Blob()
        blob.data = b"deep content"
        self.repo.object_store.add_object(blob)

        tree1 = Tree()
        # Create a very deep path
        deep_path = b"/".join([b"level%d" % i for i in range(10)])
        tree1[deep_path + b"/file.txt"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Verify deep structure exists
        current_path = self.tempdir
        for i in range(10):
            current_path = os.path.join(current_path, f"level{i}")
            self.assertTrue(os.path.isdir(current_path))

        # Create empty tree
        tree2 = Tree()
        self.repo.object_store.add_object(tree2)

        # Update should remove all empty directories
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Verify top level directory is gone
        top_level = os.path.join(self.tempdir, "level0")
        self.assertFalse(os.path.exists(top_level))

    def test_update_working_tree_read_only_files(self):
        """Test handling of read-only files during updates."""
        # Create tree with file
        blob1 = Blob()
        blob1.data = b"original content"
        self.repo.object_store.add_object(blob1)

        tree1 = Tree()
        tree1[b"readonly.txt"] = (0o100644, blob1.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Make file read-only
        file_path = os.path.join(self.tempdir, "readonly.txt")
        os.chmod(file_path, 0o444)  # Read-only

        # Create tree with modified file
        blob2 = Blob()
        blob2.data = b"new content"
        self.repo.object_store.add_object(blob2)

        tree2 = Tree()
        tree2[b"readonly.txt"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree2)

        # Update should handle read-only file
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        # Verify content was updated
        with open(file_path, "rb") as f:
            self.assertEqual(b"new content", f.read())
    def test_update_working_tree_invalid_filenames(self):
        """Test handling of invalid filenames for the platform."""
        # Create tree with potentially problematic filenames
        blob = Blob()
        blob.data = b"content"
        self.repo.object_store.add_object(blob)

        tree = Tree()
        # Add files with names that might be invalid on some platforms
        tree[b"valid.txt"] = (0o100644, blob.id)
        if sys.platform != "win32":
            # These are invalid on Windows but valid on Unix
            tree[b"file:with:colons.txt"] = (0o100644, blob.id)
            tree[b"file<with>brackets.txt"] = (0o100644, blob.id)
        self.repo.object_store.add_object(tree)

        # Update should skip invalid files based on validation
        changes = tree_changes(self.repo.object_store, None, tree.id)
        update_working_tree(self.repo, None, tree.id, change_iterator=changes)

        # Valid file should exist
        self.assertTrue(os.path.exists(os.path.join(self.tempdir, "valid.txt")))
    def test_update_working_tree_symlink_to_directory(self):
        """Test replacing a symlink pointing to a directory with a real directory."""
        if sys.platform == "win32":
            self.skipTest("Symlinks not fully supported on Windows")

        # Create a target directory
        target_dir = os.path.join(self.tempdir, "target")
        os.mkdir(target_dir)
        with open(os.path.join(target_dir, "file.txt"), "w") as f:
            f.write("target file")

        # Create tree with symlink pointing to directory
        blob1 = Blob()
        blob1.data = b"target"  # Relative path to target directory
        self.repo.object_store.add_object(blob1)

        tree1 = Tree()
        tree1[b"link"] = (0o120000, blob1.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        link_path = os.path.join(self.tempdir, "link")
        self.assertTrue(os.path.islink(link_path))

        # Create tree with actual directory at same path
        blob2 = Blob()
        blob2.data = b"new file content"
        self.repo.object_store.add_object(blob2)

        tree2 = Tree()
        tree2[b"link/newfile.txt"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree2)

        # Update should replace symlink with actual directory
        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

        self.assertFalse(os.path.islink(link_path))
        self.assertTrue(os.path.isdir(link_path))
        self.assertTrue(os.path.exists(os.path.join(link_path, "newfile.txt")))
    def test_update_working_tree_comprehensive_transitions(self):
        """Test all possible file type transitions comprehensively."""
        # Skip on Windows where symlinks might not be supported
        if sys.platform == "win32":
            self.skipTest("Symlinks not fully supported on Windows")

        # Create blobs for different file types
        file_blob = Blob()
        file_blob.data = b"regular file content"
        self.repo.object_store.add_object(file_blob)

        exec_blob = Blob()
        exec_blob.data = b"#!/bin/sh\necho executable"
        self.repo.object_store.add_object(exec_blob)

        link_blob = Blob()
        link_blob.data = b"target/path"
        self.repo.object_store.add_object(link_blob)

        submodule_sha = b"a" * 40

        # Test 1: Regular file → Submodule
        tree1 = Tree()
        tree1[b"item"] = (0o100644, file_blob.id)
        self.repo.object_store.add_object(tree1)

        tree2 = Tree()
        tree2[b"item"] = (S_IFGITLINK, submodule_sha)
        self.repo.object_store.add_object(tree2)

        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
        self.assertTrue(os.path.isfile(os.path.join(self.tempdir, "item")))

        changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
        update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
        self.assertTrue(os.path.isdir(os.path.join(self.tempdir, "item")))

        # Test 2: Submodule → Executable file
        tree3 = Tree()
        tree3[b"item"] = (0o100755, exec_blob.id)
        self.repo.object_store.add_object(tree3)

        changes = tree_changes(self.repo.object_store, tree2.id, tree3.id)
        update_working_tree(self.repo, tree2.id, tree3.id, change_iterator=changes)
        item_path = os.path.join(self.tempdir, "item")
        self.assertTrue(os.path.isfile(item_path))
        if sys.platform != "win32":
            self.assertTrue(os.access(item_path, os.X_OK))

        # Test 3: Executable file → Symlink
        tree4 = Tree()
        tree4[b"item"] = (0o120000, link_blob.id)
        self.repo.object_store.add_object(tree4)

        changes = tree_changes(self.repo.object_store, tree3.id, tree4.id)
        update_working_tree(self.repo, tree3.id, tree4.id, change_iterator=changes)
        self.assertTrue(os.path.islink(item_path))

        # Test 4: Symlink → Submodule
        tree5 = Tree()
        tree5[b"item"] = (S_IFGITLINK, submodule_sha)
        self.repo.object_store.add_object(tree5)

        changes = tree_changes(self.repo.object_store, tree4.id, tree5.id)
        update_working_tree(self.repo, tree4.id, tree5.id, change_iterator=changes)
        self.assertTrue(os.path.isdir(item_path))

        # Test 5: Clean up - Submodule → absent
        tree6 = Tree()
        self.repo.object_store.add_object(tree6)

        changes = tree_changes(self.repo.object_store, tree5.id, tree6.id)
        update_working_tree(self.repo, tree5.id, tree6.id, change_iterator=changes)
        self.assertFalse(os.path.exists(item_path))

        # Test 6: Symlink → Executable file
        tree7 = Tree()
        tree7[b"item2"] = (0o120000, link_blob.id)
        self.repo.object_store.add_object(tree7)

        changes = tree_changes(self.repo.object_store, tree6.id, tree7.id)
        update_working_tree(self.repo, tree6.id, tree7.id, change_iterator=changes)
        item2_path = os.path.join(self.tempdir, "item2")
        self.assertTrue(os.path.islink(item2_path))

        tree8 = Tree()
        tree8[b"item2"] = (0o100755, exec_blob.id)
        self.repo.object_store.add_object(tree8)

        changes = tree_changes(self.repo.object_store, tree7.id, tree8.id)
        update_working_tree(self.repo, tree7.id, tree8.id, change_iterator=changes)
        self.assertTrue(os.path.isfile(item2_path))
        if sys.platform != "win32":
            self.assertTrue(os.access(item2_path, os.X_OK))
    def test_update_working_tree_partial_update_failure(self):
        """Test handling when update fails partway through."""
        # Create initial tree
        blob1 = Blob()
        blob1.data = b"file1 content"
        self.repo.object_store.add_object(blob1)
        blob2 = Blob()
        blob2.data = b"file2 content"
        self.repo.object_store.add_object(blob2)

        tree1 = Tree()
        tree1[b"file1.txt"] = (0o100644, blob1.id)
        tree1[b"file2.txt"] = (0o100644, blob2.id)
        self.repo.object_store.add_object(tree1)

        # Update to tree1
        changes = tree_changes(self.repo.object_store, None, tree1.id)
        update_working_tree(self.repo, None, tree1.id, change_iterator=changes)

        # Create a directory where file2.txt is, to cause a conflict
        file2_path = os.path.join(self.tempdir, "file2.txt")
        os.remove(file2_path)
        os.mkdir(file2_path)

        # Add untracked file to prevent removal
        with open(os.path.join(file2_path, "blocker.txt"), "w") as f:
            f.write("blocking content")

        # Create tree with updates to both files
        blob3 = Blob()
        blob3.data = b"file1 updated"
        self.repo.object_store.add_object(blob3)
        blob4 = Blob()
        blob4.data = b"file2 updated"
        self.repo.object_store.add_object(blob4)

        tree2 = Tree()
        tree2[b"file1.txt"] = (0o100644, blob3.id)
        tree2[b"file2.txt"] = (0o100644, blob4.id)
        self.repo.object_store.add_object(tree2)

        # Update should partially succeed - file1 updated, file2 blocked
        try:
            changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
            update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
        except IsADirectoryError:
            # Expected to fail on file2 because it's a directory
            pass

        # file1 should be updated
        with open(os.path.join(self.tempdir, "file1.txt"), "rb") as f:
            self.assertEqual(b"file1 updated", f.read())

        # file2 should still be a directory
        self.assertTrue(os.path.isdir(file2_path))