|
@@ -36,6 +36,7 @@ from typing import (
|
|
|
Iterable,
|
|
|
Iterator,
|
|
|
Tuple,
|
|
|
+ Union,
|
|
|
)
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
@@ -49,6 +50,7 @@ from dulwich.objects import (
|
|
|
Tree,
|
|
|
hex_to_sha,
|
|
|
sha_to_hex,
|
|
|
+ ObjectID,
|
|
|
)
|
|
|
from dulwich.pack import (
|
|
|
SHA1Reader,
|
|
@@ -95,7 +97,7 @@ EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
|
|
|
DEFAULT_VERSION = 2
|
|
|
|
|
|
|
|
|
-def pathsplit(path):
|
|
|
+def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
|
|
|
"""Split a /-delimited path into a directory part and a basename.
|
|
|
|
|
|
Args:
|
|
@@ -194,7 +196,7 @@ def read_cache_entry(f, version: int) -> Tuple[str, IndexEntry]:
|
|
|
))
|
|
|
|
|
|
|
|
|
-def write_cache_entry(f, name, entry, version):
|
|
|
+def write_cache_entry(f, name: bytes, entry: IndexEntry, version: int) -> None:
|
|
|
"""Write an index entry to a file.
|
|
|
|
|
|
Args:
|
|
@@ -249,7 +251,7 @@ def read_index(f: BinaryIO):
|
|
|
yield read_cache_entry(f, version)
|
|
|
|
|
|
|
|
|
-def read_index_dict(f):
|
|
|
+def read_index_dict(f) -> Dict[bytes, IndexEntry]:
|
|
|
"""Read an index file and return it as a dictionary.
|
|
|
|
|
|
Args:
|
|
@@ -314,7 +316,7 @@ def cleanup_mode(mode: int) -> int:
|
|
|
class Index(object):
|
|
|
"""A Git Index file."""
|
|
|
|
|
|
- def __init__(self, filename):
|
|
|
+ def __init__(self, filename: Union[bytes, str]):
|
|
|
"""Open an index file.
|
|
|
|
|
|
Args:
|
|
@@ -391,27 +393,28 @@ class Index(object):
|
|
|
"""Remove all contents from this index."""
|
|
|
self._byname = {}
|
|
|
|
|
|
- def __setitem__(self, name, x):
|
|
|
+ def __setitem__(self, name: bytes, x: IndexEntry):
|
|
|
assert isinstance(name, bytes)
|
|
|
assert len(x) == len(IndexEntry._fields)
|
|
|
# Remove the old entry if any
|
|
|
self._byname[name] = IndexEntry(*x)
|
|
|
|
|
|
- def __delitem__(self, name):
|
|
|
+ def __delitem__(self, name: bytes):
|
|
|
assert isinstance(name, bytes)
|
|
|
del self._byname[name]
|
|
|
|
|
|
- def iteritems(self):
|
|
|
+ def iteritems(self) -> Iterator[Tuple[bytes, IndexEntry]]:
|
|
|
return self._byname.items()
|
|
|
|
|
|
- def items(self):
|
|
|
+ def items(self) -> Iterator[Tuple[bytes, IndexEntry]]:
|
|
|
return self._byname.items()
|
|
|
|
|
|
- def update(self, entries):
|
|
|
+ def update(self, entries: Dict[bytes, IndexEntry]):
|
|
|
for name, value in entries.items():
|
|
|
self[name] = value
|
|
|
|
|
|
- def changes_from_tree(self, object_store, tree, want_unchanged=False):
|
|
|
+ def changes_from_tree(
|
|
|
+ self, object_store, tree: ObjectID, want_unchanged: bool = False):
|
|
|
"""Find the differences between the contents of this index and a tree.
|
|
|
|
|
|
Args:
|
|
@@ -609,8 +612,8 @@ else:
|
|
|
|
|
|
|
|
|
def build_file_from_blob(
|
|
|
- blob, mode, target_path, *, honor_filemode=True, tree_encoding="utf-8",
|
|
|
- symlink_fn=None
|
|
|
+ blob: Blob, mode: int, target_path: bytes, *, honor_filemode=True,
|
|
|
+ tree_encoding="utf-8", symlink_fn=None
|
|
|
):
|
|
|
"""Build a file or symlink on disk based on a Git object.
|
|
|
|
|
@@ -629,13 +632,12 @@ def build_file_from_blob(
|
|
|
oldstat = None
|
|
|
contents = blob.as_raw_string()
|
|
|
if stat.S_ISLNK(mode):
|
|
|
- # FIXME: This will fail on Windows. What should we do instead?
|
|
|
if oldstat:
|
|
|
os.unlink(target_path)
|
|
|
if sys.platform == "win32":
|
|
|
# os.readlink on Python3 on Windows requires a unicode string.
|
|
|
- contents = contents.decode(tree_encoding)
|
|
|
- target_path = target_path.decode(tree_encoding)
|
|
|
+ contents = contents.decode(tree_encoding) # type: ignore
|
|
|
+ target_path = target_path.decode(tree_encoding) # type: ignore
|
|
|
(symlink_fn or symlink)(contents, target_path)
|
|
|
else:
|
|
|
if oldstat is not None and oldstat.st_size == len(contents):
|
|
@@ -656,11 +658,11 @@ def build_file_from_blob(
|
|
|
INVALID_DOTNAMES = (b".git", b".", b"..", b"")
|
|
|
|
|
|
|
|
|
-def validate_path_element_default(element):
|
|
|
+def validate_path_element_default(element: bytes) -> bool:
|
|
|
return element.lower() not in INVALID_DOTNAMES
|
|
|
|
|
|
|
|
|
-def validate_path_element_ntfs(element):
|
|
|
+def validate_path_element_ntfs(element: bytes) -> bool:
|
|
|
stripped = element.rstrip(b". ").lower()
|
|
|
if stripped in INVALID_DOTNAMES:
|
|
|
return False
|
|
@@ -669,7 +671,8 @@ def validate_path_element_ntfs(element):
|
|
|
return True
|
|
|
|
|
|
|
|
|
-def validate_path(path, element_validator=validate_path_element_default):
|
|
|
+def validate_path(path: bytes,
|
|
|
+ element_validator=validate_path_element_default) -> bool:
|
|
|
"""Default path validator that just checks for .git/."""
|
|
|
parts = path.split(b"/")
|
|
|
for p in parts:
|
|
@@ -680,11 +683,11 @@ def validate_path(path, element_validator=validate_path_element_default):
|
|
|
|
|
|
|
|
|
def build_index_from_tree(
|
|
|
- root_path,
|
|
|
- index_path,
|
|
|
- object_store,
|
|
|
- tree_id,
|
|
|
- honor_filemode=True,
|
|
|
+ root_path: Union[str, bytes],
|
|
|
+ index_path: Union[str, bytes],
|
|
|
+ object_store: "BaseObjectStore",
|
|
|
+ tree_id: bytes,
|
|
|
+ honor_filemode: bool = True,
|
|
|
validate_path_element=validate_path_element_default,
|
|
|
symlink_fn=None
|
|
|
):
|
|
@@ -753,7 +756,8 @@ def build_index_from_tree(
|
|
|
index.write()
|
|
|
|
|
|
|
|
|
-def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
|
|
|
+def blob_from_path_and_mode(fs_path: bytes, mode: int,
|
|
|
+ tree_encoding="utf-8"):
|
|
|
"""Create a blob from a path and a stat object.
|
|
|
|
|
|
Args:
|
|
@@ -766,8 +770,7 @@ def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
|
|
|
if stat.S_ISLNK(mode):
|
|
|
if sys.platform == "win32":
|
|
|
# os.readlink on Python3 on Windows requires a unicode string.
|
|
|
- fs_path = os.fsdecode(fs_path)
|
|
|
- blob.data = os.readlink(fs_path).encode(tree_encoding)
|
|
|
+ blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
|
|
|
else:
|
|
|
blob.data = os.readlink(fs_path)
|
|
|
else:
|
|
@@ -776,7 +779,7 @@ def blob_from_path_and_mode(fs_path, mode, tree_encoding="utf-8"):
|
|
|
return blob
|
|
|
|
|
|
|
|
|
-def blob_from_path_and_stat(fs_path, st, tree_encoding="utf-8"):
|
|
|
+def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
|
|
|
"""Create a blob from a path and a stat object.
|
|
|
|
|
|
Args:
|
|
@@ -787,7 +790,7 @@ def blob_from_path_and_stat(fs_path, st, tree_encoding="utf-8"):
|
|
|
return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
|
|
|
|
|
|
|
|
|
-def read_submodule_head(path):
|
|
|
+def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
|
|
|
"""Read the head commit of a submodule.
|
|
|
|
|
|
Args:
|
|
@@ -811,7 +814,7 @@ def read_submodule_head(path):
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def _has_directory_changed(tree_path, entry):
|
|
|
+def _has_directory_changed(tree_path: bytes, entry):
|
|
|
"""Check if a directory has changed after getting an error.
|
|
|
|
|
|
When handling an error trying to create a blob from a path, call this
|
|
@@ -836,7 +839,9 @@ def _has_directory_changed(tree_path, entry):
|
|
|
return False
|
|
|
|
|
|
|
|
|
-def get_unstaged_changes(index: Index, root_path, filter_blob_callback=None):
|
|
|
+def get_unstaged_changes(
|
|
|
+ index: Index, root_path: Union[str, bytes],
|
|
|
+ filter_blob_callback=None):
|
|
|
"""Walk through an index and check for differences against working tree.
|
|
|
|
|
|
Args:
|
|
@@ -876,7 +881,7 @@ def get_unstaged_changes(index: Index, root_path, filter_blob_callback=None):
|
|
|
os_sep_bytes = os.sep.encode("ascii")
|
|
|
|
|
|
|
|
|
-def _tree_to_fs_path(root_path, tree_path: bytes):
|
|
|
+def _tree_to_fs_path(root_path: bytes, tree_path: bytes):
|
|
|
"""Convert a git tree path to a file system path.
|
|
|
|
|
|
Args:
|
|
@@ -893,7 +898,7 @@ def _tree_to_fs_path(root_path, tree_path: bytes):
|
|
|
return os.path.join(root_path, sep_corrected_path)
|
|
|
|
|
|
|
|
|
-def _fs_to_tree_path(fs_path):
|
|
|
+def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
|
|
|
"""Convert a file system path to a git tree path.
|
|
|
|
|
|
Args:
|
|
@@ -912,7 +917,7 @@ def _fs_to_tree_path(fs_path):
|
|
|
return tree_path
|
|
|
|
|
|
|
|
|
-def index_entry_from_directory(st, path):
|
|
|
+def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
|
|
|
if os.path.exists(os.path.join(path, b".git")):
|
|
|
head = read_submodule_head(path)
|
|
|
if head is None:
|
|
@@ -921,7 +926,9 @@ def index_entry_from_directory(st, path):
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def index_entry_from_path(path, object_store=None):
|
|
|
+def index_entry_from_path(
|
|
|
+ path: bytes, object_store: Optional["BaseObjectStore"] = None
|
|
|
+) -> Optional[IndexEntry]:
|
|
|
"""Create an index from a filesystem path.
|
|
|
|
|
|
This returns an index value for files, symlinks
|
|
@@ -949,8 +956,9 @@ def index_entry_from_path(path, object_store=None):
|
|
|
|
|
|
|
|
|
def iter_fresh_entries(
|
|
|
- paths, root_path, object_store: Optional["BaseObjectStore"] = None
|
|
|
-):
|
|
|
+ paths: Iterable[bytes], root_path: bytes,
|
|
|
+ object_store: Optional["BaseObjectStore"] = None
|
|
|
+) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
|
|
|
"""Iterate over current versions of index entries on disk.
|
|
|
|
|
|
Args:
|
|
@@ -968,7 +976,10 @@ def iter_fresh_entries(
|
|
|
yield path, entry
|
|
|
|
|
|
|
|
|
-def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=None):
|
|
|
+def iter_fresh_objects(
|
|
|
+ paths: Iterable[bytes], root_path: bytes, include_deleted=False,
|
|
|
+ object_store=None) -> Iterator[
|
|
|
+ Tuple[bytes, Optional[bytes], Optional[int]]]:
|
|
|
"""Iterate over versions of objects on disk referenced by index.
|
|
|
|
|
|
Args:
|
|
@@ -978,7 +989,8 @@ def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=Non
|
|
|
object_store: Optional object store to report new items to
|
|
|
Returns: Iterator over path, sha, mode
|
|
|
"""
|
|
|
- for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
|
|
|
+ for path, entry in iter_fresh_entries(
|
|
|
+ paths, root_path, object_store=object_store):
|
|
|
if entry is None:
|
|
|
if include_deleted:
|
|
|
yield path, None, None
|
|
@@ -987,7 +999,7 @@ def iter_fresh_objects(paths, root_path, include_deleted=False, object_store=Non
|
|
|
yield path, entry.sha, cleanup_mode(entry.mode)
|
|
|
|
|
|
|
|
|
-def refresh_index(index, root_path):
|
|
|
+def refresh_index(index: Index, root_path: bytes):
|
|
|
"""Refresh the contents of an index.
|
|
|
|
|
|
This is the equivalent to running 'git commit -a'.
|
|
@@ -997,7 +1009,8 @@ def refresh_index(index, root_path):
|
|
|
root_path: Root filesystem path
|
|
|
"""
|
|
|
for path, entry in iter_fresh_entries(index, root_path):
|
|
|
- index[path] = path
|
|
|
+ if entry:
|
|
|
+ index[path] = entry
|
|
|
|
|
|
|
|
|
class locked_index(object):
|
|
@@ -1005,7 +1018,7 @@ class locked_index(object):
|
|
|
|
|
|
Works as a context manager.
|
|
|
"""
|
|
|
- def __init__(self, path):
|
|
|
+ def __init__(self, path: Union[bytes, str]):
|
|
|
self._path = path
|
|
|
|
|
|
def __enter__(self):
|