# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
  21. """Parser for the git index file format."""
  22. import errno
  23. import os
  24. import shutil
  25. import stat
  26. import struct
  27. import sys
  28. import types
  29. from collections.abc import Generator, Iterable, Iterator
  30. from dataclasses import dataclass
  31. from enum import Enum
  32. from typing import (
  33. TYPE_CHECKING,
  34. Any,
  35. BinaryIO,
  36. Callable,
  37. Optional,
  38. Union,
  39. cast,
  40. )
  41. if TYPE_CHECKING:
  42. from .file import _GitFile
  43. from .line_ending import BlobNormalizer
  44. from .repo import Repo
  45. from .file import GitFile
  46. from .object_store import iter_tree_contents
  47. from .objects import (
  48. S_IFGITLINK,
  49. S_ISGITLINK,
  50. Blob,
  51. ObjectID,
  52. Tree,
  53. hex_to_sha,
  54. sha_to_hex,
  55. )
  56. from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"
    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)
    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset
    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break
    return value, pos
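

# A quick round-trip sanity check for the varint helpers above (illustrative
# comment only, never executed): 300 is 0b100101100, so it encodes low 7 bits
# first as 0xAC 0x02, with the high bit set on every byte except the last.
#
#   >>> _encode_varint(300)
#   b'\xac\x02'
#   >>> _decode_varint(b'\xac\x02')
#   (300, 2)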


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))
    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break
    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len
    # The suffix to append
    suffix = path[common_len:]
    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)
    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1
    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")
    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator
    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )
    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix
    return path, new_offset
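

# Illustrative example of the v4 path-compression round trip (comment only):
# compressing b"src/main.py" against the previous path b"src/index.py" keeps
# the common prefix b"src/", i.e. "remove 8 bytes from the end of the previous
# path, then append b'main.py'".
#
#   >>> data = _compress_path(b"src/main.py", b"src/index.py")
#   >>> data
#   b'\x08main.py\x00'
#   >>> _decompress_path(data, 0, b"src/index.py")
#   (b'src/main.py', 9)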


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break
    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])
    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )
    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix
    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
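

# The merge stage lives in bits 12-13 of the flags word. For example, flags of
# 0x2000 decode to stage 2 ("ours" during a merge); illustrative comment only:
#
#   >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   <Stage.MERGE_CONFLICT_THIS: 2>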


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.
    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
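

# Example behaviour of the path helpers (illustrative comment only):
#
#   >>> pathsplit(b"foo/bar/baz")
#   (b'foo/bar', b'baz')
#   >>> pathsplit(b"baz")
#   (b'', b'baz')
#   >>> pathjoin(b"foo", b"", b"baz")
#   b'foo/baz'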


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
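

# Note that a float timestamp is split into whole seconds plus nanoseconds
# before being packed, so e.g. t=1234.5 is written as (1234, 500000000).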


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)
    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
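

# On disk, each entry starts with a fixed 62-byte header: two 8-byte cache
# times (ctime, mtime), six 4-byte fields (dev, ino, mode, uid, gid, size),
# a 20-byte raw SHA-1 and a 2-byte flags word. The path follows, NUL-padded
# to an 8-byte boundary in versions < 4 and prefix-compressed in version 4.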


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
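

# Example (illustrative comment only): parsing the 12-byte header of an
# empty version-2 index built in memory:
#
#   >>> import io, struct
#   >>> read_index_header(io.BytesIO(b"DIRC" + struct.pack(">LL", 2, 0)))
#   (2, 0)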


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)
        if current_pos >= eof_pos - 20:
            break
        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break
        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break
        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]
        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break
        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)
    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    The returned dictionary is keyed by path; all stages of a conflicted
    path are grouped into a single ConflictedIndexEntry.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be written as at
    # least format version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    if version < 3:
        # Sanity check: no extended flags may appear below version 3
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name
    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version, extensions=extensions)
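

# Illustrative round trip through an in-memory file, assuming a well-formed
# (but hypothetical) 40-byte hex sha of b"e" * 40 (comment only):
#
#   >>> import io
#   >>> buf = io.BytesIO()
#   >>> entry = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 6, b"e" * 40)
#   >>> write_index_dict(buf, {b"hello.txt": entry})
#   >>> _ = buf.seek(0)
#   >>> list(read_index_dict(buf))
#   [b'hello.txt']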


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.
    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
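

# Example: permission bits are normalized to 0644 or 0755, with only the
# owner-executable bit of the input surviving (illustrative comment only):
#
#   >>> oct(cleanup_mode(0o100600))
#   '0o100644'
#   >>> oct(cleanup_mode(0o100764))
#   '0o100755'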


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
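

# Example usage (illustrative comment only; assumes dulwich's
# MemoryObjectStore and a hypothetical hello.txt blob):
#
#   >>> from dulwich.object_store import MemoryObjectStore
#   >>> store = MemoryObjectStore()
#   >>> blob = Blob.from_string(b"hello\n")
#   >>> store.add_object(blob)
#   >>> tree_id = commit_tree(store, [(b"hello.txt", blob.id, 0o100644)])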


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)
    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))
    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)
    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks
    # https://github.com/jelmer/dulwich/issues/1005
    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat
        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)
        if honor_filemode:
            os.chmod(target_path, mode)
    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
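

# Example behaviour of the validators (illustrative comment only): NTFS
# equivalence rules strip trailing dots and spaces, so b".Git. " is still
# rejected, and any path containing a .git element fails validation.
#
#   >>> validate_path_element_ntfs(b".Git. ")
#   False
#   >>> validate_path(b"src/.git/config")
#   False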


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)
    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)
        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))
        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )
        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)
    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, since Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True
    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink needs to be written, False if it matches.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target != expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return True
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return True
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file needs to be written, False if it matches.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)
        if current_mode != expected_mode:
            return True

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return True

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content != expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return True
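
# Illustrative sketch of how the check helpers above are used during a
# checkout: stat the working-tree path, then ask _check_file_matches whether
# the file must be rewritten (mode, then size, then content are compared, in
# that order). The names `repo`, `entry`, and the path are hypothetical.
#
#     st = os.lstat(b"/repo/file.txt")
#     if _check_file_matches(repo.object_store, b"/repo/file.txt",
#                            entry.sha, entry.mode, st, honor_filemode=True):
#         ...  # rewrite the file from the blob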


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        needs_update = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here
        # since we're not updating the file
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Build sets of paths for efficient lookup
    new_paths = {}
    for entry in iter_tree_contents(repo.object_store, new_tree_id):
        if entry.path.startswith(b".git") or not validate_path(
            entry.path, validate_path_element
        ):
            continue
        new_paths[entry.path] = entry

    old_paths = {}
    if old_tree_id:
        for entry in iter_tree_contents(repo.object_store, old_tree_id):
            if not entry.path.startswith(b".git"):
                old_paths[entry.path] = entry

    # Process all paths
    all_paths = set(new_paths.keys()) | set(old_paths.keys())

    # Check for paths that need to become directories
    paths_needing_dir = set()
    for path in new_paths:
        parts = path.split(b"/")
        for i in range(1, len(parts)):
            parent = b"/".join(parts[:i])
            if parent in old_paths and parent not in new_paths:
                paths_needing_dir.add(parent)

    # Check if any path that needs to become a directory has been modified
    current_stat: Optional[os.stat_result]
    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
    for path in paths_needing_dir:
        full_path = _tree_to_fs_path(repo_path, path)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            # File doesn't exist, proceed
            stat_cache[full_path] = None
        except PermissionError:
            # Can't read file, proceed
            pass
        else:
            stat_cache[full_path] = current_stat
            if stat.S_ISREG(current_stat.st_mode):
                # Check if file has been modified
                old_entry = old_paths[path]
                if _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_entry.sha,
                    old_entry.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                ):
                    # File has been modified, can't replace with directory
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Process in two passes: deletions first, then additions/updates.
    # This handles case-only renames on case-insensitive filesystems correctly.
    paths_to_remove = []
    paths_to_update = []

    for path in sorted(all_paths):
        if path in new_paths:
            paths_to_update.append(path)
        else:
            paths_to_remove.append(path)

    # First process removals
    for path in paths_to_remove:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine current state - use cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        _transition_to_absent(repo, path, full_path, current_stat, index)

    # Then process additions/updates
    for path in paths_to_update:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine current state - use cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        new_entry = new_paths[path]

        # Path should exist
        if S_ISGITLINK(new_entry.mode):
            _transition_to_submodule(
                repo, path, full_path, current_stat, new_entry, index
            )
        else:
            _transition_to_file(
                repo.object_store,
                path,
                full_path,
                current_stat,
                new_entry,
                index,
                honor_filemode,
                symlink_fn,
                blob_normalizer,
            )

    # Handle force_remove_untracked
    if force_remove_untracked:
        for root, dirs, files in os.walk(repo_path):
            if b".git" in os.fsencode(root):
                continue
            root_bytes = os.fsencode(root)
            for file in files:
                full_path = os.path.join(root_bytes, os.fsencode(file))
                tree_path = os.path.relpath(full_path, repo_path)
                if os.sep != "/":
                    tree_path = tree_path.replace(os.sep.encode(), b"/")
                if tree_path not in new_paths:
                    _remove_file_with_readonly_handling(full_path)
                    if tree_path in index:
                        del index[tree_path]

    # Clean up empty directories
    for root, dirs, files in os.walk(repo_path, topdown=False):
        root_bytes = os.fsencode(root)
        if (
            b".git" not in root_bytes
            and root_bytes != repo_path
            and not files
            and not dirs
        ):
            try:
                os.rmdir(root)
            except OSError:
                pass

    index.write()
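
# Usage sketch (the repository path and ref names are hypothetical):
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     old_commit = repo[repo.head()]
#     new_commit = repo[b"refs/heads/feature"]
#     update_working_tree(repo, old_commit.tree, new_commit.tree)
#
# Passing old_tree_id=None treats every target path as new; combining that
# with force_remove_untracked=True also deletes files absent from the target
# tree.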


def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback applied to each blob (and its
        tree path) before comparing it against the index entry

    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
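
# Usage sketch (hypothetical repository path):
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")
#     for tree_path in get_unstaged_changes(repo.open_index(), repo.path):
#         print(tree_path.decode())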


def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns: Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
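
# Sketch of the two path converters; values are illustrative. On POSIX
# (os_sep_bytes == b"/") the separator is left as-is; on Windows b"/" and
# b"\\" are swapped:
#
#     _tree_to_fs_path(b"/repo", b"dir/file.txt")  # b"/repo/dir/file.txt" on POSIX
#     _fs_to_tree_path(b"dir\\file.txt")           # b"dir/file.txt" on Windows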


def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create a gitlink index entry for a directory, if it is a submodule.

    Returns an entry with mode S_IFGITLINK when path contains a .git file or
    directory whose HEAD can be read; otherwise returns None.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in

    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
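
# Usage sketch (hypothetical paths; storing the blob is optional):
#
#     entry = index_entry_from_path(
#         b"/repo/README.md", object_store=repo.object_store
#     )
#     if entry is not None:
#         index[b"README.md"] = entry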


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in

    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to

    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)
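
# Usage sketch: report the on-disk sha/mode for every path in the index,
# including deletions. `repo_root` is a hypothetical bytes path.
#
#     index = repo.open_index()
#     for path, sha, mode in iter_fresh_objects(
#         index, repo_root, include_deleted=True
#     ):
#         ...  # sha and mode are None for paths removed from disk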


def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This re-stages the current on-disk contents of all tracked files,
    equivalent to the staging step of 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
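
# Usage sketch: refresh_index only mutates the in-memory index, so persist
# the result explicitly. The repository objects here are hypothetical.
#
#     index = repo.open_index()
#     refresh_index(index, repo_root)
#     index.write()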


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            from typing import BinaryIO, cast

            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
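
# Usage sketch: mutate the index while holding the on-disk lock; on a clean
# exit the new index (with its SHA-1 trailer) replaces the old one, and on an
# exception the lock file is aborted. The path is hypothetical.
#
#     with locked_index(b"/path/to/repo/.git/index") as index:
#         del index[b"old-file.txt"]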