summaryrefslogtreecommitdiff
path: root/paramiko/auth_handler.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2021-12-13 15:55:36 -0500
committerJeff Forcier <jeff@bitprophet.org>2021-12-23 00:31:01 -0500
commit363a28d94cada17f012c1604a3c99c71a2bda003 (patch)
tree6979a1d39ced84c3b29d366a0026db5fd9a62851 /paramiko/auth_handler.py
parentdfffaeaa0170c784307d1c89dad60528a59b6ff2 (diff)
downloadparamiko-363a28d94cada17f012c1604a3c99c71a2bda003.tar.gz
Add support for RSA SHA2 host and public keys
Includes a handful of refactors and new semiprivate attributes on Transport and AuthHandler for better test visibility.
Diffstat (limited to 'paramiko/auth_handler.py')
-rw-r--r--paramiko/auth_handler.py81
1 files changed, 63 insertions, 18 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 011e57f3..845b9143 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -61,7 +61,7 @@ from paramiko.common import (
cMSG_USERAUTH_BANNER,
)
from paramiko.message import Message
-from paramiko.py3compat import b
+from paramiko.py3compat import b, u
from paramiko.ssh_exception import (
SSHException,
AuthenticationException,
@@ -206,6 +206,23 @@ class AuthHandler(object):
self.transport._send_message(m)
self.transport.close()
+ def _get_algorithm_and_bits(self, key):
+ """
+ Given any key, return appropriate signing algorithm & bits-to-sign.
+
+ Intended for input to or verification of, key signatures.
+ """
+ key_type, bits = None, None
+ # Use certificate contents, if available, plain pubkey otherwise
+ if key.public_blob:
+ key_type = key.public_blob.key_type
+ bits = key.public_blob.key_blob
+ else:
+ key_type = key.get_name()
+ bits = key
+ algorithm = self._finalize_pubkey_algorithm(key_type)
+ return algorithm, bits
+
def _get_session_blob(self, key, service, username):
m = Message()
m.add_string(self.transport.session_id)
@@ -214,13 +231,9 @@ class AuthHandler(object):
m.add_string(service)
m.add_string("publickey")
m.add_boolean(True)
- # Use certificate contents, if available, plain pubkey otherwise
- if key.public_blob:
- m.add_string(key.public_blob.key_type)
- m.add_string(key.public_blob.key_blob)
- else:
- m.add_string(key.get_name())
- m.add_string(key)
+ algorithm, bits = self._get_algorithm_and_bits(key)
+ m.add_string(algorithm)
+ m.add_string(bits)
return m.asbytes()
def wait_for_response(self, event):
@@ -269,6 +282,39 @@ class AuthHandler(object):
# dunno this one
self._disconnect_service_not_available()
+ def _finalize_pubkey_algorithm(self, key_type):
+ # Short-circuit for non-RSA keys
+ if "rsa" not in key_type:
+ return key_type
+ self._log(
+ DEBUG,
+ "Finalizing pubkey algorithm for key of type {!r}".format(
+ key_type
+ ),
+ )
+ # Only consider RSA algos from our list, lest we agree on another!
+ my_algos = [x for x in self.transport.preferred_pubkeys if "rsa" in x]
+ self._log(DEBUG, "Our pubkey algorithm list: {}".format(my_algos))
+ # Short-circuit negatively if user disabled all RSA algos (heh)
+ if not my_algos:
+ raise SSHException(
+ "An RSA key was specified, but no RSA pubkey algorithms are configured!" # noqa
+ )
+ # Check for server-sig-algs if supported & sent
+ server_algos = u(
+ self.transport.server_extensions.get("server-sig-algs", b(""))
+ ).split(",")
+ self._log(DEBUG, "Server-side algorithm list: {}".format(server_algos))
+ # Only use algos from our list that the server likes, in our own
+ # preference order. (NOTE: purposefully using same style as in
+ # Transport...expect to refactor later)
+ agreement = list(filter(server_algos.__contains__, my_algos))
+ # Fallback: first one in our (possibly tweaked by caller) list
+ final = agreement[0] if agreement else my_algos[0]
+ self.transport._agreed_pubkey_algorithm = final
+ self._log(DEBUG, "Agreed upon {!r} pubkey algorithm".format(final))
+ return final
+
def _parse_service_accept(self, m):
service = m.get_text()
if service == "ssh-userauth":
@@ -287,18 +333,15 @@ class AuthHandler(object):
m.add_string(password)
elif self.auth_method == "publickey":
m.add_boolean(True)
- # Use certificate contents, if available, plain pubkey
- # otherwise
- if self.private_key.public_blob:
- m.add_string(self.private_key.public_blob.key_type)
- m.add_string(self.private_key.public_blob.key_blob)
- else:
- m.add_string(self.private_key.get_name())
- m.add_string(self.private_key)
+ algorithm, bits = self._get_algorithm_and_bits(
+ self.private_key
+ )
+ m.add_string(algorithm)
+ m.add_string(bits)
blob = self._get_session_blob(
self.private_key, "ssh-connection", self.username
)
- sig = self.private_key.sign_ssh_data(blob)
+ sig = self.private_key.sign_ssh_data(blob, algorithm)
m.add_string(sig)
elif self.auth_method == "keyboard-interactive":
m.add_string("")
@@ -529,13 +572,15 @@ Error Message: {}
username, key
)
if result != AUTH_FAILED:
+ sig_algo = self._finalize_pubkey_algorithm(keytype)
# key is okay, verify it
if not sig_attached:
# client wants to know if this key is acceptable, before it
# signs anything... send special "ok" message
m = Message()
m.add_byte(cMSG_USERAUTH_PK_OK)
- m.add_string(keytype)
+ # TODO: suspect we're not testing this
+ m.add_string(sig_algo)
m.add_string(keyblob)
self.transport._send_message(m)
return