summaryrefslogtreecommitdiff
path: root/paramiko/auth_handler.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2021-12-24 13:30:06 -0500
committerJeff Forcier <jeff@bitprophet.org>2021-12-24 14:41:25 -0500
commitd2865ed7c3364406b2f1f4f984dcdd315196a353 (patch)
treee79c63e1cdc3bec6652a6e6a9bb5b7a2e473acdb /paramiko/auth_handler.py
parent69fb31fcc14fef16b612d18b78016e74732b2de3 (diff)
downloadparamiko-d2865ed7c3364406b2f1f4f984dcdd315196a353.tar.gz
Fix #1955
Diffstat (limited to 'paramiko/auth_handler.py')
-rw-r--r--paramiko/auth_handler.py53
1 files changed, 32 insertions, 21 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 845b9143..da109d7c 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -206,24 +206,19 @@ class AuthHandler(object):
self.transport._send_message(m)
self.transport.close()
- def _get_algorithm_and_bits(self, key):
+ def _get_key_type_and_bits(self, key):
"""
- Given any key, return appropriate signing algorithm & bits-to-sign.
+ Given any key, return its type/algorithm & bits-to-sign.
Intended for input to or verification of, key signatures.
"""
- key_type, bits = None, None
# Use certificate contents, if available, plain pubkey otherwise
if key.public_blob:
- key_type = key.public_blob.key_type
- bits = key.public_blob.key_blob
+ return key.public_blob.key_type, key.public_blob.key_blob
else:
- key_type = key.get_name()
- bits = key
- algorithm = self._finalize_pubkey_algorithm(key_type)
- return algorithm, bits
+ return key.get_name(), key
- def _get_session_blob(self, key, service, username):
+ def _get_session_blob(self, key, service, username, algorithm):
m = Message()
m.add_string(self.transport.session_id)
m.add_byte(cMSG_USERAUTH_REQUEST)
@@ -231,7 +226,7 @@ class AuthHandler(object):
m.add_string(service)
m.add_string("publickey")
m.add_boolean(True)
- algorithm, bits = self._get_algorithm_and_bits(key)
+ _, bits = self._get_key_type_and_bits(key)
m.add_string(algorithm)
m.add_string(bits)
return m.asbytes()
@@ -282,6 +277,17 @@ class AuthHandler(object):
# dunno this one
self._disconnect_service_not_available()
+ def _generate_key_from_request(self, algorithm, keyblob):
+ # For use in server mode.
+ options = self.transport.preferred_pubkeys
+ if algorithm.replace("-cert-v01@openssh.com", "") not in options:
+ err = (
+ "Auth rejected: pubkey algorithm '{}' unsupported or disabled"
+ )
+ self._log(INFO, err.format(algorithm))
+ return None
+ return self.transport._key_info[algorithm](Message(keyblob))
+
def _finalize_pubkey_algorithm(self, key_type):
# Short-circuit for non-RSA keys
if "rsa" not in key_type:
@@ -333,13 +339,15 @@ class AuthHandler(object):
m.add_string(password)
elif self.auth_method == "publickey":
m.add_boolean(True)
- algorithm, bits = self._get_algorithm_and_bits(
- self.private_key
- )
+ key_type, bits = self._get_key_type_and_bits(self.private_key)
+ algorithm = self._finalize_pubkey_algorithm(key_type)
m.add_string(algorithm)
m.add_string(bits)
blob = self._get_session_blob(
- self.private_key, "ssh-connection", self.username
+ self.private_key,
+ "ssh-connection",
+ self.username,
+ algorithm,
)
sig = self.private_key.sign_ssh_data(blob, algorithm)
m.add_string(sig)
@@ -551,10 +559,13 @@ Error Message: {}
)
elif method == "publickey":
sig_attached = m.get_boolean()
- keytype = m.get_text()
+ # NOTE: server never wants to guess a client's algo, they're
+ # telling us directly. No need for _finalize_pubkey_algorithm
+ # anywhere in this flow.
+ algorithm = m.get_text()
keyblob = m.get_binary()
try:
- key = self.transport._key_info[keytype](Message(keyblob))
+ key = self._generate_key_from_request(algorithm, keyblob)
except SSHException as e:
self._log(INFO, "Auth rejected: public key: {}".format(str(e)))
key = None
@@ -572,20 +583,20 @@ Error Message: {}
username, key
)
if result != AUTH_FAILED:
- sig_algo = self._finalize_pubkey_algorithm(keytype)
# key is okay, verify it
if not sig_attached:
# client wants to know if this key is acceptable, before it
# signs anything... send special "ok" message
m = Message()
m.add_byte(cMSG_USERAUTH_PK_OK)
- # TODO: suspect we're not testing this
- m.add_string(sig_algo)
+ m.add_string(algorithm)
m.add_string(keyblob)
self.transport._send_message(m)
return
sig = Message(m.get_binary())
- blob = self._get_session_blob(key, service, username)
+ blob = self._get_session_blob(
+ key, service, username, algorithm
+ )
if not key.verify_ssh_sig(blob, sig):
self._log(INFO, "Auth rejected: invalid signature")
result = AUTH_FAILED