summaryrefslogtreecommitdiff
path: root/git/objects
diff options
context:
space:
mode:
Diffstat (limited to 'git/objects')
-rw-r--r--git/objects/__init__.py10
-rw-r--r--git/objects/base.py61
-rw-r--r--git/objects/blob.py7
-rw-r--r--git/objects/commit.py339
-rw-r--r--git/objects/fun.py75
-rw-r--r--git/objects/submodule/base.py536
-rw-r--r--git/objects/submodule/root.py204
-rw-r--r--git/objects/submodule/util.py40
-rw-r--r--git/objects/tag.py50
-rw-r--r--git/objects/tree.py158
-rw-r--r--git/objects/util.py362
11 files changed, 1220 insertions, 622 deletions
diff --git a/git/objects/__init__.py b/git/objects/__init__.py
index 1d0bb7a5..d2e1e53a 100644
--- a/git/objects/__init__.py
+++ b/git/objects/__init__.py
@@ -12,13 +12,17 @@ from .submodule.base import *
from .submodule.root import *
from .tag import *
from .tree import *
+
# Fix import dependency - add IndexObject to the util module, so that it can be
# imported by the submodule.base
smutil.IndexObject = IndexObject # type: ignore[attr-defined]
smutil.Object = Object # type: ignore[attr-defined]
-del(smutil)
+del smutil
# must come after submodule was made available
-__all__ = [name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj))]
+__all__ = [
+ name
+ for name, obj in locals().items()
+ if not (name.startswith("_") or inspect.ismodule(obj))
+]
diff --git a/git/objects/base.py b/git/objects/base.py
index 66e15a8f..9d005725 100644
--- a/git/objects/base.py
+++ b/git/objects/base.py
@@ -27,7 +27,7 @@ if TYPE_CHECKING:
from .submodule.base import Submodule
from git.refs.reference import Reference
-IndexObjUnion = Union['Tree', 'Blob', 'Submodule']
+IndexObjUnion = Union["Tree", "Blob", "Submodule"]
# --------------------------------------------------------------------------
@@ -40,14 +40,20 @@ __all__ = ("Object", "IndexObject")
class Object(LazyMixin):
"""Implements an Object which may be Blobs, Trees, Commits and Tags"""
- NULL_HEX_SHA = '0' * 40
- NULL_BIN_SHA = b'\0' * 20
- TYPES = (dbtyp.str_blob_type, dbtyp.str_tree_type, dbtyp.str_commit_type, dbtyp.str_tag_type)
+ NULL_HEX_SHA = "0" * 40
+ NULL_BIN_SHA = b"\0" * 20
+
+ TYPES = (
+ dbtyp.str_blob_type,
+ dbtyp.str_tree_type,
+ dbtyp.str_commit_type,
+ dbtyp.str_tag_type,
+ )
__slots__ = ("repo", "binsha", "size")
type: Union[Lit_commit_ish, None] = None
- def __init__(self, repo: 'Repo', binsha: bytes):
+ def __init__(self, repo: "Repo", binsha: bytes):
"""Initialize an object by identifying it by its binary sha.
All keyword arguments will be set on demand if None.
@@ -57,10 +63,13 @@ class Object(LazyMixin):
super(Object, self).__init__()
self.repo = repo
self.binsha = binsha
- assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (binsha, len(binsha))
+ assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (
+ binsha,
+ len(binsha),
+ )
@classmethod
- def new(cls, repo: 'Repo', id: Union[str, 'Reference']) -> Commit_ish:
+ def new(cls, repo: "Repo", id: Union[str, "Reference"]) -> Commit_ish:
"""
:return: New Object instance of a type appropriate to the object type behind
id. The id of the newly created object will be a binsha even though
@@ -73,14 +82,14 @@ class Object(LazyMixin):
return repo.rev_parse(str(id))
@classmethod
- def new_from_sha(cls, repo: 'Repo', sha1: bytes) -> Commit_ish:
+ def new_from_sha(cls, repo: "Repo", sha1: bytes) -> Commit_ish:
"""
:return: new object instance of a type appropriate to represent the given
binary sha1
:param sha1: 20 byte binary sha1"""
if sha1 == cls.NULL_BIN_SHA:
# the NULL binsha is always the root commit
- return get_object_type_by_name(b'commit')(repo, sha1)
+ return get_object_type_by_name(b"commit")(repo, sha1)
# END handle special case
oinfo = repo.odb.info(sha1)
inst = get_object_type_by_name(oinfo.type)(repo, oinfo.binsha)
@@ -98,13 +107,13 @@ class Object(LazyMixin):
def __eq__(self, other: Any) -> bool:
""":return: True if the objects have the same SHA1"""
- if not hasattr(other, 'binsha'):
+ if not hasattr(other, "binsha"):
return False
return self.binsha == other.binsha
def __ne__(self, other: Any) -> bool:
- """:return: True if the objects do not have the same SHA1 """
- if not hasattr(other, 'binsha'):
+ """:return: True if the objects do not have the same SHA1"""
+ if not hasattr(other, "binsha"):
return True
return self.binsha != other.binsha
@@ -124,15 +133,15 @@ class Object(LazyMixin):
def hexsha(self) -> str:
""":return: 40 byte hex version of our 20 byte binary sha"""
# b2a_hex produces bytes
- return bin_to_hex(self.binsha).decode('ascii')
+ return bin_to_hex(self.binsha).decode("ascii")
@property
- def data_stream(self) -> 'OStream':
- """ :return: File Object compatible stream to the uncompressed raw data of the object
+ def data_stream(self) -> "OStream":
+ """:return: File Object compatible stream to the uncompressed raw data of the object
:note: returned streams must be read in order"""
return self.repo.odb.stream(self.binsha)
- def stream_data(self, ostream: 'OStream') -> 'Object':
+ def stream_data(self, ostream: "OStream") -> "Object":
"""Writes our data directly to the given output stream
:param ostream: File object compatible stream object.
:return: self"""
@@ -145,14 +154,19 @@ class IndexObject(Object):
"""Base for all objects that can be part of the index file , namely Tree, Blob and
SubModule objects"""
+
__slots__ = ("path", "mode")
# for compatibility with iterable lists
- _id_attribute_ = 'path'
-
- def __init__(self,
- repo: 'Repo', binsha: bytes, mode: Union[None, int] = None, path: Union[None, PathLike] = None
- ) -> None:
+ _id_attribute_ = "path"
+
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ mode: Union[None, int] = None,
+ path: Union[None, PathLike] = None,
+ ) -> None:
"""Initialize a newly instanced IndexObject
:param repo: is the Repo we are located in
@@ -184,7 +198,8 @@ class IndexObject(Object):
# they cannot be retrieved lateron ( not without searching for them )
raise AttributeError(
"Attribute '%s' unset: path and mode attributes must have been set during %s object creation"
- % (attr, type(self).__name__))
+ % (attr, type(self).__name__)
+ )
else:
super(IndexObject, self)._set_cache_(attr)
# END handle slot attribute
@@ -201,7 +216,7 @@ class IndexObject(Object):
Absolute path to this index object in the file system ( as opposed to the
.path field which is a path relative to the git repository ).
- The returned path will be native to the system and contains '\' on windows. """
+ The returned path will be native to the system and contains '\' on windows."""
if self.repo.working_tree_dir is not None:
return join_path_native(self.repo.working_tree_dir, self.path)
else:
diff --git a/git/objects/blob.py b/git/objects/blob.py
index 99b5c636..1881f210 100644
--- a/git/objects/blob.py
+++ b/git/objects/blob.py
@@ -8,14 +8,15 @@ from . import base
from git.types import Literal
-__all__ = ('Blob', )
+__all__ = ("Blob",)
class Blob(base.IndexObject):
"""A Blob encapsulates a git blob object"""
+
DEFAULT_MIME_TYPE = "text/plain"
- type: Literal['blob'] = "blob"
+ type: Literal["blob"] = "blob"
# valid blob modes
executable_mode = 0o100755
@@ -28,7 +29,7 @@ class Blob(base.IndexObject):
def mime_type(self) -> str:
"""
:return: String describing the mime type of this file (based on the filename)
- :note: Defaults to 'text/plain' in case the actual file type is unknown. """
+ :note: Defaults to 'text/plain' in case the actual file type is unknown."""
guesses = None
if self.path:
guesses = guess_type(str(self.path))
diff --git a/git/objects/commit.py b/git/objects/commit.py
index 96a2a8e5..137cc620 100644
--- a/git/objects/commit.py
+++ b/git/objects/commit.py
@@ -6,12 +6,7 @@
import datetime
from subprocess import Popen, PIPE
from gitdb import IStream
-from git.util import (
- hex_to_bin,
- Actor,
- Stats,
- finalize_process
-)
+from git.util import hex_to_bin, Actor, Stats, finalize_process
from git.diff import Diffable
from git.cmd import Git
@@ -26,13 +21,7 @@ from .util import (
from_timestamp,
)
-from time import (
- time,
- daylight,
- altzone,
- timezone,
- localtime
-)
+from time import time, daylight, altzone, timezone, localtime
import os
from io import BytesIO
import logging
@@ -40,7 +29,18 @@ import logging
# typing ------------------------------------------------------------------
-from typing import Any, IO, Iterator, List, Sequence, Tuple, Union, TYPE_CHECKING, cast, Dict
+from typing import (
+ Any,
+ IO,
+ Iterator,
+ List,
+ Sequence,
+ Tuple,
+ Union,
+ TYPE_CHECKING,
+ cast,
+ Dict,
+)
from git.types import PathLike, Literal
@@ -50,10 +50,10 @@ if TYPE_CHECKING:
# ------------------------------------------------------------------------
-log = logging.getLogger('git.objects.commit')
+log = logging.getLogger("git.objects.commit")
log.addHandler(logging.NullHandler())
-__all__ = ('Commit', )
+__all__ = ("Commit",)
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
@@ -69,30 +69,44 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
env_committer_date = "GIT_COMMITTER_DATE"
# CONFIGURATION KEYS
- conf_encoding = 'i18n.commitencoding'
+ conf_encoding = "i18n.commitencoding"
# INVARIANTS
default_encoding = "UTF-8"
# object configuration
- type: Literal['commit'] = "commit"
- __slots__ = ("tree",
- "author", "authored_date", "author_tz_offset",
- "committer", "committed_date", "committer_tz_offset",
- "message", "parents", "encoding", "gpgsig")
+ type: Literal["commit"] = "commit"
+ __slots__ = (
+ "tree",
+ "author",
+ "authored_date",
+ "author_tz_offset",
+ "committer",
+ "committed_date",
+ "committer_tz_offset",
+ "message",
+ "parents",
+ "encoding",
+ "gpgsig",
+ )
_id_attribute_ = "hexsha"
- def __init__(self, repo: 'Repo', binsha: bytes, tree: Union[Tree, None] = None,
- author: Union[Actor, None] = None,
- authored_date: Union[int, None] = None,
- author_tz_offset: Union[None, float] = None,
- committer: Union[Actor, None] = None,
- committed_date: Union[int, None] = None,
- committer_tz_offset: Union[None, float] = None,
- message: Union[str, bytes, None] = None,
- parents: Union[Sequence['Commit'], None] = None,
- encoding: Union[str, None] = None,
- gpgsig: Union[str, None] = None) -> None:
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ tree: Union[Tree, None] = None,
+ author: Union[Actor, None] = None,
+ authored_date: Union[int, None] = None,
+ author_tz_offset: Union[None, float] = None,
+ committer: Union[Actor, None] = None,
+ committed_date: Union[int, None] = None,
+ committer_tz_offset: Union[None, float] = None,
+ message: Union[str, bytes, None] = None,
+ parents: Union[Sequence["Commit"], None] = None,
+ encoding: Union[str, None] = None,
+ gpgsig: Union[str, None] = None,
+ ) -> None:
"""Instantiate a new Commit. All keyword arguments taking None as default will
be implicitly set on first query.
@@ -130,7 +144,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
super(Commit, self).__init__(repo, binsha)
self.binsha = binsha
if tree is not None:
- assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
+ assert isinstance(
+ tree, Tree
+ ), "Tree needs to be a Tree instance, was %s" % type(tree)
if tree is not None:
self.tree = tree
if author is not None:
@@ -155,16 +171,16 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
self.gpgsig = gpgsig
@classmethod
- def _get_intermediate_items(cls, commit: 'Commit') -> Tuple['Commit', ...]:
+ def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
return tuple(commit.parents)
@classmethod
- def _calculate_sha_(cls, repo: 'Repo', commit: 'Commit') -> bytes:
- '''Calculate the sha of a commit.
+ def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
+ """Calculate the sha of a commit.
:param repo: Repo object the commit should be part of
:param commit: Commit object for which to generate the sha
- '''
+ """
stream = BytesIO()
commit._serialize(stream)
@@ -174,18 +190,18 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
istream = repo.odb.store(IStream(cls.type, streamlen, stream))
return istream.binsha
- def replace(self, **kwargs: Any) -> 'Commit':
- '''Create new commit object from existing commit object.
+ def replace(self, **kwargs: Any) -> "Commit":
+ """Create new commit object from existing commit object.
Any values provided as keyword arguments will replace the
corresponding attribute in the new object.
- '''
+ """
attrs = {k: getattr(self, k) for k in self.__slots__}
for attrname in kwargs:
if attrname not in self.__slots__:
- raise ValueError('invalid attribute name')
+ raise ValueError("invalid attribute name")
attrs.update(kwargs)
new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs)
@@ -214,11 +230,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
def summary(self) -> Union[str, bytes]:
""":return: First line of the commit message"""
if isinstance(self.message, str):
- return self.message.split('\n', 1)[0]
+ return self.message.split("\n", 1)[0]
else:
- return self.message.split(b'\n', 1)[0]
+ return self.message.split(b"\n", 1)[0]
- def count(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> int:
+ def count(
+ self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
+ ) -> int:
"""Count the number of commits reachable from this commit
:param paths:
@@ -232,7 +250,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# yes, it makes a difference whether empty paths are given or not in our case
# as the empty paths version will ignore merge commits for some reason.
if paths:
- return len(self.repo.git.rev_list(self.hexsha, '--', paths, **kwargs).splitlines())
+ return len(
+ self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines()
+ )
return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
@property
@@ -244,9 +264,13 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
return self.repo.git.name_rev(self)
@classmethod
- def iter_items(cls, repo: 'Repo', rev: Union[str, 'Commit', 'SymbolicReference'], # type: ignore
- paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any
- ) -> Iterator['Commit']:
+ def iter_items(
+ cls,
+ repo: "Repo",
+ rev: Union[str, "Commit", "SymbolicReference"], # type: ignore
+ paths: Union[PathLike, Sequence[PathLike]] = "",
+ **kwargs: Any,
+ ) -> Iterator["Commit"]:
"""Find all commits matching the given criteria.
:param repo: is the Repo
@@ -260,19 +284,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
``skip`` is the number of commits to skip
``since`` all commits since i.e. '1970-01-01'
:return: iterator yielding Commit items"""
- if 'pretty' in kwargs:
- raise ValueError("--pretty cannot be used as parsing expects single sha's only")
+ if "pretty" in kwargs:
+ raise ValueError(
+ "--pretty cannot be used as parsing expects single sha's only"
+ )
# END handle pretty
# use -- in any case, to prevent possibility of ambiguous arguments
# see https://github.com/gitpython-developers/GitPython/issues/264
- args_list: List[PathLike] = ['--']
+ args_list: List[PathLike] = ["--"]
if paths:
paths_tup: Tuple[PathLike, ...]
if isinstance(paths, (str, os.PathLike)):
- paths_tup = (paths, )
+ paths_tup = (paths,)
else:
paths_tup = tuple(paths)
@@ -282,37 +308,41 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)
- def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator['Commit']:
+ def iter_parents(
+ self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any
+ ) -> Iterator["Commit"]:
"""Iterate _all_ parents of this commit.
:param paths:
Optional path or list of paths limiting the Commits to those that
contain at least one of the paths
:param kwargs: All arguments allowed by git-rev-list
- :return: Iterator yielding Commit objects which are parents of self """
+ :return: Iterator yielding Commit objects which are parents of self"""
# skip ourselves
skip = kwargs.get("skip", 1)
- if skip == 0: # skip ourselves
+ if skip == 0: # skip ourselves
skip = 1
- kwargs['skip'] = skip
+ kwargs["skip"] = skip
return self.iter_items(self.repo, self, paths, **kwargs)
- @ property
+ @property
def stats(self) -> Stats:
"""Create a git stat from changes between this commit and its first parent
or from all changes done if this is the very first commit.
:return: git.Stats"""
if not self.parents:
- text = self.repo.git.diff_tree(self.hexsha, '--', numstat=True, root=True)
+ text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, root=True)
text2 = ""
for line in text.splitlines()[1:]:
(insertions, deletions, filename) = line.split("\t")
text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename)
text = text2
else:
- text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, '--', numstat=True)
+ text = self.repo.git.diff(
+ self.parents[0].hexsha, self.hexsha, "--", numstat=True
+ )
return Stats._list_from_string(self.repo, text)
@property
@@ -352,19 +382,21 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
"""
d = {}
- cmd = ['git', 'interpret-trailers', '--parse']
+ cmd = ["git", "interpret-trailers", "--parse"]
proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE) # type: ignore
trailer: str = proc.communicate(str(self.message).encode())[0].decode()
- if trailer.endswith('\n'):
+ if trailer.endswith("\n"):
trailer = trailer[0:-1]
- if trailer != '':
- for line in trailer.split('\n'):
- key, value = line.split(':', 1)
+ if trailer != "":
+ for line in trailer.split("\n"):
+ key, value = line.split(":", 1)
d[key.strip()] = value.strip()
return d
- @ classmethod
- def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen, IO]) -> Iterator['Commit']:
+ @classmethod
+ def _iter_from_process_or_stream(
+ cls, repo: "Repo", proc_or_stream: Union[Popen, IO]
+ ) -> Iterator["Commit"]:
"""Parse out commit information into a list of Commit objects
We expect one-line per commit, and parse the actual commit information directly
from our lighting fast object database
@@ -378,11 +410,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# def is_stream(inp) -> TypeGuard[IO]:
# return hasattr(proc_or_stream, 'readline')
- if hasattr(proc_or_stream, 'wait'):
+ if hasattr(proc_or_stream, "wait"):
proc_or_stream = cast(Popen, proc_or_stream)
if proc_or_stream.stdout is not None:
stream = proc_or_stream.stdout
- elif hasattr(proc_or_stream, 'readline'):
+ elif hasattr(proc_or_stream, "readline"):
proc_or_stream = cast(IO, proc_or_stream)
stream = proc_or_stream
@@ -402,15 +434,23 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END for each line in stream
# TODO: Review this - it seems process handling got a bit out of control
# due to many developers trying to fix the open file handles issue
- if hasattr(proc_or_stream, 'wait'):
+ if hasattr(proc_or_stream, "wait"):
proc_or_stream = cast(Popen, proc_or_stream)
finalize_process(proc_or_stream)
- @ classmethod
- def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str,
- parent_commits: Union[None, List['Commit']] = None, head: bool = False,
- author: Union[None, Actor] = None, committer: Union[None, Actor] = None,
- author_date: Union[None, str] = None, commit_date: Union[None, str] = None) -> 'Commit':
+ @classmethod
+ def create_from_tree(
+ cls,
+ repo: "Repo",
+ tree: Union[Tree, str],
+ message: str,
+ parent_commits: Union[None, List["Commit"]] = None,
+ head: bool = False,
+ author: Union[None, Actor] = None,
+ committer: Union[None, Actor] = None,
+ author_date: Union[None, str] = None,
+ commit_date: Union[None, str] = None,
+ ) -> "Commit":
"""Commit the given tree, creating a commit object.
:param repo: Repo object the commit should be part of
@@ -473,7 +513,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
is_dst = daylight and localtime().tm_isdst > 0
offset = altzone if is_dst else timezone
- author_date_str = env.get(cls.env_author_date, '')
+ author_date_str = env.get(cls.env_author_date, "")
if author_date:
author_time, author_offset = parse_date(author_date)
elif author_date_str:
@@ -482,7 +522,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
author_time, author_offset = unix_time, offset
# END set author time
- committer_date_str = env.get(cls.env_committer_date, '')
+ committer_date_str = env.get(cls.env_committer_date, "")
if commit_date:
committer_time, committer_offset = parse_date(commit_date)
elif committer_date_str:
@@ -492,7 +532,7 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END set committer time
# assume utf8 encoding
- enc_section, enc_option = cls.conf_encoding.split('.')
+ enc_section, enc_option = cls.conf_encoding.split(".")
conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
if not isinstance(conf_encoding, str):
raise TypeError("conf_encoding could not be coerced to str")
@@ -504,10 +544,20 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END tree conversion
# CREATE NEW COMMIT
- new_commit = cls(repo, cls.NULL_BIN_SHA, tree,
- author, author_time, author_offset,
- committer, committer_time, committer_offset,
- message, parent_commits, conf_encoding)
+ new_commit = cls(
+ repo,
+ cls.NULL_BIN_SHA,
+ tree,
+ author,
+ author_time,
+ author_offset,
+ committer,
+ committer_time,
+ committer_offset,
+ message,
+ parent_commits,
+ conf_encoding,
+ )
new_commit.binsha = cls._calculate_sha_(repo, new_commit)
@@ -515,48 +565,74 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# need late import here, importing git at the very beginning throws
# as well ...
import git.refs
+
try:
repo.head.set_commit(new_commit, logmsg=message)
except ValueError:
# head is not yet set to the ref our HEAD points to
# Happens on first commit
- master = git.refs.Head.create(repo, repo.head.ref, new_commit, logmsg="commit (initial): %s" % message)
- repo.head.set_reference(master, logmsg='commit: Switching to %s' % master)
+ master = git.refs.Head.create(
+ repo,
+ repo.head.ref,
+ new_commit,
+ logmsg="commit (initial): %s" % message,
+ )
+ repo.head.set_reference(
+ master, logmsg="commit: Switching to %s" % master
+ )
# END handle empty repositories
# END advance head handling
return new_commit
- #{ Serializable Implementation
+ # { Serializable Implementation
- def _serialize(self, stream: BytesIO) -> 'Commit':
+ def _serialize(self, stream: BytesIO) -> "Commit":
write = stream.write
- write(("tree %s\n" % self.tree).encode('ascii'))
+ write(("tree %s\n" % self.tree).encode("ascii"))
for p in self.parents:
- write(("parent %s\n" % p).encode('ascii'))
+ write(("parent %s\n" % p).encode("ascii"))
a = self.author
aname = a.name
c = self.committer
fmt = "%s %s <%s> %s %s\n"
- write((fmt % ("author", aname, a.email,
- self.authored_date,
- altz_to_utctz_str(self.author_tz_offset))).encode(self.encoding))
+ write(
+ (
+ fmt
+ % (
+ "author",
+ aname,
+ a.email,
+ self.authored_date,
+ altz_to_utctz_str(self.author_tz_offset),
+ )
+ ).encode(self.encoding)
+ )
# encode committer
aname = c.name
- write((fmt % ("committer", aname, c.email,
- self.committed_date,
- altz_to_utctz_str(self.committer_tz_offset))).encode(self.encoding))
+ write(
+ (
+ fmt
+ % (
+ "committer",
+ aname,
+ c.email,
+ self.committed_date,
+ altz_to_utctz_str(self.committer_tz_offset),
+ )
+ ).encode(self.encoding)
+ )
if self.encoding != self.default_encoding:
- write(("encoding %s\n" % self.encoding).encode('ascii'))
+ write(("encoding %s\n" % self.encoding).encode("ascii"))
try:
- if self.__getattribute__('gpgsig'):
+ if self.__getattribute__("gpgsig"):
write(b"gpgsig")
for sigline in self.gpgsig.rstrip("\n").split("\n"):
- write((" " + sigline + "\n").encode('ascii'))
+ write((" " + sigline + "\n").encode("ascii"))
except AttributeError:
pass
@@ -570,23 +646,29 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# END handle encoding
return self
- def _deserialize(self, stream: BytesIO) -> 'Commit':
+ def _deserialize(self, stream: BytesIO) -> "Commit":
"""
:param from_rev_list: if true, the stream format is coming from the rev-list command
Otherwise it is assumed to be a plain data stream from our object
"""
readline = stream.readline
- self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, '')
+ self.tree = Tree(
+ self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, ""
+ )
self.parents = []
next_line = None
while True:
parent_line = readline()
- if not parent_line.startswith(b'parent'):
+ if not parent_line.startswith(b"parent"):
next_line = parent_line
break
# END abort reading parents
- self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode('ascii'))))
+ self.parents.append(
+ type(self)(
+ self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))
+ )
+ )
# END for each parent line
self.parents = tuple(self.parents)
@@ -596,9 +678,9 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
# we might run into one or more mergetag blocks, skip those for now
next_line = readline()
- while next_line.startswith(b'mergetag '):
+ while next_line.startswith(b"mergetag "):
next_line = readline()
- while next_line.startswith(b' '):
+ while next_line.startswith(b" "):
next_line = readline()
# end skip mergetags
@@ -612,10 +694,11 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
buf = enc.strip()
while buf:
if buf[0:10] == b"encoding ":
- self.encoding = buf[buf.find(b' ') + 1:].decode(
- self.encoding, 'ignore')
+ self.encoding = buf[buf.find(b" ") + 1 :].decode(
+ self.encoding, "ignore"
+ )
elif buf[0:7] == b"gpgsig ":
- sig = buf[buf.find(b' ') + 1:] + b"\n"
+ sig = buf[buf.find(b" ") + 1 :] + b"\n"
is_next_header = False
while True:
sigbuf = readline()
@@ -627,37 +710,55 @@ class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
break
sig += sigbuf[1:]
# end read all signature
- self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, 'ignore')
+ self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
if is_next_header:
continue
buf = readline().strip()
# decode the authors name
try:
- (self.author, self.authored_date, self.author_tz_offset) = \
- parse_actor_and_date(author_line.decode(self.encoding, 'replace'))
+ (
+ self.author,
+ self.authored_date,
+ self.author_tz_offset,
+ ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
except UnicodeDecodeError:
- log.error("Failed to decode author line '%s' using encoding %s", author_line, self.encoding,
- exc_info=True)
+ log.error(
+ "Failed to decode author line '%s' using encoding %s",
+ author_line,
+ self.encoding,
+ exc_info=True,
+ )
try:
- self.committer, self.committed_date, self.committer_tz_offset = \
- parse_actor_and_date(committer_line.decode(self.encoding, 'replace'))
+ (
+ self.committer,
+ self.committed_date,
+ self.committer_tz_offset,
+ ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
except UnicodeDecodeError:
- log.error("Failed to decode committer line '%s' using encoding %s", committer_line, self.encoding,
- exc_info=True)
+ log.error(
+ "Failed to decode committer line '%s' using encoding %s",
+ committer_line,
+ self.encoding,
+ exc_info=True,
+ )
# END handle author's encoding
# a stream from our data simply gives us the plain message
# The end of our message stream is marked with a newline that we strip
self.message = stream.read()
try:
- self.message = self.message.decode(self.encoding, 'replace')
+ self.message = self.message.decode(self.encoding, "replace")
except UnicodeDecodeError:
- log.error("Failed to decode message '%s' using encoding %s",
- self.message, self.encoding, exc_info=True)
+ log.error(
+ "Failed to decode message '%s' using encoding %s",
+ self.message,
+ self.encoding,
+ exc_info=True,
+ )
# END exception handling
return self
- #} END serializable implementation
+ # } END serializable implementation
diff --git a/git/objects/fun.py b/git/objects/fun.py
index 19b4e525..de065599 100644
--- a/git/objects/fun.py
+++ b/git/objects/fun.py
@@ -2,14 +2,20 @@
from stat import S_ISDIR
-from git.compat import (
- safe_decode,
- defenc
-)
+from git.compat import safe_decode, defenc
# typing ----------------------------------------------
-from typing import Callable, List, MutableSequence, Sequence, Tuple, TYPE_CHECKING, Union, overload
+from typing import (
+ Callable,
+ List,
+ MutableSequence,
+ Sequence,
+ Tuple,
+ TYPE_CHECKING,
+ Union,
+ overload,
+)
if TYPE_CHECKING:
from _typeshed import ReadableBuffer
@@ -21,19 +27,25 @@ EntryTupOrNone = Union[EntryTup, None]
# ---------------------------------------------------
-__all__ = ('tree_to_stream', 'tree_entries_from_data', 'traverse_trees_recursive',
- 'traverse_tree_recursive')
+__all__ = (
+ "tree_to_stream",
+ "tree_entries_from_data",
+ "traverse_trees_recursive",
+ "traverse_tree_recursive",
+)
-def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer'], Union[int, None]]) -> None:
+def tree_to_stream(
+ entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]
+) -> None:
"""Write the give list of entries into a stream using its write method
:param entries: **sorted** list of tuples with (binsha, mode, name)
:param write: write method which takes a data string"""
- ord_zero = ord('0')
- bit_mask = 7 # 3 bits set
+ ord_zero = ord("0")
+ bit_mask = 7 # 3 bits set
for binsha, mode, name in entries:
- mode_str = b''
+ mode_str = b""
for i in range(6):
mode_str = bytes([((mode >> (i * 3)) & bit_mask) + ord_zero]) + mode_str
# END for each 8 octal value
@@ -52,7 +64,7 @@ def tree_to_stream(entries: Sequence[EntryTup], write: Callable[['ReadableBuffer
name_bytes = name.encode(defenc)
else:
name_bytes = name # type: ignore[unreachable] # check runtime types - is always str?
- write(b''.join((mode_str, b' ', name_bytes, b'\0', binsha)))
+ write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha)))
# END for each item
@@ -60,8 +72,8 @@ def tree_entries_from_data(data: bytes) -> List[EntryTup]:
"""Reads the binary representation of a tree and returns tuples of Tree items
:param data: data block with tree data (as bytes)
:return: list(tuple(binsha, mode, tree_relative_path), ...)"""
- ord_zero = ord('0')
- space_ord = ord(' ')
+ ord_zero = ord("0")
+ space_ord = ord(" ")
len_data = len(data)
i = 0
out = []
@@ -95,15 +107,16 @@ def tree_entries_from_data(data: bytes) -> List[EntryTup]:
# byte is NULL, get next 20
i += 1
- sha = data[i:i + 20]
+ sha = data[i : i + 20]
i = i + 20
out.append((sha, mode, name))
# END for each byte in data stream
return out
-def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int
- ) -> EntryTupOrNone:
+def _find_by_name(
+ tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int
+) -> EntryTupOrNone:
"""return data entry matching the given name and tree mode
or None.
Before the item is returned, the respective data item is set
@@ -126,12 +139,12 @@ def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir:
return None
-@ overload
+@overload
def _to_full_path(item: None, path_prefix: str) -> None:
...
-@ overload
+@overload
def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup:
...
@@ -143,8 +156,9 @@ def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
return (item[0], item[1], path_prefix + item[2])
-def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[bytes, None]],
- path_prefix: str) -> List[Tuple[EntryTupOrNone, ...]]:
+def traverse_trees_recursive(
+ odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
+) -> List[Tuple[EntryTupOrNone, ...]]:
"""
:return: list of list with entries according to the given binary tree-shas.
The result is encoded in a list
@@ -187,7 +201,7 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by
entries = [None for _ in range(nt)]
entries[ti] = item
_sha, mode, name = item
- is_dir = S_ISDIR(mode) # type mode bits
+ is_dir = S_ISDIR(mode) # type mode bits
# find this item in all other tree data items
# wrap around, but stop one before our current index, hence
@@ -199,8 +213,13 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by
# END for each other item data
# if we are a directory, enter recursion
if is_dir:
- out.extend(traverse_trees_recursive(
- odb, [((ei and ei[0]) or None) for ei in entries], path_prefix + name + '/'))
+ out.extend(
+ traverse_trees_recursive(
+ odb,
+ [((ei and ei[0]) or None) for ei in entries],
+ path_prefix + name + "/",
+ )
+ )
else:
out.append(tuple(_to_full_path(e, path_prefix) for e in entries))
@@ -210,12 +229,14 @@ def traverse_trees_recursive(odb: 'GitCmdObjectDB', tree_shas: Sequence[Union[by
# END for each item
# we are done with one tree, set all its data empty
- del(tree_data[:])
+ del tree_data[:]
# END for each tree_data chunk
return out
-def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
+def traverse_tree_recursive(
+ odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str
+) -> List[EntryTup]:
"""
:return: list of entries of the tree pointed to by the binary tree_sha. An entry
has the following format:
@@ -229,7 +250,7 @@ def traverse_tree_recursive(odb: 'GitCmdObjectDB', tree_sha: bytes, path_prefix:
# unpacking/packing is faster than accessing individual items
for sha, mode, name in data:
if S_ISDIR(mode):
- entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + '/'))
+ entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + "/"))
else:
entries.append((sha, mode, path_prefix + name))
# END for each item
diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py
index f7820455..84a34206 100644
--- a/git/objects/submodule/base.py
+++ b/git/objects/submodule/base.py
@@ -11,16 +11,12 @@ from git.compat import (
defenc,
is_win,
)
-from git.config import (
- SectionConstraint,
- GitConfigParser,
- cp
-)
+from git.config import SectionConstraint, GitConfigParser, cp
from git.exc import (
InvalidGitRepositoryError,
NoSuchPathError,
RepositoryDirtyError,
- BadName
+ BadName,
)
from git.objects.base import IndexObject, Object
from git.objects.util import TraversableIterableObj
@@ -31,7 +27,7 @@ from git.util import (
RemoteProgress,
rmtree,
unbare_repo,
- IterableList
+ IterableList,
)
from git.util import HIDE_WINDOWS_KNOWN_ERRORS
@@ -42,7 +38,7 @@ from .util import (
sm_name,
sm_section,
SubmoduleConfigParser,
- find_first_remote_branch
+ find_first_remote_branch,
)
@@ -63,7 +59,7 @@ if TYPE_CHECKING:
__all__ = ["Submodule", "UpdateProgress"]
-log = logging.getLogger('git.objects.submodule.base')
+log = logging.getLogger("git.objects.submodule.base")
log.addHandler(logging.NullHandler())
@@ -71,7 +67,11 @@ class UpdateProgress(RemoteProgress):
"""Class providing detailed progress information to the caller who should
derive from it and implement the ``update(...)`` message"""
- CLONE, FETCH, UPDWKTREE = [1 << x for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)]
+
+ CLONE, FETCH, UPDWKTREE = [
+ 1 << x
+ for x in range(RemoteProgress._num_op_codes, RemoteProgress._num_op_codes + 3)
+ ]
_num_op_codes: int = RemoteProgress._num_op_codes + 3
__slots__ = ()
@@ -98,25 +98,30 @@ class Submodule(IndexObject, TraversableIterableObj):
All methods work in bare and non-bare repositories."""
_id_attribute_ = "name"
- k_modules_file = '.gitmodules'
- k_head_option = 'branch'
- k_head_default = 'master'
- k_default_mode = stat.S_IFDIR | stat.S_IFLNK # submodules are directories with link-status
+ k_modules_file = ".gitmodules"
+ k_head_option = "branch"
+ k_head_default = "master"
+ k_default_mode = (
+ stat.S_IFDIR | stat.S_IFLNK
+ ) # submodules are directories with link-status
# this is a bogus type for base class compatibility
- type: Literal['submodule'] = 'submodule' # type: ignore
-
- __slots__ = ('_parent_commit', '_url', '_branch_path', '_name', '__weakref__')
- _cache_attrs = ('path', '_url', '_branch_path')
-
- def __init__(self, repo: 'Repo', binsha: bytes,
- mode: Union[int, None] = None,
- path: Union[PathLike, None] = None,
- name: Union[str, None] = None,
- parent_commit: Union[Commit_ish, None] = None,
- url: Union[str, None] = None,
- branch_path: Union[PathLike, None] = None
- ) -> None:
+ type: Literal["submodule"] = "submodule" # type: ignore
+
+ __slots__ = ("_parent_commit", "_url", "_branch_path", "_name", "__weakref__")
+ _cache_attrs = ("path", "_url", "_branch_path")
+
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ mode: Union[int, None] = None,
+ path: Union[PathLike, None] = None,
+ name: Union[str, None] = None,
+ parent_commit: Union[Commit_ish, None] = None,
+ url: Union[str, None] = None,
+ branch_path: Union[PathLike, None] = None,
+ ) -> None:
"""Initialize this instance with its attributes. We only document the ones
that differ from ``IndexObject``
@@ -137,32 +142,38 @@ class Submodule(IndexObject, TraversableIterableObj):
self._name = name
def _set_cache_(self, attr: str) -> None:
- if attr in ('path', '_url', '_branch_path'):
+ if attr in ("path", "_url", "_branch_path"):
reader: SectionConstraint = self.config_reader()
# default submodule values
try:
- self.path = reader.get('path')
+ self.path = reader.get("path")
except cp.NoSectionError as e:
if self.repo.working_tree_dir is not None:
- raise ValueError("This submodule instance does not exist anymore in '%s' file"
- % osp.join(self.repo.working_tree_dir, '.gitmodules')) from e
+ raise ValueError(
+ "This submodule instance does not exist anymore in '%s' file"
+ % osp.join(self.repo.working_tree_dir, ".gitmodules")
+ ) from e
# end
- self._url = reader.get('url')
+ self._url = reader.get("url")
# git-python extension values - optional
- self._branch_path = reader.get_value(self.k_head_option, git.Head.to_full_path(self.k_head_default))
- elif attr == '_name':
- raise AttributeError("Cannot retrieve the name of a submodule if it was not set initially")
+ self._branch_path = reader.get_value(
+ self.k_head_option, git.Head.to_full_path(self.k_head_default)
+ )
+ elif attr == "_name":
+ raise AttributeError(
+ "Cannot retrieve the name of a submodule if it was not set initially"
+ )
else:
super(Submodule, self)._set_cache_(attr)
# END handle attribute name
@classmethod
- def _get_intermediate_items(cls, item: 'Submodule') -> IterableList['Submodule']:
+ def _get_intermediate_items(cls, item: "Submodule") -> IterableList["Submodule"]:
""":return: all the submodules of our module repository"""
try:
return cls.list_items(item.module())
except InvalidGitRepositoryError:
- return IterableList('')
+ return IterableList("")
# END handle intermediate items
@classmethod
@@ -188,13 +199,18 @@ class Submodule(IndexObject, TraversableIterableObj):
return self._name
def __repr__(self) -> str:
- return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)"\
- % (type(self).__name__, self._name, self.path, self.url, self.branch_path)
+ return "git.%s(name=%s, path=%s, url=%s, branch_path=%s)" % (
+ type(self).__name__,
+ self._name,
+ self.path,
+ self.url,
+ self.branch_path,
+ )
@classmethod
- def _config_parser(cls, repo: 'Repo',
- parent_commit: Union[Commit_ish, None],
- read_only: bool) -> SubmoduleConfigParser:
+ def _config_parser(
+ cls, repo: "Repo", parent_commit: Union[Commit_ish, None], read_only: bool
+ ) -> SubmoduleConfigParser:
""":return: Config Parser constrained to our submodule in read or write mode
:raise IOError: If the .gitmodules file cannot be found, either locally or in the repository
at the given parent commit. Otherwise the exception would be delayed until the first
@@ -211,17 +227,23 @@ class Submodule(IndexObject, TraversableIterableObj):
if not repo.bare and parent_matches_head and repo.working_tree_dir:
fp_module = osp.join(repo.working_tree_dir, cls.k_modules_file)
else:
- assert parent_commit is not None, "need valid parent_commit in bare repositories"
+ assert (
+ parent_commit is not None
+ ), "need valid parent_commit in bare repositories"
try:
fp_module = cls._sio_modules(parent_commit)
except KeyError as e:
- raise IOError("Could not find %s file in the tree of parent commit %s" %
- (cls.k_modules_file, parent_commit)) from e
+ raise IOError(
+ "Could not find %s file in the tree of parent commit %s"
+ % (cls.k_modules_file, parent_commit)
+ ) from e
# END handle exceptions
# END handle non-bare working tree
if not read_only and (repo.bare or not parent_matches_head):
- raise ValueError("Cannot write blobs of 'historical' submodule configurations")
+ raise ValueError(
+ "Cannot write blobs of 'historical' submodule configurations"
+ )
# END handle writes of historical submodules
return SubmoduleConfigParser(fp_module, read_only=read_only)
@@ -246,7 +268,7 @@ class Submodule(IndexObject, TraversableIterableObj):
def _config_parser_constrained(self, read_only: bool) -> SectionConstraint:
""":return: Config Parser constrained to our submodule in read or write mode"""
try:
- pc: Union['Commit_ish', None] = self.parent_commit
+ pc: Union["Commit_ish", None] = self.parent_commit
except ValueError:
pc = None
# end handle empty parent repository
@@ -255,16 +277,20 @@ class Submodule(IndexObject, TraversableIterableObj):
return SectionConstraint(parser, sm_section(self.name))
@classmethod
- def _module_abspath(cls, parent_repo: 'Repo', path: PathLike, name: str) -> PathLike:
+ def _module_abspath(
+ cls, parent_repo: "Repo", path: PathLike, name: str
+ ) -> PathLike:
if cls._need_gitfile_submodules(parent_repo.git):
- return osp.join(parent_repo.git_dir, 'modules', name)
+ return osp.join(parent_repo.git_dir, "modules", name)
if parent_repo.working_tree_dir:
return osp.join(parent_repo.working_tree_dir, path)
raise NotADirectoryError()
# end
@classmethod
- def _clone_repo(cls, repo: 'Repo', url: str, path: PathLike, name: str, **kwargs: Any) -> 'Repo':
+ def _clone_repo(
+ cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any
+ ) -> "Repo":
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
@@ -274,7 +300,7 @@ class Submodule(IndexObject, TraversableIterableObj):
module_abspath = cls._module_abspath(repo, path, name)
module_checkout_path = module_abspath
if cls._need_gitfile_submodules(repo.git):
- kwargs['separate_git_dir'] = module_abspath
+ kwargs["separate_git_dir"] = module_abspath
module_abspath_dir = osp.dirname(module_abspath)
if not osp.isdir(module_abspath_dir):
os.makedirs(module_abspath_dir)
@@ -288,29 +314,36 @@ class Submodule(IndexObject, TraversableIterableObj):
return clone
@classmethod
- def _to_relative_path(cls, parent_repo: 'Repo', path: PathLike) -> PathLike:
+ def _to_relative_path(cls, parent_repo: "Repo", path: PathLike) -> PathLike:
""":return: a path guaranteed to be relative to the given parent - repository
:raise ValueError: if path is not contained in the parent repository's working tree"""
path = to_native_path_linux(path)
- if path.endswith('/'):
+ if path.endswith("/"):
path = path[:-1]
# END handle trailing slash
if osp.isabs(path) and parent_repo.working_tree_dir:
working_tree_linux = to_native_path_linux(parent_repo.working_tree_dir)
if not path.startswith(working_tree_linux):
- raise ValueError("Submodule checkout path '%s' needs to be within the parents repository at '%s'"
- % (working_tree_linux, path))
- path = path[len(working_tree_linux.rstrip('/')) + 1:]
+ raise ValueError(
+ "Submodule checkout path '%s' needs to be within the parents repository at '%s'"
+ % (working_tree_linux, path)
+ )
+ path = path[len(working_tree_linux.rstrip("/")) + 1 :]
if not path:
- raise ValueError("Absolute submodule path '%s' didn't yield a valid relative path" % path)
+ raise ValueError(
+ "Absolute submodule path '%s' didn't yield a valid relative path"
+ % path
+ )
# end verify converted relative path makes sense
# end convert to a relative path
return path
@classmethod
- def _write_git_file_and_module_config(cls, working_tree_dir: PathLike, module_abspath: PathLike) -> None:
+ def _write_git_file_and_module_config(
+ cls, working_tree_dir: PathLike, module_abspath: PathLike
+ ) -> None:
"""Writes a .git file containing a(preferably) relative path to the actual git module repository.
It is an error if the module_abspath cannot be made into a relative path, relative to the working_tree_dir
:note: will overwrite existing files !
@@ -320,26 +353,40 @@ class Submodule(IndexObject, TraversableIterableObj):
:param working_tree_dir: directory to write the .git file into
:param module_abspath: absolute path to the bare repository
"""
- git_file = osp.join(working_tree_dir, '.git')
+ git_file = osp.join(working_tree_dir, ".git")
rela_path = osp.relpath(module_abspath, start=working_tree_dir)
if is_win:
if osp.isfile(git_file):
os.remove(git_file)
- with open(git_file, 'wb') as fp:
+ with open(git_file, "wb") as fp:
fp.write(("gitdir: %s" % rela_path).encode(defenc))
- with GitConfigParser(osp.join(module_abspath, 'config'),
- read_only=False, merge_includes=False) as writer:
- writer.set_value('core', 'worktree',
- to_native_path_linux(osp.relpath(working_tree_dir, start=module_abspath)))
+ with GitConfigParser(
+ osp.join(module_abspath, "config"), read_only=False, merge_includes=False
+ ) as writer:
+ writer.set_value(
+ "core",
+ "worktree",
+ to_native_path_linux(
+ osp.relpath(working_tree_dir, start=module_abspath)
+ ),
+ )
- #{ Edit Interface
+ # { Edit Interface
@classmethod
- def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = None,
- branch: Union[str, None] = None, no_checkout: bool = False, depth: Union[int, None] = None,
- env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None
- ) -> 'Submodule':
+ def add(
+ cls,
+ repo: "Repo",
+ name: str,
+ path: PathLike,
+ url: Union[str, None] = None,
+ branch: Union[str, None] = None,
+ no_checkout: bool = False,
+ depth: Union[int, None] = None,
+ env: Union[Mapping[str, str], None] = None,
+ clone_multi_options: Union[Sequence[TBD], None] = None,
+ ) -> "Submodule":
"""Add a new submodule to the given repository. This will alter the index
as well as the .gitmodules file, but will not create a new commit.
If the submodule already exists, no matter if the configuration differs
@@ -379,7 +426,9 @@ class Submodule(IndexObject, TraversableIterableObj):
update fails for instance"""
if repo.bare:
- raise InvalidGitRepositoryError("Cannot add submodules to bare repositories")
+ raise InvalidGitRepositoryError(
+ "Cannot add submodules to bare repositories"
+ )
# END handle bare repos
path = cls._to_relative_path(repo, path)
@@ -391,7 +440,14 @@ class Submodule(IndexObject, TraversableIterableObj):
# END assure url correctness
# INSTANTIATE INTERMEDIATE SM
- sm = cls(repo, cls.NULL_BIN_SHA, cls.k_default_mode, path, name, url='invalid-temporary')
+ sm = cls(
+ repo,
+ cls.NULL_BIN_SHA,
+ cls.k_default_mode,
+ path,
+ name,
+ url="invalid-temporary",
+ )
if sm.exists():
# reretrieve submodule from tree
try:
@@ -414,7 +470,9 @@ class Submodule(IndexObject, TraversableIterableObj):
if has_module and url is not None:
if url not in [r.url for r in sm.module().remotes]:
raise ValueError(
- "Specified URL '%s' does not match any remote url of the repository at '%s'" % (url, sm.abspath))
+ "Specified URL '%s' does not match any remote url of the repository at '%s'"
+ % (url, sm.abspath)
+ )
# END check url
# END verify urls match
@@ -422,29 +480,33 @@ class Submodule(IndexObject, TraversableIterableObj):
if url is None:
if not has_module:
- raise ValueError("A URL was not given and a repository did not exist at %s" % path)
+ raise ValueError(
+ "A URL was not given and a repository did not exist at %s" % path
+ )
# END check url
mrepo = sm.module()
# assert isinstance(mrepo, git.Repo)
urls = [r.url for r in mrepo.remotes]
if not urls:
- raise ValueError("Didn't find any remote url in repository at %s" % sm.abspath)
+ raise ValueError(
+ "Didn't find any remote url in repository at %s" % sm.abspath
+ )
# END verify we have url
url = urls[0]
else:
# clone new repo
- kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {'n': no_checkout}
+ kwargs: Dict[str, Union[bool, int, str, Sequence[TBD]]] = {"n": no_checkout}
if not branch_is_default:
- kwargs['b'] = br.name
+ kwargs["b"] = br.name
# END setup checkout-branch
if depth:
if isinstance(depth, int):
- kwargs['depth'] = depth
+ kwargs["depth"] = depth
else:
raise ValueError("depth should be an integer")
if clone_multi_options:
- kwargs['multi_options'] = clone_multi_options
+ kwargs["multi_options"] = clone_multi_options
# _clone_repo(cls, repo, url, path, name, **kwargs):
mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs)
@@ -460,13 +522,13 @@ class Submodule(IndexObject, TraversableIterableObj):
writer: Union[GitConfigParser, SectionConstraint]
with sm.repo.config_writer() as writer:
- writer.set_value(sm_section(name), 'url', url)
+ writer.set_value(sm_section(name), "url", url)
# update configuration and index
index = sm.repo.index
with sm.config_writer(index=index, write=False) as writer:
- writer.set_value('url', url)
- writer.set_value('path', path)
+ writer.set_value("url", url)
+ writer.set_value("path", path)
sm._url = url
if not branch_is_default:
@@ -481,10 +543,18 @@ class Submodule(IndexObject, TraversableIterableObj):
return sm
- def update(self, recursive: bool = False, init: bool = True, to_latest_revision: bool = False,
- progress: Union['UpdateProgress', None] = None, dry_run: bool = False,
- force: bool = False, keep_going: bool = False, env: Union[Mapping[str, str], None] = None,
- clone_multi_options: Union[Sequence[TBD], None] = None) -> 'Submodule':
+ def update(
+ self,
+ recursive: bool = False,
+ init: bool = True,
+ to_latest_revision: bool = False,
+ progress: Union["UpdateProgress", None] = None,
+ dry_run: bool = False,
+ force: bool = False,
+ keep_going: bool = False,
+ env: Union[Mapping[str, str], None] = None,
+ clone_multi_options: Union[Sequence[TBD], None] = None,
+ ) -> "Submodule":
"""Update the repository of this submodule to point to the checkout
we point at with the binsha of this instance.
@@ -527,7 +597,7 @@ class Submodule(IndexObject, TraversableIterableObj):
if progress is None:
progress = UpdateProgress()
# END handle progress
- prefix = ''
+ prefix = ""
if dry_run:
prefix = "DRY-RUN: "
# END handle prefix
@@ -550,17 +620,27 @@ class Submodule(IndexObject, TraversableIterableObj):
op |= BEGIN
# END handle start
- progress.update(op, i, len_rmts, prefix + "Fetching remote %s of submodule %r"
- % (remote, self.name))
- #===============================
+ progress.update(
+ op,
+ i,
+ len_rmts,
+ prefix
+ + "Fetching remote %s of submodule %r" % (remote, self.name),
+ )
+ # ===============================
if not dry_run:
remote.fetch(progress=progress)
# END handle dry-run
- #===============================
+ # ===============================
if i == len_rmts - 1:
op |= END
# END handle end
- progress.update(op, i, len_rmts, prefix + "Done fetching remote of submodule %r" % self.name)
+ progress.update(
+ op,
+ i,
+ len_rmts,
+ prefix + "Done fetching remote of submodule %r" % self.name,
+ )
# END fetch new data
except InvalidGitRepositoryError:
mrepo = None
@@ -574,27 +654,49 @@ class Submodule(IndexObject, TraversableIterableObj):
try:
os.rmdir(checkout_module_abspath)
except OSError as e:
- raise OSError("Module directory at %r does already exist and is non-empty"
- % checkout_module_abspath) from e
+ raise OSError(
+ "Module directory at %r does already exist and is non-empty"
+ % checkout_module_abspath
+ ) from e
# END handle OSError
# END handle directory removal
# don't check it out at first - nonetheless it will create a local
# branch according to the remote-HEAD if possible
- progress.update(BEGIN | CLONE, 0, 1, prefix + "Cloning url '%s' to '%s' in submodule %r" %
- (self.url, checkout_module_abspath, self.name))
+ progress.update(
+ BEGIN | CLONE,
+ 0,
+ 1,
+ prefix
+ + "Cloning url '%s' to '%s' in submodule %r"
+ % (self.url, checkout_module_abspath, self.name),
+ )
if not dry_run:
- mrepo = self._clone_repo(self.repo, self.url, self.path, self.name, n=True, env=env,
- multi_options=clone_multi_options)
+ mrepo = self._clone_repo(
+ self.repo,
+ self.url,
+ self.path,
+ self.name,
+ n=True,
+ env=env,
+ multi_options=clone_multi_options,
+ )
# END handle dry-run
- progress.update(END | CLONE, 0, 1, prefix + "Done cloning to %s" % checkout_module_abspath)
+ progress.update(
+ END | CLONE,
+ 0,
+ 1,
+ prefix + "Done cloning to %s" % checkout_module_abspath,
+ )
if not dry_run:
# see whether we have a valid branch to checkout
try:
- mrepo = cast('Repo', mrepo)
+ mrepo = cast("Repo", mrepo)
# find a remote which has our branch - we try to be flexible
- remote_branch = find_first_remote_branch(mrepo.remotes, self.branch_name)
+ remote_branch = find_first_remote_branch(
+ mrepo.remotes, self.branch_name
+ )
local_branch = mkhead(mrepo, self.branch_path)
# have a valid branch, but no checkout - make sure we can figure
@@ -603,10 +705,15 @@ class Submodule(IndexObject, TraversableIterableObj):
# END initial checkout + branch creation
# make sure HEAD is not detached
- mrepo.head.set_reference(local_branch, logmsg="submodule: attaching head to %s" % local_branch)
+ mrepo.head.set_reference(
+ local_branch,
+ logmsg="submodule: attaching head to %s" % local_branch,
+ )
mrepo.head.reference.set_tracking_branch(remote_branch)
except (IndexError, InvalidGitRepositoryError):
- log.warning("Failed to checkout tracking branch %s", self.branch_path)
+ log.warning(
+ "Failed to checkout tracking branch %s", self.branch_path
+ )
# END handle tracking branch
# NOTE: Have to write the repo config file as well, otherwise
@@ -614,7 +721,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# Maybe this is a good way to assure it doesn't get into our way, but
# we want to stay backwards compatible too ... . Its so redundant !
with self.repo.config_writer() as writer:
- writer.set_value(sm_section(self.name), 'url', self.url)
+ writer.set_value(sm_section(self.name), "url", self.url)
# END handle dry_run
# END handle initialization
@@ -628,7 +735,10 @@ class Submodule(IndexObject, TraversableIterableObj):
# END handle dry_run
if mrepo is not None and to_latest_revision:
- msg_base = "Cannot update to latest revision in repository at %r as " % mrepo.working_dir
+ msg_base = (
+ "Cannot update to latest revision in repository at %r as "
+ % mrepo.working_dir
+ )
if not is_detached:
rref = mrepo.head.reference.tracking_branch()
if rref is not None:
@@ -636,8 +746,11 @@ class Submodule(IndexObject, TraversableIterableObj):
binsha = rcommit.binsha
hexsha = rcommit.hexsha
else:
- log.error("%s a tracking branch was not set for local branch '%s'",
- msg_base, mrepo.head.reference)
+ log.error(
+ "%s a tracking branch was not set for local branch '%s'",
+ msg_base,
+ mrepo.head.reference,
+ )
# END handle remote ref
else:
log.error("%s there was no local tracking branch", msg_base)
@@ -654,28 +767,47 @@ class Submodule(IndexObject, TraversableIterableObj):
may_reset = True
if mrepo.head.commit.binsha != self.NULL_BIN_SHA:
base_commit = mrepo.merge_base(mrepo.head.commit, hexsha)
- if len(base_commit) == 0 or (base_commit[0] is not None and base_commit[0].hexsha == hexsha):
+ if len(base_commit) == 0 or (
+ base_commit[0] is not None and base_commit[0].hexsha == hexsha
+ ):
if force:
msg = "Will force checkout or reset on local branch that is possibly in the future of"
msg += "the commit it will be checked out to, effectively 'forgetting' new commits"
log.debug(msg)
else:
msg = "Skipping %s on branch '%s' of submodule repo '%s' as it contains un-pushed commits"
- msg %= (is_detached and "checkout" or "reset", mrepo.head, mrepo)
+ msg %= (
+ is_detached and "checkout" or "reset",
+ mrepo.head,
+ mrepo,
+ )
log.info(msg)
may_reset = False
# end handle force
# end handle if we are in the future
- if may_reset and not force and mrepo.is_dirty(index=True, working_tree=True, untracked_files=True):
- raise RepositoryDirtyError(mrepo, "Cannot reset a dirty repository")
+ if (
+ may_reset
+ and not force
+ and mrepo.is_dirty(
+ index=True, working_tree=True, untracked_files=True
+ )
+ ):
+ raise RepositoryDirtyError(
+ mrepo, "Cannot reset a dirty repository"
+ )
# end handle force and dirty state
# end handle empty repo
# end verify future/past
- progress.update(BEGIN | UPDWKTREE, 0, 1, prefix +
- "Updating working tree at %s for submodule %r to revision %s"
- % (self.path, self.name, hexsha))
+ progress.update(
+ BEGIN | UPDWKTREE,
+ 0,
+ 1,
+ prefix
+ + "Updating working tree at %s for submodule %r to revision %s"
+ % (self.path, self.name, hexsha),
+ )
if not dry_run and may_reset:
if is_detached:
@@ -688,8 +820,12 @@ class Submodule(IndexObject, TraversableIterableObj):
mrepo.head.reset(hexsha, index=True, working_tree=True)
# END handle checkout
# if we may reset/checkout
- progress.update(END | UPDWKTREE, 0, 1, prefix + "Done updating working tree for submodule %r"
- % self.name)
+ progress.update(
+ END | UPDWKTREE,
+ 0,
+ 1,
+ prefix + "Done updating working tree for submodule %r" % self.name,
+ )
# END update to new commit only if needed
except Exception as err:
if not keep_going:
@@ -703,8 +839,15 @@ class Submodule(IndexObject, TraversableIterableObj):
# in dry_run mode, the module might not exist
if mrepo is not None:
for submodule in self.iter_items(self.module()):
- submodule.update(recursive, init, to_latest_revision, progress=progress, dry_run=dry_run,
- force=force, keep_going=keep_going)
+ submodule.update(
+ recursive,
+ init,
+ to_latest_revision,
+ progress=progress,
+ dry_run=dry_run,
+ force=force,
+ keep_going=keep_going,
+ )
# END handle recursive update
# END handle dry run
# END for each submodule
@@ -712,7 +855,9 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def move(self, module_path: PathLike, configuration: bool = True, module: bool = True) -> 'Submodule':
+ def move(
+ self, module_path: PathLike, configuration: bool = True, module: bool = True
+ ) -> "Submodule":
"""Move the submodule to a another module path. This involves physically moving
the repository at our current path, changing the configuration, as well as
adjusting our index entry accordingly.
@@ -732,7 +877,9 @@ class Submodule(IndexObject, TraversableIterableObj):
in an inconsistent state if a sub - step fails for some reason
"""
if module + configuration < 1:
- raise ValueError("You must specify to move at least the module or the configuration of the submodule")
+ raise ValueError(
+ "You must specify to move at least the module or the configuration of the submodule"
+ )
# END handle input
module_checkout_path = self._to_relative_path(self.repo, module_path)
@@ -742,9 +889,13 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
# END handle no change
- module_checkout_abspath = join_path_native(str(self.repo.working_tree_dir), module_checkout_path)
+ module_checkout_abspath = join_path_native(
+ str(self.repo.working_tree_dir), module_checkout_path
+ )
if osp.isfile(module_checkout_abspath):
- raise ValueError("Cannot move repository onto a file: %s" % module_checkout_abspath)
+ raise ValueError(
+ "Cannot move repository onto a file: %s" % module_checkout_abspath
+ )
# END handle target files
index = self.repo.index
@@ -780,9 +931,11 @@ class Submodule(IndexObject, TraversableIterableObj):
os.renames(cur_path, module_checkout_abspath)
renamed_module = True
- if osp.isfile(osp.join(module_checkout_abspath, '.git')):
+ if osp.isfile(osp.join(module_checkout_abspath, ".git")):
module_abspath = self._module_abspath(self.repo, self.path, self.name)
- self._write_git_file_and_module_config(module_checkout_abspath, module_abspath)
+ self._write_git_file_and_module_config(
+ module_checkout_abspath, module_abspath
+ )
# end handle git file rewrite
# END move physical module
@@ -794,16 +947,20 @@ class Submodule(IndexObject, TraversableIterableObj):
try:
ekey = index.entry_key(self.path, 0)
entry = index.entries[ekey]
- del(index.entries[ekey])
- nentry = git.IndexEntry(entry[:3] + (module_checkout_path,) + entry[4:])
+ del index.entries[ekey]
+ nentry = git.IndexEntry(
+ entry[:3] + (module_checkout_path,) + entry[4:]
+ )
index.entries[tekey] = nentry
except KeyError as e:
- raise InvalidGitRepositoryError("Submodule's entry at %r did not exist" % (self.path)) from e
+ raise InvalidGitRepositoryError(
+ "Submodule's entry at %r did not exist" % (self.path)
+ ) from e
# END handle submodule doesn't exist
# update configuration
- with self.config_writer(index=index) as writer: # auto-write
- writer.set_value('path', module_checkout_path)
+ with self.config_writer(index=index) as writer: # auto-write
+ writer.set_value("path", module_checkout_path)
self.path = module_checkout_path
# END handle configuration flag
except Exception:
@@ -821,8 +978,13 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def remove(self, module: bool = True, force: bool = False,
- configuration: bool = True, dry_run: bool = False) -> 'Submodule':
+ def remove(
+ self,
+ module: bool = True,
+ force: bool = False,
+ configuration: bool = True,
+ dry_run: bool = False,
+ ) -> "Submodule":
"""Remove this submodule from the repository. This will remove our entry
from the .gitmodules file and the entry in the .git / config file.
@@ -850,7 +1012,9 @@ class Submodule(IndexObject, TraversableIterableObj):
:raise InvalidGitRepositoryError: thrown if the repository cannot be deleted
:raise OSError: if directories or files could not be removed"""
if not (module or configuration):
- raise ValueError("Need to specify to delete at least the module, or the configuration")
+ raise ValueError(
+ "Need to specify to delete at least the module, or the configuration"
+ )
# END handle parameters
# Recursively remove children of this submodule
@@ -858,12 +1022,14 @@ class Submodule(IndexObject, TraversableIterableObj):
for csm in self.children():
nc += 1
csm.remove(module, force, configuration, dry_run)
- del(csm)
+ del csm
# end
if configuration and not dry_run and nc > 0:
# Assure we don't leave the parent repository in a dirty state, and commit our changes
# It's important for recursive, unforced, deletions to work as expected
- self.module().index.commit("Removed at least one of child-modules of '%s'" % self.name)
+ self.module().index.commit(
+ "Removed at least one of child-modules of '%s'" % self.name
+ )
# end handle recursion
# DELETE REPOSITORY WORKING TREE
@@ -882,7 +1048,9 @@ class Submodule(IndexObject, TraversableIterableObj):
elif osp.isdir(mp):
method = rmtree
elif osp.exists(mp):
- raise AssertionError("Cannot forcibly delete repository as it was neither a link, nor a directory")
+ raise AssertionError(
+ "Cannot forcibly delete repository as it was neither a link, nor a directory"
+ )
# END handle brutal deletion
if not dry_run:
assert method
@@ -893,7 +1061,8 @@ class Submodule(IndexObject, TraversableIterableObj):
if mod.is_dirty(index=True, working_tree=True, untracked_files=True):
raise InvalidGitRepositoryError(
"Cannot delete module at %s with any modifications, unless force is specified"
- % mod.working_tree_dir)
+ % mod.working_tree_dir
+ )
# END check for dirt
# figure out whether we have new commits compared to the remotes
@@ -910,30 +1079,36 @@ class Submodule(IndexObject, TraversableIterableObj):
# not a single remote branch contained all our commits
if len(rrefs) and num_branches_with_new_commits == len(rrefs):
raise InvalidGitRepositoryError(
- "Cannot delete module at %s as there are new commits" % mod.working_tree_dir)
+ "Cannot delete module at %s as there are new commits"
+ % mod.working_tree_dir
+ )
# END handle new commits
# have to manually delete references as python's scoping is
# not existing, they could keep handles open ( on windows this is a problem )
if len(rrefs):
- del(rref) # skipcq: PYL-W0631
+ del rref # skipcq: PYL-W0631
# END handle remotes
- del(rrefs)
- del(remote)
+ del rrefs
+ del remote
# END for each remote
# finally delete our own submodule
if not dry_run:
self._clear_cache()
wtd = mod.working_tree_dir
- del(mod) # release file-handles (windows)
+ del mod # release file-handles (windows)
import gc
+
gc.collect()
try:
rmtree(str(wtd))
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
+
+ raise SkipTest(
+ "FIXME: fails with: PermissionError\n {}".format(ex)
+ ) from ex
raise
# END delete tree if possible
# END handle force
@@ -945,7 +1120,10 @@ class Submodule(IndexObject, TraversableIterableObj):
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex
+
+ raise SkipTest(
+ f"FIXME: fails with: PermissionError\n {ex}"
+ ) from ex
else:
raise
# end handle separate bare repository
@@ -961,7 +1139,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# first the index-entry
parent_index = self.repo.index
try:
- del(parent_index.entries[parent_index.entry_key(self.path, 0)])
+ del parent_index.entries[parent_index.entry_key(self.path, 0)]
except KeyError:
pass
# END delete entry
@@ -979,7 +1157,9 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
- def set_parent_commit(self, commit: Union[Commit_ish, None], check: bool = True) -> 'Submodule':
+ def set_parent_commit(
+ self, commit: Union[Commit_ish, None], check: bool = True
+ ) -> "Submodule":
"""Set this instance to use the given commit whose tree is supposed to
contain the .gitmodules blob.
@@ -1000,7 +1180,10 @@ class Submodule(IndexObject, TraversableIterableObj):
pcommit = self.repo.commit(commit)
pctree = pcommit.tree
if self.k_modules_file not in pctree:
- raise ValueError("Tree of commit %s did not contain the %s file" % (commit, self.k_modules_file))
+ raise ValueError(
+ "Tree of commit %s did not contain the %s file"
+ % (commit, self.k_modules_file)
+ )
# END handle exceptions
prev_pc = self._parent_commit
@@ -1010,7 +1193,10 @@ class Submodule(IndexObject, TraversableIterableObj):
parser = self._config_parser(self.repo, self._parent_commit, read_only=True)
if not parser.has_section(sm_section(self.name)):
self._parent_commit = prev_pc
- raise ValueError("Submodule at path %r did not exist in parent commit %s" % (self.path, commit))
+ raise ValueError(
+ "Submodule at path %r did not exist in parent commit %s"
+ % (self.path, commit)
+ )
# END handle submodule did not exist
# END handle checking mode
@@ -1027,8 +1213,9 @@ class Submodule(IndexObject, TraversableIterableObj):
return self
@unbare_repo
- def config_writer(self, index: Union['IndexFile', None] = None, write: bool = True
- ) -> SectionConstraint['SubmoduleConfigParser']:
+ def config_writer(
+ self, index: Union["IndexFile", None] = None, write: bool = True
+ ) -> SectionConstraint["SubmoduleConfigParser"]:
""":return: a config writer instance allowing you to read and write the data
belonging to this submodule into the .gitmodules file.
@@ -1049,7 +1236,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return writer
@unbare_repo
- def rename(self, new_name: str) -> 'Submodule':
+ def rename(self, new_name: str) -> "Submodule":
"""Rename this submodule
:note: This method takes care of renaming the submodule in various places, such as
@@ -1081,7 +1268,9 @@ class Submodule(IndexObject, TraversableIterableObj):
# .git/modules
mod = self.module()
if mod.has_separate_working_tree():
- destination_module_abspath = self._module_abspath(self.repo, self.path, new_name)
+ destination_module_abspath = self._module_abspath(
+ self.repo, self.path, new_name
+ )
source_dir = mod.git_dir
# Let's be sure the submodule name is not so obviously tied to a directory
if str(destination_module_abspath).startswith(str(mod.git_dir)):
@@ -1091,17 +1280,19 @@ class Submodule(IndexObject, TraversableIterableObj):
# end handle self-containment
os.renames(source_dir, destination_module_abspath)
if mod.working_tree_dir:
- self._write_git_file_and_module_config(mod.working_tree_dir, destination_module_abspath)
+ self._write_git_file_and_module_config(
+ mod.working_tree_dir, destination_module_abspath
+ )
# end move separate git repository
return self
- #} END edit interface
+ # } END edit interface
- #{ Query Interface
+ # { Query Interface
@unbare_repo
- def module(self) -> 'Repo':
+ def module(self) -> "Repo":
""":return: Repo instance initialized from the repository at our submodule path
:raise InvalidGitRepositoryError: if a repository was not available. This could
also mean that it was not yet initialized"""
@@ -1113,9 +1304,13 @@ class Submodule(IndexObject, TraversableIterableObj):
return repo
# END handle repo uninitialized
except (InvalidGitRepositoryError, NoSuchPathError) as e:
- raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath) from e
+ raise InvalidGitRepositoryError(
+ "No valid repository at %s" % module_checkout_abspath
+ ) from e
else:
- raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
+ raise InvalidGitRepositoryError(
+ "Repository at %r was not yet checked out" % module_checkout_abspath
+ )
# END handle exceptions
def module_exists(self) -> bool:
@@ -1162,7 +1357,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# END handle object state consistency
@property
- def branch(self) -> 'Head':
+ def branch(self) -> "Head":
""":return: The branch instance that we are to checkout
:raise InvalidGitRepositoryError: if our module is not yet checked out"""
return mkhead(self.module(), self._branch_path)
@@ -1187,7 +1382,7 @@ class Submodule(IndexObject, TraversableIterableObj):
return self._url
@property
- def parent_commit(self) -> 'Commit_ish':
+ def parent_commit(self) -> "Commit_ish":
""":return: Commit instance with the tree containing the .gitmodules file
:note: will always point to the current head's commit if it was not set explicitly"""
if self._parent_commit is None:
@@ -1215,22 +1410,27 @@ class Submodule(IndexObject, TraversableIterableObj):
:raise IOError: If the .gitmodules file/blob could not be read"""
return self._config_parser_constrained(read_only=True)
- def children(self) -> IterableList['Submodule']:
+ def children(self) -> IterableList["Submodule"]:
"""
:return: IterableList(Submodule, ...) an iterable list of submodules instances
which are children of this submodule or 0 if the submodule is not checked out"""
return self._get_intermediate_items(self)
- #} END query interface
+ # } END query interface
- #{ Iterable Interface
+ # { Iterable Interface
@classmethod
- def iter_items(cls, repo: 'Repo', parent_commit: Union[Commit_ish, str] = 'HEAD', *Args: Any, **kwargs: Any
- ) -> Iterator['Submodule']:
+ def iter_items(
+ cls,
+ repo: "Repo",
+ parent_commit: Union[Commit_ish, str] = "HEAD",
+ *Args: Any,
+ **kwargs: Any,
+ ) -> Iterator["Submodule"]:
""":return: iterator yielding Submodule instances available in the given repository"""
try:
- pc = repo.commit(parent_commit) # parent commit instance
+ pc = repo.commit(parent_commit) # parent commit instance
parser = cls._config_parser(repo, pc, read_only=True)
except (IOError, BadName):
return iter([])
@@ -1238,8 +1438,8 @@ class Submodule(IndexObject, TraversableIterableObj):
for sms in parser.sections():
n = sm_name(sms)
- p = parser.get(sms, 'path')
- u = parser.get(sms, 'url')
+ p = parser.get(sms, "path")
+ u = parser.get(sms, "url")
b = cls.k_head_default
if parser.has_option(sms, cls.k_head_option):
b = str(parser.get(sms, cls.k_head_option))
@@ -1248,7 +1448,7 @@ class Submodule(IndexObject, TraversableIterableObj):
# get the binsha
index = repo.index
try:
- rt = pc.tree # root tree
+ rt = pc.tree # root tree
sm = rt[p]
except KeyError:
# try the index, maybe it was just added
@@ -1273,4 +1473,4 @@ class Submodule(IndexObject, TraversableIterableObj):
yield sm
# END for each section
- #} END iterable interface
+ # } END iterable interface
diff --git a/git/objects/submodule/root.py b/git/objects/submodule/root.py
index 08e1f954..16f0f91f 100644
--- a/git/objects/submodule/root.py
+++ b/git/objects/submodule/root.py
@@ -1,7 +1,4 @@
-from .base import (
- Submodule,
- UpdateProgress
-)
+from .base import Submodule, UpdateProgress
from .util import find_first_remote_branch
from git.exc import InvalidGitRepositoryError
import git
@@ -22,14 +19,17 @@ if TYPE_CHECKING:
__all__ = ["RootModule", "RootUpdateProgress"]
-log = logging.getLogger('git.objects.submodule.root')
+log = logging.getLogger("git.objects.submodule.root")
log.addHandler(logging.NullHandler())
class RootUpdateProgress(UpdateProgress):
"""Utility class which adds more opcodes to the UpdateProgress"""
+
REMOVE, PATHCHANGE, BRANCHCHANGE, URLCHANGE = [
- 1 << x for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)]
+ 1 << x
+ for x in range(UpdateProgress._num_op_codes, UpdateProgress._num_op_codes + 4)
+ ]
_num_op_codes = UpdateProgress._num_op_codes + 4
__slots__ = ()
@@ -50,32 +50,39 @@ class RootModule(Submodule):
__slots__ = ()
- k_root_name = '__ROOT__'
+ k_root_name = "__ROOT__"
- def __init__(self, repo: 'Repo'):
+ def __init__(self, repo: "Repo"):
# repo, binsha, mode=None, path=None, name = None, parent_commit=None, url=None, ref=None)
super(RootModule, self).__init__(
repo,
binsha=self.NULL_BIN_SHA,
mode=self.k_default_mode,
- path='',
+ path="",
name=self.k_root_name,
parent_commit=repo.head.commit,
- url='',
- branch_path=git.Head.to_full_path(self.k_head_default)
+ url="",
+ branch_path=git.Head.to_full_path(self.k_head_default),
)
def _clear_cache(self) -> None:
"""May not do anything"""
pass
- #{ Interface
-
- def update(self, previous_commit: Union[Commit_ish, None] = None, # type: ignore[override]
- recursive: bool = True, force_remove: bool = False, init: bool = True,
- to_latest_revision: bool = False, progress: Union[None, 'RootUpdateProgress'] = None,
- dry_run: bool = False, force_reset: bool = False, keep_going: bool = False
- ) -> 'RootModule':
+ # { Interface
+
+ def update(
+ self,
+ previous_commit: Union[Commit_ish, None] = None, # type: ignore[override]
+ recursive: bool = True,
+ force_remove: bool = False,
+ init: bool = True,
+ to_latest_revision: bool = False,
+ progress: Union[None, "RootUpdateProgress"] = None,
+ dry_run: bool = False,
+ force_reset: bool = False,
+ keep_going: bool = False,
+ ) -> "RootModule":
"""Update the submodules of this repository to the current HEAD commit.
This method behaves smartly by determining changes of the path of a submodules
repository, next to changes to the to-be-checked-out commit or the branch to be
@@ -109,16 +116,18 @@ class RootModule(Submodule):
In conjunction with dry_run, it can be useful to anticipate all errors when updating submodules
:return: self"""
if self.repo.bare:
- raise InvalidGitRepositoryError("Cannot update submodules in bare repositories")
+ raise InvalidGitRepositoryError(
+ "Cannot update submodules in bare repositories"
+ )
# END handle bare
if progress is None:
progress = RootUpdateProgress()
# END assure progress is set
- prefix = ''
+ prefix = ""
if dry_run:
- prefix = 'DRY-RUN: '
+ prefix = "DRY-RUN: "
repo = self.repo
@@ -137,17 +146,19 @@ class RootModule(Submodule):
previous_commit = cur_commit
# END exception handling
else:
- previous_commit = repo.commit(previous_commit) # obtain commit object
+ previous_commit = repo.commit(previous_commit) # obtain commit object
# END handle previous commit
- psms: 'IterableList[Submodule]' = self.list_items(repo, parent_commit=previous_commit)
- sms: 'IterableList[Submodule]' = self.list_items(repo)
+ psms: "IterableList[Submodule]" = self.list_items(
+ repo, parent_commit=previous_commit
+ )
+ sms: "IterableList[Submodule]" = self.list_items(repo)
spsms = set(psms)
ssms = set(sms)
# HANDLE REMOVALS
###################
- rrsm = (spsms - ssms)
+ rrsm = spsms - ssms
len_rrsm = len(rrsm)
for i, rsm in enumerate(rrsm):
@@ -158,37 +169,58 @@ class RootModule(Submodule):
# fake it into thinking its at the current commit to allow deletion
# of previous module. Trigger the cache to be updated before that
- progress.update(op, i, len_rrsm, prefix + "Removing submodule %r at %s" % (rsm.name, rsm.abspath))
+ progress.update(
+ op,
+ i,
+ len_rrsm,
+ prefix + "Removing submodule %r at %s" % (rsm.name, rsm.abspath),
+ )
rsm._parent_commit = repo.head.commit
- rsm.remove(configuration=False, module=True, force=force_remove, dry_run=dry_run)
+ rsm.remove(
+ configuration=False,
+ module=True,
+ force=force_remove,
+ dry_run=dry_run,
+ )
if i == len_rrsm - 1:
op |= END
# END handle end
- progress.update(op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name)
+ progress.update(
+ op, i, len_rrsm, prefix + "Done removing submodule %r" % rsm.name
+ )
# END for each removed submodule
# HANDLE PATH RENAMES
#####################
# url changes + branch changes
- csms = (spsms & ssms)
+ csms = spsms & ssms
len_csms = len(csms)
for i, csm in enumerate(csms):
- psm: 'Submodule' = psms[csm.name]
- sm: 'Submodule' = sms[csm.name]
+ psm: "Submodule" = psms[csm.name]
+ sm: "Submodule" = sms[csm.name]
# PATH CHANGES
##############
if sm.path != psm.path and psm.module_exists():
- progress.update(BEGIN | PATHCHANGE, i, len_csms, prefix +
- "Moving repository of submodule %r from %s to %s"
- % (sm.name, psm.abspath, sm.abspath))
+ progress.update(
+ BEGIN | PATHCHANGE,
+ i,
+ len_csms,
+ prefix
+ + "Moving repository of submodule %r from %s to %s"
+ % (sm.name, psm.abspath, sm.abspath),
+ )
# move the module to the new path
if not dry_run:
psm.move(sm.path, module=True, configuration=False)
# END handle dry_run
progress.update(
- END | PATHCHANGE, i, len_csms, prefix + "Done moving repository of submodule %r" % sm.name)
+ END | PATHCHANGE,
+ i,
+ len_csms,
+ prefix + "Done moving repository of submodule %r" % sm.name,
+ )
# END handle path changes
if sm.module_exists():
@@ -198,14 +230,20 @@ class RootModule(Submodule):
# Add the new remote, remove the old one
# This way, if the url just changes, the commits will not
# have to be re-retrieved
- nn = '__new_origin__'
+ nn = "__new_origin__"
smm = sm.module()
rmts = smm.remotes
# don't do anything if we already have the url we search in place
if len([r for r in rmts if r.url == sm.url]) == 0:
- progress.update(BEGIN | URLCHANGE, i, len_csms, prefix +
- "Changing url of submodule %r from %s to %s" % (sm.name, psm.url, sm.url))
+ progress.update(
+ BEGIN | URLCHANGE,
+ i,
+ len_csms,
+ prefix
+ + "Changing url of submodule %r from %s to %s"
+ % (sm.name, psm.url, sm.url),
+ )
if not dry_run:
assert nn not in [r.name for r in rmts]
@@ -214,7 +252,16 @@ class RootModule(Submodule):
# If we have a tracking branch, it should be available
# in the new remote as well.
- if len([r for r in smr.refs if r.remote_head == sm.branch_name]) == 0:
+ if (
+ len(
+ [
+ r
+ for r in smr.refs
+ if r.remote_head == sm.branch_name
+ ]
+ )
+ == 0
+ ):
raise ValueError(
"Submodule branch named %r was not available in new submodule remote at %r"
% (sm.branch_name, sm.url)
@@ -242,7 +289,9 @@ class RootModule(Submodule):
# Alternatively we could just generate a unique name and leave all
# existing ones in place
raise InvalidGitRepositoryError(
- "Couldn't find original remote-repo at url %r" % psm.url)
+ "Couldn't find original remote-repo at url %r"
+ % psm.url
+ )
# END handle one single remote
# END handle check we found a remote
@@ -277,15 +326,23 @@ class RootModule(Submodule):
# this way, it will be checked out in the next step
# This will change the submodule relative to us, so
# the user will be able to commit the change easily
- log.warning("Current sha %s was not contained in the tracking\
- branch at the new remote, setting it the the remote's tracking branch", sm.hexsha)
+ log.warning(
+ "Current sha %s was not contained in the tracking\
+            branch at the new remote, setting it to the remote's tracking branch",
+ sm.hexsha,
+ )
sm.binsha = rref.commit.binsha
# END reset binsha
# NOTE: All checkout is performed by the base implementation of update
# END handle dry_run
progress.update(
- END | URLCHANGE, i, len_csms, prefix + "Done adjusting url of submodule %r" % (sm.name))
+ END | URLCHANGE,
+ i,
+ len_csms,
+ prefix
+ + "Done adjusting url of submodule %r" % (sm.name),
+ )
# END skip remote handling if new url already exists in module
# END handle url
@@ -294,9 +351,14 @@ class RootModule(Submodule):
if sm.branch_path != psm.branch_path:
# finally, create a new tracking branch which tracks the
# new remote branch
- progress.update(BEGIN | BRANCHCHANGE, i, len_csms, prefix +
- "Changing branch of submodule %r from %s to %s"
- % (sm.name, psm.branch_path, sm.branch_path))
+ progress.update(
+ BEGIN | BRANCHCHANGE,
+ i,
+ len_csms,
+ prefix
+ + "Changing branch of submodule %r from %s to %s"
+ % (sm.name, psm.branch_path, sm.branch_path),
+ )
if not dry_run:
smm = sm.module()
smmr = smm.remotes
@@ -306,13 +368,19 @@ class RootModule(Submodule):
# end for each remote
try:
- tbr = git.Head.create(smm, sm.branch_name, logmsg='branch: Created from HEAD')
+ tbr = git.Head.create(
+ smm,
+ sm.branch_name,
+ logmsg="branch: Created from HEAD",
+ )
except OSError:
# ... or reuse the existing one
tbr = git.Head(smm, sm.branch_path)
# END assure tracking branch exists
- tbr.set_tracking_branch(find_first_remote_branch(smmr, sm.branch_name))
+ tbr.set_tracking_branch(
+ find_first_remote_branch(smmr, sm.branch_name)
+ )
# NOTE: All head-resetting is done in the base implementation of update
# but we will have to checkout the new branch here. As it still points to the currently
# checkout out commit, we don't do any harm.
@@ -321,7 +389,11 @@ class RootModule(Submodule):
# END handle dry_run
progress.update(
- END | BRANCHCHANGE, i, len_csms, prefix + "Done changing branch of submodule %r" % sm.name)
+ END | BRANCHCHANGE,
+ i,
+ len_csms,
+ prefix + "Done changing branch of submodule %r" % sm.name,
+ )
# END handle branch
# END handle
# END for each common submodule
@@ -335,8 +407,15 @@ class RootModule(Submodule):
######################################
for sm in sms:
# update the submodule using the default method
- sm.update(recursive=False, init=init, to_latest_revision=to_latest_revision,
- progress=progress, dry_run=dry_run, force=force_reset, keep_going=keep_going)
+ sm.update(
+ recursive=False,
+ init=init,
+ to_latest_revision=to_latest_revision,
+ progress=progress,
+ dry_run=dry_run,
+ force=force_reset,
+ keep_going=keep_going,
+ )
# update recursively depth first - question is which inconsistent
# state will be better in case it fails somewhere. Defective branch
@@ -345,18 +424,27 @@ class RootModule(Submodule):
if recursive:
# the module would exist by now if we are not in dry_run mode
if sm.module_exists():
- type(self)(sm.module()).update(recursive=True, force_remove=force_remove,
- init=init, to_latest_revision=to_latest_revision,
- progress=progress, dry_run=dry_run, force_reset=force_reset,
- keep_going=keep_going)
+ type(self)(sm.module()).update(
+ recursive=True,
+ force_remove=force_remove,
+ init=init,
+ to_latest_revision=to_latest_revision,
+ progress=progress,
+ dry_run=dry_run,
+ force_reset=force_reset,
+ keep_going=keep_going,
+ )
# END handle dry_run
# END handle recursive
# END for each submodule to update
return self
- def module(self) -> 'Repo':
+ def module(self) -> "Repo":
""":return: the actual repository containing the submodules"""
return self.repo
- #} END interface
-#} END classes
+
+ # } END interface
+
+
+# } END classes
diff --git a/git/objects/submodule/util.py b/git/objects/submodule/util.py
index cc1cd60a..456ae34b 100644
--- a/git/objects/submodule/util.py
+++ b/git/objects/submodule/util.py
@@ -20,10 +20,15 @@ if TYPE_CHECKING:
from git.refs import RemoteReference
-__all__ = ('sm_section', 'sm_name', 'mkhead', 'find_first_remote_branch',
- 'SubmoduleConfigParser')
+__all__ = (
+ "sm_section",
+ "sm_name",
+ "mkhead",
+ "find_first_remote_branch",
+ "SubmoduleConfigParser",
+)
-#{ Utilities
+# { Utilities
def sm_section(name: str) -> str:
@@ -37,12 +42,14 @@ def sm_name(section: str) -> str:
return section[11:-1]
-def mkhead(repo: 'Repo', path: PathLike) -> 'Head':
+def mkhead(repo: "Repo", path: PathLike) -> "Head":
""":return: New branch/head instance"""
return git.Head(repo, git.Head.to_full_path(path))
-def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> 'RemoteReference':
+def find_first_remote_branch(
+ remotes: Sequence["Remote"], branch_name: str
+) -> "RemoteReference":
"""Find the remote branch matching the name of the given branch or raise InvalidGitRepositoryError"""
for remote in remotes:
try:
@@ -51,12 +58,16 @@ def find_first_remote_branch(remotes: Sequence['Remote'], branch_name: str) -> '
continue
# END exception handling
# END for remote
- raise InvalidGitRepositoryError("Didn't find remote branch '%r' in any of the given remotes" % branch_name)
+ raise InvalidGitRepositoryError(
+        "Didn't find remote branch %r in any of the given remotes" % branch_name
+ )
-#} END utilities
+# } END utilities
+
+
+# { Classes
-#{ Classes
class SubmoduleConfigParser(GitConfigParser):
@@ -70,13 +81,13 @@ class SubmoduleConfigParser(GitConfigParser):
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
- self._smref: Union['ReferenceType[Submodule]', None] = None
+ self._smref: Union["ReferenceType[Submodule]", None] = None
self._index = None
self._auto_write = True
super(SubmoduleConfigParser, self).__init__(*args, **kwargs)
- #{ Interface
- def set_submodule(self, submodule: 'Submodule') -> None:
+ # { Interface
+ def set_submodule(self, submodule: "Submodule") -> None:
"""Set this instance's submodule. It must be called before
the first write operation begins"""
self._smref = weakref.ref(submodule)
@@ -97,14 +108,15 @@ class SubmoduleConfigParser(GitConfigParser):
sm._clear_cache()
# END handle weakref
- #} END interface
+ # } END interface
- #{ Overridden Methods
+ # { Overridden Methods
def write(self) -> None: # type: ignore[override]
rval: None = super(SubmoduleConfigParser, self).write()
self.flush_to_index()
return rval
+
# END overridden methods
-#} END classes
+# } END classes
diff --git a/git/objects/tag.py b/git/objects/tag.py
index 7048eb40..3956a89e 100644
--- a/git/objects/tag.py
+++ b/git/objects/tag.py
@@ -20,23 +20,34 @@ if TYPE_CHECKING:
from .blob import Blob
from .tree import Tree
-__all__ = ("TagObject", )
+__all__ = ("TagObject",)
class TagObject(base.Object):
"""Non-Lightweight tag carrying additional information about an object we are pointing to."""
- type: Literal['tag'] = "tag"
- __slots__ = ("object", "tag", "tagger", "tagged_date", "tagger_tz_offset", "message")
- def __init__(self, repo: 'Repo', binsha: bytes,
- object: Union[None, base.Object] = None,
- tag: Union[None, str] = None,
- tagger: Union[None, 'Actor'] = None,
- tagged_date: Union[int, None] = None,
- tagger_tz_offset: Union[int, None] = None,
- message: Union[str, None] = None
- ) -> None: # @ReservedAssignment
+ type: Literal["tag"] = "tag"
+ __slots__ = (
+ "object",
+ "tag",
+ "tagger",
+ "tagged_date",
+ "tagger_tz_offset",
+ "message",
+ )
+
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ object: Union[None, base.Object] = None,
+ tag: Union[None, str] = None,
+ tagger: Union[None, "Actor"] = None,
+ tagged_date: Union[int, None] = None,
+ tagger_tz_offset: Union[int, None] = None,
+ message: Union[str, None] = None,
+ ) -> None: # @ReservedAssignment
"""Initialize a tag object with additional data
:param repo: repository this object is located in
@@ -51,7 +62,7 @@ class TagObject(base.Object):
authored_date is in, in a format similar to time.altzone"""
super(TagObject, self).__init__(repo, binsha)
if object is not None:
- self.object: Union['Commit', 'Blob', 'Tree', 'TagObject'] = object
+ self.object: Union["Commit", "Blob", "Tree", "TagObject"] = object
if tag is not None:
self.tag = tag
if tagger is not None:
@@ -67,19 +78,22 @@ class TagObject(base.Object):
"""Cache all our attributes at once"""
if attr in TagObject.__slots__:
ostream = self.repo.odb.stream(self.binsha)
- lines: List[str] = ostream.read().decode(defenc, 'replace').splitlines()
+ lines: List[str] = ostream.read().decode(defenc, "replace").splitlines()
_obj, hexsha = lines[0].split(" ")
_type_token, type_name = lines[1].split(" ")
- object_type = get_object_type_by_name(type_name.encode('ascii'))
- self.object = \
- object_type(self.repo, hex_to_bin(hexsha))
+ object_type = get_object_type_by_name(type_name.encode("ascii"))
+ self.object = object_type(self.repo, hex_to_bin(hexsha))
self.tag = lines[2][4:] # tag <tag name>
if len(lines) > 3:
tagger_info = lines[3] # tagger <actor> <date>
- self.tagger, self.tagged_date, self.tagger_tz_offset = parse_actor_and_date(tagger_info)
+ (
+ self.tagger,
+ self.tagged_date,
+ self.tagger_tz_offset,
+ ) = parse_actor_and_date(tagger_info)
# line 4 empty - it could mark the beginning of the next header
# in case there really is no message, it would not exist. Otherwise
@@ -87,7 +101,7 @@ class TagObject(base.Object):
if len(lines) > 5:
self.message = "\n".join(lines[5:])
else:
- self.message = ''
+ self.message = ""
# END check our attributes
else:
super(TagObject, self)._set_cache_(attr)
diff --git a/git/objects/tree.py b/git/objects/tree.py
index 22531895..e1fcced7 100644
--- a/git/objects/tree.py
+++ b/git/objects/tree.py
@@ -13,16 +13,24 @@ from .base import IndexObject, IndexObjUnion
from .blob import Blob
from .submodule.base import Submodule
-from .fun import (
- tree_entries_from_data,
- tree_to_stream
-)
+from .fun import tree_entries_from_data, tree_to_stream
# typing -------------------------------------------------
-from typing import (Any, Callable, Dict, Iterable, Iterator, List,
- Tuple, Type, Union, cast, TYPE_CHECKING)
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Tuple,
+ Type,
+ Union,
+ cast,
+ TYPE_CHECKING,
+)
from git.types import PathLike, Literal
@@ -32,14 +40,15 @@ if TYPE_CHECKING:
TreeCacheTup = Tuple[bytes, int, str]
-TraversedTreeTup = Union[Tuple[Union['Tree', None], IndexObjUnion,
- Tuple['Submodule', 'Submodule']]]
+TraversedTreeTup = Union[
+ Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]
+]
# def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]:
# return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str)
-#--------------------------------------------------------
+# --------------------------------------------------------
cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
@@ -60,8 +69,9 @@ def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int:
return len_a - len_b
-def merge_sort(a: List[TreeCacheTup],
- cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None:
+def merge_sort(
+ a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]
+) -> None:
if len(a) < 2:
return None
@@ -102,7 +112,8 @@ class TreeModifier(object):
Once all adjustments are complete, the _cache, which really is a reference to
the cache of a tree, will be sorted. Assuring it will be in a serializable state"""
- __slots__ = '_cache'
+
+ __slots__ = "_cache"
def __init__(self, cache: List[TreeCacheTup]) -> None:
self._cache = cache
@@ -116,18 +127,21 @@ class TreeModifier(object):
# END for each item in cache
return -1
- #{ Interface
- def set_done(self) -> 'TreeModifier':
+ # { Interface
+ def set_done(self) -> "TreeModifier":
"""Call this method once you are done modifying the tree information.
It may be called several times, but be aware that each call will cause
a sort operation
:return self:"""
merge_sort(self._cache, git_cmp)
return self
- #} END interface
- #{ Mutators
- def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> 'TreeModifier':
+ # } END interface
+
+ # { Mutators
+ def add(
+ self, sha: bytes, mode: int, name: str, force: bool = False
+ ) -> "TreeModifier":
"""Add the given item to the tree. If an item with the given name already
exists, nothing will be done, but a ValueError will be raised if the
sha and mode of the existing item do not match the one you add, unless
@@ -138,7 +152,7 @@ class TreeModifier(object):
:param force: If True, an item with your name and information will overwrite
any existing item with the same name, no matter which information it has
:return: self"""
- if '/' in name:
+ if "/" in name:
raise ValueError("Name must not contain '/' characters")
if (mode >> 12) not in Tree._map_id_to_type:
raise ValueError("Invalid object type according to mode %o" % mode)
@@ -168,7 +182,11 @@ class TreeModifier(object):
puts the caller into responsibility to assure the input is correct.
For more information on the parameters, see ``add``
:param binsha: 20 byte binary sha"""
- assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
+ assert (
+ isinstance(binsha, bytes)
+ and isinstance(mode, int)
+ and isinstance(name, str)
+ )
tree_cache = (binsha, mode, name)
self._cache.append(tree_cache)
@@ -177,9 +195,9 @@ class TreeModifier(object):
"""Deletes an item with the given name if it exists"""
index = self._index_by_name(name)
if index > -1:
- del(self._cache[index])
+ del self._cache[index]
- #} END mutators
+ # } END mutators
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
@@ -195,11 +213,11 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
blob = tree[0]
"""
- type: Literal['tree'] = "tree"
+ type: Literal["tree"] = "tree"
__slots__ = "_cache"
# actual integer ids for comparison
- commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link
+ commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link
blob_id = 0o10
symlink_id = 0o12
tree_id = 0o04
@@ -211,12 +229,20 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
# tree id added once Tree is defined
}
- def __init__(self, repo: 'Repo', binsha: bytes, mode: int = tree_id << 12, path: Union[PathLike, None] = None):
+ def __init__(
+ self,
+ repo: "Repo",
+ binsha: bytes,
+ mode: int = tree_id << 12,
+ path: Union[PathLike, None] = None,
+ ):
super(Tree, self).__init__(repo, binsha, mode, path)
- @ classmethod
- def _get_intermediate_items(cls, index_object: IndexObjUnion,
- ) -> Union[Tuple['Tree', ...], Tuple[()]]:
+ @classmethod
+ def _get_intermediate_items(
+ cls,
+ index_object: IndexObjUnion,
+ ) -> Union[Tuple["Tree", ...], Tuple[()]]:
if index_object.type == "tree":
return tuple(index_object._iter_convert_to_object(index_object._cache))
return ()
@@ -230,8 +256,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
super(Tree, self)._set_cache_(attr)
# END handle attribute
- def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]
- ) -> Iterator[IndexObjUnion]:
+ def _iter_convert_to_object(
+ self, iterable: Iterable[TreeCacheTup]
+ ) -> Iterator[IndexObjUnion]:
"""Iterable yields tuples of (binsha, mode, name), which will be converted
to the respective object representation"""
for binsha, mode, name in iterable:
@@ -239,7 +266,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
try:
yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
except KeyError as e:
- raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
+ raise TypeError(
+ "Unknown mode %o found in tree data for path '%s'" % (mode, path)
+ ) from e
# END for each item
def join(self, file: str) -> IndexObjUnion:
@@ -248,13 +277,13 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
:raise KeyError: if given file or tree does not exist in tree"""
msg = "Blob or Tree named %r not found"
- if '/' in file:
+ if "/" in file:
tree = self
item = self
- tokens = file.split('/')
+ tokens = file.split("/")
for i, token in enumerate(tokens):
item = tree[token]
- if item.type == 'tree':
+ if item.type == "tree":
tree = item
else:
# safety assertion - blobs are at the end of the path
@@ -268,9 +297,10 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
return item
else:
for info in self._cache:
- if info[2] == file: # [2] == name
- return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1],
- join_path(self.path, info[2]))
+ if info[2] == file: # [2] == name
+ return self._map_id_to_type[info[1] >> 12](
+ self.repo, info[0], info[1], join_path(self.path, info[2])
+ )
# END for each obj
raise KeyError(msg % file)
# END handle long paths
@@ -279,17 +309,17 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
"""For PY3 only"""
return self.join(file)
- @ property
- def trees(self) -> List['Tree']:
+ @property
+ def trees(self) -> List["Tree"]:
""":return: list(Tree, ...) list of trees directly below this tree"""
return [i for i in self if i.type == "tree"]
- @ property
+ @property
def blobs(self) -> List[Blob]:
""":return: list(Blob, ...) list of blobs directly below this tree"""
return [i for i in self if i.type == "blob"]
- @ property
+ @property
def cache(self) -> TreeModifier:
"""
:return: An object allowing to modify the internal cache. This can be used
@@ -298,16 +328,20 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
See the ``TreeModifier`` for more information on how to alter the cache"""
return TreeModifier(self._cache)
- def traverse(self, # type: ignore[override]
- predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
- prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
- depth: int = -1,
- branch_first: bool = True,
- visit_once: bool = False,
- ignore_self: int = 1,
- as_edge: bool = False
- ) -> Union[Iterator[IndexObjUnion],
- Iterator[TraversedTreeTup]]:
+ def traverse(
+ self, # type: ignore[override]
+ predicate: Callable[
+ [Union[IndexObjUnion, TraversedTreeTup], int], bool
+ ] = lambda i, d: True,
+ prune: Callable[
+ [Union[IndexObjUnion, TraversedTreeTup], int], bool
+ ] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = False,
+ ignore_self: int = 1,
+ as_edge: bool = False,
+ ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
"""For documentation, see util.Traversable._traverse()
Trees are set to visit_once = False to gain more performance in the traversal"""
@@ -321,9 +355,17 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
# ret_tup = itertools.tee(ret, 2)
# assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
# return ret_tup[0]"""
- return cast(Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
- super(Tree, self)._traverse(predicate, prune, depth, # type: ignore
- branch_first, visit_once, ignore_self))
+ return cast(
+ Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
+ super(Tree, self)._traverse(
+ predicate,
+ prune,
+ depth, # type: ignore
+ branch_first,
+ visit_once,
+ ignore_self,
+ ),
+ )
def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
"""
@@ -331,7 +373,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
traverse()
Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
"""
- return super(Tree, self)._list_traverse(* args, **kwargs)
+ return super(Tree, self)._list_traverse(*args, **kwargs)
# List protocol
@@ -347,7 +389,9 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
if isinstance(item, int):
info = self._cache[item]
- return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
+ return self._map_id_to_type[info[1] >> 12](
+ self.repo, info[0], info[1], join_path(self.path, info[2])
+ )
if isinstance(item, str):
# compatibility
@@ -378,7 +422,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
def __reversed__(self) -> Iterator[IndexObjUnion]:
return reversed(self._iter_convert_to_object(self._cache)) # type: ignore
- def _serialize(self, stream: 'BytesIO') -> 'Tree':
+ def _serialize(self, stream: "BytesIO") -> "Tree":
"""Serialize this tree into the stream. Please note that we will assume
our tree data to be in a sorted state. If this is not the case, serialization
will not generate a correct tree representation as these are assumed to be sorted
@@ -386,7 +430,7 @@ class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
tree_to_stream(self._cache, stream.write)
return self
- def _deserialize(self, stream: 'BytesIO') -> 'Tree':
+ def _deserialize(self, stream: "BytesIO") -> "Tree":
self._cache = tree_entries_from_data(stream.read())
return self
diff --git a/git/objects/util.py b/git/objects/util.py
index 800eccdf..4ba59c8a 100644
--- a/git/objects/util.py
+++ b/git/objects/util.py
@@ -7,11 +7,7 @@
from abc import ABC, abstractmethod
import warnings
-from git.util import (
- IterableList,
- IterableObj,
- Actor
-)
+from git.util import IterableList, IterableObj, Actor
import re
from collections import deque
@@ -22,10 +18,24 @@ import calendar
from datetime import datetime, timedelta, tzinfo
# typing ------------------------------------------------------------
-from typing import (Any, Callable, Deque, Iterator, Generic, NamedTuple, overload, Sequence, # NOQA: F401
- TYPE_CHECKING, Tuple, Type, TypeVar, Union, cast)
+from typing import (
+ Any,
+ Callable,
+ Deque,
+ Iterator,
+ Generic,
+ NamedTuple,
+ overload,
+ Sequence, # NOQA: F401
+ TYPE_CHECKING,
+ Tuple,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+)
-from git.types import Has_id_attribute, Literal, _T # NOQA: F401
+from git.types import Has_id_attribute, Literal, _T # NOQA: F401
if TYPE_CHECKING:
from io import BytesIO, StringIO
@@ -46,24 +56,38 @@ else:
class TraverseNT(NamedTuple):
depth: int
- item: Union['Traversable', 'Blob']
- src: Union['Traversable', None]
+ item: Union["Traversable", "Blob"]
+ src: Union["Traversable", None]
-T_TIobj = TypeVar('T_TIobj', bound='TraversableIterableObj') # for TraversableIterableObj.traverse()
+T_TIobj = TypeVar(
+ "T_TIobj", bound="TraversableIterableObj"
+) # for TraversableIterableObj.traverse()
-TraversedTup = Union[Tuple[Union['Traversable', None], 'Traversable'], # for commit, submodule
- 'TraversedTreeTup'] # for tree.traverse()
+TraversedTup = Union[
+ Tuple[Union["Traversable", None], "Traversable"], # for commit, submodule
+ "TraversedTreeTup",
+] # for tree.traverse()
# --------------------------------------------------------------------
-__all__ = ('get_object_type_by_name', 'parse_date', 'parse_actor_and_date',
- 'ProcessStreamAdapter', 'Traversable', 'altz_to_utctz_str', 'utctz_to_altz',
- 'verify_utctz', 'Actor', 'tzoffset', 'utc')
+__all__ = (
+ "get_object_type_by_name",
+ "parse_date",
+ "parse_actor_and_date",
+ "ProcessStreamAdapter",
+ "Traversable",
+ "altz_to_utctz_str",
+ "utctz_to_altz",
+ "verify_utctz",
+ "Actor",
+ "tzoffset",
+ "utc",
+)
ZERO = timedelta(0)
-#{ Functions
+# { Functions
def mode_str_to_int(modestr: Union[bytes, str]) -> int:
@@ -82,8 +106,9 @@ def mode_str_to_int(modestr: Union[bytes, str]) -> int:
return mode
-def get_object_type_by_name(object_type_name: bytes
- ) -> Union[Type['Commit'], Type['TagObject'], Type['Tree'], Type['Blob']]:
+def get_object_type_by_name(
+ object_type_name: bytes,
+) -> Union[Type["Commit"], Type["TagObject"], Type["Tree"], Type["Blob"]]:
"""
:return: type suitable to handle the given object type name.
Use the type to create new instances.
@@ -93,18 +118,24 @@ def get_object_type_by_name(object_type_name: bytes
:raise ValueError: In case object_type_name is unknown"""
if object_type_name == b"commit":
from . import commit
+
return commit.Commit
elif object_type_name == b"tag":
from . import tag
+
return tag.TagObject
elif object_type_name == b"blob":
from . import blob
+
return blob.Blob
elif object_type_name == b"tree":
from . import tree
+
return tree.Tree
else:
- raise ValueError("Cannot handle unknown object type: %s" % object_type_name.decode())
+ raise ValueError(
+ "Cannot handle unknown object type: %s" % object_type_name.decode()
+ )
def utctz_to_altz(utctz: str) -> int:
@@ -121,7 +152,7 @@ def altz_to_utctz_str(altz: float) -> str:
utci = -1 * int((float(altz) / 3600) * 100)
utcs = str(abs(utci))
utcs = "0" * (4 - len(utcs)) + utcs
- prefix = (utci < 0 and '-') or '+'
+ prefix = (utci < 0 and "-") or "+"
return prefix + utcs
@@ -133,22 +164,23 @@ def verify_utctz(offset: str) -> str:
raise fmt_exc
if offset[0] not in "+-":
raise fmt_exc
- if offset[1] not in digits or\
- offset[2] not in digits or\
- offset[3] not in digits or\
- offset[4] not in digits:
+ if (
+ offset[1] not in digits
+ or offset[2] not in digits
+ or offset[3] not in digits
+ or offset[4] not in digits
+ ):
raise fmt_exc
# END for each char
return offset
class tzoffset(tzinfo):
-
def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> None:
self._offset = timedelta(seconds=-secs_west_of_utc)
- self._name = name or 'fixed'
+ self._name = name or "fixed"
- def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]:
+ def __reduce__(self) -> Tuple[Type["tzoffset"], Tuple[float, str]]:
return tzoffset, (-self._offset.total_seconds(), self._name)
def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
@@ -161,7 +193,7 @@ class tzoffset(tzinfo):
return ZERO
-utc = tzoffset(0, 'UTC')
+utc = tzoffset(0, "UTC")
def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
@@ -190,23 +222,27 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
"""
if isinstance(string_date, datetime):
if string_date.tzinfo:
- utcoffset = cast(timedelta, string_date.utcoffset()) # typeguard, if tzinfoand is not None
+ utcoffset = cast(
+ timedelta, string_date.utcoffset()
+ ) # typeguard, if tzinfo is not None
offset = -int(utcoffset.total_seconds())
return int(string_date.astimezone(utc).timestamp()), offset
else:
- raise ValueError(f"string_date datetime object without tzinfo, {string_date}")
+ raise ValueError(
+ f"string_date datetime object without tzinfo, {string_date}"
+ )
# git time
try:
- if string_date.count(' ') == 1 and string_date.rfind(':') == -1:
+ if string_date.count(" ") == 1 and string_date.rfind(":") == -1:
timestamp, offset_str = string_date.split()
- if timestamp.startswith('@'):
+ if timestamp.startswith("@"):
timestamp = timestamp[1:]
timestamp_int = int(timestamp)
return timestamp_int, utctz_to_altz(verify_utctz(offset_str))
else:
- offset_str = "+0000" # local time by default
- if string_date[-5] in '-+':
+ offset_str = "+0000" # local time by default
+ if string_date[-5] in "-+":
offset_str = verify_utctz(string_date[-5:])
string_date = string_date[:-6] # skip space as well
# END split timezone info
@@ -215,9 +251,9 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
# now figure out the date and time portion - split time
date_formats = []
splitter = -1
- if ',' in string_date:
+ if "," in string_date:
date_formats.append("%a, %d %b %Y")
- splitter = string_date.rfind(' ')
+ splitter = string_date.rfind(" ")
else:
# iso plus additional
date_formats.append("%Y-%m-%d")
@@ -225,16 +261,16 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
date_formats.append("%m/%d/%Y")
date_formats.append("%d.%m.%Y")
- splitter = string_date.rfind('T')
+ splitter = string_date.rfind("T")
if splitter == -1:
- splitter = string_date.rfind(' ')
+ splitter = string_date.rfind(" ")
# END handle 'T' and ' '
# END handle rfc or iso
assert splitter > -1
# split date and time
- time_part = string_date[splitter + 1:] # skip space
+ time_part = string_date[splitter + 1 :] # skip space
date_part = string_date[:splitter]
# parse time
@@ -243,9 +279,19 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
for fmt in date_formats:
try:
dtstruct = time.strptime(date_part, fmt)
- utctime = calendar.timegm((dtstruct.tm_year, dtstruct.tm_mon, dtstruct.tm_mday,
- tstruct.tm_hour, tstruct.tm_min, tstruct.tm_sec,
- dtstruct.tm_wday, dtstruct.tm_yday, tstruct.tm_isdst))
+ utctime = calendar.timegm(
+ (
+ dtstruct.tm_year,
+ dtstruct.tm_mon,
+ dtstruct.tm_mday,
+ tstruct.tm_hour,
+ tstruct.tm_min,
+ tstruct.tm_sec,
+ dtstruct.tm_wday,
+ dtstruct.tm_yday,
+ tstruct.tm_isdst,
+ )
+ )
return int(utctime), offset
except ValueError:
continue
@@ -256,13 +302,15 @@ def parse_date(string_date: Union[str, datetime]) -> Tuple[int, int]:
raise ValueError("no format matched")
# END handle format
except Exception as e:
- raise ValueError(f"Unsupported date format or type: {string_date}, type={type(string_date)}") from e
+ raise ValueError(
+ f"Unsupported date format or type: {string_date}, type={type(string_date)}"
+ ) from e
# END handle exceptions
# precompiled regex
-_re_actor_epoch = re.compile(r'^.+? (.*) (\d+) ([+-]\d+).*$')
-_re_only_actor = re.compile(r'^.+? (.*)$')
+_re_actor_epoch = re.compile(r"^.+? (.*) (\d+) ([+-]\d+).*$")
+_re_only_actor = re.compile(r"^.+? (.*)$")
def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
@@ -271,19 +319,21 @@ def parse_actor_and_date(line: str) -> Tuple[Actor, int, int]:
author Tom Preston-Werner <tom@mojombo.com> 1191999972 -0700
:return: [Actor, int_seconds_since_epoch, int_timezone_offset]"""
- actor, epoch, offset = '', '0', '0'
+ actor, epoch, offset = "", "0", "0"
m = _re_actor_epoch.search(line)
if m:
actor, epoch, offset = m.groups()
else:
m = _re_only_actor.search(line)
- actor = m.group(1) if m else line or ''
+ actor = m.group(1) if m else line or ""
return (Actor._from_string(actor), int(epoch), utctz_to_altz(offset))
-#} END functions
+
+# } END functions
-#{ Classes
+# { Classes
+
class ProcessStreamAdapter(object):
@@ -292,9 +342,10 @@ class ProcessStreamAdapter(object):
Use this type to hide the underlying process to provide access only to a specified
stream. The process is usually wrapped into an AutoInterrupt class to kill
it if the instance goes out of scope."""
+
__slots__ = ("_proc", "_stream")
- def __init__(self, process: 'Popen', stream_name: str) -> None:
+ def __init__(self, process: "Popen", stream_name: str) -> None:
self._proc = process
self._stream: StringIO = getattr(process, stream_name) # guessed type
@@ -312,11 +363,12 @@ class Traversable(Protocol):
Defined subclasses = [Commit, Tree, SubModule]
"""
+
__slots__ = ()
@classmethod
@abstractmethod
- def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']:
+ def _get_intermediate_items(cls, item: Any) -> Sequence["Traversable"]:
"""
Returns:
Tuple of items connected to the given item.
@@ -331,15 +383,18 @@ class Traversable(Protocol):
@abstractmethod
def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
""" """
- warnings.warn("list_traverse() method should only be called from subclasses."
- "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20"
- "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit",
- DeprecationWarning,
- stacklevel=2)
+ warnings.warn(
+ "list_traverse() method should only be called from subclasses. "
+ "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20. "
+ "Builtin subclasses are 'Submodule', 'Tree' and 'Commit'.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
return self._list_traverse(*args, **kwargs)
- def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any
- ) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]:
+ def _list_traverse(
+ self, as_edge: bool = False, *args: Any, **kwargs: Any
+ ) -> IterableList[Union["Commit", "Submodule", "Tree", "Blob"]]:
"""
:return: IterableList with the results of the traversal as produced by
traverse()
@@ -352,11 +407,13 @@ class Traversable(Protocol):
if isinstance(self, Has_id_attribute):
id = self._id_attribute_
else:
- id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_
+ id = "" # shouldn't reach here, unless Traversable subclass created with no _id_attribute_
# could add _id_attribute_ to Traversable, or make all Traversable also Iterable?
if not as_edge:
- out: IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']] = IterableList(id)
+ out: IterableList[
+ Union["Commit", "Submodule", "Tree", "Blob"]
+ ] = IterableList(id)
out.extend(self.traverse(as_edge=as_edge, *args, **kwargs))
return out
# overloads in subclasses (mypy doesn't allow typing self: subclass)
@@ -366,23 +423,32 @@ class Traversable(Protocol):
out_list: IterableList = IterableList(self.traverse(*args, **kwargs))
return out_list
- @ abstractmethod
+ @abstractmethod
def traverse(self, *args: Any, **kwargs: Any) -> Any:
""" """
- warnings.warn("traverse() method should only be called from subclasses."
- "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20"
- "Builtin sublclasses are 'Submodule', 'Tree' and 'Commit",
- DeprecationWarning,
- stacklevel=2)
+ warnings.warn(
+ "traverse() method should only be called from subclasses. "
+ "Calling from Traversable abstract class will raise NotImplementedError in 3.1.20. "
+ "Builtin subclasses are 'Submodule', 'Tree' and 'Commit'.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
return self._traverse(*args, **kwargs)
- def _traverse(self,
- predicate: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: True,
- prune: Callable[[Union['Traversable', 'Blob', TraversedTup], int], bool] = lambda i, d: False,
- depth: int = -1, branch_first: bool = True, visit_once: bool = True,
- ignore_self: int = 1, as_edge: bool = False
- ) -> Union[Iterator[Union['Traversable', 'Blob']],
- Iterator[TraversedTup]]:
+ def _traverse(
+ self,
+ predicate: Callable[
+ [Union["Traversable", "Blob", TraversedTup], int], bool
+ ] = lambda i, d: True,
+ prune: Callable[
+ [Union["Traversable", "Blob", TraversedTup], int], bool
+ ] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = True,
+ ignore_self: int = 1,
+ as_edge: bool = False,
+ ) -> Union[Iterator[Union["Traversable", "Blob"]], Iterator[TraversedTup]]:
""":return: iterator yielding of items found when traversing self
:param predicate: f(i,d) returns False if item i at depth d should not be included in the result
@@ -426,24 +492,30 @@ class Traversable(Protocol):
visited = set()
stack: Deque[TraverseNT] = deque()
- stack.append(TraverseNT(0, self, None)) # self is always depth level 0
-
- def addToStack(stack: Deque[TraverseNT],
- src_item: 'Traversable',
- branch_first: bool,
- depth: int) -> None:
+ stack.append(TraverseNT(0, self, None)) # self is always depth level 0
+
+ def addToStack(
+ stack: Deque[TraverseNT],
+ src_item: "Traversable",
+ branch_first: bool,
+ depth: int,
+ ) -> None:
lst = self._get_intermediate_items(item)
- if not lst: # empty list
+ if not lst: # empty list
return None
if branch_first:
stack.extendleft(TraverseNT(depth, i, src_item) for i in lst)
else:
- reviter = (TraverseNT(depth, lst[i], src_item) for i in range(len(lst) - 1, -1, -1))
+ reviter = (
+ TraverseNT(depth, lst[i], src_item)
+ for i in range(len(lst) - 1, -1, -1)
+ )
stack.extend(reviter)
+
# END addToStack local method
while stack:
- d, item, src = stack.pop() # depth of item, item, item_source
+ d, item, src = stack.pop() # depth of item, item, item_source
if visit_once and item in visited:
continue
@@ -451,8 +523,10 @@ class Traversable(Protocol):
if visit_once:
visited.add(item)
- rval: Union[TraversedTup, 'Traversable', 'Blob']
- if as_edge: # if as_edge return (src, item) unless rrc is None (e.g. for first item)
+ rval: Union[TraversedTup, "Traversable", "Blob"]
+ if (
+ as_edge
+ ): # if as_edge return (src, item) unless src is None (e.g. for first item)
rval = (src, item)
else:
rval = item
@@ -473,14 +547,15 @@ class Traversable(Protocol):
# END for each item on work stack
-@ runtime_checkable
+@runtime_checkable
class Serializable(Protocol):
"""Defines methods to serialize and deserialize objects from and into a data stream"""
+
__slots__ = ()
# @abstractmethod
- def _serialize(self, stream: 'BytesIO') -> 'Serializable':
+ def _serialize(self, stream: "BytesIO") -> "Serializable":
"""Serialize the data of this object into the given data stream
:note: a serialized object would ``_deserialize`` into the same object
:param stream: a file-like object
@@ -488,7 +563,7 @@ class Serializable(Protocol):
raise NotImplementedError("To be implemented in subclass")
# @abstractmethod
- def _deserialize(self, stream: 'BytesIO') -> 'Serializable':
+ def _deserialize(self, stream: "BytesIO") -> "Serializable":
"""Deserialize all information regarding this object from the stream
:param stream: a file-like object
:return: self"""
@@ -500,54 +575,76 @@ class TraversableIterableObj(IterableObj, Traversable):
TIobj_tuple = Tuple[Union[T_TIobj, None], T_TIobj]
- def list_traverse(self: T_TIobj, *args: Any, **kwargs: Any) -> IterableList[T_TIobj]:
- return super(TraversableIterableObj, self)._list_traverse(* args, **kwargs)
+ def list_traverse(
+ self: T_TIobj, *args: Any, **kwargs: Any
+ ) -> IterableList[T_TIobj]:
+ return super(TraversableIterableObj, self)._list_traverse(*args, **kwargs)
- @ overload # type: ignore
- def traverse(self: T_TIobj
- ) -> Iterator[T_TIobj]:
+ @overload # type: ignore
+ def traverse(self: T_TIobj) -> Iterator[T_TIobj]:
...
- @ overload
- def traverse(self: T_TIobj,
- predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
- prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
- depth: int, branch_first: bool, visit_once: bool,
- ignore_self: Literal[True],
- as_edge: Literal[False],
- ) -> Iterator[T_TIobj]:
+ @overload
+ def traverse(
+ self: T_TIobj,
+ predicate: Callable[
+ [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
+ ],
+ prune: Callable[
+ [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
+ ],
+ depth: int,
+ branch_first: bool,
+ visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[False],
+ ) -> Iterator[T_TIobj]:
...
- @ overload
- def traverse(self: T_TIobj,
- predicate: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
- prune: Callable[[Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool],
- depth: int, branch_first: bool, visit_once: bool,
- ignore_self: Literal[False],
- as_edge: Literal[True],
- ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
+ @overload
+ def traverse(
+ self: T_TIobj,
+ predicate: Callable[
+ [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
+ ],
+ prune: Callable[
+ [Union[T_TIobj, Tuple[Union[T_TIobj, None], T_TIobj]], int], bool
+ ],
+ depth: int,
+ branch_first: bool,
+ visit_once: bool,
+ ignore_self: Literal[False],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[Union[T_TIobj, None], T_TIobj]]:
...
- @ overload
- def traverse(self: T_TIobj,
- predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
- prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
- depth: int, branch_first: bool, visit_once: bool,
- ignore_self: Literal[True],
- as_edge: Literal[True],
- ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
+ @overload
+ def traverse(
+ self: T_TIobj,
+ predicate: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool],
+ depth: int,
+ branch_first: bool,
+ visit_once: bool,
+ ignore_self: Literal[True],
+ as_edge: Literal[True],
+ ) -> Iterator[Tuple[T_TIobj, T_TIobj]]:
...
- def traverse(self: T_TIobj,
- predicate: Callable[[Union[T_TIobj, TIobj_tuple], int],
- bool] = lambda i, d: True,
- prune: Callable[[Union[T_TIobj, TIobj_tuple], int],
- bool] = lambda i, d: False,
- depth: int = -1, branch_first: bool = True, visit_once: bool = True,
- ignore_self: int = 1, as_edge: bool = False
- ) -> Union[Iterator[T_TIobj],
- Iterator[Tuple[T_TIobj, T_TIobj]],
- Iterator[TIobj_tuple]]:
+ def traverse(
+ self: T_TIobj,
+ predicate: Callable[
+ [Union[T_TIobj, TIobj_tuple], int], bool
+ ] = lambda i, d: True,
+ prune: Callable[[Union[T_TIobj, TIobj_tuple], int], bool] = lambda i, d: False,
+ depth: int = -1,
+ branch_first: bool = True,
+ visit_once: bool = True,
+ ignore_self: int = 1,
+ as_edge: bool = False,
+ ) -> Union[
+ Iterator[T_TIobj], Iterator[Tuple[T_TIobj, T_TIobj]], Iterator[TIobj_tuple]
+ ]:
"""For documentation, see util.Traversable._traverse()"""
"""
@@ -566,8 +663,9 @@ class TraversableIterableObj(IterableObj, Traversable):
assert is_commit_traversed(ret_tup), f"{[type(x) for x in list(ret_tup[0])]}"
return ret_tup[0]
"""
- return cast(Union[Iterator[T_TIobj],
- Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
- super(TraversableIterableObj, self)._traverse(
- predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore
- ))
+ return cast(
+ Union[Iterator[T_TIobj], Iterator[Tuple[Union[None, T_TIobj], T_TIobj]]],
+ super(TraversableIterableObj, self)._traverse(
+ predicate, prune, depth, branch_first, visit_once, ignore_self, as_edge # type: ignore
+ ),
+ )