diff options
author | David Lord <davidism@gmail.com> | 2020-03-06 13:50:04 -0800 |
---|---|---|
committer | David Lord <davidism@gmail.com> | 2020-03-06 13:50:04 -0800 |
commit | 93ba3ba112d2f8ba7bdd8b231e510f74dd0b037e (patch) | |
tree | 409b93ee7ec2209b9e52256ed77b0292f4e49720 /src/click/_compat.py | |
parent | 488739dfe0d51f415c7b20466648cc519962ecbb (diff) | |
download | click-93ba3ba112d2f8ba7bdd8b231e510f74dd0b037e.tar.gz |
apply black
Diffstat (limited to 'src/click/_compat.py')
-rw-r--r-- | src/click/_compat.py | 224 |
1 files changed, 121 insertions, 103 deletions
diff --git a/src/click/_compat.py b/src/click/_compat.py index 7b7a55f..6431fc2 100644 --- a/src/click/_compat.py +++ b/src/click/_compat.py @@ -6,12 +6,13 @@ import sys from weakref import WeakKeyDictionary PY2 = sys.version_info[0] == 2 -CYGWIN = sys.platform.startswith('cygwin') -MSYS2 = sys.platform.startswith('win') and ('GCC' in sys.version) +CYGWIN = sys.platform.startswith("cygwin") +MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version) # Determine local App Engine environment, per Google's own suggestion -APP_ENGINE = ('APPENGINE_RUNTIME' in os.environ and - 'Development/' in os.environ.get('SERVER_SOFTWARE', '')) -WIN = sys.platform.startswith('win') and not APP_ENGINE and not MSYS2 +APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get( + "SERVER_SOFTWARE", "" +) +WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2 DEFAULT_COLUMNS = 80 @@ -22,46 +23,57 @@ def get_filesystem_encoding(): return sys.getfilesystemencoding() or sys.getdefaultencoding() -def _make_text_stream(stream, encoding, errors, - force_readable=False, force_writable=False): +def _make_text_stream( + stream, encoding, errors, force_readable=False, force_writable=False +): if encoding is None: encoding = get_best_encoding(stream) if errors is None: - errors = 'replace' - return _NonClosingTextIOWrapper(stream, encoding, errors, - line_buffering=True, - force_readable=force_readable, - force_writable=force_writable) + errors = "replace" + return _NonClosingTextIOWrapper( + stream, + encoding, + errors, + line_buffering=True, + force_readable=force_readable, + force_writable=force_writable, + ) def is_ascii_encoding(encoding): """Checks if a given encoding is ascii.""" try: - return codecs.lookup(encoding).name == 'ascii' + return codecs.lookup(encoding).name == "ascii" except LookupError: return False def get_best_encoding(stream): """Returns the default stream encoding if not found.""" - rv = getattr(stream, 'encoding', None) or sys.getdefaultencoding() + rv = getattr(stream, "encoding", None) or sys.getdefaultencoding() if is_ascii_encoding(rv): - return 'utf-8' + return "utf-8" return rv class _NonClosingTextIOWrapper(io.TextIOWrapper): - - def __init__(self, stream, encoding, errors, - force_readable=False, force_writable=False, **extra): - self._stream = stream = _FixupStream(stream, force_readable, - force_writable) + def __init__( + self, + stream, + encoding, + errors, + force_readable=False, + force_writable=False, + **extra + ): + self._stream = stream = _FixupStream(stream, force_readable, force_writable) io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra) # The io module is a place where the Python 3 text behavior # was forced upon Python 2, so we need to unbreak # it to look like Python 2. if PY2: + def write(self, x): if isinstance(x, str) or is_bytes(x): try: @@ -105,7 +117,7 @@ class _FixupStream(object): return getattr(self._stream, name) def read1(self, size): - f = getattr(self._stream, 'read1', None) + f = getattr(self._stream, "read1", None) if f is not None: return f(size) # We only dispatch to readline instead of read in Python 2 as we @@ -118,7 +130,7 @@ class _FixupStream(object): def readable(self): if self._force_readable: return True - x = getattr(self._stream, 'readable', None) + x = getattr(self._stream, "readable", None) if x is not None: return x() try: @@ -130,20 +142,20 @@ class _FixupStream(object): def writable(self): if self._force_writable: return True - x = getattr(self._stream, 'writable', None) + x = getattr(self._stream, "writable", None) if x is not None: return x() try: - self._stream.write('') + self._stream.write("") except Exception: try: - self._stream.write(b'') + self._stream.write(b"") except Exception: return False return True def seekable(self): - x = getattr(self._stream, 'seekable', None) + x = getattr(self._stream, "seekable", None) if x is not None: return x() try: @@ -166,7 +178,7 @@ if PY2: def is_bytes(x): return isinstance(x, (buffer, bytearray)) - _identifier_re = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') + _identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$") # For Windows, we need to force stdout/stdin/stderr to binary if it's # fetched for that. This obviously is not the most correct way to do @@ -194,6 +206,7 @@ if PY2: except ImportError: pass else: + def set_binary_mode(f): try: fileno = f.fileno() @@ -208,6 +221,7 @@ if PY2: except ImportError: pass else: + def set_binary_mode(f): try: fileno = f.fileno() @@ -225,42 +239,42 @@ if PY2: return set_binary_mode(sys.stdin) def get_binary_stdout(): - _wrap_std_stream('stdout') + _wrap_std_stream("stdout") return set_binary_mode(sys.stdout) def get_binary_stderr(): - _wrap_std_stream('stderr') + _wrap_std_stream("stderr") return set_binary_mode(sys.stderr) def get_text_stdin(encoding=None, errors=None): rv = _get_windows_console_stream(sys.stdin, encoding, errors) if rv is not None: return rv - return _make_text_stream(sys.stdin, encoding, errors, - force_readable=True) + return _make_text_stream(sys.stdin, encoding, errors, force_readable=True) def get_text_stdout(encoding=None, errors=None): - _wrap_std_stream('stdout') + _wrap_std_stream("stdout") rv = _get_windows_console_stream(sys.stdout, encoding, errors) if rv is not None: return rv - return _make_text_stream(sys.stdout, encoding, errors, - force_writable=True) + return _make_text_stream(sys.stdout, encoding, errors, force_writable=True) def get_text_stderr(encoding=None, errors=None): - _wrap_std_stream('stderr') + _wrap_std_stream("stderr") rv = _get_windows_console_stream(sys.stderr, encoding, errors) if rv is not None: return rv - return _make_text_stream(sys.stderr, encoding, errors, - force_writable=True) + return _make_text_stream(sys.stderr, encoding, errors, force_writable=True) def filename_to_ui(value): if isinstance(value, bytes): - value = value.decode(get_filesystem_encoding(), 'replace') + value = value.decode(get_filesystem_encoding(), "replace") return value + + else: import io + text_type = str raw_input = input string_types = (str,) @@ -284,10 +298,10 @@ else: def _is_binary_writer(stream, default=False): try: - stream.write(b'') + stream.write(b"") except Exception: try: - stream.write('') + stream.write("") return False except Exception: pass @@ -302,7 +316,7 @@ else: if _is_binary_reader(stream, False): return stream - buf = getattr(stream, 'buffer', None) + buf = getattr(stream, "buffer", None) # Same situation here; this time we assume that the buffer is # actually binary in case it's closed. @@ -317,7 +331,7 @@ else: if _is_binary_writer(stream, False): return stream - buf = getattr(stream, 'buffer', None) + buf = getattr(stream, "buffer", None) # Same situation here; this time we assume that the buffer is # actually binary in case it's closed. @@ -330,7 +344,7 @@ else: # to ASCII. This appears to happen in certain unittest # environments. It's not quite clear what the correct behavior is # but this at least will force Click to recover somehow. - return is_ascii_encoding(getattr(stream, 'encoding', None) or 'ascii') + return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii") def _is_compat_stream_attr(stream, attr, value): """A stream attribute is compatible if it is equal to the @@ -344,10 +358,9 @@ else: """Check if a stream's encoding and errors attributes are compatible with the desired values. """ - return ( - _is_compat_stream_attr(stream, "encoding", encoding) - and _is_compat_stream_attr(stream, "errors", errors) - ) + return _is_compat_stream_attr( + stream, "encoding", encoding + ) and _is_compat_stream_attr(stream, "errors", errors) def _force_correct_text_stream( text_stream, @@ -363,12 +376,8 @@ else: else: # If the stream looks compatible, and won't default to a # misconfigured ascii encoding, return it as-is. - if ( - _is_compatible_text_stream(text_stream, encoding, errors) - and not ( - encoding is None - and _stream_is_misconfigured(text_stream) - ) + if _is_compatible_text_stream(text_stream, encoding, errors) and not ( + encoding is None and _stream_is_misconfigured(text_stream) ): return text_stream @@ -418,56 +427,61 @@ else: def get_binary_stdin(): reader = _find_binary_reader(sys.stdin) if reader is None: - raise RuntimeError('Was not able to determine binary ' - 'stream for sys.stdin.') + raise RuntimeError( + "Was not able to determine binary stream for sys.stdin." + ) return reader def get_binary_stdout(): writer = _find_binary_writer(sys.stdout) if writer is None: - raise RuntimeError('Was not able to determine binary ' - 'stream for sys.stdout.') + raise RuntimeError( + "Was not able to determine binary stream for sys.stdout." + ) return writer def get_binary_stderr(): writer = _find_binary_writer(sys.stderr) if writer is None: - raise RuntimeError('Was not able to determine binary ' - 'stream for sys.stderr.') + raise RuntimeError( + "Was not able to determine binary stream for sys.stderr." + ) return writer def get_text_stdin(encoding=None, errors=None): rv = _get_windows_console_stream(sys.stdin, encoding, errors) if rv is not None: return rv - return _force_correct_text_reader(sys.stdin, encoding, errors, - force_readable=True) + return _force_correct_text_reader( + sys.stdin, encoding, errors, force_readable=True + ) def get_text_stdout(encoding=None, errors=None): rv = _get_windows_console_stream(sys.stdout, encoding, errors) if rv is not None: return rv - return _force_correct_text_writer(sys.stdout, encoding, errors, - force_writable=True) + return _force_correct_text_writer( + sys.stdout, encoding, errors, force_writable=True + ) def get_text_stderr(encoding=None, errors=None): rv = _get_windows_console_stream(sys.stderr, encoding, errors) if rv is not None: return rv - return _force_correct_text_writer(sys.stderr, encoding, errors, - force_writable=True) + return _force_correct_text_writer( + sys.stderr, encoding, errors, force_writable=True + ) def filename_to_ui(value): if isinstance(value, bytes): - value = value.decode(get_filesystem_encoding(), 'replace') + value = value.decode(get_filesystem_encoding(), "replace") else: - value = value.encode('utf-8', 'surrogateescape') \ - .decode('utf-8', 'replace') + value = value.encode("utf-8", "surrogateescape").decode("utf-8", "replace") return value def get_streerror(e, default=None): - if hasattr(e, 'strerror'): + if hasattr(e, "strerror"): msg = e.strerror else: if default is not None: @@ -475,7 +489,7 @@ def get_streerror(e, default=None): else: msg = str(e) if isinstance(msg, bytes): - msg = msg.decode('utf-8', 'replace') + msg = msg.decode("utf-8", "replace") return msg @@ -501,13 +515,13 @@ def _wrap_io_open(file, mode, encoding, errors): return _make_text_stream(f, **kwargs) -def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False): +def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False): binary = "b" in mode # Standard streams first. These are simple because they don't need # special handling for the atomic flag. It's entirely ignored. - if filename == '-': - if any(m in mode for m in ['w', 'a', 'x']): + if filename == "-": + if any(m in mode for m in ["w", "a", "x"]): if binary: return get_binary_stdout(), False return get_text_stdout(encoding=encoding, errors=errors), False @@ -520,17 +534,17 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False return _wrap_io_open(filename, mode, encoding, errors), True # Some usability stuff for atomic writes - if 'a' in mode: + if "a" in mode: raise ValueError( - 'Appending to an existing file is not supported, because that ' - 'would involve an expensive `copy`-operation to a temporary ' - 'file. Open the file in normal `w`-mode and copy explicitly ' - 'if that\'s what you\'re after.' + "Appending to an existing file is not supported, because that " + "would involve an expensive `copy`-operation to a temporary " + "file. Open the file in normal `w`-mode and copy explicitly " + "if that's what you're after." ) - if 'x' in mode: - raise ValueError('Use the `overwrite`-parameter instead.') - if 'w' not in mode: - raise ValueError('Atomic writes only make sense with `w`-mode.') + if "x" in mode: + raise ValueError("Use the `overwrite`-parameter instead.") + if "w" not in mode: + raise ValueError("Atomic writes only make sense with `w`-mode.") # Atomic writes are more complicated. They work by opening a file # as a proxy in the same folder and then using the fdopen @@ -547,12 +561,12 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False flags = os.O_RDWR | os.O_CREAT | os.O_EXCL if binary: - flags |= getattr(os, 'O_BINARY', 0) + flags |= getattr(os, "O_BINARY", 0) while True: tmp_filename = os.path.join( os.path.dirname(filename), - '.__atomic-write%08x' % (random.randrange(1 << 32),), + ".__atomic-write%08x" % (random.randrange(1 << 32),), ) try: fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm) @@ -575,7 +589,7 @@ def open_stream(filename, mode='r', encoding=None, errors='strict', atomic=False # Used in a destructor call, needs extra protection from interpreter cleanup. -if hasattr(os, 'replace'): +if hasattr(os, "replace"): _replace = os.replace _can_replace = True else: @@ -584,7 +598,6 @@ else: class _AtomicFile(object): - def __init__(self, f, tmp_filename, real_filename): self._f = f self._tmp_filename = tmp_filename @@ -626,7 +639,7 @@ get_winterm_size = None def strip_ansi(value): - return _ansi_re.sub('', value) + return _ansi_re.sub("", value) def _is_jupyter_kernel_output(stream): @@ -660,16 +673,18 @@ if WIN: def _get_argv_encoding(): import locale + return locale.getpreferredencoding() if PY2: - def raw_input(prompt=''): + + def raw_input(prompt=""): sys.stderr.flush() if prompt: stdout = _default_text_stdout() stdout.write(prompt) stdin = _default_text_stdin() - return stdin.readline().rstrip('\r\n') + return stdin.readline().rstrip("\r\n") try: import colorama @@ -711,11 +726,15 @@ if WIN: def get_winterm_size(): win = colorama.win32.GetConsoleScreenBufferInfo( - colorama.win32.STDOUT).srWindow + colorama.win32.STDOUT + ).srWindow return win.Right - win.Left, win.Bottom - win.Top + + else: + def _get_argv_encoding(): - return getattr(sys.stdin, 'encoding', None) or get_filesystem_encoding() + return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding() _get_windows_console_stream = lambda *x: None _wrap_std_stream = lambda *x: None @@ -734,6 +753,7 @@ def isatty(stream): def _make_cached_stream_func(src_func, wrapper_func): cache = WeakKeyDictionary() + def func(): stream = src_func() try: @@ -749,25 +769,23 @@ def _make_cached_stream_func(src_func, wrapper_func): except Exception: pass return rv + return func -_default_text_stdin = _make_cached_stream_func( - lambda: sys.stdin, get_text_stdin) -_default_text_stdout = _make_cached_stream_func( - lambda: sys.stdout, get_text_stdout) -_default_text_stderr = _make_cached_stream_func( - lambda: sys.stderr, get_text_stderr) +_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) +_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) +_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) binary_streams = { - 'stdin': get_binary_stdin, - 'stdout': get_binary_stdout, - 'stderr': get_binary_stderr, + "stdin": get_binary_stdin, + "stdout": get_binary_stdout, + "stderr": get_binary_stderr, } text_streams = { - 'stdin': get_text_stdin, - 'stdout': get_text_stdout, - 'stderr': get_text_stderr, + "stdin": get_text_stdin, + "stdout": get_text_stdout, + "stderr": get_text_stderr, } |