diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2021-12-13 15:55:36 -0500 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2021-12-23 00:31:01 -0500 |
commit | 363a28d94cada17f012c1604a3c99c71a2bda003 (patch) | |
tree | 6979a1d39ced84c3b29d366a0026db5fd9a62851 /paramiko/auth_handler.py | |
parent | dfffaeaa0170c784307d1c89dad60528a59b6ff2 (diff) | |
download | paramiko-363a28d94cada17f012c1604a3c99c71a2bda003.tar.gz |
Add support for RSA SHA2 host and public keys
Includes a handful of refactors and new semiprivate
attributes on Transport and AuthHandler for better
test visibility.
Diffstat (limited to 'paramiko/auth_handler.py')
-rw-r--r-- | paramiko/auth_handler.py | 81 |
1 files changed, 63 insertions, 18 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index 011e57f3..845b9143 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -61,7 +61,7 @@ from paramiko.common import ( cMSG_USERAUTH_BANNER, ) from paramiko.message import Message -from paramiko.py3compat import b +from paramiko.py3compat import b, u from paramiko.ssh_exception import ( SSHException, AuthenticationException, @@ -206,6 +206,23 @@ class AuthHandler(object): self.transport._send_message(m) self.transport.close() + def _get_algorithm_and_bits(self, key): + """ + Given any key, return appropriate signing algorithm & bits-to-sign. + + Intended for input to or verification of, key signatures. + """ + key_type, bits = None, None + # Use certificate contents, if available, plain pubkey otherwise + if key.public_blob: + key_type = key.public_blob.key_type + bits = key.public_blob.key_blob + else: + key_type = key.get_name() + bits = key + algorithm = self._finalize_pubkey_algorithm(key_type) + return algorithm, bits + def _get_session_blob(self, key, service, username): m = Message() m.add_string(self.transport.session_id) @@ -214,13 +231,9 @@ class AuthHandler(object): m.add_string(service) m.add_string("publickey") m.add_boolean(True) - # Use certificate contents, if available, plain pubkey otherwise - if key.public_blob: - m.add_string(key.public_blob.key_type) - m.add_string(key.public_blob.key_blob) - else: - m.add_string(key.get_name()) - m.add_string(key) + algorithm, bits = self._get_algorithm_and_bits(key) + m.add_string(algorithm) + m.add_string(bits) return m.asbytes() def wait_for_response(self, event): @@ -269,6 +282,39 @@ class AuthHandler(object): # dunno this one self._disconnect_service_not_available() + def _finalize_pubkey_algorithm(self, key_type): + # Short-circuit for non-RSA keys + if "rsa" not in key_type: + return key_type + self._log( + DEBUG, + "Finalizing pubkey algorithm for key of type {!r}".format( + key_type + ), + ) + # Only consider RSA algos from our list, lest we agree on another! + my_algos = [x for x in self.transport.preferred_pubkeys if "rsa" in x] + self._log(DEBUG, "Our pubkey algorithm list: {}".format(my_algos)) + # Short-circuit negatively if user disabled all RSA algos (heh) + if not my_algos: + raise SSHException( + "An RSA key was specified, but no RSA pubkey algorithms are configured!" # noqa + ) + # Check for server-sig-algs if supported & sent + server_algos = u( + self.transport.server_extensions.get("server-sig-algs", b("")) + ).split(",") + self._log(DEBUG, "Server-side algorithm list: {}".format(server_algos)) + # Only use algos from our list that the server likes, in our own + # preference order. (NOTE: purposefully using same style as in + # Transport...expect to refactor later) + agreement = list(filter(server_algos.__contains__, my_algos)) + # Fallback: first one in our (possibly tweaked by caller) list + final = agreement[0] if agreement else my_algos[0] + self.transport._agreed_pubkey_algorithm = final + self._log(DEBUG, "Agreed upon {!r} pubkey algorithm".format(final)) + return final + def _parse_service_accept(self, m): service = m.get_text() if service == "ssh-userauth": @@ -287,18 +333,15 @@ class AuthHandler(object): m.add_string(password) elif self.auth_method == "publickey": m.add_boolean(True) - # Use certificate contents, if available, plain pubkey - # otherwise - if self.private_key.public_blob: - m.add_string(self.private_key.public_blob.key_type) - m.add_string(self.private_key.public_blob.key_blob) - else: - m.add_string(self.private_key.get_name()) - m.add_string(self.private_key) + algorithm, bits = self._get_algorithm_and_bits( + self.private_key + ) + m.add_string(algorithm) + m.add_string(bits) blob = self._get_session_blob( self.private_key, "ssh-connection", self.username ) - sig = self.private_key.sign_ssh_data(blob) + sig = self.private_key.sign_ssh_data(blob, algorithm) m.add_string(sig) elif self.auth_method == "keyboard-interactive": m.add_string("") @@ -529,13 +572,15 @@ Error Message: {} username, key ) if result != AUTH_FAILED: + sig_algo = self._finalize_pubkey_algorithm(keytype) # key is okay, verify it if not sig_attached: # client wants to know if this key is acceptable, before it # signs anything... send special "ok" message m = Message() m.add_byte(cMSG_USERAUTH_PK_OK) - m.add_string(keytype) + # TODO: suspect we're not testing this + m.add_string(sig_algo) m.add_string(keyblob) self.transport._send_message(m) return |