diff options
Diffstat (limited to 'git/util.py')
-rw-r--r-- | git/util.py | 87 |
1 files changed, 22 insertions, 65 deletions
diff --git a/git/util.py b/git/util.py index edc8750d..11139156 100644 --- a/git/util.py +++ b/git/util.py @@ -70,9 +70,7 @@ from .types import ( Has_id_attribute, ) -T_IterableObj = TypeVar( - "T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True -) +T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) # So IterableList[Head] is subtype of IterableList[IterableObj] # --------------------------------------------------------------------- @@ -125,9 +123,7 @@ log = logging.getLogger(__name__) #: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, #: till then, we wish to hide them. HIDE_WINDOWS_KNOWN_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_KNOWN_ERRORS", True) -HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get( - "HIDE_WINDOWS_FREEZE_ERRORS", True -) +HIDE_WINDOWS_FREEZE_ERRORS = is_win and os.environ.get("HIDE_WINDOWS_FREEZE_ERRORS", True) # { Utility Methods @@ -143,9 +139,7 @@ def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: if self.repo.bare: - raise InvalidGitRepositoryError( - "Method '%s' cannot operate on bare repositories" % func.__name__ - ) + raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) # END bare method return func(self, *args, **kwargs) @@ -180,9 +174,7 @@ def rmtree(path: PathLike) -> None: if HIDE_WINDOWS_KNOWN_ERRORS: from unittest import SkipTest - raise SkipTest( - "FIXME: fails with: PermissionError\n {}".format(ex) - ) from ex + raise SkipTest("FIXME: fails with: PermissionError\n {}".format(ex)) from ex raise return shutil.rmtree(path, False, onerror) @@ -196,9 +188,7 @@ def rmfile(path: PathLike) -> None: os.remove(path) -def stream_copy( - source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024 -) -> int: +def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: """Copy all data from the source stream into the destination stream in chunks of size chunk_size @@ -278,11 +268,7 @@ def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: def _get_exe_extensions() -> Sequence[str]: PATHEXT = os.environ.get("PATHEXT", None) return ( - tuple(p.upper() for p in PATHEXT.split(os.pathsep)) - if PATHEXT - else (".BAT", "COM", ".EXE") - if is_win - else ("") + tuple(p.upper() for p in PATHEXT.split(os.pathsep)) if PATHEXT else (".BAT", "COM", ".EXE") if is_win else ("") ) @@ -294,11 +280,7 @@ def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: return ( osp.isfile(fpath) and os.access(fpath, os.X_OK) - and ( - os.name != "nt" - or not winprog_exts - or any(fpath.upper().endswith(ext) for ext in winprog_exts) - ) + and (os.name != "nt" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)) ) progs = [] @@ -338,10 +320,7 @@ _cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths ( re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), - ( - lambda server, share, rest_path: "//%s/%s/%s" - % (server, share, rest_path.replace("\\", "/")) - ), + (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), False, ), (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), @@ -416,9 +395,7 @@ def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: # Just a name given, not a real path. uname_cmd = osp.join(git_dir, "uname") - process = subprocess.Popen( - [uname_cmd], stdout=subprocess.PIPE, universal_newlines=True - ) + process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) uname_out, _ = process.communicate() # retcode = process.poll() is_cygwin = "CYGWIN" in uname_out @@ -434,9 +411,7 @@ def get_user_id() -> str: return "%s@%s" % (getpass.getuser(), platform.node()) -def finalize_process( - proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any -) -> None: +def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: """Wait for the process (clone, fetch, pull or push) and handle its errors accordingly""" # TODO: No close proc-streams?? proc.wait(**kwargs) @@ -453,9 +428,7 @@ def expand_path(p: PathLike, expand_vars: bool = ...) -> str: ... -def expand_path( - p: Union[None, PathLike], expand_vars: bool = True -) -> Optional[PathLike]: +def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: if isinstance(p, pathlib.Path): return p.resolve() try: @@ -808,9 +781,7 @@ class Actor(object): return actor @classmethod - def committer( - cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None - ) -> "Actor": + def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": """ :return: Actor instance corresponding to the configured committer. It behaves similar to the git implementation, such that the environment will override @@ -818,14 +789,10 @@ class Actor(object): generated :param config_reader: ConfigReader to use to retrieve the values from in case they are not set in the environment""" - return cls._main_actor( - cls.env_committer_name, cls.env_committer_email, config_reader - ) + return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) @classmethod - def author( - cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None - ) -> "Actor": + def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": """Same as committer(), but defines the main author. It may be specified in the environment, but defaults to the committer""" return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) @@ -1038,9 +1005,9 @@ class BlockingLockFile(LockFile): # readable anymore, raise an exception curtime = time.time() if not osp.isdir(osp.dirname(self._lock_file_path())): - msg = ( - "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" - % (self._lock_file_path(), curtime - starttime) + msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( + self._lock_file_path(), + curtime - starttime, ) raise IOError(msg) from e # END handle missing directory @@ -1115,9 +1082,7 @@ class IterableList(List[T_IterableObj]): def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore - assert isinstance( - index, (int, str, slice) - ), "Index of IterableList should be an int or str" + assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" if isinstance(index, int): return list.__getitem__(self, index) @@ -1127,16 +1092,12 @@ class IterableList(List[T_IterableObj]): try: return getattr(self, index) except AttributeError as e: - raise IndexError( - "No item found with id %r" % (self._prefix + index) - ) from e + raise IndexError("No item found with id %r" % (self._prefix + index)) from e # END handle getattr def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: - assert isinstance( - index, (int, str) - ), "Index of IterableList should be an int or str" + assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" delindex = cast(int, index) if not isinstance(index, int): @@ -1213,9 +1174,7 @@ class IterableObj(Protocol): _id_attribute_: str @classmethod - def list_items( - cls, repo: "Repo", *args: Any, **kwargs: Any - ) -> IterableList[T_IterableObj]: + def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: """ Find all items of this type - subclasses can specify args and kwargs differently. If no args are given, subclasses are obliged to return all items if no additional @@ -1230,9 +1189,7 @@ class IterableObj(Protocol): @classmethod @abstractmethod - def iter_items( - cls, repo: "Repo", *args: Any, **kwargs: Any - ) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: + def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: # Iterator[T_IterableObj]: # return typed to be compatible with subtypes e.g. Remote """For more information about the arguments, see list_items :return: iterator yielding Items""" |