- # index.py -- File parser/writer for the git index file
- # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Parser for the git index file format."""
- import errno
- import os
- import shutil
- import stat
- import struct
- import sys
- import types
- from collections.abc import (
- Callable,
- Generator,
- Iterable,
- Iterator,
- Mapping,
- Sequence,
- Set,
- )
- from dataclasses import dataclass
- from enum import Enum
- from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- BinaryIO,
- )
- if TYPE_CHECKING:
- from .config import Config
- from .diff_tree import TreeChange
- from .file import _GitFile
- from .filters import FilterBlobNormalizer
- from .object_store import BaseObjectStore
- from .repo import Repo
- from .file import GitFile
- from .object_store import iter_tree_contents
- from .objects import (
- S_IFGITLINK,
- S_ISGITLINK,
- Blob,
- ObjectID,
- Tree,
- TreeEntry,
- hex_to_sha,
- sha_to_hex,
- )
- from .pack import ObjectContainer, SHA1Reader, SHA1Writer
- # Type alias for recursive tree structure used in commit_tree
- TreeDict = dict[bytes, "TreeDict | tuple[int, bytes]"]
- # 2-bit stage (during merge)
- FLAG_STAGEMASK = 0x3000
- FLAG_STAGESHIFT = 12
- FLAG_NAMEMASK = 0x0FFF
- # assume-valid
- FLAG_VALID = 0x8000
- # extended flag (must be zero in version 2)
- FLAG_EXTENDED = 0x4000
- # used by sparse checkout
- EXTENDED_FLAG_SKIP_WORKTREE = 0x4000
- # used by "git add -N"
- EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
- DEFAULT_VERSION = 2
- # Index extension signatures
- TREE_EXTENSION = b"TREE"
- REUC_EXTENSION = b"REUC"
- UNTR_EXTENSION = b"UNTR"
- EOIE_EXTENSION = b"EOIE"
- IEOT_EXTENSION = b"IEOT"
- SDIR_EXTENSION = b"sdir" # Sparse directory extension
- def _encode_varint(value: int) -> bytes:
- """Encode an integer using variable-width encoding.
- Same format as used for OFS_DELTA pack entries and index v4 path compression.
- Uses 7 bits per byte, with the high bit indicating continuation.
- Args:
- value: Integer to encode
- Returns:
- Encoded bytes
- """
- if value == 0:
- return b"\x00"
- result = []
- while value > 0:
- byte = value & 0x7F # Take lower 7 bits
- value >>= 7
- if value > 0:
- byte |= 0x80 # Set continuation bit
- result.append(byte)
- return bytes(result)
- def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
- """Decode a variable-width encoded integer.
- Args:
- data: Bytes to decode from
- offset: Starting offset in data
- Returns:
- tuple of (decoded_value, new_offset)
- """
- value = 0
- shift = 0
- pos = offset
- while pos < len(data):
- byte = data[pos]
- pos += 1
- value |= (byte & 0x7F) << shift
- shift += 7
- if not (byte & 0x80): # No continuation bit
- break
- return value, pos
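- # Illustrative round-trip through the varint helpers; the expected bytes
- # follow directly from the 7-bits-per-byte rule described above.
- # >>> _encode_varint(300)
- # b'\xac\x02'
- # >>> _decode_varint(b'\xac\x02')
- # (300, 2)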
- def _compress_path(path: bytes, previous_path: bytes) -> bytes:
- """Compress a path relative to the previous path for index version 4.
- Args:
- path: Path to compress
- previous_path: Previous path for comparison
- Returns:
- Compressed path data (varint prefix_len + suffix)
- """
- # Find the common prefix length
- common_len = 0
- min_len = min(len(path), len(previous_path))
- for i in range(min_len):
- if path[i] == previous_path[i]:
- common_len += 1
- else:
- break
- # The number of bytes to remove from the end of previous_path
- # to get the common prefix
- remove_len = len(previous_path) - common_len
- # The suffix to append
- suffix = path[common_len:]
- # Encode: varint(remove_len) + suffix + NUL
- return _encode_varint(remove_len) + suffix + b"\x00"
- def _decompress_path(
- data: bytes, offset: int, previous_path: bytes
- ) -> tuple[bytes, int]:
- """Decompress a path from index version 4 compressed format.
- Args:
- data: Raw data containing compressed path
- offset: Starting offset in data
- previous_path: Previous path for decompression
- Returns:
- tuple of (decompressed_path, new_offset)
- """
- # Decode the number of bytes to remove from previous path
- remove_len, new_offset = _decode_varint(data, offset)
- # Find the NUL terminator for the suffix
- suffix_start = new_offset
- suffix_end = suffix_start
- while suffix_end < len(data) and data[suffix_end] != 0:
- suffix_end += 1
- if suffix_end >= len(data):
- raise ValueError("Unterminated path suffix in compressed entry")
- suffix = data[suffix_start:suffix_end]
- new_offset = suffix_end + 1 # Skip the NUL terminator
- # Reconstruct the path
- if remove_len > len(previous_path):
- raise ValueError(
- f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
- )
- prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
- path = prefix + suffix
- return path, new_offset
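- # Illustrative round-trip: against previous path b"src/main.py", the path
- # b"src/util.py" shares the 4-byte prefix b"src/", so 7 trailing bytes of
- # the previous path are dropped and b"util.py" is appended.
- # >>> _compress_path(b"src/util.py", b"src/main.py")
- # b'\x07util.py\x00'
- # >>> _decompress_path(b'\x07util.py\x00', 0, b'src/main.py')
- # (b'src/util.py', 9)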
- def _decompress_path_from_stream(
- f: BinaryIO, previous_path: bytes
- ) -> tuple[bytes, int]:
- """Decompress a path from index version 4 compressed format, reading from stream.
- Args:
- f: File-like object to read from
- previous_path: Previous path for decompression
- Returns:
- tuple of (decompressed_path, bytes_consumed)
- """
- # Decode the varint for remove_len by reading byte by byte
- remove_len = 0
- shift = 0
- bytes_consumed = 0
- while True:
- byte_data = f.read(1)
- if not byte_data:
- raise ValueError("Unexpected end of file while reading varint")
- byte = byte_data[0]
- bytes_consumed += 1
- remove_len |= (byte & 0x7F) << shift
- shift += 7
- if not (byte & 0x80): # No continuation bit
- break
- # Read the suffix until NUL terminator
- suffix = b""
- while True:
- byte_data = f.read(1)
- if not byte_data:
- raise ValueError("Unexpected end of file while reading path suffix")
- byte = byte_data[0]
- bytes_consumed += 1
- if byte == 0: # NUL terminator
- break
- suffix += bytes([byte])
- # Reconstruct the path
- if remove_len > len(previous_path):
- raise ValueError(
- f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
- )
- prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
- path = prefix + suffix
- return path, bytes_consumed
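- # The stream variant consumes the same encoding from a file-like object:
- # >>> import io
- # >>> _decompress_path_from_stream(io.BytesIO(b'\x07util.py\x00'), b'src/main.py')
- # (b'src/util.py', 9)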
- class Stage(Enum):
- """Represents the stage of an index entry during merge conflicts."""
- NORMAL = 0
- MERGE_CONFLICT_ANCESTOR = 1
- MERGE_CONFLICT_THIS = 2
- MERGE_CONFLICT_OTHER = 3
- @dataclass
- class SerializedIndexEntry:
- """Represents a serialized index entry as stored in the index file.
- This dataclass holds the raw data for an index entry before it's
- parsed into the more user-friendly IndexEntry format.
- """
- name: bytes
- ctime: int | float | tuple[int, int]
- mtime: int | float | tuple[int, int]
- dev: int
- ino: int
- mode: int
- uid: int
- gid: int
- size: int
- sha: bytes
- flags: int
- extended_flags: int
- def stage(self) -> Stage:
- """Extract the stage from the flags field.
- Returns:
- Stage enum value indicating merge conflict state
- """
- return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
- def is_sparse_dir(self) -> bool:
- """Check if this entry represents a sparse directory.
- A sparse directory entry is a collapsed representation of an entire
- directory tree in a sparse index. It has:
- - Directory mode (0o040000)
- - SKIP_WORKTREE flag set
- - Path ending with '/'
- - SHA pointing to a tree object
- Returns:
- True if entry is a sparse directory entry
- """
- return (
- stat.S_ISDIR(self.mode)
- and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
- and self.name.endswith(b"/")
- )
- @dataclass
- class IndexExtension:
- """Base class for index extensions."""
- signature: bytes
- data: bytes
- @classmethod
- def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
- """Create an extension from raw data.
- Args:
- signature: 4-byte extension signature
- data: Extension data
- Returns:
- Parsed extension object
- """
- if signature == TREE_EXTENSION:
- return TreeExtension.from_bytes(data)
- elif signature == REUC_EXTENSION:
- return ResolveUndoExtension.from_bytes(data)
- elif signature == UNTR_EXTENSION:
- return UntrackedExtension.from_bytes(data)
- elif signature == SDIR_EXTENSION:
- return SparseDirExtension.from_bytes(data)
- else:
- # Unknown extension - just store raw data
- return cls(signature, data)
- def to_bytes(self) -> bytes:
- """Serialize extension to bytes."""
- return self.data
- class TreeExtension(IndexExtension):
- """Tree cache extension."""
- def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
- """Initialize TreeExtension.
- Args:
- entries: List of tree cache entries (path, sha, flags)
- """
- self.entries = entries
- super().__init__(TREE_EXTENSION, b"")
- @classmethod
- def from_bytes(cls, data: bytes) -> "TreeExtension":
- """Parse TreeExtension from bytes.
- Args:
- data: Raw bytes to parse
- Returns:
- TreeExtension instance
- """
- # TODO: Implement tree cache parsing
- return cls([])
- def to_bytes(self) -> bytes:
- """Serialize TreeExtension to bytes.
- Returns:
- Serialized extension data
- """
- # TODO: Implement tree cache serialization
- return b""
- class ResolveUndoExtension(IndexExtension):
- """Resolve undo extension for recording merge conflicts."""
- def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
- """Initialize ResolveUndoExtension.
- Args:
- entries: List of (path, stages) where stages is a list of (stage, sha) tuples
- """
- self.entries = entries
- super().__init__(REUC_EXTENSION, b"")
- @classmethod
- def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
- """Parse ResolveUndoExtension from bytes.
- Args:
- data: Raw bytes to parse
- Returns:
- ResolveUndoExtension instance
- """
- # TODO: Implement resolve undo parsing
- return cls([])
- def to_bytes(self) -> bytes:
- """Serialize ResolveUndoExtension to bytes.
- Returns:
- Serialized extension data
- """
- # TODO: Implement resolve undo serialization
- return b""
- class UntrackedExtension(IndexExtension):
- """Untracked cache extension."""
- def __init__(self, data: bytes) -> None:
- """Initialize UntrackedExtension.
- Args:
- data: Raw untracked cache data
- """
- super().__init__(UNTR_EXTENSION, data)
- @classmethod
- def from_bytes(cls, data: bytes) -> "UntrackedExtension":
- """Parse UntrackedExtension from bytes.
- Args:
- data: Raw bytes to parse
- Returns:
- UntrackedExtension instance
- """
- return cls(data)
- class SparseDirExtension(IndexExtension):
- """Sparse directory extension.
- This extension indicates that the index contains sparse directory entries.
- Tools that don't understand sparse index should avoid interacting with
- the index when this extension is present.
- The extension data is empty - its presence is the signal.
- """
- def __init__(self) -> None:
- """Initialize SparseDirExtension."""
- super().__init__(SDIR_EXTENSION, b"")
- @classmethod
- def from_bytes(cls, data: bytes) -> "SparseDirExtension":
- """Parse SparseDirExtension from bytes.
- Args:
- data: Raw bytes to parse (should be empty)
- Returns:
- SparseDirExtension instance
- """
- return cls()
- def to_bytes(self) -> bytes:
- """Serialize SparseDirExtension to bytes.
- Returns:
- Empty bytes (extension presence is the signal)
- """
- return b""
- @dataclass
- class IndexEntry:
- """Represents an entry in the Git index.
- This is a higher-level representation of an index entry that includes
- parsed data and convenience methods.
- """
- ctime: int | float | tuple[int, int]
- mtime: int | float | tuple[int, int]
- dev: int
- ino: int
- mode: int
- uid: int
- gid: int
- size: int
- sha: bytes
- flags: int = 0
- extended_flags: int = 0
- @classmethod
- def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
- """Create an IndexEntry from a SerializedIndexEntry.
- Args:
- serialized: SerializedIndexEntry to convert
- Returns:
- New IndexEntry instance
- """
- return cls(
- ctime=serialized.ctime,
- mtime=serialized.mtime,
- dev=serialized.dev,
- ino=serialized.ino,
- mode=serialized.mode,
- uid=serialized.uid,
- gid=serialized.gid,
- size=serialized.size,
- sha=serialized.sha,
- flags=serialized.flags,
- extended_flags=serialized.extended_flags,
- )
- def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
- """Serialize this entry with a given name and stage.
- Args:
- name: Path name for the entry
- stage: Merge conflict stage
- Returns:
- SerializedIndexEntry ready for writing to disk
- """
- # Clear out any existing stage bits, then set them from the Stage.
- new_flags = self.flags & ~FLAG_STAGEMASK
- new_flags |= stage.value << FLAG_STAGESHIFT
- return SerializedIndexEntry(
- name=name,
- ctime=self.ctime,
- mtime=self.mtime,
- dev=self.dev,
- ino=self.ino,
- mode=self.mode,
- uid=self.uid,
- gid=self.gid,
- size=self.size,
- sha=self.sha,
- flags=new_flags,
- extended_flags=self.extended_flags,
- )
- def stage(self) -> Stage:
- """Get the merge conflict stage of this entry.
- Returns:
- Stage enum value
- """
- return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
- @property
- def skip_worktree(self) -> bool:
- """Return True if the skip-worktree bit is set in extended_flags."""
- return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
- def set_skip_worktree(self, skip: bool = True) -> None:
- """Helper method to set or clear the skip-worktree bit in extended_flags.
- Also sets FLAG_EXTENDED in self.flags if needed.
- """
- if skip:
- # Turn on the skip-worktree bit
- self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
- # Also ensure the main 'extended' bit is set in flags
- self.flags |= FLAG_EXTENDED
- else:
- # Turn off the skip-worktree bit
- self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
- # Optionally unset the main extended bit if no extended flags remain
- if self.extended_flags == 0:
- self.flags &= ~FLAG_EXTENDED
- def is_sparse_dir(self, name: bytes) -> bool:
- """Check if this entry represents a sparse directory.
- A sparse directory entry is a collapsed representation of an entire
- directory tree in a sparse index. It has:
- - Directory mode (0o040000)
- - SKIP_WORKTREE flag set
- - Path ending with '/'
- - SHA pointing to a tree object
- Args:
- name: The path name for this entry (IndexEntry doesn't store name)
- Returns:
- True if entry is a sparse directory entry
- """
- return (
- stat.S_ISDIR(self.mode)
- and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
- and name.endswith(b"/")
- )
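- # Illustrative sketch: toggling the skip-worktree bit also maintains the
- # FLAG_EXTENDED bit, as implemented above (the sha is a placeholder).
- # >>> e = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0, b'0' * 40)
- # >>> e.set_skip_worktree(True)
- # >>> e.skip_worktree, bool(e.flags & FLAG_EXTENDED)
- # (True, True)
- # >>> e.set_skip_worktree(False)
- # >>> e.skip_worktree, bool(e.flags & FLAG_EXTENDED)
- # (False, False)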
- class ConflictedIndexEntry:
- """Index entry that represents a conflict."""
- ancestor: IndexEntry | None
- this: IndexEntry | None
- other: IndexEntry | None
- def __init__(
- self,
- ancestor: IndexEntry | None = None,
- this: IndexEntry | None = None,
- other: IndexEntry | None = None,
- ) -> None:
- """Initialize ConflictedIndexEntry.
- Args:
- ancestor: The common ancestor entry
- this: The current branch entry
- other: The other branch entry
- """
- self.ancestor = ancestor
- self.this = this
- self.other = other
- class UnmergedEntries(Exception):
- """Unmerged entries exist in the index."""
- def pathsplit(path: bytes) -> tuple[bytes, bytes]:
- """Split a /-delimited path into a directory part and a basename.
- Args:
- path: The path to split.
- Returns:
- Tuple with directory name and basename
- """
- try:
- (dirname, basename) = path.rsplit(b"/", 1)
- except ValueError:
- return (b"", path)
- else:
- return (dirname, basename)
- def pathjoin(*args: bytes) -> bytes:
- """Join a /-delimited path."""
- return b"/".join([p for p in args if p])
- def read_cache_time(f: BinaryIO) -> tuple[int, int]:
- """Read a cache time.
- Args:
- f: File-like object to read from
- Returns:
- Tuple with seconds and nanoseconds
- """
- return struct.unpack(">LL", f.read(8))
- def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
- """Write a cache time.
- Args:
- f: File-like object to write to
- t: Time to write (as int, float or tuple with secs and nsecs)
- """
- if isinstance(t, int):
- t = (t, 0)
- elif isinstance(t, float):
- (secs, nsecs) = divmod(t, 1.0)
- t = (int(secs), int(nsecs * 1000000000))
- elif not isinstance(t, tuple):
- raise TypeError(t)
- f.write(struct.pack(">LL", *t))
- def read_cache_entry(
- f: BinaryIO, version: int, previous_path: bytes = b""
- ) -> SerializedIndexEntry:
- """Read an entry from a cache file.
- Args:
- f: File-like object to read from
- version: Index version
- previous_path: Previous entry's path (for version 4 compression)
- """
- beginoffset = f.tell()
- ctime = read_cache_time(f)
- mtime = read_cache_time(f)
- (
- dev,
- ino,
- mode,
- uid,
- gid,
- size,
- sha,
- flags,
- ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
- if flags & FLAG_EXTENDED:
- if version < 3:
- raise AssertionError("extended flag set in index with version < 3")
- (extended_flags,) = struct.unpack(">H", f.read(2))
- else:
- extended_flags = 0
- if version >= 4:
- # Version 4: paths are stored prefix-compressed against the previous path
- name, _consumed = _decompress_path_from_stream(f, previous_path)
- else:
- # Versions < 4: regular name reading
- name = f.read(flags & FLAG_NAMEMASK)
- # Padding:
- if version < 4:
- real_size = (f.tell() - beginoffset + 8) & ~7
- f.read((beginoffset + real_size) - f.tell())
- return SerializedIndexEntry(
- name,
- ctime,
- mtime,
- dev,
- ino,
- mode,
- uid,
- gid,
- size,
- sha_to_hex(sha),
- flags & ~FLAG_NAMEMASK,
- extended_flags,
- )
- def write_cache_entry(
- f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
- ) -> None:
- """Write an index entry to a file.
- Args:
- f: File object
- entry: IndexEntry to write
- version: Index format version
- previous_path: Previous entry's path (for version 4 compression)
- """
- beginoffset = f.tell()
- write_cache_time(f, entry.ctime)
- write_cache_time(f, entry.mtime)
- # The name-length bits carry the real filename length in every version;
- # in version 4 the on-disk path is additionally prefix-compressed,
- # matching how C Git fills in the v4 flags.
- flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
- if version >= 4:
- compressed_path = _compress_path(entry.name, previous_path)
- if entry.extended_flags:
- flags |= FLAG_EXTENDED
- if flags & FLAG_EXTENDED and version < 3:
- raise AssertionError("unable to use extended flags in version < 3")
- f.write(
- struct.pack(
- b">LLLLLL20sH",
- entry.dev & 0xFFFFFFFF,
- entry.ino & 0xFFFFFFFF,
- entry.mode,
- entry.uid,
- entry.gid,
- entry.size,
- hex_to_sha(entry.sha),
- flags,
- )
- )
- if flags & FLAG_EXTENDED:
- f.write(struct.pack(b">H", entry.extended_flags))
- if version >= 4:
- # Version 4: always write compressed path
- f.write(compressed_path)
- else:
- # Versions < 4: write regular path and padding
- f.write(entry.name)
- real_size = (f.tell() - beginoffset + 8) & ~7
- f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
- class UnsupportedIndexFormat(Exception):
- """An unsupported index format was encountered."""
- def __init__(self, version: int) -> None:
- """Initialize UnsupportedIndexFormat exception.
- Args:
- version: The unsupported index format version
- """
- self.index_format_version = version
- def read_index_header(f: BinaryIO) -> tuple[int, int]:
- """Read an index header from a file.
- Returns:
- tuple of (version, num_entries)
- """
- header = f.read(4)
- if header != b"DIRC":
- raise AssertionError(f"Invalid index file header: {header!r}")
- (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
- if version not in (1, 2, 3, 4):
- raise UnsupportedIndexFormat(version)
- return version, num_entries
- def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
- """Write an index extension.
- Args:
- f: File-like object to write to
- extension: Extension to write
- """
- data = extension.to_bytes()
- f.write(extension.signature)
- f.write(struct.pack(">I", len(data)))
- f.write(data)
- def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
- """Read an index file, yielding the individual entries."""
- version, num_entries = read_index_header(f)
- previous_path = b""
- for i in range(num_entries):
- entry = read_cache_entry(f, version, previous_path)
- previous_path = entry.name
- yield entry
- def read_index_dict_with_version(
- f: BinaryIO,
- ) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
- """Read an index file and return it as a dictionary along with the version.
- Returns:
- tuple of (entries_dict, version, extensions)
- """
- version, num_entries = read_index_header(f)
- ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
- previous_path = b""
- for i in range(num_entries):
- entry = read_cache_entry(f, version, previous_path)
- previous_path = entry.name
- stage = entry.stage()
- if stage == Stage.NORMAL:
- ret[entry.name] = IndexEntry.from_serialized(entry)
- else:
- existing = ret.setdefault(entry.name, ConflictedIndexEntry())
- if isinstance(existing, IndexEntry):
- raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
- if stage == Stage.MERGE_CONFLICT_ANCESTOR:
- existing.ancestor = IndexEntry.from_serialized(entry)
- elif stage == Stage.MERGE_CONFLICT_THIS:
- existing.this = IndexEntry.from_serialized(entry)
- elif stage == Stage.MERGE_CONFLICT_OTHER:
- existing.other = IndexEntry.from_serialized(entry)
- # Read extensions
- extensions = []
- while True:
- # Check if we're at the end (20 bytes before EOF for SHA checksum)
- current_pos = f.tell()
- f.seek(0, 2) # EOF
- eof_pos = f.tell()
- f.seek(current_pos)
- if current_pos >= eof_pos - 20:
- break
- # Try to read extension signature
- signature = f.read(4)
- if len(signature) < 4:
- break
- # Check for a valid extension signature (4 ASCII letters). Optional
- # extensions use uppercase signatures; required ones, such as the
- # lowercase b"sdir" sparse-directory extension, must also be accepted.
- if not all(65 <= b <= 90 or 97 <= b <= 122 for b in signature):
- # Not an extension, seek back
- f.seek(-4, 1)
- break
- # Read extension size
- size_data = f.read(4)
- if len(size_data) < 4:
- break
- size = struct.unpack(">I", size_data)[0]
- # Read extension data
- data = f.read(size)
- if len(data) < size:
- break
- extension = IndexExtension.from_raw(signature, data)
- extensions.append(extension)
- return ret, version, extensions
- def read_index_dict(
- f: BinaryIO,
- ) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
- """Read an index file and return it as a dictionary.
- Dict Key is tuple of path and stage number, as
- path alone is not unique
- Args:
- f: File object to read fromls.
- """
- ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
- for entry in read_index(f):
- stage = entry.stage()
- if stage == Stage.NORMAL:
- ret[entry.name] = IndexEntry.from_serialized(entry)
- else:
- existing = ret.setdefault(entry.name, ConflictedIndexEntry())
- if isinstance(existing, IndexEntry):
- raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
- if stage == Stage.MERGE_CONFLICT_ANCESTOR:
- existing.ancestor = IndexEntry.from_serialized(entry)
- elif stage == Stage.MERGE_CONFLICT_THIS:
- existing.this = IndexEntry.from_serialized(entry)
- elif stage == Stage.MERGE_CONFLICT_OTHER:
- existing.other = IndexEntry.from_serialized(entry)
- return ret
- def write_index(
- f: IO[bytes],
- entries: Sequence[SerializedIndexEntry],
- version: int | None = None,
- extensions: Sequence[IndexExtension] | None = None,
- ) -> None:
- """Write an index file.
- Args:
- f: File-like object to write to
- version: Version number to write
- entries: Iterable over the entries to write
- extensions: Optional list of extensions to write
- """
- if version is None:
- version = DEFAULT_VERSION
- # If any entry carries extended flags, the index must be at least
- # version 3.
- uses_extended_flags = any(e.extended_flags != 0 for e in entries)
- if uses_extended_flags and version < 3:
- version = 3
- # Sanity check: a pre-v3 index must not contain extended flags.
- if version < 3:
- for e in entries:
- if e.extended_flags != 0:
- raise AssertionError("Attempt to use extended flags in index < v3")
- f.write(b"DIRC")
- f.write(struct.pack(b">LL", version, len(entries)))
- previous_path = b""
- for entry in entries:
- write_cache_entry(f, entry, version=version, previous_path=previous_path)
- previous_path = entry.name
- # Write extensions
- if extensions:
- for extension in extensions:
- write_index_extension(f, extension)
- def write_index_dict(
- f: IO[bytes],
- entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
- version: int | None = None,
- extensions: Sequence[IndexExtension] | None = None,
- ) -> None:
- """Write an index file based on the contents of a dictionary.
- being careful to sort by path and then by stage.
- """
- entries_list = []
- for key in sorted(entries):
- value = entries[key]
- if isinstance(value, ConflictedIndexEntry):
- if value.ancestor is not None:
- entries_list.append(
- value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
- )
- if value.this is not None:
- entries_list.append(
- value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
- )
- if value.other is not None:
- entries_list.append(
- value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
- )
- else:
- entries_list.append(value.serialize(key, Stage.NORMAL))
- write_index(f, entries_list, version=version, extensions=extensions)
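- # Illustrative round-trip of a whole index through an in-memory buffer
- # (sha is a placeholder; no trailing checksum is needed for reading):
- # >>> import io
- # >>> e = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0,
- # ...                b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391')
- # >>> buf = io.BytesIO()
- # >>> write_index_dict(buf, {b'a.txt': e})
- # >>> _ = buf.seek(0)
- # >>> sorted(read_index_dict(buf))
- # [b'a.txt']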
- def cleanup_mode(mode: int) -> int:
- """Cleanup a mode value.
- This will return a mode that can be stored in a tree object.
- Args:
- mode: Mode to clean up.
- Returns:
- mode
- """
- if stat.S_ISLNK(mode):
- return stat.S_IFLNK
- elif stat.S_ISDIR(mode):
- return stat.S_IFDIR
- elif S_ISGITLINK(mode):
- return S_IFGITLINK
- ret = stat.S_IFREG | 0o644
- if mode & 0o100:
- ret |= 0o111
- return ret
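- # Illustrative examples: only the executable bit survives for regular
- # files, while symlinks, directories and gitlinks keep their type bits.
- # >>> oct(cleanup_mode(0o100600))
- # '0o100644'
- # >>> oct(cleanup_mode(0o100755))
- # '0o100755'
- # >>> oct(cleanup_mode(0o160000))  # gitlink (submodule)
- # '0o160000'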
- class Index:
- """A Git Index file."""
- _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]
- def __init__(
- self,
- filename: bytes | str | os.PathLike[str],
- read: bool = True,
- skip_hash: bool = False,
- version: int | None = None,
- *,
- file_mode: int | None = None,
- ) -> None:
- """Create an index object associated with the given filename.
- Args:
- filename: Path to the index file
- read: Whether to initialize the index from the given file, should it exist.
- skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
- version: Index format version to use (None = auto-detect from file or use default)
- file_mode: Optional file permission mask for shared repository
- """
- self._filename = os.fspath(filename)
- # TODO(jelmer): Store the version returned by read_index
- self._version = version
- self._skip_hash = skip_hash
- self._file_mode = file_mode
- self._extensions: list[IndexExtension] = []
- self.clear()
- if read:
- self.read()
- @property
- def path(self) -> bytes | str:
- """Get the path to the index file.
- Returns:
- Path to the index file
- """
- return self._filename
- def __repr__(self) -> str:
- """Return string representation of Index."""
- return f"{self.__class__.__name__}({self._filename!r})"
- def write(self) -> None:
- """Write current contents of index to disk."""
- mask = self._file_mode if self._file_mode is not None else 0o644
- f = GitFile(self._filename, "wb", mask=mask)
- try:
- # Filter out extensions with no meaningful data
- meaningful_extensions = []
- for ext in self._extensions:
- # Skip extensions that have empty data
- ext_data = ext.to_bytes()
- if ext_data:
- meaningful_extensions.append(ext)
- if self._skip_hash:
- # When skipHash is enabled, write the index without computing SHA1
- write_index_dict(
- f,
- self._byname,
- version=self._version,
- extensions=meaningful_extensions,
- )
- # Write 20 zero bytes instead of SHA1
- f.write(b"\x00" * 20)
- f.close()
- else:
- sha1_writer = SHA1Writer(f)
- write_index_dict(
- sha1_writer,
- self._byname,
- version=self._version,
- extensions=meaningful_extensions,
- )
- sha1_writer.close()
- except:
- f.close()
- raise
- def read(self) -> None:
- """Read current contents of index from disk."""
- if not os.path.exists(self._filename):
- return
- f = GitFile(self._filename, "rb")
- try:
- sha1_reader = SHA1Reader(f)
- entries, version, extensions = read_index_dict_with_version(sha1_reader)
- self._version = version
- self._extensions = extensions
- self.update(entries)
- # Extensions have already been read by read_index_dict_with_version
- sha1_reader.check_sha(allow_empty=True)
- finally:
- f.close()
- def __len__(self) -> int:
- """Number of entries in this index file."""
- return len(self._byname)
- def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
- """Retrieve entry by relative path and stage.
- Returns: Either a IndexEntry or a ConflictedIndexEntry
- Raises KeyError: if the entry does not exist
- """
- return self._byname[key]
- def __iter__(self) -> Iterator[bytes]:
- """Iterate over the paths and stages in this index."""
- return iter(self._byname)
- def __contains__(self, key: bytes) -> bool:
- """Check if a path exists in the index."""
- return key in self._byname
- def get_sha1(self, path: bytes) -> bytes:
- """Return the (git object) SHA1 for the object at a path."""
- value = self[path]
- if isinstance(value, ConflictedIndexEntry):
- raise UnmergedEntries
- return value.sha
- def get_mode(self, path: bytes) -> int:
- """Return the POSIX file mode for the object at a path."""
- value = self[path]
- if isinstance(value, ConflictedIndexEntry):
- raise UnmergedEntries
- return value.mode
- def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
- """Iterate over path, sha, mode tuples for use with commit_tree."""
- for path in self:
- entry = self[path]
- if isinstance(entry, ConflictedIndexEntry):
- raise UnmergedEntries
- yield path, entry.sha, cleanup_mode(entry.mode)
- def has_conflicts(self) -> bool:
- """Check if the index contains any conflicted entries.
- Returns:
- True if any entries are conflicted, False otherwise
- """
- for value in self._byname.values():
- if isinstance(value, ConflictedIndexEntry):
- return True
- return False
- def clear(self) -> None:
- """Remove all contents from this index."""
- self._byname = {}
- def __setitem__(
- self, name: bytes, value: IndexEntry | ConflictedIndexEntry
- ) -> None:
- """Set an entry in the index."""
- assert isinstance(name, bytes)
- self._byname[name] = value
- def __delitem__(self, name: bytes) -> None:
- """Delete an entry from the index."""
- del self._byname[name]
- def iteritems(
- self,
- ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
- """Iterate over (path, entry) pairs in the index.
- Returns:
- Iterator of (path, entry) tuples
- """
- return iter(self._byname.items())
- def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
- """Get an iterator over (path, entry) pairs.
- Returns:
- Iterator of (path, entry) tuples
- """
- return iter(self._byname.items())
- def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
- """Update the index with multiple entries.
- Args:
- entries: Dictionary mapping paths to index entries
- """
- for key, value in entries.items():
- self[key] = value
- def paths(self) -> Generator[bytes, None, None]:
- """Generate all paths in the index.
- Yields:
- Path names as bytes
- """
- yield from self._byname.keys()
- def changes_from_tree(
- self,
- object_store: ObjectContainer,
- tree: ObjectID,
- want_unchanged: bool = False,
- ) -> Generator[
- tuple[
- tuple[bytes | None, bytes | None],
- tuple[int | None, int | None],
- tuple[bytes | None, bytes | None],
- ],
- None,
- None,
- ]:
- """Find the differences between the contents of this index and a tree.
- Args:
- object_store: Object store to use for retrieving tree contents
- tree: SHA1 of the root tree
- want_unchanged: Whether unchanged files should be reported
- Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
- newmode), (oldsha, newsha)
- """
- def lookup_entry(path: bytes) -> tuple[bytes, int]:
- entry = self[path]
- if hasattr(entry, "sha") and hasattr(entry, "mode"):
- return entry.sha, cleanup_mode(entry.mode)
- else:
- # Handle ConflictedIndexEntry case
- return b"", 0
- yield from changes_from_tree(
- self.paths(),
- lookup_entry,
- object_store,
- tree,
- want_unchanged=want_unchanged,
- )
- def commit(self, object_store: ObjectContainer) -> bytes:
- """Create a new tree from an index.
- Args:
- object_store: Object store to save the tree in
- Returns:
- Root tree SHA
- """
- return commit_tree(object_store, self.iterobjects())
- def is_sparse(self) -> bool:
- """Check if this index contains sparse directory entries.
- Returns:
- True if any sparse directory extension is present
- """
- return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)
- def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
- """Expand all sparse directory entries into full file entries.
- This converts a sparse index into a full index by recursively
- expanding any sparse directory entries into their constituent files.
- Args:
- object_store: Object store to read tree objects from
- Raises:
- KeyError: If a tree object referenced by a sparse dir entry doesn't exist
- """
- if not self.is_sparse():
- return
- # Find all sparse directory entries
- sparse_dirs = []
- for path, entry in list(self._byname.items()):
- if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
- sparse_dirs.append((path, entry))
- # Expand each sparse directory
- for path, entry in sparse_dirs:
- # Remove the sparse directory entry
- del self._byname[path]
- # Get the tree object
- tree = object_store[entry.sha]
- if not isinstance(tree, Tree):
- raise ValueError(f"Sparse directory {path!r} points to non-tree object")
- # Recursively add all entries from the tree
- self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)
- # Remove the sparse directory extension
- self._extensions = [
- ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
- ]
- def _expand_tree(
- self,
- prefix: bytes,
- tree: Tree,
- object_store: "BaseObjectStore",
- template_entry: IndexEntry,
- ) -> None:
- """Recursively expand a tree into index entries.
- Args:
- prefix: Path prefix for entries (without trailing slash)
- tree: Tree object to expand
- object_store: Object store to read nested trees from
- template_entry: Template entry to copy metadata from
- """
- for name, mode, sha in tree.items():
- if prefix:
- full_path = prefix + b"/" + name
- else:
- full_path = name
- if stat.S_ISDIR(mode):
- # Recursively expand subdirectories
- subtree = object_store[sha]
- if not isinstance(subtree, Tree):
- raise ValueError(
- f"Directory entry {full_path!r} points to non-tree object"
- )
- self._expand_tree(full_path, subtree, object_store, template_entry)
- else:
- # Create an index entry for this file
- # Use the template entry for metadata but with the file's sha and mode
- new_entry = IndexEntry(
- ctime=template_entry.ctime,
- mtime=template_entry.mtime,
- dev=template_entry.dev,
- ino=template_entry.ino,
- mode=mode,
- uid=template_entry.uid,
- gid=template_entry.gid,
- size=0, # Size is unknown from tree
- sha=sha,
- flags=0,
- extended_flags=0, # Don't copy skip-worktree flag
- )
- self._byname[full_path] = new_entry
- def convert_to_sparse(
- self,
- object_store: "BaseObjectStore",
- tree_sha: bytes,
- sparse_dirs: Set[bytes],
- ) -> None:
- """Convert full index entries to sparse directory entries.
- This collapses directories that are entirely outside the sparse
- checkout cone into single sparse directory entries.
- Args:
- object_store: Object store to read tree objects
- tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
- sparse_dirs: Set of directory paths (with trailing /) to collapse
- Raises:
- KeyError: If tree_sha or a subdirectory doesn't exist
- """
- if not sparse_dirs:
- return
- # Get the base tree
- tree = object_store[tree_sha]
- if not isinstance(tree, Tree):
- raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")
- # For each sparse directory, find its tree SHA and create sparse entry
- for dir_path in sparse_dirs:
- dir_path_stripped = dir_path.rstrip(b"/")
- # Find the tree SHA for this directory
- subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
- if subtree_sha is None:
- # Directory doesn't exist in tree, skip it
- continue
- # Remove all entries under this directory
- entries_to_remove = [
- path
- for path in self._byname
- if path.startswith(dir_path) or path == dir_path_stripped
- ]
- for path in entries_to_remove:
- del self._byname[path]
- # Create a sparse directory entry
- # Use minimal metadata since it's not a real file
- sparse_entry = IndexEntry(
- ctime=0,
- mtime=0,
- dev=0,
- ino=0,
- mode=stat.S_IFDIR,
- uid=0,
- gid=0,
- size=0,
- sha=subtree_sha,
- flags=0,
- extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
- )
- self._byname[dir_path] = sparse_entry
- # Add sparse directory extension if not present
- if not self.is_sparse():
- self._extensions.append(SparseDirExtension())
- def _find_subtree_sha(
- self,
- tree: Tree,
- path: bytes,
- object_store: "BaseObjectStore",
- ) -> bytes | None:
- """Find the SHA of a subtree at a given path.
- Args:
- tree: Root tree object to search in
- path: Path to the subdirectory (no trailing slash)
- object_store: Object store to read nested trees from
- Returns:
- SHA of the subtree, or None if path doesn't exist
- """
- if not path:
- return tree.id
- parts = path.split(b"/")
- current_tree = tree
- for part in parts:
- # Look for this part in the current tree
- try:
- mode, sha = current_tree[part]
- except KeyError:
- return None
- if not stat.S_ISDIR(mode):
- # Path component is a file, not a directory
- return None
- # Load the next tree
- obj = object_store[sha]
- if not isinstance(obj, Tree):
- return None
- current_tree = obj
- return current_tree.id
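- # Illustrative usage sketch for the Index class, writing to and re-reading
- # from a temporary file (the sha is a placeholder empty-blob hash):
- # >>> import os, tempfile
- # >>> path = os.path.join(tempfile.mkdtemp(), 'index')
- # >>> idx = Index(path, read=False)
- # >>> idx[b'a.txt'] = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0,
- # ...     b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391')
- # >>> idx.write()
- # >>> len(Index(path))
- # 1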
- def commit_tree(
- object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
- ) -> bytes:
- """Commit a new tree.
- Args:
- object_store: Object store to add trees to
- blobs: Iterable over blob path, sha, mode entries
- Returns:
- SHA1 of the created tree.
- """
- trees: dict[bytes, TreeDict] = {b"": {}}
- def add_tree(path: bytes) -> TreeDict:
- if path in trees:
- return trees[path]
- dirname, basename = pathsplit(path)
- t = add_tree(dirname)
- assert isinstance(basename, bytes)
- newtree: TreeDict = {}
- t[basename] = newtree
- trees[path] = newtree
- return newtree
- for path, sha, mode in blobs:
- tree_path, basename = pathsplit(path)
- tree = add_tree(tree_path)
- tree[basename] = (mode, sha)
- def build_tree(path: bytes) -> bytes:
- tree = Tree()
- for basename, entry in trees[path].items():
- if isinstance(entry, dict):
- mode = stat.S_IFDIR
- sha = build_tree(pathjoin(path, basename))
- else:
- (mode, sha) = entry
- tree.add(basename, mode, sha)
- object_store.add_object(tree)
- return tree.id
- return build_tree(b"")
- def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
- """Create a new tree from an index.
- Args:
- object_store: Object store to save the tree in
- index: Index file
- Note: This function is deprecated, use index.commit() instead.
- Returns: Root tree sha.
- """
- return commit_tree(object_store, index.iterobjects())
- def changes_from_tree(
- names: Iterable[bytes],
- lookup_entry: Callable[[bytes], tuple[bytes, int]],
- object_store: ObjectContainer,
- tree: bytes | None,
- want_unchanged: bool = False,
- ) -> Iterable[
- tuple[
- tuple[bytes | None, bytes | None],
- tuple[int | None, int | None],
- tuple[bytes | None, bytes | None],
- ]
- ]:
- """Find the differences between the contents of a tree and a working copy.
- Args:
- names: Iterable of names in the working copy
- lookup_entry: Function to lookup an entry in the working copy
- object_store: Object store to use for retrieving tree contents
- tree: SHA1 of the root tree, or None for an empty tree
- want_unchanged: Whether unchanged files should be reported
- Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
- (oldsha, newsha)
- """
- # TODO(jelmer): Support an include_trees option
- other_names = set(names)
- if tree is not None:
- for name, mode, sha in iter_tree_contents(object_store, tree):
- assert name is not None and mode is not None and sha is not None
- try:
- (other_sha, other_mode) = lookup_entry(name)
- except KeyError:
- # Was removed
- yield ((name, None), (mode, None), (sha, None))
- else:
- other_names.remove(name)
- if want_unchanged or other_sha != sha or other_mode != mode:
- yield ((name, name), (mode, other_mode), (sha, other_sha))
- # Mention added files
- for name in other_names:
- try:
- (other_sha, other_mode) = lookup_entry(name)
- except KeyError:
- pass
- else:
- yield ((None, name), (None, other_mode), (None, other_sha))
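- # Illustrative sketch: diffing a one-file tree against a working copy that
- # deleted it and added another file (store setup as in commit_tree below):
- # >>> from dulwich.object_store import MemoryObjectStore
- # >>> store = MemoryObjectStore()
- # >>> blob = Blob.from_string(b'old\n')
- # >>> store.add_object(blob)
- # >>> tree_id = commit_tree(store, [(b'a.txt', blob.id, 0o100644)])
- # >>> working = {b'b.txt': (b'0' * 40, 0o100644)}
- # >>> for (old, new), _, _ in changes_from_tree(
- # ...         working, working.__getitem__, store, tree_id):
- # ...     print(old, new)
- # b'a.txt' None
- # None b'b.txt'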
- def index_entry_from_stat(
- stat_val: os.stat_result,
- hex_sha: bytes,
- mode: int | None = None,
- ) -> IndexEntry:
- """Create a new index entry from a stat value.
- Args:
- stat_val: POSIX stat_result instance
- hex_sha: Hex sha of the object
- mode: Optional file mode, will be derived from stat if not provided
- """
- if mode is None:
- mode = cleanup_mode(stat_val.st_mode)
- return IndexEntry(
- ctime=stat_val.st_ctime,
- mtime=stat_val.st_mtime,
- dev=stat_val.st_dev,
- ino=stat_val.st_ino,
- mode=mode,
- uid=stat_val.st_uid,
- gid=stat_val.st_gid,
- size=stat_val.st_size,
- sha=hex_sha,
- flags=0,
- extended_flags=0,
- )
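- # Illustrative sketch (the sha argument is a placeholder; passing mode
- # explicitly skips the cleanup_mode derivation):
- # >>> import os
- # >>> e = index_entry_from_stat(os.stat('.'), b'0' * 40, mode=0o100644)
- # >>> oct(e.mode)
- # '0o100644'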
- if sys.platform == "win32":
- # On Windows, creating symlinks either requires administrator privileges
- # or developer mode. Raise a more helpful error when we're unable to
- # create symlinks
- # https://github.com/jelmer/dulwich/issues/1005
- class WindowsSymlinkPermissionError(PermissionError):
- """Windows-specific error for symlink creation failures.
- This error is raised when symlink creation fails on Windows,
- typically due to lack of developer mode or administrator privileges.
- """
- def __init__(self, errno: int, msg: str, filename: str | None) -> None:
- """Initialize WindowsSymlinkPermissionError."""
- super().__init__(
- errno,
- f"Unable to create symlink; do you have developer mode enabled? {msg}",
- filename,
- )
- def symlink(
- src: str | bytes,
- dst: str | bytes,
- target_is_directory: bool = False,
- *,
- dir_fd: int | None = None,
- ) -> None:
- """Create a symbolic link on Windows with better error handling.
- Args:
- src: Source path for the symlink
- dst: Destination path where symlink will be created
- target_is_directory: Whether the target is a directory
- dir_fd: Optional directory file descriptor
- Raises:
- WindowsSymlinkPermissionError: If symlink creation fails due to permissions
- """
- try:
- return os.symlink(
- src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
- )
- except PermissionError as e:
- raise WindowsSymlinkPermissionError(
- e.errno or 0, e.strerror or "", e.filename
- ) from e
- else:
- symlink = os.symlink
- def build_file_from_blob(
- blob: Blob,
- mode: int,
- target_path: bytes,
- *,
- honor_filemode: bool = True,
- tree_encoding: str = "utf-8",
- symlink_fn: Callable[
- [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
- ]
- | None = None,
- ) -> os.stat_result:
- """Build a file or symlink on disk based on a Git object.
- Args:
- blob: The git object
- mode: File mode
- target_path: Path to write to
- honor_filemode: An optional flag to honor core.filemode setting in
- config file, default is core.filemode=True, change executable bit
- tree_encoding: Encoding to use for tree contents
- symlink_fn: Function to use for creating symlinks
- Returns: stat object for the file
- """
- try:
- oldstat = os.lstat(target_path)
- except FileNotFoundError:
- oldstat = None
- contents = blob.as_raw_string()
- if stat.S_ISLNK(mode):
- if oldstat:
- _remove_file_with_readonly_handling(target_path)
- if sys.platform == "win32":
- # os.symlink on Windows requires str arguments, not bytes.
- contents_str = contents.decode(tree_encoding)
- target_path_str = target_path.decode(tree_encoding)
- (symlink_fn or symlink)(contents_str, target_path_str)
- else:
- (symlink_fn or symlink)(contents, target_path)
- else:
- if oldstat is not None and oldstat.st_size == len(contents):
- with open(target_path, "rb") as f:
- if f.read() == contents:
- return oldstat
- with open(target_path, "wb") as f:
- # Write out file
- f.write(contents)
- if honor_filemode:
- os.chmod(target_path, mode)
- return os.lstat(target_path)
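- # Illustrative sketch: materialising a blob as a regular file in a
- # temporary directory.
- # >>> import os, tempfile
- # >>> blob = Blob.from_string(b'hello\n')
- # >>> target = os.path.join(tempfile.mkdtemp(), 'hello.txt').encode()
- # >>> _ = build_file_from_blob(blob, 0o100644, target)
- # >>> open(target, 'rb').read()
- # b'hello\n'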
- INVALID_DOTNAMES = (b".git", b".", b"..", b"")
- def _normalize_path_element_default(element: bytes) -> bytes:
- """Normalize path element for default case-insensitive comparison."""
- return element.lower()
- def _normalize_path_element_ntfs(element: bytes) -> bytes:
- """Normalize path element for NTFS filesystem."""
- return element.rstrip(b". ").lower()
- def _normalize_path_element_hfs(element: bytes) -> bytes:
- """Normalize path element for HFS+ filesystem."""
- import unicodedata
- # Decode to Unicode (let UnicodeDecodeError bubble up)
- element_str = element.decode("utf-8", errors="strict")
- # Remove HFS+ ignorable characters
- filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
- # Normalize to NFD
- normalized = unicodedata.normalize("NFD", filtered)
- return normalized.lower().encode("utf-8", errors="strict")
- def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
- """Get the appropriate path element normalization function based on config.
- Args:
- config: Repository configuration object
- Returns:
- Function that normalizes path elements for the configured filesystem
- """
- if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
- return _normalize_path_element_ntfs
- elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
- return _normalize_path_element_hfs
- else:
- return _normalize_path_element_default
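- # Illustrative sketch of the normalizers above (plain calls, no repository
- # needed): the NTFS variant strips trailing dots/spaces, the HFS+ variant
- # drops ignorable codepoints, and both lowercase.
- #
- #     assert _normalize_path_element_ntfs(b".GIT. ") == b".git"
- #     assert _normalize_path_element_hfs(".git\u200c".encode()) == b".git"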
- def validate_path_element_default(element: bytes) -> bool:
- """Validate a path element using default rules.
- Args:
- element: Path element to validate
- Returns:
- True if path element is valid, False otherwise
- """
- return _normalize_path_element_default(element) not in INVALID_DOTNAMES
- def validate_path_element_ntfs(element: bytes) -> bool:
- """Validate a path element using NTFS filesystem rules.
- Args:
- element: Path element to validate
- Returns:
- True if path element is valid for NTFS, False otherwise
- """
- normalized = _normalize_path_element_ntfs(element)
- if normalized in INVALID_DOTNAMES:
- return False
- if normalized == b"git~1":
- return False
- return True
- # HFS+ ignorable Unicode codepoints (from Git's utf8.c)
- HFS_IGNORABLE_CHARS = {
- 0x200C, # ZERO WIDTH NON-JOINER
- 0x200D, # ZERO WIDTH JOINER
- 0x200E, # LEFT-TO-RIGHT MARK
- 0x200F, # RIGHT-TO-LEFT MARK
- 0x202A, # LEFT-TO-RIGHT EMBEDDING
- 0x202B, # RIGHT-TO-LEFT EMBEDDING
- 0x202C, # POP DIRECTIONAL FORMATTING
- 0x202D, # LEFT-TO-RIGHT OVERRIDE
- 0x202E, # RIGHT-TO-LEFT OVERRIDE
- 0x206A, # INHIBIT SYMMETRIC SWAPPING
- 0x206B, # ACTIVATE SYMMETRIC SWAPPING
- 0x206C, # INHIBIT ARABIC FORM SHAPING
- 0x206D, # ACTIVATE ARABIC FORM SHAPING
- 0x206E, # NATIONAL DIGIT SHAPES
- 0x206F, # NOMINAL DIGIT SHAPES
- 0xFEFF, # ZERO WIDTH NO-BREAK SPACE
- }
- def validate_path_element_hfs(element: bytes) -> bool:
- """Validate path element for HFS+ filesystem.
- Equivalent to Git's is_hfs_dotgit and related checks.
- Uses NFD normalization and ignores HFS+ ignorable characters.
- """
- try:
- normalized = _normalize_path_element_hfs(element)
- except UnicodeDecodeError:
- # Malformed UTF-8 - be conservative and reject
- return False
- # Check against invalid names
- if normalized in INVALID_DOTNAMES:
- return False
- # Also check for 8.3 short name
- if normalized == b"git~1":
- return False
- return True
- def validate_path(
- path: bytes,
- element_validator: Callable[[bytes], bool] = validate_path_element_default,
- ) -> bool:
- """Default path validator that just checks for .git/."""
- parts = path.split(b"/")
- for p in parts:
- if not element_validator(p):
- return False
- else:
- return True
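- # Illustrative sketch: validate_path checks each "/"-separated element,
- # so paths that would reach into the .git directory are refused.
- #
- #     assert validate_path(b"src/main.py")
- #     assert not validate_path(b".git/config")
- #     assert not validate_path(b"a/../b")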
- def build_index_from_tree(
- root_path: str | bytes,
- index_path: str | bytes,
- object_store: ObjectContainer,
- tree_id: bytes,
- honor_filemode: bool = True,
- validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
- symlink_fn: Callable[
- [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
- ]
- | None = None,
- blob_normalizer: "FilterBlobNormalizer | None" = None,
- tree_encoding: str = "utf-8",
- ) -> None:
- """Generate and materialize index from a tree.
- Args:
- tree_id: Tree to materialize
- root_path: Target dir for materialized index files
- index_path: Target path for generated index
- object_store: Non-empty object store holding tree contents
- honor_filemode: An optional flag to honor the core.filemode config
- setting; when True (the default), the executable bit is applied
- validate_path_element: Function to validate path elements to check
- out; default just refuses .git and .. directories.
- symlink_fn: Function to use for creating symlinks
- blob_normalizer: An optional BlobNormalizer to use for converting line
- endings when writing blobs to the working directory.
- tree_encoding: Encoding used for tree paths (default: utf-8)
- Note: the existing index is wiped and its contents are not merged
- into the working dir. Suitable only for fresh clones.
- """
- index = Index(index_path, read=False)
- if not isinstance(root_path, bytes):
- root_path = os.fsencode(root_path)
- for entry in iter_tree_contents(object_store, tree_id):
- assert (
- entry.path is not None and entry.mode is not None and entry.sha is not None
- )
- if not validate_path(entry.path, validate_path_element):
- continue
- full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)
- if not os.path.exists(os.path.dirname(full_path)):
- os.makedirs(os.path.dirname(full_path))
- # TODO(jelmer): Merge new index into working tree
- if S_ISGITLINK(entry.mode):
- if not os.path.isdir(full_path):
- os.mkdir(full_path)
- st = os.lstat(full_path)
- # TODO(jelmer): record and return submodule paths
- else:
- obj = object_store[entry.sha]
- assert isinstance(obj, Blob)
- # Apply blob normalization for checkout if normalizer is provided
- if blob_normalizer is not None:
- obj = blob_normalizer.checkout_normalize(obj, entry.path)
- st = build_file_from_blob(
- obj,
- entry.mode,
- full_path,
- honor_filemode=honor_filemode,
- tree_encoding=tree_encoding,
- symlink_fn=symlink_fn,
- )
- # Add file to index
- if not honor_filemode or S_ISGITLINK(entry.mode):
- # We cannot use tuple slicing to build a new tuple, because on
- # Windows that converts the float timestamps to integers, which
- # causes errors further along
- st_tuple = (
- entry.mode,
- st.st_ino,
- st.st_dev,
- st.st_nlink,
- st.st_uid,
- st.st_gid,
- st.st_size,
- st.st_atime,
- st.st_mtime,
- st.st_ctime,
- )
- st = st.__class__(st_tuple)
- # default to a stage 0 index entry (normal)
- # when reading from the filesystem
- index[entry.path] = index_entry_from_stat(st, entry.sha)
- index.write()
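- # Illustrative usage sketch (assumes an existing Repo object `repo`
- # whose HEAD commit's tree should be materialized into a fresh clone):
- #
- #     tree_id = repo[repo.head()].tree
- #     build_index_from_tree(
- #         repo.path, repo.index_path(), repo.object_store, tree_id
- #     )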
- def blob_from_path_and_mode(
- fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
- ) -> Blob:
- """Create a blob from a path and a stat object.
- Args:
- fs_path: Full file system path to file
- mode: File mode
- tree_encoding: Encoding to use for tree contents
- Returns: A `Blob` object
- """
- assert isinstance(fs_path, bytes)
- blob = Blob()
- if stat.S_ISLNK(mode):
- if sys.platform == "win32":
- # On Windows, os.readlink requires a str path.
- blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
- else:
- blob.data = os.readlink(fs_path)
- else:
- with open(fs_path, "rb") as f:
- blob.data = f.read()
- return blob
- def blob_from_path_and_stat(
- fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
- ) -> Blob:
- """Create a blob from a path and a stat object.
- Args:
- fs_path: Full file system path to file
- st: A stat object
- tree_encoding: Encoding to use for tree contents
- Returns: A `Blob` object
- """
- return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
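- # Illustrative sketch (hypothetical path): the stat-based variant simply
- # forwards st_mode, so the two calls below build identical blobs.
- #
- #     st = os.lstat(b"/tmp/example.txt")
- #     assert (blob_from_path_and_stat(b"/tmp/example.txt", st).id
- #             == blob_from_path_and_mode(b"/tmp/example.txt", st.st_mode).id)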
- def read_submodule_head(path: str | bytes) -> bytes | None:
- """Read the head commit of a submodule.
- Args:
- path: path to the submodule
- Returns: HEAD sha, None if not a valid head/repository
- """
- from .errors import NotGitRepository
- from .repo import Repo
- # Repo currently expects a "str", so decode if necessary.
- # TODO(jelmer): Perhaps move this into Repo() ?
- if not isinstance(path, str):
- path = os.fsdecode(path)
- try:
- repo = Repo(path)
- except NotGitRepository:
- return None
- try:
- return repo.head()
- except KeyError:
- return None
- def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
- """Check if a directory has changed after getting an error.
- When creating a blob from a path fails, call this function. It checks
- whether the path is a directory. If it is a directory and a submodule,
- the submodule head is compared with the index entry to see whether it
- has changed. If it is a directory but not a submodule, Git tracked a
- file where a directory now exists, so the entry counts as changed.
- Return True if the given path should be considered changed, False
- otherwise (including when the path is not a directory).
- """
- # This is actually a directory
- if os.path.exists(os.path.join(tree_path, b".git")):
- # Submodule
- head = read_submodule_head(tree_path)
- if entry.sha != head:
- return True
- else:
- # The file was changed to a directory, so consider it removed.
- return True
- return False
- os_sep_bytes = os.sep.encode("ascii")
- def _ensure_parent_dir_exists(full_path: bytes) -> None:
- """Ensure parent directory exists, checking no parent is a file."""
- parent_dir = os.path.dirname(full_path)
- if parent_dir and not os.path.exists(parent_dir):
- # Walk up the directory tree to find the first existing parent
- current = parent_dir
- parents_to_check: list[bytes] = []
- while current and not os.path.exists(current):
- parents_to_check.insert(0, current)
- new_parent = os.path.dirname(current)
- if new_parent == current:
- # Reached the root or can't go up further
- break
- current = new_parent
- # Check if the existing parent (if any) is a directory
- if current and os.path.exists(current) and not os.path.isdir(current):
- raise OSError(
- f"Cannot create directory, parent path is a file: {current!r}"
- )
- # Now check each parent we need to create isn't blocked by an existing file
- for parent_path in parents_to_check:
- if os.path.exists(parent_path) and not os.path.isdir(parent_path):
- raise OSError(
- f"Cannot create directory, parent path is a file: {parent_path!r}"
- )
- os.makedirs(parent_dir)
- def _remove_file_with_readonly_handling(path: bytes) -> None:
- """Remove a file, handling read-only files on Windows.
- Args:
- path: Path to the file to remove
- """
- try:
- os.unlink(path)
- except PermissionError:
- # On Windows, remove read-only attribute and retry
- if sys.platform == "win32":
- os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
- os.unlink(path)
- else:
- raise
- def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
- """Remove empty parent directories up to stop_at."""
- parent = os.path.dirname(path)
- while parent and parent != stop_at:
- try:
- os.rmdir(parent)
- parent = os.path.dirname(parent)
- except FileNotFoundError:
- # Directory doesn't exist - stop trying
- break
- except OSError as e:
- if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
- # Directory not empty - stop trying
- break
- raise
- def _check_symlink_matches(
- full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
- ) -> bool:
- """Check if symlink target matches expected target.
- Returns True if symlink matches, False if it doesn't match.
- """
- try:
- current_target = os.readlink(full_path)
- blob_obj = repo_object_store[entry_sha]
- expected_target = blob_obj.as_raw_string()
- if isinstance(current_target, str):
- current_target = current_target.encode()
- return current_target == expected_target
- except FileNotFoundError:
- # Symlink doesn't exist
- return False
- except OSError as e:
- if e.errno == errno.EINVAL:
- # Not a symlink
- return False
- raise
- def _check_file_matches(
- repo_object_store: "BaseObjectStore",
- full_path: bytes,
- entry_sha: bytes,
- entry_mode: int,
- current_stat: os.stat_result,
- honor_filemode: bool,
- blob_normalizer: "FilterBlobNormalizer | None" = None,
- tree_path: bytes | None = None,
- ) -> bool:
- """Check if a file on disk matches the expected git object.
- Returns True if file matches, False if it doesn't match.
- """
- # Check mode first (if honor_filemode is True)
- if honor_filemode:
- current_mode = stat.S_IMODE(current_stat.st_mode)
- expected_mode = stat.S_IMODE(entry_mode)
- # For regular files, only check the user executable bit, not group/other permissions
- # This matches Git's behavior where umask differences don't count as modifications
- if stat.S_ISREG(current_stat.st_mode):
- # Normalize regular file modes to ignore group/other write permissions
- current_mode_normalized = (
- current_mode & 0o755
- ) # Keep only user rwx and all read+execute
- expected_mode_normalized = expected_mode & 0o755
- # For Git compatibility, regular files should be either 644 or 755
- if expected_mode_normalized not in (0o644, 0o755):
- expected_mode_normalized = 0o644 # Default for regular files
- if current_mode_normalized not in (0o644, 0o755):
- # Determine if it should be executable based on user execute bit
- if current_mode & 0o100: # User execute bit is set
- current_mode_normalized = 0o755
- else:
- current_mode_normalized = 0o644
- if current_mode_normalized != expected_mode_normalized:
- return False
- else:
- # For non-regular files (symlinks, etc.), check mode exactly
- if current_mode != expected_mode:
- return False
- # If mode matches (or we don't care), check content via size first
- blob_obj = repo_object_store[entry_sha]
- if current_stat.st_size != blob_obj.raw_length():
- return False
- # Size matches, check actual content
- try:
- with open(full_path, "rb") as f:
- current_content = f.read()
- expected_content = blob_obj.as_raw_string()
- if blob_normalizer and tree_path is not None:
- assert isinstance(blob_obj, Blob)
- normalized_blob = blob_normalizer.checkout_normalize(
- blob_obj, tree_path
- )
- expected_content = normalized_blob.as_raw_string()
- return current_content == expected_content
- except (FileNotFoundError, PermissionError, IsADirectoryError):
- return False
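- # Worked example of the mode normalization above (plain arithmetic): a
- # file written as 0o664 under a permissive umask normalizes via
- # 0o664 & 0o755 == 0o644, so it still matches an index entry recorded
- # as 0o100644 and is not flagged as modified.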
- def _transition_to_submodule(
- repo: "Repo",
- path: bytes,
- full_path: bytes,
- current_stat: os.stat_result | None,
- entry: IndexEntry | TreeEntry,
- index: Index,
- ) -> None:
- """Transition any type to submodule."""
- from .submodule import ensure_submodule_placeholder
- if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
- # Already a directory, just ensure .git file exists
- ensure_submodule_placeholder(repo, path)
- else:
- # Remove whatever is there and create submodule
- if current_stat is not None:
- _remove_file_with_readonly_handling(full_path)
- ensure_submodule_placeholder(repo, path)
- st = os.lstat(full_path)
- assert entry.sha is not None
- index[path] = index_entry_from_stat(st, entry.sha)
- def _transition_to_file(
- object_store: "BaseObjectStore",
- path: bytes,
- full_path: bytes,
- current_stat: os.stat_result | None,
- entry: IndexEntry | TreeEntry,
- index: Index,
- honor_filemode: bool,
- symlink_fn: Callable[
- [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
- ]
- | None,
- blob_normalizer: "FilterBlobNormalizer | None",
- tree_encoding: str = "utf-8",
- ) -> None:
- """Transition any type to regular file or symlink."""
- assert entry.sha is not None and entry.mode is not None
- # Check if we need to update
- if (
- current_stat is not None
- and stat.S_ISREG(current_stat.st_mode)
- and not stat.S_ISLNK(entry.mode)
- ):
- # File to file - check if update needed
- file_matches = _check_file_matches(
- object_store,
- full_path,
- entry.sha,
- entry.mode,
- current_stat,
- honor_filemode,
- blob_normalizer,
- path,
- )
- needs_update = not file_matches
- elif (
- current_stat is not None
- and stat.S_ISLNK(current_stat.st_mode)
- and stat.S_ISLNK(entry.mode)
- ):
- # Symlink to symlink - check if update needed
- symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
- needs_update = not symlink_matches
- else:
- needs_update = True
- if not needs_update:
- # Just update the index - current_stat is always valid here because the existing file matched and was not rewritten
- assert current_stat is not None
- index[path] = index_entry_from_stat(current_stat, entry.sha)
- return
- # Remove existing entry if needed
- if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
- # Remove directory
- dir_contents = set(os.listdir(full_path))
- git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
- if git_file_name in dir_contents:
- if dir_contents != {git_file_name}:
- raise IsADirectoryError(
- f"Cannot replace submodule with untracked files: {full_path!r}"
- )
- shutil.rmtree(full_path)
- else:
- try:
- os.rmdir(full_path)
- except OSError as e:
- if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
- raise IsADirectoryError(
- f"Cannot replace non-empty directory with file: {full_path!r}"
- )
- raise
- elif current_stat is not None:
- _remove_file_with_readonly_handling(full_path)
- # Ensure parent directory exists
- _ensure_parent_dir_exists(full_path)
- # Write the file
- blob_obj = object_store[entry.sha]
- assert isinstance(blob_obj, Blob)
- if blob_normalizer:
- blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
- st = build_file_from_blob(
- blob_obj,
- entry.mode,
- full_path,
- honor_filemode=honor_filemode,
- tree_encoding=tree_encoding,
- symlink_fn=symlink_fn,
- )
- index[path] = index_entry_from_stat(st, entry.sha)
- def _transition_to_absent(
- repo: "Repo",
- path: bytes,
- full_path: bytes,
- current_stat: os.stat_result | None,
- index: Index,
- ) -> None:
- """Remove any type of entry."""
- if current_stat is None:
- return
- if stat.S_ISDIR(current_stat.st_mode):
- # Check if it's a submodule directory
- dir_contents = set(os.listdir(full_path))
- git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
- if git_file_name in dir_contents and dir_contents == {git_file_name}:
- shutil.rmtree(full_path)
- else:
- try:
- os.rmdir(full_path)
- except OSError as e:
- if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
- raise
- else:
- _remove_file_with_readonly_handling(full_path)
- try:
- del index[path]
- except KeyError:
- pass
- # Try to remove empty parent directories
- _remove_empty_parents(
- full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
- )
- def detect_case_only_renames(
- changes: Sequence["TreeChange"],
- config: "Config",
- ) -> list["TreeChange"]:
- """Detect and transform case-only renames in a list of tree changes.
- This function identifies file renames that only differ in case (e.g.,
- README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
- CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
- based on the repository configuration.
- Args:
- changes: List of TreeChange objects representing file changes
- config: Repository configuration object
- Returns:
- New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
- """
- from .diff_tree import (
- CHANGE_ADD,
- CHANGE_COPY,
- CHANGE_DELETE,
- CHANGE_MODIFY,
- CHANGE_RENAME,
- TreeChange,
- )
- # Build dictionaries of old and new paths with their normalized forms
- old_paths_normalized = {}
- new_paths_normalized = {}
- old_changes = {} # Map from old path to change object
- new_changes = {} # Map from new path to change object
- # Get the appropriate normalizer based on config
- normalize_func = get_path_element_normalizer(config)
- def normalize_path(path: bytes) -> bytes:
- """Normalize entire path using element normalization."""
- return b"/".join(normalize_func(part) for part in path.split(b"/"))
- # Pre-normalize all paths once to avoid repeated normalization
- for change in changes:
- # A DELETE contributes an old path; a RENAME is treated as
- # DELETE + ADD for case-only detection, so its old path is
- # recorded the same way (the two branches were identical).
- if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
- assert change.old.path is not None
- try:
- normalized = normalize_path(change.old.path)
- except UnicodeDecodeError:
- import logging
- logging.warning(
- "Skipping case-only rename detection for path with invalid UTF-8: %r",
- change.old.path,
- )
- else:
- old_paths_normalized[normalized] = change.old.path
- old_changes[change.old.path] = change
- if (
- change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
- and change.new
- ):
- assert change.new.path is not None
- try:
- normalized = normalize_path(change.new.path)
- except UnicodeDecodeError:
- import logging
- logging.warning(
- "Skipping case-only rename detection for path with invalid UTF-8: %r",
- change.new.path,
- )
- else:
- new_paths_normalized[normalized] = change.new.path
- new_changes[change.new.path] = change
- # Find case-only renames and transform changes
- case_only_renames = set()
- new_rename_changes = []
- for norm_path, old_path in old_paths_normalized.items():
- if norm_path in new_paths_normalized:
- new_path = new_paths_normalized[norm_path]
- if old_path != new_path:
- # Found a case-only rename
- old_change = old_changes[old_path]
- new_change = new_changes[new_path]
- # Replace the DELETE + ADD/MODIFY pair with a single rename,
- # taking the old file from the DELETE and the new file from the
- # ADD/MODIFY (both branches built the same TreeChange).
- rename_change = TreeChange(
- CHANGE_RENAME, old_change.old, new_change.new
- )
- new_rename_changes.append(rename_change)
- # Mark the old changes for removal
- case_only_renames.add(old_change)
- case_only_renames.add(new_change)
- # Return new list with original ADD/DELETE changes replaced by renames
- result = [change for change in changes if change not in case_only_renames]
- result.extend(new_rename_changes)
- return result
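- # Illustrative sketch (hypothetical change list): a CHANGE_DELETE of
- # b"README.txt" paired with a CHANGE_ADD of b"readme.txt" is collapsed
- # into one CHANGE_RENAME when the configured normalizer treats the two
- # paths as equal:
- #
- #     changes = detect_case_only_renames(changes, repo.get_config())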
- def update_working_tree(
- repo: "Repo",
- old_tree_id: bytes | None,
- new_tree_id: bytes,
- change_iterator: Iterator["TreeChange"],
- honor_filemode: bool = True,
- validate_path_element: Callable[[bytes], bool] | None = None,
- symlink_fn: Callable[
- [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
- ]
- | None = None,
- force_remove_untracked: bool = False,
- blob_normalizer: "FilterBlobNormalizer | None" = None,
- tree_encoding: str = "utf-8",
- allow_overwrite_modified: bool = False,
- ) -> None:
- """Update the working tree and index to match a new tree.
- This function handles:
- - Adding new files
- - Updating modified files
- - Removing deleted files
- - Cleaning up empty directories
- Args:
- repo: Repository object
- old_tree_id: SHA of the tree before the update
- new_tree_id: SHA of the tree to update to
- change_iterator: Iterator of TreeChange objects to apply
- honor_filemode: An optional flag to honor core.filemode setting
- validate_path_element: Function to validate path elements to check out
- symlink_fn: Function to use for creating symlinks
- force_remove_untracked: If True, remove files that exist in working
- directory but not in target tree, even if old_tree_id is None
- blob_normalizer: An optional BlobNormalizer to use for converting line
- endings when writing blobs to the working directory.
- tree_encoding: Encoding used for tree paths (default: utf-8)
- allow_overwrite_modified: If False, raise an error when attempting to
- overwrite files that have been modified compared to old_tree_id
- """
- if validate_path_element is None:
- validate_path_element = validate_path_element_default
- from .diff_tree import (
- CHANGE_ADD,
- CHANGE_COPY,
- CHANGE_DELETE,
- CHANGE_MODIFY,
- CHANGE_RENAME,
- CHANGE_UNCHANGED,
- )
- repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
- index = repo.open_index()
- # Convert iterator to list since we need multiple passes
- changes = list(change_iterator)
- # Transform case-only renames on case-insensitive filesystems
- import platform
- default_ignore_case = platform.system() in ("Windows", "Darwin")
- config = repo.get_config()
- ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
- if ignore_case:
- changes = detect_case_only_renames(changes, config)
- # Check for path conflicts where files need to become directories
- paths_becoming_dirs = set()
- for change in changes:
- if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
- assert change.new is not None
- path = change.new.path
- assert path is not None
- if b"/" in path: # This is a file inside a directory
- # Check if any parent path exists as a file in the old tree or changes
- parts = path.split(b"/")
- for i in range(1, len(parts)):
- parent = b"/".join(parts[:i])
- # See if this parent path is being deleted (was a file, becoming a dir)
- for other_change in changes:
- if (
- other_change.type == CHANGE_DELETE
- and other_change.old
- and other_change.old.path == parent
- ):
- paths_becoming_dirs.add(parent)
- # Check if any path that needs to become a directory has been modified
- for path in paths_becoming_dirs:
- full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
- try:
- current_stat = os.lstat(full_path)
- except FileNotFoundError:
- continue # File doesn't exist, nothing to check
- except OSError as e:
- raise OSError(
- f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
- ) from e
- if stat.S_ISREG(current_stat.st_mode):
- # Find the old entry for this path
- old_change = None
- for change in changes:
- if (
- change.type == CHANGE_DELETE
- and change.old
- and change.old.path == path
- ):
- old_change = change
- break
- if old_change:
- # Check if file has been modified
- assert old_change.old is not None
- assert (
- old_change.old.sha is not None and old_change.old.mode is not None
- )
- file_matches = _check_file_matches(
- repo.object_store,
- full_path,
- old_change.old.sha,
- old_change.old.mode,
- current_stat,
- honor_filemode,
- blob_normalizer,
- path,
- )
- if not file_matches:
- raise OSError(
- f"Cannot replace modified file with directory: {path!r}"
- )
- # Check for uncommitted modifications before making any changes
- if not allow_overwrite_modified and old_tree_id:
- for change in changes:
- # Only check files that are being modified or deleted
- if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
- path = change.old.path
- assert path is not None
- if path.startswith(b".git") or not validate_path(
- path, validate_path_element
- ):
- continue
- full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
- try:
- current_stat = os.lstat(full_path)
- except FileNotFoundError:
- continue # File doesn't exist, nothing to check
- except OSError as e:
- raise OSError(
- f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
- ) from e
- if stat.S_ISREG(current_stat.st_mode):
- # Check if working tree file differs from old tree
- assert change.old.sha is not None and change.old.mode is not None
- file_matches = _check_file_matches(
- repo.object_store,
- full_path,
- change.old.sha,
- change.old.mode,
- current_stat,
- honor_filemode,
- blob_normalizer,
- path,
- )
- if not file_matches:
- from .errors import WorkingTreeModifiedError
- raise WorkingTreeModifiedError(
- f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
- f"would be overwritten by checkout. "
- f"Please commit your changes or stash them before you switch branches."
- )
- # Apply the changes
- for change in changes:
- if change.type in (CHANGE_DELETE, CHANGE_RENAME):
- # Remove file/directory
- assert change.old is not None and change.old.path is not None
- path = change.old.path
- if path.startswith(b".git") or not validate_path(
- path, validate_path_element
- ):
- continue
- full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
- try:
- delete_stat: os.stat_result | None = os.lstat(full_path)
- except FileNotFoundError:
- delete_stat = None
- except OSError as e:
- raise OSError(
- f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
- ) from e
- _transition_to_absent(repo, path, full_path, delete_stat, index)
- if change.type in (
- CHANGE_ADD,
- CHANGE_MODIFY,
- CHANGE_UNCHANGED,
- CHANGE_COPY,
- CHANGE_RENAME,
- ):
- # Add or modify file
- assert (
- change.new is not None
- and change.new.path is not None
- and change.new.mode is not None
- )
- path = change.new.path
- if path.startswith(b".git") or not validate_path(
- path, validate_path_element
- ):
- continue
- full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
- try:
- modify_stat: os.stat_result | None = os.lstat(full_path)
- except FileNotFoundError:
- modify_stat = None
- except OSError as e:
- raise OSError(
- f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
- ) from e
- if S_ISGITLINK(change.new.mode):
- _transition_to_submodule(
- repo, path, full_path, modify_stat, change.new, index
- )
- else:
- _transition_to_file(
- repo.object_store,
- path,
- full_path,
- modify_stat,
- change.new,
- index,
- honor_filemode,
- symlink_fn,
- blob_normalizer,
- tree_encoding,
- )
- index.write()
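- # Illustrative usage sketch (assumes `repo` plus old/new tree SHAs; the
- # change iterator is the one produced by dulwich.diff_tree):
- #
- #     from dulwich.diff_tree import tree_changes
- #     changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
- #     update_working_tree(repo, old_tree_id, new_tree_id, changes)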
- def _stat_matches_entry(st: os.stat_result, entry: IndexEntry) -> bool:
- """Check if filesystem stat matches index entry stat.
- This is used to determine if a file might have changed without reading its content.
- Git uses this optimization to avoid expensive filter operations on unchanged files.
- Args:
- st: Filesystem stat result
- entry: Index entry to compare against
- Returns: True if stat matches and file is likely unchanged
- """
- # Get entry mtime
- if isinstance(entry.mtime, tuple):
- entry_mtime_sec = entry.mtime[0]
- else:
- entry_mtime_sec = int(entry.mtime)
- # Compare modification time (seconds only for now)
- # Note: We use int() to compare only seconds, as nanosecond precision
- # can vary across filesystems
- if int(st.st_mtime) != entry_mtime_sec:
- return False
- # Compare file size
- if st.st_size != entry.size:
- return False
- # If both mtime and size match, file is likely unchanged
- return True
- def _check_entry_for_changes(
- tree_path: bytes,
- entry: IndexEntry | ConflictedIndexEntry,
- root_path: bytes,
- filter_blob_callback: Callable[[bytes, bytes], bytes] | None = None,
- ) -> bytes | None:
- """Check a single index entry for changes.
- Args:
- tree_path: Path in the tree
- entry: Index entry to check
- root_path: Root filesystem path
- filter_blob_callback: Optional callback to filter blobs
- Returns: tree_path if changed, None otherwise
- """
- if isinstance(entry, ConflictedIndexEntry):
- # Conflicted files are always unstaged
- return tree_path
- full_path = _tree_to_fs_path(root_path, tree_path)
- try:
- st = os.lstat(full_path)
- if stat.S_ISDIR(st.st_mode):
- if _has_directory_changed(tree_path, entry):
- return tree_path
- return None
- if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
- return None
- # Optimization: If stat matches index entry (mtime and size unchanged),
- # we can skip reading and filtering the file entirely. This is a significant
- # performance improvement for repositories with many unchanged files.
- # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged),
- # the filter output would be the same, so we can safely skip the expensive
- # filter operation. This addresses performance issues with LFS repositories
- # where filter operations can be very slow.
- if _stat_matches_entry(st, entry):
- return None
- blob = blob_from_path_and_stat(full_path, st)
- if filter_blob_callback is not None:
- blob.data = filter_blob_callback(blob.data, tree_path)
- except FileNotFoundError:
- # The file was removed, so we assume that counts as
- # different from whatever file used to exist.
- return tree_path
- else:
- if blob.id != entry.sha:
- return tree_path
- return None
- def get_unstaged_changes(
- index: Index,
- root_path: str | bytes,
- filter_blob_callback: Callable[..., Any] | None = None,
- preload_index: bool = False,
- ) -> Generator[bytes, None, None]:
- """Walk through an index and check for differences against working tree.
- Args:
- index: index to check
- root_path: path in which to find files
- filter_blob_callback: Optional callback to filter blobs
- preload_index: If True, use parallel threads to check files (requires threading support)
- Returns: iterator over paths with unstaged changes
- """
- # For each entry in the index, check whether the working-tree copy differs from the staged blob
- if not isinstance(root_path, bytes):
- root_path = os.fsencode(root_path)
- if preload_index:
- # Use parallel processing for better performance on slow filesystems
- try:
- import multiprocessing
- from concurrent.futures import ThreadPoolExecutor
- except ImportError:
- # If threading is not available, fall back to serial processing
- preload_index = False
- else:
- # Collect all entries first
- entries = list(index.iteritems())
- # Use number of CPUs but cap at 8 threads to avoid overhead
- num_workers = min(multiprocessing.cpu_count(), 8)
- # Process entries in parallel
- with ThreadPoolExecutor(max_workers=num_workers) as executor:
- # Submit all tasks
- futures = [
- executor.submit(
- _check_entry_for_changes,
- tree_path,
- entry,
- root_path,
- filter_blob_callback,
- )
- for tree_path, entry in entries
- ]
- # Yield results in submission order; future.result() blocks until each completes
- for future in futures:
- result = future.result()
- if result is not None:
- yield result
- if not preload_index:
- # Serial processing
- for tree_path, entry in index.iteritems():
- result = _check_entry_for_changes(
- tree_path, entry, root_path, filter_blob_callback
- )
- if result is not None:
- yield result
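- # Illustrative usage sketch (assumes an open repository `repo`):
- #
- #     index = repo.open_index()
- #     for path in get_unstaged_changes(index, repo.path):
- #         print(path)  # tree paths whose working-tree copy differs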
- def _tree_to_fs_path(
- root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
- ) -> bytes:
- """Convert a git tree path to a file system path.
- Args:
- root_path: Root filesystem path
- tree_path: Git tree path as bytes (encoded with tree_encoding)
- tree_encoding: Encoding used for tree paths (default: utf-8)
- Returns: File system path.
- """
- assert isinstance(tree_path, bytes)
- if os_sep_bytes != b"/":
- sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
- else:
- sep_corrected_path = tree_path
- # On Windows, we need to handle tree path encoding properly
- if sys.platform == "win32":
- # Decode from tree encoding, then re-encode for filesystem
- try:
- tree_path_str = sep_corrected_path.decode(tree_encoding)
- sep_corrected_path = os.fsencode(tree_path_str)
- except UnicodeDecodeError:
- # If decoding fails, use the original bytes
- pass
- return os.path.join(root_path, sep_corrected_path)
- def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
- """Convert a file system path to a git tree path.
- Args:
- fs_path: File system path.
- tree_encoding: Encoding to use for tree paths (default: utf-8)
- Returns: Git tree path as bytes (encoded with tree_encoding)
- """
- if not isinstance(fs_path, bytes):
- fs_path_bytes = os.fsencode(fs_path)
- else:
- fs_path_bytes = fs_path
- # On Windows, we need to ensure tree paths are properly encoded
- if sys.platform == "win32":
- try:
- # Decode from filesystem encoding, then re-encode with tree encoding
- fs_path_str = os.fsdecode(fs_path_bytes)
- fs_path_bytes = fs_path_str.encode(tree_encoding)
- except UnicodeDecodeError:
- # If filesystem decoding fails, use the original bytes
- pass
- if os_sep_bytes != b"/":
- tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
- else:
- tree_path = fs_path_bytes
- return tree_path
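- # Illustrative round trip between the two helpers above (on POSIX, where
- # os.sep is "/", both are effectively identity transforms):
- #
- #     p = _tree_to_fs_path(b"/repo", b"dir/file.txt")   # b"/repo/dir/file.txt"
- #     assert _fs_to_tree_path(b"dir/file.txt") == b"dir/file.txt"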
- def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
- """Create an index entry for a directory.
- This is only used for submodules (directories containing .git).
- Args:
- st: Stat result for the directory
- path: Path to the directory
- Returns:
- IndexEntry for a submodule, or None if not a submodule
- """
- if os.path.exists(os.path.join(path, b".git")):
- head = read_submodule_head(path)
- if head is None:
- return None
- return index_entry_from_stat(st, head, mode=S_IFGITLINK)
- return None
- def index_entry_from_path(
- path: bytes, object_store: ObjectContainer | None = None
- ) -> IndexEntry | None:
- """Create an index from a filesystem path.
- This returns an index value for files, symlinks
- and tree references. for directories and
- non-existent files it returns None
- Args:
- path: Path to create an index entry for
- object_store: Optional object store to
- save new blobs in
- Returns: An index entry; None for directories
- """
- assert isinstance(path, bytes)
- st = os.lstat(path)
- if stat.S_ISDIR(st.st_mode):
- return index_entry_from_directory(st, path)
- if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
- blob = blob_from_path_and_stat(path, st)
- if object_store is not None:
- object_store.add_object(blob)
- return index_entry_from_stat(st, blob.id)
- return None
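- # Illustrative sketch (hypothetical path): for a regular file this stats
- # the path, builds a Blob from its contents, and returns a fresh entry;
- # passing an object store also persists the blob.
- #
- #     entry = index_entry_from_path(b"/repo/file.txt", repo.object_store)
- #     assert entry is not None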
- def iter_fresh_entries(
- paths: Iterable[bytes],
- root_path: bytes,
- object_store: ObjectContainer | None = None,
- ) -> Iterator[tuple[bytes, IndexEntry | None]]:
- """Iterate over current versions of index entries on disk.
- Args:
- paths: Paths to iterate over
- root_path: Root path to access from
- object_store: Optional store to save new blobs in
- Returns: Iterator over path, index_entry
- """
- for path in paths:
- p = _tree_to_fs_path(root_path, path)
- try:
- entry = index_entry_from_path(p, object_store=object_store)
- except (FileNotFoundError, IsADirectoryError):
- entry = None
- yield path, entry
- def iter_fresh_objects(
- paths: Iterable[bytes],
- root_path: bytes,
- include_deleted: bool = False,
- object_store: ObjectContainer | None = None,
- ) -> Iterator[tuple[bytes, bytes | None, int | None]]:
- """Iterate over versions of objects on disk referenced by index.
- Args:
- paths: Paths to check
- root_path: Root path to access from
- include_deleted: Include deleted entries with sha and
- mode set to None
- object_store: Optional object store to report new items to
- Returns: Iterator over path, sha, mode
- """
- for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
- if entry is None:
- if include_deleted:
- yield path, None, None
- else:
- yield path, entry.sha, cleanup_mode(entry.mode)
- def refresh_index(index: Index, root_path: bytes) -> None:
- """Refresh the contents of an index.
- This is the index-refresh step of 'git commit -a': each indexed path is re-read from disk and its entry updated.
- Args:
- index: Index to update
- root_path: Root filesystem path
- """
- for path, entry in iter_fresh_entries(index, root_path):
- if entry:
- index[path] = entry
- class locked_index:
- """Lock the index while making modifications.
- Works as a context manager.
- """
- _file: "_GitFile"
- def __init__(self, path: bytes | str) -> None:
- """Initialize locked_index."""
- self._path = path
- def __enter__(self) -> Index:
- """Enter context manager and lock index."""
- f = GitFile(self._path, "wb")
- self._file = f
- self._index = Index(self._path)
- return self._index
- def __exit__(
- self,
- exc_type: type | None,
- exc_value: BaseException | None,
- traceback: types.TracebackType | None,
- ) -> None:
- """Exit context manager and unlock index."""
- if exc_type is not None:
- self._file.abort()
- return
- try:
- f = SHA1Writer(self._file)
- write_index_dict(f, self._index._byname)
- except BaseException:
- self._file.abort()
- else:
- f.close()
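- # Illustrative usage sketch (hypothetical path): the lock is taken on
- # __enter__ and the rewritten index replaces the old one on a clean
- # __exit__, or the lock file is aborted on error.
- #
- #     with locked_index(b"/repo/.git/index") as index:
- #         del index[b"obsolete.txt"]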