diff options
Diffstat (limited to 'logilab/common/shellutils.py')
-rw-r--r-- | logilab/common/shellutils.py | 119 |
1 files changed, 75 insertions, 44 deletions
diff --git a/logilab/common/shellutils.py b/logilab/common/shellutils.py index 2764723..557e45d 100644 --- a/logilab/common/shellutils.py +++ b/logilab/common/shellutils.py @@ -46,7 +46,6 @@ from logilab.common.deprecation import deprecated class tempdir(object): - def __enter__(self): self.path = tempfile.mkdtemp() return self.path @@ -82,7 +81,8 @@ def chown(path, login=None, group=None): try: uid = int(login) except ValueError: - import pwd # Platforms: Unix + import pwd # Platforms: Unix + uid = pwd.getpwnam(login).pw_uid if group is None: gid = -1 @@ -91,9 +91,11 @@ def chown(path, login=None, group=None): gid = int(group) except ValueError: import grp + gid = grp.getgrnam(group).gr_gid os.chown(path, uid, gid) + def mv(source, destination, _action=shutil.move): """A shell-like mv, supporting wildcards. """ @@ -106,14 +108,14 @@ def mv(source, destination, _action=shutil.move): try: source = sources[0] except IndexError: - raise OSError('No file matching %s' % source) + raise OSError("No file matching %s" % source) if isdir(destination) and exists(destination): destination = join(destination, basename(source)) try: _action(source, destination) except OSError as ex: - raise OSError('Unable to move %r to %r (%s)' % ( - source, destination, ex)) + raise OSError("Unable to move %r to %r (%s)" % (source, destination, ex)) + def rm(*files): """A shell-like rm, supporting wildcards. @@ -127,12 +129,19 @@ def rm(*files): else: os.remove(filename) + def cp(source, destination): """A shell-like cp, supporting wildcards. """ mv(source, destination, _action=shutil.copy) -def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = False, blacklist: Tuple[str, ...] = STD_BLACKLIST) -> List[str]: + +def find( + directory: str, + exts: Union[Tuple[str, ...], str], + exclude: bool = False, + blacklist: Tuple[str, ...] = STD_BLACKLIST, +) -> List[str]: """Recursively find files ending with the given extensions from the directory. :type directory: str @@ -160,17 +169,21 @@ def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = Fals if isinstance(exts, str): exts = (exts,) if exclude: + def match(filename: str, exts: Tuple[str, ...]) -> bool: for ext in exts: if filename.endswith(ext): return False return True + else: + def match(filename: str, exts: Tuple[str, ...]) -> bool: for ext in exts: if filename.endswith(ext): return True return False + files = [] for dirpath, dirnames, filenames in os.walk(directory): _handle_blacklist(blacklist, dirnames, filenames) @@ -182,7 +195,11 @@ def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = Fals return files -def globfind(directory: str, pattern: str, blacklist: Tuple[str, str, str, str, str, str, str, str] = STD_BLACKLIST) -> Iterator[str]: +def globfind( + directory: str, + pattern: str, + blacklist: Tuple[str, str, str, str, str, str, str, str] = STD_BLACKLIST, +) -> Iterator[str]: """Recursively finds files matching glob `pattern` under `directory`. This is an alternative to `logilab.common.shellutils.find`. @@ -209,21 +226,23 @@ def globfind(directory: str, pattern: str, blacklist: Tuple[str, str, str, str, for fname in fnmatch.filter(filenames, pattern): yield join(curdir, fname) + def unzip(archive, destdir): import zipfile + if not exists(destdir): os.mkdir(destdir) zfobj = zipfile.ZipFile(archive) for name in zfobj.namelist(): - if name.endswith('/'): + if name.endswith("/"): os.mkdir(join(destdir, name)) else: - outfile = open(join(destdir, name), 'wb') + outfile = open(join(destdir, name), "wb") outfile.write(zfobj.read(name)) outfile.close() -@deprecated('Use subprocess.Popen instead') +@deprecated("Use subprocess.Popen instead") class Execute: """This is a deadlock safe version of popen2 (no stdin), that returns an object with errorlevel, out and err. @@ -238,11 +257,13 @@ class Execute: class ProgressBar(object): """A simple text progression bar.""" - def __init__(self, nbops: int, size: int = 20, stream: StringIO = sys.stdout, title: str = '') -> None: + def __init__( + self, nbops: int, size: int = 20, stream: StringIO = sys.stdout, title: str = "" + ) -> None: if title: - self._fstr = '\r%s [%%-%ss]' % (title, int(size)) + self._fstr = "\r%s [%%-%ss]" % (title, int(size)) else: - self._fstr = '\r[%%-%ss]' % int(size) + self._fstr = "\r[%%-%ss]" % int(size) self._stream = stream self._total = nbops self._size = size @@ -280,42 +301,45 @@ class ProgressBar(object): else: self._current += offset - progress = int((float(self._current)/float(self._total))*self._size) + progress = int((float(self._current) / float(self._total)) * self._size) if progress > self._progress: self._progress = progress self.refresh() def refresh(self) -> None: """Refresh the progression bar display.""" - self._stream.write(self._fstr % ('=' * min(self._progress, self._size)) ) + self._stream.write(self._fstr % ("=" * min(self._progress, self._size))) if self._last_text_write_size or self._current_text: - template = ' %%-%is' % (self._last_text_write_size) + template = " %%-%is" % (self._last_text_write_size) text = self._current_text if text is None: - text = '' + text = "" self._stream.write(template % text) self._last_text_write_size = len(text.rstrip()) self._stream.flush() def finish(self): - self._stream.write('\n') + self._stream.write("\n") self._stream.flush() class DummyProgressBar(object): - __slots__ = ('text',) + __slots__ = ("text",) def refresh(self): pass + def update(self): pass + def finish(self): pass _MARKER = object() -class progress(object): + +class progress(object): def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True): self.nbops = nbops self.size = size @@ -326,26 +350,30 @@ class progress(object): def __enter__(self): if self.enabled: kwargs = {} - for attr in ('nbops', 'size', 'stream', 'title'): + for attr in ("nbops", "size", "stream", "title"): value = getattr(self, attr) if value is not _MARKER: kwargs[attr] = value self.pb = ProgressBar(**kwargs) else: - self.pb = DummyProgressBar() + self.pb = DummyProgressBar() return self.pb def __exit__(self, exc_type, exc_val, exc_tb): self.pb.finish() -class RawInput(object): - def __init__(self, input_function: Optional[Callable] = None, printer: Optional[Callable] = None, **kwargs: Any) -> None: - if 'input' in kwargs: - input_function = kwargs.pop('input') +class RawInput(object): + def __init__( + self, + input_function: Optional[Callable] = None, + printer: Optional[Callable] = None, + **kwargs: Any, + ) -> None: + if "input" in kwargs: + input_function = kwargs.pop("input") warnings.warn( - "'input' argument is deprecated," - "use 'input_function' instead", + "'input' argument is deprecated," "use 'input_function' instead", DeprecationWarning, ) self._input = input_function or input @@ -360,35 +388,36 @@ class RawInput(object): else: label = option[0].lower() if len(option) > 1: - label += '(%s)' % option[1:].lower() + label += "(%s)" % option[1:].lower() choices.append((option, label)) - prompt = "%s [%s]: " % (question, - '/'.join([opt[1] for opt in choices])) + prompt = "%s [%s]: " % (question, "/".join([opt[1] for opt in choices])) tries = 3 while tries > 0: answer = self._input(prompt).strip().lower() if not answer: return default - possible = [option for option, label in choices - if option.lower().startswith(answer)] + possible = [option for option, label in choices if option.lower().startswith(answer)] if len(possible) == 1: return possible[0] elif len(possible) == 0: - msg = '%s is not an option.' % answer + msg = "%s is not an option." % answer else: - msg = ('%s is an ambiguous answer, do you mean %s ?' % ( - answer, ' or '.join(possible))) + msg = "%s is an ambiguous answer, do you mean %s ?" % ( + answer, + " or ".join(possible), + ) if self._print: self._print(msg) else: print(msg) tries -= 1 - raise Exception('unable to get a sensible answer') + raise Exception("unable to get a sensible answer") def confirm(self, question: str, default_is_yes: bool = True) -> bool: - default = default_is_yes and 'y' or 'n' - answer = self.ask(question, ('y', 'n'), default) - return answer == 'y' + default = default_is_yes and "y" or "n" + answer = self.ask(question, ("y", "n"), default) + return answer == "y" + ASK = RawInput() @@ -398,15 +427,17 @@ def getlogin(): (man 3 getlogin) Another solution would be to use $LOGNAME, $USER or $USERNAME """ - if sys.platform != 'win32': - import pwd # Platforms: Unix + if sys.platform != "win32": + import pwd # Platforms: Unix + return pwd.getpwuid(os.getuid())[0] else: - return os.environ['USERNAME'] + return os.environ["USERNAME"] + def generate_password(length=8, vocab=string.ascii_letters + string.digits): """dumb password generation function""" - pwd = '' + pwd = "" for i in range(length): pwd += random.choice(vocab) return pwd |