diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2019-09-30 12:23:00 -0400 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2019-12-02 21:06:53 -0500 |
commit | 004462b40ea156b783456463b042a8f71bd22d1e (patch) | |
tree | 7de6b49927dbbc86455461d532a3a98e21f0adcd /paramiko/config.py | |
parent | c99388364bb840677e9ea27c7755f4a0af621e1b (diff) | |
download | paramiko-004462b40ea156b783456463b042a8f71bd22d1e.tar.gz |
Base case re #717 works now.
Huge ass squashed commit because I was experimenting
with "commit entire feature at once so you do not leave
broken tests around to break bisecting". Not sure it's worth it,
at least not for large-ish, overhauling-existing-code feature adds.
Breaking the work up over months did not help either, L M A O
Diffstat (limited to 'paramiko/config.py')
-rw-r--r-- | paramiko/config.py | 352 |
1 files changed, 270 insertions, 82 deletions
diff --git a/paramiko/config.py b/paramiko/config.py index 5336454c..b668be69 100644 --- a/paramiko/config.py +++ b/paramiko/config.py @@ -22,14 +22,22 @@ Configuration file (aka ``ssh_config``) support. """ import fnmatch +import getpass import os import re import shlex import socket +from functools import partial from .py3compat import StringIO -from .ssh_exception import CouldNotCanonicalize +invoke, invoke_import_error = None, None +try: + import invoke +except ImportError as e: + invoke_import_error = e + +from .ssh_exception import CouldNotCanonicalize, ConfigParseError SSH_PORT = 22 @@ -48,6 +56,17 @@ class SSHConfig(object): SETTINGS_REGEX = re.compile(r"(\w+)(?:\s*=\s*|\s+)(.+)") + # TODO: do a full scan of ssh.c & friends to make sure we're fully + # compatible across the board, e.g. OpenSSH 8.1 added %n to ProxyCommand. + TOKENS_BY_CONFIG_KEY = { + "controlpath": ["%h", "%l", "%L", "%n", "%p", "%r", "%u"], + "identityfile": ["~", "%d", "%h", "%l", "%u", "%r"], + "proxycommand": ["~", "%h", "%p", "%r"], + # Doesn't seem worth making this 'special' for now, it will fit well + # enough (no actual match-exec config key to be confused with). + "match-exec": ["%d", "%h", "%L", "%l", "%n", "%p", "%r", "%u"], + } + def __init__(self): """ Create a new OpenSSH config object. @@ -105,28 +124,44 @@ class SSHConfig(object): :param file_obj: a file-like object to read the config file from """ - host = {"host": ["*"], "config": {}} + # Start out w/ implicit/anonymous global host-like block to hold + # anything not contained by an explicit one. + context = {"host": ["*"], "config": {}} for line in file_obj: # Strip any leading or trailing whitespace from the line. # Refer to https://github.com/paramiko/paramiko/issues/499 line = line.strip() + # Skip blanks, comments if not line or line.startswith("#"): continue + # Parse line into key, value match = re.match(self.SETTINGS_REGEX, line) if not match: - raise Exception("Unparsable line {}".format(line)) + raise ConfigParseError("Unparsable line {}".format(line)) key = match.group(1).lower() value = match.group(2) - if key == "host": - self._config.append(host) - host = {"host": self._get_hosts(value), "config": {}} + # Host keyword triggers switch to new block/context + if key in ("host", "match"): + self._config.append(context) + context = {"config": {}} + if key == "host": + # TODO 3.0: make these real objects or at least name this + # "hosts" to acknowledge it's an iterable. (Doing so prior + # to 3.0, despite it being a private API, feels bad - + # surely such an old codebase has folks actually relying on + # these keys.) + context["host"] = self._get_hosts(value) + else: + context["matches"] = self._get_matches(value) + # Special-case for noop ProxyCommands elif key == "proxycommand" and value.lower() == "none": # Store 'none' as None; prior to 3.x, it will get stripped out # at the end (for compatibility with issue #415). After 3.x, it # will simply not get stripped, leaving a nice explicit marker. - host["config"][key] = None + context["config"][key] = None + # All other keywords get stored, directly or via append else: if value.startswith('"') and value.endswith('"'): value = value[1:-1] @@ -135,13 +170,14 @@ class SSHConfig(object): # cases, since they are allowed to be specified multiple times # and they should be tried in order of specification. if key in ["identityfile", "localforward", "remoteforward"]: - if key in host["config"]: - host["config"][key].append(value) + if key in context["config"]: + context["config"][key].append(value) else: - host["config"][key] = [value] - elif key not in host["config"]: - host["config"][key] = value - self._config.append(host) + context["config"][key] = [value] + elif key not in context["config"]: + context["config"][key] = value + # Store last 'open' block and we're done + self._config.append(context) def lookup(self, hostname): """ @@ -149,9 +185,9 @@ class SSHConfig(object): The host-matching rules of OpenSSH's ``ssh_config`` man page are used: For each parameter, the first obtained value will be used. The - configuration files contain sections separated by ``Host`` - specifications, and that section is only applied for hosts that match - one of the patterns given in the specification. + configuration files contain sections separated by ``Host`` and/or + ``Match`` specifications, and that section is only applied for hosts + which match the given patterns or keywords Since the first obtained value for each parameter is used, more host- specific declarations should be given near the beginning of the file, @@ -168,15 +204,26 @@ class SSHConfig(object): assert conf['passwordauthentication'] == 'yes' assert conf.as_bool('passwordauthentication') is True + .. note:: + If there is no explicitly configured ``HostName`` value, it will be + set to the being-looked-up hostname, which is as close as we can + get to OpenSSH's behavior around that particular option. + :param str hostname: the hostname to lookup .. versionchanged:: 2.5 Returns `SSHConfigDict` objects instead of dict literals. .. versionchanged:: 2.7 Added canonicalization support. + .. versionchanged:: 2.7 + Added ``Match`` support. """ # First pass options = self._lookup(hostname=hostname) + # Inject HostName if it was not set (this used to be done incidentally + # during tokenization, for some reason). + if "hostname" not in options: + options["hostname"] = hostname # Handle canonicalization canon = options.get("canonicalizehostname", None) in ("yes", "always") maxdots = int(options.get("canonicalizemaxdots", 1)) @@ -185,21 +232,26 @@ class SSHConfig(object): # implementation for CanonicalDomains is 'split on any whitespace'. domains = options["canonicaldomains"].split() hostname = self.canonicalize(hostname, options, domains) + # Overwrite HostName again here (this is also what OpenSSH does) options["hostname"] = hostname - options = self._lookup(hostname, options) + options = self._lookup(hostname, options, canonical=True) return options - def _lookup(self, hostname, options=None): - matches = [ - config - for config in self._config - if self._allowed(config["host"], hostname) - ] - + def _lookup(self, hostname, options=None, canonical=False): + # Init if options is None: options = SSHConfigDict() - for match in matches: - for key, value in match["config"].items(): + # Iterate all stanzas, applying any that match, in turn (so that things + # like Match can reference currently understood state) + for context in self._config: + if not ( + self._pattern_matches(context.get("host", []), hostname) + or self._does_match( + context.get("matches", []), hostname, canonical, options + ) + ): + continue + for key, value in context["config"].items(): if key not in options: # Create a copy of the original value, # else it will reference the original list @@ -210,6 +262,8 @@ class SSHConfig(object): options[key].extend( x for x in value if x not in options[key] ) + # Expand variables in resulting values (besides 'Match exec' which was + # already handled above) options = self._expand_variables(options, hostname) # TODO: remove in 3.x re #670 if "proxycommand" in options and options["proxycommand"] is None: @@ -267,86 +321,176 @@ class SSHConfig(object): hosts.update(entry["host"]) return hosts - def _allowed(self, hosts, hostname): + def _pattern_matches(self, patterns, target): + # Convenience auto-splitter if not already a list + if hasattr(patterns, "split"): + patterns = patterns.split(",") match = False - for host in hosts: - if host.startswith("!") and fnmatch.fnmatch(hostname, host[1:]): + for pattern in patterns: + # Short-circuit if target matches a negated pattern + if pattern.startswith("!") and fnmatch.fnmatch( + target, pattern[1:] + ): return False - elif fnmatch.fnmatch(hostname, host): + # Flag a match, but continue (in case of later negation) if regular + # match occurs + elif fnmatch.fnmatch(target, pattern): match = True return match - def _expand_variables(self, config, hostname): + # TODO 3.0: remove entirely (is now unused internally) + def _allowed(self, hosts, hostname): + return self._pattern_matches(hosts, hostname) + + def _does_match(self, match_list, target_hostname, canonical, options): + matched = [] + candidates = match_list[:] + local_username = getpass.getuser() + while candidates: + candidate = candidates.pop(0) + # Obtain latest host/user value every loop, so later Match may + # reference values assigned within a prior Match. + configured_host = options.get("hostname", None) + configured_user = options.get("user", None) + type_, param = candidate["type"], candidate["param"] + # Canonical is a hard pass/fail based on whether this is a + # canonicalized re-lookup. + if type_ == "canonical": + if self._should_fail(canonical, candidate): + return False + # The parse step ensures we only see this by itself or after + # canonical, so it's also an easy hard pass. (No negation here as + # that would be uh, pretty weird?) + if type_ == "all": + return True + # From here, we are testing various non-hard criteria, + # short-circuiting only on fail + if type_ == "host": + hostval = configured_host or target_hostname + passed = self._pattern_matches(param, hostval) + if self._should_fail(passed, candidate): + return False + if type_ == "originalhost": + passed = self._pattern_matches(param, target_hostname) + if self._should_fail(passed, candidate): + return False + if type_ == "user": + user = configured_user or local_username + passed = self._pattern_matches(param, user) + if self._should_fail(passed, candidate): + return False + if type_ == "localuser": + passed = self._pattern_matches(param, local_username) + if self._should_fail(passed, candidate): + return False + if type_ == "exec": + exec_cmd = self._tokenize( + options, target_hostname, "match-exec", param + ) + # Like OpenSSH, we 'redirect' stdout but let stderr bubble up + passed = invoke.run(exec_cmd, hide="stdout", warn=True).ok + if self._should_fail(passed, candidate): + return False + # Made it all the way here? Everything matched! + matched.append(candidate) + # Did anything match? (To be treated as bool, usually.) + return matched + + def _should_fail(self, would_pass, candidate): + return would_pass if candidate["negate"] else not would_pass + + def _tokenize(self, config, target_hostname, key, value): """ - Return a dict of config options with expanded substitutions - for a given hostname. + Tokenize a string based on current config/hostname data. - Please refer to man ``ssh_config`` for the parameters that - are replaced. + :param config: Current config data. + :param target_hostname: Original target connection hostname. + :param key: Config key being tokenized (used to filter token list). + :param value: Config value being tokenized. - :param dict config: the config for the hostname - :param str hostname: the hostname that the config belongs to + :returns: The tokenized version of the input ``value`` string. """ - + allowed_tokens = self._allowed_tokens(key) + # Short-circuit if no tokenization possible + if not allowed_tokens: + return value + # Obtain potentially configured (and even possibly itself tokenized) + # hostname, for use with %h in other values. + configured_hostname = target_hostname if "hostname" in config: - config["hostname"] = config["hostname"].replace("%h", hostname) - else: - config["hostname"] = hostname - + configured_hostname = config["hostname"].replace( + "%h", target_hostname + ) + # Ditto the rest of the source values if "port" in config: port = config["port"] else: port = SSH_PORT - - user = os.getenv("USER") + user = getpass.getuser() if "user" in config: remoteuser = config["user"] else: remoteuser = user - - host = socket.gethostname().split(".")[0] - fqdn = LazyFqdn(config, host) + local_hostname = socket.gethostname().split(".")[0] + local_fqdn = LazyFqdn(config, local_hostname) homedir = os.path.expanduser("~") + # The actual tokens! replacements = { - "controlpath": [ - ("%h", config["hostname"]), - ("%l", fqdn), - ("%L", host), - ("%n", hostname), - ("%p", port), - ("%r", remoteuser), - ("%u", user), - ], - "identityfile": [ - ("~", homedir), - ("%d", homedir), - ("%h", config["hostname"]), - ("%l", fqdn), - ("%u", user), - ("%r", remoteuser), - ], - "proxycommand": [ - ("~", homedir), - ("%h", config["hostname"]), - ("%p", port), - ("%r", remoteuser), - ], + # TODO: %%??? + # TODO: %C? + "%d": homedir, + "%h": configured_hostname, + # TODO: %i? + "%L": local_hostname, + "%l": local_fqdn, + # also this is pseudo buggy when not in Match exec mode so document + # that. also WHY is that the case?? don't we do all of this late? + "%n": target_hostname, + "%p": port, + "%r": remoteuser, + # TODO: %T? don't believe this is possible however + "%u": user, + "~": homedir, } + # Do the thing with the stuff + tokenized = value + for find, replace in replacements.items(): + if find not in allowed_tokens: + continue + tokenized = tokenized.replace(find, str(replace)) + # TODO: log? eg that value -> tokenized + return tokenized + def _allowed_tokens(self, key): + """ + Given config ``key``, return list of token strings to tokenize. + + .. note:: + This feels like it wants to eventually go away, but is used to + preserve as-strict-as-possible compatibility with OpenSSH, which + for whatever reason only applies some tokens to some config keys. + """ + return self.TOKENS_BY_CONFIG_KEY.get(key, []) + + def _expand_variables(self, config, target_hostname): + """ + Return a dict of config options with expanded substitutions + for a given original & current target hostname. + + Please refer to :doc:`/api/config` for details. + + :param dict config: the currently parsed config + :param str hostname: the hostname whose config is being looked up + """ for k in config: if config[k] is None: continue - if k in replacements: - for find, replace in replacements[k]: - if isinstance(config[k], list): - for item in range(len(config[k])): - if find in config[k][item]: - config[k][item] = config[k][item].replace( - find, str(replace) - ) - else: - if find in config[k]: - config[k] = config[k].replace(find, str(replace)) + tokenizer = partial(self._tokenize, config, target_hostname, k) + if isinstance(config[k], list): + for i, value in enumerate(config[k]): + config[k][i] = tokenizer(value) + else: + config[k] = tokenizer(config[k]) return config def _get_hosts(self, host): @@ -356,7 +500,51 @@ class SSHConfig(object): try: return shlex.split(host) except ValueError: - raise Exception("Unparsable host {}".format(host)) + raise ConfigParseError("Unparsable host {}".format(host)) + + def _get_matches(self, match): + """ + Parse a specific Match config line into a list-of-dicts for its values. + + Performs some parse-time validation as well. + """ + matches = [] + tokens = shlex.split(match) + while tokens: + match = {"type": None, "param": None, "negate": False} + type_ = tokens.pop(0) + # Handle per-keyword negation + if type_.startswith("!"): + match["negate"] = True + type_ = type_[1:] + match["type"] = type_ + # all/canonical have no params (everything else does) + if type_ in ("all", "canonical"): + matches.append(match) + continue + if not tokens: + raise ConfigParseError( + "Missing parameter to Match '{}' keyword".format(type_) + ) + match["param"] = tokens.pop(0) + matches.append(match) + # Perform some (easier to do now than in the middle) validation that is + # better handled here than at lookup time. + keywords = [x["type"] for x in matches] + if "all" in keywords: + allowable = ("all", "canonical") + ok, bad = ( + list(filter(lambda x: x in allowable, keywords)), + list(filter(lambda x: x not in allowable, keywords)), + ) + err = None + if any(bad): + err = "Match does not allow 'all' mixed with anything but 'canonical'" # noqa + elif "canonical" in ok and ok.index("canonical") > ok.index("all"): + err = "Match does not allow 'all' before 'canonical'" + if err is not None: + raise ConfigParseError(err) + return matches def _addressfamily_host_lookup(hostname, options): |