summaryrefslogtreecommitdiff
path: root/paramiko/config.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2019-09-30 12:23:00 -0400
committerJeff Forcier <jeff@bitprophet.org>2019-12-02 21:06:53 -0500
commit004462b40ea156b783456463b042a8f71bd22d1e (patch)
tree7de6b49927dbbc86455461d532a3a98e21f0adcd /paramiko/config.py
parentc99388364bb840677e9ea27c7755f4a0af621e1b (diff)
downloadparamiko-004462b40ea156b783456463b042a8f71bd22d1e.tar.gz
Base case re #717 works now.
Huge ass squashed commit because I was experimenting with "commit entire feature at once so you do not leave broken tests around to break bisecting". Not sure it's worth it, at least not for large-ish, overhauling-existing-code feature adds. Breaking the work up over months did not help either, L M A O
Diffstat (limited to 'paramiko/config.py')
-rw-r--r--paramiko/config.py352
1 files changed, 270 insertions, 82 deletions
diff --git a/paramiko/config.py b/paramiko/config.py
index 5336454c..b668be69 100644
--- a/paramiko/config.py
+++ b/paramiko/config.py
@@ -22,14 +22,22 @@ Configuration file (aka ``ssh_config``) support.
"""
import fnmatch
+import getpass
import os
import re
import shlex
import socket
+from functools import partial
from .py3compat import StringIO
-from .ssh_exception import CouldNotCanonicalize
+invoke, invoke_import_error = None, None
+try:
+ import invoke
+except ImportError as e:
+ invoke_import_error = e
+
+from .ssh_exception import CouldNotCanonicalize, ConfigParseError
SSH_PORT = 22
@@ -48,6 +56,17 @@ class SSHConfig(object):
SETTINGS_REGEX = re.compile(r"(\w+)(?:\s*=\s*|\s+)(.+)")
+ # TODO: do a full scan of ssh.c & friends to make sure we're fully
+ # compatible across the board, e.g. OpenSSH 8.1 added %n to ProxyCommand.
+ TOKENS_BY_CONFIG_KEY = {
+ "controlpath": ["%h", "%l", "%L", "%n", "%p", "%r", "%u"],
+ "identityfile": ["~", "%d", "%h", "%l", "%u", "%r"],
+ "proxycommand": ["~", "%h", "%p", "%r"],
+ # Doesn't seem worth making this 'special' for now, it will fit well
+ # enough (no actual match-exec config key to be confused with).
+ "match-exec": ["%d", "%h", "%L", "%l", "%n", "%p", "%r", "%u"],
+ }
+
def __init__(self):
"""
Create a new OpenSSH config object.
@@ -105,28 +124,44 @@ class SSHConfig(object):
:param file_obj: a file-like object to read the config file from
"""
- host = {"host": ["*"], "config": {}}
+ # Start out w/ implicit/anonymous global host-like block to hold
+ # anything not contained by an explicit one.
+ context = {"host": ["*"], "config": {}}
for line in file_obj:
# Strip any leading or trailing whitespace from the line.
# Refer to https://github.com/paramiko/paramiko/issues/499
line = line.strip()
+ # Skip blanks, comments
if not line or line.startswith("#"):
continue
+ # Parse line into key, value
match = re.match(self.SETTINGS_REGEX, line)
if not match:
- raise Exception("Unparsable line {}".format(line))
+ raise ConfigParseError("Unparsable line {}".format(line))
key = match.group(1).lower()
value = match.group(2)
- if key == "host":
- self._config.append(host)
- host = {"host": self._get_hosts(value), "config": {}}
+ # Host keyword triggers switch to new block/context
+ if key in ("host", "match"):
+ self._config.append(context)
+ context = {"config": {}}
+ if key == "host":
+ # TODO 3.0: make these real objects or at least name this
+ # "hosts" to acknowledge it's an iterable. (Doing so prior
+ # to 3.0, despite it being a private API, feels bad -
+ # surely such an old codebase has folks actually relying on
+ # these keys.)
+ context["host"] = self._get_hosts(value)
+ else:
+ context["matches"] = self._get_matches(value)
+ # Special-case for noop ProxyCommands
elif key == "proxycommand" and value.lower() == "none":
# Store 'none' as None; prior to 3.x, it will get stripped out
# at the end (for compatibility with issue #415). After 3.x, it
# will simply not get stripped, leaving a nice explicit marker.
- host["config"][key] = None
+ context["config"][key] = None
+ # All other keywords get stored, directly or via append
else:
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
@@ -135,13 +170,14 @@ class SSHConfig(object):
# cases, since they are allowed to be specified multiple times
# and they should be tried in order of specification.
if key in ["identityfile", "localforward", "remoteforward"]:
- if key in host["config"]:
- host["config"][key].append(value)
+ if key in context["config"]:
+ context["config"][key].append(value)
else:
- host["config"][key] = [value]
- elif key not in host["config"]:
- host["config"][key] = value
- self._config.append(host)
+ context["config"][key] = [value]
+ elif key not in context["config"]:
+ context["config"][key] = value
+ # Store last 'open' block and we're done
+ self._config.append(context)
def lookup(self, hostname):
"""
@@ -149,9 +185,9 @@ class SSHConfig(object):
The host-matching rules of OpenSSH's ``ssh_config`` man page are used:
For each parameter, the first obtained value will be used. The
- configuration files contain sections separated by ``Host``
- specifications, and that section is only applied for hosts that match
- one of the patterns given in the specification.
+ configuration files contain sections separated by ``Host`` and/or
+ ``Match`` specifications, and that section is only applied for hosts
+ which match the given patterns or keywords
Since the first obtained value for each parameter is used, more host-
specific declarations should be given near the beginning of the file,
@@ -168,15 +204,26 @@ class SSHConfig(object):
assert conf['passwordauthentication'] == 'yes'
assert conf.as_bool('passwordauthentication') is True
+ .. note::
+ If there is no explicitly configured ``HostName`` value, it will be
+ set to the being-looked-up hostname, which is as close as we can
+ get to OpenSSH's behavior around that particular option.
+
:param str hostname: the hostname to lookup
.. versionchanged:: 2.5
Returns `SSHConfigDict` objects instead of dict literals.
.. versionchanged:: 2.7
Added canonicalization support.
+ .. versionchanged:: 2.7
+ Added ``Match`` support.
"""
# First pass
options = self._lookup(hostname=hostname)
+ # Inject HostName if it was not set (this used to be done incidentally
+ # during tokenization, for some reason).
+ if "hostname" not in options:
+ options["hostname"] = hostname
# Handle canonicalization
canon = options.get("canonicalizehostname", None) in ("yes", "always")
maxdots = int(options.get("canonicalizemaxdots", 1))
@@ -185,21 +232,26 @@ class SSHConfig(object):
# implementation for CanonicalDomains is 'split on any whitespace'.
domains = options["canonicaldomains"].split()
hostname = self.canonicalize(hostname, options, domains)
+ # Overwrite HostName again here (this is also what OpenSSH does)
options["hostname"] = hostname
- options = self._lookup(hostname, options)
+ options = self._lookup(hostname, options, canonical=True)
return options
- def _lookup(self, hostname, options=None):
- matches = [
- config
- for config in self._config
- if self._allowed(config["host"], hostname)
- ]
-
+ def _lookup(self, hostname, options=None, canonical=False):
+ # Init
if options is None:
options = SSHConfigDict()
- for match in matches:
- for key, value in match["config"].items():
+ # Iterate all stanzas, applying any that match, in turn (so that things
+ # like Match can reference currently understood state)
+ for context in self._config:
+ if not (
+ self._pattern_matches(context.get("host", []), hostname)
+ or self._does_match(
+ context.get("matches", []), hostname, canonical, options
+ )
+ ):
+ continue
+ for key, value in context["config"].items():
if key not in options:
# Create a copy of the original value,
# else it will reference the original list
@@ -210,6 +262,8 @@ class SSHConfig(object):
options[key].extend(
x for x in value if x not in options[key]
)
+ # Expand variables in resulting values (besides 'Match exec' which was
+ # already handled above)
options = self._expand_variables(options, hostname)
# TODO: remove in 3.x re #670
if "proxycommand" in options and options["proxycommand"] is None:
@@ -267,86 +321,176 @@ class SSHConfig(object):
hosts.update(entry["host"])
return hosts
- def _allowed(self, hosts, hostname):
+ def _pattern_matches(self, patterns, target):
+ # Convenience auto-splitter if not already a list
+ if hasattr(patterns, "split"):
+ patterns = patterns.split(",")
match = False
- for host in hosts:
- if host.startswith("!") and fnmatch.fnmatch(hostname, host[1:]):
+ for pattern in patterns:
+ # Short-circuit if target matches a negated pattern
+ if pattern.startswith("!") and fnmatch.fnmatch(
+ target, pattern[1:]
+ ):
return False
- elif fnmatch.fnmatch(hostname, host):
+ # Flag a match, but continue (in case of later negation) if regular
+ # match occurs
+ elif fnmatch.fnmatch(target, pattern):
match = True
return match
- def _expand_variables(self, config, hostname):
+ # TODO 3.0: remove entirely (is now unused internally)
+ def _allowed(self, hosts, hostname):
+ return self._pattern_matches(hosts, hostname)
+
+ def _does_match(self, match_list, target_hostname, canonical, options):
+ matched = []
+ candidates = match_list[:]
+ local_username = getpass.getuser()
+ while candidates:
+ candidate = candidates.pop(0)
+ # Obtain latest host/user value every loop, so later Match may
+ # reference values assigned within a prior Match.
+ configured_host = options.get("hostname", None)
+ configured_user = options.get("user", None)
+ type_, param = candidate["type"], candidate["param"]
+ # Canonical is a hard pass/fail based on whether this is a
+ # canonicalized re-lookup.
+ if type_ == "canonical":
+ if self._should_fail(canonical, candidate):
+ return False
+ # The parse step ensures we only see this by itself or after
+ # canonical, so it's also an easy hard pass. (No negation here as
+ # that would be uh, pretty weird?)
+ if type_ == "all":
+ return True
+ # From here, we are testing various non-hard criteria,
+ # short-circuiting only on fail
+ if type_ == "host":
+ hostval = configured_host or target_hostname
+ passed = self._pattern_matches(param, hostval)
+ if self._should_fail(passed, candidate):
+ return False
+ if type_ == "originalhost":
+ passed = self._pattern_matches(param, target_hostname)
+ if self._should_fail(passed, candidate):
+ return False
+ if type_ == "user":
+ user = configured_user or local_username
+ passed = self._pattern_matches(param, user)
+ if self._should_fail(passed, candidate):
+ return False
+ if type_ == "localuser":
+ passed = self._pattern_matches(param, local_username)
+ if self._should_fail(passed, candidate):
+ return False
+ if type_ == "exec":
+ exec_cmd = self._tokenize(
+ options, target_hostname, "match-exec", param
+ )
+ # Like OpenSSH, we 'redirect' stdout but let stderr bubble up
+ passed = invoke.run(exec_cmd, hide="stdout", warn=True).ok
+ if self._should_fail(passed, candidate):
+ return False
+ # Made it all the way here? Everything matched!
+ matched.append(candidate)
+ # Did anything match? (To be treated as bool, usually.)
+ return matched
+
+ def _should_fail(self, would_pass, candidate):
+ return would_pass if candidate["negate"] else not would_pass
+
+ def _tokenize(self, config, target_hostname, key, value):
"""
- Return a dict of config options with expanded substitutions
- for a given hostname.
+ Tokenize a string based on current config/hostname data.
- Please refer to man ``ssh_config`` for the parameters that
- are replaced.
+ :param config: Current config data.
+ :param target_hostname: Original target connection hostname.
+ :param key: Config key being tokenized (used to filter token list).
+ :param value: Config value being tokenized.
- :param dict config: the config for the hostname
- :param str hostname: the hostname that the config belongs to
+ :returns: The tokenized version of the input ``value`` string.
"""
-
+ allowed_tokens = self._allowed_tokens(key)
+ # Short-circuit if no tokenization possible
+ if not allowed_tokens:
+ return value
+ # Obtain potentially configured (and even possibly itself tokenized)
+ # hostname, for use with %h in other values.
+ configured_hostname = target_hostname
if "hostname" in config:
- config["hostname"] = config["hostname"].replace("%h", hostname)
- else:
- config["hostname"] = hostname
-
+ configured_hostname = config["hostname"].replace(
+ "%h", target_hostname
+ )
+ # Ditto the rest of the source values
if "port" in config:
port = config["port"]
else:
port = SSH_PORT
-
- user = os.getenv("USER")
+ user = getpass.getuser()
if "user" in config:
remoteuser = config["user"]
else:
remoteuser = user
-
- host = socket.gethostname().split(".")[0]
- fqdn = LazyFqdn(config, host)
+ local_hostname = socket.gethostname().split(".")[0]
+ local_fqdn = LazyFqdn(config, local_hostname)
homedir = os.path.expanduser("~")
+ # The actual tokens!
replacements = {
- "controlpath": [
- ("%h", config["hostname"]),
- ("%l", fqdn),
- ("%L", host),
- ("%n", hostname),
- ("%p", port),
- ("%r", remoteuser),
- ("%u", user),
- ],
- "identityfile": [
- ("~", homedir),
- ("%d", homedir),
- ("%h", config["hostname"]),
- ("%l", fqdn),
- ("%u", user),
- ("%r", remoteuser),
- ],
- "proxycommand": [
- ("~", homedir),
- ("%h", config["hostname"]),
- ("%p", port),
- ("%r", remoteuser),
- ],
+ # TODO: %%???
+ # TODO: %C?
+ "%d": homedir,
+ "%h": configured_hostname,
+ # TODO: %i?
+ "%L": local_hostname,
+ "%l": local_fqdn,
+ # also this is pseudo buggy when not in Match exec mode so document
+ # that. also WHY is that the case?? don't we do all of this late?
+ "%n": target_hostname,
+ "%p": port,
+ "%r": remoteuser,
+ # TODO: %T? don't believe this is possible however
+ "%u": user,
+ "~": homedir,
}
+ # Do the thing with the stuff
+ tokenized = value
+ for find, replace in replacements.items():
+ if find not in allowed_tokens:
+ continue
+ tokenized = tokenized.replace(find, str(replace))
+ # TODO: log? eg that value -> tokenized
+ return tokenized
+ def _allowed_tokens(self, key):
+ """
+ Given config ``key``, return list of token strings to tokenize.
+
+ .. note::
+ This feels like it wants to eventually go away, but is used to
+ preserve as-strict-as-possible compatibility with OpenSSH, which
+ for whatever reason only applies some tokens to some config keys.
+ """
+ return self.TOKENS_BY_CONFIG_KEY.get(key, [])
+
+ def _expand_variables(self, config, target_hostname):
+ """
+ Return a dict of config options with expanded substitutions
+ for a given original & current target hostname.
+
+ Please refer to :doc:`/api/config` for details.
+
+ :param dict config: the currently parsed config
+ :param str hostname: the hostname whose config is being looked up
+ """
for k in config:
if config[k] is None:
continue
- if k in replacements:
- for find, replace in replacements[k]:
- if isinstance(config[k], list):
- for item in range(len(config[k])):
- if find in config[k][item]:
- config[k][item] = config[k][item].replace(
- find, str(replace)
- )
- else:
- if find in config[k]:
- config[k] = config[k].replace(find, str(replace))
+ tokenizer = partial(self._tokenize, config, target_hostname, k)
+ if isinstance(config[k], list):
+ for i, value in enumerate(config[k]):
+ config[k][i] = tokenizer(value)
+ else:
+ config[k] = tokenizer(config[k])
return config
def _get_hosts(self, host):
@@ -356,7 +500,51 @@ class SSHConfig(object):
try:
return shlex.split(host)
except ValueError:
- raise Exception("Unparsable host {}".format(host))
+ raise ConfigParseError("Unparsable host {}".format(host))
+
+ def _get_matches(self, match):
+ """
+ Parse a specific Match config line into a list-of-dicts for its values.
+
+ Performs some parse-time validation as well.
+ """
+ matches = []
+ tokens = shlex.split(match)
+ while tokens:
+ match = {"type": None, "param": None, "negate": False}
+ type_ = tokens.pop(0)
+ # Handle per-keyword negation
+ if type_.startswith("!"):
+ match["negate"] = True
+ type_ = type_[1:]
+ match["type"] = type_
+ # all/canonical have no params (everything else does)
+ if type_ in ("all", "canonical"):
+ matches.append(match)
+ continue
+ if not tokens:
+ raise ConfigParseError(
+ "Missing parameter to Match '{}' keyword".format(type_)
+ )
+ match["param"] = tokens.pop(0)
+ matches.append(match)
+ # Perform some (easier to do now than in the middle) validation that is
+ # better handled here than at lookup time.
+ keywords = [x["type"] for x in matches]
+ if "all" in keywords:
+ allowable = ("all", "canonical")
+ ok, bad = (
+ list(filter(lambda x: x in allowable, keywords)),
+ list(filter(lambda x: x not in allowable, keywords)),
+ )
+ err = None
+ if any(bad):
+ err = "Match does not allow 'all' mixed with anything but 'canonical'" # noqa
+ elif "canonical" in ok and ok.index("canonical") > ok.index("all"):
+ err = "Match does not allow 'all' before 'canonical'"
+ if err is not None:
+ raise ConfigParseError(err)
+ return matches
def _addressfamily_host_lookup(hostname, options):