diff options
Diffstat (limited to 'git/index')
-rw-r--r-- | git/index/base.py | 479 | ||||
-rw-r--r-- | git/index/fun.py | 185 | ||||
-rw-r--r-- | git/index/typ.py | 67 | ||||
-rw-r--r-- | git/index/util.py | 35 |
4 files changed, 490 insertions, 276 deletions
diff --git a/git/index/base.py b/git/index/base.py index 00e51bf5..48894833 100644 --- a/git/index/base.py +++ b/git/index/base.py @@ -15,12 +15,7 @@ from git.compat import ( force_bytes, defenc, ) -from git.exc import ( - GitCommandError, - CheckoutError, - GitError, - InvalidGitRepositoryError -) +from git.exc import GitCommandError, CheckoutError, GitError, InvalidGitRepositoryError from git.objects import ( Blob, Submodule, @@ -36,7 +31,7 @@ from git.util import ( file_contents_ro, to_native_path_linux, unbare_repo, - to_bin_sha + to_bin_sha, ) from gitdb.base import IStream from gitdb.db import MemoryDB @@ -52,23 +47,32 @@ from .fun import ( write_tree_from_cache, stat_mode_to_index_mode, S_IFGITLINK, - run_commit_hook + run_commit_hook, ) from .typ import ( BaseIndexEntry, IndexEntry, ) -from .util import ( - TemporaryFileSwap, - post_clear_cache, - default_index, - git_working_dir -) +from .util import TemporaryFileSwap, post_clear_cache, default_index, git_working_dir # typing ----------------------------------------------------------------------------- -from typing import (Any, BinaryIO, Callable, Dict, IO, Iterable, Iterator, List, NoReturn, - Sequence, TYPE_CHECKING, Tuple, Type, Union) +from typing import ( + Any, + BinaryIO, + Callable, + Dict, + IO, + Iterable, + Iterator, + List, + NoReturn, + Sequence, + TYPE_CHECKING, + Tuple, + Type, + Union, +) from git.types import Commit_ish, PathLike @@ -85,7 +89,7 @@ Treeish = Union[Tree, Commit, str, bytes] # ------------------------------------------------------------------------------------ -__all__ = ('IndexFile', 'CheckoutError') +__all__ = ("IndexFile", "CheckoutError") class IndexFile(LazyMixin, git_diff.Diffable, Serializable): @@ -110,11 +114,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): Make sure you use index.write() once you are done manipulating the index directly before operating on it using the git command""" + __slots__ = ("repo", "version", "entries", "_extension_data", "_file_path") - _VERSION = 2 # latest version we support + _VERSION = 2 # latest version we support S_IFGITLINK = S_IFGITLINK # a submodule - def __init__(self, repo: 'Repo', file_path: Union[PathLike, None] = None) -> None: + def __init__(self, repo: "Repo", file_path: Union[PathLike, None] = None) -> None: """Initialize this Index instance, optionally from the given ``file_path``. If no file_path is given, we will be created from the current index file. @@ -122,7 +127,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): repository's index on demand.""" self.repo = repo self.version = self._VERSION - self._extension_data = b'' + self._extension_data = b"" self._file_path: PathLike = file_path or self._index_path() def _set_cache_(self, attr: str) -> None: @@ -152,40 +157,48 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): @property def path(self) -> PathLike: - """ :return: Path to the index file we are representing """ + """:return: Path to the index file we are representing""" return self._file_path def _delete_entries_cache(self) -> None: """Safely clear the entries cache so it can be recreated""" try: - del(self.entries) + del self.entries except AttributeError: # fails in python 2.6.5 with this exception pass # END exception handling - #{ Serializable Interface + # { Serializable Interface - def _deserialize(self, stream: IO) -> 'IndexFile': + def _deserialize(self, stream: IO) -> "IndexFile": """Initialize this instance with index values read from the given stream""" - self.version, self.entries, self._extension_data, _conten_sha = read_cache(stream) + self.version, self.entries, self._extension_data, _conten_sha = read_cache( + stream + ) return self def _entries_sorted(self) -> List[IndexEntry]: """:return: list of entries, in a sorted fashion, first by path, then by stage""" return sorted(self.entries.values(), key=lambda e: (e.path, e.stage)) - def _serialize(self, stream: IO, ignore_extension_data: bool = False) -> 'IndexFile': + def _serialize( + self, stream: IO, ignore_extension_data: bool = False + ) -> "IndexFile": entries = self._entries_sorted() - extension_data = self._extension_data # type: Union[None, bytes] + extension_data = self._extension_data # type: Union[None, bytes] if ignore_extension_data: extension_data = None write_cache(entries, stream, extension_data) return self - #} END serializable interface + # } END serializable interface - def write(self, file_path: Union[None, PathLike] = None, ignore_extension_data: bool = False) -> None: + def write( + self, + file_path: Union[None, PathLike] = None, + ignore_extension_data: bool = False, + ) -> None: """Write the current state to our file path or to the given one :param file_path: @@ -229,7 +242,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): @post_clear_cache @default_index - def merge_tree(self, rhs: Treeish, base: Union[None, Treeish] = None) -> 'IndexFile': + def merge_tree( + self, rhs: Treeish, base: Union[None, Treeish] = None + ) -> "IndexFile": """Merge the given rhs treeish into the current index, possibly taking a common base treeish into account. @@ -252,7 +267,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): be raised at the first conflicting path. If you want to have proper merge resolution to be done by yourself, you have to commit the changed index ( or make a valid tree from it ) and retry with a three-way - index.from_tree call. """ + index.from_tree call.""" # -i : ignore working tree status # --aggressive : handle more merge cases # -m : do an actual merge @@ -265,8 +280,8 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): return self @classmethod - def new(cls, repo: 'Repo', *tree_sha: Union[str, Tree]) -> 'IndexFile': - """ Merge the given treeish revisions into a new index which is returned. + def new(cls, repo: "Repo", *tree_sha: Union[str, Tree]) -> "IndexFile": + """Merge the given treeish revisions into a new index which is returned. This method behaves like git-read-tree --aggressive when doing the merge. :param repo: The repository treeish are located in. @@ -283,15 +298,18 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): inst = cls(repo) # convert to entries dict - entries: Dict[Tuple[PathLike, int], IndexEntry] = dict(zip( - ((e.path, e.stage) for e in base_entries), - (IndexEntry.from_base(e) for e in base_entries))) + entries: Dict[Tuple[PathLike, int], IndexEntry] = dict( + zip( + ((e.path, e.stage) for e in base_entries), + (IndexEntry.from_base(e) for e in base_entries), + ) + ) inst.entries = entries return inst @classmethod - def from_tree(cls, repo: 'Repo', *treeish: Treeish, **kwargs: Any) -> 'IndexFile': + def from_tree(cls, repo: "Repo", *treeish: Treeish, **kwargs: Any) -> "IndexFile": """Merge the given treeish revisions into a new index which is returned. The original index will remain unaltered @@ -326,7 +344,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): it will be temporarily moved out of the way to assure there are no unsuspected interferences.""" if len(treeish) == 0 or len(treeish) > 3: - raise ValueError("Please specify between 1 and 3 treeish, got %i" % len(treeish)) + raise ValueError( + "Please specify between 1 and 3 treeish, got %i" % len(treeish) + ) arg_list: List[Union[Treeish, str]] = [] # ignore that working tree and index possibly are out of date @@ -339,7 +359,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # tmp file created in git home directory to be sure renaming # works - /tmp/ dirs could be on another device - tmp_index = tempfile.mktemp('', '', repo.git_dir) + tmp_index = tempfile.mktemp("", "", repo.git_dir) arg_list.append("--index-output=%s" % tmp_index) arg_list.extend(treeish) @@ -348,12 +368,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # Unfortunately there is no 'soft' way to do it. # The TemporaryFileSwap assure the original file get put back if repo.git_dir: - index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, 'index')) + index_handler = TemporaryFileSwap(join_path_native(repo.git_dir, "index")) try: repo.git.read_tree(*arg_list, **kwargs) index = cls(repo, tmp_index) - index.entries # force it to read the file as we will delete the temp-file - del(index_handler) # release as soon as possible + index.entries # force it to read the file as we will delete the temp-file + del index_handler # release as soon as possible finally: if osp.exists(tmp_index): os.remove(tmp_index) @@ -363,14 +383,18 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # UTILITIES @unbare_repo - def _iter_expand_paths(self: 'IndexFile', paths: Sequence[PathLike]) -> Iterator[PathLike]: + def _iter_expand_paths( + self: "IndexFile", paths: Sequence[PathLike] + ) -> Iterator[PathLike]: """Expand the directories in list of paths to the corresponding paths accordingly, Note: git will add items multiple times even if a glob overlapped with manually specified paths or if paths where specified multiple times - we respect that and do not prune""" + def raise_exc(e: Exception) -> NoReturn: raise e + r = str(self.repo.working_tree_dir) rs = r + os.sep for path in paths: @@ -380,18 +404,20 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END make absolute path try: - st = os.lstat(abs_path) # handles non-symlinks as well + st = os.lstat(abs_path) # handles non-symlinks as well except OSError: # the lstat call may fail as the path may contain globs as well pass else: if S_ISLNK(st.st_mode): - yield abs_path.replace(rs, '') + yield abs_path.replace(rs, "") continue # end check symlink # if the path is not already pointing to an existing file, resolve globs if possible - if not os.path.exists(abs_path) and ('?' in abs_path or '*' in abs_path or '[' in abs_path): + if not os.path.exists(abs_path) and ( + "?" in abs_path or "*" in abs_path or "[" in abs_path + ): resolved_paths = glob.glob(abs_path) # not abs_path in resolved_paths: # a glob() resolving to the same path we are feeding it with @@ -401,25 +427,31 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # whose name contains wildcard characters. if abs_path not in resolved_paths: for f in self._iter_expand_paths(glob.glob(abs_path)): - yield str(f).replace(rs, '') + yield str(f).replace(rs, "") continue # END glob handling try: for root, _dirs, files in os.walk(abs_path, onerror=raise_exc): for rela_file in files: # add relative paths only - yield osp.join(root.replace(rs, ''), rela_file) + yield osp.join(root.replace(rs, ""), rela_file) # END for each file in subdir # END for each subdirectory except OSError: # was a file or something that could not be iterated - yield abs_path.replace(rs, '') + yield abs_path.replace(rs, "") # END path exception handling # END for each path - def _write_path_to_stdin(self, proc: 'Popen', filepath: PathLike, item: PathLike, fmakeexc: Callable[..., GitError], - fprogress: Callable[[PathLike, bool, PathLike], None], - read_from_stdout: bool = True) -> Union[None, str]: + def _write_path_to_stdin( + self, + proc: "Popen", + filepath: PathLike, + item: PathLike, + fmakeexc: Callable[..., GitError], + fprogress: Callable[[PathLike, bool, PathLike], None], + read_from_stdout: bool = True, + ) -> Union[None, str]: """Write path to proc.stdin and make sure it processes the item, including progress. :return: stdout string @@ -451,15 +483,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): fprogress(filepath, True, item) return rval - def iter_blobs(self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True - ) -> Iterator[Tuple[StageType, Blob]]: + def iter_blobs( + self, predicate: Callable[[Tuple[StageType, Blob]], bool] = lambda t: True + ) -> Iterator[Tuple[StageType, Blob]]: """ :return: Iterator yielding tuples of Blob objects and stages, tuple(stage, Blob) :param predicate: Function(t) returning True if tuple(stage, Blob) should be yielded by the iterator. A default filter, the BlobFilter, allows you to yield blobs - only if they match a given list of paths. """ + only if they match a given list of paths.""" for entry in self.entries.values(): blob = entry.to_blob(self.repo) blob.size = entry.size @@ -491,11 +524,13 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): return path_map - @ classmethod - def entry_key(cls, *entry: Union[BaseIndexEntry, PathLike, StageType]) -> Tuple[PathLike, StageType]: + @classmethod + def entry_key( + cls, *entry: Union[BaseIndexEntry, PathLike, StageType] + ) -> Tuple[PathLike, StageType]: return entry_key(*entry) - def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> 'IndexFile': + def resolve_blobs(self, iter_blobs: Iterator[Blob]) -> "IndexFile": """Resolve the blobs given in blob iterator. This will effectively remove the index entries of the respective path at all non-null stages and add the given blob as new stage null blob. @@ -519,7 +554,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # delete all possible stages for stage in (1, 2, 3): try: - del(self.entries[(blob.path, stage)]) + del self.entries[(blob.path, stage)] except KeyError: pass # END ignore key errors @@ -530,7 +565,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): return self - def update(self) -> 'IndexFile': + def update(self) -> "IndexFile": """Reread the contents of our index file, discarding all cached information we might have. @@ -550,7 +585,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): does not yet exist in the object database. This could happen if you added Entries to the index directly. :raise ValueError: if there are no entries in the cache - :raise UnmergedEntriesError: """ + :raise UnmergedEntriesError:""" # we obtain no lock as we just flush our contents to disk as tree # If we are a new index, the entries access will load our data accordingly mdb = MemoryDB() @@ -562,13 +597,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # note: additional deserialization could be saved if write_tree_from_cache # would return sorted tree entries - root_tree = Tree(self.repo, binsha, path='') + root_tree = Tree(self.repo, binsha, path="") root_tree._cache = tree_items return root_tree - def _process_diff_args(self, # type: ignore[override] - args: List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]] - ) -> List[Union[str, 'git_diff.Diffable', Type['git_diff.Diffable.Index']]]: + def _process_diff_args( + self, # type: ignore[override] + args: List[Union[str, "git_diff.Diffable", Type["git_diff.Diffable.Index"]]], + ) -> List[Union[str, "git_diff.Diffable", Type["git_diff.Diffable.Index"]]]: try: args.pop(args.index(self)) except IndexError: @@ -585,12 +621,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): if self.repo.bare: raise InvalidGitRepositoryError("require non-bare repository") if not str(path).startswith(str(self.repo.working_tree_dir)): - raise ValueError("Absolute path %r is not in git repository at %r" % (path, self.repo.working_tree_dir)) + raise ValueError( + "Absolute path %r is not in git repository at %r" + % (path, self.repo.working_tree_dir) + ) return os.path.relpath(path, self.repo.working_tree_dir) - def _preprocess_add_items(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']] - ) -> Tuple[List[PathLike], List[BaseIndexEntry]]: - """ Split the items into two lists of path strings and BaseEntries. """ + def _preprocess_add_items( + self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]] + ) -> Tuple[List[PathLike], List[BaseIndexEntry]]: + """Split the items into two lists of path strings and BaseEntries.""" paths = [] entries = [] # if it is a string put in list @@ -612,43 +652,58 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry: """Store file at filepath in the database and return the base index entry Needs the git_working_dir decorator active ! This must be assured in the calling code""" - st = os.lstat(filepath) # handles non-symlinks as well + st = os.lstat(filepath) # handles non-symlinks as well if S_ISLNK(st.st_mode): # in PY3, readlink is string, but we need bytes. In PY2, it's just OS encoded bytes, we assume UTF-8 - open_stream: Callable[[], BinaryIO] = lambda: BytesIO(force_bytes(os.readlink(filepath), - encoding=defenc)) + open_stream: Callable[[], BinaryIO] = lambda: BytesIO( + force_bytes(os.readlink(filepath), encoding=defenc) + ) else: - open_stream = lambda: open(filepath, 'rb') + open_stream = lambda: open(filepath, "rb") with open_stream() as stream: fprogress(filepath, False, filepath) istream = self.repo.odb.store(IStream(Blob.type, st.st_size, stream)) fprogress(filepath, True, filepath) - return BaseIndexEntry((stat_mode_to_index_mode(st.st_mode), - istream.binsha, 0, to_native_path_linux(filepath))) + return BaseIndexEntry( + ( + stat_mode_to_index_mode(st.st_mode), + istream.binsha, + 0, + to_native_path_linux(filepath), + ) + ) - @ unbare_repo - @ git_working_dir - def _entries_for_paths(self, paths: List[str], path_rewriter: Callable, fprogress: Callable, - entries: List[BaseIndexEntry]) -> List[BaseIndexEntry]: + @unbare_repo + @git_working_dir + def _entries_for_paths( + self, + paths: List[str], + path_rewriter: Callable, + fprogress: Callable, + entries: List[BaseIndexEntry], + ) -> List[BaseIndexEntry]: entries_added: List[BaseIndexEntry] = [] if path_rewriter: for path in paths: if osp.isabs(path): abspath = path - gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1:] + gitrelative_path = path[len(str(self.repo.working_tree_dir)) + 1 :] else: gitrelative_path = path if self.repo.working_tree_dir: abspath = osp.join(self.repo.working_tree_dir, gitrelative_path) # end obtain relative and absolute paths - blob = Blob(self.repo, Blob.NULL_BIN_SHA, - stat_mode_to_index_mode(os.stat(abspath).st_mode), - to_native_path_linux(gitrelative_path)) + blob = Blob( + self.repo, + Blob.NULL_BIN_SHA, + stat_mode_to_index_mode(os.stat(abspath).st_mode), + to_native_path_linux(gitrelative_path), + ) # TODO: variable undefined entries.append(BaseIndexEntry.from_blob(blob)) # END for each path - del(paths[:]) + del paths[:] # END rewrite paths # HANDLE PATHS @@ -659,9 +714,15 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END path handling return entries_added - def add(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], force: bool = True, - fprogress: Callable = lambda *args: None, path_rewriter: Union[Callable[..., PathLike], None] = None, - write: bool = True, write_extension_data: bool = False) -> List[BaseIndexEntry]: + def add( + self, + items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]], + force: bool = True, + fprogress: Callable = lambda *args: None, + path_rewriter: Union[Callable[..., PathLike], None] = None, + write: bool = True, + write_extension_data: bool = False, + ) -> List[BaseIndexEntry]: """Add files from the working tree, specific blobs or BaseIndexEntries to the index. @@ -769,30 +830,43 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # That way, we are OK on a bare repository as well. # If there are no paths, the rewriter has nothing to do either if paths: - entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries)) + entries_added.extend( + self._entries_for_paths(paths, path_rewriter, fprogress, entries) + ) # HANDLE ENTRIES if entries: null_mode_entries = [e for e in entries if e.mode == 0] if null_mode_entries: raise ValueError( - "At least one Entry has a null-mode - please use index.remove to remove files for clarity") + "At least one Entry has a null-mode - please use index.remove to remove files for clarity" + ) # END null mode should be remove # HANDLE ENTRY OBJECT CREATION # create objects if required, otherwise go with the existing shas - null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA] + null_entries_indices = [ + i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA + ] if null_entries_indices: - @ git_working_dir - def handle_null_entries(self: 'IndexFile') -> None: + + @git_working_dir + def handle_null_entries(self: "IndexFile") -> None: for ei in null_entries_indices: null_entry = entries[ei] new_entry = self._store_path(null_entry.path, fprogress) # update null entry entries[ei] = BaseIndexEntry( - (null_entry.mode, new_entry.binsha, null_entry.stage, null_entry.path)) + ( + null_entry.mode, + new_entry.binsha, + null_entry.stage, + null_entry.path, + ) + ) # END for each entry index + # end closure handle_null_entries(self) # END null_entry handling @@ -802,7 +876,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # all object sha's if path_rewriter: for i, e in enumerate(entries): - entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e))) + entries[i] = BaseIndexEntry( + (e.mode, e.binsha, e.stage, path_rewriter(e)) + ) # END for each entry # END handle path rewriting @@ -828,8 +904,12 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): return entries_added - def _items_to_rela_paths(self, items: Union[PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]]] - ) -> List[PathLike]: + def _items_to_rela_paths( + self, + items: Union[ + PathLike, Sequence[Union[PathLike, BaseIndexEntry, Blob, Submodule]] + ], + ) -> List[PathLike]: """Returns a list of repo-relative paths from the given items which may be absolute or relative paths, entries or blobs""" paths = [] @@ -847,10 +927,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END for each item return paths - @ post_clear_cache - @ default_index - def remove(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], working_tree: bool = False, - **kwargs: Any) -> List[str]: + @post_clear_cache + @default_index + def remove( + self, + items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]], + working_tree: bool = False, + **kwargs: Any + ) -> List[str]: """Remove the given items from the index and optionally from the working tree as well. @@ -885,7 +969,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): List(path_string, ...) list of repository relative paths that have been removed effectively. This is interesting to know in case you have provided a directory or - globs. Paths are relative to the repository. """ + globs. Paths are relative to the repository.""" args = [] if not working_tree: args.append("--cached") @@ -899,10 +983,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # rm 'path' return [p[4:-1] for p in removed_paths] - @ post_clear_cache - @ default_index - def move(self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, 'Submodule']], skip_errors: bool = False, - **kwargs: Any) -> List[Tuple[str, str]]: + @post_clear_cache + @default_index + def move( + self, + items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]], + skip_errors: bool = False, + **kwargs: Any + ) -> List[Tuple[str, str]]: """Rename/move the items, whereas the last item is considered the destination of the move operation. If the destination is a file, the first item ( of two ) must be a file as well. If the destination is a directory, it may be preceded @@ -928,14 +1016,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): GitCommandError: If git could not handle your request""" args = [] if skip_errors: - args.append('-k') + args.append("-k") paths = self._items_to_rela_paths(items) if len(paths) < 2: - raise ValueError("Please provide at least one source and one destination of the move operation") + raise ValueError( + "Please provide at least one source and one destination of the move operation" + ) - was_dry_run = kwargs.pop('dry_run', kwargs.pop('n', None)) - kwargs['dry_run'] = True + was_dry_run = kwargs.pop("dry_run", kwargs.pop("n", None)) + kwargs["dry_run"] = True # first execute rename in dryrun so the command tells us what it actually does # ( for later output ) @@ -945,7 +1035,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # parse result - first 0:n/2 lines are 'checking ', the remaining ones # are the 'renaming' ones which we parse for ln in range(int(len(mvlines) / 2), len(mvlines)): - tokens = mvlines[ln].split(' to ') + tokens = mvlines[ln].split(" to ") assert len(tokens) == 2, "Too many tokens in %s" % mvlines[ln] # [0] = Renaming x @@ -959,20 +1049,22 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END handle dryrun # now apply the actual operation - kwargs.pop('dry_run') + kwargs.pop("dry_run") self.repo.git.mv(args, paths, **kwargs) return out - def commit(self, - message: str, - parent_commits: Union[Commit_ish, None] = None, - head: bool = True, - author: Union[None, 'Actor'] = None, - committer: Union[None, 'Actor'] = None, - author_date: Union[str, None] = None, - commit_date: Union[str, None] = None, - skip_hooks: bool = False) -> Commit: + def commit( + self, + message: str, + parent_commits: Union[Commit_ish, None] = None, + head: bool = True, + author: Union[None, "Actor"] = None, + committer: Union[None, "Actor"] = None, + author_date: Union[str, None] = None, + commit_date: Union[str, None] = None, + skip_hooks: bool = False, + ) -> Commit: """Commit the current default index file, creating a commit object. For more information on the arguments, see Commit.create_from_tree(). @@ -982,18 +1074,26 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): or `--no-verify` on the command line. :return: Commit object representing the new commit""" if not skip_hooks: - run_commit_hook('pre-commit', self) + run_commit_hook("pre-commit", self) self._write_commit_editmsg(message) - run_commit_hook('commit-msg', self, self._commit_editmsg_filepath()) + run_commit_hook("commit-msg", self, self._commit_editmsg_filepath()) message = self._read_commit_editmsg() self._remove_commit_editmsg() tree = self.write_tree() - rval = Commit.create_from_tree(self.repo, tree, message, parent_commits, - head, author=author, committer=committer, - author_date=author_date, commit_date=commit_date) + rval = Commit.create_from_tree( + self.repo, + tree, + message, + parent_commits, + head, + author=author, + committer=committer, + author_date=author_date, + commit_date=commit_date, + ) if not skip_hooks: - run_commit_hook('post-commit', self) + run_commit_hook("post-commit", self) return rval def _write_commit_editmsg(self, message: str) -> None: @@ -1010,13 +1110,15 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): def _commit_editmsg_filepath(self) -> str: return osp.join(self.repo.common_dir, "COMMIT_EDITMSG") - def _flush_stdin_and_wait(cls, proc: 'Popen[bytes]', ignore_stdout: bool = False) -> bytes: + def _flush_stdin_and_wait( + cls, proc: "Popen[bytes]", ignore_stdout: bool = False + ) -> bytes: stdin_IO = proc.stdin if stdin_IO: stdin_IO.flush() stdin_IO.close() - stdout = b'' + stdout = b"" if not ignore_stdout and proc.stdout: stdout = proc.stdout.read() @@ -1025,10 +1127,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): proc.wait() return stdout - @ default_index - def checkout(self, paths: Union[None, Iterable[PathLike]] = None, force: bool = False, - fprogress: Callable = lambda *args: None, **kwargs: Any - ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: + @default_index + def checkout( + self, + paths: Union[None, Iterable[PathLike]] = None, + force: bool = False, + fprogress: Callable = lambda *args: None, + **kwargs: Any + ) -> Union[None, Iterator[PathLike], Sequence[PathLike]]: """Checkout the given paths or all files from the version known to the index into the working tree. @@ -1070,7 +1176,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): the working tree will not be deleted. This behaviour is fundamentally different to *head.checkout*, i.e. if you want git-checkout like behaviour, use head.checkout instead of index.checkout. - """ + """ args = ["--index"] if force: args.append("--force") @@ -1079,7 +1185,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): failed_reasons = [] unknown_lines = [] - def handle_stderr(proc: 'Popen[bytes]', iter_checked_out_files: Iterable[PathLike]) -> None: + def handle_stderr( + proc: "Popen[bytes]", iter_checked_out_files: Iterable[PathLike] + ) -> None: stderr_IO = proc.stderr if not stderr_IO: @@ -1089,20 +1197,27 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # line contents: stderr = stderr_bytes.decode(defenc) # git-checkout-index: this already exists - endings = (' already exists', ' is not in the cache', ' does not exist at stage', ' is unmerged') + endings = ( + " already exists", + " is not in the cache", + " does not exist at stage", + " is unmerged", + ) for line in stderr.splitlines(): - if not line.startswith("git checkout-index: ") and not line.startswith("git-checkout-index: "): + if not line.startswith("git checkout-index: ") and not line.startswith( + "git-checkout-index: " + ): is_a_dir = " is a directory" unlink_issue = "unable to unlink old '" - already_exists_issue = ' already exists, no checkout' # created by entry.c:checkout_entry(...) + already_exists_issue = " already exists, no checkout" # created by entry.c:checkout_entry(...) if line.endswith(is_a_dir): - failed_files.append(line[:-len(is_a_dir)]) + failed_files.append(line[: -len(is_a_dir)]) failed_reasons.append(is_a_dir) elif line.startswith(unlink_issue): - failed_files.append(line[len(unlink_issue):line.rfind("'")]) + failed_files.append(line[len(unlink_issue) : line.rfind("'")]) failed_reasons.append(unlink_issue) elif line.endswith(already_exists_issue): - failed_files.append(line[:-len(already_exists_issue)]) + failed_files.append(line[: -len(already_exists_issue)]) failed_reasons.append(already_exists_issue) else: unknown_lines.append(line) @@ -1111,7 +1226,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): for e in endings: if line.endswith(e): - failed_files.append(line[20:-len(e)]) + failed_files.append(line[20 : -len(e)]) failed_reasons.append(e) break # END if ending matches @@ -1123,12 +1238,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): valid_files = list(set(iter_checked_out_files) - set(failed_files)) raise CheckoutError( "Some files could not be checked out from the index due to local modifications", - failed_files, valid_files, failed_reasons) + failed_files, + valid_files, + failed_reasons, + ) + # END stderr handler if paths is None: args.append("--all") - kwargs['as_process'] = 1 + kwargs["as_process"] = 1 fprogress(None, False, None) proc = self.repo.git.checkout_index(*args, **kwargs) proc.wait() @@ -1146,11 +1265,13 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): self.entries args.append("--stdin") - kwargs['as_process'] = True - kwargs['istream'] = subprocess.PIPE + kwargs["as_process"] = True + kwargs["istream"] = subprocess.PIPE proc = self.repo.git.checkout_index(args, **kwargs) # FIXME: Reading from GIL! - make_exc = lambda: GitCommandError(("git-checkout-index",) + tuple(args), 128, proc.stderr.read()) + make_exc = lambda: GitCommandError( + ("git-checkout-index",) + tuple(args), 128, proc.stderr.read() + ) checked_out_files: List[PathLike] = [] for path in paths: @@ -1162,13 +1283,14 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): self.entries[(co_path, 0)] except KeyError: folder = str(co_path) - if not folder.endswith('/'): - folder += '/' + if not folder.endswith("/"): + folder += "/" for entry in self.entries.values(): if str(entry.path).startswith(folder): p = entry.path - self._write_path_to_stdin(proc, p, p, make_exc, - fprogress, read_from_stdout=False) + self._write_path_to_stdin( + proc, p, p, make_exc, fprogress, read_from_stdout=False + ) checked_out_files.append(p) path_is_directory = True # END if entry is in directory @@ -1176,8 +1298,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END path exception handlnig if not path_is_directory: - self._write_path_to_stdin(proc, co_path, path, make_exc, - fprogress, read_from_stdout=False) + self._write_path_to_stdin( + proc, co_path, path, make_exc, fprogress, read_from_stdout=False + ) checked_out_files.append(co_path) # END path is a file # END for each path @@ -1187,16 +1310,24 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # Without parsing stdout we don't know what failed. raise CheckoutError( "Some files could not be checked out from the index, probably because they didn't exist.", - failed_files, [], failed_reasons) + failed_files, + [], + failed_reasons, + ) handle_stderr(proc, checked_out_files) return checked_out_files # END paths handling - @ default_index - def reset(self, commit: Union[Commit, 'Reference', str] = 'HEAD', working_tree: bool = False, - paths: Union[None, Iterable[PathLike]] = None, - head: bool = False, **kwargs: Any) -> 'IndexFile': + @default_index + def reset( + self, + commit: Union[Commit, "Reference", str] = "HEAD", + working_tree: bool = False, + paths: Union[None, Iterable[PathLike]] = None, + head: bool = False, + **kwargs: Any + ) -> "IndexFile": """Reset the index to reflect the tree at the given commit. This will not adjust our HEAD reference as opposed to HEAD.reset by default. @@ -1228,7 +1359,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): checkout the files according to their state in the index. If you want git-reset like behaviour, use *HEAD.reset* instead. - :return: self """ + :return: self""" # what we actually want to do is to merge the tree into our existing # index, which is what git-read-tree does new_inst = type(self).from_tree(self.repo, commit) @@ -1244,7 +1375,7 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): except KeyError: # if key is not in theirs, it musn't be in ours try: - del(self.entries[key]) + del self.entries[key] except KeyError: pass # END handle deletion keyerror @@ -1258,17 +1389,23 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # END handle working tree if head: - self.repo.head.set_commit(self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit) + self.repo.head.set_commit( + self.repo.commit(commit), logmsg="%s: Updating HEAD" % commit + ) # END handle head change return self # @ default_index, breaks typing for some reason, copied into function - def diff(self, # type: ignore[override] - other: Union[Type['git_diff.Diffable.Index'], 'Tree', 'Commit', str, None] = git_diff.Diffable.Index, - paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, - create_patch: bool = False, **kwargs: Any - ) -> git_diff.DiffIndex: + def diff( + self, # type: ignore[override] + other: Union[ + Type["git_diff.Diffable.Index"], "Tree", "Commit", str, None + ] = git_diff.Diffable.Index, + paths: Union[PathLike, List[PathLike], Tuple[PathLike, ...], None] = None, + create_patch: bool = False, + **kwargs: Any + ) -> git_diff.DiffIndex: """Diff this index against the working copy or a Tree or Commit object For a documentation of the parameters and return values, see, @@ -1282,7 +1419,9 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): # only run if we are the default repository index if self._file_path != self._index_path(): raise AssertionError( - "Cannot call %r on indices that do not represent the default git index" % self.diff()) + "Cannot call %r on indices that do not represent the default git index" + % self.diff() + ) # index against index is always empty if other is self.Index: return git_diff.DiffIndex() @@ -1296,14 +1435,16 @@ class IndexFile(LazyMixin, git_diff.Diffable, Serializable): if isinstance(other, Object): # for Tree or Commit # invert the existing R flag - cur_val = kwargs.get('R', False) - kwargs['R'] = not cur_val + cur_val = kwargs.get("R", False) + kwargs["R"] = not cur_val return other.diff(self.Index, paths, create_patch, **kwargs) # END diff against other item handling # if other is not None here, something is wrong if other is not None: - raise ValueError("other must be None, Diffable.Index, a Tree or Commit, was %r" % other) + raise ValueError( + "other must be None, Diffable.Index, a Tree or Commit, was %r" % other + ) # diff against working copy - can be handled by superclass natively return super(IndexFile, self).diff(other, paths, create_patch, **kwargs) diff --git a/git/index/fun.py b/git/index/fun.py index acab7423..e8dead86 100644 --- a/git/index/fun.py +++ b/git/index/fun.py @@ -25,14 +25,11 @@ from git.compat import ( is_win, safe_decode, ) -from git.exc import ( - UnmergedEntriesError, - HookExecutionError -) +from git.exc import UnmergedEntriesError, HookExecutionError from git.objects.fun import ( tree_to_stream, traverse_tree_recursive, - traverse_trees_recursive + traverse_trees_recursive, ) from git.util import IndexFileSHA1Writer, finalize_process from gitdb.base import IStream @@ -40,20 +37,12 @@ from gitdb.typ import str_tree_type import os.path as osp -from .typ import ( - BaseIndexEntry, - IndexEntry, - CE_NAMEMASK, - CE_STAGESHIFT -) -from .util import ( - pack, - unpack -) +from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT +from .util import pack, unpack # typing ----------------------------------------------------------------------------- -from typing import (Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast) +from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast from git.types import PathLike @@ -61,40 +50,49 @@ if TYPE_CHECKING: from .base import IndexFile from git.db import GitCmdObjectDB from git.objects.tree import TreeCacheTup + # from git.objects.fun import EntryTupOrNone # ------------------------------------------------------------------------------------ -S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule +S_IFGITLINK = S_IFLNK | S_IFDIR # a submodule CE_NAMEMASK_INV = ~CE_NAMEMASK -__all__ = ('write_cache', 'read_cache', 'write_tree_from_cache', 'entry_key', - 'stat_mode_to_index_mode', 'S_IFGITLINK', 'run_commit_hook', 'hook_path') +__all__ = ( + "write_cache", + "read_cache", + "write_tree_from_cache", + "entry_key", + "stat_mode_to_index_mode", + "S_IFGITLINK", + "run_commit_hook", + "hook_path", +) def hook_path(name: str, git_dir: PathLike) -> str: """:return: path to the given named hook in the given git repository directory""" - return osp.join(git_dir, 'hooks', name) + return osp.join(git_dir, "hooks", name) def _has_file_extension(path): return osp.splitext(path)[1] -def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: +def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: """Run the commit hook of the given name. Silently ignores hooks that do not exist. :param name: name of hook, like 'pre-commit' :param index: IndexFile instance :param args: arguments passed to hook file - :raises HookExecutionError: """ + :raises HookExecutionError:""" hp = hook_path(name, index.repo.git_dir) if not os.access(hp, os.X_OK): return None env = os.environ.copy() - env['GIT_INDEX_FILE'] = safe_decode(str(index.path)) - env['GIT_EDITOR'] = ':' + env["GIT_INDEX_FILE"] = safe_decode(str(index.path)) + env["GIT_EDITOR"] = ":" cmd = [hp] try: if is_win and not _has_file_extension(hp): @@ -102,22 +100,26 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: # (doesn't understand shebangs). Try using bash to run the hook. relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix() cmd = ["bash.exe", relative_hp] - - cmd = subprocess.Popen(cmd + list(args), - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=index.repo.working_dir, - close_fds=is_posix, - creationflags=PROC_CREATIONFLAGS,) + + cmd = subprocess.Popen( + cmd + list(args), + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=index.repo.working_dir, + close_fds=is_posix, + creationflags=PROC_CREATIONFLAGS, + ) except Exception as ex: raise HookExecutionError(hp, ex) from ex else: stdout_list: List[str] = [] stderr_list: List[str] = [] - handle_process_output(cmd, stdout_list.append, stderr_list.append, finalize_process) - stdout = ''.join(stdout_list) - stderr = ''.join(stderr_list) + handle_process_output( + cmd, stdout_list.append, stderr_list.append, finalize_process + ) + stdout = "".join(stdout_list) + stderr = "".join(stderr_list) if cmd.returncode != 0: stdout = force_text(stdout, defenc) stderr = force_text(stderr, defenc) @@ -128,16 +130,21 @@ def run_commit_hook(name: str, index: 'IndexFile', *args: str) -> None: def stat_mode_to_index_mode(mode: int) -> int: """Convert the given mode from a stat call to the corresponding index mode and return it""" - if S_ISLNK(mode): # symlinks + if S_ISLNK(mode): # symlinks return S_IFLNK - if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules + if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules return S_IFGITLINK - return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit + return S_IFREG | ( + mode & S_IXUSR and 0o755 or 0o644 + ) # blobs with or without executable bit -def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: IO[bytes], - extension_data: Union[None, bytes] = None, - ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer) -> None: +def write_cache( + entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]], + stream: IO[bytes], + extension_data: Union[None, bytes] = None, + ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer, +) -> None: """Write the cache represented by entries to a stream :param entries: **sorted** list of entries @@ -163,17 +170,28 @@ def write_cache(entries: Sequence[Union[BaseIndexEntry, 'IndexEntry']], stream: # body for entry in entries: beginoffset = tell() - write(entry.ctime_bytes) # ctime - write(entry.mtime_bytes) # mtime + write(entry.ctime_bytes) # ctime + write(entry.mtime_bytes) # mtime path_str = str(entry.path) path: bytes = force_bytes(path_str, encoding=defenc) - plen = len(path) & CE_NAMEMASK # path length + plen = len(path) & CE_NAMEMASK # path length assert plen == len(path), "Path %s too long to fit into index" % entry.path - flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values - write(pack(">LLLLLL20sH", entry.dev, entry.inode, entry.mode, - entry.uid, entry.gid, entry.size, entry.binsha, flags)) + flags = plen | (entry.flags & CE_NAMEMASK_INV) # clear possible previous values + write( + pack( + ">LLLLLL20sH", + entry.dev, + entry.inode, + entry.mode, + entry.uid, + entry.gid, + entry.size, + entry.binsha, + flags, + ) + ) write(path) - real_size = ((tell() - beginoffset + 8) & ~7) + real_size = (tell() - beginoffset + 8) & ~7 write(b"\0" * ((beginoffset + real_size) - tell())) # END for each entry @@ -216,7 +234,9 @@ def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, i # END handle entry -def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'IndexEntry'], bytes, bytes]: +def read_cache( + stream: IO[bytes], +) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]: """Read a cache file from the given stream :return: tuple(version, entries_dict, extension_data, content_sha) * version is the integer version number @@ -225,7 +245,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde * content_sha is a 20 byte sha on all cache file contents""" version, num_entries = read_header(stream) count = 0 - entries: Dict[Tuple[PathLike, int], 'IndexEntry'] = {} + entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {} read = stream.read tell = stream.tell @@ -233,14 +253,17 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde beginoffset = tell() ctime = unpack(">8s", read(8))[0] mtime = unpack(">8s", read(8))[0] - (dev, ino, mode, uid, gid, size, sha, flags) = \ - unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2)) + (dev, ino, mode, uid, gid, size, sha, flags) = unpack( + ">LLLLLL20sH", read(20 + 4 * 6 + 2) + ) path_size = flags & CE_NAMEMASK path = read(path_size).decode(defenc) - real_size = ((tell() - beginoffset + 8) & ~7) + real_size = (tell() - beginoffset + 8) & ~7 read((beginoffset + real_size) - tell()) - entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)) + entry = IndexEntry( + (mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size) + ) # entry_key would be the method to use, but we safe the effort entries[(path, entry.stage)] = entry count += 1 @@ -253,19 +276,22 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde # 4 bytes length of chunk # repeated 0 - N times extension_data = stream.read(~0) - assert len(extension_data) > 19, "Index Footer was not at least a sha on content as it was only %i bytes in size"\ - % len(extension_data) + assert len(extension_data) > 19, ( + "Index Footer was not at least a sha on content as it was only %i bytes in size" + % len(extension_data) + ) content_sha = extension_data[-20:] # truncate the sha in the end as we will dynamically create it anyway - extension_data = extension_data[: -20] + extension_data = extension_data[:-20] return (version, entries, extension_data, content_sha) -def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: slice, si: int = 0 - ) -> Tuple[bytes, List['TreeCacheTup']]: +def write_tree_from_cache( + entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0 +) -> Tuple[bytes, List["TreeCacheTup"]]: """Create a tree from the given sorted list of entries and put the respective trees into the given object database @@ -275,7 +301,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: :param sl: slice indicating the range we should process on the entries list :return: tuple(binsha, list(tree_entry, ...)) a tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name""" - tree_items: List['TreeCacheTup'] = [] + tree_items: List["TreeCacheTup"] = [] ci = sl.start end = sl.stop @@ -285,7 +311,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: raise UnmergedEntriesError(entry) # END abort on unmerged ci += 1 - rbound = entry.path.find('/', si) + rbound = entry.path.find("/", si) if rbound == -1: # its not a tree tree_items.append((entry.binsha, entry.mode, entry.path[si:])) @@ -295,7 +321,7 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: xi = ci while xi < end: oentry = entries[xi] - orbound = oentry.path.find('/', si) + orbound = oentry.path.find("/", si) if orbound == -1 or oentry.path[si:orbound] != base: break # END abort on base mismatch @@ -304,7 +330,9 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: # enter recursion # ci - 1 as we want to count our current item as well - sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) + sha, _tree_entry_list = write_tree_from_cache( + entries, odb, slice(ci - 1, xi), rbound + 1 + ) tree_items.append((sha, S_IFDIR, base)) # skip ahead @@ -314,18 +342,26 @@ def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: # finally create the tree sio = BytesIO() - tree_to_stream(tree_items, sio.write) # writes to stream as bytes, but doesn't change tree_items + tree_to_stream( + tree_items, sio.write + ) # writes to stream as bytes, but doesn't change tree_items sio.seek(0) istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) return (istream.binsha, tree_items) -def _tree_entry_to_baseindexentry(tree_entry: 'TreeCacheTup', stage: int) -> BaseIndexEntry: - return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) +def _tree_entry_to_baseindexentry( + tree_entry: "TreeCacheTup", stage: int +) -> BaseIndexEntry: + return BaseIndexEntry( + (tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]) + ) -def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: +def aggressive_tree_merge( + odb: "GitCmdObjectDB", tree_shas: Sequence[bytes] +) -> List[BaseIndexEntry]: """ :return: list of BaseIndexEntries representing the aggressive merge of the given trees. All valid entries are on stage 0, whereas the conflicting ones are left @@ -339,7 +375,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> # one and two way is the same for us, as we don't have to handle an existing # index, instrea if len(tree_shas) in (1, 2): - for entry in traverse_tree_recursive(odb, tree_shas[-1], ''): + for entry in traverse_tree_recursive(odb, tree_shas[-1], ""): out.append(_tree_entry_to_baseindexentry(entry, 0)) # END for each entry return out @@ -349,7 +385,7 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> raise ValueError("Cannot handle %i trees at once" % len(tree_shas)) # three trees - for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ''): + for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""): if base is not None: # base version exists if ours is not None: @@ -358,8 +394,15 @@ def aggressive_tree_merge(odb: 'GitCmdObjectDB', tree_shas: Sequence[bytes]) -> # it exists in all branches, if it was changed in both # its a conflict, otherwise we take the changed version # This should be the most common branch, so it comes first - if(base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or \ - (base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]): + if ( + base[0] != ours[0] + and base[0] != theirs[0] + and ours[0] != theirs[0] + ) or ( + base[1] != ours[1] + and base[1] != theirs[1] + and ours[1] != theirs[1] + ): # changed by both out.append(_tree_entry_to_baseindexentry(base, 1)) out.append(_tree_entry_to_baseindexentry(ours, 2)) diff --git a/git/index/typ.py b/git/index/typ.py index 46f1b077..cbe26f27 100644 --- a/git/index/typ.py +++ b/git/index/typ.py @@ -2,16 +2,13 @@ from binascii import b2a_hex -from .util import ( - pack, - unpack -) +from .util import pack, unpack from git.objects import Blob # typing ---------------------------------------------------------------------- -from typing import (NamedTuple, Sequence, TYPE_CHECKING, Tuple, Union, cast) +from typing import NamedTuple, Sequence, TYPE_CHECKING, Tuple, Union, cast from git.types import PathLike @@ -20,16 +17,16 @@ if TYPE_CHECKING: # --------------------------------------------------------------------------------- -__all__ = ('BlobFilter', 'BaseIndexEntry', 'IndexEntry') +__all__ = ("BlobFilter", "BaseIndexEntry", "IndexEntry") -#{ Invariants -CE_NAMEMASK = 0x0fff +# { Invariants +CE_NAMEMASK = 0x0FFF CE_STAGEMASK = 0x3000 CE_EXTENDED = 0x4000 CE_VALID = 0x8000 CE_STAGESHIFT = 12 -#} END invariants +# } END invariants class BlobFilter(object): @@ -40,7 +37,8 @@ class BlobFilter(object): The given paths are given relative to the repository. """ - __slots__ = 'paths' + + __slots__ = "paths" def __init__(self, paths: Sequence[PathLike]) -> None: """ @@ -62,6 +60,7 @@ class BlobFilter(object): class BaseIndexEntryHelper(NamedTuple): """Typed namedtuple to provide named attribute access for BaseIndexEntry. Needed to allow overriding __new__ in child class to preserve backwards compat.""" + mode: int binsha: bytes flags: int @@ -85,10 +84,14 @@ class BaseIndexEntry(BaseIndexEntryHelper): use numeric indices for performance reasons. """ - def __new__(cls, inp_tuple: Union[Tuple[int, bytes, int, PathLike], - Tuple[int, bytes, int, PathLike, bytes, bytes, int, int, int, int, int]] - ) -> 'BaseIndexEntry': - """Override __new__ to allow construction from a tuple for backwards compatibility """ + def __new__( + cls, + inp_tuple: Union[ + Tuple[int, bytes, int, PathLike], + Tuple[int, bytes, int, PathLike, bytes, bytes, int, int, int, int, int], + ], + ) -> "BaseIndexEntry": + """Override __new__ to allow construction from a tuple for backwards compatibility""" return super().__new__(cls, *inp_tuple) def __str__(self) -> str: @@ -100,7 +103,7 @@ class BaseIndexEntry(BaseIndexEntryHelper): @property def hexsha(self) -> str: """hex version of our sha""" - return b2a_hex(self.binsha).decode('ascii') + return b2a_hex(self.binsha).decode("ascii") @property def stage(self) -> int: @@ -116,11 +119,11 @@ class BaseIndexEntry(BaseIndexEntryHelper): return (self.flags & CE_STAGEMASK) >> CE_STAGESHIFT @classmethod - def from_blob(cls, blob: Blob, stage: int = 0) -> 'BaseIndexEntry': + def from_blob(cls, blob: Blob, stage: int = 0) -> "BaseIndexEntry": """:return: Fully equipped BaseIndexEntry at the given stage""" return cls((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path)) - def to_blob(self, repo: 'Repo') -> Blob: + def to_blob(self, repo: "Repo") -> Blob: """:return: Blob using the information of this index entry""" return Blob(repo, self.binsha, self.mode, self.path) @@ -132,7 +135,8 @@ class IndexEntry(BaseIndexEntry): Attributes usully accessed often are cached in the tuple whereas others are unpacked on demand. - See the properties for a mapping between names and tuple indices. """ + See the properties for a mapping between names and tuple indices.""" + @property def ctime(self) -> Tuple[int, int]: """ @@ -143,11 +147,11 @@ class IndexEntry(BaseIndexEntry): @property def mtime(self) -> Tuple[int, int]: - """See ctime property, but returns modification time """ + """See ctime property, but returns modification time""" return cast(Tuple[int, int], unpack(">LL", self.mtime_bytes)) @classmethod - def from_base(cls, base: 'BaseIndexEntry') -> 'IndexEntry': + def from_base(cls, base: "BaseIndexEntry") -> "IndexEntry": """ :return: Minimal entry as created from the given BaseIndexEntry instance. @@ -155,11 +159,26 @@ class IndexEntry(BaseIndexEntry): :param base: Instance of type BaseIndexEntry""" time = pack(">LL", 0, 0) - return IndexEntry((base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0)) + return IndexEntry( + (base.mode, base.binsha, base.flags, base.path, time, time, 0, 0, 0, 0, 0) + ) @classmethod - def from_blob(cls, blob: Blob, stage: int = 0) -> 'IndexEntry': + def from_blob(cls, blob: Blob, stage: int = 0) -> "IndexEntry": """:return: Minimal entry resembling the given blob object""" time = pack(">LL", 0, 0) - return IndexEntry((blob.mode, blob.binsha, stage << CE_STAGESHIFT, blob.path, - time, time, 0, 0, 0, 0, blob.size)) + return IndexEntry( + ( + blob.mode, + blob.binsha, + stage << CE_STAGESHIFT, + blob.path, + time, + time, + 0, + 0, + 0, + 0, + blob.size, + ) + ) diff --git a/git/index/util.py b/git/index/util.py index 4f8af553..7339b147 100644 --- a/git/index/util.py +++ b/git/index/util.py @@ -11,7 +11,7 @@ import os.path as osp # typing ---------------------------------------------------------------------- -from typing import (Any, Callable, TYPE_CHECKING) +from typing import Any, Callable, TYPE_CHECKING from git.types import PathLike, _T @@ -21,24 +21,26 @@ if TYPE_CHECKING: # --------------------------------------------------------------------------------- -__all__ = ('TemporaryFileSwap', 'post_clear_cache', 'default_index', 'git_working_dir') +__all__ = ("TemporaryFileSwap", "post_clear_cache", "default_index", "git_working_dir") -#{ Aliases +# { Aliases pack = struct.pack unpack = struct.unpack -#} END aliases +# } END aliases + class TemporaryFileSwap(object): """Utility class moving a file to a temporary location within the same directory and moving it back on to where on object deletion.""" + __slots__ = ("file_path", "tmp_file_path") def __init__(self, file_path: PathLike) -> None: self.file_path = file_path - self.tmp_file_path = str(self.file_path) + tempfile.mktemp('', '', '') + self.tmp_file_path = str(self.file_path) + tempfile.mktemp("", "", "") # it may be that the source does not exist try: os.rename(self.file_path, self.tmp_file_path) @@ -53,7 +55,8 @@ class TemporaryFileSwap(object): # END temp file exists -#{ Decorators +# { Decorators + def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: """Decorator for functions that alter the index using the git command. This would @@ -66,10 +69,13 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: """ @wraps(func) - def post_clear_cache_if_not_raised(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + def post_clear_cache_if_not_raised( + self: "IndexFile", *args: Any, **kwargs: Any + ) -> _T: rval = func(self, *args, **kwargs) self._delete_entries_cache() return rval + # END wrapper method return post_clear_cache_if_not_raised @@ -78,14 +84,17 @@ def post_clear_cache(func: Callable[..., _T]) -> Callable[..., _T]: def default_index(func: Callable[..., _T]) -> Callable[..., _T]: """Decorator assuring the wrapped method may only run if we are the default repository index. This is as we rely on git commands that operate - on that index only. """ + on that index only.""" @wraps(func) - def check_default_index(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + def check_default_index(self: "IndexFile", *args: Any, **kwargs: Any) -> _T: if self._file_path != self._index_path(): raise AssertionError( - "Cannot call %r on indices that do not represent the default git index" % func.__name__) + "Cannot call %r on indices that do not represent the default git index" + % func.__name__ + ) return func(self, *args, **kwargs) + # END wrapper method return check_default_index @@ -96,7 +105,7 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: repository in order to assure relative paths are handled correctly""" @wraps(func) - def set_git_working_dir(self: 'IndexFile', *args: Any, **kwargs: Any) -> _T: + def set_git_working_dir(self: "IndexFile", *args: Any, **kwargs: Any) -> _T: cur_wd = os.getcwd() os.chdir(str(self.repo.working_tree_dir)) try: @@ -104,8 +113,10 @@ def git_working_dir(func: Callable[..., _T]) -> Callable[..., _T]: finally: os.chdir(cur_wd) # END handle working dir + # END wrapper return set_git_working_dir -#} END decorators + +# } END decorators |