# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
    cast,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)
    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset
    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break
    return value, pos
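

# Illustrative note (added comment, not part of the original module): the
# varint stores 7 bits per byte, least-significant group first, with the high
# bit set on every byte except the last.  For example:
#
#   _encode_varint(300) == b"\xac\x02"       # 300 == 0x2C | (0x02 << 7)
#   _decode_varint(b"\xac\x02") == (300, 2)  # value 300, next offset 2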


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))
    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1
    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )
    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix
    return path, new_offset
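

# Worked example (added comment, not part of the original module): compressing
# b"dir/b.txt" against a previous path of b"dir/a.txt" keeps the 4-byte common
# prefix b"dir/", drops the 5 trailing bytes of the previous path, and appends
# the new suffix:
#
#   _compress_path(b"dir/b.txt", b"dir/a.txt") == b"\x05b.txt\x00"
#   _decompress_path(b"\x05b.txt\x00", 0, b"dir/a.txt") == (b"dir/b.txt", 7)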


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )
    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix
    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
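

# Note (added comment): the stage lives in bits 12-13 of an entry's flags
# (FLAG_STAGEMASK).  For example, flags of 0x2000 give
# (0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT == 2, i.e.
# Stage.MERGE_CONFLICT_THIS ("ours" during a merge).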


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.
    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
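

# Examples (added comment): pathsplit() splits on the last slash and
# pathjoin() skips empty components, so:
#
#   pathsplit(b"a/b/c") == (b"a/b", b"c")
#   pathsplit(b"c") == (b"", b"c")
#   pathjoin(b"", b"a", b"b") == b"a/b"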


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
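

# Example (added comment): times are stored as two big-endian 32-bit words,
# seconds then nanoseconds.  A float such as 1234567890.5 is written as
# (1234567890, 500000000); a bare int gets a zero nanosecond part.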


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)
    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
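

# Padding note (added comment): in index versions < 4 each on-disk entry is
# NUL-padded so its total length is a multiple of 8 bytes, with between 1 and
# 8 NUL bytes after the name.  The expression
# (f.tell() - beginoffset + 8) & ~7 above computes that padded length.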


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
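

# Layout note (added comment): the 12-byte header is the signature b"DIRC"
# followed by two big-endian uint32s, e.g.
# b"DIRC\x00\x00\x00\x02\x00\x00\x00\x01" describes a version-2 index
# containing one entry.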


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)
        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    The dict is keyed by path; entries with a non-zero stage are grouped
    into a single ConflictedIndexEntry per path, since path alone is not
    unique during a merge conflict.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, bump the version to 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Sanity check: extended flags must never be written below version 3.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name
    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.
    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
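

# Examples (added comment): only the file type and the owner execute bit
# survive; permission bits are normalized to 0o644 or 0o755:
#
#   cleanup_mode(0o100600) == 0o100644   # regular file
#   cleanup_mode(0o100755) == 0o100755   # executable file
#   cleanup_mode(0o120777) == 0o120000   # symlink (stat.S_IFLNK)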


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        from typing import BinaryIO, cast

        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
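

# Usage sketch (added comment; the path and store below are illustrative):
#
#   index = Index(".git/index")
#   for path in index:
#       entry = index[path]               # IndexEntry or ConflictedIndexEntry
#   tree_id = index.commit(object_store)  # object_store: any ObjectContainer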


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
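

# Usage sketch (added comment, illustrative only): nested trees are built
# bottom-up from flat paths, e.g. with an in-memory store:
#
#   from dulwich.object_store import MemoryObjectStore
#
#   store = MemoryObjectStore()
#   blob = Blob.from_string(b"hello\n")
#   store.add_object(blob)
#   root_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
#   # store[root_id] is a Tree with a "docs" subtree containing readme.txt.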


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks
    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat
        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)
        if honor_filemode:
            os.chmod(target_path, mode)
    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config) -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object
    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True
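

# Example (added comment): HFS+ ignores certain zero-width characters when
# comparing names, so b".g\xe2\x80\x8dit" (".g" + ZERO WIDTH JOINER + "it")
# normalizes to b".git" and is rejected:
#
#   validate_path_element_hfs(b".g\xe2\x80\x8dit") == False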


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
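

# Examples (added comment): every /-separated element must pass the validator,
# so paths that smuggle a .git component are rejected:
#
#   validate_path(b"src/main.py") == True
#   validate_path(b"a/.git/hooks") == False
#   validate_path(b"GIT~1/config", validate_path_element_ntfs) == False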


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob
  1276. def blob_from_path_and_stat(
  1277. fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
  1278. ) -> Blob:
  1279. """Create a blob from a path and a stat object.
  1280. Args:
  1281. fs_path: Full file system path to file
  1282. st: A stat object
  1283. Returns: A `Blob` object
  1284. """
  1285. return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None
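

# Illustrative sketch: read_submodule_head returns the submodule's HEAD sha
# as bytes, or None when the directory is not a usable git repository. The
# path is an assumption for the example.
#
#     sha = read_submodule_head("vendor/libfoo")
#     if sha is None:
#         print("vendor/libfoo is not a checked-out submodule")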


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, since Git tracked a file and not a
    directory.

    Returns True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other
        # permissions. This matches Git's behavior, where umask differences
        # don't count as modifications.
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False
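

# Worked example of the normalization above (plain arithmetic, not part of
# the module): a file written under umask 0o002 still matches an index entry
# recorded as 0o755.
#
#     current = stat.S_IMODE(0o100775)  # mode on disk: rwxrwxr-x
#     current & 0o755 == 0o755          # True - the group write bit is ignored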


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
    tree_encoding="utf-8",
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since
        # we're not updating
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


def detect_case_only_renames(
    changes: list["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Detect and transform case-only renames in a list of tree changes.

    This function identifies file renames that only differ in case (e.g.,
    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
    based on the repository configuration.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object

    Returns:
      New list of TreeChange objects with case-only renames converted to
      CHANGE_RENAME
    """
    import logging

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Build dictionaries of old and new paths with their normalized forms
    old_paths_normalized = {}
    new_paths_normalized = {}
    old_changes = {}  # Map from old path to change object
    new_changes = {}  # Map from new path to change object

    # Get the appropriate normalizer based on config
    normalize_func = get_path_element_normalizer(config)

    def normalize_path(path: bytes) -> bytes:
        """Normalize entire path using element normalization."""
        return b"/".join(normalize_func(part) for part in path.split(b"/"))

    # Pre-normalize all paths once to avoid repeated normalization
    for change in changes:
        # A RENAME contributes an "old" path too, so treat it like
        # DELETE + ADD for case-only detection.
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            try:
                normalized = normalize_path(change.old.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.old.path,
                )
            else:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            try:
                normalized = normalize_path(change.new.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.new.path,
                )
            else:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change

    # Find case-only renames and transform changes
    case_only_renames = set()
    new_rename_changes = []

    for norm_path, old_path in old_paths_normalized.items():
        if norm_path in new_paths_normalized:
            new_path = new_paths_normalized[norm_path]
            if old_path != new_path:
                # Found a case-only rename: replace the DELETE + ADD/MODIFY
                # pair with a single CHANGE_RENAME, keeping the old file from
                # the DELETE and the new file from the ADD/MODIFY.
                old_change = old_changes[old_path]
                new_change = new_changes[new_path]
                rename_change = TreeChange(
                    CHANGE_RENAME, old_change.old, new_change.new
                )
                new_rename_changes.append(rename_change)
                # Mark the old changes for removal
                case_only_renames.add(old_change)
                case_only_renames.add(new_change)

    # Return new list with original ADD/DELETE changes replaced by renames
    result = [change for change in changes if change not in case_only_renames]
    result.extend(new_rename_changes)
    return result
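

# Illustrative sketch: wiring the detector into a diff. The tree ids
# (old_tree_id, new_tree_id) are assumptions for the example; tree_changes
# and Repo are the regular dulwich APIs.
#
#     from dulwich.diff_tree import tree_changes
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     changes = list(tree_changes(repo.object_store, old_tree_id, new_tree_id))
#     changes = detect_case_only_renames(changes, repo.get_config())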


def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)
      allow_overwrite_modified: If False, raise an error when attempting to
        overwrite files that have been modified compared to old_tree_id
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    config = repo.get_config()
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
    if ignore_case:
        changes = detect_case_only_renames(changes, config)
    # Check for path conflicts where files need to become directories
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            path = change.new.path
            if b"/" in path:  # This is a file inside a directory
                # Check if any parent path exists as a file in the old tree or changes
                parts = path.split(b"/")
                for i in range(1, len(parts)):
                    parent = b"/".join(parts[:i])
                    # See if this parent path is being deleted (was a file, becoming a dir)
                    for other_change in changes:
                        if (
                            other_change.type == CHANGE_DELETE
                            and other_change.old
                            and other_change.old.path == parent
                        ):
                            paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if stat.S_ISREG(current_stat.st_mode):
            # Find the old entry for this path
            old_change = None
            for change in changes:
                if (
                    change.type == CHANGE_DELETE
                    and change.old
                    and change.old.path == path
                ):
                    old_change = change
                    break

            if old_change:
                # Check if file has been modified
                file_matches = _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_change.old.sha,
                    old_change.old.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                )
                if not file_matches:
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )
    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                if path.startswith(b".git") or not validate_path(
                    path, validate_path_element
                ):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            f"would be overwritten by checkout. "
                            f"Please commit your changes or stash them before you switch branches."
                        )
    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove file/directory
            path = change.old.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            path = change.new.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()
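

# Illustrative sketch: a bare-bones checkout built on update_working_tree.
# The target commit sha (target_commit_sha) and repository path are
# assumptions for the example.
#
#     from dulwich.diff_tree import tree_changes
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     old_tree = repo[repo.head()].tree
#     new_tree = repo[target_commit_sha].tree
#     changes = tree_changes(repo.object_store, old_tree, new_tree)
#     update_working_tree(repo, old_tree, new_tree, changes)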


def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback to filter blobs before comparing
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
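

# Illustrative sketch: printing paths whose working-tree contents differ
# from the index, similar to the modified set of 'git status'. The repo
# location is an assumption for the example.
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     for tree_path in get_unstaged_changes(repo.open_index(), repo.path):
#         print(tree_path.decode("utf-8", errors="replace"))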


def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path

    # On Windows, we need to handle tree path encoding properly
    if sys.platform == "win32":
        # Decode from tree encoding, then re-encode for filesystem
        try:
            tree_path_str = sep_corrected_path.decode(tree_encoding)
            sep_corrected_path = os.fsencode(tree_path_str)
        except UnicodeDecodeError:
            # If decoding fails, use the original bytes
            pass

    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
      tree_encoding: Encoding to use for tree paths (default: utf-8)

    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path

    # On Windows, we need to ensure tree paths are properly encoded
    if sys.platform == "win32":
        try:
            # Decode from filesystem encoding, then re-encode with tree encoding
            fs_path_str = os.fsdecode(fs_path_bytes)
            fs_path_bytes = fs_path_str.encode(tree_encoding)
        except UnicodeDecodeError:
            # If filesystem decoding fails, use the original bytes
            pass

    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
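

# Illustrative values for the two helpers above on a POSIX system, where
# os.sep is "/" and both conversions are near no-ops:
#
#     _tree_to_fs_path(b"/repo", b"docs/readme.md")  # b"/repo/docs/readme.md"
#     _fs_to_tree_path(b"docs/readme.md")            # b"docs/readme.md"
#
# On Windows the same calls translate between b"/" and b"\\" separators and
# re-encode the path for the filesystem.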


def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Returns a gitlink entry pointing at the submodule's HEAD, or None if the
    directory is not a git repository.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
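

# Illustrative sketch: staging a single new file by hand. Paths are
# assumptions for the example; passing the object store also writes the
# blob so the entry's sha resolves.
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     entry = index_entry_from_path(b"/path/to/repo/new.txt", repo.object_store)
#     if entry is not None:
#         index = repo.open_index()
#         index[b"new.txt"] = entry
#         index.write()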


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)


def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent of the automatic staging performed by
    'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
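

# Illustrative sketch: re-staging every tracked file from the working tree,
# the same refresh 'git commit -a' performs before committing. The repo
# location is an assumption for the example.
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     index = repo.open_index()
#     refresh_index(index, os.fsencode(repo.path))
#     index.write()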


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            from typing import BinaryIO, cast

            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
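

# Illustrative sketch: editing the index under the lock. On a clean exit the
# new index is written and the lock released; on an exception the write is
# aborted. The path is an assumption for the example.
#
#     with locked_index(b"/path/to/repo/.git/index") as index:
#         del index[b"obsolete.txt"]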