2
0

test_index.py 120 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381
  1. # test_index.py -- Tests for the git index
  2. # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the index."""
  22. import os
  23. import shutil
  24. import stat
  25. import struct
  26. import sys
  27. import tempfile
  28. from io import BytesIO
  29. from dulwich.config import ConfigDict
  30. from dulwich.diff_tree import (
  31. CHANGE_ADD,
  32. CHANGE_COPY,
  33. CHANGE_DELETE,
  34. CHANGE_MODIFY,
  35. CHANGE_RENAME,
  36. TreeChange,
  37. tree_changes,
  38. )
  39. from dulwich.index import (
  40. Index,
  41. IndexEntry,
  42. SerializedIndexEntry,
  43. _compress_path,
  44. _decode_varint,
  45. _decompress_path,
  46. _encode_varint,
  47. _fs_to_tree_path,
  48. _tree_to_fs_path,
  49. build_index_from_tree,
  50. cleanup_mode,
  51. commit_tree,
  52. detect_case_only_renames,
  53. get_unstaged_changes,
  54. index_entry_from_directory,
  55. index_entry_from_path,
  56. index_entry_from_stat,
  57. iter_fresh_entries,
  58. read_index,
  59. read_index_dict,
  60. update_working_tree,
  61. validate_path_element_default,
  62. validate_path_element_hfs,
  63. validate_path_element_ntfs,
  64. write_cache_time,
  65. write_index,
  66. write_index_dict,
  67. )
  68. from dulwich.object_store import MemoryObjectStore
  69. from dulwich.objects import S_IFGITLINK, ZERO_SHA, Blob, Tree, TreeEntry
  70. from dulwich.repo import Repo
  71. from dulwich.tests.utils import make_commit
  72. from . import TestCase, skipIf
  73. def can_symlink() -> bool:
  74. """Return whether running process can create symlinks."""
  75. if sys.platform != "win32":
  76. # Platforms other than Windows should allow symlinks without issues.
  77. return True
  78. test_source = tempfile.mkdtemp()
  79. test_target = test_source + "can_symlink"
  80. try:
  81. os.symlink(test_source, test_target)
  82. except (NotImplementedError, OSError):
  83. return False
  84. return True
  85. class IndexTestCase(TestCase):
  86. datadir = os.path.join(os.path.dirname(__file__), "../testdata/indexes")
  87. def get_simple_index(self, name):
  88. return Index(os.path.join(self.datadir, name))
  89. class SimpleIndexTestCase(IndexTestCase):
  90. def test_len(self) -> None:
  91. self.assertEqual(1, len(self.get_simple_index("index")))
  92. def test_iter(self) -> None:
  93. self.assertEqual([b"bla"], list(self.get_simple_index("index")))
  94. def test_iter_skip_hash(self) -> None:
  95. self.assertEqual([b"bla"], list(self.get_simple_index("index_skip_hash")))
  96. def test_iterobjects(self) -> None:
  97. self.assertEqual(
  98. [(b"bla", b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", 33188)],
  99. list(self.get_simple_index("index").iterobjects()),
  100. )
  101. def test_getitem(self) -> None:
  102. self.assertEqual(
  103. IndexEntry(
  104. (1230680220, 0),
  105. (1230680220, 0),
  106. 2050,
  107. 3761020,
  108. 33188,
  109. 1000,
  110. 1000,
  111. 0,
  112. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  113. 0,
  114. 0,
  115. ),
  116. self.get_simple_index("index")[b"bla"],
  117. )
  118. def test_empty(self) -> None:
  119. i = self.get_simple_index("notanindex")
  120. self.assertEqual(0, len(i))
  121. self.assertFalse(os.path.exists(i._filename))
  122. def test_against_empty_tree(self) -> None:
  123. i = self.get_simple_index("index")
  124. changes = list(i.changes_from_tree(MemoryObjectStore(), None))
  125. self.assertEqual(1, len(changes))
  126. (_oldname, newname), (_oldmode, _newmode), (_oldsha, newsha) = changes[0]
  127. self.assertEqual(b"bla", newname)
  128. self.assertEqual(b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", newsha)
  129. def test_index_pathlib(self) -> None:
  130. import tempfile
  131. from pathlib import Path
  132. # Create a temporary index file
  133. with tempfile.NamedTemporaryFile(suffix=".index", delete=False) as f:
  134. temp_path = f.name
  135. self.addCleanup(os.unlink, temp_path)
  136. # Test creating Index with pathlib.Path
  137. path_obj = Path(temp_path)
  138. index = Index(path_obj, read=False)
  139. self.assertEqual(str(path_obj), index.path)
  140. # Add an entry and write
  141. index[b"test"] = IndexEntry(
  142. ctime=(0, 0),
  143. mtime=(0, 0),
  144. dev=0,
  145. ino=0,
  146. mode=33188,
  147. uid=0,
  148. gid=0,
  149. size=0,
  150. sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  151. )
  152. index.write()
  153. # Read it back with pathlib.Path
  154. index2 = Index(path_obj)
  155. self.assertIn(b"test", index2)
  156. class SimpleIndexWriterTestCase(IndexTestCase):
  157. def setUp(self) -> None:
  158. IndexTestCase.setUp(self)
  159. self.tempdir = tempfile.mkdtemp()
  160. def tearDown(self) -> None:
  161. IndexTestCase.tearDown(self)
  162. shutil.rmtree(self.tempdir)
  163. def test_simple_write(self) -> None:
  164. entries = [
  165. (
  166. SerializedIndexEntry(
  167. b"barbla",
  168. (1230680220, 0),
  169. (1230680220, 0),
  170. 2050,
  171. 3761020,
  172. 33188,
  173. 1000,
  174. 1000,
  175. 0,
  176. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  177. 0,
  178. 0,
  179. )
  180. )
  181. ]
  182. filename = os.path.join(self.tempdir, "test-simple-write-index")
  183. with open(filename, "wb+") as x:
  184. write_index(x, entries)
  185. with open(filename, "rb") as x:
  186. self.assertEqual(entries, list(read_index(x)))
  187. class ReadIndexDictTests(IndexTestCase):
  188. def setUp(self) -> None:
  189. IndexTestCase.setUp(self)
  190. self.tempdir = tempfile.mkdtemp()
  191. def tearDown(self) -> None:
  192. IndexTestCase.tearDown(self)
  193. shutil.rmtree(self.tempdir)
  194. def test_simple_write(self) -> None:
  195. entries = {
  196. b"barbla": IndexEntry(
  197. (1230680220, 0),
  198. (1230680220, 0),
  199. 2050,
  200. 3761020,
  201. 33188,
  202. 1000,
  203. 1000,
  204. 0,
  205. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  206. 0,
  207. 0,
  208. )
  209. }
  210. filename = os.path.join(self.tempdir, "test-simple-write-index")
  211. with open(filename, "wb+") as x:
  212. write_index_dict(x, entries)
  213. with open(filename, "rb") as x:
  214. self.assertEqual(entries, read_index_dict(x))
  215. class CommitTreeTests(TestCase):
  216. def setUp(self) -> None:
  217. super().setUp()
  218. self.store = MemoryObjectStore()
  219. def test_single_blob(self) -> None:
  220. blob = Blob()
  221. blob.data = b"foo"
  222. self.store.add_object(blob)
  223. blobs = [(b"bla", blob.id, stat.S_IFREG)]
  224. rootid = commit_tree(self.store, blobs)
  225. self.assertEqual(rootid, b"1a1e80437220f9312e855c37ac4398b68e5c1d50")
  226. self.assertEqual((stat.S_IFREG, blob.id), self.store[rootid][b"bla"])
  227. self.assertEqual({rootid, blob.id}, set(self.store._data.keys()))
  228. def test_nested(self) -> None:
  229. blob = Blob()
  230. blob.data = b"foo"
  231. self.store.add_object(blob)
  232. blobs = [(b"bla/bar", blob.id, stat.S_IFREG)]
  233. rootid = commit_tree(self.store, blobs)
  234. self.assertEqual(rootid, b"d92b959b216ad0d044671981196781b3258fa537")
  235. dirid = self.store[rootid][b"bla"][1]
  236. self.assertEqual(dirid, b"c1a1deb9788150829579a8b4efa6311e7b638650")
  237. self.assertEqual((stat.S_IFDIR, dirid), self.store[rootid][b"bla"])
  238. self.assertEqual((stat.S_IFREG, blob.id), self.store[dirid][b"bar"])
  239. self.assertEqual({rootid, dirid, blob.id}, set(self.store._data.keys()))
  240. class CleanupModeTests(TestCase):
  241. def assertModeEqual(self, expected, got) -> None:
  242. self.assertEqual(expected, got, f"{expected:o} != {got:o}")
  243. def test_file(self) -> None:
  244. self.assertModeEqual(0o100644, cleanup_mode(0o100000))
  245. def test_executable(self) -> None:
  246. self.assertModeEqual(0o100755, cleanup_mode(0o100711))
  247. self.assertModeEqual(0o100755, cleanup_mode(0o100700))
  248. def test_symlink(self) -> None:
  249. self.assertModeEqual(0o120000, cleanup_mode(0o120711))
  250. def test_dir(self) -> None:
  251. self.assertModeEqual(0o040000, cleanup_mode(0o40531))
  252. def test_submodule(self) -> None:
  253. self.assertModeEqual(0o160000, cleanup_mode(0o160744))
  254. class WriteCacheTimeTests(TestCase):
  255. def test_write_string(self) -> None:
  256. f = BytesIO()
  257. self.assertRaises(TypeError, write_cache_time, f, "foo")
  258. def test_write_int(self) -> None:
  259. f = BytesIO()
  260. write_cache_time(f, 434343)
  261. self.assertEqual(struct.pack(">LL", 434343, 0), f.getvalue())
  262. def test_write_tuple(self) -> None:
  263. f = BytesIO()
  264. write_cache_time(f, (434343, 21))
  265. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  266. def test_write_float(self) -> None:
  267. f = BytesIO()
  268. write_cache_time(f, 434343.000000021)
  269. self.assertEqual(struct.pack(">LL", 434343, 21), f.getvalue())
  270. class IndexEntryFromStatTests(TestCase):
  271. def test_simple(self) -> None:
  272. st = os.stat_result(
  273. (
  274. 16877,
  275. 131078,
  276. 64769,
  277. 154,
  278. 1000,
  279. 1000,
  280. 12288,
  281. 1323629595,
  282. 1324180496,
  283. 1324180496,
  284. )
  285. )
  286. entry = index_entry_from_stat(st, b"22" * 20)
  287. self.assertEqual(
  288. entry,
  289. IndexEntry(
  290. 1324180496,
  291. 1324180496,
  292. 64769,
  293. 131078,
  294. 16384,
  295. 1000,
  296. 1000,
  297. 12288,
  298. b"2222222222222222222222222222222222222222",
  299. 0,
  300. 0,
  301. ),
  302. )
  303. def test_override_mode(self) -> None:
  304. st = os.stat_result(
  305. (
  306. stat.S_IFREG + 0o644,
  307. 131078,
  308. 64769,
  309. 154,
  310. 1000,
  311. 1000,
  312. 12288,
  313. 1323629595,
  314. 1324180496,
  315. 1324180496,
  316. )
  317. )
  318. entry = index_entry_from_stat(st, b"22" * 20, mode=stat.S_IFREG + 0o755)
  319. self.assertEqual(
  320. entry,
  321. IndexEntry(
  322. 1324180496,
  323. 1324180496,
  324. 64769,
  325. 131078,
  326. 33261,
  327. 1000,
  328. 1000,
  329. 12288,
  330. b"2222222222222222222222222222222222222222",
  331. 0,
  332. 0,
  333. ),
  334. )
  335. class BuildIndexTests(TestCase):
  336. def assertReasonableIndexEntry(self, index_entry, mode, filesize, sha) -> None:
  337. self.assertEqual(index_entry.mode, mode) # mode
  338. self.assertEqual(index_entry.size, filesize) # filesize
  339. self.assertEqual(index_entry.sha, sha) # sha
  340. def assertFileContents(self, path, contents, symlink=False) -> None:
  341. if symlink:
  342. self.assertEqual(os.readlink(path), contents)
  343. else:
  344. with open(path, "rb") as f:
  345. self.assertEqual(f.read(), contents)
  346. def test_empty(self) -> None:
  347. repo_dir = tempfile.mkdtemp()
  348. self.addCleanup(shutil.rmtree, repo_dir)
  349. with Repo.init(repo_dir) as repo:
  350. tree = Tree()
  351. repo.object_store.add_object(tree)
  352. build_index_from_tree(
  353. repo.path, repo.index_path(), repo.object_store, tree.id
  354. )
  355. # Verify index entries
  356. index = repo.open_index()
  357. self.assertEqual(len(index), 0)
  358. # Verify no files
  359. self.assertEqual([".git"], os.listdir(repo.path))
  360. def test_git_dir(self) -> None:
  361. repo_dir = tempfile.mkdtemp()
  362. self.addCleanup(shutil.rmtree, repo_dir)
  363. with Repo.init(repo_dir) as repo:
  364. # Populate repo
  365. filea = Blob.from_string(b"file a")
  366. filee = Blob.from_string(b"d")
  367. tree = Tree()
  368. tree[b".git/a"] = (stat.S_IFREG | 0o644, filea.id)
  369. tree[b"c/e"] = (stat.S_IFREG | 0o644, filee.id)
  370. repo.object_store.add_objects([(o, None) for o in [filea, filee, tree]])
  371. build_index_from_tree(
  372. repo.path, repo.index_path(), repo.object_store, tree.id
  373. )
  374. # Verify index entries
  375. index = repo.open_index()
  376. self.assertEqual(len(index), 1)
  377. # filea
  378. apath = os.path.join(repo.path, ".git", "a")
  379. self.assertFalse(os.path.exists(apath))
  380. # filee
  381. epath = os.path.join(repo.path, "c", "e")
  382. self.assertTrue(os.path.exists(epath))
  383. self.assertReasonableIndexEntry(
  384. index[b"c/e"], stat.S_IFREG | 0o644, 1, filee.id
  385. )
  386. self.assertFileContents(epath, b"d")
  387. def test_nonempty(self) -> None:
  388. repo_dir = tempfile.mkdtemp()
  389. self.addCleanup(shutil.rmtree, repo_dir)
  390. with Repo.init(repo_dir) as repo:
  391. # Populate repo
  392. filea = Blob.from_string(b"file a")
  393. fileb = Blob.from_string(b"file b")
  394. filed = Blob.from_string(b"file d")
  395. tree = Tree()
  396. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  397. tree[b"b"] = (stat.S_IFREG | 0o644, fileb.id)
  398. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  399. repo.object_store.add_objects(
  400. [(o, None) for o in [filea, fileb, filed, tree]]
  401. )
  402. build_index_from_tree(
  403. repo.path, repo.index_path(), repo.object_store, tree.id
  404. )
  405. # Verify index entries
  406. index = repo.open_index()
  407. self.assertEqual(len(index), 3)
  408. # filea
  409. apath = os.path.join(repo.path, "a")
  410. self.assertTrue(os.path.exists(apath))
  411. self.assertReasonableIndexEntry(
  412. index[b"a"], stat.S_IFREG | 0o644, 6, filea.id
  413. )
  414. self.assertFileContents(apath, b"file a")
  415. # fileb
  416. bpath = os.path.join(repo.path, "b")
  417. self.assertTrue(os.path.exists(bpath))
  418. self.assertReasonableIndexEntry(
  419. index[b"b"], stat.S_IFREG | 0o644, 6, fileb.id
  420. )
  421. self.assertFileContents(bpath, b"file b")
  422. # filed
  423. dpath = os.path.join(repo.path, "c", "d")
  424. self.assertTrue(os.path.exists(dpath))
  425. self.assertReasonableIndexEntry(
  426. index[b"c/d"], stat.S_IFREG | 0o644, 6, filed.id
  427. )
  428. self.assertFileContents(dpath, b"file d")
  429. # Verify no extra files
  430. self.assertEqual([".git", "a", "b", "c"], sorted(os.listdir(repo.path)))
  431. self.assertEqual(["d"], sorted(os.listdir(os.path.join(repo.path, "c"))))
  432. @skipIf(not getattr(os, "sync", None), "Requires sync support")
  433. def test_norewrite(self) -> None:
  434. repo_dir = tempfile.mkdtemp()
  435. self.addCleanup(shutil.rmtree, repo_dir)
  436. with Repo.init(repo_dir) as repo:
  437. # Populate repo
  438. filea = Blob.from_string(b"file a")
  439. filea_path = os.path.join(repo_dir, "a")
  440. tree = Tree()
  441. tree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  442. repo.object_store.add_objects([(o, None) for o in [filea, tree]])
  443. # First Write
  444. build_index_from_tree(
  445. repo.path, repo.index_path(), repo.object_store, tree.id
  446. )
  447. # Use sync as metadata can be cached on some FS
  448. os.sync()
  449. mtime = os.stat(filea_path).st_mtime
  450. # Test Rewrite
  451. build_index_from_tree(
  452. repo.path, repo.index_path(), repo.object_store, tree.id
  453. )
  454. os.sync()
  455. self.assertEqual(mtime, os.stat(filea_path).st_mtime)
  456. # Modify content
  457. with open(filea_path, "wb") as fh:
  458. fh.write(b"test a")
  459. os.sync()
  460. mtime = os.stat(filea_path).st_mtime
  461. # Test rewrite
  462. build_index_from_tree(
  463. repo.path, repo.index_path(), repo.object_store, tree.id
  464. )
  465. os.sync()
  466. with open(filea_path, "rb") as fh:
  467. self.assertEqual(b"file a", fh.read())
  468. @skipIf(not can_symlink(), "Requires symlink support")
  469. def test_symlink(self) -> None:
  470. repo_dir = tempfile.mkdtemp()
  471. self.addCleanup(shutil.rmtree, repo_dir)
  472. with Repo.init(repo_dir) as repo:
  473. # Populate repo
  474. filed = Blob.from_string(b"file d")
  475. filee = Blob.from_string(b"d")
  476. tree = Tree()
  477. tree[b"c/d"] = (stat.S_IFREG | 0o644, filed.id)
  478. tree[b"c/e"] = (stat.S_IFLNK, filee.id) # symlink
  479. repo.object_store.add_objects([(o, None) for o in [filed, filee, tree]])
  480. build_index_from_tree(
  481. repo.path, repo.index_path(), repo.object_store, tree.id
  482. )
  483. # Verify index entries
  484. index = repo.open_index()
  485. # symlink to d
  486. epath = os.path.join(repo.path, "c", "e")
  487. self.assertTrue(os.path.exists(epath))
  488. self.assertReasonableIndexEntry(
  489. index[b"c/e"],
  490. stat.S_IFLNK,
  491. 0 if sys.platform == "win32" else 1,
  492. filee.id,
  493. )
  494. self.assertFileContents(epath, "d", symlink=True)
  495. def test_no_decode_encode(self) -> None:
  496. repo_dir = tempfile.mkdtemp()
  497. repo_dir_bytes = os.fsencode(repo_dir)
  498. self.addCleanup(shutil.rmtree, repo_dir)
  499. with Repo.init(repo_dir) as repo:
  500. # Populate repo
  501. file = Blob.from_string(b"foo")
  502. tree = Tree()
  503. latin1_name = "À".encode("latin1")
  504. try:
  505. latin1_path = os.path.join(repo_dir_bytes, latin1_name)
  506. except UnicodeDecodeError:
  507. self.skipTest("can not decode as latin1")
  508. utf8_name = "À".encode()
  509. utf8_path = os.path.join(repo_dir_bytes, utf8_name)
  510. tree[latin1_name] = (stat.S_IFREG | 0o644, file.id)
  511. tree[utf8_name] = (stat.S_IFREG | 0o644, file.id)
  512. repo.object_store.add_objects([(o, None) for o in [file, tree]])
  513. try:
  514. build_index_from_tree(
  515. repo.path, repo.index_path(), repo.object_store, tree.id
  516. )
  517. except OSError as e:
  518. if e.errno == 92 and sys.platform == "darwin":
  519. # Our filename isn't supported by the platform :(
  520. self.skipTest(f"can not write filename {e.filename!r}")
  521. else:
  522. raise
  523. except UnicodeDecodeError:
  524. # This happens e.g. with python3.6 on Windows.
  525. # It implicitly decodes using utf8, which doesn't work.
  526. self.skipTest("can not implicitly convert as utf8")
  527. # Verify index entries
  528. index = repo.open_index()
  529. self.assertIn(latin1_name, index)
  530. self.assertIn(utf8_name, index)
  531. self.assertTrue(os.path.exists(latin1_path))
  532. self.assertTrue(os.path.exists(utf8_path))
  533. def test_windows_unicode_filename_encoding(self) -> None:
  534. """Test that Unicode filenames are handled correctly on Windows.
  535. This test verifies the fix for GitHub issue #203, where filenames
  536. containing Unicode characters like 'À' were incorrectly encoded/decoded
  537. on Windows, resulting in corruption like 'À' -> 'Ã€'.
  538. """
  539. repo_dir = tempfile.mkdtemp()
  540. self.addCleanup(shutil.rmtree, repo_dir)
  541. with Repo.init(repo_dir) as repo:
  542. # Create a blob
  543. file_content = b"test file content"
  544. blob = Blob.from_string(file_content)
  545. # Create a tree with a Unicode filename
  546. tree = Tree()
  547. unicode_filename = "À" # This is the character from GitHub issue #203
  548. utf8_filename_bytes = unicode_filename.encode(
  549. "utf-8"
  550. ) # This is how it's stored in git trees
  551. tree[utf8_filename_bytes] = (stat.S_IFREG | 0o644, blob.id)
  552. repo.object_store.add_objects([(blob, None), (tree, None)])
  553. # Build index from tree (this is what happens during checkout/clone)
  554. try:
  555. build_index_from_tree(
  556. repo.path, repo.index_path(), repo.object_store, tree.id
  557. )
  558. except (OSError, UnicodeError) as e:
  559. if sys.platform == "win32" and "cannot" in str(e).lower():
  560. self.skipTest(f"Platform doesn't support filename: {e}")
  561. raise
  562. # Check that the file was created correctly
  563. expected_file_path = os.path.join(repo.path, unicode_filename)
  564. self.assertTrue(
  565. os.path.exists(expected_file_path),
  566. f"File should exist at {expected_file_path}",
  567. )
  568. # Verify the file content is correct
  569. with open(expected_file_path, "rb") as f:
  570. actual_content = f.read()
  571. self.assertEqual(actual_content, file_content)
  572. # Test the reverse: adding a Unicode filename to the index
  573. if sys.platform == "win32":
  574. # On Windows, test that _tree_to_fs_path and _fs_to_tree_path
  575. # handle UTF-8 encoded tree paths correctly
  576. from dulwich.index import _fs_to_tree_path, _tree_to_fs_path
  577. repo_path_bytes = os.fsencode(repo.path)
  578. # Test tree path to filesystem path conversion
  579. fs_path = _tree_to_fs_path(repo_path_bytes, utf8_filename_bytes)
  580. expected_fs_path = os.path.join(
  581. repo_path_bytes, os.fsencode(unicode_filename)
  582. )
  583. self.assertEqual(fs_path, expected_fs_path)
  584. # Test filesystem path to tree path conversion
  585. # _fs_to_tree_path expects relative paths, not absolute paths
  586. # Extract just the filename from the full path
  587. filename_only = os.path.basename(fs_path)
  588. reconstructed_tree_path = _fs_to_tree_path(
  589. filename_only, tree_encoding="utf-8"
  590. )
  591. self.assertEqual(reconstructed_tree_path, utf8_filename_bytes)
  592. def test_git_submodule(self) -> None:
  593. repo_dir = tempfile.mkdtemp()
  594. self.addCleanup(shutil.rmtree, repo_dir)
  595. with Repo.init(repo_dir) as repo:
  596. filea = Blob.from_string(b"file alalala")
  597. subtree = Tree()
  598. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  599. c = make_commit(
  600. tree=subtree.id,
  601. author=b"Somebody <somebody@example.com>",
  602. committer=b"Somebody <somebody@example.com>",
  603. author_time=42342,
  604. commit_time=42342,
  605. author_timezone=0,
  606. commit_timezone=0,
  607. parents=[],
  608. message=b"Subcommit",
  609. )
  610. tree = Tree()
  611. tree[b"c"] = (S_IFGITLINK, c.id)
  612. repo.object_store.add_objects([(o, None) for o in [tree]])
  613. build_index_from_tree(
  614. repo.path, repo.index_path(), repo.object_store, tree.id
  615. )
  616. # Verify index entries
  617. index = repo.open_index()
  618. self.assertEqual(len(index), 1)
  619. # filea
  620. apath = os.path.join(repo.path, "c/a")
  621. self.assertFalse(os.path.exists(apath))
  622. # dir c
  623. cpath = os.path.join(repo.path, "c")
  624. self.assertTrue(os.path.isdir(cpath))
  625. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  626. self.assertEqual(index[b"c"].sha, c.id) # sha
  627. def test_git_submodule_exists(self) -> None:
  628. repo_dir = tempfile.mkdtemp()
  629. self.addCleanup(shutil.rmtree, repo_dir)
  630. with Repo.init(repo_dir) as repo:
  631. filea = Blob.from_string(b"file alalala")
  632. subtree = Tree()
  633. subtree[b"a"] = (stat.S_IFREG | 0o644, filea.id)
  634. c = make_commit(
  635. tree=subtree.id,
  636. author=b"Somebody <somebody@example.com>",
  637. committer=b"Somebody <somebody@example.com>",
  638. author_time=42342,
  639. commit_time=42342,
  640. author_timezone=0,
  641. commit_timezone=0,
  642. parents=[],
  643. message=b"Subcommit",
  644. )
  645. tree = Tree()
  646. tree[b"c"] = (S_IFGITLINK, c.id)
  647. os.mkdir(os.path.join(repo_dir, "c"))
  648. repo.object_store.add_objects([(o, None) for o in [tree]])
  649. build_index_from_tree(
  650. repo.path, repo.index_path(), repo.object_store, tree.id
  651. )
  652. # Verify index entries
  653. index = repo.open_index()
  654. self.assertEqual(len(index), 1)
  655. # filea
  656. apath = os.path.join(repo.path, "c/a")
  657. self.assertFalse(os.path.exists(apath))
  658. # dir c
  659. cpath = os.path.join(repo.path, "c")
  660. self.assertTrue(os.path.isdir(cpath))
  661. self.assertEqual(index[b"c"].mode, S_IFGITLINK) # mode
  662. self.assertEqual(index[b"c"].sha, c.id) # sha
  663. def test_with_line_ending_normalization(self) -> None:
  664. """Test that build_index_from_tree applies line-ending normalization."""
  665. repo_dir = tempfile.mkdtemp()
  666. self.addCleanup(shutil.rmtree, repo_dir)
  667. from dulwich.line_ending import BlobNormalizer
  668. with Repo.init(repo_dir) as repo:
  669. # Set up autocrlf config
  670. config = repo.get_config()
  671. config.set((b"core",), b"autocrlf", b"true")
  672. config.write_to_path()
  673. # Create blob with LF line endings
  674. content_lf = b"line1\nline2\nline3\n"
  675. blob = Blob.from_string(content_lf)
  676. tree = Tree()
  677. tree[b"test.txt"] = (stat.S_IFREG | 0o644, blob.id)
  678. repo.object_store.add_objects([(blob, None), (tree, None)])
  679. # Create blob normalizer
  680. autocrlf = config.get((b"core",), b"autocrlf")
  681. blob_normalizer = BlobNormalizer(config, {}, autocrlf=autocrlf)
  682. # Build index with normalization
  683. build_index_from_tree(
  684. repo.path,
  685. repo.index_path(),
  686. repo.object_store,
  687. tree.id,
  688. blob_normalizer=blob_normalizer,
  689. )
  690. # On Windows with autocrlf=true, file should have CRLF line endings
  691. test_file = os.path.join(repo.path, "test.txt")
  692. with open(test_file, "rb") as f:
  693. content = f.read()
  694. # autocrlf=true means LF -> CRLF on checkout (on all platforms for testing)
  695. expected_content = b"line1\r\nline2\r\nline3\r\n"
  696. self.assertEqual(content, expected_content)
  697. class GetUnstagedChangesTests(TestCase):
  698. def test_get_unstaged_changes(self) -> None:
  699. """Unit test for get_unstaged_changes."""
  700. repo_dir = tempfile.mkdtemp()
  701. self.addCleanup(shutil.rmtree, repo_dir)
  702. with Repo.init(repo_dir) as repo:
  703. # Commit a dummy file then modify it
  704. foo1_fullpath = os.path.join(repo_dir, "foo1")
  705. with open(foo1_fullpath, "wb") as f:
  706. f.write(b"origstuff")
  707. foo2_fullpath = os.path.join(repo_dir, "foo2")
  708. with open(foo2_fullpath, "wb") as f:
  709. f.write(b"origstuff")
  710. repo.get_worktree().stage(["foo1", "foo2"])
  711. repo.get_worktree().commit(
  712. message=b"test status",
  713. committer=b"committer <email>",
  714. author=b"author <email>",
  715. )
  716. with open(foo1_fullpath, "wb") as f:
  717. f.write(b"newstuff")
  718. # modify access and modify time of path
  719. os.utime(foo1_fullpath, (0, 0))
  720. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  721. self.assertEqual(list(changes), [b"foo1"])
  722. def test_get_unstaged_changes_with_preload(self) -> None:
  723. """Unit test for get_unstaged_changes with preload_index=True."""
  724. repo_dir = tempfile.mkdtemp()
  725. self.addCleanup(shutil.rmtree, repo_dir)
  726. with Repo.init(repo_dir) as repo:
  727. # Create multiple files to test parallel processing
  728. files = []
  729. for i in range(10):
  730. filename = f"foo{i}"
  731. fullpath = os.path.join(repo_dir, filename)
  732. with open(fullpath, "wb") as f:
  733. f.write(b"origstuff" + str(i).encode())
  734. files.append(filename)
  735. repo.get_worktree().stage(files)
  736. repo.get_worktree().commit(
  737. b"test status",
  738. author=b"author <email>",
  739. committer=b"committer <email>",
  740. )
  741. # Modify some files
  742. modified_files = [b"foo1", b"foo3", b"foo5", b"foo7"]
  743. for filename in modified_files:
  744. fullpath = os.path.join(repo_dir, filename.decode())
  745. with open(fullpath, "wb") as f:
  746. f.write(b"newstuff")
  747. os.utime(fullpath, (0, 0))
  748. # Test with preload_index=False (serial)
  749. changes_serial = list(
  750. get_unstaged_changes(repo.open_index(), repo_dir, preload_index=False)
  751. )
  752. changes_serial.sort()
  753. # Test with preload_index=True (parallel)
  754. changes_parallel = list(
  755. get_unstaged_changes(repo.open_index(), repo_dir, preload_index=True)
  756. )
  757. changes_parallel.sort()
  758. # Both should return the same results
  759. self.assertEqual(changes_serial, changes_parallel)
  760. self.assertEqual(changes_serial, sorted(modified_files))
  761. def test_get_unstaged_changes_nanosecond_precision(self) -> None:
  762. """Test that nanosecond precision mtime is used for change detection."""
  763. repo_dir = tempfile.mkdtemp()
  764. self.addCleanup(shutil.rmtree, repo_dir)
  765. with Repo.init(repo_dir) as repo:
  766. # Commit a file
  767. foo_fullpath = os.path.join(repo_dir, "foo")
  768. with open(foo_fullpath, "wb") as f:
  769. f.write(b"original content")
  770. repo.get_worktree().stage(["foo"])
  771. repo.get_worktree().commit(
  772. message=b"initial commit",
  773. committer=b"committer <email>",
  774. author=b"author <email>",
  775. )
  776. # Get the current index entry
  777. index = repo.open_index()
  778. entry = index[b"foo"]
  779. # Modify the file with the same size but different content
  780. # This simulates a very fast change within the same second
  781. with open(foo_fullpath, "wb") as f:
  782. f.write(b"modified content")
  783. # Set mtime to match the index entry exactly (same second)
  784. # but with different nanoseconds if the filesystem supports it
  785. st = os.stat(foo_fullpath)
  786. if isinstance(entry.mtime, tuple) and hasattr(st, "st_mtime_ns"):
  787. # Set the mtime to the same second as the index entry
  788. # but with a slightly different nanosecond value
  789. entry_sec = entry.mtime[0]
  790. entry_nsec = entry.mtime[1]
  791. new_mtime_ns = entry_sec * 1_000_000_000 + entry_nsec + 1000
  792. new_mtime = new_mtime_ns / 1_000_000_000
  793. os.utime(foo_fullpath, (st.st_atime, new_mtime))
  794. # The file should be detected as changed due to nanosecond difference
  795. changes = list(get_unstaged_changes(repo.open_index(), repo_dir))
  796. self.assertEqual(changes, [b"foo"])
  797. else:
  798. # If nanosecond precision is not available, skip this test
  799. self.skipTest("Nanosecond precision not available on this system")
  800. def test_get_unstaged_deleted_changes(self) -> None:
  801. """Unit test for get_unstaged_changes."""
  802. repo_dir = tempfile.mkdtemp()
  803. self.addCleanup(shutil.rmtree, repo_dir)
  804. with Repo.init(repo_dir) as repo:
  805. # Commit a dummy file then remove it
  806. foo1_fullpath = os.path.join(repo_dir, "foo1")
  807. with open(foo1_fullpath, "wb") as f:
  808. f.write(b"origstuff")
  809. repo.get_worktree().stage(["foo1"])
  810. repo.get_worktree().commit(
  811. message=b"test status",
  812. committer=b"committer <email>",
  813. author=b"author <email>",
  814. )
  815. os.unlink(foo1_fullpath)
  816. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  817. self.assertEqual(list(changes), [b"foo1"])
  818. def test_get_unstaged_changes_removed_replaced_by_directory(self) -> None:
  819. """Unit test for get_unstaged_changes."""
  820. repo_dir = tempfile.mkdtemp()
  821. self.addCleanup(shutil.rmtree, repo_dir)
  822. with Repo.init(repo_dir) as repo:
  823. # Commit a dummy file then modify it
  824. foo1_fullpath = os.path.join(repo_dir, "foo1")
  825. with open(foo1_fullpath, "wb") as f:
  826. f.write(b"origstuff")
  827. repo.get_worktree().stage(["foo1"])
  828. repo.get_worktree().commit(
  829. message=b"test status",
  830. committer=b"committer <email>",
  831. author=b"author <email>",
  832. )
  833. os.remove(foo1_fullpath)
  834. os.mkdir(foo1_fullpath)
  835. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  836. self.assertEqual(list(changes), [b"foo1"])
  837. @skipIf(not can_symlink(), "Requires symlink support")
  838. def test_get_unstaged_changes_removed_replaced_by_link(self) -> None:
  839. """Unit test for get_unstaged_changes."""
  840. repo_dir = tempfile.mkdtemp()
  841. self.addCleanup(shutil.rmtree, repo_dir)
  842. with Repo.init(repo_dir) as repo:
  843. # Commit a dummy file then modify it
  844. foo1_fullpath = os.path.join(repo_dir, "foo1")
  845. with open(foo1_fullpath, "wb") as f:
  846. f.write(b"origstuff")
  847. repo.get_worktree().stage(["foo1"])
  848. repo.get_worktree().commit(
  849. message=b"test status",
  850. committer=b"committer <email>",
  851. author=b"author <email>",
  852. )
  853. os.remove(foo1_fullpath)
  854. os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)
  855. changes = get_unstaged_changes(repo.open_index(), repo_dir)
  856. self.assertEqual(list(changes), [b"foo1"])
  857. class TestValidatePathElement(TestCase):
  858. def test_default(self) -> None:
  859. self.assertTrue(validate_path_element_default(b"bla"))
  860. self.assertTrue(validate_path_element_default(b".bla"))
  861. self.assertFalse(validate_path_element_default(b".git"))
  862. self.assertFalse(validate_path_element_default(b".giT"))
  863. self.assertFalse(validate_path_element_default(b".."))
  864. self.assertTrue(validate_path_element_default(b"git~1"))
  865. def test_ntfs(self) -> None:
  866. self.assertTrue(validate_path_element_ntfs(b"bla"))
  867. self.assertTrue(validate_path_element_ntfs(b".bla"))
  868. self.assertFalse(validate_path_element_ntfs(b".git"))
  869. self.assertFalse(validate_path_element_ntfs(b".giT"))
  870. self.assertFalse(validate_path_element_ntfs(b".."))
  871. self.assertFalse(validate_path_element_ntfs(b"git~1"))
  872. def test_hfs(self) -> None:
  873. # Normal paths should pass
  874. self.assertTrue(validate_path_element_hfs(b"bla"))
  875. self.assertTrue(validate_path_element_hfs(b".bla"))
  876. # Basic .git variations should fail
  877. self.assertFalse(validate_path_element_hfs(b".git"))
  878. self.assertFalse(validate_path_element_hfs(b".giT"))
  879. self.assertFalse(validate_path_element_hfs(b".GIT"))
  880. self.assertFalse(validate_path_element_hfs(b".."))
  881. # git~1 should also fail on HFS+
  882. self.assertFalse(validate_path_element_hfs(b"git~1"))
  883. # Test HFS+ Unicode normalization attacks
  884. # .g\u200cit (zero-width non-joiner)
  885. self.assertFalse(validate_path_element_hfs(b".g\xe2\x80\x8cit"))
  886. # .gi\u200dt (zero-width joiner)
  887. self.assertFalse(validate_path_element_hfs(b".gi\xe2\x80\x8dt"))
  888. # Test other ignorable characters
  889. # .g\ufeffit (zero-width no-break space)
  890. self.assertFalse(validate_path_element_hfs(b".g\xef\xbb\xbfit"))
  891. # Valid Unicode that shouldn't be confused with .git
  892. self.assertTrue(validate_path_element_hfs(b".g\xc3\xaft")) # .gït
  893. self.assertTrue(validate_path_element_hfs(b"git")) # git without dot
  894. class TestTreeFSPathConversion(TestCase):
  895. def test_tree_to_fs_path(self) -> None:
  896. tree_path = "délwíçh/foo".encode()
  897. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  898. self.assertEqual(
  899. fs_path,
  900. os.fsencode(os.path.join("/prefix/path", "délwíçh", "foo")),
  901. )
  902. def test_tree_to_fs_path_windows_separator(self) -> None:
  903. tree_path = b"path/with/slash"
  904. original_sep = os.sep.encode("ascii")
  905. # Temporarily modify os_sep_bytes to test Windows path conversion
  906. # This simulates Windows behavior on all platforms for testing
  907. import dulwich.index
  908. dulwich.index.os_sep_bytes = b"\\"
  909. self.addCleanup(setattr, dulwich.index, "os_sep_bytes", original_sep)
  910. fs_path = _tree_to_fs_path(b"/prefix/path", tree_path)
  911. # The function should join the prefix path with the converted tree path
  912. # The expected behavior is that the path separators in the tree_path are
  913. # converted to the platform-specific separator (which we've set to backslash)
  914. expected_path = os.path.join(b"/prefix/path", b"path\\with\\slash")
  915. self.assertEqual(fs_path, expected_path)
  916. def test_fs_to_tree_path_str(self) -> None:
  917. fs_path = os.path.join(os.path.join("délwíçh", "foo"))
  918. tree_path = _fs_to_tree_path(fs_path)
  919. self.assertEqual(tree_path, "délwíçh/foo".encode())
  920. def test_fs_to_tree_path_bytes(self) -> None:
  921. fs_path = os.path.join(os.fsencode(os.path.join("délwíçh", "foo")))
  922. tree_path = _fs_to_tree_path(fs_path)
  923. self.assertEqual(tree_path, "délwíçh/foo".encode())
  924. def test_fs_to_tree_path_windows_separator(self) -> None:
  925. # Test conversion of Windows paths to tree paths
  926. fs_path = b"path\\with\\backslash"
  927. original_sep = os.sep.encode("ascii")
  928. # Temporarily modify os_sep_bytes to test Windows path conversion
  929. import dulwich.index
  930. dulwich.index.os_sep_bytes = b"\\"
  931. self.addCleanup(setattr, dulwich.index, "os_sep_bytes", original_sep)
  932. tree_path = _fs_to_tree_path(fs_path)
  933. self.assertEqual(tree_path, b"path/with/backslash")
  934. class TestIndexEntryFromPath(TestCase):
  935. def setUp(self):
  936. self.tempdir = tempfile.mkdtemp()
  937. self.addCleanup(shutil.rmtree, self.tempdir)
  938. def test_index_entry_from_path_file(self) -> None:
  939. """Test creating index entry from a regular file."""
  940. # Create a test file
  941. test_file = os.path.join(self.tempdir, "testfile")
  942. with open(test_file, "wb") as f:
  943. f.write(b"test content")
  944. # Get the index entry
  945. entry = index_entry_from_path(os.fsencode(test_file))
  946. # Verify the entry was created with the right mode
  947. self.assertIsNotNone(entry)
  948. self.assertEqual(cleanup_mode(os.stat(test_file).st_mode), entry.mode)
  949. @skipIf(not can_symlink(), "Requires symlink support")
  950. def test_index_entry_from_path_symlink(self) -> None:
  951. """Test creating index entry from a symlink."""
  952. # Create a target file
  953. target_file = os.path.join(self.tempdir, "target")
  954. with open(target_file, "wb") as f:
  955. f.write(b"target content")
  956. # Create a symlink
  957. link_file = os.path.join(self.tempdir, "symlink")
  958. os.symlink(target_file, link_file)
  959. # Get the index entry
  960. entry = index_entry_from_path(os.fsencode(link_file))
  961. # Verify the entry was created with the right mode
  962. self.assertIsNotNone(entry)
  963. self.assertEqual(cleanup_mode(os.lstat(link_file).st_mode), entry.mode)
  964. def test_index_entry_from_path_directory(self) -> None:
  965. """Test creating index entry from a directory (should return None)."""
  966. # Create a directory
  967. test_dir = os.path.join(self.tempdir, "testdir")
  968. os.mkdir(test_dir)
  969. # Get the index entry for a directory
  970. entry = index_entry_from_path(os.fsencode(test_dir))
  971. # Should return None for regular directories
  972. self.assertIsNone(entry)
  973. def test_index_entry_from_directory_regular(self) -> None:
  974. """Test index_entry_from_directory with a regular directory."""
  975. # Create a directory
  976. test_dir = os.path.join(self.tempdir, "testdir")
  977. os.mkdir(test_dir)
  978. # Get stat for the directory
  979. st = os.lstat(test_dir)
  980. # Get the index entry for a regular directory
  981. entry = index_entry_from_directory(st, os.fsencode(test_dir))
  982. # Should return None for regular directories
  983. self.assertIsNone(entry)
  984. def test_index_entry_from_directory_git_submodule(self) -> None:
  985. """Test index_entry_from_directory with a Git submodule."""
  986. # Create a git repository that will be a submodule
  987. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  988. os.mkdir(sub_repo_dir)
  989. # Create the .git directory to make it look like a git repo
  990. git_dir = os.path.join(sub_repo_dir, ".git")
  991. os.mkdir(git_dir)
  992. # Create HEAD file with a fake commit SHA
  993. head_sha = b"1234567890" * 4 # 40-char fake SHA
  994. with open(os.path.join(git_dir, "HEAD"), "wb") as f:
  995. f.write(head_sha)
  996. # Get stat for the submodule directory
  997. st = os.lstat(sub_repo_dir)
  998. # Get the index entry for a git submodule directory
  999. entry = index_entry_from_directory(st, os.fsencode(sub_repo_dir))
  1000. # Since we don't have a proper git setup, this might still return None
  1001. # This test just ensures the code path is executed
  1002. if entry is not None:
  1003. # If an entry is returned, it should have the gitlink mode
  1004. self.assertEqual(entry.mode, S_IFGITLINK)
  1005. def test_index_entry_from_path_with_object_store(self) -> None:
  1006. """Test creating index entry with object store."""
  1007. # Create a test file
  1008. test_file = os.path.join(self.tempdir, "testfile")
  1009. with open(test_file, "wb") as f:
  1010. f.write(b"test content")
  1011. # Create a memory object store
  1012. object_store = MemoryObjectStore()
  1013. # Get the index entry and add to object store
  1014. entry = index_entry_from_path(os.fsencode(test_file), object_store)
  1015. # Verify we can access the blob from the object store
  1016. self.assertIsNotNone(entry)
  1017. blob = object_store[entry.sha]
  1018. self.assertEqual(b"test content", blob.data)
  1019. def test_iter_fresh_entries(self) -> None:
  1020. """Test iterating over fresh entries."""
  1021. # Create some test files
  1022. file1 = os.path.join(self.tempdir, "file1")
  1023. with open(file1, "wb") as f:
  1024. f.write(b"file1 content")
  1025. file2 = os.path.join(self.tempdir, "file2")
  1026. with open(file2, "wb") as f:
  1027. f.write(b"file2 content")
  1028. # Create a memory object store
  1029. object_store = MemoryObjectStore()
  1030. # Get fresh entries
  1031. paths = [b"file1", b"file2", b"nonexistent"]
  1032. entries = dict(
  1033. iter_fresh_entries(paths, os.fsencode(self.tempdir), object_store)
  1034. )
  1035. # Verify both files got entries but nonexistent file is None
  1036. self.assertIn(b"file1", entries)
  1037. self.assertIn(b"file2", entries)
  1038. self.assertIn(b"nonexistent", entries)
  1039. self.assertIsNotNone(entries[b"file1"])
  1040. self.assertIsNotNone(entries[b"file2"])
  1041. self.assertIsNone(entries[b"nonexistent"])
  1042. # Check that blobs were added to object store
  1043. blob1 = object_store[entries[b"file1"].sha]
  1044. self.assertEqual(b"file1 content", blob1.data)
  1045. blob2 = object_store[entries[b"file2"].sha]
  1046. self.assertEqual(b"file2 content", blob2.data)
  1047. def test_read_submodule_head(self) -> None:
  1048. """Test reading the HEAD of a submodule."""
  1049. from dulwich.index import read_submodule_head
  1050. # Create a test repo that will be our "submodule"
  1051. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  1052. os.mkdir(sub_repo_dir)
  1053. submodule_repo = Repo.init(sub_repo_dir)
  1054. # Create a file and commit it to establish a HEAD
  1055. test_file = os.path.join(sub_repo_dir, "testfile")
  1056. with open(test_file, "wb") as f:
  1057. f.write(b"test content")
  1058. submodule_repo.get_worktree().stage(["testfile"])
  1059. commit_id = submodule_repo.get_worktree().commit(
  1060. message=b"Test commit for submodule",
  1061. )
  1062. # Test reading the HEAD
  1063. head_sha = read_submodule_head(sub_repo_dir)
  1064. self.assertEqual(commit_id, head_sha)
  1065. # Test with bytes path
  1066. head_sha_bytes = read_submodule_head(os.fsencode(sub_repo_dir))
  1067. self.assertEqual(commit_id, head_sha_bytes)
  1068. # Test with non-existent path
  1069. non_repo_dir = os.path.join(self.tempdir, "nonrepo")
  1070. os.mkdir(non_repo_dir)
  1071. self.assertIsNone(read_submodule_head(non_repo_dir))
  1072. # Test with path that doesn't have a .git directory
  1073. not_git_dir = os.path.join(self.tempdir, "notgit")
  1074. os.mkdir(not_git_dir)
  1075. self.assertIsNone(read_submodule_head(not_git_dir))
  1076. def test_has_directory_changed(self) -> None:
  1077. """Test checking if a directory has changed."""
  1078. from dulwich.index import IndexEntry, _has_directory_changed
  1079. # Setup mock IndexEntry
  1080. mock_entry = IndexEntry(
  1081. (1230680220, 0),
  1082. (1230680220, 0),
  1083. 2050,
  1084. 3761020,
  1085. 33188,
  1086. 1000,
  1087. 1000,
  1088. 0,
  1089. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391",
  1090. 0,
  1091. 0,
  1092. )
  1093. # Test with a regular directory (not a submodule)
  1094. reg_dir = os.path.join(self.tempdir, "regular_dir")
  1095. os.mkdir(reg_dir)
  1096. # Should return True for regular directory
  1097. self.assertTrue(_has_directory_changed(os.fsencode(reg_dir), mock_entry))
  1098. # Create a git repository to test submodule scenarios
  1099. sub_repo_dir = os.path.join(self.tempdir, "subrepo")
  1100. os.mkdir(sub_repo_dir)
  1101. submodule_repo = Repo.init(sub_repo_dir)
  1102. # Create a file and commit it to establish a HEAD
  1103. test_file = os.path.join(sub_repo_dir, "testfile")
  1104. with open(test_file, "wb") as f:
  1105. f.write(b"test content")
  1106. submodule_repo.get_worktree().stage(["testfile"])
  1107. commit_id = submodule_repo.get_worktree().commit(
  1108. message=b"Test commit for submodule",
  1109. )
  1110. # Create an entry with the correct commit SHA
  1111. correct_entry = IndexEntry(
  1112. (1230680220, 0),
  1113. (1230680220, 0),
  1114. 2050,
  1115. 3761020,
  1116. 33188,
  1117. 1000,
  1118. 1000,
  1119. 0,
  1120. commit_id,
  1121. 0,
  1122. 0,
  1123. )
  1124. # Create an entry with an incorrect commit SHA
  1125. incorrect_entry = IndexEntry(
  1126. (1230680220, 0),
  1127. (1230680220, 0),
  1128. 2050,
  1129. 3761020,
  1130. 33188,
  1131. 1000,
  1132. 1000,
  1133. 0,
  1134. b"0000000000000000000000000000000000000000",
  1135. 0,
  1136. 0,
  1137. )
  1138. # Should return False for submodule with correct SHA
  1139. self.assertFalse(
  1140. _has_directory_changed(os.fsencode(sub_repo_dir), correct_entry)
  1141. )
  1142. # Should return True for submodule with incorrect SHA
  1143. self.assertTrue(
  1144. _has_directory_changed(os.fsencode(sub_repo_dir), incorrect_entry)
  1145. )
  1146. def test_get_unstaged_changes(self) -> None:
  1147. """Test detecting unstaged changes in a working tree."""
  1148. from dulwich.index import (
  1149. ConflictedIndexEntry,
  1150. Index,
  1151. IndexEntry,
  1152. get_unstaged_changes,
  1153. )
  1154. # Create a test repo
  1155. repo_dir = tempfile.mkdtemp()
  1156. self.addCleanup(shutil.rmtree, repo_dir)
  1157. # Create test index
  1158. index = Index(os.path.join(repo_dir, "index"))
  1159. # Create an actual hash of our test content
  1160. from dulwich.objects import Blob
  1161. test_blob = Blob()
  1162. test_blob.data = b"initial content"
  1163. # Create some test files with known contents
  1164. file1_path = os.path.join(repo_dir, "file1")
  1165. with open(file1_path, "wb") as f:
  1166. f.write(b"initial content")
  1167. file2_path = os.path.join(repo_dir, "file2")
  1168. with open(file2_path, "wb") as f:
  1169. f.write(b"initial content")
  1170. # Add them to index
  1171. entry1 = IndexEntry(
  1172. (1230680220, 0),
  1173. (1230680220, 0),
  1174. 2050,
  1175. 3761020,
  1176. 33188,
  1177. 1000,
  1178. 1000,
  1179. 0,
  1180. b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391", # Not matching actual content
  1181. 0,
  1182. 0,
  1183. )
  1184. entry2 = IndexEntry(
  1185. (1230680220, 0),
  1186. (1230680220, 0),
  1187. 2050,
  1188. 3761020,
  1189. 33188,
  1190. 1000,
  1191. 1000,
  1192. 0,
  1193. test_blob.id, # Will be content's real hash
  1194. 0,
  1195. 0,
  1196. )
  1197. # Add a file that has a conflict
  1198. entry_conflict = ConflictedIndexEntry(b"conflict", {0: None, 1: None, 2: None})
  1199. index._byname = {
  1200. b"file1": entry1,
  1201. b"file2": entry2,
  1202. b"file3": IndexEntry(
  1203. (1230680220, 0),
  1204. (1230680220, 0),
  1205. 2050,
  1206. 3761020,
  1207. 33188,
  1208. 1000,
  1209. 1000,
  1210. 0,
  1211. b"0000000000000000000000000000000000000000",
  1212. 0,
  1213. 0,
  1214. ),
  1215. b"conflict": entry_conflict,
  1216. }
  1217. # Get unstaged changes
  1218. changes = list(get_unstaged_changes(index, repo_dir))
  1219. # File1 should be unstaged (content doesn't match hash)
  1220. # File3 doesn't exist (deleted)
  1221. # Conflict is always unstaged
  1222. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3"])
  1223. # Create directory where there should be a file
  1224. os.mkdir(os.path.join(repo_dir, "file4"))
  1225. index._byname[b"file4"] = entry1
  1226. # Get unstaged changes again
  1227. changes = list(get_unstaged_changes(index, repo_dir))
  1228. # Now file4 should also be unstaged because it's a directory instead of a file
  1229. self.assertEqual(sorted(changes), [b"conflict", b"file1", b"file3", b"file4"])
  1230. # Create a custom blob filter function
  1231. def filter_blob_callback(blob, path):
  1232. # Modify blob data to make it look changed
  1233. result_blob = Blob()
  1234. result_blob.data = b"modified " + blob.data
  1235. return result_blob
  1236. # Get unstaged changes with blob filter
  1237. changes = list(get_unstaged_changes(index, repo_dir, filter_blob_callback))
  1238. # Now both file1 and file2 should be unstaged due to the filter
  1239. self.assertEqual(
  1240. sorted(changes), [b"conflict", b"file1", b"file2", b"file3", b"file4"]
  1241. )
  1242. def test_get_unstaged_changes_with_blob_filter(self) -> None:
  1243. """Test get_unstaged_changes with filter that expects Blob objects.
  1244. This reproduces issue #2010 where passing blob.data instead of blob
  1245. to the filter callback causes AttributeError when the callback expects
  1246. a Blob object (like checkin_normalize does).
  1247. """
  1248. repo_dir = tempfile.mkdtemp()
  1249. self.addCleanup(shutil.rmtree, repo_dir)
  1250. with Repo.init(repo_dir) as repo:
  1251. # Create and commit a test file
  1252. test_file = os.path.join(repo_dir, "test.txt")
  1253. with open(test_file, "wb") as f:
  1254. f.write(b"original content")
  1255. repo.get_worktree().stage(["test.txt"])
  1256. repo.get_worktree().commit(
  1257. message=b"Initial commit",
  1258. committer=b"Test <test@example.com>",
  1259. author=b"Test <test@example.com>",
  1260. )
  1261. # Create a .gitattributes file
  1262. gitattributes_file = os.path.join(repo_dir, ".gitattributes")
  1263. with open(gitattributes_file, "wb") as f:
  1264. f.write(b"*.txt text\n")
  1265. # Modify the test file
  1266. with open(test_file, "wb") as f:
  1267. f.write(b"modified content")
  1268. # Force mtime change to ensure stat doesn't match
  1269. os.utime(test_file, (0, 0))
  1270. # Create a filter callback that expects Blob objects (like checkin_normalize)
  1271. def blob_filter_callback(blob: Blob, path: bytes) -> Blob:
  1272. """Filter that expects a Blob object, not bytes."""
  1273. # This should receive a Blob object with a .data attribute
  1274. self.assertIsInstance(blob, Blob)
  1275. self.assertTrue(hasattr(blob, "data"))
  1276. # Return the blob unchanged for this test
  1277. return blob
  1278. # This should not raise AttributeError: 'bytes' object has no attribute 'data'
  1279. changes = list(
  1280. get_unstaged_changes(repo.open_index(), repo_dir, blob_filter_callback)
  1281. )
  1282. # Should detect the change in test.txt
  1283. self.assertIn(b"test.txt", changes)
  class TestManyFilesFeature(TestCase):
      """Exercise the manyFiles feature: index format version 4 and skipHash."""

      def setUp(self):
          # Scratch directory for index files; removed once the test finishes.
          scratch = tempfile.mkdtemp()
          self.tempdir = scratch
          self.addCleanup(shutil.rmtree, scratch)

      def test_index_version_4_parsing(self):
          """A version 4 index written to disk can be loaded again."""
          target = os.path.join(self.tempdir, "index")
          stamp = (1234567890, 0)

          written = Index(target, read=False, version=4)
          written[b"test.txt"] = IndexEntry(
              ctime=stamp,
              mtime=stamp,
              dev=1,
              ino=1,
              mode=0o100644,
              uid=1000,
              gid=1000,
              size=5,
              sha=ZERO_SHA,
          )
          written.write()

          # A fresh Index object reads the version from the file itself.
          reloaded = Index(target)
          self.assertEqual(reloaded._version, 4)
          self.assertIn(b"test.txt", reloaded)

      def test_skip_hash_feature(self):
          """With skip_hash the trailing checksum is all zero bytes, yet readable."""
          target = os.path.join(self.tempdir, "index")
          stamp = (1234567890, 0)

          written = Index(target, read=False, skip_hash=True)
          written[b"test.txt"] = IndexEntry(
              ctime=stamp,
              mtime=stamp,
              dev=1,
              ino=1,
              mode=0o100644,
              uid=1000,
              gid=1000,
              size=5,
              sha=ZERO_SHA,
          )
          written.write()

          # The last 20 bytes of the file hold the (here zeroed) trailing hash.
          with open(target, "rb") as handle:
              handle.seek(-20, os.SEEK_END)
              tail = handle.read(20)
          self.assertEqual(tail, b"\x00" * 20)

          reloaded = Index(target)
          self.assertIn(b"test.txt", reloaded)

      def test_version_4_no_padding(self):
          """Version 4 serialization is shorter than version 2 and round-trips."""
          from dulwich.index import read_cache_entry, write_cache_entry

          stamp = (1234567890, 0)
          # Two paths sharing a long prefix, so prefix compression has
          # something to save.
          entries = [
              SerializedIndexEntry(
                  name=b"src/main/java/com/example/Service.java",
                  ctime=stamp,
                  mtime=stamp,
                  dev=1,
                  ino=1,
                  mode=0o100644,
                  uid=1000,
                  gid=1000,
                  size=5,
                  sha=ZERO_SHA,
                  flags=0,
                  extended_flags=0,
              ),
              SerializedIndexEntry(
                  name=b"src/main/java/com/example/Controller.java",
                  ctime=stamp,
                  mtime=stamp,
                  dev=1,
                  ino=2,
                  mode=0o100644,
                  uid=1000,
                  gid=1000,
                  size=5,
                  sha=b"1" * 40,
                  flags=0,
                  extended_flags=0,
              ),
          ]

          def serialize(items, version):
              """Write ``items`` back to back into a fresh buffer."""
              out = BytesIO()
              prev = b""
              for item in items:
                  write_cache_entry(out, item, version=version, previous_path=prev)
                  prev = item.name
              return out

          def parse(stream, version):
              """Rewind ``stream`` and read back one entry per original entry."""
              stream.seek(0)
              prev = b""
              found = []
              for _ in entries:
                  item = read_cache_entry(stream, version=version, previous_path=prev)
                  found.append(item)
                  prev = item.name
              return found

          # For version 2 the flags field is set to the length of the name.
          v2_entries = [
              SerializedIndexEntry(
                  e.name,
                  e.ctime,
                  e.mtime,
                  e.dev,
                  e.ino,
                  e.mode,
                  e.uid,
                  e.gid,
                  e.size,
                  e.sha,
                  len(e.name),
                  e.extended_flags,
              )
              for e in entries
          ]
          buf_v2 = serialize(v2_entries, 2)
          buf_v4 = serialize(entries, 4)

          # Version 4 should be shorter due to compression and no padding.
          self.assertLess(len(buf_v4.getvalue()), len(buf_v2.getvalue()))

          parsed_v2 = parse(buf_v2, 2)
          parsed_v4 = parse(buf_v4, 4)

          # Both formats must yield the same names and hashes.
          for from_v2, from_v4 in zip(parsed_v2, parsed_v4):
              self.assertEqual(from_v2.name, from_v4.name)
              self.assertEqual(from_v2.sha, from_v4.sha)
  class TestManyFilesRepoIntegration(TestCase):
      """Tests for manyFiles feature integration with Repo."""

      def setUp(self):
          self.tempdir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.tempdir)

      def test_repo_with_manyfiles_config(self):
          """Test that a repository with feature.manyFiles=true uses the right settings."""
          # Use the repo as a context manager so it is closed before the
          # temporary directory is removed by the registered cleanup.
          with Repo.init(self.tempdir) as repo:
              # Set feature.manyFiles=true in config
              config = repo.get_config()
              config.set(b"feature", b"manyFiles", b"true")
              config.write_to_path()

              # Open the index - should have skipHash enabled and version 4
              index = repo.open_index()
              self.assertTrue(index._skip_hash)
              self.assertEqual(index._version, 4)

      def test_repo_with_explicit_index_settings(self):
          """Test that explicit index.version and index.skipHash work."""
          # Use the repo as a context manager so it is closed before the
          # temporary directory is removed by the registered cleanup.
          with Repo.init(self.tempdir) as repo:
              # Set explicit index settings
              config = repo.get_config()
              config.set(b"index", b"version", b"3")
              config.set(b"index", b"skipHash", b"false")
              config.write_to_path()

              # Open the index - should respect explicit settings
              index = repo.open_index()
              self.assertFalse(index._skip_hash)
              self.assertEqual(index._version, 3)
  class TestPathPrefixCompression(TestCase):
      """Cover the path prefix compression used by index format version 4."""

      def setUp(self):
          scratch = tempfile.mkdtemp()
          self.tempdir = scratch
          self.addCleanup(shutil.rmtree, scratch)

      def _check_round_trip(self, path, previous):
          """Compress ``path`` against ``previous`` and assert it is restored."""
          packed = _compress_path(path, previous)
          restored, _ = _decompress_path(packed, 0, previous)
          self.assertEqual(path, restored)

      def test_varint_encoding_decoding(self):
          """Encoding then decoding a varint returns the starting value."""
          for value in (0, 1, 127, 128, 255, 256, 16383, 16384, 65535, 65536):
              decoded, _ = _decode_varint(_encode_varint(value), 0)
              self.assertEqual(value, decoded, f"Failed for value {value}")

      def test_path_compression_simple(self):
          """Round-trip paths with no, a shared, and an unrelated previous path."""
          cases = (
              (b"file1.txt", b""),  # nothing to share with
              (b"src/file2.txt", b"src/file1.txt"),  # common prefix
              (b"docs/readme.md", b"src/file1.txt"),  # completely different
          )
          for path, previous in cases:
              self._check_round_trip(path, previous)

      def test_path_compression_deep_directories(self):
          """Round-trip successive paths inside a deep directory tree."""
          chain = [
              b"src/main/java/com/example/service/UserService.java",
              b"src/main/java/com/example/service/OrderService.java",
              b"src/main/java/com/example/model/User.java",
          ]
          # Each path is compressed relative to the one listed before it.
          for previous, path in zip(chain, chain[1:]):
              self._check_round_trip(path, previous)

      def test_index_version_4_with_compression(self):
          """A version 4 index keeps every path and is smaller than version 2."""
          stamp = (1234567890, 0)
          paths = [
              b"src/main/java/App.java",
              b"src/main/java/Utils.java",
              b"src/main/resources/config.properties",
              b"src/test/java/AppTest.java",
              b"docs/README.md",
              b"docs/INSTALL.md",
          ]

          v4_path = os.path.join(self.tempdir, "index")
          v4_index = Index(v4_path, read=False, version=4)
          for position, path in enumerate(paths):
              v4_index[path] = IndexEntry(
                  ctime=stamp,
                  mtime=stamp,
                  dev=1,
                  ino=position + 1,
                  mode=0o100644,
                  uid=1000,
                  gid=1000,
                  size=10,
                  sha=f"{position:040d}".encode(),
              )
          v4_index.write()

          reloaded = Index(v4_path)
          self.assertEqual(reloaded._version, 4)
          for path in paths:
              self.assertIn(path, reloaded)
          v4_size = os.path.getsize(v4_path)

          # Build a version 2 index over the same paths to compare sizes.
          v2_path = os.path.join(self.tempdir, "index_v2")
          v2_index = Index(v2_path, read=False, version=2)
          for path in paths:
              v2_index[path] = IndexEntry(
                  ctime=stamp,
                  mtime=stamp,
                  dev=1,
                  ino=1,
                  mode=0o100644,
                  uid=1000,
                  gid=1000,
                  size=10,
                  sha=ZERO_SHA,
              )
          v2_index.write()
          v2_size = os.path.getsize(v2_path)

          self.assertLess(
              v4_size, v2_size, "Version 4 index should be smaller than version 2"
          )

      def test_path_compression_edge_cases(self):
          """Round-trip empty, identical, and shorter-than-previous paths."""
          cases = (
              (b"", b""),  # empty paths
              (b"same.txt", b"same.txt"),  # identical to previous
              (b"short", b"very/long/path/file.txt"),  # shorter than previous
          )
          for path, previous in cases:
              self._check_round_trip(path, previous)
  class TestDetectCaseOnlyRenames(TestCase):
      """Cover detect_case_only_renames with hand-built change lists."""

      def setUp(self):
          self.config = ConfigDict()

      @staticmethod
      def _entry(path, sha_byte):
          """Build a regular-file TreeEntry whose sha is ``sha_byte`` repeated 40 times."""
          return TreeEntry(path, 0o100644, sha_byte * 40)

      def _deleted(self, path, sha_byte):
          """Build a CHANGE_DELETE for ``path``."""
          return TreeChange(CHANGE_DELETE, self._entry(path, sha_byte), None)

      def _added(self, path, sha_byte):
          """Build a CHANGE_ADD for ``path``."""
          return TreeChange(CHANGE_ADD, None, self._entry(path, sha_byte))

      def _assert_single_rename(self, result, old_path, new_path):
          """Assert ``result`` is exactly one rename from ``old_path`` to ``new_path``."""
          self.assertEqual(1, len(result))
          self.assertEqual(CHANGE_RENAME, result[0].type)
          self.assertEqual(old_path, result[0].old.path)
          self.assertEqual(new_path, result[0].new.path)

      def test_no_renames(self):
          """Unrelated delete/add pairs come back unchanged."""
          changes = [
              self._deleted(b"file1.txt", b"a"),
              self._added(b"file2.txt", b"b"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self.assertEqual(changes, result)

      def test_simple_case_rename(self):
          """A delete/add pair differing only in case becomes one rename."""
          # An empty config is used here, i.e. whatever the defaults are.
          changes = [
              self._deleted(b"README.txt", b"a"),
              self._added(b"readme.txt", b"a"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self._assert_single_rename(result, b"README.txt", b"readme.txt")

      def test_nested_path_case_rename(self):
          """Case-only renames are also found below a directory."""
          changes = [
              self._deleted(b"src/Main.java", b"a"),
              self._added(b"src/main.java", b"a"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self._assert_single_rename(result, b"src/Main.java", b"src/main.java")

      def test_multiple_case_renames(self):
          """Two independent case-only renames are both detected."""
          changes = [
              self._deleted(b"File1.txt", b"a"),
              self._deleted(b"File2.TXT", b"b"),
              self._added(b"file1.txt", b"a"),
              self._added(b"file2.txt", b"b"),
          ]
          result = detect_case_only_renames(changes, self.config)

          self.assertEqual(2, len(result))
          renames = [change for change in result if change.type == CHANGE_RENAME]
          self.assertEqual(2, len(renames))

          # Compare as a mapping so the order of the result does not matter.
          self.assertEqual(
              {b"File1.txt": b"file1.txt", b"File2.TXT": b"file2.txt"},
              {change.old.path: change.new.path for change in renames},
          )

      def test_case_rename_with_modify(self):
          """A delete paired with a modify of the other-case path becomes a rename."""
          changes = [
              self._deleted(b"README.md", b"a"),
              TreeChange(
                  CHANGE_MODIFY,
                  self._entry(b"readme.md", b"a"),
                  self._entry(b"readme.md", b"b"),
              ),
          ]
          result = detect_case_only_renames(changes, self.config)
          self._assert_single_rename(result, b"README.md", b"readme.md")

      def test_hfs_normalization(self):
          """Case-only rename of a non-ASCII name with core.protectHFS enabled."""
          self.config.set((b"core",), b"protectHFS", b"true")
          self.config.set((b"core",), b"protectNTFS", b"false")

          # The two names differ only in letter case, including the accented letter.
          lower = "café.txt".encode()
          upper = "CAFÉ.txt".encode()
          changes = [
              self._deleted(lower, b"a"),
              self._added(upper, b"a"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self._assert_single_rename(result, lower, upper)

      def test_ntfs_normalization(self):
          """With core.protectNTFS a trailing dot does not prevent a match."""
          self.config.set((b"core",), b"protectNTFS", b"true")
          self.config.set((b"core",), b"protectHFS", b"false")

          changes = [
              self._deleted(b"file.txt.", b"a"),
              self._added(b"FILE.TXT", b"a"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self._assert_single_rename(result, b"file.txt.", b"FILE.TXT")

      def test_invalid_utf8_handling(self):
          """A path that is not valid UTF-8 neither crashes nor matches anything."""
          changes = [
              self._deleted(b"\xff\xfe", b"a"),
              self._added(b"valid.txt", b"b"),
          ]
          result = detect_case_only_renames(changes, self.config)
          self.assertEqual(changes, result)

      def test_rename_and_copy_changes(self):
          """A delete can pair with a CHANGE_COPY target; existing renames stay."""
          changes = [
              self._deleted(b"OldFile.txt", b"a"),
              TreeChange(
                  CHANGE_RENAME,
                  self._entry(b"other.txt", b"b"),
                  self._entry(b"oldfile.txt", b"a"),
              ),
              TreeChange(
                  CHANGE_COPY,
                  self._entry(b"source.txt", b"c"),
                  self._entry(b"OLDFILE.TXT", b"a"),
              ),
          ]
          result = detect_case_only_renames(changes, self.config)

          # Expected: the original rename plus a new case-only rename that
          # consumed the delete and the copy.
          self.assertEqual(2, len(result))
          renames = [change for change in result if change.type == CHANGE_RENAME]
          self.assertEqual(2, len(renames))

          case_rename = next(
              (
                  change
                  for change in renames
                  if change.old.path == b"OldFile.txt"
                  and change.new.path == b"OLDFILE.TXT"
              ),
              None,
          )
          self.assertIsNotNone(case_rename)
          self.assertEqual(b"OldFile.txt", case_rename.old.path)
          self.assertEqual(b"OLDFILE.TXT", case_rename.new.path)
  1790. class TestUpdateWorkingTree(TestCase):
  def setUp(self):
      """Create a temporary repository and register its teardown."""
      self.tempdir = tempfile.mkdtemp()

      def cleanup_tempdir():
          """Remove tempdir, handling read-only files on Windows."""

          def remove_readonly(func, path, excinfo):
              """Error handler for Windows read-only files."""
              import stat

              if sys.platform == "win32" and excinfo[0] is PermissionError:
                  # Make the path writable, then retry the failed operation.
                  os.chmod(path, stat.S_IWRITE)
                  func(path)
              else:
                  # Any other failure is re-raised unchanged.
                  raise

          shutil.rmtree(self.tempdir, onerror=remove_readonly)

      self.addCleanup(cleanup_tempdir)
      self.repo = Repo.init(self.tempdir)
      # Cleanups run last-in-first-out, so the repository is closed before
      # the directory holding it is deleted.
      self.addCleanup(self.repo.close)
  def test_update_working_tree_with_blob_normalizer(self):
      """Checkout through a normalizer writes the normalized bytes to disk."""

      class TestBlobNormalizer:
          """Minimal normalizer turning CRLF into LF on checkout."""

          def checkout_normalize(self, blob, path):
              normalized = Blob()
              normalized.data = blob.data.replace(b"\r\n", b"\n")
              return normalized

      store = self.repo.object_store

      # One file with CRLF line endings, stored as-is in the object store.
      crlf_blob = Blob()
      crlf_blob.data = b"Hello\r\nWorld\r\n"
      store.add_object(crlf_blob)
      target_tree = Tree()
      target_tree[b"test.txt"] = (0o100644, crlf_blob.id)
      store.add_object(target_tree)

      update_working_tree(
          self.repo,
          None,  # old_tree_id
          target_tree.id,  # new_tree_id
          change_iterator=tree_changes(store, None, target_tree.id),
          blob_normalizer=TestBlobNormalizer(),
      )

      # On disk: LF only.
      with open(os.path.join(self.tempdir, "test.txt"), "rb") as handle:
          on_disk = handle.read()
      self.assertEqual(b"Hello\nWorld\n", on_disk)

      # In the index: still the SHA of the original, un-normalized blob.
      self.assertEqual(crlf_blob.id, self.repo.open_index()[b"test.txt"].sha)
  def test_update_working_tree_without_blob_normalizer(self):
      """Without a normalizer the blob bytes reach the disk untouched."""
      store = self.repo.object_store

      crlf_blob = Blob()
      crlf_blob.data = b"Hello\r\nWorld\r\n"
      store.add_object(crlf_blob)
      target_tree = Tree()
      target_tree[b"test.txt"] = (0o100644, crlf_blob.id)
      store.add_object(target_tree)

      update_working_tree(
          self.repo,
          None,  # old_tree_id
          target_tree.id,  # new_tree_id
          change_iterator=tree_changes(store, None, target_tree.id),
          blob_normalizer=None,
      )

      # On disk: the CRLF endings are preserved.
      with open(os.path.join(self.tempdir, "test.txt"), "rb") as handle:
          on_disk = handle.read()
      self.assertEqual(b"Hello\r\nWorld\r\n", on_disk)

      # In the index: the SHA of the blob.
      self.assertEqual(crlf_blob.id, self.repo.open_index()[b"test.txt"].sha)
  def test_update_working_tree_remove_directory(self):
      """Switching to an empty tree deletes a directory of tracked files."""
      store = self.repo.object_store

      # First tree: dir/ with two files.
      populated = Tree()
      for name, payload in (
          (b"dir/file1.txt", b"content1"),
          (b"dir/file2.txt", b"content2"),
      ):
          blob = Blob()
          blob.data = payload
          store.add_object(blob)
          populated[name] = (0o100644, blob.id)
      store.add_object(populated)

      update_working_tree(
          self.repo,
          None,
          populated.id,
          change_iterator=tree_changes(store, None, populated.id),
      )

      dir_path = os.path.join(self.tempdir, "dir")
      self.assertTrue(os.path.isdir(dir_path))
      for filename in ("file1.txt", "file2.txt"):
          self.assertTrue(os.path.exists(os.path.join(dir_path, filename)))

      # Second tree: nothing at all.
      empty = Tree()
      store.add_object(empty)
      update_working_tree(
          self.repo,
          populated.id,
          empty.id,
          change_iterator=tree_changes(store, populated.id, empty.id),
      )

      self.assertFalse(os.path.exists(dir_path))
  def test_update_working_tree_submodule_to_file(self):
      """A path that was a submodule can become a regular file."""
      store = self.repo.object_store

      # First tree: a gitlink entry named "submodule".
      with_submodule = Tree()
      with_submodule[b"submodule"] = (S_IFGITLINK, b"a" * 40)
      store.add_object(with_submodule)
      update_working_tree(
          self.repo,
          None,
          with_submodule.id,
          change_iterator=tree_changes(store, None, with_submodule.id),
      )

      # The checkout is expected to leave a directory holding a .git entry.
      submodule_path = os.path.join(self.tempdir, "submodule")
      self.assertTrue(os.path.isdir(submodule_path))
      self.assertTrue(os.path.exists(os.path.join(submodule_path, ".git")))

      # Second tree: a plain file at the same path.
      file_blob = Blob()
      file_blob.data = b"file content"
      store.add_object(file_blob)
      with_file = Tree()
      with_file[b"submodule"] = (0o100644, file_blob.id)
      store.add_object(with_file)
      update_working_tree(
          self.repo,
          with_submodule.id,
          with_file.id,
          change_iterator=tree_changes(store, with_submodule.id, with_file.id),
      )

      self.assertTrue(os.path.isfile(submodule_path))
      with open(submodule_path, "rb") as handle:
          self.assertEqual(b"file content", handle.read())
  def test_update_working_tree_directory_with_nested_subdir(self):
      """Emptying the tree removes a whole chain of nested directories."""
      store = self.repo.object_store

      deep_blob = Blob()
      deep_blob.data = b"deep content"
      store.add_object(deep_blob)
      nested = Tree()
      nested[b"a/b/c/file.txt"] = (0o100644, deep_blob.id)
      store.add_object(nested)

      update_working_tree(
          self.repo,
          None,
          nested.id,
          change_iterator=tree_changes(store, None, nested.id),
      )

      top_dir = os.path.join(self.tempdir, "a")
      self.assertTrue(
          os.path.exists(os.path.join(top_dir, "b", "c", "file.txt"))
      )

      empty = Tree()
      store.add_object(empty)
      update_working_tree(
          self.repo,
          nested.id,
          empty.id,
          change_iterator=tree_changes(store, nested.id, empty.id),
      )

      # Checking the topmost directory covers everything beneath it.
      self.assertFalse(os.path.exists(top_dir))
  def test_update_working_tree_file_replaced_by_dir_not_removed(self):
      """A non-empty directory sitting where a tracked file was is kept."""
      store = self.repo.object_store

      tracked = Blob()
      tracked.data = b"file content"
      store.add_object(tracked)
      with_file = Tree()
      with_file[b"path"] = (0o100644, tracked.id)
      store.add_object(with_file)
      update_working_tree(
          self.repo,
          None,
          with_file.id,
          change_iterator=tree_changes(store, None, with_file.id),
      )

      location = os.path.join(self.tempdir, "path")
      self.assertTrue(os.path.isfile(location))

      # Behind the repository's back: swap the file for a directory that
      # contains an untracked file.
      os.remove(location)
      os.mkdir(location)
      untracked = os.path.join(location, "untracked.txt")
      with open(untracked, "w") as handle:
          handle.write("untracked content")

      empty = Tree()
      store.add_object(empty)
      update_working_tree(
          self.repo,
          with_file.id,
          empty.id,
          change_iterator=tree_changes(store, with_file.id, empty.id),
      )

      # The update went through and the directory survived with its content.
      self.assertTrue(os.path.isdir(location))
      self.assertTrue(os.path.exists(untracked))
  def test_update_working_tree_file_replaced_by_empty_dir_removed(self):
      """An empty directory sitting where a tracked file was gets removed."""
      store = self.repo.object_store

      tracked = Blob()
      tracked.data = b"file content"
      store.add_object(tracked)
      with_file = Tree()
      with_file[b"path"] = (0o100644, tracked.id)
      store.add_object(with_file)
      update_working_tree(
          self.repo,
          None,
          with_file.id,
          change_iterator=tree_changes(store, None, with_file.id),
      )

      location = os.path.join(self.tempdir, "path")
      self.assertTrue(os.path.isfile(location))

      # Behind the repository's back: swap the file for an empty directory.
      os.remove(location)
      os.mkdir(location)

      empty = Tree()
      store.add_object(empty)
      update_working_tree(
          self.repo,
          with_file.id,
          empty.id,
          change_iterator=tree_changes(store, with_file.id, empty.id),
      )

      self.assertFalse(os.path.exists(location))
  def test_update_working_tree_symlink_transitions(self):
      """Walk one path through symlink -> file -> symlink -> removed."""
      if sys.platform == "win32":
          self.skipTest("Symlinks not fully supported on Windows")

      store = self.repo.object_store

      def switch(old_id, new_id):
          """Move the working tree from ``old_id`` to ``new_id``."""
          update_working_tree(
              self.repo,
              old_id,
              new_id,
              change_iterator=tree_changes(store, old_id, new_id),
          )

      # Tree holding a symlink (mode 0o120000) whose blob is the link target.
      target_blob = Blob()
      target_blob.data = b"target/path"
      store.add_object(target_blob)
      link_tree = Tree()
      link_tree[b"link"] = (0o120000, target_blob.id)
      store.add_object(link_tree)

      switch(None, link_tree.id)
      link_path = os.path.join(self.tempdir, "link")
      self.assertTrue(os.path.islink(link_path))
      self.assertEqual(b"target/path", os.readlink(link_path).encode())

      # Step 1: the symlink becomes a regular file.
      file_blob = Blob()
      file_blob.data = b"file content"
      store.add_object(file_blob)
      file_tree = Tree()
      file_tree[b"link"] = (0o100644, file_blob.id)
      store.add_object(file_tree)

      switch(link_tree.id, file_tree.id)
      self.assertFalse(os.path.islink(link_path))
      self.assertTrue(os.path.isfile(link_path))
      with open(link_path, "rb") as handle:
          self.assertEqual(b"file content", handle.read())

      # Step 2: the regular file becomes a symlink again.
      switch(file_tree.id, link_tree.id)
      self.assertTrue(os.path.islink(link_path))
      self.assertEqual(b"target/path", os.readlink(link_path).encode())

      # Step 3: manually put an empty directory where the symlink was, then
      # move to an empty tree; the directory should be removed.
      os.unlink(link_path)
      os.mkdir(link_path)
      empty_tree = Tree()
      store.add_object(empty_tree)

      switch(link_tree.id, empty_tree.id)
      self.assertFalse(os.path.exists(link_path))
  def test_update_working_tree_modified_file_to_dir_transition(self):
      """Test that modified files are not removed when they should be directories."""
      # Create tree with file
      blob1 = Blob()
      blob1.data = b"original content"
      self.repo.object_store.add_object(blob1)
      tree1 = Tree()
      tree1[b"path"] = (0o100644, blob1.id)
      self.repo.object_store.add_object(tree1)

      # Update to tree1
      changes = tree_changes(self.repo.object_store, None, tree1.id)
      update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
      file_path = os.path.join(self.tempdir, "path")

      # Modify the file locally
      with open(file_path, "w") as f:
          f.write("modified content")

      # Create tree where path is a directory with file
      blob2 = Blob()
      blob2.data = b"subfile content"
      self.repo.object_store.add_object(blob2)
      tree2 = Tree()
      tree2[b"path/subfile"] = (0o100644, blob2.id)
      self.repo.object_store.add_object(tree2)

      # Build the change list outside the assertRaises block so that only a
      # failure of update_working_tree itself can satisfy the expectation.
      changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)

      # Update should fail because can't create directory where modified file exists
      with self.assertRaises(IOError):
          update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)

      # File should still exist with modifications
      self.assertTrue(os.path.isfile(file_path))
      with open(file_path) as f:
          self.assertEqual("modified content", f.read())
  2081. def test_update_working_tree_executable_transitions(self):
  2082. """Test transitions involving executable bit changes."""
  2083. # Skip on Windows where executable bit is not supported
  2084. if sys.platform == "win32":
  2085. self.skipTest("Executable bit not supported on Windows")
  2086. # Create tree with non-executable file
  2087. blob = Blob()
  2088. blob.data = b"#!/bin/sh\necho hello"
  2089. self.repo.object_store.add_object(blob)
  2090. tree1 = Tree()
  2091. tree1[b"script.sh"] = (0o100644, blob.id) # Non-executable
  2092. self.repo.object_store.add_object(tree1)
  2093. # Update to tree1
  2094. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2095. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2096. script_path = os.path.join(self.tempdir, "script.sh")
  2097. self.assertTrue(os.path.isfile(script_path))
  2098. # Check it's not executable
  2099. mode = os.stat(script_path).st_mode
  2100. self.assertFalse(mode & stat.S_IXUSR)
  2101. # Create tree with executable file (same content)
  2102. tree2 = Tree()
  2103. tree2[b"script.sh"] = (0o100755, blob.id) # Executable
  2104. self.repo.object_store.add_object(tree2)
  2105. # Update to tree2
  2106. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2107. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2108. # Check it's now executable
  2109. mode = os.stat(script_path).st_mode
  2110. self.assertTrue(mode & stat.S_IXUSR)
  2111. def test_update_working_tree_submodule_with_untracked_files(self):
  2112. """Test that submodules with untracked files are not removed."""
  2113. from dulwich.objects import S_IFGITLINK, Tree
  2114. # Create tree with submodule
  2115. submodule_sha = b"a" * 40
  2116. tree1 = Tree()
  2117. tree1[b"submodule"] = (S_IFGITLINK, submodule_sha)
  2118. self.repo.object_store.add_object(tree1)
  2119. # Update to tree with submodule
  2120. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2121. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2122. # Add untracked file to submodule directory
  2123. submodule_path = os.path.join(self.tempdir, "submodule")
  2124. untracked_path = os.path.join(submodule_path, "untracked.txt")
  2125. with open(untracked_path, "w") as f:
  2126. f.write("untracked content")
  2127. # Create empty tree
  2128. tree2 = Tree()
  2129. self.repo.object_store.add_object(tree2)
  2130. # Update should not remove submodule directory with untracked files
  2131. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2132. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2133. # Directory should still exist with untracked file
  2134. self.assertTrue(os.path.isdir(submodule_path))
  2135. self.assertTrue(os.path.exists(untracked_path))
  2136. def test_update_working_tree_dir_to_file_with_subdir(self):
  2137. """Test replacing directory structure with a file."""
  2138. # Create tree with nested directory structure
  2139. blob1 = Blob()
  2140. blob1.data = b"content1"
  2141. self.repo.object_store.add_object(blob1)
  2142. blob2 = Blob()
  2143. blob2.data = b"content2"
  2144. self.repo.object_store.add_object(blob2)
  2145. tree1 = Tree()
  2146. tree1[b"dir/subdir/file1"] = (0o100644, blob1.id)
  2147. tree1[b"dir/subdir/file2"] = (0o100644, blob2.id)
  2148. self.repo.object_store.add_object(tree1)
  2149. # Update to tree1
  2150. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2151. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2152. # Verify structure exists
  2153. dir_path = os.path.join(self.tempdir, "dir")
  2154. self.assertTrue(os.path.isdir(dir_path))
  2155. # Add an untracked file to make directory truly non-empty
  2156. untracked_path = os.path.join(dir_path, "untracked.txt")
  2157. with open(untracked_path, "w") as f:
  2158. f.write("untracked content")
  2159. # Create tree with file at "dir" path
  2160. blob3 = Blob()
  2161. blob3.data = b"replacement file"
  2162. self.repo.object_store.add_object(blob3)
  2163. tree2 = Tree()
  2164. tree2[b"dir"] = (0o100644, blob3.id)
  2165. self.repo.object_store.add_object(tree2)
  2166. # Update should fail because directory is not empty
  2167. with self.assertRaises(IsADirectoryError):
  2168. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2169. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2170. # Directory should still exist
  2171. self.assertTrue(os.path.isdir(dir_path))
  2172. def test_update_working_tree_case_sensitivity(self):
  2173. """Test handling of case-sensitive filename changes."""
  2174. # Detect if filesystem is case-insensitive by testing
  2175. test_file = os.path.join(self.tempdir, "TeSt.tmp")
  2176. with open(test_file, "w") as f:
  2177. f.write("test")
  2178. is_case_insensitive = os.path.exists(os.path.join(self.tempdir, "test.tmp"))
  2179. os.unlink(test_file)
  2180. # Set core.ignorecase to match actual filesystem behavior
  2181. # (This ensures test works correctly regardless of platform defaults)
  2182. config = self.repo.get_config()
  2183. config.set((b"core",), b"ignorecase", is_case_insensitive)
  2184. config.write_to_path()
  2185. # Create tree with lowercase file
  2186. blob1 = Blob()
  2187. blob1.data = b"lowercase content"
  2188. self.repo.object_store.add_object(blob1)
  2189. tree1 = Tree()
  2190. tree1[b"readme.txt"] = (0o100644, blob1.id)
  2191. self.repo.object_store.add_object(tree1)
  2192. # Update to tree1
  2193. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2194. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2195. # Create tree with uppercase file (different content)
  2196. blob2 = Blob()
  2197. blob2.data = b"uppercase content"
  2198. self.repo.object_store.add_object(blob2)
  2199. tree2 = Tree()
  2200. tree2[b"README.txt"] = (0o100644, blob2.id)
  2201. self.repo.object_store.add_object(tree2)
  2202. # Update to tree2
  2203. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2204. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2205. # Check what exists (behavior depends on filesystem)
  2206. lowercase_path = os.path.join(self.tempdir, "readme.txt")
  2207. uppercase_path = os.path.join(self.tempdir, "README.txt")
  2208. if is_case_insensitive:
  2209. # On case-insensitive filesystems, should have one file with new content
  2210. # The exact case of the filename may vary by OS
  2211. self.assertTrue(
  2212. os.path.exists(lowercase_path) or os.path.exists(uppercase_path)
  2213. )
  2214. # Verify content is the new content
  2215. if os.path.exists(lowercase_path):
  2216. with open(lowercase_path, "rb") as f:
  2217. self.assertEqual(b"uppercase content", f.read())
  2218. else:
  2219. with open(uppercase_path, "rb") as f:
  2220. self.assertEqual(b"uppercase content", f.read())
  2221. else:
  2222. # On case-sensitive filesystems, only the uppercase file should exist
  2223. self.assertFalse(os.path.exists(lowercase_path))
  2224. self.assertTrue(os.path.exists(uppercase_path))
  2225. with open(uppercase_path, "rb") as f:
  2226. self.assertEqual(b"uppercase content", f.read())
  2227. def test_update_working_tree_case_rename_updates_filename(self):
  2228. """Test that case-only renames update the actual filename on case-insensitive FS."""
  2229. # Detect if filesystem is case-insensitive by testing
  2230. test_file = os.path.join(self.tempdir, "TeSt.tmp")
  2231. with open(test_file, "w") as f:
  2232. f.write("test")
  2233. is_case_insensitive = os.path.exists(os.path.join(self.tempdir, "test.tmp"))
  2234. os.unlink(test_file)
  2235. if not is_case_insensitive:
  2236. self.skipTest("Test only relevant on case-insensitive filesystems")
  2237. # Set core.ignorecase to match actual filesystem behavior
  2238. config = self.repo.get_config()
  2239. config.set((b"core",), b"ignorecase", True)
  2240. config.write_to_path()
  2241. # Create tree with lowercase file
  2242. blob1 = Blob()
  2243. blob1.data = b"same content" # Using same content to test pure case rename
  2244. self.repo.object_store.add_object(blob1)
  2245. tree1 = Tree()
  2246. tree1[b"readme.txt"] = (0o100644, blob1.id)
  2247. self.repo.object_store.add_object(tree1)
  2248. # Update to tree1
  2249. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2250. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2251. # Verify initial state
  2252. files = [f for f in os.listdir(self.tempdir) if not f.startswith(".git")]
  2253. self.assertEqual(["readme.txt"], files)
  2254. # Create tree with uppercase file (same content, same blob)
  2255. tree2 = Tree()
  2256. tree2[b"README.txt"] = (0o100644, blob1.id) # Same blob!
  2257. self.repo.object_store.add_object(tree2)
  2258. # Update to tree2 (case-only rename)
  2259. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2260. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2261. # On case-insensitive filesystems, should have one file with updated case
  2262. files = [f for f in os.listdir(self.tempdir) if not f.startswith(".git")]
  2263. self.assertEqual(
  2264. 1, len(files), "Should have exactly one file after case rename"
  2265. )
  2266. # The file should now have the new case in the directory listing
  2267. actual_filename = files[0]
  2268. self.assertEqual(
  2269. "README.txt",
  2270. actual_filename,
  2271. "Filename case should be updated in directory listing",
  2272. )
  2273. # Verify content is preserved
  2274. file_path = os.path.join(self.tempdir, actual_filename)
  2275. with open(file_path, "rb") as f:
  2276. self.assertEqual(b"same content", f.read())
  2277. # Both old and new case should access the same file
  2278. lowercase_path = os.path.join(self.tempdir, "readme.txt")
  2279. uppercase_path = os.path.join(self.tempdir, "README.txt")
  2280. self.assertTrue(os.path.exists(lowercase_path))
  2281. self.assertTrue(os.path.exists(uppercase_path))
  2282. def test_update_working_tree_deeply_nested_removal(self):
  2283. """Test removal of deeply nested directory structures."""
  2284. # Create deeply nested structure
  2285. blob = Blob()
  2286. blob.data = b"deep content"
  2287. self.repo.object_store.add_object(blob)
  2288. tree1 = Tree()
  2289. # Create a very deep path
  2290. deep_path = b"/".join([b"level%d" % i for i in range(10)])
  2291. tree1[deep_path + b"/file.txt"] = (0o100644, blob.id)
  2292. self.repo.object_store.add_object(tree1)
  2293. # Update to tree1
  2294. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2295. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2296. # Verify deep structure exists
  2297. current_path = self.tempdir
  2298. for i in range(10):
  2299. current_path = os.path.join(current_path, f"level{i}")
  2300. self.assertTrue(os.path.isdir(current_path))
  2301. # Create empty tree
  2302. tree2 = Tree()
  2303. self.repo.object_store.add_object(tree2)
  2304. # Update should remove all empty directories
  2305. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2306. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2307. # Verify top level directory is gone
  2308. top_level = os.path.join(self.tempdir, "level0")
  2309. self.assertFalse(os.path.exists(top_level))
  2310. def test_update_working_tree_read_only_files(self):
  2311. """Test handling of read-only files during updates."""
  2312. # Create tree with file
  2313. blob1 = Blob()
  2314. blob1.data = b"original content"
  2315. self.repo.object_store.add_object(blob1)
  2316. tree1 = Tree()
  2317. tree1[b"readonly.txt"] = (0o100644, blob1.id)
  2318. self.repo.object_store.add_object(tree1)
  2319. # Update to tree1
  2320. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2321. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2322. # Make file read-only
  2323. file_path = os.path.join(self.tempdir, "readonly.txt")
  2324. os.chmod(file_path, 0o444) # Read-only
  2325. # Create tree with modified file
  2326. blob2 = Blob()
  2327. blob2.data = b"new content"
  2328. self.repo.object_store.add_object(blob2)
  2329. tree2 = Tree()
  2330. tree2[b"readonly.txt"] = (0o100644, blob2.id)
  2331. self.repo.object_store.add_object(tree2)
  2332. # Update should handle read-only file
  2333. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2334. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2335. # Verify content was updated
  2336. with open(file_path, "rb") as f:
  2337. self.assertEqual(b"new content", f.read())
  2338. def test_update_working_tree_invalid_filenames(self):
  2339. """Test handling of invalid filenames for the platform."""
  2340. # Create tree with potentially problematic filenames
  2341. blob = Blob()
  2342. blob.data = b"content"
  2343. self.repo.object_store.add_object(blob)
  2344. tree = Tree()
  2345. # Add files with names that might be invalid on some platforms
  2346. tree[b"valid.txt"] = (0o100644, blob.id)
  2347. if sys.platform != "win32":
  2348. # These are invalid on Windows but valid on Unix
  2349. tree[b"file:with:colons.txt"] = (0o100644, blob.id)
  2350. tree[b"file<with>brackets.txt"] = (0o100644, blob.id)
  2351. self.repo.object_store.add_object(tree)
  2352. # Update should skip invalid files based on validation
  2353. changes = tree_changes(self.repo.object_store, None, tree.id)
  2354. update_working_tree(self.repo, None, tree.id, change_iterator=changes)
  2355. # Valid file should exist
  2356. self.assertTrue(os.path.exists(os.path.join(self.tempdir, "valid.txt")))
  2357. def test_update_working_tree_symlink_to_directory(self):
  2358. """Test replacing a symlink pointing to a directory with a real directory."""
  2359. if sys.platform == "win32":
  2360. self.skipTest("Symlinks not fully supported on Windows")
  2361. # Create a target directory
  2362. target_dir = os.path.join(self.tempdir, "target")
  2363. os.mkdir(target_dir)
  2364. with open(os.path.join(target_dir, "file.txt"), "w") as f:
  2365. f.write("target file")
  2366. # Create tree with symlink pointing to directory
  2367. blob1 = Blob()
  2368. blob1.data = b"target" # Relative path to target directory
  2369. self.repo.object_store.add_object(blob1)
  2370. tree1 = Tree()
  2371. tree1[b"link"] = (0o120000, blob1.id)
  2372. self.repo.object_store.add_object(tree1)
  2373. # Update to tree1
  2374. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2375. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2376. link_path = os.path.join(self.tempdir, "link")
  2377. self.assertTrue(os.path.islink(link_path))
  2378. # Create tree with actual directory at same path
  2379. blob2 = Blob()
  2380. blob2.data = b"new file content"
  2381. self.repo.object_store.add_object(blob2)
  2382. tree2 = Tree()
  2383. tree2[b"link/newfile.txt"] = (0o100644, blob2.id)
  2384. self.repo.object_store.add_object(tree2)
  2385. # Update should replace symlink with actual directory
  2386. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2387. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2388. self.assertFalse(os.path.islink(link_path))
  2389. self.assertTrue(os.path.isdir(link_path))
  2390. self.assertTrue(os.path.exists(os.path.join(link_path, "newfile.txt")))
  2391. def test_update_working_tree_comprehensive_transitions(self):
  2392. """Test all possible file type transitions comprehensively."""
  2393. # Skip on Windows where symlinks might not be supported
  2394. if sys.platform == "win32":
  2395. self.skipTest("Symlinks not fully supported on Windows")
  2396. # Create blobs for different file types
  2397. file_blob = Blob()
  2398. file_blob.data = b"regular file content"
  2399. self.repo.object_store.add_object(file_blob)
  2400. exec_blob = Blob()
  2401. exec_blob.data = b"#!/bin/sh\necho executable"
  2402. self.repo.object_store.add_object(exec_blob)
  2403. link_blob = Blob()
  2404. link_blob.data = b"target/path"
  2405. self.repo.object_store.add_object(link_blob)
  2406. submodule_sha = b"a" * 40
  2407. # Test 1: Regular file → Submodule
  2408. tree1 = Tree()
  2409. tree1[b"item"] = (0o100644, file_blob.id)
  2410. self.repo.object_store.add_object(tree1)
  2411. tree2 = Tree()
  2412. tree2[b"item"] = (S_IFGITLINK, submodule_sha)
  2413. self.repo.object_store.add_object(tree2)
  2414. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2415. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2416. self.assertTrue(os.path.isfile(os.path.join(self.tempdir, "item")))
  2417. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2418. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2419. self.assertTrue(os.path.isdir(os.path.join(self.tempdir, "item")))
  2420. # Test 2: Submodule → Executable file
  2421. tree3 = Tree()
  2422. tree3[b"item"] = (0o100755, exec_blob.id)
  2423. self.repo.object_store.add_object(tree3)
  2424. changes = tree_changes(self.repo.object_store, tree2.id, tree3.id)
  2425. update_working_tree(self.repo, tree2.id, tree3.id, change_iterator=changes)
  2426. item_path = os.path.join(self.tempdir, "item")
  2427. self.assertTrue(os.path.isfile(item_path))
  2428. if sys.platform != "win32":
  2429. self.assertTrue(os.access(item_path, os.X_OK))
  2430. # Test 3: Executable file → Symlink
  2431. tree4 = Tree()
  2432. tree4[b"item"] = (0o120000, link_blob.id)
  2433. self.repo.object_store.add_object(tree4)
  2434. changes = tree_changes(self.repo.object_store, tree3.id, tree4.id)
  2435. update_working_tree(self.repo, tree3.id, tree4.id, change_iterator=changes)
  2436. self.assertTrue(os.path.islink(item_path))
  2437. # Test 4: Symlink → Submodule
  2438. tree5 = Tree()
  2439. tree5[b"item"] = (S_IFGITLINK, submodule_sha)
  2440. self.repo.object_store.add_object(tree5)
  2441. changes = tree_changes(self.repo.object_store, tree4.id, tree5.id)
  2442. update_working_tree(self.repo, tree4.id, tree5.id, change_iterator=changes)
  2443. self.assertTrue(os.path.isdir(item_path))
  2444. # Test 5: Clean up - Submodule → absent
  2445. tree6 = Tree()
  2446. self.repo.object_store.add_object(tree6)
  2447. changes = tree_changes(self.repo.object_store, tree5.id, tree6.id)
  2448. update_working_tree(self.repo, tree5.id, tree6.id, change_iterator=changes)
  2449. self.assertFalse(os.path.exists(item_path))
  2450. # Test 6: Symlink → Executable file
  2451. tree7 = Tree()
  2452. tree7[b"item2"] = (0o120000, link_blob.id)
  2453. self.repo.object_store.add_object(tree7)
  2454. changes = tree_changes(self.repo.object_store, tree6.id, tree7.id)
  2455. update_working_tree(self.repo, tree6.id, tree7.id, change_iterator=changes)
  2456. item2_path = os.path.join(self.tempdir, "item2")
  2457. self.assertTrue(os.path.islink(item2_path))
  2458. tree8 = Tree()
  2459. tree8[b"item2"] = (0o100755, exec_blob.id)
  2460. self.repo.object_store.add_object(tree8)
  2461. changes = tree_changes(self.repo.object_store, tree7.id, tree8.id)
  2462. update_working_tree(self.repo, tree7.id, tree8.id, change_iterator=changes)
  2463. self.assertTrue(os.path.isfile(item2_path))
  2464. if sys.platform != "win32":
  2465. self.assertTrue(os.access(item2_path, os.X_OK))
  2466. def test_update_working_tree_partial_update_failure(self):
  2467. """Test handling when update fails partway through."""
  2468. # Create initial tree
  2469. blob1 = Blob()
  2470. blob1.data = b"file1 content"
  2471. self.repo.object_store.add_object(blob1)
  2472. blob2 = Blob()
  2473. blob2.data = b"file2 content"
  2474. self.repo.object_store.add_object(blob2)
  2475. tree1 = Tree()
  2476. tree1[b"file1.txt"] = (0o100644, blob1.id)
  2477. tree1[b"file2.txt"] = (0o100644, blob2.id)
  2478. self.repo.object_store.add_object(tree1)
  2479. # Update to tree1
  2480. changes = tree_changes(self.repo.object_store, None, tree1.id)
  2481. update_working_tree(self.repo, None, tree1.id, change_iterator=changes)
  2482. # Create a directory where file2.txt is, to cause a conflict
  2483. file2_path = os.path.join(self.tempdir, "file2.txt")
  2484. os.remove(file2_path)
  2485. os.mkdir(file2_path)
  2486. # Add untracked file to prevent removal
  2487. with open(os.path.join(file2_path, "blocker.txt"), "w") as f:
  2488. f.write("blocking content")
  2489. # Create tree with updates to both files
  2490. blob3 = Blob()
  2491. blob3.data = b"file1 updated"
  2492. self.repo.object_store.add_object(blob3)
  2493. blob4 = Blob()
  2494. blob4.data = b"file2 updated"
  2495. self.repo.object_store.add_object(blob4)
  2496. tree2 = Tree()
  2497. tree2[b"file1.txt"] = (0o100644, blob3.id)
  2498. tree2[b"file2.txt"] = (0o100644, blob4.id)
  2499. self.repo.object_store.add_object(tree2)
  2500. # Update should partially succeed - file1 updated, file2 blocked
  2501. try:
  2502. changes = tree_changes(self.repo.object_store, tree1.id, tree2.id)
  2503. update_working_tree(self.repo, tree1.id, tree2.id, change_iterator=changes)
  2504. except IsADirectoryError:
  2505. # Expected to fail on file2 because it's a directory
  2506. pass
  2507. # file1 should be updated
  2508. with open(os.path.join(self.tempdir, "file1.txt"), "rb") as f:
  2509. self.assertEqual(b"file1 updated", f.read())
  2510. # file2 should still be a directory
  2511. self.assertTrue(os.path.isdir(file2_path))
  2512. def test_ensure_parent_dir_exists_windows_drive(self):
  2513. """Test that _ensure_parent_dir_exists handles Windows drive letters correctly."""
  2514. from dulwich.index import _ensure_parent_dir_exists
  2515. # Create a temporary directory to work with
  2516. with tempfile.TemporaryDirectory() as tmpdir:
  2517. # Test normal case (creates directory)
  2518. test_path = os.path.join(tmpdir, "subdir", "file.txt").encode()
  2519. _ensure_parent_dir_exists(test_path)
  2520. self.assertTrue(os.path.exists(os.path.dirname(test_path)))
  2521. # Test when parent is a file (should raise error)
  2522. file_path = os.path.join(tmpdir, "testfile").encode()
  2523. with open(file_path, "wb") as f:
  2524. f.write(b"test")
  2525. invalid_path = os.path.join(
  2526. tmpdir.encode(), b"testfile", b"subdir", b"file.txt"
  2527. )
  2528. with self.assertRaisesRegex(
  2529. OSError, "Cannot create directory, parent path is a file"
  2530. ):
  2531. _ensure_parent_dir_exists(invalid_path)
  2532. # Test with nested subdirectories
  2533. nested_path = os.path.join(tmpdir, "a", "b", "c", "d", "file.txt").encode()
  2534. _ensure_parent_dir_exists(nested_path)
  2535. self.assertTrue(os.path.exists(os.path.dirname(nested_path)))
  2536. # Test that various path formats are handled correctly by os.path.dirname
  2537. # This includes Windows drive letters, UNC paths, etc.
  2538. # The key is that we're using os.path.dirname which handles these correctly
  2539. import platform
  2540. if platform.system() == "Windows":
  2541. # Test Windows-specific paths only on Windows
  2542. test_cases = [
  2543. b"C:\\temp\\test\\file.txt",
  2544. b"D:\\file.txt",
  2545. b"\\\\server\\share\\folder\\file.txt",
  2546. ]
  2547. for path in test_cases:
  2548. # Just verify os.path.dirname handles these without errors
  2549. parent = os.path.dirname(path)
  2550. # We're not creating these directories, just testing the logic doesn't fail
  2551. self.assertIsInstance(parent, bytes)
  2552. class TestSparseIndex(TestCase):
  2553. """Tests for sparse index support."""
  2554. def test_serialized_index_entry_is_sparse_dir(self):
  2555. """Test SerializedIndexEntry.is_sparse_dir() method."""
  2556. from dulwich.index import EXTENDED_FLAG_SKIP_WORKTREE
  2557. # Regular file entry - not sparse
  2558. regular_entry = SerializedIndexEntry(
  2559. name=b"file.txt",
  2560. ctime=0,
  2561. mtime=0,
  2562. dev=0,
  2563. ino=0,
  2564. mode=0o100644,
  2565. uid=0,
  2566. gid=0,
  2567. size=0,
  2568. sha=b"\x00" * 20,
  2569. flags=0,
  2570. extended_flags=0,
  2571. )
  2572. self.assertFalse(regular_entry.is_sparse_dir())
  2573. # Directory mode but no skip-worktree flag - not sparse
  2574. dir_entry = SerializedIndexEntry(
  2575. name=b"dir/",
  2576. ctime=0,
  2577. mtime=0,
  2578. dev=0,
  2579. ino=0,
  2580. mode=stat.S_IFDIR,
  2581. uid=0,
  2582. gid=0,
  2583. size=0,
  2584. sha=b"\x00" * 20,
  2585. flags=0,
  2586. extended_flags=0,
  2587. )
  2588. self.assertFalse(dir_entry.is_sparse_dir())
  2589. # Skip-worktree flag but not directory - not sparse
  2590. skip_file = SerializedIndexEntry(
  2591. name=b"file.txt",
  2592. ctime=0,
  2593. mtime=0,
  2594. dev=0,
  2595. ino=0,
  2596. mode=0o100644,
  2597. uid=0,
  2598. gid=0,
  2599. size=0,
  2600. sha=b"\x00" * 20,
  2601. flags=0,
  2602. extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
  2603. )
  2604. self.assertFalse(skip_file.is_sparse_dir())
  2605. # Directory mode + skip-worktree + trailing slash - sparse!
  2606. sparse_dir = SerializedIndexEntry(
  2607. name=b"sparse_dir/",
  2608. ctime=0,
  2609. mtime=0,
  2610. dev=0,
  2611. ino=0,
  2612. mode=stat.S_IFDIR,
  2613. uid=0,
  2614. gid=0,
  2615. size=0,
  2616. sha=b"\x00" * 20,
  2617. flags=0,
  2618. extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
  2619. )
  2620. self.assertTrue(sparse_dir.is_sparse_dir())
  2621. def test_index_entry_is_sparse_dir(self):
  2622. """Test IndexEntry.is_sparse_dir() method."""
  2623. from dulwich.index import EXTENDED_FLAG_SKIP_WORKTREE
  2624. # Regular file - not sparse
  2625. regular = IndexEntry(
  2626. ctime=0,
  2627. mtime=0,
  2628. dev=0,
  2629. ino=0,
  2630. mode=0o100644,
  2631. uid=0,
  2632. gid=0,
  2633. size=0,
  2634. sha=b"\x00" * 20,
  2635. extended_flags=0,
  2636. )
  2637. self.assertFalse(regular.is_sparse_dir(b"file.txt"))
  2638. # Sparse directory entry
  2639. sparse = IndexEntry(
  2640. ctime=0,
  2641. mtime=0,
  2642. dev=0,
  2643. ino=0,
  2644. mode=stat.S_IFDIR,
  2645. uid=0,
  2646. gid=0,
  2647. size=0,
  2648. sha=b"\x00" * 20,
  2649. extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
  2650. )
  2651. self.assertTrue(sparse.is_sparse_dir(b"dir/"))
  2652. self.assertFalse(sparse.is_sparse_dir(b"dir")) # No trailing slash
  2653. def test_sparse_dir_extension(self):
  2654. """Test SparseDirExtension serialization."""
  2655. from dulwich.index import SDIR_EXTENSION, SparseDirExtension
  2656. ext = SparseDirExtension()
  2657. self.assertEqual(ext.signature, SDIR_EXTENSION)
  2658. self.assertEqual(ext.to_bytes(), b"")
  2659. # Test round-trip
  2660. ext2 = SparseDirExtension.from_bytes(b"")
  2661. self.assertEqual(ext2.signature, SDIR_EXTENSION)
  2662. self.assertEqual(ext2.to_bytes(), b"")
  2663. def test_index_is_sparse(self):
  2664. """Test Index.is_sparse() method."""
  2665. from dulwich.index import SparseDirExtension
  2666. with tempfile.TemporaryDirectory() as tmpdir:
  2667. index_path = os.path.join(tmpdir, "index")
  2668. idx = Index(index_path, read=False)
  2669. # Initially not sparse
  2670. self.assertFalse(idx.is_sparse())
  2671. # Add sparse directory extension
  2672. idx._extensions.append(SparseDirExtension())
  2673. self.assertTrue(idx.is_sparse())
  2674. def test_index_expansion(self):
  2675. """Test Index.ensure_full_index() expands sparse directories."""
  2676. from dulwich.index import EXTENDED_FLAG_SKIP_WORKTREE, SparseDirExtension
  2677. from dulwich.object_store import MemoryObjectStore
  2678. from dulwich.objects import Blob, Tree
  2679. # Create a tree structure
  2680. store = MemoryObjectStore()
  2681. blob1 = Blob()
  2682. blob1.data = b"file1"
  2683. store.add_object(blob1)
  2684. blob2 = Blob()
  2685. blob2.data = b"file2"
  2686. store.add_object(blob2)
  2687. subtree = Tree()
  2688. subtree[b"file1.txt"] = (0o100644, blob1.id)
  2689. subtree[b"file2.txt"] = (0o100644, blob2.id)
  2690. store.add_object(subtree)
  2691. # Create an index with a sparse directory entry
  2692. with tempfile.TemporaryDirectory() as tmpdir:
  2693. index_path = os.path.join(tmpdir, "index")
  2694. idx = Index(index_path, read=False)
  2695. # Add sparse directory entry
  2696. sparse_entry = IndexEntry(
  2697. ctime=0,
  2698. mtime=0,
  2699. dev=0,
  2700. ino=0,
  2701. mode=stat.S_IFDIR,
  2702. uid=0,
  2703. gid=0,
  2704. size=0,
  2705. sha=subtree.id,
  2706. extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
  2707. )
  2708. idx[b"subdir/"] = sparse_entry
  2709. idx._extensions.append(SparseDirExtension())
  2710. self.assertTrue(idx.is_sparse())
  2711. self.assertEqual(len(idx), 1)
  2712. # Expand the index
  2713. idx.ensure_full_index(store)
  2714. # Should no longer be sparse
  2715. self.assertFalse(idx.is_sparse())
  2716. # Should have 2 entries now (the files)
  2717. self.assertEqual(len(idx), 2)
  2718. self.assertIn(b"subdir/file1.txt", idx)
  2719. self.assertIn(b"subdir/file2.txt", idx)
  2720. # Entries should point to the correct blobs
  2721. self.assertEqual(idx[b"subdir/file1.txt"].sha, blob1.id)
  2722. self.assertEqual(idx[b"subdir/file2.txt"].sha, blob2.id)
  2723. def test_index_collapse(self):
  2724. """Test Index.convert_to_sparse() collapses directories."""
  2725. from dulwich.object_store import MemoryObjectStore
  2726. from dulwich.objects import Blob, Tree
  2727. # Create a tree structure
  2728. store = MemoryObjectStore()
  2729. blob1 = Blob()
  2730. blob1.data = b"file1"
  2731. store.add_object(blob1)
  2732. blob2 = Blob()
  2733. blob2.data = b"file2"
  2734. store.add_object(blob2)
  2735. subtree = Tree()
  2736. subtree[b"file1.txt"] = (0o100644, blob1.id)
  2737. subtree[b"file2.txt"] = (0o100644, blob2.id)
  2738. store.add_object(subtree)
  2739. tree = Tree()
  2740. tree[b"subdir"] = (stat.S_IFDIR, subtree.id)
  2741. store.add_object(tree)
  2742. # Create an index with full entries
  2743. with tempfile.TemporaryDirectory() as tmpdir:
  2744. index_path = os.path.join(tmpdir, "index")
  2745. idx = Index(index_path, read=False)
  2746. idx[b"subdir/file1.txt"] = IndexEntry(
  2747. ctime=0,
  2748. mtime=0,
  2749. dev=0,
  2750. ino=0,
  2751. mode=0o100644,
  2752. uid=0,
  2753. gid=0,
  2754. size=5,
  2755. sha=blob1.id,
  2756. extended_flags=0,
  2757. )
  2758. idx[b"subdir/file2.txt"] = IndexEntry(
  2759. ctime=0,
  2760. mtime=0,
  2761. dev=0,
  2762. ino=0,
  2763. mode=0o100644,
  2764. uid=0,
  2765. gid=0,
  2766. size=5,
  2767. sha=blob2.id,
  2768. extended_flags=0,
  2769. )
  2770. self.assertEqual(len(idx), 2)
  2771. self.assertFalse(idx.is_sparse())
  2772. # Collapse subdir to sparse
  2773. idx.convert_to_sparse(store, tree.id, {b"subdir/"})
  2774. # Should now be sparse
  2775. self.assertTrue(idx.is_sparse())
  2776. # Should have 1 entry (the sparse dir)
  2777. self.assertEqual(len(idx), 1)
  2778. self.assertIn(b"subdir/", idx)
  2779. # Entry should be a sparse directory
  2780. entry = idx[b"subdir/"]
  2781. self.assertTrue(entry.is_sparse_dir(b"subdir/"))
  2782. self.assertEqual(entry.sha, subtree.id)