12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168 |
- # index.py -- File parser/writer for the git index file
- # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as public by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Parser for the git index file format."""
- import os
- import stat
- import struct
- import sys
- from dataclasses import dataclass
- from enum import Enum
- from typing import (
- Any,
- BinaryIO,
- Callable,
- Dict,
- Iterable,
- Iterator,
- List,
- Optional,
- Tuple,
- Union,
- )
- from .file import GitFile
- from .object_store import iter_tree_contents
- from .objects import (
- S_IFGITLINK,
- S_ISGITLINK,
- Blob,
- ObjectID,
- Tree,
- hex_to_sha,
- sha_to_hex,
- )
- from .pack import ObjectContainer, SHA1Reader, SHA1Writer
# Layout of the 16-bit "flags" field of an index entry.

# low 12 bits: length of the entry name
FLAG_NAMEMASK = 0x0FFF

# 2-bit stage (during merge)
FLAG_STAGESHIFT = 12
FLAG_STAGEMASK = 0x3 << FLAG_STAGESHIFT

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# assume-valid
FLAG_VALID = 0x8000

# Bits of the optional 16-bit "extended flags" field.

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

# Index format version written when the caller does not specify one.
DEFAULT_VERSION = 2
class Stage(Enum):
    """Merge stage of an index entry.

    The numeric values are the ones stored in the 2-bit stage field of an
    entry's flags.
    """

    # Regular, non-conflicted entry.
    NORMAL = 0
    # The three sides that can be recorded for a conflicted path.
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
@dataclass
class SerializedIndexEntry:
    """An index entry in the shape used when reading or writing an index file.

    In addition to the stat-like fields and the object sha it carries the
    entry's path (``name``) and the raw ``flags`` / ``extended_flags`` words.
    """

    name: bytes
    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the stage bits of ``flags``."""
        stage_bits = self.flags & FLAG_STAGEMASK
        return Stage(stage_bits >> FLAG_STAGESHIFT)
@dataclass
class IndexEntry:
    """A single (stage-less, name-less) entry of the index.

    The path and the merge stage are not stored here; they are supplied when
    the entry is serialized.
    """

    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Build an entry from a serialized one, dropping its name and flags."""
        # Positional arguments follow the field order declared above.
        return cls(
            serialized.ctime,
            serialized.mtime,
            serialized.dev,
            serialized.ino,
            serialized.mode,
            serialized.uid,
            serialized.gid,
            serialized.size,
            serialized.sha,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Return the serialized form of this entry.

        Args:
          name: Path to record for the entry
          stage: Merge stage to encode into the flags
        Returns:
          A SerializedIndexEntry whose flags hold only the stage bits and
          whose extended flags are zero.
        """
        stage_flags = stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=stage_flags,
            extended_flags=0,
        )
class ConflictedIndexEntry:
    """Index entry that represents a conflict.

    Holds up to three entries for the same path, one per merge stage; a side
    that is not present is None.
    """

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Create a conflicted entry from the sides that are known."""
        self.ancestor, self.this, self.other = ancestor, this, other
class UnmergedEntries(Exception):
    """Unmerged (conflicted) entries exist in the index."""
def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
    """Split a /-delimited path at its last slash.

    Args:
      path: The path to split.
    Returns:
      Tuple with directory name and basename; the directory name is empty
      when the path contains no slash.
    """
    directory, slash, leaf = path.rpartition(b"/")
    if not slash:
        # No separator at all: the whole path is the basename.
        return (b"", path)
    return (directory, leaf)
def pathjoin(*args):
    """Join path components with "/", skipping empty components."""
    return b"/".join(filter(None, args))
def read_cache_time(f):
    """Read a cache time.

    Consumes 8 bytes: two big-endian unsigned 32-bit integers.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    raw = f.read(8)
    secs, nsecs = struct.unpack(">LL", raw)
    return (secs, nsecs)
def write_cache_time(f, t):
    """Write a cache time.

    The time is written as two big-endian unsigned 32-bit integers
    (seconds, nanoseconds).

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    Raises:
      TypeError: if t is not an int, a float or a tuple
    """
    if isinstance(t, tuple):
        fields = t
    elif isinstance(t, int):
        # Whole seconds only.
        fields = (t, 0)
    elif isinstance(t, float):
        whole, fraction = divmod(t, 1.0)
        fields = (int(whole), int(fraction * 1000000000))
    else:
        raise TypeError(t)
    f.write(struct.pack(">LL", *fields))
def read_cache_entry(f, version: int) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from; must support read() and tell()
      version: Index format version the entry was written with
    Returns:
      The entry as a SerializedIndexEntry. Its sha is hex-encoded and its
      flags have the name-length bits cleared.
    Raises:
      AssertionError: if the entry has the extended flag set but version < 3
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    name_len = flags & FLAG_NAMEMASK
    name = f.read(name_len)
    # Number of bytes consumed past the end of the name (0 or 1).
    terminator_read = 0
    if name_len == FLAG_NAMEMASK:
        # The 12-bit length field saturates at 0xFFF: a name of that length
        # or longer is delimited by its NUL terminator instead, so keep
        # reading until we hit it (or run out of data).
        tail = []
        while True:
            byte = f.read(1)
            if not byte:
                break
            if byte == b"\0":
                terminator_read = 1
                break
            tail.append(byte)
        name += b"".join(tail)
    # Padding:
    if version < 4:
        # Entries are NUL-padded to a multiple of 8 bytes with at least one
        # NUL; the padded size is computed from where the name ended.
        name_end = f.tell() - terminator_read
        real_size = (name_end - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
    """Write an index entry to a file.

    Args:
      f: File object; must support write() and tell()
      entry: SerializedIndexEntry to write
      version: Index format version to write; None selects DEFAULT_VERSION
    Raises:
      AssertionError: if the entry needs extended flags but version < 3
    """
    if version is None:
        version = DEFAULT_VERSION
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    # The name length only has 12 bits in the flags word. Longer names store
    # the saturated value 0xFFF; without the clamp the length would spill
    # into the stage/extended/valid bits.
    name_len = min(len(entry.name), FLAG_NAMEMASK)
    flags = name_len | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            # dev and ino may exceed 32 bits; only the low 32 bits are stored.
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    f.write(entry.name)
    if version < 4:
        # Pad with 1-8 NUL bytes so the entry size is a multiple of 8; this
        # also NUL-terminates the name.
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered.

    Attributes:
      index_format_version: The version number found in the index header
    """

    def __init__(self, version) -> None:
        # Hand the version to Exception so that ``args`` is populated and the
        # exception can be re-created from it (e.g. when copied).
        super().__init__(version)
        self.index_format_version = version

    def __str__(self) -> str:
        return f"unsupported index format version: {self.index_format_version!r}"
def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries.

    Args:
      f: File-like object positioned at the start of the index
    Raises:
      AssertionError: if the file does not start with the "DIRC" signature
      UnsupportedIndexFormat: if the header version is not 1, 2 or 3
    """
    signature = f.read(4)
    if signature != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % signature)
    version, num_entries = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3):
        raise UnsupportedIndexFormat(version)
    for _ in range(num_entries):
        yield read_cache_entry(f, version)
def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    The dictionary is keyed by path. A path with a stage-0 entry maps to an
    IndexEntry; a path with conflict-stage entries maps to a single
    ConflictedIndexEntry that collects its sides.

    Args:
      f: File object to read from.
    Raises:
      AssertionError: if a conflict-stage entry is found for a path that
        already has a normal entry
    """
    # Attribute of ConflictedIndexEntry that receives each conflict stage.
    side_for_stage = {
        Stage.MERGE_CONFLICT_ANCESTOR: "ancestor",
        Stage.MERGE_CONFLICT_THIS: "this",
        Stage.MERGE_CONFLICT_OTHER: "other",
    }
    by_path: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for serialized in read_index(f):
        stage = serialized.stage()
        if stage == Stage.NORMAL:
            by_path[serialized.name] = IndexEntry.from_serialized(serialized)
            continue
        conflict = by_path.setdefault(serialized.name, ConflictedIndexEntry())
        if isinstance(conflict, IndexEntry):
            raise AssertionError("Non-conflicted entry for %r exists" % serialized.name)
        setattr(conflict, side_for_stage[stage], IndexEntry.from_serialized(serialized))
    return by_path
def write_index(
    f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
):
    """Write an index file.

    Writes the "DIRC" signature, the version and entry count, then every
    entry in the order given.

    Args:
      f: File-like object to write to
      version: Version number to write; None selects DEFAULT_VERSION
      entries: Entries to write
    """
    effective_version = DEFAULT_VERSION if version is None else version
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", effective_version, len(entries)))
    for serialized in entries:
        write_cache_entry(f, serialized, effective_version)
def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are written sorted by path; the sides of a conflicted path are
    written in stage order (ancestor, this, other), skipping absent sides.

    Args:
      f: File-like object to write to
      entries: Mapping of path to IndexEntry or ConflictedIndexEntry
      version: Version number to write, passed through to write_index
    """
    conflict_sides = (
        ("ancestor", Stage.MERGE_CONFLICT_ANCESTOR),
        ("this", Stage.MERGE_CONFLICT_THIS),
        ("other", Stage.MERGE_CONFLICT_OTHER),
    )
    serialized = []
    for path in sorted(entries):
        entry = entries[path]
        if not isinstance(entry, ConflictedIndexEntry):
            serialized.append(entry.serialize(path, Stage.NORMAL))
            continue
        for attr, stage in conflict_sides:
            side = getattr(entry, attr)
            if side is not None:
                serialized.append(side.serialize(path, stage))
    write_index(f, serialized, version=version)
def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object: symlinks,
    directories and gitlinks are reduced to their bare type bits, anything
    else becomes a regular file with 0o644 or 0o755 permissions depending on
    the owner-execute bit.

    Args:
      mode: Mode to clean up.
    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    permissions = 0o755 if mode & 0o100 else 0o644
    return stat.S_IFREG | permissions
class Index:
    """A Git Index file.

    Entries are held in memory in a dictionary keyed by path; each value is
    either an IndexEntry or, for unmerged paths, a ConflictedIndexEntry.
    """

    _byname: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(self, filename: Union[bytes, str], read=True) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
        """
        self._filename = filename
        # TODO(jelmer): Store the version returned by read_index
        self._version = None
        self.clear()
        if read:
            self.read()

    @property
    def path(self):
        """Path of the index file this object is associated with."""
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk.

        If serializing fails, the pending write is aborted so that a
        partially written index never replaces the existing file.
        """
        lock = GitFile(self._filename, "wb")
        try:
            writer = SHA1Writer(lock)
            write_index_dict(writer, self._byname, version=self._version)
            writer.close()
        except BaseException:
            lock.abort()
            raise

    def read(self):
        """Read current contents of index from disk.

        Does nothing when the index file does not exist.
        """
        if not os.path.exists(self._filename):
            return
        try:
            f = GitFile(self._filename, "rb")
        except FileNotFoundError:
            # The file vanished between the existence check and the open.
            return
        try:
            f = SHA1Reader(f)
            self.update(read_index_dict(f))
            # FIXME: Additional data?
            # Consume everything between the entries and the final 20 bytes
            # before verifying the checksum.
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of paths in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def __contains__(self, key):
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path.

        Raises:
          KeyError: if the path is not in the index
          UnmergedEntries: if the path is conflicted
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries()
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path.

        Raises:
          KeyError: if the path is not in the index
          UnmergedEntries: if the path is conflicted
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries()
        return value.mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree.

        Raises:
          UnmergedEntries: when a conflicted entry is encountered
        """
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries()
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Return True if any path in the index is conflicted."""
        return any(
            isinstance(value, ConflictedIndexEntry) for value in self._byname.values()
        )

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self
    ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def update(self, entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
        """Set every (path, entry) pair of the given mapping on this index."""
        for key, value in entries.items():
            self[key] = value

    def paths(self):
        """Iterate over the paths in this index."""
        yield from self._byname.keys()

    def changes_from_tree(
        self, object_store, tree: ObjectID, want_unchanged: bool = False
    ):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        Raises:
          UnmergedEntries: when a conflicted entry is looked up
        """

        def lookup_entry(path):
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                # A conflicted path has no single sha/mode to compare.
                raise UnmergedEntries()
            return entry.sha, cleanup_mode(entry.mode)

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Builds a nested directory structure from the blob paths, then writes one
    Tree object per directory into the object store.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Maps a directory path to the dict holding its children. A child is
    # either a (mode, sha) tuple for a blob or a nested dict for a directory.
    dir_nodes: Dict[bytes, Any] = {b"": {}}

    def ensure_dir(dir_path):
        try:
            return dir_nodes[dir_path]
        except KeyError:
            pass
        parent_path, leaf = pathsplit(dir_path)
        parent = ensure_dir(parent_path)
        assert isinstance(leaf, bytes)
        node = {}
        parent[leaf] = node
        dir_nodes[dir_path] = node
        return node

    for blob_path, blob_sha, blob_mode in blobs:
        parent_path, leaf = pathsplit(blob_path)
        ensure_dir(parent_path)[leaf] = (blob_mode, blob_sha)

    def write_dir(dir_path):
        tree_obj = Tree()
        for child_name, child in dir_nodes[dir_path].items():
            if isinstance(child, dict):
                child_mode = stat.S_IFDIR
                child_sha = write_dir(pathjoin(dir_path, child_name))
            else:
                child_mode, child_sha = child
            tree_obj.add(child_name, child_mode, child_sha)
        object_store.add_object(tree_obj)
        return tree_obj.id

    return write_dir(b"")
def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Note: This function is deprecated, use index.commit() instead.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Returns: Root tree sha.
    """
    blobs = index.iterobjects()
    return commit_tree(object_store, blobs)
def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Paths found only in the tree are reported as removed, paths found only
    in the working copy as added.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy; returns
        (sha, mode) and raises KeyError for unknown names
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    # Working-copy names not yet matched against a tree entry.
    unmatched = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
                continue
            unmatched.remove(name)
            if want_unchanged or other_sha != sha or other_mode != mode:
                yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in unmatched:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            continue
        yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(
    stat_val,
    hex_sha: bytes,
    mode: Optional[int] = None,
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Mode to record; when None it is derived from stat_val.st_mode
        via cleanup_mode
    Returns:
      An IndexEntry
    """
    entry_mode = cleanup_mode(stat_val.st_mode) if mode is None else mode
    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=entry_mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
    )
if sys.platform != "win32":
    symlink = os.symlink
else:
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """PermissionError with a hint about Windows developer mode."""

        def __init__(self, errno, msg, filename) -> None:
            super(PermissionError, self).__init__(
                errno,
                "Unable to create symlink; "
                "do you have developer mode enabled? %s" % msg,
                filename,
            )

    def symlink(src, dst, target_is_directory=False, *, dir_fd=None):
        """Call os.symlink, translating PermissionError into a clearer error."""
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as exc:
            raise WindowsSymlinkPermissionError(
                exc.errno, exc.strerror, exc.filename
            ) from exc
def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode=True,
    tree_encoding="utf-8",
    symlink_fn=None,
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode the link target and the target
        path when creating a symlink on Windows
      symlink_fn: Function to use for creating symlinks; defaults to the
        module-level symlink
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat is not None:
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)  # type: ignore
            target_path = target_path.decode(tree_encoding)  # type: ignore
        (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and stat.S_ISLNK(oldstat.st_mode):
            # A symlink is in the way. Remove it so the blob replaces the
            # link itself rather than being written through it to whatever
            # the link points at.
            os.unlink(target_path)
            oldstat = None
        if oldstat is not None and oldstat.st_size == len(contents):
            # Same size: skip the write if the content is already identical.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)
# Path elements that are never accepted (compared case-insensitively).
INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    """Return True unless the lowercased element is one of INVALID_DOTNAMES."""
    lowered = element.lower()
    return lowered not in INVALID_DOTNAMES
def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using stricter, NTFS-oriented rules.

    Trailing dots and spaces are stripped and the element is lowercased
    before it is compared; besides INVALID_DOTNAMES, b"git~1" is rejected.
    """
    normalized = element.rstrip(b". ").lower()
    return not (normalized in INVALID_DOTNAMES or normalized == b"git~1")
def validate_path(path: bytes, element_validator=validate_path_element_default) -> bool:
    """Validate a /-delimited path element by element.

    Args:
      path: Path to check
      element_validator: Callable applied to each element; the default
        rejects .git, ., .. and empty elements
    Returns:
      True if every element is accepted by the validator, False otherwise
    """
    return all(element_validator(part) for part in path.split(b"/"))
def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element=validate_path_element_default,
    symlink_fn=None,
):
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks, passed through to
        build_file_from_blob

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        # Entries whose path fails validation are silently skipped.
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        parent_dir = os.path.dirname(full_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

        is_gitlink = S_ISGITLINK(entry.mode)

        # TODO(jelmer): Merge new index into working tree
        if is_gitlink:
            # A gitlink is materialized as an (empty) directory.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if is_gitlink or not honor_filemode:
            # Rebuild the stat result with the tree's mode in place of the
            # mode found on disk.
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
def blob_from_path_and_mode(fs_path: bytes, mode: int, tree_encoding="utf-8"):
    """Create a blob from a path and a file mode.

    For a symlink mode the blob holds the link target; otherwise it holds
    the file's contents.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding applied to the link target on Windows
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        with open(fs_path, "rb") as f:
            blob.data = f.read()
        return blob
    if sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
    else:
        blob.data = os.readlink(fs_path)
    return blob
def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object; only its st_mode is used
      tree_encoding: Passed through to blob_from_path_and_mode
    Returns: A `Blob` object
    """
    file_mode = st.st_mode
    return blob_from_path_and_mode(fs_path, file_mode, tree_encoding)
def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    # Imported here rather than at module level.
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    repo_path = path if isinstance(path, str) else os.fsdecode(path)
    try:
        repo = Repo(repo_path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None
def _has_directory_changed(tree_path: bytes, entry):
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. If the directory contains a ``.git`` it is treated as a
    submodule and its head is compared with the entry's sha. Otherwise Git
    tracked a file and now finds a directory, which counts as changed.

    Args:
      tree_path: Path of the directory; it is used for filesystem lookups
      entry: Index entry whose sha is compared with the submodule head
    Returns:
      True if the path should be considered changed, False if it is a
      submodule whose head equals the entry's sha.
    """
    if not os.path.exists(os.path.join(tree_path, b".git")):
        # The file was changed to a directory, so consider it removed.
        return True
    # Submodule: changed when its head differs from the recorded sha.
    return entry.sha != read_submodule_head(tree_path)
def get_unstaged_changes(
    index: Index, root_path: Union[str, bytes], filter_blob_callback=None
):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callable invoked as
        ``filter_blob_callback(blob, tree_path)``; the blob it returns is
        the one compared with the index entry
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                # The directory check does filesystem lookups, so it needs
                # the path under root_path; the repository-relative path
                # would be resolved against the current working directory.
                if _has_directory_changed(full_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Neither a regular file nor a symlink: not compared.
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
# The platform path separator as bytes.
os_sep_bytes = os.sep.encode("ascii")


def _tree_to_fs_path(root_path: bytes, tree_path: bytes):
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes
    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes == b"/":
        native_path = tree_path
    else:
        # Tree paths always use "/"; switch to the platform separator.
        native_path = tree_path.replace(b"/", os_sep_bytes)
    return os.path.join(root_path, native_path)
def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
    Returns: Git tree path as bytes
    """
    raw_path = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)
    if os_sep_bytes == b"/":
        return raw_path
    # Tree paths always use "/" regardless of the platform separator.
    return raw_path.replace(os_sep_bytes, b"/")
def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Args:
      st: stat result for the directory
      path: Filesystem path of the directory
    Returns:
      A gitlink IndexEntry pointing at the submodule head, or None when the
      directory has no ``.git`` or its head cannot be read.
    """
    if not os.path.exists(os.path.join(path, b".git")):
        return None
    head = read_submodule_head(path)
    if head is None:
        return None
    return index_entry_from_stat(st, head, mode=S_IFGITLINK)
def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For plain directories and other
    file types it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    file_mode = st.st_mode
    if stat.S_ISDIR(file_mode):
        return index_entry_from_directory(st, path)

    if not (stat.S_ISREG(file_mode) or stat.S_ISLNK(file_mode)):
        return None

    blob = blob_from_path_and_stat(path, st)
    if object_store is not None:
        object_store.add_object(blob)
    return index_entry_from_stat(st, blob.id)
def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry; the entry is None when the
      path is missing on disk or no entry can be created for it
    """
    for tree_path in paths:
        fs_path = _tree_to_fs_path(root_path, tree_path)
        try:
            fresh = index_entry_from_path(fs_path, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            fresh = None
        yield tree_path, fresh
def iter_fresh_objects(
    paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    fresh_entries = iter_fresh_entries(paths, root_path, object_store=object_store)
    for path, entry in fresh_entries:
        if entry is not None:
            yield path, entry.sha, cleanup_mode(entry.mode)
        elif include_deleted:
            yield path, None, None
def refresh_index(index: Index, root_path: bytes):
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'. Paths for which no
    fresh entry can be produced keep their existing entry.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, fresh_entry in iter_fresh_entries(index, root_path):
        if not fresh_entry:
            continue
        index[path] = fresh_entry
class locked_index:
    """Lock the index while making modifications.

    Works as a context manager. Entering opens the index file for writing
    and yields an Index read from the same path; leaving without an error
    writes the index back, leaving with an error aborts the write.
    """

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self):
        self._file = GitFile(self._path, "wb")
        try:
            self._index = Index(self._path)
        except BaseException:
            # Reading failed: do not leave the pending write file behind.
            self._file.abort()
            raise
        return self._index

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname, version=self._index._version)
        except BaseException:
            # Abort the write, then let the failure propagate. Swallowing it
            # here would make a failed save look like a success.
            self._file.abort()
            raise
        else:
            f.close()
|