|
@@ -259,6 +259,8 @@ def _decompress_path_from_stream(
|
|
|
|
|
|
|
|
|
|
|
|
|
class Stage(Enum):
|
|
class Stage(Enum):
|
|
|
|
|
+ """Represents the stage of an index entry during merge conflicts."""
|
|
|
|
|
+
|
|
|
NORMAL = 0
|
|
NORMAL = 0
|
|
|
MERGE_CONFLICT_ANCESTOR = 1
|
|
MERGE_CONFLICT_ANCESTOR = 1
|
|
|
MERGE_CONFLICT_THIS = 2
|
|
MERGE_CONFLICT_THIS = 2
|
|
@@ -267,6 +269,12 @@ class Stage(Enum):
|
|
|
|
|
|
|
|
@dataclass
|
|
@dataclass
|
|
|
class SerializedIndexEntry:
|
|
class SerializedIndexEntry:
|
|
|
|
|
+ """Represents a serialized index entry as stored in the index file.
|
|
|
|
|
+
|
|
|
|
|
+ This dataclass holds the raw data for an index entry before it's
|
|
|
|
|
+ parsed into the more user-friendly IndexEntry format.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
name: bytes
|
|
name: bytes
|
|
|
ctime: Union[int, float, tuple[int, int]]
|
|
ctime: Union[int, float, tuple[int, int]]
|
|
|
mtime: Union[int, float, tuple[int, int]]
|
|
mtime: Union[int, float, tuple[int, int]]
|
|
@@ -281,6 +289,11 @@ class SerializedIndexEntry:
|
|
|
extended_flags: int
|
|
extended_flags: int
|
|
|
|
|
|
|
|
def stage(self) -> Stage:
|
|
def stage(self) -> Stage:
|
|
|
|
|
+ """Extract the stage from the flags field.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Stage enum value indicating merge conflict state
|
|
|
|
|
+ """
|
|
|
return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
|
|
return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -320,15 +333,33 @@ class TreeExtension(IndexExtension):
|
|
|
"""Tree cache extension."""
|
|
"""Tree cache extension."""
|
|
|
|
|
|
|
|
def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
|
|
def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
|
|
|
|
|
+ """Initialize TreeExtension.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ entries: List of tree cache entries (path, sha, flags)
|
|
|
|
|
+ """
|
|
|
self.entries = entries
|
|
self.entries = entries
|
|
|
super().__init__(TREE_EXTENSION, b"")
|
|
super().__init__(TREE_EXTENSION, b"")
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_bytes(cls, data: bytes) -> "TreeExtension":
|
|
def from_bytes(cls, data: bytes) -> "TreeExtension":
|
|
|
|
|
+ """Parse TreeExtension from bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ data: Raw bytes to parse
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ TreeExtension instance
|
|
|
|
|
+ """
|
|
|
# TODO: Implement tree cache parsing
|
|
# TODO: Implement tree cache parsing
|
|
|
return cls([])
|
|
return cls([])
|
|
|
|
|
|
|
|
def to_bytes(self) -> bytes:
|
|
def to_bytes(self) -> bytes:
|
|
|
|
|
+ """Serialize TreeExtension to bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Serialized extension data
|
|
|
|
|
+ """
|
|
|
# TODO: Implement tree cache serialization
|
|
# TODO: Implement tree cache serialization
|
|
|
return b""
|
|
return b""
|
|
|
|
|
|
|
@@ -337,15 +368,33 @@ class ResolveUndoExtension(IndexExtension):
|
|
|
"""Resolve undo extension for recording merge conflicts."""
|
|
"""Resolve undo extension for recording merge conflicts."""
|
|
|
|
|
|
|
|
def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
|
|
def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
|
|
|
|
|
+ """Initialize ResolveUndoExtension.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ entries: List of (path, stages) where stages is a list of (stage, sha) tuples
|
|
|
|
|
+ """
|
|
|
self.entries = entries
|
|
self.entries = entries
|
|
|
super().__init__(REUC_EXTENSION, b"")
|
|
super().__init__(REUC_EXTENSION, b"")
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
|
|
def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
|
|
|
|
|
+ """Parse ResolveUndoExtension from bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ data: Raw bytes to parse
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ ResolveUndoExtension instance
|
|
|
|
|
+ """
|
|
|
# TODO: Implement resolve undo parsing
|
|
# TODO: Implement resolve undo parsing
|
|
|
return cls([])
|
|
return cls([])
|
|
|
|
|
|
|
|
def to_bytes(self) -> bytes:
|
|
def to_bytes(self) -> bytes:
|
|
|
|
|
+ """Serialize ResolveUndoExtension to bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Serialized extension data
|
|
|
|
|
+ """
|
|
|
# TODO: Implement resolve undo serialization
|
|
# TODO: Implement resolve undo serialization
|
|
|
return b""
|
|
return b""
|
|
|
|
|
|
|
@@ -354,15 +403,34 @@ class UntrackedExtension(IndexExtension):
|
|
|
"""Untracked cache extension."""
|
|
"""Untracked cache extension."""
|
|
|
|
|
|
|
|
def __init__(self, data: bytes) -> None:
|
|
def __init__(self, data: bytes) -> None:
|
|
|
|
|
+ """Initialize UntrackedExtension.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ data: Raw untracked cache data
|
|
|
|
|
+ """
|
|
|
super().__init__(UNTR_EXTENSION, data)
|
|
super().__init__(UNTR_EXTENSION, data)
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_bytes(cls, data: bytes) -> "UntrackedExtension":
|
|
def from_bytes(cls, data: bytes) -> "UntrackedExtension":
|
|
|
|
|
+ """Parse UntrackedExtension from bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ data: Raw bytes to parse
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ UntrackedExtension instance
|
|
|
|
|
+ """
|
|
|
return cls(data)
|
|
return cls(data)
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
@dataclass
|
|
|
class IndexEntry:
|
|
class IndexEntry:
|
|
|
|
|
+ """Represents an entry in the Git index.
|
|
|
|
|
+
|
|
|
|
|
+ This is a higher-level representation of an index entry that includes
|
|
|
|
|
+ parsed data and convenience methods.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
ctime: Union[int, float, tuple[int, int]]
|
|
ctime: Union[int, float, tuple[int, int]]
|
|
|
mtime: Union[int, float, tuple[int, int]]
|
|
mtime: Union[int, float, tuple[int, int]]
|
|
|
dev: int
|
|
dev: int
|
|
@@ -377,6 +445,14 @@ class IndexEntry:
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
|
|
def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
|
|
|
|
|
+ """Create an IndexEntry from a SerializedIndexEntry.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ serialized: SerializedIndexEntry to convert
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ New IndexEntry instance
|
|
|
|
|
+ """
|
|
|
return cls(
|
|
return cls(
|
|
|
ctime=serialized.ctime,
|
|
ctime=serialized.ctime,
|
|
|
mtime=serialized.mtime,
|
|
mtime=serialized.mtime,
|
|
@@ -392,6 +468,15 @@ class IndexEntry:
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
|
|
def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
|
|
|
|
|
+ """Serialize this entry with a given name and stage.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ name: Path name for the entry
|
|
|
|
|
+ stage: Merge conflict stage
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ SerializedIndexEntry ready for writing to disk
|
|
|
|
|
+ """
|
|
|
# Clear out any existing stage bits, then set them from the Stage.
|
|
# Clear out any existing stage bits, then set them from the Stage.
|
|
|
new_flags = self.flags & ~FLAG_STAGEMASK
|
|
new_flags = self.flags & ~FLAG_STAGEMASK
|
|
|
new_flags |= stage.value << FLAG_STAGESHIFT
|
|
new_flags |= stage.value << FLAG_STAGESHIFT
|
|
@@ -411,6 +496,11 @@ class IndexEntry:
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
def stage(self) -> Stage:
|
|
def stage(self) -> Stage:
|
|
|
|
|
+ """Get the merge conflict stage of this entry.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Stage enum value
|
|
|
|
|
+ """
|
|
|
return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
|
|
return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
|
|
|
|
|
|
|
|
@property
|
|
@property
|
|
@@ -420,6 +510,7 @@ class IndexEntry:
|
|
|
|
|
|
|
|
def set_skip_worktree(self, skip: bool = True) -> None:
|
|
def set_skip_worktree(self, skip: bool = True) -> None:
|
|
|
"""Helper method to set or clear the skip-worktree bit in extended_flags.
|
|
"""Helper method to set or clear the skip-worktree bit in extended_flags.
|
|
|
|
|
+
|
|
|
Also sets FLAG_EXTENDED in self.flags if needed.
|
|
Also sets FLAG_EXTENDED in self.flags if needed.
|
|
|
"""
|
|
"""
|
|
|
if skip:
|
|
if skip:
|
|
@@ -448,6 +539,13 @@ class ConflictedIndexEntry:
|
|
|
this: Optional[IndexEntry] = None,
|
|
this: Optional[IndexEntry] = None,
|
|
|
other: Optional[IndexEntry] = None,
|
|
other: Optional[IndexEntry] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
|
|
+ """Initialize ConflictedIndexEntry.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ ancestor: The common ancestor entry
|
|
|
|
|
+ this: The current branch entry
|
|
|
|
|
+ other: The other branch entry
|
|
|
|
|
+ """
|
|
|
self.ancestor = ancestor
|
|
self.ancestor = ancestor
|
|
|
self.this = this
|
|
self.this = this
|
|
|
self.other = other
|
|
self.other = other
|
|
@@ -624,6 +722,11 @@ class UnsupportedIndexFormat(Exception):
|
|
|
"""An unsupported index format was encountered."""
|
|
"""An unsupported index format was encountered."""
|
|
|
|
|
|
|
|
def __init__(self, version: int) -> None:
|
|
def __init__(self, version: int) -> None:
|
|
|
|
|
+ """Initialize UnsupportedIndexFormat exception.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ version: The unsupported index format version
|
|
|
|
|
+ """
|
|
|
self.index_format_version = version
|
|
self.index_format_version = version
|
|
|
|
|
|
|
|
|
|
|
|
@@ -740,6 +843,7 @@ def read_index_dict(
|
|
|
f: BinaryIO,
|
|
f: BinaryIO,
|
|
|
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
|
|
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
|
|
|
"""Read an index file and return it as a dictionary.
|
|
"""Read an index file and return it as a dictionary.
|
|
|
|
|
+
|
|
|
Dict Key is tuple of path and stage number, as
|
|
Dict Key is tuple of path and stage number, as
|
|
|
path alone is not unique
|
|
path alone is not unique
|
|
|
Args:
|
|
Args:
|
|
@@ -811,6 +915,7 @@ def write_index_dict(
|
|
|
extensions: Optional[list[IndexExtension]] = None,
|
|
extensions: Optional[list[IndexExtension]] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
"""Write an index file based on the contents of a dictionary.
|
|
"""Write an index file based on the contents of a dictionary.
|
|
|
|
|
+
|
|
|
being careful to sort by path and then by stage.
|
|
being careful to sort by path and then by stage.
|
|
|
"""
|
|
"""
|
|
|
entries_list = []
|
|
entries_list = []
|
|
@@ -889,9 +994,15 @@ class Index:
|
|
|
|
|
|
|
|
@property
|
|
@property
|
|
|
def path(self) -> Union[bytes, str]:
|
|
def path(self) -> Union[bytes, str]:
|
|
|
|
|
+ """Get the path to the index file.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Path to the index file
|
|
|
|
|
+ """
|
|
|
return self._filename
|
|
return self._filename
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
def __repr__(self) -> str:
|
|
|
|
|
+ """Return string representation of Index."""
|
|
|
return f"{self.__class__.__name__}({self._filename!r})"
|
|
return f"{self.__class__.__name__}({self._filename!r})"
|
|
|
|
|
|
|
|
def write(self) -> None:
|
|
def write(self) -> None:
|
|
@@ -967,6 +1078,7 @@ class Index:
|
|
|
return iter(self._byname)
|
|
return iter(self._byname)
|
|
|
|
|
|
|
|
def __contains__(self, key: bytes) -> bool:
|
|
def __contains__(self, key: bytes) -> bool:
|
|
|
|
|
+ """Check if a path exists in the index."""
|
|
|
return key in self._byname
|
|
return key in self._byname
|
|
|
|
|
|
|
|
def get_sha1(self, path: bytes) -> bytes:
|
|
def get_sha1(self, path: bytes) -> bytes:
|
|
@@ -992,6 +1104,11 @@ class Index:
|
|
|
yield path, entry.sha, cleanup_mode(entry.mode)
|
|
yield path, entry.sha, cleanup_mode(entry.mode)
|
|
|
|
|
|
|
|
def has_conflicts(self) -> bool:
|
|
def has_conflicts(self) -> bool:
|
|
|
|
|
+ """Check if the index contains any conflicted entries.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if any entries are conflicted, False otherwise
|
|
|
|
|
+ """
|
|
|
for value in self._byname.values():
|
|
for value in self._byname.values():
|
|
|
if isinstance(value, ConflictedIndexEntry):
|
|
if isinstance(value, ConflictedIndexEntry):
|
|
|
return True
|
|
return True
|
|
@@ -1004,27 +1121,49 @@ class Index:
|
|
|
def __setitem__(
|
|
def __setitem__(
|
|
|
self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
|
|
self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
|
|
|
) -> None:
|
|
) -> None:
|
|
|
|
|
+ """Set an entry in the index."""
|
|
|
assert isinstance(name, bytes)
|
|
assert isinstance(name, bytes)
|
|
|
self._byname[name] = value
|
|
self._byname[name] = value
|
|
|
|
|
|
|
|
def __delitem__(self, name: bytes) -> None:
|
|
def __delitem__(self, name: bytes) -> None:
|
|
|
|
|
+ """Delete an entry from the index."""
|
|
|
del self._byname[name]
|
|
del self._byname[name]
|
|
|
|
|
|
|
|
def iteritems(
|
|
def iteritems(
|
|
|
self,
|
|
self,
|
|
|
) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
|
|
) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
|
|
|
|
|
+ """Iterate over (path, entry) pairs in the index.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Iterator of (path, entry) tuples
|
|
|
|
|
+ """
|
|
|
return iter(self._byname.items())
|
|
return iter(self._byname.items())
|
|
|
|
|
|
|
|
def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
|
|
def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
|
|
|
|
|
+ """Get an iterator over (path, entry) pairs.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Iterator of (path, entry) tuples
|
|
|
|
|
+ """
|
|
|
return iter(self._byname.items())
|
|
return iter(self._byname.items())
|
|
|
|
|
|
|
|
def update(
|
|
def update(
|
|
|
self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
|
|
self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
|
|
|
) -> None:
|
|
) -> None:
|
|
|
|
|
+ """Update the index with multiple entries.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ entries: Dictionary mapping paths to index entries
|
|
|
|
|
+ """
|
|
|
for key, value in entries.items():
|
|
for key, value in entries.items():
|
|
|
self[key] = value
|
|
self[key] = value
|
|
|
|
|
|
|
|
def paths(self) -> Generator[bytes, None, None]:
|
|
def paths(self) -> Generator[bytes, None, None]:
|
|
|
|
|
+ """Generate all paths in the index.
|
|
|
|
|
+
|
|
|
|
|
+ Yields:
|
|
|
|
|
+ Path names as bytes
|
|
|
|
|
+ """
|
|
|
yield from self._byname.keys()
|
|
yield from self._byname.keys()
|
|
|
|
|
|
|
|
def changes_from_tree(
|
|
def changes_from_tree(
|
|
@@ -1147,8 +1286,7 @@ def changes_from_tree(
|
|
|
tuple[Optional[bytes], Optional[bytes]],
|
|
tuple[Optional[bytes], Optional[bytes]],
|
|
|
]
|
|
]
|
|
|
]:
|
|
]:
|
|
|
- """Find the differences between the contents of a tree and
|
|
|
|
|
- a working copy.
|
|
|
|
|
|
|
+ """Find the differences between the contents of a tree and a working copy.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
names: Iterable of names in the working copy
|
|
names: Iterable of names in the working copy
|
|
@@ -1194,6 +1332,7 @@ def index_entry_from_stat(
|
|
|
Args:
|
|
Args:
|
|
|
stat_val: POSIX stat_result instance
|
|
stat_val: POSIX stat_result instance
|
|
|
hex_sha: Hex sha of the object
|
|
hex_sha: Hex sha of the object
|
|
|
|
|
+ mode: Optional file mode, will be derived from stat if not provided
|
|
|
"""
|
|
"""
|
|
|
if mode is None:
|
|
if mode is None:
|
|
|
mode = cleanup_mode(stat_val.st_mode)
|
|
mode = cleanup_mode(stat_val.st_mode)
|
|
@@ -1221,7 +1360,14 @@ if sys.platform == "win32":
|
|
|
# https://github.com/jelmer/dulwich/issues/1005
|
|
# https://github.com/jelmer/dulwich/issues/1005
|
|
|
|
|
|
|
|
class WindowsSymlinkPermissionError(PermissionError):
|
|
class WindowsSymlinkPermissionError(PermissionError):
|
|
|
|
|
+ """Windows-specific error for symlink creation failures.
|
|
|
|
|
+
|
|
|
|
|
+ This error is raised when symlink creation fails on Windows,
|
|
|
|
|
+ typically due to lack of developer mode or administrator privileges.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
|
|
def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
|
|
|
|
|
+ """Initialize WindowsSymlinkPermissionError."""
|
|
|
super(PermissionError, self).__init__(
|
|
super(PermissionError, self).__init__(
|
|
|
errno,
|
|
errno,
|
|
|
f"Unable to create symlink; do you have developer mode enabled? {msg}",
|
|
f"Unable to create symlink; do you have developer mode enabled? {msg}",
|
|
@@ -1235,6 +1381,17 @@ if sys.platform == "win32":
|
|
|
*,
|
|
*,
|
|
|
dir_fd: Optional[int] = None,
|
|
dir_fd: Optional[int] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
|
|
+ """Create a symbolic link on Windows with better error handling.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ src: Source path for the symlink
|
|
|
|
|
+ dst: Destination path where symlink will be created
|
|
|
|
|
+ target_is_directory: Whether the target is a directory
|
|
|
|
|
+ dir_fd: Optional directory file descriptor
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ WindowsSymlinkPermissionError: If symlink creation fails due to permissions
|
|
|
|
|
+ """
|
|
|
try:
|
|
try:
|
|
|
return os.symlink(
|
|
return os.symlink(
|
|
|
src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
|
|
src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
|
|
@@ -1264,6 +1421,7 @@ def build_file_from_blob(
|
|
|
target_path: Path to write to
|
|
target_path: Path to write to
|
|
|
honor_filemode: An optional flag to honor core.filemode setting in
|
|
honor_filemode: An optional flag to honor core.filemode setting in
|
|
|
config file, default is core.filemode=True, change executable bit
|
|
config file, default is core.filemode=True, change executable bit
|
|
|
|
|
+ tree_encoding: Encoding to use for tree contents
|
|
|
symlink_fn: Function to use for creating symlinks
|
|
symlink_fn: Function to use for creating symlinks
|
|
|
Returns: stat object for the file
|
|
Returns: stat object for the file
|
|
|
"""
|
|
"""
|
|
@@ -1346,10 +1504,26 @@ def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_path_element_default(element: bytes) -> bool:
|
|
def validate_path_element_default(element: bytes) -> bool:
|
|
|
|
|
+ """Validate a path element using default rules.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ element: Path element to validate
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if path element is valid, False otherwise
|
|
|
|
|
+ """
|
|
|
return _normalize_path_element_default(element) not in INVALID_DOTNAMES
|
|
return _normalize_path_element_default(element) not in INVALID_DOTNAMES
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_path_element_ntfs(element: bytes) -> bool:
|
|
def validate_path_element_ntfs(element: bytes) -> bool:
|
|
|
|
|
+ """Validate a path element using NTFS filesystem rules.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ element: Path element to validate
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if path element is valid for NTFS, False otherwise
|
|
|
|
|
+ """
|
|
|
normalized = _normalize_path_element_ntfs(element)
|
|
normalized = _normalize_path_element_ntfs(element)
|
|
|
if normalized in INVALID_DOTNAMES:
|
|
if normalized in INVALID_DOTNAMES:
|
|
|
return False
|
|
return False
|
|
@@ -1437,6 +1611,7 @@ def build_index_from_tree(
|
|
|
config file, default is core.filemode=True, change executable bit
|
|
config file, default is core.filemode=True, change executable bit
|
|
|
validate_path_element: Function to validate path elements to check
|
|
validate_path_element: Function to validate path elements to check
|
|
|
out; default just refuses .git and .. directories.
|
|
out; default just refuses .git and .. directories.
|
|
|
|
|
+ symlink_fn: Function to use for creating symlinks
|
|
|
blob_normalizer: An optional BlobNormalizer to use for converting line
|
|
blob_normalizer: An optional BlobNormalizer to use for converting line
|
|
|
endings when writing blobs to the working directory.
|
|
endings when writing blobs to the working directory.
|
|
|
tree_encoding: Encoding used for tree paths (default: utf-8)
|
|
tree_encoding: Encoding used for tree paths (default: utf-8)
|
|
@@ -1510,6 +1685,7 @@ def blob_from_path_and_mode(
|
|
|
Args:
|
|
Args:
|
|
|
fs_path: Full file system path to file
|
|
fs_path: Full file system path to file
|
|
|
mode: File mode
|
|
mode: File mode
|
|
|
|
|
+ tree_encoding: Encoding to use for tree contents
|
|
|
Returns: A `Blob` object
|
|
Returns: A `Blob` object
|
|
|
"""
|
|
"""
|
|
|
assert isinstance(fs_path, bytes)
|
|
assert isinstance(fs_path, bytes)
|
|
@@ -1534,6 +1710,7 @@ def blob_from_path_and_stat(
|
|
|
Args:
|
|
Args:
|
|
|
fs_path: Full file system path to file
|
|
fs_path: Full file system path to file
|
|
|
st: A stat object
|
|
st: A stat object
|
|
|
|
|
+ tree_encoding: Encoding to use for tree contents
|
|
|
Returns: A `Blob` object
|
|
Returns: A `Blob` object
|
|
|
"""
|
|
"""
|
|
|
return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
|
|
return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
|
|
@@ -2269,6 +2446,7 @@ def get_unstaged_changes(
|
|
|
Args:
|
|
Args:
|
|
|
index: index to check
|
|
index: index to check
|
|
|
root_path: path in which to find files
|
|
root_path: path in which to find files
|
|
|
|
|
+ filter_blob_callback: Optional callback to filter blobs
|
|
|
Returns: iterator over paths with unstaged changes
|
|
Returns: iterator over paths with unstaged changes
|
|
|
"""
|
|
"""
|
|
|
# For each entry in the index check the sha1 & ensure not staged
|
|
# For each entry in the index check the sha1 & ensure not staged
|
|
@@ -2368,6 +2546,17 @@ def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -
|
|
|
|
|
|
|
|
|
|
|
|
|
def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
|
|
def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
|
|
|
|
|
+ """Create an index entry for a directory.
|
|
|
|
|
+
|
|
|
|
|
+ This is only used for submodules (directories containing .git).
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ st: Stat result for the directory
|
|
|
|
|
+ path: Path to the directory
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ IndexEntry for a submodule, or None if not a submodule
|
|
|
|
|
+ """
|
|
|
if os.path.exists(os.path.join(path, b".git")):
|
|
if os.path.exists(os.path.join(path, b".git")):
|
|
|
head = read_submodule_head(path)
|
|
head = read_submodule_head(path)
|
|
|
if head is None:
|
|
if head is None:
|
|
@@ -2436,6 +2625,7 @@ def iter_fresh_objects(
|
|
|
"""Iterate over versions of objects on disk referenced by index.
|
|
"""Iterate over versions of objects on disk referenced by index.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
|
|
+ paths: Paths to check
|
|
|
root_path: Root path to access from
|
|
root_path: Root path to access from
|
|
|
include_deleted: Include deleted entries with sha and
|
|
include_deleted: Include deleted entries with sha and
|
|
|
mode set to None
|
|
mode set to None
|
|
@@ -2473,9 +2663,11 @@ class locked_index:
|
|
|
_file: "_GitFile"
|
|
_file: "_GitFile"
|
|
|
|
|
|
|
|
def __init__(self, path: Union[bytes, str]) -> None:
|
|
def __init__(self, path: Union[bytes, str]) -> None:
|
|
|
|
|
+ """Initialize locked_index."""
|
|
|
self._path = path
|
|
self._path = path
|
|
|
|
|
|
|
|
def __enter__(self) -> Index:
|
|
def __enter__(self) -> Index:
|
|
|
|
|
+ """Enter context manager and lock index."""
|
|
|
f = GitFile(self._path, "wb")
|
|
f = GitFile(self._path, "wb")
|
|
|
assert isinstance(f, _GitFile) # GitFile in write mode always returns _GitFile
|
|
assert isinstance(f, _GitFile) # GitFile in write mode always returns _GitFile
|
|
|
self._file = f
|
|
self._file = f
|
|
@@ -2488,6 +2680,7 @@ class locked_index:
|
|
|
exc_value: Optional[BaseException],
|
|
exc_value: Optional[BaseException],
|
|
|
traceback: Optional[types.TracebackType],
|
|
traceback: Optional[types.TracebackType],
|
|
|
) -> None:
|
|
) -> None:
|
|
|
|
|
+ """Exit context manager and unlock index."""
|
|
|
if exc_type is not None:
|
|
if exc_type is not None:
|
|
|
self._file.abort()
|
|
self._file.abort()
|
|
|
return
|
|
return
|