summaryrefslogtreecommitdiff
path: root/src/click/_compat.py
diff options
context:
space:
mode:
authorDavid Lord <davidism@gmail.com>2020-03-06 13:50:04 -0800
committerDavid Lord <davidism@gmail.com>2020-03-06 13:50:04 -0800
commit93ba3ba112d2f8ba7bdd8b231e510f74dd0b037e (patch)
tree409b93ee7ec2209b9e52256ed77b0292f4e49720 /src/click/_compat.py
parent488739dfe0d51f415c7b20466648cc519962ecbb (diff)
downloadclick-93ba3ba112d2f8ba7bdd8b231e510f74dd0b037e.tar.gz
apply black
Diffstat (limited to 'src/click/_compat.py')
-rw-r--r--src/click/_compat.py224
1 files changed, 121 insertions, 103 deletions
diff --git a/src/click/_compat.py b/src/click/_compat.py
index 7b7a55f..6431fc2 100644
--- a/src/click/_compat.py
+++ b/src/click/_compat.py
@@ -6,12 +6,13 @@ import sys
from weakref import WeakKeyDictionary
PY2 = sys.version_info[0] == 2
-CYGWIN = sys.platform.startswith('cygwin')
-MSYS2 = sys.platform.startswith('win') and ('GCC' in sys.version)
+CYGWIN = sys.platform.startswith("cygwin")
+MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
# Determine local App Engine environment, per Google's own suggestion
-APP_ENGINE = ('APPENGINE_RUNTIME' in os.environ and
- 'Development/' in os.environ.get('SERVER_SOFTWARE', ''))
-WIN = sys.platform.startswith('win') and not APP_ENGINE and not MSYS2
+APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
+ "SERVER_SOFTWARE", ""
+)
+WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
DEFAULT_COLUMNS = 80
@@ -22,46 +23,57 @@ def get_filesystem_encoding():
return sys.getfilesystemencoding() or sys.getdefaultencoding()
-def _make_text_stream(stream, encoding, errors,
- force_readable=False, force_writable=False):
+def _make_text_stream(
+ stream, encoding, errors, force_readable=False, force_writable=False
+):
if encoding is None:
encoding = get_best_encoding(stream)
if errors is None:
- errors = 'replace'
- return _NonClosingTextIOWrapper(stream, encoding, errors,
- line_buffering=True,
- force_readable=force_readable,
- force_writable=force_writable)
+ errors = "replace"
+ return _NonClosingTextIOWrapper(
+ stream,
+ encoding,
+ errors,
+ line_buffering=True,
+ force_readable=force_readable,
+ force_writable=force_writable,
+ )
def is_ascii_encoding(encoding):
"""Checks if a given encoding is ascii."""
try:
- return codecs.lookup(encoding).name == 'ascii'
+ return codecs.lookup(encoding).name == "ascii"
except LookupError:
return False
def get_best_encoding(stream):
"""Returns the default stream encoding if not found."""
- rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding()
+ rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
if is_ascii_encoding(rv):
- return 'utf-8'
+ return "utf-8"
return rv
class _NonClosingTextIOWrapper(io.TextIOWrapper):
-
- def __init__(self, stream, encoding, errors,
- force_readable=False, force_writable=False, **extra):
- self._stream = stream = _FixupStream(stream, force_readable,
- force_writable)
+ def __init__(
+ self,
+ stream,
+ encoding,
+ errors,
+ force_readable=False,
+ force_writable=False,
+ **extra
+ ):
+ self._stream = stream = _FixupStream(stream, force_readable, force_writable)
io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)
# The io module is a place where the Python 3 text behavior
# was forced upon Python 2, so we need to unbreak
# it to look like Python 2.
if PY2:
+
def write(self, x):
if isinstance(x, str) or is_bytes(x):
try:
@@ -105,7 +117,7 @@ class _FixupStream(object):
return getattr(self._stream, name)
def read1(self, size):
- f = getattr(self._stream, 'read1', None)
+ f = getattr(self._stream, "read1", None)
if f is not None:
return f(size)
# We only dispatch to readline instead of read in Python 2 as we
@@ -118,7 +130,7 @@ class _FixupStream(object):
def readable(self):
if self._force_readable:
return True
- x = getattr(self._stream, 'readable', None)
+ x = getattr(self._stream, "readable", None)
if x is not None:
return x()
try:
@@ -130,20 +142,20 @@ class _FixupStream(object):
def writable(self):
if self._force_writable:
return True
- x = getattr(self._stream, 'writable', None)
+ x = getattr(self._stream, "writable", None)
if x is not None:
return x()
try:
- self._stream.write('')
+ self._stream.write("")
except Exception:
try:
- self._stream.write(b'')
+ self._stream.write(b"")
except Exception:
return False
return True
def seekable(self):
- x = getattr(self._stream, 'seekable', None)
+ x = getattr(self._stream, "seekable", None)
if x is not None:
return x()
try:
@@ -166,7 +178,7 @@ if PY2:
def is_bytes(x):
return isinstance(x, (buffer, bytearray))
- _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
+ _identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
# For Windows, we need to force stdout/stdin/stderr to binary if it's
# fetched for that. This obviously is not the most correct way to do
@@ -194,6 +206,7 @@ if PY2:
except ImportError:
pass
else:
+
def set_binary_mode(f):
try:
fileno = f.fileno()
@@ -208,6 +221,7 @@ if PY2:
except ImportError:
pass
else:
+
def set_binary_mode(f):
try:
fileno = f.fileno()
@@ -225,42 +239,42 @@ if PY2:
return set_binary_mode(sys.stdin)
def get_binary_stdout():
- _wrap_std_stream('stdout')
+ _wrap_std_stream("stdout")
return set_binary_mode(sys.stdout)
def get_binary_stderr():
- _wrap_std_stream('stderr')
+ _wrap_std_stream("stderr")
return set_binary_mode(sys.stderr)
def get_text_stdin(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdin, encoding, errors)
if rv is not None:
return rv
- return _make_text_stream(sys.stdin, encoding, errors,
- force_readable=True)
+ return _make_text_stream(sys.stdin, encoding, errors, force_readable=True)
def get_text_stdout(encoding=None, errors=None):
- _wrap_std_stream('stdout')
+ _wrap_std_stream("stdout")
rv = _get_windows_console_stream(sys.stdout, encoding, errors)
if rv is not None:
return rv
- return _make_text_stream(sys.stdout, encoding, errors,
- force_writable=True)
+ return _make_text_stream(sys.stdout, encoding, errors, force_writable=True)
def get_text_stderr(encoding=None, errors=None):
- _wrap_std_stream('stderr')
+ _wrap_std_stream("stderr")
rv = _get_windows_console_stream(sys.stderr, encoding, errors)
if rv is not None:
return rv
- return _make_text_stream(sys.stderr, encoding, errors,
- force_writable=True)
+ return _make_text_stream(sys.stderr, encoding, errors, force_writable=True)
def filename_to_ui(value):
if isinstance(value, bytes):
- value = value.decode(get_filesystem_encoding(), 'replace')
+ value = value.decode(get_filesystem_encoding(), "replace")
return value
+
+
else:
import io
+
text_type = str
raw_input = input
string_types = (str,)
@@ -284,10 +298,10 @@ else:
def _is_binary_writer(stream, default=False):
try:
- stream.write(b'')
+ stream.write(b"")
except Exception:
try:
- stream.write('')
+ stream.write("")
return False
except Exception:
pass
@@ -302,7 +316,7 @@ else:
if _is_binary_reader(stream, False):
return stream
- buf = getattr(stream, 'buffer', None)
+ buf = getattr(stream, "buffer", None)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
@@ -317,7 +331,7 @@ else:
if _is_binary_writer(stream, False):
return stream
- buf = getattr(stream, 'buffer', None)
+ buf = getattr(stream, "buffer", None)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
@@ -330,7 +344,7 @@ else:
# to ASCII. This appears to happen in certain unittest
# environments. It's not quite clear what the correct behavior is
# but this at least will force Click to recover somehow.
- return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii')
+ return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
def _is_compat_stream_attr(stream, attr, value):
"""A stream attribute is compatible if it is equal to the
@@ -344,10 +358,9 @@ else:
"""Check if a stream's encoding and errors attributes are
compatible with the desired values.
"""
- return (
- _is_compat_stream_attr(stream, "encoding", encoding)
- and _is_compat_stream_attr(stream, "errors", errors)
- )
+ return _is_compat_stream_attr(
+ stream, "encoding", encoding
+ ) and _is_compat_stream_attr(stream, "errors", errors)
def _force_correct_text_stream(
text_stream,
@@ -363,12 +376,8 @@ else:
else:
# If the stream looks compatible, and won't default to a
# misconfigured ascii encoding, return it as-is.
- if (
- _is_compatible_text_stream(text_stream, encoding, errors)
- and not (
- encoding is None
- and _stream_is_misconfigured(text_stream)
- )
+ if _is_compatible_text_stream(text_stream, encoding, errors) and not (
+ encoding is None and _stream_is_misconfigured(text_stream)
):
return text_stream
@@ -418,56 +427,61 @@ else:
def get_binary_stdin():
reader = _find_binary_reader(sys.stdin)
if reader is None:
- raise RuntimeError('Was not able to determine binary '
- 'stream for sys.stdin.')
+ raise RuntimeError(
+ "Was not able to determine binary stream for sys.stdin."
+ )
return reader
def get_binary_stdout():
writer = _find_binary_writer(sys.stdout)
if writer is None:
- raise RuntimeError('Was not able to determine binary '
- 'stream for sys.stdout.')
+ raise RuntimeError(
+ "Was not able to determine binary stream for sys.stdout."
+ )
return writer
def get_binary_stderr():
writer = _find_binary_writer(sys.stderr)
if writer is None:
- raise RuntimeError('Was not able to determine binary '
- 'stream for sys.stderr.')
+ raise RuntimeError(
+ "Was not able to determine binary stream for sys.stderr."
+ )
return writer
def get_text_stdin(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdin, encoding, errors)
if rv is not None:
return rv
- return _force_correct_text_reader(sys.stdin, encoding, errors,
- force_readable=True)
+ return _force_correct_text_reader(
+ sys.stdin, encoding, errors, force_readable=True
+ )
def get_text_stdout(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdout, encoding, errors)
if rv is not None:
return rv
- return _force_correct_text_writer(sys.stdout, encoding, errors,
- force_writable=True)
+ return _force_correct_text_writer(
+ sys.stdout, encoding, errors, force_writable=True
+ )
def get_text_stderr(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stderr, encoding, errors)
if rv is not None:
return rv
- return _force_correct_text_writer(sys.stderr, encoding, errors,
- force_writable=True)
+ return _force_correct_text_writer(
+ sys.stderr, encoding, errors, force_writable=True
+ )
def filename_to_ui(value):
if isinstance(value, bytes):
- value = value.decode(get_filesystem_encoding(), 'replace')
+ value = value.decode(get_filesystem_encoding(), "replace")
else:
- value = value.encode('utf-8', 'surrogateescape') \
- .decode('utf-8', 'replace')
+ value = value.encode("utf-8", "surrogateescape").decode("utf-8", "replace")
return value
def get_streerror(e, default=None):
- if hasattr(e, 'strerror'):
+ if hasattr(e, "strerror"):
msg = e.strerror
else:
if default is not None:
@@ -475,7 +489,7 @@ def get_streerror(e, default=None):
else:
msg = str(e)
if isinstance(msg, bytes):
- msg = msg.decode('utf-8', 'replace')
+ msg = msg.decode("utf-8", "replace")
return msg
@@ -501,13 +515,13 @@ def _wrap_io_open(file, mode, encoding, errors):
return _make_text_stream(f, **kwargs)
-def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False):
+def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False):
binary = "b" in mode
# Standard streams first. These are simple because they don't need
# special handling for the atomic flag. It's entirely ignored.
- if filename == '-':
- if any(m in mode for m in ['w', 'a', 'x']):
+ if filename == "-":
+ if any(m in mode for m in ["w", "a", "x"]):
if binary:
return get_binary_stdout(), False
return get_text_stdout(encoding=encoding, errors=errors), False
@@ -520,17 +534,17 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False
return _wrap_io_open(filename, mode, encoding, errors), True
# Some usability stuff for atomic writes
- if 'a' in mode:
+ if "a" in mode:
raise ValueError(
- 'Appending to an existing file is not supported, because that '
- 'would involve an expensive `copy`-operation to a temporary '
- 'file. Open the file in normal `w`-mode and copy explicitly '
- 'if that\'s what you\'re after.'
+ "Appending to an existing file is not supported, because that "
+ "would involve an expensive `copy`-operation to a temporary "
+ "file. Open the file in normal `w`-mode and copy explicitly "
+ "if that's what you're after."
)
- if 'x' in mode:
- raise ValueError('Use the `overwrite`-parameter instead.')
- if 'w' not in mode:
- raise ValueError('Atomic writes only make sense with `w`-mode.')
+ if "x" in mode:
+ raise ValueError("Use the `overwrite`-parameter instead.")
+ if "w" not in mode:
+ raise ValueError("Atomic writes only make sense with `w`-mode.")
# Atomic writes are more complicated. They work by opening a file
# as a proxy in the same folder and then using the fdopen
@@ -547,12 +561,12 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False
flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
if binary:
- flags |= getattr(os, 'O_BINARY', 0)
+ flags |= getattr(os, "O_BINARY", 0)
while True:
tmp_filename = os.path.join(
os.path.dirname(filename),
- '.__atomic-write%08x' % (random.randrange(1 << 32),),
+ ".__atomic-write%08x" % (random.randrange(1 << 32),),
)
try:
fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
@@ -575,7 +589,7 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False
# Used in a destructor call, needs extra protection from interpreter cleanup.
-if hasattr(os, 'replace'):
+if hasattr(os, "replace"):
_replace = os.replace
_can_replace = True
else:
@@ -584,7 +598,6 @@ else:
class _AtomicFile(object):
-
def __init__(self, f, tmp_filename, real_filename):
self._f = f
self._tmp_filename = tmp_filename
@@ -626,7 +639,7 @@ get_winterm_size = None
def strip_ansi(value):
- return _ansi_re.sub('', value)
+ return _ansi_re.sub("", value)
def _is_jupyter_kernel_output(stream):
@@ -660,16 +673,18 @@ if WIN:
def _get_argv_encoding():
import locale
+
return locale.getpreferredencoding()
if PY2:
- def raw_input(prompt=''):
+
+ def raw_input(prompt=""):
sys.stderr.flush()
if prompt:
stdout = _default_text_stdout()
stdout.write(prompt)
stdin = _default_text_stdin()
- return stdin.readline().rstrip('\r\n')
+ return stdin.readline().rstrip("\r\n")
try:
import colorama
@@ -711,11 +726,15 @@ if WIN:
def get_winterm_size():
win = colorama.win32.GetConsoleScreenBufferInfo(
- colorama.win32.STDOUT).srWindow
+ colorama.win32.STDOUT
+ ).srWindow
return win.Right - win.Left, win.Bottom - win.Top
+
+
else:
+
def _get_argv_encoding():
- return getattr(sys.stdin, 'encoding', None) or get_filesystem_encoding()
+ return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
_get_windows_console_stream = lambda *x: None
_wrap_std_stream = lambda *x: None
@@ -734,6 +753,7 @@ def isatty(stream):
def _make_cached_stream_func(src_func, wrapper_func):
cache = WeakKeyDictionary()
+
def func():
stream = src_func()
try:
@@ -749,25 +769,23 @@ def _make_cached_stream_func(src_func, wrapper_func):
except Exception:
pass
return rv
+
return func
-_default_text_stdin = _make_cached_stream_func(
- lambda: sys.stdin, get_text_stdin)
-_default_text_stdout = _make_cached_stream_func(
- lambda: sys.stdout, get_text_stdout)
-_default_text_stderr = _make_cached_stream_func(
- lambda: sys.stderr, get_text_stderr)
+_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
+_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
+_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
binary_streams = {
- 'stdin': get_binary_stdin,
- 'stdout': get_binary_stdout,
- 'stderr': get_binary_stderr,
+ "stdin": get_binary_stdin,
+ "stdout": get_binary_stdout,
+ "stderr": get_binary_stderr,
}
text_streams = {
- 'stdin': get_text_stdin,
- 'stdout': get_text_stdout,
- 'stderr': get_text_stderr,
+ "stdin": get_text_stdin,
+ "stdout": get_text_stdout,
+ "stderr": get_text_stderr,
}