diff options
Diffstat (limited to 'git/objects')
-rw-r--r-- | git/objects/__init__.py | 10 | ||||
-rw-r--r-- | git/objects/base.py | 61 | ||||
-rw-r--r-- | git/objects/blob.py | 7 | ||||
-rw-r--r-- | git/objects/commit.py | 339 | ||||
-rw-r--r-- | git/objects/fun.py | 75 | ||||
-rw-r--r-- | git/objects/submodule/base.py | 536 | ||||
-rw-r--r-- | git/objects/submodule/root.py | 204 | ||||
-rw-r--r-- | git/objects/submodule/util.py | 40 | ||||
-rw-r--r-- | git/objects/tag.py | 50 | ||||
-rw-r--r-- | git/objects/tree.py | 158 | ||||
-rw-r--r-- | git/objects/util.py | 362 |
11 files changed, 1220 insertions, 622 deletions
diff --git a/git/objects/__init__.py b/git/objects/__init__.py index 1d0bb7a5..d2e1e53a 100644 --- a/git/objects/__init__.py +++ b/git/objects/__init__.py @@ -12,13 +12,17 @@ from .submodule.base import * from .submodule.root import * from .tag import * from .tree import * + # Fix import dependency - add IndexObject to the util module, so that it can be # imported by the submodule.base smutil.IndexObject = IndexObject # type: ignore[attr-defined] smutil.Object = Object # type: ignore[attr-defined] -del(smutil) +del smutil # must come after submodule was made available -__all__ = [name for name, obj in locals().items() - if not (name.startswith('_') or inspect.ismodule(obj))] +__all__ = [ + name + for name, obj in locals().items() + if not (name.startswith("_") or inspect.ismodule(obj)) +] diff --git a/git/objects/base.py b/git/objects/base.py index 66e15a8f..9d005725 100644 --- a/git/objects/base.py +++ b/git/objects/base.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: from .submodule.base import Submodule from git.refs.reference import Reference -IndexObjUnion = Union['Tree', 'Blob', 'Submodule'] +IndexObjUnion = Union["Tree", "Blob", "Submodule"] # -------------------------------------------------------------------------- @@ -40,14 +40,20 @@ __all__ = ("Object", "IndexObject") class Object(LazyMixin): """Implements an Object which may be Blobs, Trees, Commits and Tags""" - NULL_HEX_SHA = '0' * 40 - NULL_BIN_SHA = b'\0' * 20 - TYPES = (dbtyp.str_blob_type, dbtyp.str_tree_type, dbtyp.str_commit_type, dbtyp.str_tag_type) + NULL_HEX_SHA = "0" * 40 + NULL_BIN_SHA = b"\0" * 20 + + TYPES = ( + dbtyp.str_blob_type, + dbtyp.str_tree_type, + dbtyp.str_commit_type, + dbtyp.str_tag_type, + ) __slots__ = ("repo", "binsha", "size") type: Union[Lit_commit_ish, None] = None - def __init__(self, repo: 'Repo', binsha: bytes): + def __init__(self, repo: "Repo", binsha: bytes): """Initialize an object by identifying it by its binary sha. All keyword arguments will be set on demand if None. @@ -57,10 +63,13 @@ class Object(LazyMixin): super(Object, self).__init__() self.repo = repo self.binsha = binsha - assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (binsha, len(binsha)) + assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % ( + binsha, + len(binsha), + ) @classmethod - def new(cls, repo: 'Repo', id: Union[str, 'Reference']) -> Commit_ish: + def new(cls, repo: "Repo", id: Union[str, "Reference"]) -> Commit_ish: """ :return: New Object instance of a type appropriate to the object type behind id. The id of the newly created object will be a binsha even though @@ -73,14 +82,14 @@ class Object(LazyMixin): return repo.rev_parse(str(id)) @classmethod - def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Commit_ish: + def new_from_sha(cls, repo: "Repo", sha1: bytes) -> Commit_ish: """ :return: new object instance of a type appropriate to represent the given binary sha1 :param sha1: 20 byte binary sha1""" if sha1 == cls.NULL_BIN_SHA: # the NULL binsha is always the root commit - return get_object_type_by_name(b'commit')(repo, sha1) + return get_object_type_by_name(b"commit")(repo, sha1) # END handle special case oinfo = repo.odb.info(sha1) inst = get_object_type_by_name(oinfo.type)(repo, oinfo.binsha) @@ -98,13 +107,13 @@ class Object(LazyMixin): def __eq__(self, other: Any) -> bool: """:return: True if the objects have the same SHA1""" - if not hasattr(other, 'binsha'): + if not hasattr(other, "binsha"): return False return self.binsha == other.binsha def __ne__(self, other: Any) -> bool: - """:return: True if the objects do not have the same SHA1 """ - if not hasattr(other, 'binsha'): + """:return: True if the objects do not have the same SHA1""" + if not hasattr(other, "binsha"): return True return self.binsha != other.binsha @@ -124,15 +133,15 @@ class Object(LazyMixin): def hexsha(self) -> str: """:return: 40 byte hex version of our 20 byte binary sha""" # b2a_hex produces bytes - return bin_to_hex(self.binsha).decode('ascii') + return bin_to_hex(self.binsha).decode("ascii") @property - def data_stream(self) -> 'OStream': - """ :return: File Object compatible stream to the uncompressed raw data of the object + def data_stream(self) -> "OStream": + """:return: File Object compatible stream to the uncompressed raw data of the object :note: returned streams must be read in order""" return self.repo.odb.stream(self.binsha) - def stream_data(self, ostream: 'OStream') -> 'Object': + def stream_data(self, ostream: "OStream") -> "Object": """Writes our data directly to the given output stream :param ostream: File object compatible stream object. :return: self""" @@ -145,14 +154,19 @@ class IndexObject(Object): """Base for all objects that can be part of the index file , namely Tree, Blob and SubModule objects""" + __slots__ = ("path", "mode") # for compatibility with iterable lists - _id_attribute_ = 'path' - - def __init__(self, - repo: 'Repo', binsha: bytes, mode: Union[None, int] = None, path: Union[None, PathLike] = None - ) -> None: + _id_attribute_ = "path" + + def __init__( + self, + repo: "Repo", + binsha: bytes, + mode: Union[None, int] = None, + path: Union[None, PathLike] = None, + ) -> None: """Initialize a newly instanced IndexObject :param repo: is the Repo we are located in @@ -184,7 +198,8 @@ class IndexObject(Object): # they cannot be retrieved lateron ( not without searching for them ) raise AttributeError( "Attribute '%s' unset: path and mode attributes must have been set during %s object creation" - % (attr, type(self).__name__)) + % (attr, type(self).__name__) + ) else: super(IndexObject, self)._set_cache_(attr) # END handle slot attribute @@ -201,7 +216,7 @@ class IndexObject(Object): Absolute path to this index object in the file system ( as opposed to the .path field which is a path relative to the git repository ). - The returned path will be native to the system and contains '\' on windows. """ + The returned path will be native to the system and contains '\' on windows.""" if self.repo.working_tree_dir is not None: return join_path_native(self.repo.working_tree_dir, self.path) else: diff --git a/git/objects/blob.py b/git/objects/blob.py index 99b5c636..1881f210 100644 --- a/git/objects/blob.py +++ b/git/objects/blob.py @@ -8,14 +8,15 @@ from . import base from git.types import Literal -__all__ = ('Blob', ) +__all__ = ("Blob",) class Blob(base.IndexObject): """A Blob encapsulates a git blob object""" + DEFAULT_MIME_TYPE = "text/plain" - type: Literal['blob'] = "blob" + type: Literal["blob"] = "blob" # valid blob modes executable_mode = 0o100755 @@ -28,7 +29,7 @@ class Blob(base.IndexObject): def mime_type(self) -> str: """ :return: String describing the mime type of this file (based on the filename) - :note: Defaults to 'text/plain' in case the actual file type is unknown. """ + :note: Defaults to 'text/plain' in case the actual file type is unknown.""" guesses = None if self.path: guesses = guess_type(str(self.path)) diff --git a/git/objects/commit.py b/git/objects/commit.py index 96a2a8e5..137cc620 100644 --- a/git/objects/commit.py +++ b/git/objects/commit.py @@ -6,12 +6,7 @@ import datetime from subprocess import Popen, PIPE from gitdb import IStream -from git.util import ( - hex_to_bin, - Actor, - Stats, - finalize_process -) +from git.util import hex_to_bin, Actor, Stats, finalize_process from git.diff import Diffable from git.cmd import Git @@ -26,13 +21,7 @@ from .util import ( from_timestamp, ) -from time import ( - time, - daylight, - altzone, - timezone, - localtime -) +from time import time, daylight, altzone, timezone, localtime import os from io import BytesIO import logging @@ -40,7 +29,18 @@ import logging # typing ------------------------------------------------------------------ -from typing import Any, IO, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING, cast, Dict +from typing import ( + Any, + IO, + Iterator, + List, + Sequence, + Tuple, + Union, + TYPE_CHECKING, + cast, + Dict, +) from git.types import PathLike, Literal @@ -50,10 +50,10 @@ if TYPE_CHECKING: # ------------------------------------------------------------------------ -log = logging.getLogger('git.objects.commit') +log = logging.getLogger("git.objects.commit") log.addHandler(logging.NullHandler()) -__all__ = ('Commit', ) +__all__ = ("Commit",) class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): @@ -69,30 +69,44 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): env_committer_date = "GIT_COMMITTER_DATE" # CONFIGURATION KEYS - conf_encoding = 'i18n.commitencoding' + conf_encoding = "i18n.commitencoding" # INVARIANTS default_encoding = "UTF-8" # object configuration - type: Literal['commit'] = "commit" - __slots__ = ("tree", - "author", "authored_date", "author_tz_offset", - "committer", "committed_date", "committer_tz_offset", - "message", "parents", "encoding", "gpgsig") + type: Literal["commit"] = "commit" + __slots__ = ( + "tree", + "author", + "authored_date", + "author_tz_offset", + "committer", + "committed_date", + "committer_tz_offset", + "message", + "parents", + "encoding", + "gpgsig", + ) _id_attribute_ = "hexsha" - def __init__(self, repo: 'Repo', binsha: bytes, tree: Union[Tree, None] = None, - author: Union[Actor, None] = None, - authored_date: Union[int, None] = None, - author_tz_offset: Union[None, float] = None, - committer: Union[Actor, None] = None, - committed_date: Union[int, None] = None, - committer_tz_offset: Union[None, float] = None, - message: Union[str, bytes, None] = None, - parents: Union[Sequence['Commit'], None] = None, - encoding: Union[str, None] = None, - gpgsig: Union[str, None] = None) -> None: + def __init__( + self, + repo: "Repo", + binsha: bytes, + tree: Union[Tree, None] = None, + author: Union[Actor, None] = None, + authored_date: Union[int, None] = None, + author_tz_offset: Union[None, float] = None, + committer: Union[Actor, None] = None, + committed_date: Union[int, None] = None, + committer_tz_offset: Union[None, float] = None, + message: Union[str, bytes, None] = None, + parents: Union[Sequence["Commit"], None] = None, + encoding: Union[str, None] = None, + gpgsig: Union[str, None] = None, + ) -> None: """Instantiate a new Commit. All keyword arguments taking None as default will be implicitly set on first query. @@ -130,7 +144,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): super(Commit, self).__init__(repo, binsha) self.binsha = binsha if tree is not None: - assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree) + assert isinstance( + tree, Tree + ), "Tree needs to be a Tree instance, was %s" % type(tree) if tree is not None: self.tree = tree if author is not None: @@ -155,16 +171,16 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): self.gpgsig = gpgsig @classmethod - def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]: + def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]: return tuple(commit.parents) @classmethod - def _calculate_sha_(cls, repo: 'Repo', commit: 'Commit') -> bytes: - '''Calculate the sha of a commit. + def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes: + """Calculate the sha of a commit. :param repo: Repo object the commit should be part of :param commit: Commit object for which to generate the sha - ''' + """ stream = BytesIO() commit._serialize(stream) @@ -174,18 +190,18 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): istream = repo.odb.store(IStream(cls.type, streamlen, stream)) return istream.binsha - def replace(self, **kwargs: Any) -> 'Commit': - '''Create new commit object from existing commit object. + def replace(self, **kwargs: Any) -> "Commit": + """Create new commit object from existing commit object. Any values provided as keyword arguments will replace the corresponding attribute in the new object. - ''' + """ attrs = {k: getattr(self, k) for k in self.__slots__} for attrname in kwargs: if attrname not in self.__slots__: - raise ValueError('invalid attribute name') + raise ValueError("invalid attribute name") attrs.update(kwargs) new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs) @@ -214,11 +230,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): def summary(self) -> Union[str, bytes]: """:return: First line of the commit message""" if isinstance(self.message, str): - return self.message.split('\n', 1)[0] + return self.message.split("\n", 1)[0] else: - return self.message.split(b'\n', 1)[0] + return self.message.split(b"\n", 1)[0] - def count(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> int: + def count( + self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any + ) -> int: """Count the number of commits reachable from this commit :param paths: @@ -232,7 +250,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # yes, it makes a difference whether empty paths are given or not in our case # as the empty paths version will ignore merge commits for some reason. if paths: - return len(self.repo.git.rev_list(self.hexsha, '--', paths, **kwargs).splitlines()) + return len( + self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines() + ) return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines()) @property @@ -244,9 +264,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): return self.repo.git.name_rev(self) @classmethod - def iter_items(cls, repo: 'Repo', rev: Union[str, 'Commit', 'SymbolicReference'], # type: ignore - paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any - ) -> Iterator['Commit']: + def iter_items( + cls, + repo: "Repo", + rev: Union[str, "Commit", "SymbolicReference"], # type: ignore + paths: Union[PathLike, Sequence[PathLike]] = "", + **kwargs: Any, + ) -> Iterator["Commit"]: """Find all commits matching the given criteria. :param repo: is the Repo @@ -260,19 +284,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): ``skip`` is the number of commits to skip ``since`` all commits since i.e. '1970-01-01' :return: iterator yielding Commit items""" - if 'pretty' in kwargs: - raise ValueError("--pretty cannot be used as parsing expects single sha's only") + if "pretty" in kwargs: + raise ValueError( + "--pretty cannot be used as parsing expects single sha's only" + ) # END handle pretty # use -- in any case, to prevent possibility of ambiguous arguments # see https://github.com/gitpython-developers/GitPython/issues/264 - args_list: List[PathLike] = ['--'] + args_list: List[PathLike] = ["--"] if paths: paths_tup: Tuple[PathLike, ...] if isinstance(paths, (str, os.PathLike)): - paths_tup = (paths, ) + paths_tup = (paths,) else: paths_tup = tuple(paths) @@ -282,37 +308,41 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs) return cls._iter_from_process_or_stream(repo, proc) - def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator['Commit']: + def iter_parents( + self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any + ) -> Iterator["Commit"]: """Iterate _all_ parents of this commit. :param paths: Optional path or list of paths limiting the Commits to those that contain at least one of the paths :param kwargs: All arguments allowed by git-rev-list - :return: Iterator yielding Commit objects which are parents of self """ + :return: Iterator yielding Commit objects which are parents of self""" # skip ourselves skip = kwargs.get("skip", 1) - if skip == 0: # skip ourselves + if skip == 0: # skip ourselves skip = 1 - kwargs['skip'] = skip + kwargs["skip"] = skip return self.iter_items(self.repo, self, paths, **kwargs) - @ property + @property def stats(self) -> Stats: """Create a git stat from changes between this commit and its first parent or from all changes done if this is the very first commit. :return: git.Stats""" if not self.parents: - text = self.repo.git.diff_tree(self.hexsha, '--', numstat=True, root=True) + text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True) text2 = "" for line in text.splitlines()[1:]: (insertions, deletions, filename) = line.split("\t") text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename) text = text2 else: - text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True) + text = self.repo.git.diff( + self.parents[0].hexsha, self.hexsha, "--", numstat=True + ) return Stats._list_from_string(self.repo, text) @property @@ -352,19 +382,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): """ d = {} - cmd = ['git', 'interpret-trailers', '--parse'] + cmd = ["git", "interpret-trailers", "--parse"] proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore trailer: str = proc.communicate(str(self.message).encode())[0].decode() - if trailer.endswith('\n'): + if trailer.endswith("\n"): trailer = trailer[0:-1] - if trailer != '': - for line in trailer.split('\n'): - key, value = line.split(':', 1) + if trailer != "": + for line in trailer.split("\n"): + key, value = line.split(":", 1) d[key.strip()] = value.strip() return d - @ classmethod - def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen, IO]) -> Iterator['Commit']: + @classmethod + def _iter_from_process_or_stream( + cls, repo: "Repo", proc_or_stream: Union[Popen, IO] + ) -> Iterator["Commit"]: """Parse out commit information into a list of Commit objects We expect one-line per commit, and parse the actual commit information directly from our lighting fast object database @@ -378,11 +410,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # def is_stream(inp) -> TypeGuard[IO]: # return hasattr(proc_or_stream, 'readline') - if hasattr(proc_or_stream, 'wait'): + if hasattr(proc_or_stream, "wait"): proc_or_stream = cast(Popen, proc_or_stream) if proc_or_stream.stdout is not None: stream = proc_or_stream.stdout - elif hasattr(proc_or_stream, 'readline'): + elif hasattr(proc_or_stream, "readline"): proc_or_stream = cast(IO, proc_or_stream) stream = proc_or_stream @@ -402,15 +434,23 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END for each line in stream # TODO: Review this - it seems process handling got a bit out of control # due to many developers trying to fix the open file handles issue - if hasattr(proc_or_stream, 'wait'): + if hasattr(proc_or_stream, "wait"): proc_or_stream = cast(Popen, proc_or_stream) finalize_process(proc_or_stream) - @ classmethod - def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str, - parent_commits: Union[None, List['Commit']] = None, head: bool = False, - author: Union[None, Actor] = None, committer: Union[None, Actor] = None, - author_date: Union[None, str] = None, commit_date: Union[None, str] = None) -> 'Commit': + @classmethod + def create_from_tree( + cls, + repo: "Repo", + tree: Union[Tree, str], + message: str, + parent_commits: Union[None, List["Commit"]] = None, + head: bool = False, + author: Union[None, Actor] = None, + committer: Union[None, Actor] = None, + author_date: Union[None, str] = None, + commit_date: Union[None, str] = None, + ) -> "Commit": """Commit the given tree, creating a commit object. :param repo: Repo object the commit should be part of @@ -473,7 +513,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): is_dst = daylight and localtime().tm_isdst > 0 offset = altzone if is_dst else timezone - author_date_str = env.get(cls.env_author_date, '') + author_date_str = env.get(cls.env_author_date, "") if author_date: author_time, author_offset = parse_date(author_date) elif author_date_str: @@ -482,7 +522,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): author_time, author_offset = unix_time, offset # END set author time - committer_date_str = env.get(cls.env_committer_date, '') + committer_date_str = env.get(cls.env_committer_date, "") if commit_date: committer_time, committer_offset = parse_date(commit_date) elif committer_date_str: @@ -492,7 +532,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END set committer time # assume utf8 encoding - enc_section, enc_option = cls.conf_encoding.split('.') + enc_section, enc_option = cls.conf_encoding.split(".") conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding) if not isinstance(conf_encoding, str): raise TypeError("conf_encoding could not be coerced to str") @@ -504,10 +544,20 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END tree conversion # CREATE NEW COMMIT - new_commit = cls(repo, cls.NULL_BIN_SHA, tree, - author, author_time, author_offset, - committer, committer_time, committer_offset, - message, parent_commits, conf_encoding) + new_commit = cls( + repo, + cls.NULL_BIN_SHA, + tree, + author, + author_time, + author_offset, + committer, + committer_time, + committer_offset, + message, + parent_commits, + conf_encoding, + ) new_commit.binsha = cls._calculate_sha_(repo, new_commit) @@ -515,48 +565,74 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # need late import here, importing git at the very beginning throws # as well ... import git.refs + try: repo.head.set_commit(new_commit, logmsg=message) except ValueError: # head is not yet set to the ref our HEAD points to # Happens on first commit - master = git.refs.Head.create(repo, repo.head.ref, new_commit, logmsg="commit (initial): %s" % message) - repo.head.set_reference(master, logmsg='commit: Switching to %s' % master) + master = git.refs.Head.create( + repo, + repo.head.ref, + new_commit, + logmsg="commit (initial): %s" % message, + ) + repo.head.set_reference( + master, logmsg="commit: Switching to %s" % master + ) # END handle empty repositories # END advance head handling return new_commit - #{ Serializable Implementation + # { Serializable Implementation - def _serialize(self, stream: BytesIO) -> 'Commit': + def _serialize(self, stream: BytesIO) -> "Commit": write = stream.write - write(("tree %s\n" % self.tree).encode('ascii')) + write(("tree %s\n" % self.tree).encode("ascii")) for p in self.parents: - write(("parent %s\n" % p).encode('ascii')) + write(("parent %s\n" % p).encode("ascii")) a = self.author aname = a.name c = self.committer fmt = "%s %s <%s> %s %s\n" - write((fmt % ("author", aname, a.email, - self.authored_date, - altz_to_utctz_str(self.author_tz_offset))).encode(self.encoding)) + write( + ( + fmt + % ( + "author", + aname, + a.email, + self.authored_date, + altz_to_utctz_str(self.author_tz_offset), + ) + ).encode(self.encoding) + ) # encode committer aname = c.name - write((fmt % ("committer", aname, c.email, - self.committed_date, - altz_to_utctz_str(self.committer_tz_offset))).encode(self.encoding)) + write( + ( + fmt + % ( + "committer", + aname, + c.email, + self.committed_date, + altz_to_utctz_str(self.committer_tz_offset), + ) + ).encode(self.encoding) + ) if self.encoding != self.default_encoding: - write(("encoding %s\n" % self.encoding).encode('ascii')) + write(("encoding %s\n" % self.encoding).encode("ascii")) try: - if self.__getattribute__('gpgsig'): + if self.__getattribute__("gpgsig"): write(b"gpgsig") for sigline in self.gpgsig.rstrip("\n").split("\n"): - write((" " + sigline + "\n").encode('ascii')) + write((" " + sigline + "\n").encode("ascii")) except AttributeError: pass @@ -570,23 +646,29 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # END handle encoding return self - def _deserialize(self, stream: BytesIO) -> 'Commit': + def _deserialize(self, stream: BytesIO) -> "Commit": """ :param from_rev_list: if true, the stream format is coming from the rev-list command Otherwise it is assumed to be a plain data stream from our object """ readline = stream.readline - self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, '') + self.tree = Tree( + self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "" + ) self.parents = [] next_line = None while True: parent_line = readline() - if not parent_line.startswith(b'parent'): + if not parent_line.startswith(b"parent"): next_line = parent_line break # END abort reading parents - self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode('ascii')))) + self.parents.append( + type(self)( + self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii")) + ) + ) # END for each parent line self.parents = tuple(self.parents) @@ -596,9 +678,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): # we might run into one or more mergetag blocks, skip those for now next_line = readline() - while next_line.startswith(b'mergetag '): + while next_line.startswith(b"mergetag "): next_line = readline() - while next_line.startswith(b' '): + while next_line.startswith(b" "): next_line = readline() # end skip mergetags @@ -612,10 +694,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): buf = enc.strip() while buf: if buf[0:10] == b"encoding ": - self.encoding = buf[buf.find(b' ') + 1:].decode( - self.encoding, 'ignore') + self.encoding = buf[buf.find(b" ") + 1 :].decode( + self.encoding, "ignore" + ) elif buf[0:7] == b"gpgsig ": - sig = buf[buf.find(b' ') + 1:] + b"\n" + sig = buf[buf.find(b" ") + 1 :] + b"\n" is_next_header = False while True: sigbuf = readline() @@ -627,37 +710,55 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable): break sig += sigbuf[1:] # end read all signature - self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, 'ignore') + self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore") if is_next_header: continue buf = readline().strip() # decode the authors name try: - (self.author, self.authored_date, self.author_tz_offset) = \ - parse_actor_and_date(author_line.decode(self.encoding, 'replace')) + ( + self.author, + self.authored_date, + self.author_tz_offset, + ) = parse_actor_and_date(author_line.decode(self.encoding, "replace")) except UnicodeDecodeError: - log.error("Failed to decode author line '%s' using encoding %s", author_line, self.encoding, - exc_info=True) + log.error( + "Failed to decode author line '%s' using encoding %s", + author_line, + self.encoding, + exc_info=True, + ) try: - self.committer, self.committed_date, self.committer_tz_offset = \ - parse_actor_and_date(committer_line.decode(self.encoding, 'replace')) + ( + self.committer, + self.committed_date, + self.committer_tz_offset, + ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace")) except UnicodeDecodeError: - log.error("Failed to decode committer line '%s' using encoding %s", committer_line, self.encoding, - exc_info=True) + log.error( + "Failed to decode committer line '%s' using encoding %s", + committer_line, + self.encoding, + exc_info=True, + ) # END handle author's encoding # a stream from our data simply gives us the plain message # The end of our message stream is marked with a newline that we strip self.message = stream.read() try: - self.message = self.message.decode(self.encoding, 'replace') + self.message = self.message.decode(self.encoding, "replace") except UnicodeDecodeError: - log.error("Failed to decode message '%s' using encoding %s", - self.message, self.encoding, exc_info=True) + log.error( + "Failed to decode message '%s' using encoding %s", + self.message, + self.encoding, + exc_info=True, + ) # END exception handling return self - #} END serializable implementation + # } END serializable implementation diff --git a/git/objects/fun.py b/git/objects/fun.py index 19b4e525..de065599 100644 --- a/git/objects/fun.py +++ b/git/objects/fun.py @@ -2,14 +2,20 @@ from stat import S_ISDIR -from git.compat import ( - safe_decode, - defenc -) +from git.compat import safe_decode, defenc # typing ---------------------------------------------- -from typing import Callable, List, MutableSequence, Sequence, Tuple, TYPE_CHECKING, Union, overload +from typing import ( + Callable, + List, + MutableSequence, + Sequence, + Tuple, + TYPE_CHECKING, + Union, + overload, +) if TYPE_CHECKING: from _typeshed import ReadableBuffer @@ -21,19 +27,25 @@ EntryTupOrNone = Union[EntryTup, None] # --------------------------------------------------- -__all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive', - 'traverse_tree_recursive') +__all__ = ( + "tree_to_stream", + "tree_entries_from_data", + "traverse_trees_recursive", + "traverse_tree_recursive", +) -def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer'], Union[int, None]]) -> None: +def tree_to_stream( + entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]] +) -> None: """Write the give list of entries into a stream using its write method :param entries: **sorted** list of tuples with (binsha, mode, name) :param write: write method which takes a data string""" - ord_zero = ord('0') - bit_mask = 7 # 3 bits set + ord_zero = ord("0") + bit_mask = 7 # 3 bits set for binsha, mode, name in entries: - mode_str = b'' + mode_str = b"" for i in range(6): mode_str = bytes([((mode >> (i * 3)) & bit_mask) + ord_zero]) + mode_str # END for each 8 octal value @@ -52,7 +64,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer name_bytes = name.encode(defenc) else: name_bytes = name # type: ignore[unreachable] # check runtime types - is always str? - write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha))) + write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha))) # END for each item @@ -60,8 +72,8 @@ def tree_entries_from_data(data: bytes) -> List[EntryTup]: """Reads the binary representation of a tree and returns tuples of Tree items :param data: data block with tree data (as bytes) :return: list(tuple(binsha, mode, tree_relative_path), ...)""" - ord_zero = ord('0') - space_ord = ord(' ') + ord_zero = ord("0") + space_ord = ord(" ") len_data = len(data) i = 0 out = [] @@ -95,15 +107,16 @@ def tree_entries_from_data(data: bytes) -> List[EntryTup]: # byte is NULL, get next 20 i += 1 - sha = data[i:i + 20] + sha = data[i : i + 20] i = i + 20 out.append((sha, mode, name)) # END for each byte in data stream return out -def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int - ) -> EntryTupOrNone: +def _find_by_name( + tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int +) -> EntryTupOrNone: """return data entry matching the given name and tree mode or None. Before the item is returned, the respective data item is set @@ -126,12 +139,12 @@ def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: return None -@ overload +@overload def _to_full_path(item: None, path_prefix: str) -> None: ... -@ overload +@overload def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: ... @@ -143,8 +156,9 @@ def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone: return (item[0], item[1], path_prefix + item[2]) -def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[bytes, None]], - path_prefix: str) -> List[Tuple[EntryTupOrNone, ...]]: +def traverse_trees_recursive( + odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str +) -> List[Tuple[EntryTupOrNone, ...]]: """ :return: list of list with entries according to the given binary tree-shas. The result is encoded in a list @@ -187,7 +201,7 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by entries = [None for _ in range(nt)] entries[ti] = item _sha, mode, name = item - is_dir = S_ISDIR(mode) # type mode bits + is_dir = S_ISDIR(mode) # type mode bits # find this item in all other tree data items # wrap around, but stop one before our current index, hence @@ -199,8 +213,13 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by # END for each other item data # if we are a directory, enter recursion if is_dir: - out.extend(traverse_trees_recursive( - odb, [((ei and ei[0]) or None) for ei in entries], path_prefix + name + '/')) + out.extend( + traverse_trees_recursive( + odb, + [((ei and ei[0]) or None) for ei in entries], + path_prefix + name + "/", + ) + ) else: out.append(tuple(_to_full_path(e, path_prefix) for e in entries)) @@ -210,12 +229,14 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by # END for each item # we are done with one tree, set all its data empty - del(tree_data[:]) + del tree_data[:] # END for each tree_data chunk return out -def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix: str) -> List[EntryTup]: +def traverse_tree_recursive( + odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str +) -> List[EntryTup]: """ :return: list of entries of the tree pointed to by the binary tree_sha. An entry has the following format: @@ -229,7 +250,7 @@ def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix: # unpacking/packing is faster than accessing individual items for sha, mode, name in data: if S_ISDIR(mode): - entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + '/')) + entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + "/")) else: entries.append((sha, mode, path_prefix + name)) # END for each item diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index f7820455..84a34206 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -11,16 +11,12 @@ from git.compat import ( defenc, is_win, ) -from git.config import ( - SectionConstraint, - GitConfigParser, - cp -) +from git.config import SectionConstraint, GitConfigParser, cp from git.exc import ( InvalidGitRepositoryError, NoSuchPathError, RepositoryDirtyError, - BadName + BadName, ) from git.objects.base import IndexObject, Object from git.objects.util import TraversableIterableObj @@ -31,7 +27,7 @@ from git.util import ( RemoteProgress, rmtree, unbare_repo, - IterableList + IterableList, ) from git.util import HIDE_WINDOWS_KNOWN_ERRORS @@ -42,7 +38,7 @@ from .util import ( sm_name, sm_section, SubmoduleConfigParser, - find_first_remote_branch + find_first_remote_branch, ) @@ -63,7 +59,7 @@ if TYPE_CHECKING: __all__ = ["Submodule", "UpdateProgress"] -log = logging.getLogger('git.objects.submodule.base') +log = logging.getLogger("git.objects.submodule.base") log.addHandler(logging.NullHandler()) @@ -71,7 +67,11 @@ class UpdateProgress(RemoteProgress): """Class providing detailed progress information to the caller who should derive from it and implement the ``update(...)`` message""" - CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)] + + CLONE, FETCH, UPDWKTREE = [ + 1 << x + for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3) + ] _num_op_codes: int = RemoteProgress._num_op_codes + 3 __slots__ = () @@ -98,25 +98,30 @@ class Submodule(IndexObject, TraversableIterableObj): All methods work in bare and non-bare repositories.""" _id_attribute_ = "name" - k_modules_file = '.gitmodules' - k_head_option = 'branch' - k_head_default = 'master' - k_default_mode = stat.S_IFDIR | stat.S_IFLNK # submodules are directories with link-status + k_modules_file = ".gitmodules" + k_head_option = "branch" + k_head_default = "master" + k_default_mode = ( + stat.S_IFDIR | stat.S_IFLNK + ) # submodules are directories with link-status # this is a bogus type for base class compatibility - type: Literal['submodule'] = 'submodule' # type: ignore - - __slots__ = ('_parent_commit', '_url', '_branch_path', '_name', '__weakref__') - _cache_attrs = ('path', '_url', '_branch_path') - - def __init__(self, repo: 'Repo', binsha: bytes, - mode: Union[int, None] = None, - path: Union[PathLike, None] = None, - name: Union[str, None] = None, - parent_commit: Union[Commit_ish, None] = None, - url: Union[str, None] = None, - branch_path: Union[PathLike, None] = None - ) -> None: + type: Literal["submodule"] = "submodule" # type: ignore + + __slots__ = ("_parent_commit", "_url", "_branch_path", "_name", "__weakref__") + _cache_attrs = ("path", "_url", "_branch_path") + + def __init__( + self, + repo: "Repo", + binsha: bytes, + mode: Union[int, None] = None, + path: Union[PathLike, None] = None, + name: Union[str, None] = None, + parent_commit: Union[Commit_ish, None] = None, + url: Union[str, None] = None, + branch_path: Union[PathLike, None] = None, + ) -> None: """Initialize this instance with its attributes. We only document the ones that differ from ``IndexObject`` @@ -137,32 +142,38 @@ class Submodule(IndexObject, TraversableIterableObj): self._name = name def _set_cache_(self, attr: str) -> None: - if attr in ('path', '_url', '_branch_path'): + if attr in ("path", "_url", "_branch_path"): reader: SectionConstraint = self.config_reader() # default submodule values try: - self.path = reader.get('path') + self.path = reader.get("path") except cp.NoSectionError as e: if self.repo.working_tree_dir is not None: - raise ValueError("This submodule instance does not exist anymore in '%s' file" - % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e + raise ValueError( + "This submodule instance does not exist anymore in '%s' file" + % osp.join(self.repo.working_tree_dir, ".gitmodules") + ) from e # end - self._url = reader.get('url') + self._url = reader.get("url") # git-python extension values - optional - self._branch_path = reader.get_value(self.k_head_option, git.Head.to_full_path(self.k_head_default)) - elif attr == '_name': - raise AttributeError("Cannot retrieve the name of a submodule if it was not set initially") + self._branch_path = reader.get_value( + self.k_head_option, git.Head.to_full_path(self.k_head_default) + ) + elif attr == "_name": + raise AttributeError( + "Cannot retrieve the name of a submodule if it was not set initially" + ) else: super(Submodule, self)._set_cache_(attr) # END handle attribute name @classmethod - def _get_intermediate_items(cls, item: 'Submodule') -> IterableList['Submodule']: + def _get_intermediate_items(cls, item: "Submodule") -> IterableList["Submodule"]: """:return: all the submodules of our module repository""" try: return cls.list_items(item.module()) except InvalidGitRepositoryError: - return IterableList('') + return IterableList("") # END handle intermediate items @classmethod @@ -188,13 +199,18 @@ class Submodule(IndexObject, TraversableIterableObj): return self._name def __repr__(self) -> str: - return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)"\ - % (type(self).__name__, self._name, self.path, self.url, self.branch_path) + return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)" % ( + type(self).__name__, + self._name, + self.path, + self.url, + self.branch_path, + ) @classmethod - def _config_parser(cls, repo: 'Repo', - parent_commit: Union[Commit_ish, None], - read_only: bool) -> SubmoduleConfigParser: + def _config_parser( + cls, repo: "Repo", parent_commit: Union[Commit_ish, None], read_only: bool + ) -> SubmoduleConfigParser: """:return: Config Parser constrained to our submodule in read or write mode :raise IOError: If the .gitmodules file cannot be found, either locally or in the repository at the given parent commit. Otherwise the exception would be delayed until the first @@ -211,17 +227,23 @@ class Submodule(IndexObject, TraversableIterableObj): if not repo.bare and parent_matches_head and repo.working_tree_dir: fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file) else: - assert parent_commit is not None, "need valid parent_commit in bare repositories" + assert ( + parent_commit is not None + ), "need valid parent_commit in bare repositories" try: fp_module = cls._sio_modules(parent_commit) except KeyError as e: - raise IOError("Could not find %s file in the tree of parent commit %s" % - (cls.k_modules_file, parent_commit)) from e + raise IOError( + "Could not find %s file in the tree of parent commit %s" + % (cls.k_modules_file, parent_commit) + ) from e # END handle exceptions # END handle non-bare working tree if not read_only and (repo.bare or not parent_matches_head): - raise ValueError("Cannot write blobs of 'historical' submodule configurations") + raise ValueError( + "Cannot write blobs of 'historical' submodule configurations" + ) # END handle writes of historical submodules return SubmoduleConfigParser(fp_module, read_only=read_only) @@ -246,7 +268,7 @@ class Submodule(IndexObject, TraversableIterableObj): def _config_parser_constrained(self, read_only: bool) -> SectionConstraint: """:return: Config Parser constrained to our submodule in read or write mode""" try: - pc: Union['Commit_ish', None] = self.parent_commit + pc: Union["Commit_ish", None] = self.parent_commit except ValueError: pc = None # end handle empty parent repository @@ -255,16 +277,20 @@ class Submodule(IndexObject, TraversableIterableObj): return SectionConstraint(parser, sm_section(self.name)) @classmethod - def _module_abspath(cls, parent_repo: 'Repo', path: PathLike, name: str) -> PathLike: + def _module_abspath( + cls, parent_repo: "Repo", path: PathLike, name: str + ) -> PathLike: if cls._need_gitfile_submodules(parent_repo.git): - return osp.join(parent_repo.git_dir, 'modules', name) + return osp.join(parent_repo.git_dir, "modules", name) if parent_repo.working_tree_dir: return osp.join(parent_repo.working_tree_dir, path) raise NotADirectoryError() # end @classmethod - def _clone_repo(cls, repo: 'Repo', url: str, path: PathLike, name: str, **kwargs: Any) -> 'Repo': + def _clone_repo( + cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any + ) -> "Repo": """:return: Repo instance of newly cloned repository :param repo: our parent repository :param url: url to clone from @@ -274,7 +300,7 @@ class Submodule(IndexObject, TraversableIterableObj): module_abspath = cls._module_abspath(repo, path, name) module_checkout_path = module_abspath if cls._need_gitfile_submodules(repo.git): - kwargs['separate_git_dir'] = module_abspath + kwargs["separate_git_dir"] = module_abspath module_abspath_dir = osp.dirname(module_abspath) if not osp.isdir(module_abspath_dir): os.makedirs(module_abspath_dir) @@ -288,29 +314,36 @@ class Submodule(IndexObject, TraversableIterableObj): return clone @classmethod - def _to_relative_path(cls, parent_repo: 'Repo', path: PathLike) -> PathLike: + def _to_relative_path(cls, parent_repo: "Repo", path: PathLike) -> PathLike: """:return: a path guaranteed to be relative to the given parent - repository :raise ValueError: if path is not contained in the parent repository's working tree""" path = to_native_path_linux(path) - if path.endswith('/'): + if path.endswith("/"): path = path[:-1] # END handle trailing slash if osp.isabs(path) and parent_repo.working_tree_dir: working_tree_linux = to_native_path_linux(parent_repo.working_tree_dir) if not path.startswith(working_tree_linux): - raise ValueError("Submodule checkout path '%s' needs to be within the parents repository at '%s'" - % (working_tree_linux, path)) - path = path[len(working_tree_linux.rstrip('/')) + 1:] + raise ValueError( + "Submodule checkout path '%s' needs to be within the parents repository at '%s'" + % (working_tree_linux, path) + ) + path = path[len(working_tree_linux.rstrip("/")) + 1 :] if not path: - raise ValueError("Absolute submodule path '%s' didn't yield a valid relative path" % path) + raise ValueError( + "Absolute submodule path '%s' didn't yield a valid relative path" + % path + ) # end verify converted relative path makes sense # end convert to a relative path return path @classmethod - def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None: + def _write_git_file_and_module_config( + cls, working_tree_dir: PathLike, module_abspath: PathLike + ) -> None: """Writes a .git file containing a(preferably) relative path to the actual git module repository. It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir :note: will overwrite existing files ! @@ -320,26 +353,40 @@ class Submodule(IndexObject, TraversableIterableObj): :param working_tree_dir: directory to write the .git file into :param module_abspath: absolute path to the bare repository """ - git_file = osp.join(working_tree_dir, '.git') + git_file = osp.join(working_tree_dir, ".git") rela_path = osp.relpath(module_abspath, start=working_tree_dir) if is_win: if osp.isfile(git_file): os.remove(git_file) - with open(git_file, 'wb') as fp: + with open(git_file, "wb") as fp: fp.write(("gitdir: %s" % rela_path).encode(defenc)) - with GitConfigParser(osp.join(module_abspath, 'config'), - read_only=False, merge_includes=False) as writer: - writer.set_value('core', 'worktree', - to_native_path_linux(osp.relpath(working_tree_dir, start=module_abspath))) + with GitConfigParser( + osp.join(module_abspath, "config"), read_only=False, merge_includes=False + ) as writer: + writer.set_value( + "core", + "worktree", + to_native_path_linux( + osp.relpath(working_tree_dir, start=module_abspath) + ), + ) - #{ Edit Interface + # { Edit Interface @classmethod - def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None, - branch: Union[str, None] = None, no_checkout: bool = False, depth: Union[int, None] = None, - env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None - ) -> 'Submodule': + def add( + cls, + repo: "Repo", + name: str, + path: PathLike, + url: Union[str, None] = None, + branch: Union[str, None] = None, + no_checkout: bool = False, + depth: Union[int, None] = None, + env: Union[Mapping[str, str], None] = None, + clone_multi_options: Union[Sequence[TBD], None] = None, + ) -> "Submodule": """Add a new submodule to the given repository. This will alter the index as well as the .gitmodules file, but will not create a new commit. If the submodule already exists, no matter if the configuration differs @@ -379,7 +426,9 @@ class Submodule(IndexObject, TraversableIterableObj): update fails for instance""" if repo.bare: - raise InvalidGitRepositoryError("Cannot add submodules to bare repositories") + raise InvalidGitRepositoryError( + "Cannot add submodules to bare repositories" + ) # END handle bare repos path = cls._to_relative_path(repo, path) @@ -391,7 +440,14 @@ class Submodule(IndexObject, TraversableIterableObj): # END assure url correctness # INSTANTIATE INTERMEDIATE SM - sm = cls(repo, cls.NULL_BIN_SHA, cls.k_default_mode, path, name, url='invalid-temporary') + sm = cls( + repo, + cls.NULL_BIN_SHA, + cls.k_default_mode, + path, + name, + url="invalid-temporary", + ) if sm.exists(): # reretrieve submodule from tree try: @@ -414,7 +470,9 @@ class Submodule(IndexObject, TraversableIterableObj): if has_module and url is not None: if url not in [r.url for r in sm.module().remotes]: raise ValueError( - "Specified URL '%s' does not match any remote url of the repository at '%s'" % (url, sm.abspath)) + "Specified URL '%s' does not match any remote url of the repository at '%s'" + % (url, sm.abspath) + ) # END check url # END verify urls match @@ -422,29 +480,33 @@ class Submodule(IndexObject, TraversableIterableObj): if url is None: if not has_module: - raise ValueError("A URL was not given and a repository did not exist at %s" % path) + raise ValueError( + "A URL was not given and a repository did not exist at %s" % path + ) # END check url mrepo = sm.module() # assert isinstance(mrepo, git.Repo) urls = [r.url for r in mrepo.remotes] if not urls: - raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath) + raise ValueError( + "Didn't find any remote url in repository at %s" % sm.abspath + ) # END verify we have url url = urls[0] else: # clone new repo - kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {'n': no_checkout} + kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {"n": no_checkout} if not branch_is_default: - kwargs['b'] = br.name + kwargs["b"] = br.name # END setup checkout-branch if depth: if isinstance(depth, int): - kwargs['depth'] = depth + kwargs["depth"] = depth else: raise ValueError("depth should be an integer") if clone_multi_options: - kwargs['multi_options'] = clone_multi_options + kwargs["multi_options"] = clone_multi_options # _clone_repo(cls, repo, url, path, name, **kwargs): mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs) @@ -460,13 +522,13 @@ class Submodule(IndexObject, TraversableIterableObj): writer: Union[GitConfigParser, SectionConstraint] with sm.repo.config_writer() as writer: - writer.set_value(sm_section(name), 'url', url) + writer.set_value(sm_section(name), "url", url) # update configuration and index index = sm.repo.index with sm.config_writer(index=index, write=False) as writer: - writer.set_value('url', url) - writer.set_value('path', path) + writer.set_value("url", url) + writer.set_value("path", path) sm._url = url if not branch_is_default: @@ -481,10 +543,18 @@ class Submodule(IndexObject, TraversableIterableObj): return sm - def update(self, recursive: bool = False, init: bool = True, to_latest_revision: bool = False, - progress: Union['UpdateProgress', None] = None, dry_run: bool = False, - force: bool = False, keep_going: bool = False, env: Union[Mapping[str, str], None] = None, - clone_multi_options: Union[Sequence[TBD], None] = None) -> 'Submodule': + def update( + self, + recursive: bool = False, + init: bool = True, + to_latest_revision: bool = False, + progress: Union["UpdateProgress", None] = None, + dry_run: bool = False, + force: bool = False, + keep_going: bool = False, + env: Union[Mapping[str, str], None] = None, + clone_multi_options: Union[Sequence[TBD], None] = None, + ) -> "Submodule": """Update the repository of this submodule to point to the checkout we point at with the binsha of this instance. @@ -527,7 +597,7 @@ class Submodule(IndexObject, TraversableIterableObj): if progress is None: progress = UpdateProgress() # END handle progress - prefix = '' + prefix = "" if dry_run: prefix = "DRY-RUN: " # END handle prefix @@ -550,17 +620,27 @@ class Submodule(IndexObject, TraversableIterableObj): op |= BEGIN # END handle start - progress.update(op, i, len_rmts, prefix + "Fetching remote %s of submodule %r" - % (remote, self.name)) - #=============================== + progress.update( + op, + i, + len_rmts, + prefix + + "Fetching remote %s of submodule %r" % (remote, self.name), + ) + # =============================== if not dry_run: remote.fetch(progress=progress) # END handle dry-run - #=============================== + # =============================== if i == len_rmts - 1: op |= END # END handle end - progress.update(op, i, len_rmts, prefix + "Done fetching remote of submodule %r" % self.name) + progress.update( + op, + i, + len_rmts, + prefix + "Done fetching remote of submodule %r" % self.name, + ) # END fetch new data except InvalidGitRepositoryError: mrepo = None @@ -574,27 +654,49 @@ class Submodule(IndexObject, TraversableIterableObj): try: os.rmdir(checkout_module_abspath) except OSError as e: - raise OSError("Module directory at %r does already exist and is non-empty" - % checkout_module_abspath) from e + raise OSError( + "Module directory at %r does already exist and is non-empty" + % checkout_module_abspath + ) from e # END handle OSError # END handle directory removal # don't check it out at first - nonetheless it will create a local # branch according to the remote-HEAD if possible - progress.update(BEGIN | CLONE, 0, 1, prefix + "Cloning url '%s' to '%s' in submodule %r" % - (self.url, checkout_module_abspath, self.name)) + progress.update( + BEGIN | CLONE, + 0, + 1, + prefix + + "Cloning url '%s' to '%s' in submodule %r" + % (self.url, checkout_module_abspath, self.name), + ) if not dry_run: - mrepo = self._clone_repo(self.repo, self.url, self.path, self.name, n=True, env=env, - multi_options=clone_multi_options) + mrepo = self._clone_repo( + self.repo, + self.url, + self.path, + self.name, + n=True, + env=env, + multi_options=clone_multi_options, + ) # END handle dry-run - progress.update(END | CLONE, 0, 1, prefix + "Done cloning to %s" % checkout_module_abspath) + progress.update( + END | CLONE, + 0, + 1, + prefix + "Done cloning to %s" % checkout_module_abspath, + ) if not dry_run: # see whether we have a valid branch to checkout try: - mrepo = cast('Repo', mrepo) + mrepo = cast("Repo", mrepo) # find a remote which has our branch - we try to be flexible - remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name) + remote_branch = find_first_remote_branch( + mrepo.remotes, self.branch_name + ) local_branch = mkhead(mrepo, self.branch_path) # have a valid branch, but no checkout - make sure we can figure @@ -603,10 +705,15 @@ class Submodule(IndexObject, TraversableIterableObj): # END initial checkout + branch creation # make sure HEAD is not detached - mrepo.head.set_reference(local_branch, logmsg="submodule: attaching head to %s" % local_branch) + mrepo.head.set_reference( + local_branch, + logmsg="submodule: attaching head to %s" % local_branch, + ) mrepo.head.reference.set_tracking_branch(remote_branch) except (IndexError, InvalidGitRepositoryError): - log.warning("Failed to checkout tracking branch %s", self.branch_path) + log.warning( + "Failed to checkout tracking branch %s", self.branch_path + ) # END handle tracking branch # NOTE: Have to write the repo config file as well, otherwise @@ -614,7 +721,7 @@ class Submodule(IndexObject, TraversableIterableObj): # Maybe this is a good way to assure it doesn't get into our way, but # we want to stay backwards compatible too ... . Its so redundant ! with self.repo.config_writer() as writer: - writer.set_value(sm_section(self.name), 'url', self.url) + writer.set_value(sm_section(self.name), "url", self.url) # END handle dry_run # END handle initialization @@ -628,7 +735,10 @@ class Submodule(IndexObject, TraversableIterableObj): # END handle dry_run if mrepo is not None and to_latest_revision: - msg_base = "Cannot update to latest revision in repository at %r as " % mrepo.working_dir + msg_base = ( + "Cannot update to latest revision in repository at %r as " + % mrepo.working_dir + ) if not is_detached: rref = mrepo.head.reference.tracking_branch() if rref is not None: @@ -636,8 +746,11 @@ class Submodule(IndexObject, TraversableIterableObj): binsha = rcommit.binsha hexsha = rcommit.hexsha else: - log.error("%s a tracking branch was not set for local branch '%s'", - msg_base, mrepo.head.reference) + log.error( + "%s a tracking branch was not set for local branch '%s'", + msg_base, + mrepo.head.reference, + ) # END handle remote ref else: log.error("%s there was no local tracking branch", msg_base) @@ -654,28 +767,47 @@ class Submodule(IndexObject, TraversableIterableObj): may_reset = True if mrepo.head.commit.binsha != self.NULL_BIN_SHA: base_commit = mrepo.merge_base(mrepo.head.commit, hexsha) - if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha): + if len(base_commit) == 0 or ( + base_commit[0] is not None and base_commit[0].hexsha == hexsha + ): if force: msg = "Will force checkout or reset on local branch that is possibly in the future of" msg += "the commit it will be checked out to, effectively 'forgetting' new commits" log.debug(msg) else: msg = "Skipping %s on branch '%s' of submodule repo '%s' as it contains un-pushed commits" - msg %= (is_detached and "checkout" or "reset", mrepo.head, mrepo) + msg %= ( + is_detached and "checkout" or "reset", + mrepo.head, + mrepo, + ) log.info(msg) may_reset = False # end handle force # end handle if we are in the future - if may_reset and not force and mrepo.is_dirty(index=True, working_tree=True, untracked_files=True): - raise RepositoryDirtyError(mrepo, "Cannot reset a dirty repository") + if ( + may_reset + and not force + and mrepo.is_dirty( + index=True, working_tree=True, untracked_files=True + ) + ): + raise RepositoryDirtyError( + mrepo, "Cannot reset a dirty repository" + ) # end handle force and dirty state # end handle empty repo # end verify future/past - progress.update(BEGIN | UPDWKTREE, 0, 1, prefix + - "Updating working tree at %s for submodule %r to revision %s" - % (self.path, self.name, hexsha)) + progress.update( + BEGIN | UPDWKTREE, + 0, + 1, + prefix + + "Updating working tree at %s for submodule %r to revision %s" + % (self.path, self.name, hexsha), + ) if not dry_run and may_reset: if is_detached: @@ -688,8 +820,12 @@ class Submodule(IndexObject, TraversableIterableObj): mrepo.head.reset(hexsha, index=True, working_tree=True) # END handle checkout # if we may reset/checkout - progress.update(END | UPDWKTREE, 0, 1, prefix + "Done updating working tree for submodule %r" - % self.name) + progress.update( + END | UPDWKTREE, + 0, + 1, + prefix + "Done updating working tree for submodule %r" % self.name, + ) # END update to new commit only if needed except Exception as err: if not keep_going: @@ -703,8 +839,15 @@ class Submodule(IndexObject, TraversableIterableObj): # in dry_run mode, the module might not exist if mrepo is not None: for submodule in self.iter_items(self.module()): - submodule.update(recursive, init, to_latest_revision, progress=progress, dry_run=dry_run, - force=force, keep_going=keep_going) + submodule.update( + recursive, + init, + to_latest_revision, + progress=progress, + dry_run=dry_run, + force=force, + keep_going=keep_going, + ) # END handle recursive update # END handle dry run # END for each submodule @@ -712,7 +855,9 @@ class Submodule(IndexObject, TraversableIterableObj): return self @unbare_repo - def move(self, module_path: PathLike, configuration: bool = True, module: bool = True) -> 'Submodule': + def move( + self, module_path: PathLike, configuration: bool = True, module: bool = True + ) -> "Submodule": """Move the submodule to a another module path. This involves physically moving the repository at our current path, changing the configuration, as well as adjusting our index entry accordingly. @@ -732,7 +877,9 @@ class Submodule(IndexObject, TraversableIterableObj): in an inconsistent state if a sub - step fails for some reason """ if module + configuration < 1: - raise ValueError("You must specify to move at least the module or the configuration of the submodule") + raise ValueError( + "You must specify to move at least the module or the configuration of the submodule" + ) # END handle input module_checkout_path = self._to_relative_path(self.repo, module_path) @@ -742,9 +889,13 @@ class Submodule(IndexObject, TraversableIterableObj): return self # END handle no change - module_checkout_abspath = join_path_native(str(self.repo.working_tree_dir), module_checkout_path) + module_checkout_abspath = join_path_native( + str(self.repo.working_tree_dir), module_checkout_path + ) if osp.isfile(module_checkout_abspath): - raise ValueError("Cannot move repository onto a file: %s" % module_checkout_abspath) + raise ValueError( + "Cannot move repository onto a file: %s" % module_checkout_abspath + ) # END handle target files index = self.repo.index @@ -780,9 +931,11 @@ class Submodule(IndexObject, TraversableIterableObj): os.renames(cur_path, module_checkout_abspath) renamed_module = True - if osp.isfile(osp.join(module_checkout_abspath, '.git')): + if osp.isfile(osp.join(module_checkout_abspath, ".git")): module_abspath = self._module_abspath(self.repo, self.path, self.name) - self._write_git_file_and_module_config(module_checkout_abspath, module_abspath) + self._write_git_file_and_module_config( + module_checkout_abspath, module_abspath + ) # end handle git file rewrite # END move physical module @@ -794,16 +947,20 @@ class Submodule(IndexObject, TraversableIterableObj): try: ekey = index.entry_key(self.path, 0) entry = index.entries[ekey] - del(index.entries[ekey]) - nentry = git.IndexEntry(entry[:3] + (module_checkout_path,) + entry[4:]) + del index.entries[ekey] + nentry = git.IndexEntry( + entry[:3] + (module_checkout_path,) + entry[4:] + ) index.entries[tekey] = nentry except KeyError as e: - raise InvalidGitRepositoryError("Submodule's entry at %r did not exist" % (self.path)) from e + raise InvalidGitRepositoryError( + "Submodule's entry at %r did not exist" % (self.path) + ) from e # END handle submodule doesn't exist # update configuration - with self.config_writer(index=index) as writer: # auto-write - writer.set_value('path', module_checkout_path) + with self.config_writer(index=index) as writer: # auto-write + writer.set_value("path", module_checkout_path) self.path = module_checkout_path # END handle configuration flag except Exception: @@ -821,8 +978,13 @@ class Submodule(IndexObject, TraversableIterableObj): return self @unbare_repo - def remove(self, module: bool = True, force: bool = False, - configuration: bool = True, dry_run: bool = False) -> 'Submodule': + def remove( + self, + module: bool = True, + force: bool = False, + configuration: bool = True, + dry_run: bool = False, + ) -> "Submodule": """Remove this submodule from the repository. This will remove our entry from the .gitmodules file and the entry in the .git / config file. @@ -850,7 +1012,9 @@ class Submodule(IndexObject, TraversableIterableObj): :raise InvalidGitRepositoryError: thrown if the repository cannot be deleted :raise OSError: if directories or files could not be removed""" if not (module or configuration): - raise ValueError("Need to specify to delete at least the module, or the configuration") + raise ValueError( + "Need to specify to delete at least the module, or the configuration" + ) # END handle parameters # Recursively remove children of this submodule @@ -858,12 +1022,14 @@ class Submodule(IndexObject, TraversableIterableObj): for csm in self.children(): nc += 1 csm.remove(module, force, configuration, dry_run) - del(csm) + del csm # end if configuration and not dry_run and nc > 0: # Assure we don't leave the parent repository in a dirty state, and commit our changes # It's important for recursive, unforced, deletions to work as expected - self.module().index.commit("Removed at least one of child-modules of '%s'" % self.name) + self.module().index.commit( + "Removed at least one of child-modules of '%s'" % self.name + ) # end handle recursion # DELETE REPOSITORY WORKING TREE @@ -882,7 +1048,9 @@ class Submodule(IndexObject, TraversableIterableObj): elif osp.isdir(mp): method = rmtree elif osp.exists(mp): - raise AssertionError("Cannot forcibly delete repository as it was neither a link, nor a directory") + raise AssertionError( + "Cannot forcibly delete repository as it was neither a link, nor a directory" + ) # END handle brutal deletion if not dry_run: assert method @@ -893,7 +1061,8 @@ class Submodule(IndexObject, TraversableIterableObj): if mod.is_dirty(index=True, working_tree=True, untracked_files=True): raise InvalidGitRepositoryError( "Cannot delete module at %s with any modifications, unless force is specified" - % mod.working_tree_dir) + % mod.working_tree_dir + ) # END check for dirt # figure out whether we have new commits compared to the remotes @@ -910,30 +1079,36 @@ class Submodule(IndexObject, TraversableIterableObj): # not a single remote branch contained all our commits if len(rrefs) and num_branches_with_new_commits == len(rrefs): raise InvalidGitRepositoryError( - "Cannot delete module at %s as there are new commits" % mod.working_tree_dir) + "Cannot delete module at %s as there are new commits" + % mod.working_tree_dir + ) # END handle new commits # have to manually delete references as python's scoping is # not existing, they could keep handles open ( on windows this is a problem ) if len(rrefs): - del(rref) # skipcq: PYL-W0631 + del rref # skipcq: PYL-W0631 # END handle remotes - del(rrefs) - del(remote) + del rrefs + del remote # END for each remote # finally delete our own submodule if not dry_run: self._clear_cache() wtd = mod.working_tree_dir - del(mod) # release file-handles (windows) + del mod # release file-handles (windows) import gc + gc.collect() try: rmtree(str(wtd)) except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: from unittest import SkipTest - raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex + + raise SkipTest( + "FIXME: fails with: PermissionError\n {}".format(ex) + ) from ex raise # END delete tree if possible # END handle force @@ -945,7 +1120,10 @@ class Submodule(IndexObject, TraversableIterableObj): except Exception as ex: if HIDE_WINDOWS_KNOWN_ERRORS: from unittest import SkipTest - raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex + + raise SkipTest( + f"FIXME: fails with: PermissionError\n {ex}" + ) from ex else: raise # end handle separate bare repository @@ -961,7 +1139,7 @@ class Submodule(IndexObject, TraversableIterableObj): # first the index-entry parent_index = self.repo.index try: - del(parent_index.entries[parent_index.entry_key(self.path, 0)]) + del parent_index.entries[parent_index.entry_key(self.path, 0)] except KeyError: pass # END delete entry @@ -979,7 +1157,9 @@ class Submodule(IndexObject, TraversableIterableObj): return self - def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> 'Submodule': + def set_parent_commit( + self, commit: Union[Commit_ish, None], check: bool = True + ) -> "Submodule": """Set this instance to use the given commit whose tree is supposed to contain the .gitmodules blob. @@ -1000,7 +1180,10 @@ class Submodule(IndexObject, TraversableIterableObj): pcommit = self.repo.commit(commit) pctree = pcommit.tree if self.k_modules_file not in pctree: - raise ValueError("Tree of commit %s did not contain the %s file" % (commit, self.k_modules_file)) + raise ValueError( + "Tree of commit %s did not contain the %s file" + % (commit, self.k_modules_file) + ) # END handle exceptions prev_pc = self._parent_commit @@ -1010,7 +1193,10 @@ class Submodule(IndexObject, TraversableIterableObj): parser = self._config_parser(self.repo, self._parent_commit, read_only=True) if not parser.has_section(sm_section(self.name)): self._parent_commit = prev_pc - raise ValueError("Submodule at path %r did not exist in parent commit %s" % (self.path, commit)) + raise ValueError( + "Submodule at path %r did not exist in parent commit %s" + % (self.path, commit) + ) # END handle submodule did not exist # END handle checking mode @@ -1027,8 +1213,9 @@ class Submodule(IndexObject, TraversableIterableObj): return self @unbare_repo - def config_writer(self, index: Union['IndexFile', None] = None, write: bool = True - ) -> SectionConstraint['SubmoduleConfigParser']: + def config_writer( + self, index: Union["IndexFile", None] = None, write: bool = True + ) -> SectionConstraint["SubmoduleConfigParser"]: """:return: a config writer instance allowing you to read and write the data belonging to this submodule into the .gitmodules file. @@ -1049,7 +1236,7 @@ class Submodule(IndexObject, TraversableIterableObj): return writer @unbare_repo - def rename(self, new_name: str) -> 'Submodule': + def rename(self, new_name: str) -> "Submodule": """Rename this submodule :note: This method takes care of renaming the submodule in various places, such as @@ -1081,7 +1268,9 @@ class Submodule(IndexObject, TraversableIterableObj): # .git/modules mod = self.module() if mod.has_separate_working_tree(): - destination_module_abspath = self._module_abspath(self.repo, self.path, new_name) + destination_module_abspath = self._module_abspath( + self.repo, self.path, new_name + ) source_dir = mod.git_dir # Let's be sure the submodule name is not so obviously tied to a directory if str(destination_module_abspath).startswith(str(mod.git_dir)): @@ -1091,17 +1280,19 @@ class Submodule(IndexObject, TraversableIterableObj): # end handle self-containment os.renames(source_dir, destination_module_abspath) if mod.working_tree_dir: - self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath) + self._write_git_file_and_module_config( + mod.working_tree_dir, destination_module_abspath + ) # end move separate git repository return self - #} END edit interface + # } END edit interface - #{ Query Interface + # { Query Interface @unbare_repo - def module(self) -> 'Repo': + def module(self) -> "Repo": """:return: Repo instance initialized from the repository at our submodule path :raise InvalidGitRepositoryError: if a repository was not available. This could also mean that it was not yet initialized""" @@ -1113,9 +1304,13 @@ class Submodule(IndexObject, TraversableIterableObj): return repo # END handle repo uninitialized except (InvalidGitRepositoryError, NoSuchPathError) as e: - raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath) from e + raise InvalidGitRepositoryError( + "No valid repository at %s" % module_checkout_abspath + ) from e else: - raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath) + raise InvalidGitRepositoryError( + "Repository at %r was not yet checked out" % module_checkout_abspath + ) # END handle exceptions def module_exists(self) -> bool: @@ -1162,7 +1357,7 @@ class Submodule(IndexObject, TraversableIterableObj): # END handle object state consistency @property - def branch(self) -> 'Head': + def branch(self) -> "Head": """:return: The branch instance that we are to checkout :raise InvalidGitRepositoryError: if our module is not yet checked out""" return mkhead(self.module(), self._branch_path) @@ -1187,7 +1382,7 @@ class Submodule(IndexObject, TraversableIterableObj): return self._url @property - def parent_commit(self) -> 'Commit_ish': + def parent_commit(self) -> "Commit_ish": """:return: Commit instance with the tree containing the .gitmodules file :note: will always point to the current head's commit if it was not set explicitly""" if self._parent_commit is None: @@ -1215,22 +1410,27 @@ class Submodule(IndexObject, TraversableIterableObj): :raise IOError: If the .gitmodules file/blob could not be read""" return self._config_parser_constrained(read_only=True) - def children(self) -> IterableList['Submodule']: + def children(self) -> IterableList["Submodule"]: """ :return: IterableList(Submodule, ...) an iterable list of submodules instances which are children of this submodule or 0 if the submodule is not checked out""" return self._get_intermediate_items(self) - #} END query interface + # } END query interface - #{ Iterable Interface + # { Iterable Interface @classmethod - def iter_items(cls, repo: 'Repo', parent_commit: Union[Commit_ish, str] = 'HEAD', *Args: Any, **kwargs: Any - ) -> Iterator['Submodule']: + def iter_items( + cls, + repo: "Repo", + parent_commit: Union[Commit_ish, str] = "HEAD", + *Args: Any, + **kwargs: Any, + ) -> Iterator["Submodule"]: """:return: iterator yielding Submodule instances available in the given repository""" try: - pc = repo.commit(parent_commit) # parent commit instance + pc = repo.commit(parent_commit) # parent commit instance parser = cls._config_parser(repo, pc, read_only=True) except (IOError, BadName): return iter([]) @@ -1238,8 +1438,8 @@ class Submodule(IndexObject, TraversableIterableObj): for sms in parser.sections(): n = sm_name(sms) - p = parser.get(sms, 'path') - u = parser.get(sms, 'url') + p = parser.get(sms, "path") + u = parser.get(sms, "url") b = cls.k_head_default if parser.has_option(sms, cls.k_head_option): b = str(parser.get(sms, cls.k_head_option)) @@ -1248,7 +1448,7 @@ class Submodule(IndexObject, TraversableIterableObj): # get the binsha index = repo.index try: - rt = pc.tree # root tree + rt = pc.tree # root tree sm = rt[p] except KeyError: # try the index, maybe it was just added @@ -1273,4 +1473,4 @@ class Submodule(IndexObject, TraversableIterableObj): yield sm # END for each section - #} END iterable interface + # } END iterable interface diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py index 08e1f954..16f0f91f 100644 --- a/git/objects/submodule/root.py +++ b/git/objects/submodule/root.py @@ -1,7 +1,4 @@ -from .base import ( - Submodule, - UpdateProgress -) +from .base import Submodule, UpdateProgress from .util import find_first_remote_branch from git.exc import InvalidGitRepositoryError import git @@ -22,14 +19,17 @@ if TYPE_CHECKING: __all__ = ["RootModule", "RootUpdateProgress"] -log = logging.getLogger('git.objects.submodule.root') +log = logging.getLogger("git.objects.submodule.root") log.addHandler(logging.NullHandler()) class RootUpdateProgress(UpdateProgress): """Utility class which adds more opcodes to the UpdateProgress""" + REMOVE, PATHCHANGE, BRANCHCHANGE, URLCHANGE = [ - 1 << x for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)] + 1 << x + for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4) + ] _num_op_codes = UpdateProgress._num_op_codes + 4 __slots__ = () @@ -50,32 +50,39 @@ class RootModule(Submodule): __slots__ = () - k_root_name = '__ROOT__' + k_root_name = "__ROOT__" - def __init__(self, repo: 'Repo'): + def __init__(self, repo: "Repo"): # repo, binsha, mode=None, path=None, name = None, parent_commit=None, url=None, ref=None) super(RootModule, self).__init__( repo, binsha=self.NULL_BIN_SHA, mode=self.k_default_mode, - path='', + path="", name=self.k_root_name, parent_commit=repo.head.commit, - url='', - branch_path=git.Head.to_full_path(self.k_head_default) + url="", + branch_path=git.Head.to_full_path(self.k_head_default), ) def _clear_cache(self) -> None: """May not do anything""" pass - #{ Interface - - def update(self, previous_commit: Union[Commit_ish, None] = None, # type: ignore[override] - recursive: bool = True, force_remove: bool = False, init: bool = True, - to_latest_revision: bool = False, progress: Union[None, 'RootUpdateProgress'] = None, - dry_run: bool = False, force_reset: bool = False, keep_going: bool = False - ) -> 'RootModule': + # { Interface + + def update( + self, + previous_commit: Union[Commit_ish, None] = None, # type: ignore[override] + recursive: bool = True, + force_remove: bool = False, + init: bool = True, + to_latest_revision: bool = False, + progress: Union[None, "RootUpdateProgress"] = None, + dry_run: bool = False, + force_reset: bool = False, + keep_going: bool = False, + ) -> "RootModule": """Update the submodules of this repository to the current HEAD commit. This method behaves smartly by determining changes of the path of a submodules repository, next to changes to the to-be-checked-out commit or the branch to be @@ -109,16 +116,18 @@ class RootModule(Submodule): In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules :return: self""" if self.repo.bare: - raise InvalidGitRepositoryError("Cannot update submodules in bare repositories") + raise InvalidGitRepositoryError( + "Cannot update submodules in bare repositories" + ) # END handle bare if progress is None: progress = RootUpdateProgress() # END assure progress is set - prefix = '' + prefix = "" if dry_run: - prefix = 'DRY-RUN: ' + prefix = "DRY-RUN: " repo = self.repo @@ -137,17 +146,19 @@ class RootModule(Submodule): previous_commit = cur_commit # END exception handling else: - previous_commit = repo.commit(previous_commit) # obtain commit object + previous_commit = repo.commit(previous_commit) # obtain commit object # END handle previous commit - psms: 'IterableList[Submodule]' = self.list_items(repo, parent_commit=previous_commit) - sms: 'IterableList[Submodule]' = self.list_items(repo) + psms: "IterableList[Submodule]" = self.list_items( + repo, parent_commit=previous_commit + ) + sms: "IterableList[Submodule]" = self.list_items(repo) spsms = set(psms) ssms = set(sms) # HANDLE REMOVALS ################### - rrsm = (spsms - ssms) + rrsm = spsms - ssms len_rrsm = len(rrsm) for i, rsm in enumerate(rrsm): @@ -158,37 +169,58 @@ class RootModule(Submodule): # fake it into thinking its at the current commit to allow deletion # of previous module. Trigger the cache to be updated before that - progress.update(op, i, len_rrsm, prefix + "Removing submodule %r at %s" % (rsm.name, rsm.abspath)) + progress.update( + op, + i, + len_rrsm, + prefix + "Removing submodule %r at %s" % (rsm.name, rsm.abspath), + ) rsm._parent_commit = repo.head.commit - rsm.remove(configuration=False, module=True, force=force_remove, dry_run=dry_run) + rsm.remove( + configuration=False, + module=True, + force=force_remove, + dry_run=dry_run, + ) if i == len_rrsm - 1: op |= END # END handle end - progress.update(op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name) + progress.update( + op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name + ) # END for each removed submodule # HANDLE PATH RENAMES ##################### # url changes + branch changes - csms = (spsms & ssms) + csms = spsms & ssms len_csms = len(csms) for i, csm in enumerate(csms): - psm: 'Submodule' = psms[csm.name] - sm: 'Submodule' = sms[csm.name] + psm: "Submodule" = psms[csm.name] + sm: "Submodule" = sms[csm.name] # PATH CHANGES ############## if sm.path != psm.path and psm.module_exists(): - progress.update(BEGIN | PATHCHANGE, i, len_csms, prefix + - "Moving repository of submodule %r from %s to %s" - % (sm.name, psm.abspath, sm.abspath)) + progress.update( + BEGIN | PATHCHANGE, + i, + len_csms, + prefix + + "Moving repository of submodule %r from %s to %s" + % (sm.name, psm.abspath, sm.abspath), + ) # move the module to the new path if not dry_run: psm.move(sm.path, module=True, configuration=False) # END handle dry_run progress.update( - END | PATHCHANGE, i, len_csms, prefix + "Done moving repository of submodule %r" % sm.name) + END | PATHCHANGE, + i, + len_csms, + prefix + "Done moving repository of submodule %r" % sm.name, + ) # END handle path changes if sm.module_exists(): @@ -198,14 +230,20 @@ class RootModule(Submodule): # Add the new remote, remove the old one # This way, if the url just changes, the commits will not # have to be re-retrieved - nn = '__new_origin__' + nn = "__new_origin__" smm = sm.module() rmts = smm.remotes # don't do anything if we already have the url we search in place if len([r for r in rmts if r.url == sm.url]) == 0: - progress.update(BEGIN | URLCHANGE, i, len_csms, prefix + - "Changing url of submodule %r from %s to %s" % (sm.name, psm.url, sm.url)) + progress.update( + BEGIN | URLCHANGE, + i, + len_csms, + prefix + + "Changing url of submodule %r from %s to %s" + % (sm.name, psm.url, sm.url), + ) if not dry_run: assert nn not in [r.name for r in rmts] @@ -214,7 +252,16 @@ class RootModule(Submodule): # If we have a tracking branch, it should be available # in the new remote as well. - if len([r for r in smr.refs if r.remote_head == sm.branch_name]) == 0: + if ( + len( + [ + r + for r in smr.refs + if r.remote_head == sm.branch_name + ] + ) + == 0 + ): raise ValueError( "Submodule branch named %r was not available in new submodule remote at %r" % (sm.branch_name, sm.url) @@ -242,7 +289,9 @@ class RootModule(Submodule): # Alternatively we could just generate a unique name and leave all # existing ones in place raise InvalidGitRepositoryError( - "Couldn't find original remote-repo at url %r" % psm.url) + "Couldn't find original remote-repo at url %r" + % psm.url + ) # END handle one single remote # END handle check we found a remote @@ -277,15 +326,23 @@ class RootModule(Submodule): # this way, it will be checked out in the next step # This will change the submodule relative to us, so # the user will be able to commit the change easily - log.warning("Current sha %s was not contained in the tracking\ - branch at the new remote, setting it the the remote's tracking branch", sm.hexsha) + log.warning( + "Current sha %s was not contained in the tracking\ + branch at the new remote, setting it the the remote's tracking branch", + sm.hexsha, + ) sm.binsha = rref.commit.binsha # END reset binsha # NOTE: All checkout is performed by the base implementation of update # END handle dry_run progress.update( - END | URLCHANGE, i, len_csms, prefix + "Done adjusting url of submodule %r" % (sm.name)) + END | URLCHANGE, + i, + len_csms, + prefix + + "Done adjusting url of submodule %r" % (sm.name), + ) # END skip remote handling if new url already exists in module # END handle url @@ -294,9 +351,14 @@ class RootModule(Submodule): if sm.branch_path != psm.branch_path: # finally, create a new tracking branch which tracks the # new remote branch - progress.update(BEGIN | BRANCHCHANGE, i, len_csms, prefix + - "Changing branch of submodule %r from %s to %s" - % (sm.name, psm.branch_path, sm.branch_path)) + progress.update( + BEGIN | BRANCHCHANGE, + i, + len_csms, + prefix + + "Changing branch of submodule %r from %s to %s" + % (sm.name, psm.branch_path, sm.branch_path), + ) if not dry_run: smm = sm.module() smmr = smm.remotes @@ -306,13 +368,19 @@ class RootModule(Submodule): # end for each remote try: - tbr = git.Head.create(smm, sm.branch_name, logmsg='branch: Created from HEAD') + tbr = git.Head.create( + smm, + sm.branch_name, + logmsg="branch: Created from HEAD", + ) except OSError: # ... or reuse the existing one tbr = git.Head(smm, sm.branch_path) # END assure tracking branch exists - tbr.set_tracking_branch(find_first_remote_branch(smmr, sm.branch_name)) + tbr.set_tracking_branch( + find_first_remote_branch(smmr, sm.branch_name) + ) # NOTE: All head-resetting is done in the base implementation of update # but we will have to checkout the new branch here. As it still points to the currently # checkout out commit, we don't do any harm. @@ -321,7 +389,11 @@ class RootModule(Submodule): # END handle dry_run progress.update( - END | BRANCHCHANGE, i, len_csms, prefix + "Done changing branch of submodule %r" % sm.name) + END | BRANCHCHANGE, + i, + len_csms, + prefix + "Done changing branch of submodule %r" % sm.name, + ) # END handle branch # END handle # END for each common submodule @@ -335,8 +407,15 @@ class RootModule(Submodule): ###################################### for sm in sms: # update the submodule using the default method - sm.update(recursive=False, init=init, to_latest_revision=to_latest_revision, - progress=progress, dry_run=dry_run, force=force_reset, keep_going=keep_going) + sm.update( + recursive=False, + init=init, + to_latest_revision=to_latest_revision, + progress=progress, + dry_run=dry_run, + force=force_reset, + keep_going=keep_going, + ) # update recursively depth first - question is which inconsistent # state will be better in case it fails somewhere. Defective branch @@ -345,18 +424,27 @@ class RootModule(Submodule): if recursive: # the module would exist by now if we are not in dry_run mode if sm.module_exists(): - type(self)(sm.module()).update(recursive=True, force_remove=force_remove, - init=init, to_latest_revision=to_latest_revision, - progress=progress, dry_run=dry_run, force_reset=force_reset, - keep_going=keep_going) + type(self)(sm.module()).update( + recursive=True, + force_remove=force_remove, + init=init, + to_latest_revision=to_latest_revision, + progress=progress, + dry_run=dry_run, + force_reset=force_reset, + keep_going=keep_going, + ) # END handle dry_run # END handle recursive # END for each submodule to update return self - def module(self) -> 'Repo': + def module(self) -> "Repo": """:return: the actual repository containing the submodules""" return self.repo - #} END interface -#} END classes + + # } END interface + + +# } END classes diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py index cc1cd60a..456ae34b 100644 --- a/git/objects/submodule/util.py +++ b/git/objects/submodule/util.py @@ -20,10 +20,15 @@ if TYPE_CHECKING: from git.refs import RemoteReference -__all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch', - 'SubmoduleConfigParser') +__all__ = ( + "sm_section", + "sm_name", + "mkhead", + "find_first_remote_branch", + "SubmoduleConfigParser", +) -#{ Utilities +# { Utilities def sm_section(name: str) -> str: @@ -37,12 +42,14 @@ def sm_name(section: str) -> str: return section[11:-1] -def mkhead(repo: 'Repo', path: PathLike) -> 'Head': +def mkhead(repo: "Repo", path: PathLike) -> "Head": """:return: New branch/head instance""" return git.Head(repo, git.Head.to_full_path(path)) -def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> 'RemoteReference': +def find_first_remote_branch( + remotes: Sequence["Remote"], branch_name: str +) -> "RemoteReference": """Find the remote branch matching the name of the given branch or raise InvalidGitRepositoryError""" for remote in remotes: try: @@ -51,12 +58,16 @@ def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> ' continue # END exception handling # END for remote - raise InvalidGitRepositoryError("Didn't find remote branch '%r' in any of the given remotes" % branch_name) + raise InvalidGitRepositoryError( + "Didn't find remote branch '%r' in any of the given remotes" % branch_name + ) -#} END utilities +# } END utilities + + +# { Classes -#{ Classes class SubmoduleConfigParser(GitConfigParser): @@ -70,13 +81,13 @@ class SubmoduleConfigParser(GitConfigParser): """ def __init__(self, *args: Any, **kwargs: Any) -> None: - self._smref: Union['ReferenceType[Submodule]', None] = None + self._smref: Union["ReferenceType[Submodule]", None] = None self._index = None self._auto_write = True super(SubmoduleConfigParser, self).__init__(*args, **kwargs) - #{ Interface - def set_submodule(self, submodule: 'Submodule') -> None: + # { Interface + def set_submodule(self, submodule: "Submodule") -> None: """Set this instance's submodule. It must be called before the first write operation begins""" self._smref = weakref.ref(submodule) @@ -97,14 +108,15 @@ class SubmoduleConfigParser(GitConfigParser): sm._clear_cache() # END handle weakref - #} END interface + # } END interface - #{ Overridden Methods + # { Overridden Methods def write(self) -> None: # type: ignore[override] rval: None = super(SubmoduleConfigParser, self).write() self.flush_to_index() return rval + # END overridden methods -#} END classes +# } END classes diff --git a/git/objects/tag.py b/git/objects/tag.py index 7048eb40..3956a89e 100644 --- a/git/objects/tag.py +++ b/git/objects/tag.py @@ -20,23 +20,34 @@ if TYPE_CHECKING: from .blob import Blob from .tree import Tree -__all__ = ("TagObject", ) +__all__ = ("TagObject",) class TagObject(base.Object): """Non-Lightweight tag carrying additional information about an object we are pointing to.""" - type: Literal['tag'] = "tag" - __slots__ = ("object", "tag", "tagger", "tagged_date", "tagger_tz_offset", "message") - def __init__(self, repo: 'Repo', binsha: bytes, - object: Union[None, base.Object] = None, - tag: Union[None, str] = None, - tagger: Union[None, 'Actor'] = None, - tagged_date: Union[int, None] = None, - tagger_tz_offset: Union[int, None] = None, - message: Union[str, None] = None - ) -> None: # @ReservedAssignment + type: Literal["tag"] = "tag" + __slots__ = ( + "object", + "tag", + "tagger", + "tagged_date", + "tagger_tz_offset", + "message", + ) + + def __init__( + self, + repo: "Repo", + binsha: bytes, + object: Union[None, base.Object] = None, + tag: Union[None, str] = None, + tagger: Union[None, "Actor"] = None, + tagged_date: Union[int, None] = None, + tagger_tz_offset: Union[int, None] = None, + message: Union[str, None] = None, + ) -> None: # @ReservedAssignment """Initialize a tag object with additional data :param repo: repository this object is located in @@ -51,7 +62,7 @@ class TagObject(base.Object): authored_date is in, in a format similar to time.altzone""" super(TagObject, self).__init__(repo, binsha) if object is not None: - self.object: Union['Commit', 'Blob', 'Tree', 'TagObject'] = object + self.object: Union["Commit", "Blob", "Tree", "TagObject"] = object if tag is not None: self.tag = tag if tagger is not None: @@ -67,19 +78,22 @@ class TagObject(base.Object): """Cache all our attributes at once""" if attr in TagObject.__slots__: ostream = self.repo.odb.stream(self.binsha) - lines: List[str] = ostream.read().decode(defenc, 'replace').splitlines() + lines: List[str] = ostream.read().decode(defenc, "replace").splitlines() _obj, hexsha = lines[0].split(" ") _type_token, type_name = lines[1].split(" ") - object_type = get_object_type_by_name(type_name.encode('ascii')) - self.object = \ - object_type(self.repo, hex_to_bin(hexsha)) + object_type = get_object_type_by_name(type_name.encode("ascii")) + self.object = object_type(self.repo, hex_to_bin(hexsha)) self.tag = lines[2][4:] # tag <tag name> if len(lines) > 3: tagger_info = lines[3] # tagger <actor> <date> - self.tagger, self.tagged_date, self.tagger_tz_offset = parse_actor_and_date(tagger_info) + ( + self.tagger, + self.tagged_date, + self.tagger_tz_offset, + ) = parse_actor_and_date(tagger_info) # line 4 empty - it could mark the beginning of the next header # in case there really is no message, it would not exist. Otherwise @@ -87,7 +101,7 @@ class TagObject(base.Object): if len(lines) > 5: self.message = "\n".join(lines[5:]) else: - self.message = '' + self.message = "" # END check our attributes else: super(TagObject, self)._set_cache_(attr) diff --git a/git/objects/tree.py b/git/objects/tree.py index 22531895..e1fcced7 100644 --- a/git/objects/tree.py +++ b/git/objects/tree.py @@ -13,16 +13,24 @@ from .base import IndexObject, IndexObjUnion from .blob import Blob from .submodule.base import Submodule -from .fun import ( - tree_entries_from_data, - tree_to_stream -) +from .fun import tree_entries_from_data, tree_to_stream # typing ------------------------------------------------- -from typing import (Any, Callable, Dict, Iterable, Iterator, List, - Tuple, Type, Union, cast, TYPE_CHECKING) +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Tuple, + Type, + Union, + cast, + TYPE_CHECKING, +) from git.types import PathLike, Literal @@ -32,14 +40,15 @@ if TYPE_CHECKING: TreeCacheTup = Tuple[bytes, int, str] -TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion, - Tuple['Submodule', 'Submodule']]] +TraversedTreeTup = Union[ + Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]] +] # def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]: # return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) -#-------------------------------------------------------- +# -------------------------------------------------------- cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) @@ -60,8 +69,9 @@ def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int: return len_a - len_b -def merge_sort(a: List[TreeCacheTup], - cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None: +def merge_sort( + a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int] +) -> None: if len(a) < 2: return None @@ -102,7 +112,8 @@ class TreeModifier(object): Once all adjustments are complete, the _cache, which really is a reference to the cache of a tree, will be sorted. Assuring it will be in a serializable state""" - __slots__ = '_cache' + + __slots__ = "_cache" def __init__(self, cache: List[TreeCacheTup]) -> None: self._cache = cache @@ -116,18 +127,21 @@ class TreeModifier(object): # END for each item in cache return -1 - #{ Interface - def set_done(self) -> 'TreeModifier': + # { Interface + def set_done(self) -> "TreeModifier": """Call this method once you are done modifying the tree information. It may be called several times, but be aware that each call will cause a sort operation :return self:""" merge_sort(self._cache, git_cmp) return self - #} END interface - #{ Mutators - def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> 'TreeModifier': + # } END interface + + # { Mutators + def add( + self, sha: bytes, mode: int, name: str, force: bool = False + ) -> "TreeModifier": """Add the given item to the tree. If an item with the given name already exists, nothing will be done, but a ValueError will be raised if the sha and mode of the existing item do not match the one you add, unless @@ -138,7 +152,7 @@ class TreeModifier(object): :param force: If True, an item with your name and information will overwrite any existing item with the same name, no matter which information it has :return: self""" - if '/' in name: + if "/" in name: raise ValueError("Name must not contain '/' characters") if (mode >> 12) not in Tree._map_id_to_type: raise ValueError("Invalid object type according to mode %o" % mode) @@ -168,7 +182,11 @@ class TreeModifier(object): puts the caller into responsibility to assure the input is correct. For more information on the parameters, see ``add`` :param binsha: 20 byte binary sha""" - assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) + assert ( + isinstance(binsha, bytes) + and isinstance(mode, int) + and isinstance(name, str) + ) tree_cache = (binsha, mode, name) self._cache.append(tree_cache) @@ -177,9 +195,9 @@ class TreeModifier(object): """Deletes an item with the given name if it exists""" index = self._index_by_name(name) if index > -1: - del(self._cache[index]) + del self._cache[index] - #} END mutators + # } END mutators class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): @@ -195,11 +213,11 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): blob = tree[0] """ - type: Literal['tree'] = "tree" + type: Literal["tree"] = "tree" __slots__ = "_cache" # actual integer ids for comparison - commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link + commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link blob_id = 0o10 symlink_id = 0o12 tree_id = 0o04 @@ -211,12 +229,20 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): # tree id added once Tree is defined } - def __init__(self, repo: 'Repo', binsha: bytes, mode: int = tree_id << 12, path: Union[PathLike, None] = None): + def __init__( + self, + repo: "Repo", + binsha: bytes, + mode: int = tree_id << 12, + path: Union[PathLike, None] = None, + ): super(Tree, self).__init__(repo, binsha, mode, path) - @ classmethod - def _get_intermediate_items(cls, index_object: IndexObjUnion, - ) -> Union[Tuple['Tree', ...], Tuple[()]]: + @classmethod + def _get_intermediate_items( + cls, + index_object: IndexObjUnion, + ) -> Union[Tuple["Tree", ...], Tuple[()]]: if index_object.type == "tree": return tuple(index_object._iter_convert_to_object(index_object._cache)) return () @@ -230,8 +256,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): super(Tree, self)._set_cache_(attr) # END handle attribute - def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup] - ) -> Iterator[IndexObjUnion]: + def _iter_convert_to_object( + self, iterable: Iterable[TreeCacheTup] + ) -> Iterator[IndexObjUnion]: """Iterable yields tuples of (binsha, mode, name), which will be converted to the respective object representation""" for binsha, mode, name in iterable: @@ -239,7 +266,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): try: yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path) except KeyError as e: - raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e + raise TypeError( + "Unknown mode %o found in tree data for path '%s'" % (mode, path) + ) from e # END for each item def join(self, file: str) -> IndexObjUnion: @@ -248,13 +277,13 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): :raise KeyError: if given file or tree does not exist in tree""" msg = "Blob or Tree named %r not found" - if '/' in file: + if "/" in file: tree = self item = self - tokens = file.split('/') + tokens = file.split("/") for i, token in enumerate(tokens): item = tree[token] - if item.type == 'tree': + if item.type == "tree": tree = item else: # safety assertion - blobs are at the end of the path @@ -268,9 +297,10 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): return item else: for info in self._cache: - if info[2] == file: # [2] == name - return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], - join_path(self.path, info[2])) + if info[2] == file: # [2] == name + return self._map_id_to_type[info[1] >> 12]( + self.repo, info[0], info[1], join_path(self.path, info[2]) + ) # END for each obj raise KeyError(msg % file) # END handle long paths @@ -279,17 +309,17 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): """For PY3 only""" return self.join(file) - @ property - def trees(self) -> List['Tree']: + @property + def trees(self) -> List["Tree"]: """:return: list(Tree, ...) list of trees directly below this tree""" return [i for i in self if i.type == "tree"] - @ property + @property def blobs(self) -> List[Blob]: """:return: list(Blob, ...) list of blobs directly below this tree""" return [i for i in self if i.type == "blob"] - @ property + @property def cache(self) -> TreeModifier: """ :return: An object allowing to modify the internal cache. This can be used @@ -298,16 +328,20 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): See the ``TreeModifier`` for more information on how to alter the cache""" return TreeModifier(self._cache) - def traverse(self, # type: ignore[override] - predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True, - prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False, - depth: int = -1, - branch_first: bool = True, - visit_once: bool = False, - ignore_self: int = 1, - as_edge: bool = False - ) -> Union[Iterator[IndexObjUnion], - Iterator[TraversedTreeTup]]: + def traverse( + self, # type: ignore[override] + predicate: Callable[ + [Union[IndexObjUnion, TraversedTreeTup], int], bool + ] = lambda i, d: True, + prune: Callable[ + [Union[IndexObjUnion, TraversedTreeTup], int], bool + ] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = False, + ignore_self: int = 1, + as_edge: bool = False, + ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]: """For documentation, see util.Traversable._traverse() Trees are set to visit_once = False to gain more performance in the traversal""" @@ -321,9 +355,17 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): # ret_tup = itertools.tee(ret, 2) # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}" # return ret_tup[0]""" - return cast(Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], - super(Tree, self)._traverse(predicate, prune, depth, # type: ignore - branch_first, visit_once, ignore_self)) + return cast( + Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], + super(Tree, self)._traverse( + predicate, + prune, + depth, # type: ignore + branch_first, + visit_once, + ignore_self, + ), + ) def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]: """ @@ -331,7 +373,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): traverse() Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] """ - return super(Tree, self)._list_traverse(* args, **kwargs) + return super(Tree, self)._list_traverse(*args, **kwargs) # List protocol @@ -347,7 +389,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion: if isinstance(item, int): info = self._cache[item] - return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2])) + return self._map_id_to_type[info[1] >> 12]( + self.repo, info[0], info[1], join_path(self.path, info[2]) + ) if isinstance(item, str): # compatibility @@ -378,7 +422,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): def __reversed__(self) -> Iterator[IndexObjUnion]: return reversed(self._iter_convert_to_object(self._cache)) # type: ignore - def _serialize(self, stream: 'BytesIO') -> 'Tree': + def _serialize(self, stream: "BytesIO") -> "Tree": """Serialize this tree into the stream. Please note that we will assume our tree data to be in a sorted state. If this is not the case, serialization will not generate a correct tree representation as these are assumed to be sorted @@ -386,7 +430,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): tree_to_stream(self._cache, stream.write) return self - def _deserialize(self, stream: 'BytesIO') -> 'Tree': + def _deserialize(self, stream: "BytesIO") -> "Tree": self._cache = tree_entries_from_data(stream.read()) return self diff --git a/git/objects/util.py b/git/objects/util.py index 800eccdf..4ba59c8a 100644 --- a/git/objects/util.py +++ b/git/objects/util.py @@ -7,11 +7,7 @@ from abc import ABC, abstractmethod import warnings -from git.util import ( - IterableList, - IterableObj, - Actor -) +from git.util import IterableList, IterableObj, Actor import re from collections import deque @@ -22,10 +18,24 @@ import calendar from datetime import datetime, timedelta, tzinfo # typing ------------------------------------------------------------ -from typing import (Any, Callable, Deque, Iterator, Generic, NamedTuple, overload, Sequence, # NOQA: F401 - TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast) +from typing import ( + Any, + Callable, + Deque, + Iterator, + Generic, + NamedTuple, + overload, + Sequence, # NOQA: F401 + TYPE_CHECKING, + Tuple, + Type, + TypeVar, + Union, + cast, +) -from git.types import Has_id_attribute, Literal, _T # NOQA: F401 +from git.types import Has_id_attribute, Literal, _T # NOQA: F401 if TYPE_CHECKING: from io import BytesIO, StringIO @@ -46,24 +56,38 @@ else: class TraverseNT(NamedTuple): depth: int - item: Union['Traversable', 'Blob'] - src: Union['Traversable', None] + item: Union["Traversable", "Blob"] + src: Union["Traversable", None] -T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse() +T_TIobj = TypeVar( + "T_TIobj", bound="TraversableIterableObj" +) # for TraversableIterableObj.traverse() -TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule - 'TraversedTreeTup'] # for tree.traverse() +TraversedTup = Union[ + Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule + "TraversedTreeTup", +] # for tree.traverse() # -------------------------------------------------------------------- -__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date', - 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz', - 'verify_utctz', 'Actor', 'tzoffset', 'utc') +__all__ = ( + "get_object_type_by_name", + "parse_date", + "parse_actor_and_date", + "ProcessStreamAdapter", + "Traversable", + "altz_to_utctz_str", + "utctz_to_altz", + "verify_utctz", + "Actor", + "tzoffset", + "utc", +) ZERO = timedelta(0) -#{ Functions +# { Functions def mode_str_to_int(modestr: Union[bytes, str]) -> int: @@ -82,8 +106,9 @@ def mode_str_to_int(modestr: Union[bytes, str]) -> int: return mode -def get_object_type_by_name(object_type_name: bytes - ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]: +def get_object_type_by_name( + object_type_name: bytes, +) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]: """ :return: type suitable to handle the given object type name. Use the type to create new instances. @@ -93,18 +118,24 @@ def get_object_type_by_name(object_type_name: bytes :raise ValueError: In case object_type_name is unknown""" if object_type_name == b"commit": from . import commit + return commit.Commit elif object_type_name == b"tag": from . import tag + return tag.TagObject elif object_type_name == b"blob": from . import blob + return blob.Blob elif object_type_name == b"tree": from . import tree + return tree.Tree else: - raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode()) + raise ValueError( + "Cannot handle unknown object type: %s" % object_type_name.decode() + ) def utctz_to_altz(utctz: str) -> int: @@ -121,7 +152,7 @@ def altz_to_utctz_str(altz: float) -> str: utci = -1 * int((float(altz) / 3600) * 100) utcs = str(abs(utci)) utcs = "0" * (4 - len(utcs)) + utcs - prefix = (utci < 0 and '-') or '+' + prefix = (utci < 0 and "-") or "+" return prefix + utcs @@ -133,22 +164,23 @@ def verify_utctz(offset: str) -> str: raise fmt_exc if offset[0] not in "+-": raise fmt_exc - if offset[1] not in digits or\ - offset[2] not in digits or\ - offset[3] not in digits or\ - offset[4] not in digits: + if ( + offset[1] not in digits + or offset[2] not in digits + or offset[3] not in digits + or offset[4] not in digits + ): raise fmt_exc # END for each char return offset class tzoffset(tzinfo): - def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None: self._offset = timedelta(seconds=-secs_west_of_utc) - self._name = name or 'fixed' + self._name = name or "fixed" - def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]: + def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]: return tzoffset, (-self._offset.total_seconds(), self._name) def utcoffset(self, dt: Union[datetime, None]) -> timedelta: @@ -161,7 +193,7 @@ class tzoffset(tzinfo): return ZERO -utc = tzoffset(0, 'UTC') +utc = tzoffset(0, "UTC") def from_timestamp(timestamp: float, tz_offset: float) -> datetime: @@ -190,23 +222,27 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: """ if isinstance(string_date, datetime): if string_date.tzinfo: - utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None + utcoffset = cast( + timedelta, string_date.utcoffset() + ) # typeguard, if tzinfoand is not None offset = -int(utcoffset.total_seconds()) return int(string_date.astimezone(utc).timestamp()), offset else: - raise ValueError(f"string_date datetime object without tzinfo, {string_date}") + raise ValueError( + f"string_date datetime object without tzinfo, {string_date}" + ) # git time try: - if string_date.count(' ') == 1 and string_date.rfind(':') == -1: + if string_date.count(" ") == 1 and string_date.rfind(":") == -1: timestamp, offset_str = string_date.split() - if timestamp.startswith('@'): + if timestamp.startswith("@"): timestamp = timestamp[1:] timestamp_int = int(timestamp) return timestamp_int, utctz_to_altz(verify_utctz(offset_str)) else: - offset_str = "+0000" # local time by default - if string_date[-5] in '-+': + offset_str = "+0000" # local time by default + if string_date[-5] in "-+": offset_str = verify_utctz(string_date[-5:]) string_date = string_date[:-6] # skip space as well # END split timezone info @@ -215,9 +251,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: # now figure out the date and time portion - split time date_formats = [] splitter = -1 - if ',' in string_date: + if "," in string_date: date_formats.append("%a, %d %b %Y") - splitter = string_date.rfind(' ') + splitter = string_date.rfind(" ") else: # iso plus additional date_formats.append("%Y-%m-%d") @@ -225,16 +261,16 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: date_formats.append("%m/%d/%Y") date_formats.append("%d.%m.%Y") - splitter = string_date.rfind('T') + splitter = string_date.rfind("T") if splitter == -1: - splitter = string_date.rfind(' ') + splitter = string_date.rfind(" ") # END handle 'T' and ' ' # END handle rfc or iso assert splitter > -1 # split date and time - time_part = string_date[splitter + 1:] # skip space + time_part = string_date[splitter + 1 :] # skip space date_part = string_date[:splitter] # parse time @@ -243,9 +279,19 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: for fmt in date_formats: try: dtstruct = time.strptime(date_part, fmt) - utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday, - tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec, - dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst)) + utctime = calendar.timegm( + ( + dtstruct.tm_year, + dtstruct.tm_mon, + dtstruct.tm_mday, + tstruct.tm_hour, + tstruct.tm_min, + tstruct.tm_sec, + dtstruct.tm_wday, + dtstruct.tm_yday, + tstruct.tm_isdst, + ) + ) return int(utctime), offset except ValueError: continue @@ -256,13 +302,15 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]: raise ValueError("no format matched") # END handle format except Exception as e: - raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e + raise ValueError( + f"Unsupported date format or type: {string_date}, type={type(string_date)}" + ) from e # END handle exceptions # precompiled regex -_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$') -_re_only_actor = re.compile(r'^.+? (.*)$') +_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$") +_re_only_actor = re.compile(r"^.+? (.*)$") def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: @@ -271,19 +319,21 @@ def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]: author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700 :return: [Actor, int_seconds_since_epoch, int_timezone_offset]""" - actor, epoch, offset = '', '0', '0' + actor, epoch, offset = "", "0", "0" m = _re_actor_epoch.search(line) if m: actor, epoch, offset = m.groups() else: m = _re_only_actor.search(line) - actor = m.group(1) if m else line or '' + actor = m.group(1) if m else line or "" return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset)) -#} END functions + +# } END functions -#{ Classes +# { Classes + class ProcessStreamAdapter(object): @@ -292,9 +342,10 @@ class ProcessStreamAdapter(object): Use this type to hide the underlying process to provide access only to a specified stream. The process is usually wrapped into an AutoInterrupt class to kill it if the instance goes out of scope.""" + __slots__ = ("_proc", "_stream") - def __init__(self, process: 'Popen', stream_name: str) -> None: + def __init__(self, process: "Popen", stream_name: str) -> None: self._proc = process self._stream: StringIO = getattr(process, stream_name) # guessed type @@ -312,11 +363,12 @@ class Traversable(Protocol): Defined subclasses = [Commit, Tree, SubModule] """ + __slots__ = () @classmethod @abstractmethod - def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']: + def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]: """ Returns: Tuple of items connected to the given item. @@ -331,15 +383,18 @@ class Traversable(Protocol): @abstractmethod def list_traverse(self, *args: Any, **kwargs: Any) -> Any: """ """ - warnings.warn("list_traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) + warnings.warn( + "list_traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2, + ) return self._list_traverse(*args, **kwargs) - def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any - ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]: + def _list_traverse( + self, as_edge: bool = False, *args: Any, **kwargs: Any + ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]: """ :return: IterableList with the results of the traversal as produced by traverse() @@ -352,11 +407,13 @@ class Traversable(Protocol): if isinstance(self, Has_id_attribute): id = self._id_attribute_ else: - id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ + id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_ # could add _id_attribute_ to Traversable, or make all Traversable also Iterable? if not as_edge: - out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id) + out: IterableList[ + Union["Commit", "Submodule", "Tree", "Blob"] + ] = IterableList(id) out.extend(self.traverse(as_edge=as_edge, *args, **kwargs)) return out # overloads in subclasses (mypy doesn't allow typing self: subclass) @@ -366,23 +423,32 @@ class Traversable(Protocol): out_list: IterableList = IterableList(self.traverse(*args, **kwargs)) return out_list - @ abstractmethod + @abstractmethod def traverse(self, *args: Any, **kwargs: Any) -> Any: """ """ - warnings.warn("traverse() method should only be called from subclasses." - "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" - "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", - DeprecationWarning, - stacklevel=2) + warnings.warn( + "traverse() method should only be called from subclasses." + "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20" + "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit", + DeprecationWarning, + stacklevel=2, + ) return self._traverse(*args, **kwargs) - def _traverse(self, - predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True, - prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[Union['Traversable', 'Blob']], - Iterator[TraversedTup]]: + def _traverse( + self, + predicate: Callable[ + [Union["Traversable", "Blob", TraversedTup], int], bool + ] = lambda i, d: True, + prune: Callable[ + [Union["Traversable", "Blob", TraversedTup], int], bool + ] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = True, + ignore_self: int = 1, + as_edge: bool = False, + ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]: """:return: iterator yielding of items found when traversing self :param predicate: f(i,d) returns False if item i at depth d should not be included in the result @@ -426,24 +492,30 @@ class Traversable(Protocol): visited = set() stack: Deque[TraverseNT] = deque() - stack.append(TraverseNT(0, self, None)) # self is always depth level 0 - - def addToStack(stack: Deque[TraverseNT], - src_item: 'Traversable', - branch_first: bool, - depth: int) -> None: + stack.append(TraverseNT(0, self, None)) # self is always depth level 0 + + def addToStack( + stack: Deque[TraverseNT], + src_item: "Traversable", + branch_first: bool, + depth: int, + ) -> None: lst = self._get_intermediate_items(item) - if not lst: # empty list + if not lst: # empty list return None if branch_first: stack.extendleft(TraverseNT(depth, i, src_item) for i in lst) else: - reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1)) + reviter = ( + TraverseNT(depth, lst[i], src_item) + for i in range(len(lst) - 1, -1, -1) + ) stack.extend(reviter) + # END addToStack local method while stack: - d, item, src = stack.pop() # depth of item, item, item_source + d, item, src = stack.pop() # depth of item, item, item_source if visit_once and item in visited: continue @@ -451,8 +523,10 @@ class Traversable(Protocol): if visit_once: visited.add(item) - rval: Union[TraversedTup, 'Traversable', 'Blob'] - if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item) + rval: Union[TraversedTup, "Traversable", "Blob"] + if ( + as_edge + ): # if as_edge return (src, item) unless rrc is None (e.g. for first item) rval = (src, item) else: rval = item @@ -473,14 +547,15 @@ class Traversable(Protocol): # END for each item on work stack -@ runtime_checkable +@runtime_checkable class Serializable(Protocol): """Defines methods to serialize and deserialize objects from and into a data stream""" + __slots__ = () # @abstractmethod - def _serialize(self, stream: 'BytesIO') -> 'Serializable': + def _serialize(self, stream: "BytesIO") -> "Serializable": """Serialize the data of this object into the given data stream :note: a serialized object would ``_deserialize`` into the same object :param stream: a file-like object @@ -488,7 +563,7 @@ class Serializable(Protocol): raise NotImplementedError("To be implemented in subclass") # @abstractmethod - def _deserialize(self, stream: 'BytesIO') -> 'Serializable': + def _deserialize(self, stream: "BytesIO") -> "Serializable": """Deserialize all information regarding this object from the stream :param stream: a file-like object :return: self""" @@ -500,54 +575,76 @@ class TraversableIterableObj(IterableObj, Traversable): TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj] - def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]: - return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs) + def list_traverse( + self: T_TIobj, *args: Any, **kwargs: Any + ) -> IterableList[T_TIobj]: + return super(TraversableIterableObj, self)._list_traverse(*args, **kwargs) - @ overload # type: ignore - def traverse(self: T_TIobj - ) -> Iterator[T_TIobj]: + @overload # type: ignore + def traverse(self: T_TIobj) -> Iterator[T_TIobj]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[False], - ) -> Iterator[T_TIobj]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + prune: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[False], + ) -> Iterator[T_TIobj]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[False], - as_edge: Literal[True], - ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + prune: Callable[ + [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool + ], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[False], + as_edge: Literal[True], + ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]: ... - @ overload - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], - depth: int, branch_first: bool, visit_once: bool, - ignore_self: Literal[True], - as_edge: Literal[True], - ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: + @overload + def traverse( + self: T_TIobj, + predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool], + depth: int, + branch_first: bool, + visit_once: bool, + ignore_self: Literal[True], + as_edge: Literal[True], + ) -> Iterator[Tuple[T_TIobj, T_TIobj]]: ... - def traverse(self: T_TIobj, - predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: True, - prune: Callable[[Union[T_TIobj, TIobj_tuple], int], - bool] = lambda i, d: False, - depth: int = -1, branch_first: bool = True, visit_once: bool = True, - ignore_self: int = 1, as_edge: bool = False - ) -> Union[Iterator[T_TIobj], - Iterator[Tuple[T_TIobj, T_TIobj]], - Iterator[TIobj_tuple]]: + def traverse( + self: T_TIobj, + predicate: Callable[ + [Union[T_TIobj, TIobj_tuple], int], bool + ] = lambda i, d: True, + prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False, + depth: int = -1, + branch_first: bool = True, + visit_once: bool = True, + ignore_self: int = 1, + as_edge: bool = False, + ) -> Union[ + Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple] + ]: """For documentation, see util.Traversable._traverse()""" """ @@ -566,8 +663,9 @@ class TraversableIterableObj(IterableObj, Traversable): assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}" return ret_tup[0] """ - return cast(Union[Iterator[T_TIobj], - Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], - super(TraversableIterableObj, self)._traverse( - predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore - )) + return cast( + Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]], + super(TraversableIterableObj, self)._traverse( + predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore + ), + ) |