summaryrefslogtreecommitdiff
path: root/paramiko/config.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2019-08-27 14:20:27 -0400
committerJeff Forcier <jeff@bitprophet.org>2019-09-27 14:17:36 -0500
commit4c4de253e3909adb99505b6723c58c23d64f7988 (patch)
treef14352dbfc135fd781360f0a909342079a82f9c1 /paramiko/config.py
parentb1bbacdcc4f0be50b8fe584f329d344fb13544bd (diff)
downloadparamiko-4c4de253e3909adb99505b6723c58c23d64f7988.tar.gz
Implement ssh_config hostname canonicalization (WIP)
- Refactor DNS lookup related junk previously only relevant to %h - Refactor guts of lookup() so it can be done >1 time - Changelog/tests/implementation for canonicalization itself Closes #897
Diffstat (limited to 'paramiko/config.py')
-rw-r--r--paramiko/config.py149
1 files changed, 116 insertions, 33 deletions
diff --git a/paramiko/config.py b/paramiko/config.py
index f9ea02dc..6430f1e0 100644
--- a/paramiko/config.py
+++ b/paramiko/config.py
@@ -29,6 +29,8 @@ import socket
from .py3compat import StringIO
+from .ssh_exception import CouldNotCanonicalize
+
SSH_PORT = 22
@@ -170,29 +172,90 @@ class SSHConfig(object):
.. versionchanged:: 2.5
Returns `SSHConfigDict` objects instead of dict literals.
+ .. versionchanged:: 2.7
+ Added canonicalization support.
"""
+ # First pass
+ options = self._lookup(hostname=hostname)
+ # Handle canonicalization
+ canon = options.get("canonicalizehostname", None) in ("yes", "always")
+ maxdots = int(options.get("canonicalizemaxdots", 1))
+ if canon and hostname.count(".") <= maxdots:
+ # NOTE: OpenSSH manpage does not explicitly state this, but its
+ # implementation for CanonicalDomains is 'split on any whitespace'.
+ domains = options["canonicaldomains"].split()
+ hostname = self.canonicalize(hostname, options, domains)
+ options["hostname"] = hostname
+ options = self._lookup(hostname, options)
+ return options
+
+ def _lookup(self, hostname, options=None):
matches = [
config
for config in self._config
if self._allowed(config["host"], hostname)
]
- ret = SSHConfigDict()
+ if options is None:
+ options = SSHConfigDict()
for match in matches:
for key, value in match["config"].items():
- if key not in ret:
+ if key not in options:
# Create a copy of the original value,
# else it will reference the original list
# in self._config and update that value too
# when the extend() is being called.
- ret[key] = value[:] if value is not None else value
+ options[key] = value[:] if value is not None else value
elif key == "identityfile":
- ret[key].extend(value)
- ret = self._expand_variables(ret, hostname)
+ options[key].extend(
+ x for x in value if x not in options[key]
+ )
+ options = self._expand_variables(options, hostname)
# TODO: remove in 3.x re #670
- if "proxycommand" in ret and ret["proxycommand"] is None:
- del ret["proxycommand"]
- return ret
+ if "proxycommand" in options and options["proxycommand"] is None:
+ del options["proxycommand"]
+ return options
+
+ def canonicalize(self, hostname, options, domains):
+ """
+ Return canonicalized version of ``hostname``.
+
+ :param str hostname: Target hostname.
+ :param options: An `SSHConfigDict` from a previous lookup pass.
+ :param list domains: List of domains (e.g. ``["paramiko.org"]``).
+
+ :returns: A canonicalized hostname if one was found, else ``None``.
+
+ .. versionadded:: 2.7
+ """
+ found = False
+ for domain in domains:
+ candidate = "{}.{}".format(hostname, domain)
+ family_specific = _addressfamily_host_lookup(candidate, options)
+ if family_specific is not None:
+ # TODO: would we want to dig deeper into other results? e.g. to
+ # find something that satisfies PermittedCNAMEs when that is
+ # implemented?
+ found = family_specific[0]
+ else:
+ # TODO: what does ssh use here and is there a reason to use
+ # that instead of gethostbyname?
+ try:
+ found = socket.gethostbyname(candidate)
+ except socket.gaierror:
+ pass
+ if found:
+ # TODO: follow CNAME (implied by found != candidate?) if
+ # CanonicalizePermittedCNAMEs allows it
+ return candidate
+ # If we got here, it means canonicalization failed.
+ # When CanonicalizeFallbackLocal is undefined or 'yes', we just spit
+ # back the original hostname.
+ if options.get("canonicalizefallbacklocal", "yes") == "yes":
+ return hostname
+ # And here, we failed AND fallback was set to a non-yes value, so we
+ # need to get mad.
+ raise CouldNotCanonicalize(hostname)
def get_hostnames(self):
"""
@@ -296,6 +359,43 @@ class SSHConfig(object):
raise Exception("Unparsable host {}".format(host))
+def _addressfamily_host_lookup(hostname, options):
+ """
+ Try looking up ``hostname`` in an IPv4 or IPv6 specific manner.
+
+ This is an odd duck due to needing use in two divergent use cases. It looks
+ up ``AddressFamily`` in ``options`` and if it is ``inet`` or ``inet6``,
+ this function uses `socket.getaddrinfo` to perform a family-specific
+ lookup, returning the result if successful.
+
+ In any other situation -- lookup failure, or ``AddressFamily`` being
+ unspecified or ``any`` -- ``None`` is returned instead and the caller is
+ expected to do something situation-appropriate like calling
+ `socket.gethostbyname`.
+
+ :param str hostname: Hostname to look up.
+ :param options: `SSHConfigDict` instance w/ parsed options.
+ :returns: ``getaddrinfo``-style tuples, or ``None``, depending.
+ """
+ address_family = options.get("addressfamily", "any").lower()
+ if address_family == "any":
+ return
+ try:
+ family = socket.AF_INET6
+ if address_family == "inet":
+ family = socket.AF_INET
+ return socket.getaddrinfo(
+ hostname,
+ None,
+ family,
+ socket.SOCK_DGRAM,
+ socket.IPPROTO_IP,
+ socket.AI_CANONNAME,
+ )
+ except socket.gaierror:
+ pass
+
+
class LazyFqdn(object):
"""
Returns the host's fqdn on request as string.
@@ -319,31 +419,14 @@ class LazyFqdn(object):
# Handle specific option
fqdn = None
- address_family = self.config.get("addressfamily", "any").lower()
- if address_family != "any":
- try:
- family = socket.AF_INET6
- if address_family == "inet":
- socket.AF_INET
- results = socket.getaddrinfo(
- self.host,
- None,
- family,
- socket.SOCK_DGRAM,
- socket.IPPROTO_IP,
- socket.AI_CANONNAME,
- )
- for res in results:
- af, socktype, proto, canonname, sa = res
- if canonname and "." in canonname:
- fqdn = canonname
- break
- # giaerror -> socket.getaddrinfo() can't resolve self.host
- # (which is from socket.gethostname()). Fall back to the
- # getfqdn() call below.
- except socket.gaierror:
- pass
- # Handle 'any' / unspecified
+ results = _addressfamily_host_lookup(self.host, self.config)
+ if results is not None:
+ for res in results:
+ af, socktype, proto, canonname, sa = res
+ if canonname and "." in canonname:
+ fqdn = canonname
+ break
+ # Handle 'any' / unspecified / lookup failure
if fqdn is None:
fqdn = socket.getfqdn()
# Cache