summaryrefslogtreecommitdiff
path: root/git/index
diff options
context:
space:
mode:
Diffstat (limited to 'git/index')
-rw-r--r--git/index/base.py479
-rw-r--r--git/index/fun.py185
-rw-r--r--git/index/typ.py67
-rw-r--r--git/index/util.py35
4 files changed, 490 insertions, 276 deletions
diff --git a/git/index/base.py b/git/index/base.py
index 00e51bf5..48894833 100644
--- a/git/index/base.py
+++ b/git/index/base.py
@@ -15,12 +15,7 @@ from git.compat import (
force_bytes,
defenc,
)
-from git.exc import (
- GitCommandError,
- CheckoutError,
- GitError,
- InvalidGitRepositoryError
-)
+from git.exc import GitCommandError, CheckoutError, GitError, InvalidGitRepositoryError
from git.objects import (
Blob,
Submodule,
@@ -36,7 +31,7 @@ from git.util import (
file_contents_ro,
to_native_path_linux,
unbare_repo,
- to_bin_sha
+ to_bin_sha,
)
from gitdb.base import IStream
from gitdb.db import MemoryDB
@@ -52,23 +47,32 @@ from .fun import (
write_tree_from_cache,
stat_mode_to_index_mode,
S_IFGITLINK,
- run_commit_hook
+ run_commit_hook,
)
from .typ import (
BaseIndexEntry,
IndexEntry,
)
-from .util import (
- TemporaryFileSwap,
- post_clear_cache,
- default_index,
- git_working_dir
-)
+from .util import TemporaryFileSwap, post_clear_cache, default_index, git_working_dir
# typing -----------------------------------------------------------------------------
-from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn,
- Sequence, TYPE_CHECKING, Tuple, Type, Union)
+from typing import (
+ Any,
+ BinaryIO,
+ Callable,
+ Dict,
+ IO,
+ Iterable,
+ Iterator,
+ List,
+ NoReturn,
+ Sequence,
+ TYPE_CHECKING,
+ Tuple,
+ Type,
+ Union,
+)
from git.types import Commit_ish, PathLike
@@ -85,7 +89,7 @@ Treeish = Union[Tree, Commit, str, bytes]
# ------------------------------------------------------------------------------------
-__all__ = ('IndexFile', 'CheckoutError')
+__all__ = ("IndexFile", "CheckoutError")
class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
@@ -110,11 +114,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
Make sure you use index.write() once you are done manipulating the index directly
before operating on it using the git command"""
+
__slots__ = ("repo", "version", "entries", "_extension_data", "_file_path")
- _VERSION = 2 # latest version we support
+ _VERSION = 2 # latest version we support
S_IFGITLINK = S_IFGITLINK # a submodule
- def __init__(self, repo: 'Repo', file_path: Union[PathLike, None] = None) -> None:
+ def __init__(self, repo: "Repo", file_path: Union[PathLike, None] = None) -> None:
"""Initialize this Index instance, optionally from the given ``file_path``.
If no file_path is given, we will be created from the current index file.
@@ -122,7 +127,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
repository's index on demand."""
self.repo = repo
self.version = self._VERSION
- self._extension_data = b''
+ self._extension_data = b""
self._file_path: PathLike = file_path or self._index_path()
def _set_cache_(self, attr: str) -> None:
@@ -152,40 +157,48 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
@property
def path(self) -> PathLike:
- """ :return: Path to the index file we are representing """
+ """:return: Path to the index file we are representing"""
return self._file_path
def _delete_entries_cache(self) -> None:
"""Safely clear the entries cache so it can be recreated"""
try:
- del(self.entries)
+ del self.entries
except AttributeError:
# fails in python 2.6.5 with this exception
pass
# END exception handling
- #{ Serializable Interface
+ # { Serializable Interface
- def _deserialize(self, stream: IO) -> 'IndexFile':
+ def _deserialize(self, stream: IO) -> "IndexFile":
"""Initialize this instance with index values read from the given stream"""
- self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream)
+ self.version, self.entries, self._extension_data, _conten_sha = read_cache(
+ stream
+ )
return self
def _entries_sorted(self) -> List[IndexEntry]:
""":return: list of entries, in a sorted fashion, first by path, then by stage"""
return sorted(self.entries.values(), key=lambda e: (e.path, e.stage))
- def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> 'IndexFile':
+ def _serialize(
+ self, stream: IO, ignore_extension_data: bool = False
+ ) -> "IndexFile":
entries = self._entries_sorted()
- extension_data = self._extension_data # type: Union[None, bytes]
+ extension_data = self._extension_data # type: Union[None, bytes]
if ignore_extension_data:
extension_data = None
write_cache(entries, stream, extension_data)
return self
- #} END serializable interface
+ # } END serializable interface
- def write(self, file_path: Union[None, PathLike] = None, ignore_extension_data: bool = False) -> None:
+ def write(
+ self,
+ file_path: Union[None, PathLike] = None,
+ ignore_extension_data: bool = False,
+ ) -> None:
"""Write the current state to our file path or to the given one
:param file_path:
@@ -229,7 +242,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
@post_clear_cache
@default_index
- def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> 'IndexFile':
+ def merge_tree(
+ self, rhs: Treeish, base: Union[None, Treeish] = None
+ ) -> "IndexFile":
"""Merge the given rhs treeish into the current index, possibly taking
a common base treeish into account.
@@ -252,7 +267,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
be raised at the first conflicting path. If you want to have proper
merge resolution to be done by yourself, you have to commit the changed
index ( or make a valid tree from it ) and retry with a three-way
- index.from_tree call. """
+ index.from_tree call."""
# -i : ignore working tree status
# --aggressive : handle more merge cases
# -m : do an actual merge
@@ -265,8 +280,8 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
return self
@classmethod
- def new(cls, repo: 'Repo', *tree_sha: Union[str, Tree]) -> 'IndexFile':
- """ Merge the given treeish revisions into a new index which is returned.
+ def new(cls, repo: "Repo", *tree_sha: Union[str, Tree]) -> "IndexFile":
+ """Merge the given treeish revisions into a new index which is returned.
This method behaves like git-read-tree --aggressive when doing the merge.
:param repo: The repository treeish are located in.
@@ -283,15 +298,18 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
inst = cls(repo)
# convert to entries dict
- entries: Dict[Tuple[PathLike, int], IndexEntry] = dict(zip(
- ((e.path, e.stage) for e in base_entries),
- (IndexEntry.from_base(e) for e in base_entries)))
+ entries: Dict[Tuple[PathLike, int], IndexEntry] = dict(
+ zip(
+ ((e.path, e.stage) for e in base_entries),
+ (IndexEntry.from_base(e) for e in base_entries),
+ )
+ )
inst.entries = entries
return inst
@classmethod
- def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile':
+ def from_tree(cls, repo: "Repo", *treeish: Treeish, **kwargs: Any) -> "IndexFile":
"""Merge the given treeish revisions into a new index which is returned.
The original index will remain unaltered
@@ -326,7 +344,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
it will be temporarily moved out of the way to assure there are no unsuspected
interferences."""
if len(treeish) == 0 or len(treeish) > 3:
- raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish))
+ raise ValueError(
+ "Please specify between 1 and 3 treeish, got %i" % len(treeish)
+ )
arg_list: List[Union[Treeish, str]] = []
# ignore that working tree and index possibly are out of date
@@ -339,7 +359,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# tmp file created in git home directory to be sure renaming
# works - /tmp/ dirs could be on another device
- tmp_index = tempfile.mktemp('', '', repo.git_dir)
+ tmp_index = tempfile.mktemp("", "", repo.git_dir)
arg_list.append("--index-output=%s" % tmp_index)
arg_list.extend(treeish)
@@ -348,12 +368,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# Unfortunately there is no 'soft' way to do it.
# The TemporaryFileSwap assure the original file get put back
if repo.git_dir:
- index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index'))
+ index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, "index"))
try:
repo.git.read_tree(*arg_list, **kwargs)
index = cls(repo, tmp_index)
- index.entries # force it to read the file as we will delete the temp-file
- del(index_handler) # release as soon as possible
+ index.entries # force it to read the file as we will delete the temp-file
+ del index_handler # release as soon as possible
finally:
if osp.exists(tmp_index):
os.remove(tmp_index)
@@ -363,14 +383,18 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# UTILITIES
@unbare_repo
- def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]:
+ def _iter_expand_paths(
+ self: "IndexFile", paths: Sequence[PathLike]
+ ) -> Iterator[PathLike]:
"""Expand the directories in list of paths to the corresponding paths accordingly,
Note: git will add items multiple times even if a glob overlapped
with manually specified paths or if paths where specified multiple
times - we respect that and do not prune"""
+
def raise_exc(e: Exception) -> NoReturn:
raise e
+
r = str(self.repo.working_tree_dir)
rs = r + os.sep
for path in paths:
@@ -380,18 +404,20 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END make absolute path
try:
- st = os.lstat(abs_path) # handles non-symlinks as well
+ st = os.lstat(abs_path) # handles non-symlinks as well
except OSError:
# the lstat call may fail as the path may contain globs as well
pass
else:
if S_ISLNK(st.st_mode):
- yield abs_path.replace(rs, '')
+ yield abs_path.replace(rs, "")
continue
# end check symlink
# if the path is not already pointing to an existing file, resolve globs if possible
- if not os.path.exists(abs_path) and ('?' in abs_path or '*' in abs_path or '[' in abs_path):
+ if not os.path.exists(abs_path) and (
+ "?" in abs_path or "*" in abs_path or "[" in abs_path
+ ):
resolved_paths = glob.glob(abs_path)
# not abs_path in resolved_paths:
# a glob() resolving to the same path we are feeding it with
@@ -401,25 +427,31 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# whose name contains wildcard characters.
if abs_path not in resolved_paths:
for f in self._iter_expand_paths(glob.glob(abs_path)):
- yield str(f).replace(rs, '')
+ yield str(f).replace(rs, "")
continue
# END glob handling
try:
for root, _dirs, files in os.walk(abs_path, onerror=raise_exc):
for rela_file in files:
# add relative paths only
- yield osp.join(root.replace(rs, ''), rela_file)
+ yield osp.join(root.replace(rs, ""), rela_file)
# END for each file in subdir
# END for each subdirectory
except OSError:
# was a file or something that could not be iterated
- yield abs_path.replace(rs, '')
+ yield abs_path.replace(rs, "")
# END path exception handling
# END for each path
- def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: PathLike, fmakeexc: Callable[..., GitError],
- fprogress: Callable[[PathLike, bool, PathLike], None],
- read_from_stdout: bool = True) -> Union[None, str]:
+ def _write_path_to_stdin(
+ self,
+ proc: "Popen",
+ filepath: PathLike,
+ item: PathLike,
+ fmakeexc: Callable[..., GitError],
+ fprogress: Callable[[PathLike, bool, PathLike], None],
+ read_from_stdout: bool = True,
+ ) -> Union[None, str]:
"""Write path to proc.stdin and make sure it processes the item, including progress.
:return: stdout string
@@ -451,15 +483,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
fprogress(filepath, True, item)
return rval
- def iter_blobs(self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True
- ) -> Iterator[Tuple[StageType, Blob]]:
+ def iter_blobs(
+ self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True
+ ) -> Iterator[Tuple[StageType, Blob]]:
"""
:return: Iterator yielding tuples of Blob objects and stages, tuple(stage, Blob)
:param predicate:
Function(t) returning True if tuple(stage, Blob) should be yielded by the
iterator. A default filter, the BlobFilter, allows you to yield blobs
- only if they match a given list of paths. """
+ only if they match a given list of paths."""
for entry in self.entries.values():
blob = entry.to_blob(self.repo)
blob.size = entry.size
@@ -491,11 +524,13 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
return path_map
- @ classmethod
- def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]:
+ @classmethod
+ def entry_key(
+ cls, *entry: Union[BaseIndexEntry, PathLike, StageType]
+ ) -> Tuple[PathLike, StageType]:
return entry_key(*entry)
- def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> 'IndexFile':
+ def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> "IndexFile":
"""Resolve the blobs given in blob iterator. This will effectively remove the
index entries of the respective path at all non-null stages and add the given
blob as new stage null blob.
@@ -519,7 +554,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# delete all possible stages
for stage in (1, 2, 3):
try:
- del(self.entries[(blob.path, stage)])
+ del self.entries[(blob.path, stage)]
except KeyError:
pass
# END ignore key errors
@@ -530,7 +565,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
return self
- def update(self) -> 'IndexFile':
+ def update(self) -> "IndexFile":
"""Reread the contents of our index file, discarding all cached information
we might have.
@@ -550,7 +585,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
does not yet exist in the object database. This could happen if you added
Entries to the index directly.
:raise ValueError: if there are no entries in the cache
- :raise UnmergedEntriesError: """
+ :raise UnmergedEntriesError:"""
# we obtain no lock as we just flush our contents to disk as tree
# If we are a new index, the entries access will load our data accordingly
mdb = MemoryDB()
@@ -562,13 +597,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# note: additional deserialization could be saved if write_tree_from_cache
# would return sorted tree entries
- root_tree = Tree(self.repo, binsha, path='')
+ root_tree = Tree(self.repo, binsha, path="")
root_tree._cache = tree_items
return root_tree
- def _process_diff_args(self, # type: ignore[override]
- args: List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]
- ) -> List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]:
+ def _process_diff_args(
+ self, # type: ignore[override]
+ args: List[Union[str, "git_diff.Diffable", Type["git_diff.Diffable.Index"]]],
+ ) -> List[Union[str, "git_diff.Diffable", Type["git_diff.Diffable.Index"]]]:
try:
args.pop(args.index(self))
except IndexError:
@@ -585,12 +621,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
if self.repo.bare:
raise InvalidGitRepositoryError("require non-bare repository")
if not str(path).startswith(str(self.repo.working_tree_dir)):
- raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir))
+ raise ValueError(
+ "Absolute path %r is not in git repository at %r"
+ % (path, self.repo.working_tree_dir)
+ )
return os.path.relpath(path, self.repo.working_tree_dir)
- def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']]
- ) -> Tuple[List[PathLike], List[BaseIndexEntry]]:
- """ Split the items into two lists of path strings and BaseEntries. """
+ def _preprocess_add_items(
+ self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]
+ ) -> Tuple[List[PathLike], List[BaseIndexEntry]]:
+ """Split the items into two lists of path strings and BaseEntries."""
paths = []
entries = []
# if it is a string put in list
@@ -612,43 +652,58 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry:
"""Store file at filepath in the database and return the base index entry
Needs the git_working_dir decorator active ! This must be assured in the calling code"""
- st = os.lstat(filepath) # handles non-symlinks as well
+ st = os.lstat(filepath) # handles non-symlinks as well
if S_ISLNK(st.st_mode):
# in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8
- open_stream: Callable[[], BinaryIO] = lambda: BytesIO(force_bytes(os.readlink(filepath),
- encoding=defenc))
+ open_stream: Callable[[], BinaryIO] = lambda: BytesIO(
+ force_bytes(os.readlink(filepath), encoding=defenc)
+ )
else:
- open_stream = lambda: open(filepath, 'rb')
+ open_stream = lambda: open(filepath, "rb")
with open_stream() as stream:
fprogress(filepath, False, filepath)
istream = self.repo.odb.store(IStream(Blob.type, st.st_size, stream))
fprogress(filepath, True, filepath)
- return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode),
- istream.binsha, 0, to_native_path_linux(filepath)))
+ return BaseIndexEntry(
+ (
+ stat_mode_to_index_mode(st.st_mode),
+ istream.binsha,
+ 0,
+ to_native_path_linux(filepath),
+ )
+ )
- @ unbare_repo
- @ git_working_dir
- def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable,
- entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]:
+ @unbare_repo
+ @git_working_dir
+ def _entries_for_paths(
+ self,
+ paths: List[str],
+ path_rewriter: Callable,
+ fprogress: Callable,
+ entries: List[BaseIndexEntry],
+ ) -> List[BaseIndexEntry]:
entries_added: List[BaseIndexEntry] = []
if path_rewriter:
for path in paths:
if osp.isabs(path):
abspath = path
- gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1:]
+ gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1 :]
else:
gitrelative_path = path
if self.repo.working_tree_dir:
abspath = osp.join(self.repo.working_tree_dir, gitrelative_path)
# end obtain relative and absolute paths
- blob = Blob(self.repo, Blob.NULL_BIN_SHA,
- stat_mode_to_index_mode(os.stat(abspath).st_mode),
- to_native_path_linux(gitrelative_path))
+ blob = Blob(
+ self.repo,
+ Blob.NULL_BIN_SHA,
+ stat_mode_to_index_mode(os.stat(abspath).st_mode),
+ to_native_path_linux(gitrelative_path),
+ )
# TODO: variable undefined
entries.append(BaseIndexEntry.from_blob(blob))
# END for each path
- del(paths[:])
+ del paths[:]
# END rewrite paths
# HANDLE PATHS
@@ -659,9 +714,15 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END path handling
return entries_added
- def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], force: bool = True,
- fprogress: Callable = lambda *args: None, path_rewriter: Union[Callable[..., PathLike], None] = None,
- write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]:
+ def add(
+ self,
+ items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]],
+ force: bool = True,
+ fprogress: Callable = lambda *args: None,
+ path_rewriter: Union[Callable[..., PathLike], None] = None,
+ write: bool = True,
+ write_extension_data: bool = False,
+ ) -> List[BaseIndexEntry]:
"""Add files from the working tree, specific blobs or BaseIndexEntries
to the index.
@@ -769,30 +830,43 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# That way, we are OK on a bare repository as well.
# If there are no paths, the rewriter has nothing to do either
if paths:
- entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries))
+ entries_added.extend(
+ self._entries_for_paths(paths, path_rewriter, fprogress, entries)
+ )
# HANDLE ENTRIES
if entries:
null_mode_entries = [e for e in entries if e.mode == 0]
if null_mode_entries:
raise ValueError(
- "At least one Entry has a null-mode - please use index.remove to remove files for clarity")
+ "At least one Entry has a null-mode - please use index.remove to remove files for clarity"
+ )
# END null mode should be remove
# HANDLE ENTRY OBJECT CREATION
# create objects if required, otherwise go with the existing shas
- null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
+ null_entries_indices = [
+ i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA
+ ]
if null_entries_indices:
- @ git_working_dir
- def handle_null_entries(self: 'IndexFile') -> None:
+
+ @git_working_dir
+ def handle_null_entries(self: "IndexFile") -> None:
for ei in null_entries_indices:
null_entry = entries[ei]
new_entry = self._store_path(null_entry.path, fprogress)
# update null entry
entries[ei] = BaseIndexEntry(
- (null_entry.mode, new_entry.binsha, null_entry.stage, null_entry.path))
+ (
+ null_entry.mode,
+ new_entry.binsha,
+ null_entry.stage,
+ null_entry.path,
+ )
+ )
# END for each entry index
+
# end closure
handle_null_entries(self)
# END null_entry handling
@@ -802,7 +876,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# all object sha's
if path_rewriter:
for i, e in enumerate(entries):
- entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e)))
+ entries[i] = BaseIndexEntry(
+ (e.mode, e.binsha, e.stage, path_rewriter(e))
+ )
# END for each entry
# END handle path rewriting
@@ -828,8 +904,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
return entries_added
- def _items_to_rela_paths(self, items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]]
- ) -> List[PathLike]:
+ def _items_to_rela_paths(
+ self,
+ items: Union[
+ PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]
+ ],
+ ) -> List[PathLike]:
"""Returns a list of repo-relative paths from the given items which
may be absolute or relative paths, entries or blobs"""
paths = []
@@ -847,10 +927,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END for each item
return paths
- @ post_clear_cache
- @ default_index
- def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], working_tree: bool = False,
- **kwargs: Any) -> List[str]:
+ @post_clear_cache
+ @default_index
+ def remove(
+ self,
+ items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]],
+ working_tree: bool = False,
+ **kwargs: Any
+ ) -> List[str]:
"""Remove the given items from the index and optionally from
the working tree as well.
@@ -885,7 +969,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
List(path_string, ...) list of repository relative paths that have
been removed effectively.
This is interesting to know in case you have provided a directory or
- globs. Paths are relative to the repository. """
+ globs. Paths are relative to the repository."""
args = []
if not working_tree:
args.append("--cached")
@@ -899,10 +983,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# rm 'path'
return [p[4:-1] for p in removed_paths]
- @ post_clear_cache
- @ default_index
- def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], skip_errors: bool = False,
- **kwargs: Any) -> List[Tuple[str, str]]:
+ @post_clear_cache
+ @default_index
+ def move(
+ self,
+ items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]],
+ skip_errors: bool = False,
+ **kwargs: Any
+ ) -> List[Tuple[str, str]]:
"""Rename/move the items, whereas the last item is considered the destination of
the move operation. If the destination is a file, the first item ( of two )
must be a file as well. If the destination is a directory, it may be preceded
@@ -928,14 +1016,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
GitCommandError: If git could not handle your request"""
args = []
if skip_errors:
- args.append('-k')
+ args.append("-k")
paths = self._items_to_rela_paths(items)
if len(paths) < 2:
- raise ValueError("Please provide at least one source and one destination of the move operation")
+ raise ValueError(
+ "Please provide at least one source and one destination of the move operation"
+ )
- was_dry_run = kwargs.pop('dry_run', kwargs.pop('n', None))
- kwargs['dry_run'] = True
+ was_dry_run = kwargs.pop("dry_run", kwargs.pop("n", None))
+ kwargs["dry_run"] = True
# first execute rename in dryrun so the command tells us what it actually does
# ( for later output )
@@ -945,7 +1035,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# parse result - first 0:n/2 lines are 'checking ', the remaining ones
# are the 'renaming' ones which we parse
for ln in range(int(len(mvlines) / 2), len(mvlines)):
- tokens = mvlines[ln].split(' to ')
+ tokens = mvlines[ln].split(" to ")
assert len(tokens) == 2, "Too many tokens in %s" % mvlines[ln]
# [0] = Renaming x
@@ -959,20 +1049,22 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END handle dryrun
# now apply the actual operation
- kwargs.pop('dry_run')
+ kwargs.pop("dry_run")
self.repo.git.mv(args, paths, **kwargs)
return out
- def commit(self,
- message: str,
- parent_commits: Union[Commit_ish, None] = None,
- head: bool = True,
- author: Union[None, 'Actor'] = None,
- committer: Union[None, 'Actor'] = None,
- author_date: Union[str, None] = None,
- commit_date: Union[str, None] = None,
- skip_hooks: bool = False) -> Commit:
+ def commit(
+ self,
+ message: str,
+ parent_commits: Union[Commit_ish, None] = None,
+ head: bool = True,
+ author: Union[None, "Actor"] = None,
+ committer: Union[None, "Actor"] = None,
+ author_date: Union[str, None] = None,
+ commit_date: Union[str, None] = None,
+ skip_hooks: bool = False,
+ ) -> Commit:
"""Commit the current default index file, creating a commit object.
For more information on the arguments, see Commit.create_from_tree().
@@ -982,18 +1074,26 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
or `--no-verify` on the command line.
:return: Commit object representing the new commit"""
if not skip_hooks:
- run_commit_hook('pre-commit', self)
+ run_commit_hook("pre-commit", self)
self._write_commit_editmsg(message)
- run_commit_hook('commit-msg', self, self._commit_editmsg_filepath())
+ run_commit_hook("commit-msg", self, self._commit_editmsg_filepath())
message = self._read_commit_editmsg()
self._remove_commit_editmsg()
tree = self.write_tree()
- rval = Commit.create_from_tree(self.repo, tree, message, parent_commits,
- head, author=author, committer=committer,
- author_date=author_date, commit_date=commit_date)
+ rval = Commit.create_from_tree(
+ self.repo,
+ tree,
+ message,
+ parent_commits,
+ head,
+ author=author,
+ committer=committer,
+ author_date=author_date,
+ commit_date=commit_date,
+ )
if not skip_hooks:
- run_commit_hook('post-commit', self)
+ run_commit_hook("post-commit", self)
return rval
def _write_commit_editmsg(self, message: str) -> None:
@@ -1010,13 +1110,15 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
def _commit_editmsg_filepath(self) -> str:
return osp.join(self.repo.common_dir, "COMMIT_EDITMSG")
- def _flush_stdin_and_wait(cls, proc: 'Popen[bytes]', ignore_stdout: bool = False) -> bytes:
+ def _flush_stdin_and_wait(
+ cls, proc: "Popen[bytes]", ignore_stdout: bool = False
+ ) -> bytes:
stdin_IO = proc.stdin
if stdin_IO:
stdin_IO.flush()
stdin_IO.close()
- stdout = b''
+ stdout = b""
if not ignore_stdout and proc.stdout:
stdout = proc.stdout.read()
@@ -1025,10 +1127,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
proc.wait()
return stdout
- @ default_index
- def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False,
- fprogress: Callable = lambda *args: None, **kwargs: Any
- ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
+ @default_index
+ def checkout(
+ self,
+ paths: Union[None, Iterable[PathLike]] = None,
+ force: bool = False,
+ fprogress: Callable = lambda *args: None,
+ **kwargs: Any
+ ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]:
"""Checkout the given paths or all files from the version known to the index into
the working tree.
@@ -1070,7 +1176,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
the working tree will not be deleted. This behaviour is fundamentally
different to *head.checkout*, i.e. if you want git-checkout like behaviour,
use head.checkout instead of index.checkout.
- """
+ """
args = ["--index"]
if force:
args.append("--force")
@@ -1079,7 +1185,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
failed_reasons = []
unknown_lines = []
- def handle_stderr(proc: 'Popen[bytes]', iter_checked_out_files: Iterable[PathLike]) -> None:
+ def handle_stderr(
+ proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike]
+ ) -> None:
stderr_IO = proc.stderr
if not stderr_IO:
@@ -1089,20 +1197,27 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# line contents:
stderr = stderr_bytes.decode(defenc)
# git-checkout-index: this already exists
- endings = (' already exists', ' is not in the cache', ' does not exist at stage', ' is unmerged')
+ endings = (
+ " already exists",
+ " is not in the cache",
+ " does not exist at stage",
+ " is unmerged",
+ )
for line in stderr.splitlines():
- if not line.startswith("git checkout-index: ") and not line.startswith("git-checkout-index: "):
+ if not line.startswith("git checkout-index: ") and not line.startswith(
+ "git-checkout-index: "
+ ):
is_a_dir = " is a directory"
unlink_issue = "unable to unlink old '"
- already_exists_issue = ' already exists, no checkout' # created by entry.c:checkout_entry(...)
+ already_exists_issue = " already exists, no checkout" # created by entry.c:checkout_entry(...)
if line.endswith(is_a_dir):
- failed_files.append(line[:-len(is_a_dir)])
+ failed_files.append(line[: -len(is_a_dir)])
failed_reasons.append(is_a_dir)
elif line.startswith(unlink_issue):
- failed_files.append(line[len(unlink_issue):line.rfind("'")])
+ failed_files.append(line[len(unlink_issue) : line.rfind("'")])
failed_reasons.append(unlink_issue)
elif line.endswith(already_exists_issue):
- failed_files.append(line[:-len(already_exists_issue)])
+ failed_files.append(line[: -len(already_exists_issue)])
failed_reasons.append(already_exists_issue)
else:
unknown_lines.append(line)
@@ -1111,7 +1226,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
for e in endings:
if line.endswith(e):
- failed_files.append(line[20:-len(e)])
+ failed_files.append(line[20 : -len(e)])
failed_reasons.append(e)
break
# END if ending matches
@@ -1123,12 +1238,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
valid_files = list(set(iter_checked_out_files) - set(failed_files))
raise CheckoutError(
"Some files could not be checked out from the index due to local modifications",
- failed_files, valid_files, failed_reasons)
+ failed_files,
+ valid_files,
+ failed_reasons,
+ )
+
# END stderr handler
if paths is None:
args.append("--all")
- kwargs['as_process'] = 1
+ kwargs["as_process"] = 1
fprogress(None, False, None)
proc = self.repo.git.checkout_index(*args, **kwargs)
proc.wait()
@@ -1146,11 +1265,13 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
self.entries
args.append("--stdin")
- kwargs['as_process'] = True
- kwargs['istream'] = subprocess.PIPE
+ kwargs["as_process"] = True
+ kwargs["istream"] = subprocess.PIPE
proc = self.repo.git.checkout_index(args, **kwargs)
# FIXME: Reading from GIL!
- make_exc = lambda: GitCommandError(("git-checkout-index",) + tuple(args), 128, proc.stderr.read())
+ make_exc = lambda: GitCommandError(
+ ("git-checkout-index",) + tuple(args), 128, proc.stderr.read()
+ )
checked_out_files: List[PathLike] = []
for path in paths:
@@ -1162,13 +1283,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
self.entries[(co_path, 0)]
except KeyError:
folder = str(co_path)
- if not folder.endswith('/'):
- folder += '/'
+ if not folder.endswith("/"):
+ folder += "/"
for entry in self.entries.values():
if str(entry.path).startswith(folder):
p = entry.path
- self._write_path_to_stdin(proc, p, p, make_exc,
- fprogress, read_from_stdout=False)
+ self._write_path_to_stdin(
+ proc, p, p, make_exc, fprogress, read_from_stdout=False
+ )
checked_out_files.append(p)
path_is_directory = True
# END if entry is in directory
@@ -1176,8 +1298,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END path exception handlnig
if not path_is_directory:
- self._write_path_to_stdin(proc, co_path, path, make_exc,
- fprogress, read_from_stdout=False)
+ self._write_path_to_stdin(
+ proc, co_path, path, make_exc, fprogress, read_from_stdout=False
+ )
checked_out_files.append(co_path)
# END path is a file
# END for each path
@@ -1187,16 +1310,24 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# Without parsing stdout we don't know what failed.
raise CheckoutError(
"Some files could not be checked out from the index, probably because they didn't exist.",
- failed_files, [], failed_reasons)
+ failed_files,
+ [],
+ failed_reasons,
+ )
handle_stderr(proc, checked_out_files)
return checked_out_files
# END paths handling
- @ default_index
- def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False,
- paths: Union[None, Iterable[PathLike]] = None,
- head: bool = False, **kwargs: Any) -> 'IndexFile':
+ @default_index
+ def reset(
+ self,
+ commit: Union[Commit, "Reference", str] = "HEAD",
+ working_tree: bool = False,
+ paths: Union[None, Iterable[PathLike]] = None,
+ head: bool = False,
+ **kwargs: Any
+ ) -> "IndexFile":
"""Reset the index to reflect the tree at the given commit. This will not
adjust our HEAD reference as opposed to HEAD.reset by default.
@@ -1228,7 +1359,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
checkout the files according to their state in the index.
If you want git-reset like behaviour, use *HEAD.reset* instead.
- :return: self """
+ :return: self"""
# what we actually want to do is to merge the tree into our existing
# index, which is what git-read-tree does
new_inst = type(self).from_tree(self.repo, commit)
@@ -1244,7 +1375,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
except KeyError:
# if key is not in theirs, it musn't be in ours
try:
- del(self.entries[key])
+ del self.entries[key]
except KeyError:
pass
# END handle deletion keyerror
@@ -1258,17 +1389,23 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# END handle working tree
if head:
- self.repo.head.set_commit(self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit)
+ self.repo.head.set_commit(
+ self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit
+ )
# END handle head change
return self
# @ default_index, breaks typing for some reason, copied into function
- def diff(self, # type: ignore[override]
- other: Union[Type['git_diff.Diffable.Index'], 'Tree', 'Commit', str, None] = git_diff.Diffable.Index,
- paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
- create_patch: bool = False, **kwargs: Any
- ) -> git_diff.DiffIndex:
+ def diff(
+ self, # type: ignore[override]
+ other: Union[
+ Type["git_diff.Diffable.Index"], "Tree", "Commit", str, None
+ ] = git_diff.Diffable.Index,
+ paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None,
+ create_patch: bool = False,
+ **kwargs: Any
+ ) -> git_diff.DiffIndex:
"""Diff this index against the working copy or a Tree or Commit object
For a documentation of the parameters and return values, see,
@@ -1282,7 +1419,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
# only run if we are the default repository index
if self._file_path != self._index_path():
raise AssertionError(
- "Cannot call %r on indices that do not represent the default git index" % self.diff())
+ "Cannot call %r on indices that do not represent the default git index"
+ % self.diff()
+ )
# index against index is always empty
if other is self.Index:
return git_diff.DiffIndex()
@@ -1296,14 +1435,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable):
if isinstance(other, Object): # for Tree or Commit
# invert the existing R flag
- cur_val = kwargs.get('R', False)
- kwargs['R'] = not cur_val
+ cur_val = kwargs.get("R", False)
+ kwargs["R"] = not cur_val
return other.diff(self.Index, paths, create_patch, **kwargs)
# END diff against other item handling
# if other is not None here, something is wrong
if other is not None:
- raise ValueError("other must be None, Diffable.Index, a Tree or Commit, was %r" % other)
+ raise ValueError(
+ "other must be None, Diffable.Index, a Tree or Commit, was %r" % other
+ )
# diff against working copy - can be handled by superclass natively
return super(IndexFile, self).diff(other, paths, create_patch, **kwargs)
diff --git a/git/index/fun.py b/git/index/fun.py
index acab7423..e8dead86 100644
--- a/git/index/fun.py
+++ b/git/index/fun.py
@@ -25,14 +25,11 @@ from git.compat import (
is_win,
safe_decode,
)
-from git.exc import (
- UnmergedEntriesError,
- HookExecutionError
-)
+from git.exc import UnmergedEntriesError, HookExecutionError
from git.objects.fun import (
tree_to_stream,
traverse_tree_recursive,
- traverse_trees_recursive
+ traverse_trees_recursive,
)
from git.util import IndexFileSHA1Writer, finalize_process
from gitdb.base import IStream
@@ -40,20 +37,12 @@ from gitdb.typ import str_tree_type
import os.path as osp
-from .typ import (
- BaseIndexEntry,
- IndexEntry,
- CE_NAMEMASK,
- CE_STAGESHIFT
-)
-from .util import (
- pack,
- unpack
-)
+from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
+from .util import pack, unpack
# typing -----------------------------------------------------------------------------
-from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast)
+from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
from git.types import PathLike
@@ -61,40 +50,49 @@ if TYPE_CHECKING:
from .base import IndexFile
from git.db import GitCmdObjectDB
from git.objects.tree import TreeCacheTup
+
# from git.objects.fun import EntryTupOrNone
# ------------------------------------------------------------------------------------
-S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
+S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule
CE_NAMEMASK_INV = ~CE_NAMEMASK
-__all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key',
- 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path')
+__all__ = (
+ "write_cache",
+ "read_cache",
+ "write_tree_from_cache",
+ "entry_key",
+ "stat_mode_to_index_mode",
+ "S_IFGITLINK",
+ "run_commit_hook",
+ "hook_path",
+)
def hook_path(name: str, git_dir: PathLike) -> str:
""":return: path to the given named hook in the given git repository directory"""
- return osp.join(git_dir, 'hooks', name)
+ return osp.join(git_dir, "hooks", name)
def _has_file_extension(path):
return osp.splitext(path)[1]
-def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
+def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
"""Run the commit hook of the given name. Silently ignores hooks that do not exist.
:param name: name of hook, like 'pre-commit'
:param index: IndexFile instance
:param args: arguments passed to hook file
- :raises HookExecutionError: """
+ :raises HookExecutionError:"""
hp = hook_path(name, index.repo.git_dir)
if not os.access(hp, os.X_OK):
return None
env = os.environ.copy()
- env['GIT_INDEX_FILE'] = safe_decode(str(index.path))
- env['GIT_EDITOR'] = ':'
+ env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
+ env["GIT_EDITOR"] = ":"
cmd = [hp]
try:
if is_win and not _has_file_extension(hp):
@@ -102,22 +100,26 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
# (doesn't understand shebangs). Try using bash to run the hook.
relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
cmd = ["bash.exe", relative_hp]
-
- cmd = subprocess.Popen(cmd + list(args),
- env=env,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- cwd=index.repo.working_dir,
- close_fds=is_posix,
- creationflags=PROC_CREATIONFLAGS,)
+
+ cmd = subprocess.Popen(
+ cmd + list(args),
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ cwd=index.repo.working_dir,
+ close_fds=is_posix,
+ creationflags=PROC_CREATIONFLAGS,
+ )
except Exception as ex:
raise HookExecutionError(hp, ex) from ex
else:
stdout_list: List[str] = []
stderr_list: List[str] = []
- handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process)
- stdout = ''.join(stdout_list)
- stderr = ''.join(stderr_list)
+ handle_process_output(
+ cmd, stdout_list.append, stderr_list.append, finalize_process
+ )
+ stdout = "".join(stdout_list)
+ stderr = "".join(stderr_list)
if cmd.returncode != 0:
stdout = force_text(stdout, defenc)
stderr = force_text(stderr, defenc)
@@ -128,16 +130,21 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None:
def stat_mode_to_index_mode(mode: int) -> int:
"""Convert the given mode from a stat call to the corresponding index mode
and return it"""
- if S_ISLNK(mode): # symlinks
+ if S_ISLNK(mode): # symlinks
return S_IFLNK
- if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
+ if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules
return S_IFGITLINK
- return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit
+ return S_IFREG | (
+ mode & S_IXUSR and 0o755 or 0o644
+ ) # blobs with or without executable bit
-def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes],
- extension_data: Union[None, bytes] = None,
- ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None:
+def write_cache(
+ entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
+ stream: IO[bytes],
+ extension_data: Union[None, bytes] = None,
+ ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
+) -> None:
"""Write the cache represented by entries to a stream
:param entries: **sorted** list of entries
@@ -163,17 +170,28 @@ def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream:
# body
for entry in entries:
beginoffset = tell()
- write(entry.ctime_bytes) # ctime
- write(entry.mtime_bytes) # mtime
+ write(entry.ctime_bytes) # ctime
+ write(entry.mtime_bytes) # mtime
path_str = str(entry.path)
path: bytes = force_bytes(path_str, encoding=defenc)
- plen = len(path) & CE_NAMEMASK # path length
+ plen = len(path) & CE_NAMEMASK # path length
assert plen == len(path), "Path %s too long to fit into index" % entry.path
- flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values
- write(pack(">LLLLLL20sH", entry.dev, entry.inode, entry.mode,
- entry.uid, entry.gid, entry.size, entry.binsha, flags))
+ flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values
+ write(
+ pack(
+ ">LLLLLL20sH",
+ entry.dev,
+ entry.inode,
+ entry.mode,
+ entry.uid,
+ entry.gid,
+ entry.size,
+ entry.binsha,
+ flags,
+ )
+ )
write(path)
- real_size = ((tell() - beginoffset + 8) & ~7)
+ real_size = (tell() - beginoffset + 8) & ~7
write(b"\0" * ((beginoffset + real_size) - tell()))
# END for each entry
@@ -216,7 +234,9 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i
# END handle entry
-def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]:
+def read_cache(
+ stream: IO[bytes],
+) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
"""Read a cache file from the given stream
:return: tuple(version, entries_dict, extension_data, content_sha)
* version is the integer version number
@@ -225,7 +245,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
* content_sha is a 20 byte sha on all cache file contents"""
version, num_entries = read_header(stream)
count = 0
- entries: Dict[Tuple[PathLike, int], 'IndexEntry'] = {}
+ entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}
read = stream.read
tell = stream.tell
@@ -233,14 +253,17 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
beginoffset = tell()
ctime = unpack(">8s", read(8))[0]
mtime = unpack(">8s", read(8))[0]
- (dev, ino, mode, uid, gid, size, sha, flags) = \
- unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
+ (dev, ino, mode, uid, gid, size, sha, flags) = unpack(
+ ">LLLLLL20sH", read(20 + 4 * 6 + 2)
+ )
path_size = flags & CE_NAMEMASK
path = read(path_size).decode(defenc)
- real_size = ((tell() - beginoffset + 8) & ~7)
+ real_size = (tell() - beginoffset + 8) & ~7
read((beginoffset + real_size) - tell())
- entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
+ entry = IndexEntry(
+ (mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)
+ )
# entry_key would be the method to use, but we safe the effort
entries[(path, entry.stage)] = entry
count += 1
@@ -253,19 +276,22 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
# 4 bytes length of chunk
# repeated 0 - N times
extension_data = stream.read(~0)
- assert len(extension_data) > 19, "Index Footer was not at least a sha on content as it was only %i bytes in size"\
- % len(extension_data)
+ assert len(extension_data) > 19, (
+ "Index Footer was not at least a sha on content as it was only %i bytes in size"
+ % len(extension_data)
+ )
content_sha = extension_data[-20:]
# truncate the sha in the end as we will dynamically create it anyway
- extension_data = extension_data[: -20]
+ extension_data = extension_data[:-20]
return (version, entries, extension_data, content_sha)
-def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: slice, si: int = 0
- ) -> Tuple[bytes, List['TreeCacheTup']]:
+def write_tree_from_cache(
+ entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
+) -> Tuple[bytes, List["TreeCacheTup"]]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -275,7 +301,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
:param sl: slice indicating the range we should process on the entries list
:return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of
tree entries being a tuple of hexsha, mode, name"""
- tree_items: List['TreeCacheTup'] = []
+ tree_items: List["TreeCacheTup"] = []
ci = sl.start
end = sl.stop
@@ -285,7 +311,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
raise UnmergedEntriesError(entry)
# END abort on unmerged
ci += 1
- rbound = entry.path.find('/', si)
+ rbound = entry.path.find("/", si)
if rbound == -1:
# its not a tree
tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
@@ -295,7 +321,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
xi = ci
while xi < end:
oentry = entries[xi]
- orbound = oentry.path.find('/', si)
+ orbound = oentry.path.find("/", si)
if orbound == -1 or oentry.path[si:orbound] != base:
break
# END abort on base mismatch
@@ -304,7 +330,9 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
# enter recursion
# ci - 1 as we want to count our current item as well
- sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
+ sha, _tree_entry_list = write_tree_from_cache(
+ entries, odb, slice(ci - 1, xi), rbound + 1
+ )
tree_items.append((sha, S_IFDIR, base))
# skip ahead
@@ -314,18 +342,26 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl:
# finally create the tree
sio = BytesIO()
- tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesn't change tree_items
+ tree_to_stream(
+ tree_items, sio.write
+ ) # writes to stream as bytes, but doesn't change tree_items
sio.seek(0)
istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
return (istream.binsha, tree_items)
-def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry:
- return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))
+def _tree_entry_to_baseindexentry(
+ tree_entry: "TreeCacheTup", stage: int
+) -> BaseIndexEntry:
+ return BaseIndexEntry(
+ (tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])
+ )
-def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
+def aggressive_tree_merge(
+ odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]
+) -> List[BaseIndexEntry]:
"""
:return: list of BaseIndexEntries representing the aggressive merge of the given
trees. All valid entries are on stage 0, whereas the conflicting ones are left
@@ -339,7 +375,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
# one and two way is the same for us, as we don't have to handle an existing
# index, instrea
if len(tree_shas) in (1, 2):
- for entry in traverse_tree_recursive(odb, tree_shas[-1], ''):
+ for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
out.append(_tree_entry_to_baseindexentry(entry, 0))
# END for each entry
return out
@@ -349,7 +385,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
raise ValueError("Cannot handle %i trees at once" % len(tree_shas))
# three trees
- for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''):
+ for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
if base is not None:
# base version exists
if ours is not None:
@@ -358,8 +394,15 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) ->
# it exists in all branches, if it was changed in both
# its a conflict, otherwise we take the changed version
# This should be the most common branch, so it comes first
- if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \
- (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]):
+ if (
+ base[0] != ours[0]
+ and base[0] != theirs[0]
+ and ours[0] != theirs[0]
+ ) or (
+ base[1] != ours[1]
+ and base[1] != theirs[1]
+ and ours[1] != theirs[1]
+ ):
# changed by both
out.append(_tree_entry_to_baseindexentry(base, 1))
out.append(_tree_entry_to_baseindexentry(ours, 2))
diff --git a/git/index/typ.py b/git/index/typ.py
index 46f1b077..cbe26f27 100644
--- a/git/index/typ.py
+++ b/git/index/typ.py
@@ -2,16 +2,13 @@
from binascii import b2a_hex
-from .util import (
- pack,
- unpack
-)
+from .util import pack, unpack
from git.objects import Blob
# typing ----------------------------------------------------------------------
-from typing import (NamedTuple, Sequence, TYPE_CHECKING, Tuple, Union, cast)
+from typing import NamedTuple, Sequence, TYPE_CHECKING, Tuple, Union, cast
from git.types import PathLike
@@ -20,16 +17,16 @@ if TYPE_CHECKING:
# ---------------------------------------------------------------------------------
-__all__ = ('BlobFilter', 'BaseIndexEntry', 'IndexEntry')
+__all__ = ("BlobFilter", "BaseIndexEntry", "IndexEntry")
-#{ Invariants
-CE_NAMEMASK = 0x0fff
+# { Invariants
+CE_NAMEMASK = 0x0FFF
CE_STAGEMASK = 0x3000
CE_EXTENDED = 0x4000
CE_VALID = 0x8000
CE_STAGESHIFT = 12
-#} END invariants
+# } END invariants
class BlobFilter(object):
@@ -40,7 +37,8 @@ class BlobFilter(object):
The given paths are given relative to the repository.
"""
- __slots__ = 'paths'
+
+ __slots__ = "paths"
def __init__(self, paths: Sequence[PathLike]) -> None:
"""
@@ -62,6 +60,7 @@ class BlobFilter(object):
class BaseIndexEntryHelper(NamedTuple):
"""Typed namedtuple to provide named attribute access for BaseIndexEntry.
Needed to allow overriding __new__ in child class to preserve backwards compat."""
+
mode: int
binsha: bytes
flags: int
@@ -85,10 +84,14 @@ class BaseIndexEntry(BaseIndexEntryHelper):
use numeric indices for performance reasons.
"""
- def __new__(cls, inp_tuple: Union[Tuple[int, bytes, int, PathLike],
- Tuple[int, bytes, int, PathLike, bytes, bytes, int, int, int, int, int]]
- ) -> 'BaseIndexEntry':
- """Override __new__ to allow construction from a tuple for backwards compatibility """
+ def __new__(
+ cls,
+ inp_tuple: Union[
+ Tuple[int, bytes, int, PathLike],
+ Tuple[int, bytes, int, PathLike, bytes, bytes, int, int, int, int, int],
+ ],
+ ) -> "BaseIndexEntry":
+ """Override __new__ to allow construction from a tuple for backwards compatibility"""
return super().__new__(cls, *inp_tuple)
def __str__(self) -> str:
@@ -100,7 +103,7 @@ class BaseIndexEntry(BaseIndexEntryHelper):
@property
def hexsha(self) -> str:
"""hex version of our sha"""
- return b2a_hex(self.binsha).decode('ascii')
+ return b2a_hex(self.binsha).decode("ascii")
@property
def stage(self) -> int:
@@ -116,11 +119,11 @@ class BaseIndexEntry(BaseIndexEntryHelper):
return (self.flags & CE_STAGEMASK) >> CE_STAGESHIFT
@classmethod
- def from_blob(cls, blob: Blob, stage: int = 0) -> 'BaseIndexEntry':
+ def from_blob(cls, blob: Blob, stage: int = 0) -> "BaseIndexEntry":
""":return: Fully equipped BaseIndexEntry at the given stage"""
return cls((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path))
- def to_blob(self, repo: 'Repo') -> Blob:
+ def to_blob(self, repo: "Repo") -> Blob:
""":return: Blob using the information of this index entry"""
return Blob(repo, self.binsha, self.mode, self.path)
@@ -132,7 +135,8 @@ class IndexEntry(BaseIndexEntry):
Attributes usully accessed often are cached in the tuple whereas others are
unpacked on demand.
- See the properties for a mapping between names and tuple indices. """
+ See the properties for a mapping between names and tuple indices."""
+
@property
def ctime(self) -> Tuple[int, int]:
"""
@@ -143,11 +147,11 @@ class IndexEntry(BaseIndexEntry):
@property
def mtime(self) -> Tuple[int, int]:
- """See ctime property, but returns modification time """
+ """See ctime property, but returns modification time"""
return cast(Tuple[int, int], unpack(">LL", self.mtime_bytes))
@classmethod
- def from_base(cls, base: 'BaseIndexEntry') -> 'IndexEntry':
+ def from_base(cls, base: "BaseIndexEntry") -> "IndexEntry":
"""
:return:
Minimal entry as created from the given BaseIndexEntry instance.
@@ -155,11 +159,26 @@ class IndexEntry(BaseIndexEntry):
:param base: Instance of type BaseIndexEntry"""
time = pack(">LL", 0, 0)
- return IndexEntry((base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0))
+ return IndexEntry(
+ (base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0)
+ )
@classmethod
- def from_blob(cls, blob: Blob, stage: int = 0) -> 'IndexEntry':
+ def from_blob(cls, blob: Blob, stage: int = 0) -> "IndexEntry":
""":return: Minimal entry resembling the given blob object"""
time = pack(">LL", 0, 0)
- return IndexEntry((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path,
- time, time, 0, 0, 0, 0, blob.size))
+ return IndexEntry(
+ (
+ blob.mode,
+ blob.binsha,
+ stage << CE_STAGESHIFT,
+ blob.path,
+ time,
+ time,
+ 0,
+ 0,
+ 0,
+ 0,
+ blob.size,
+ )
+ )
diff --git a/git/index/util.py b/git/index/util.py
index 4f8af553..7339b147 100644
--- a/git/index/util.py
+++ b/git/index/util.py
@@ -11,7 +11,7 @@ import os.path as osp
# typing ----------------------------------------------------------------------
-from typing import (Any, Callable, TYPE_CHECKING)
+from typing import Any, Callable, TYPE_CHECKING
from git.types import PathLike, _T
@@ -21,24 +21,26 @@ if TYPE_CHECKING:
# ---------------------------------------------------------------------------------
-__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir')
+__all__ = ("TemporaryFileSwap", "post_clear_cache", "default_index", "git_working_dir")
-#{ Aliases
+# { Aliases
pack = struct.pack
unpack = struct.unpack
-#} END aliases
+# } END aliases
+
class TemporaryFileSwap(object):
"""Utility class moving a file to a temporary location within the same directory
and moving it back on to where on object deletion."""
+
__slots__ = ("file_path", "tmp_file_path")
def __init__(self, file_path: PathLike) -> None:
self.file_path = file_path
- self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '')
+ self.tmp_file_path = str(self.file_path) + tempfile.mktemp("", "", "")
# it may be that the source does not exist
try:
os.rename(self.file_path, self.tmp_file_path)
@@ -53,7 +55,8 @@ class TemporaryFileSwap(object):
# END temp file exists
-#{ Decorators
+# { Decorators
+
def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]:
"""Decorator for functions that alter the index using the git command. This would
@@ -66,10 +69,13 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]:
"""
@wraps(func)
- def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
+ def post_clear_cache_if_not_raised(
+ self: "IndexFile", *args: Any, **kwargs: Any
+ ) -> _T:
rval = func(self, *args, **kwargs)
self._delete_entries_cache()
return rval
+
# END wrapper method
return post_clear_cache_if_not_raised
@@ -78,14 +84,17 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]:
def default_index(func: Callable[..., _T]) -> Callable[..., _T]:
"""Decorator assuring the wrapped method may only run if we are the default
repository index. This is as we rely on git commands that operate
- on that index only. """
+ on that index only."""
@wraps(func)
- def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
+ def check_default_index(self: "IndexFile", *args: Any, **kwargs: Any) -> _T:
if self._file_path != self._index_path():
raise AssertionError(
- "Cannot call %r on indices that do not represent the default git index" % func.__name__)
+ "Cannot call %r on indices that do not represent the default git index"
+ % func.__name__
+ )
return func(self, *args, **kwargs)
+
# END wrapper method
return check_default_index
@@ -96,7 +105,7 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]:
repository in order to assure relative paths are handled correctly"""
@wraps(func)
- def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T:
+ def set_git_working_dir(self: "IndexFile", *args: Any, **kwargs: Any) -> _T:
cur_wd = os.getcwd()
os.chdir(str(self.repo.working_tree_dir))
try:
@@ -104,8 +113,10 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]:
finally:
os.chdir(cur_wd)
# END handle working dir
+
# END wrapper
return set_git_working_dir
-#} END decorators
+
+# } END decorators