summaryrefslogtreecommitdiff
path: root/logilab/common/shellutils.py
diff options
context:
space:
mode:
authorLaurent Peuch <cortex@worlddomination.be>2020-04-01 00:11:10 +0200
committerLaurent Peuch <cortex@worlddomination.be>2020-04-01 00:11:10 +0200
commitb8899451fa861b04568e2a0bb4e3fe4acc0daee3 (patch)
tree1446c809e19b5571b31b1999246aa0e50b19f5c8 /logilab/common/shellutils.py
parent32cd73810056594f55eff0ffafebbdeb50c7a860 (diff)
downloadlogilab-common-b8899451fa861b04568e2a0bb4e3fe4acc0daee3.tar.gz
Black the whole code base
Diffstat (limited to 'logilab/common/shellutils.py')
-rw-r--r--logilab/common/shellutils.py119
1 files changed, 75 insertions, 44 deletions
diff --git a/logilab/common/shellutils.py b/logilab/common/shellutils.py
index 2764723..557e45d 100644
--- a/logilab/common/shellutils.py
+++ b/logilab/common/shellutils.py
@@ -46,7 +46,6 @@ from logilab.common.deprecation import deprecated
class tempdir(object):
-
def __enter__(self):
self.path = tempfile.mkdtemp()
return self.path
@@ -82,7 +81,8 @@ def chown(path, login=None, group=None):
try:
uid = int(login)
except ValueError:
- import pwd # Platforms: Unix
+ import pwd # Platforms: Unix
+
uid = pwd.getpwnam(login).pw_uid
if group is None:
gid = -1
@@ -91,9 +91,11 @@ def chown(path, login=None, group=None):
gid = int(group)
except ValueError:
import grp
+
gid = grp.getgrnam(group).gr_gid
os.chown(path, uid, gid)
+
def mv(source, destination, _action=shutil.move):
"""A shell-like mv, supporting wildcards.
"""
@@ -106,14 +108,14 @@ def mv(source, destination, _action=shutil.move):
try:
source = sources[0]
except IndexError:
- raise OSError('No file matching %s' % source)
+ raise OSError("No file matching %s" % source)
if isdir(destination) and exists(destination):
destination = join(destination, basename(source))
try:
_action(source, destination)
except OSError as ex:
- raise OSError('Unable to move %r to %r (%s)' % (
- source, destination, ex))
+ raise OSError("Unable to move %r to %r (%s)" % (source, destination, ex))
+
def rm(*files):
"""A shell-like rm, supporting wildcards.
@@ -127,12 +129,19 @@ def rm(*files):
else:
os.remove(filename)
+
def cp(source, destination):
"""A shell-like cp, supporting wildcards.
"""
mv(source, destination, _action=shutil.copy)
-def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = False, blacklist: Tuple[str, ...] = STD_BLACKLIST) -> List[str]:
+
+def find(
+ directory: str,
+ exts: Union[Tuple[str, ...], str],
+ exclude: bool = False,
+ blacklist: Tuple[str, ...] = STD_BLACKLIST,
+) -> List[str]:
"""Recursively find files ending with the given extensions from the directory.
:type directory: str
@@ -160,17 +169,21 @@ def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = Fals
if isinstance(exts, str):
exts = (exts,)
if exclude:
+
def match(filename: str, exts: Tuple[str, ...]) -> bool:
for ext in exts:
if filename.endswith(ext):
return False
return True
+
else:
+
def match(filename: str, exts: Tuple[str, ...]) -> bool:
for ext in exts:
if filename.endswith(ext):
return True
return False
+
files = []
for dirpath, dirnames, filenames in os.walk(directory):
_handle_blacklist(blacklist, dirnames, filenames)
@@ -182,7 +195,11 @@ def find(directory: str, exts: Union[Tuple[str, ...], str], exclude: bool = Fals
return files
-def globfind(directory: str, pattern: str, blacklist: Tuple[str, str, str, str, str, str, str, str] = STD_BLACKLIST) -> Iterator[str]:
+def globfind(
+ directory: str,
+ pattern: str,
+ blacklist: Tuple[str, str, str, str, str, str, str, str] = STD_BLACKLIST,
+) -> Iterator[str]:
"""Recursively finds files matching glob `pattern` under `directory`.
This is an alternative to `logilab.common.shellutils.find`.
@@ -209,21 +226,23 @@ def globfind(directory: str, pattern: str, blacklist: Tuple[str, str, str, str,
for fname in fnmatch.filter(filenames, pattern):
yield join(curdir, fname)
+
def unzip(archive, destdir):
import zipfile
+
if not exists(destdir):
os.mkdir(destdir)
zfobj = zipfile.ZipFile(archive)
for name in zfobj.namelist():
- if name.endswith('/'):
+ if name.endswith("/"):
os.mkdir(join(destdir, name))
else:
- outfile = open(join(destdir, name), 'wb')
+ outfile = open(join(destdir, name), "wb")
outfile.write(zfobj.read(name))
outfile.close()
-@deprecated('Use subprocess.Popen instead')
+@deprecated("Use subprocess.Popen instead")
class Execute:
"""This is a deadlock safe version of popen2 (no stdin), that returns
an object with errorlevel, out and err.
@@ -238,11 +257,13 @@ class Execute:
class ProgressBar(object):
"""A simple text progression bar."""
- def __init__(self, nbops: int, size: int = 20, stream: StringIO = sys.stdout, title: str = '') -> None:
+ def __init__(
+ self, nbops: int, size: int = 20, stream: StringIO = sys.stdout, title: str = ""
+ ) -> None:
if title:
- self._fstr = '\r%s [%%-%ss]' % (title, int(size))
+ self._fstr = "\r%s [%%-%ss]" % (title, int(size))
else:
- self._fstr = '\r[%%-%ss]' % int(size)
+ self._fstr = "\r[%%-%ss]" % int(size)
self._stream = stream
self._total = nbops
self._size = size
@@ -280,42 +301,45 @@ class ProgressBar(object):
else:
self._current += offset
- progress = int((float(self._current)/float(self._total))*self._size)
+ progress = int((float(self._current) / float(self._total)) * self._size)
if progress > self._progress:
self._progress = progress
self.refresh()
def refresh(self) -> None:
"""Refresh the progression bar display."""
- self._stream.write(self._fstr % ('=' * min(self._progress, self._size)) )
+ self._stream.write(self._fstr % ("=" * min(self._progress, self._size)))
if self._last_text_write_size or self._current_text:
- template = ' %%-%is' % (self._last_text_write_size)
+ template = " %%-%is" % (self._last_text_write_size)
text = self._current_text
if text is None:
- text = ''
+ text = ""
self._stream.write(template % text)
self._last_text_write_size = len(text.rstrip())
self._stream.flush()
def finish(self):
- self._stream.write('\n')
+ self._stream.write("\n")
self._stream.flush()
class DummyProgressBar(object):
- __slots__ = ('text',)
+ __slots__ = ("text",)
def refresh(self):
pass
+
def update(self):
pass
+
def finish(self):
pass
_MARKER = object()
-class progress(object):
+
+class progress(object):
def __init__(self, nbops=_MARKER, size=_MARKER, stream=_MARKER, title=_MARKER, enabled=True):
self.nbops = nbops
self.size = size
@@ -326,26 +350,30 @@ class progress(object):
def __enter__(self):
if self.enabled:
kwargs = {}
- for attr in ('nbops', 'size', 'stream', 'title'):
+ for attr in ("nbops", "size", "stream", "title"):
value = getattr(self, attr)
if value is not _MARKER:
kwargs[attr] = value
self.pb = ProgressBar(**kwargs)
else:
- self.pb = DummyProgressBar()
+ self.pb = DummyProgressBar()
return self.pb
def __exit__(self, exc_type, exc_val, exc_tb):
self.pb.finish()
-class RawInput(object):
- def __init__(self, input_function: Optional[Callable] = None, printer: Optional[Callable] = None, **kwargs: Any) -> None:
- if 'input' in kwargs:
- input_function = kwargs.pop('input')
+class RawInput(object):
+ def __init__(
+ self,
+ input_function: Optional[Callable] = None,
+ printer: Optional[Callable] = None,
+ **kwargs: Any,
+ ) -> None:
+ if "input" in kwargs:
+ input_function = kwargs.pop("input")
warnings.warn(
- "'input' argument is deprecated,"
- "use 'input_function' instead",
+ "'input' argument is deprecated," "use 'input_function' instead",
DeprecationWarning,
)
self._input = input_function or input
@@ -360,35 +388,36 @@ class RawInput(object):
else:
label = option[0].lower()
if len(option) > 1:
- label += '(%s)' % option[1:].lower()
+ label += "(%s)" % option[1:].lower()
choices.append((option, label))
- prompt = "%s [%s]: " % (question,
- '/'.join([opt[1] for opt in choices]))
+ prompt = "%s [%s]: " % (question, "/".join([opt[1] for opt in choices]))
tries = 3
while tries > 0:
answer = self._input(prompt).strip().lower()
if not answer:
return default
- possible = [option for option, label in choices
- if option.lower().startswith(answer)]
+ possible = [option for option, label in choices if option.lower().startswith(answer)]
if len(possible) == 1:
return possible[0]
elif len(possible) == 0:
- msg = '%s is not an option.' % answer
+ msg = "%s is not an option." % answer
else:
- msg = ('%s is an ambiguous answer, do you mean %s ?' % (
- answer, ' or '.join(possible)))
+ msg = "%s is an ambiguous answer, do you mean %s ?" % (
+ answer,
+ " or ".join(possible),
+ )
if self._print:
self._print(msg)
else:
print(msg)
tries -= 1
- raise Exception('unable to get a sensible answer')
+ raise Exception("unable to get a sensible answer")
def confirm(self, question: str, default_is_yes: bool = True) -> bool:
- default = default_is_yes and 'y' or 'n'
- answer = self.ask(question, ('y', 'n'), default)
- return answer == 'y'
+ default = default_is_yes and "y" or "n"
+ answer = self.ask(question, ("y", "n"), default)
+ return answer == "y"
+
ASK = RawInput()
@@ -398,15 +427,17 @@ def getlogin():
(man 3 getlogin)
Another solution would be to use $LOGNAME, $USER or $USERNAME
"""
- if sys.platform != 'win32':
- import pwd # Platforms: Unix
+ if sys.platform != "win32":
+ import pwd # Platforms: Unix
+
return pwd.getpwuid(os.getuid())[0]
else:
- return os.environ['USERNAME']
+ return os.environ["USERNAME"]
+
def generate_password(length=8, vocab=string.ascii_letters + string.digits):
"""dumb password generation function"""
- pwd = ''
+ pwd = ""
for i in range(length):
pwd += random.choice(vocab)
return pwd