summaryrefslogtreecommitdiff
path: root/git/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'git/util.py')
-rw-r--r--git/util.py87
1 files changed, 22 insertions, 65 deletions
diff --git a/git/util.py b/git/util.py
index edc8750d..11139156 100644
--- a/git/util.py
+++ b/git/util.py
@@ -70,9 +70,7 @@ from .types import (
Has_id_attribute,
)
-T_IterableObj = TypeVar(
- "T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True
-)
+T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True)
# So IterableList[Head] is subtype of IterableList[IterableObj]
# ---------------------------------------------------------------------
@@ -125,9 +123,7 @@ log = logging.getLogger(__name__)
#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
#: till then, we wish to hide them.
HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True)
-HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get(
- "HIDE_WINDOWS_FREEZE_ERRORS", True
-)
+HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True)
# { Utility Methods
@@ -143,9 +139,7 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
if self.repo.bare:
- raise InvalidGitRepositoryError(
- "Method '%s' cannot operate on bare repositories" % func.__name__
- )
+ raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
# END bare method
return func(self, *args, **kwargs)
@@ -180,9 +174,7 @@ def rmtree(path: PathLike) -> None:
if HIDE_WINDOWS_KNOWN_ERRORS:
from unittest import SkipTest
- raise SkipTest(
- "FIXME: fails with: PermissionError\n {}".format(ex)
- ) from ex
+ raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex
raise
return shutil.rmtree(path, False, onerror)
@@ -196,9 +188,7 @@ def rmfile(path: PathLike) -> None:
os.remove(path)
-def stream_copy(
- source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024
-) -> int:
+def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
"""Copy all data from the source stream into the destination stream in chunks
of size chunk_size
@@ -278,11 +268,7 @@ def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
def _get_exe_extensions() -> Sequence[str]:
PATHEXT = os.environ.get("PATHEXT", None)
return (
- tuple(p.upper() for p in PATHEXT.split(os.pathsep))
- if PATHEXT
- else (".BAT", "COM", ".EXE")
- if is_win
- else ("")
+ tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("")
)
@@ -294,11 +280,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
return (
osp.isfile(fpath)
and os.access(fpath, os.X_OK)
- and (
- os.name != "nt"
- or not winprog_exts
- or any(fpath.upper().endswith(ext) for ext in winprog_exts)
- )
+ and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts))
)
progs = []
@@ -338,10 +320,7 @@ _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
# and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
(
re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
- (
- lambda server, share, rest_path: "//%s/%s/%s"
- % (server, share, rest_path.replace("\\", "/"))
- ),
+ (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))),
False,
),
(re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
@@ -416,9 +395,7 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
# Just a name given, not a real path.
uname_cmd = osp.join(git_dir, "uname")
- process = subprocess.Popen(
- [uname_cmd], stdout=subprocess.PIPE, universal_newlines=True
- )
+ process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True)
uname_out, _ = process.communicate()
# retcode = process.poll()
is_cygwin = "CYGWIN" in uname_out
@@ -434,9 +411,7 @@ def get_user_id() -> str:
return "%s@%s" % (getpass.getuser(), platform.node())
-def finalize_process(
- proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any
-) -> None:
+def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None:
"""Wait for the process (clone, fetch, pull or push) and handle its errors accordingly"""
# TODO: No close proc-streams??
proc.wait(**kwargs)
@@ -453,9 +428,7 @@ def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
...
-def expand_path(
- p: Union[None, PathLike], expand_vars: bool = True
-) -> Optional[PathLike]:
+def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
if isinstance(p, pathlib.Path):
return p.resolve()
try:
@@ -808,9 +781,7 @@ class Actor(object):
return actor
@classmethod
- def committer(
- cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
- ) -> "Actor":
+ def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
"""
:return: Actor instance corresponding to the configured committer. It behaves
similar to the git implementation, such that the environment will override
@@ -818,14 +789,10 @@ class Actor(object):
generated
:param config_reader: ConfigReader to use to retrieve the values from in case
they are not set in the environment"""
- return cls._main_actor(
- cls.env_committer_name, cls.env_committer_email, config_reader
- )
+ return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
@classmethod
- def author(
- cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None
- ) -> "Actor":
+ def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
"""Same as committer(), but defines the main author. It may be specified in the environment,
but defaults to the committer"""
return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
@@ -1038,9 +1005,9 @@ class BlockingLockFile(LockFile):
# readable anymore, raise an exception
curtime = time.time()
if not osp.isdir(osp.dirname(self._lock_file_path())):
- msg = (
- "Directory containing the lockfile %r was not readable anymore after waiting %g seconds"
- % (self._lock_file_path(), curtime - starttime)
+ msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
+ self._lock_file_path(),
+ curtime - starttime,
)
raise IOError(msg) from e
# END handle missing directory
@@ -1115,9 +1082,7 @@ class IterableList(List[T_IterableObj]):
def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore
- assert isinstance(
- index, (int, str, slice)
- ), "Index of IterableList should be an int or str"
+ assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
if isinstance(index, int):
return list.__getitem__(self, index)
@@ -1127,16 +1092,12 @@ class IterableList(List[T_IterableObj]):
try:
return getattr(self, index)
except AttributeError as e:
- raise IndexError(
- "No item found with id %r" % (self._prefix + index)
- ) from e
+ raise IndexError("No item found with id %r" % (self._prefix + index)) from e
# END handle getattr
def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
- assert isinstance(
- index, (int, str)
- ), "Index of IterableList should be an int or str"
+ assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
delindex = cast(int, index)
if not isinstance(index, int):
@@ -1213,9 +1174,7 @@ class IterableObj(Protocol):
_id_attribute_: str
@classmethod
- def list_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> IterableList[T_IterableObj]:
+ def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
"""
Find all items of this type - subclasses can specify args and kwargs differently.
If no args are given, subclasses are obliged to return all items if no additional
@@ -1230,9 +1189,7 @@ class IterableObj(Protocol):
@classmethod
@abstractmethod
- def iter_items(
- cls, repo: "Repo", *args: Any, **kwargs: Any
- ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
+ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]:
# return typed to be compatible with subtypes e.g. Remote
"""For more information about the arguments, see list_items
:return: iterator yielding Items"""