diff options
author | Chris Rose <offline@offby1.net> | 2018-05-17 10:13:38 -0400 |
---|---|---|
committer | Chris Rose <offline@offby1.net> | 2018-05-17 10:13:38 -0400 |
commit | 7f2c35052183b400827d9949a68b41c90f90a32d (patch) | |
tree | fea4a1ec04b7ee3ced14d61e8b6cf3f479e22704 /paramiko/pkey.py | |
parent | 52551321a2297bdb966869fa719e584c868dd857 (diff) | |
download | paramiko-7f2c35052183b400827d9949a68b41c90f90a32d.tar.gz |
Blacken Paramiko on 2.4
Diffstat (limited to 'paramiko/pkey.py')
-rw-r--r-- | paramiko/pkey.py | 100 |
1 files changed, 51 insertions, 49 deletions
diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 808215f8..a01d4fd8 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -43,23 +43,23 @@ class PKey(object): # known encryption types for private key files: _CIPHER_TABLE = { - 'AES-128-CBC': { - 'cipher': algorithms.AES, - 'keysize': 16, - 'blocksize': 16, - 'mode': modes.CBC + "AES-128-CBC": { + "cipher": algorithms.AES, + "keysize": 16, + "blocksize": 16, + "mode": modes.CBC, }, - 'AES-256-CBC': { - 'cipher': algorithms.AES, - 'keysize': 32, - 'blocksize': 16, - 'mode': modes.CBC + "AES-256-CBC": { + "cipher": algorithms.AES, + "keysize": 32, + "blocksize": 16, + "mode": modes.CBC, }, - 'DES-EDE3-CBC': { - 'cipher': algorithms.TripleDES, - 'keysize': 24, - 'blocksize': 8, - 'mode': modes.CBC + "DES-EDE3-CBC": { + "cipher": algorithms.TripleDES, + "keysize": 24, + "blocksize": 8, + "mode": modes.CBC, }, } @@ -107,7 +107,7 @@ class PKey(object): hs = hash(self) ho = hash(other) if hs != ho: - return cmp(hs, ho) # noqa + return cmp(hs, ho) # noqa return cmp(self.asbytes(), other.asbytes()) # noqa def __eq__(self, other): @@ -121,7 +121,7 @@ class PKey(object): name of this private key type, in SSH terminology, as a `str` (for example, ``"ssh-rsa"``). """ - return '' + return "" def get_bits(self): """ @@ -158,7 +158,7 @@ class PKey(object): :return: a base64 `string <str>` containing the public part of the key. """ - return u(encodebytes(self.asbytes())).replace('\n', '') + return u(encodebytes(self.asbytes())).replace("\n", "") def sign_ssh_data(self, data): """ @@ -239,7 +239,7 @@ class PKey(object): :raises: ``IOError`` -- if there was an error writing the file :raises: `.SSHException` -- if the key is invalid """ - raise Exception('Not implemented in PKey') + raise Exception("Not implemented in PKey") def write_private_key(self, file_obj, password=None): """ @@ -252,7 +252,7 @@ class PKey(object): :raises: ``IOError`` -- if there was an error writing to the file :raises: `.SSHException` -- if the key is invalid """ - raise Exception('Not implemented in PKey') + raise Exception("Not implemented in PKey") def _read_private_key_file(self, tag, filename, password=None): """ @@ -275,60 +275,61 @@ class PKey(object): encrypted, and ``password`` is ``None``. :raises: `.SSHException` -- if the key file is invalid. """ - with open(filename, 'r') as f: + with open(filename, "r") as f: data = self._read_private_key(tag, f, password) return data def _read_private_key(self, tag, f, password=None): lines = f.readlines() start = 0 - beginning_of_key = '-----BEGIN ' + tag + ' PRIVATE KEY-----' + beginning_of_key = "-----BEGIN " + tag + " PRIVATE KEY-----" while start < len(lines) and lines[start].strip() != beginning_of_key: start += 1 if start >= len(lines): - raise SSHException('not a valid ' + tag + ' private key file') + raise SSHException("not a valid " + tag + " private key file") # parse any headers first headers = {} start += 1 while start < len(lines): - l = lines[start].split(': ') + l = lines[start].split(": ") if len(l) == 1: break headers[l[0].lower()] = l[1].strip() start += 1 # find end end = start - ending_of_key = '-----END ' + tag + ' PRIVATE KEY-----' + ending_of_key = "-----END " + tag + " PRIVATE KEY-----" while end < len(lines) and lines[end].strip() != ending_of_key: end += 1 # if we trudged to the end of the file, just try to cope. try: - data = decodebytes(b(''.join(lines[start:end]))) + data = decodebytes(b("".join(lines[start:end]))) except base64.binascii.Error as e: - raise SSHException('base64 decoding error: ' + str(e)) - if 'proc-type' not in headers: + raise SSHException("base64 decoding error: " + str(e)) + if "proc-type" not in headers: # unencryped: done return data # encrypted keyfile: will need a password - proc_type = headers['proc-type'] - if proc_type != '4,ENCRYPTED': + proc_type = headers["proc-type"] + if proc_type != "4,ENCRYPTED": raise SSHException( 'Unknown private key structure "{}"'.format(proc_type) ) try: - encryption_type, saltstr = headers['dek-info'].split(',') + encryption_type, saltstr = headers["dek-info"].split(",") except: raise SSHException("Can't parse DEK-info in private key file") if encryption_type not in self._CIPHER_TABLE: raise SSHException( - 'Unknown private key cipher "{}"'.format(encryption_type)) + 'Unknown private key cipher "{}"'.format(encryption_type) + ) # if no password was passed in, # raise an exception pointing out that we need one if password is None: - raise PasswordRequiredException('Private key file is encrypted') - cipher = self._CIPHER_TABLE[encryption_type]['cipher'] - keysize = self._CIPHER_TABLE[encryption_type]['keysize'] - mode = self._CIPHER_TABLE[encryption_type]['mode'] + raise PasswordRequiredException("Private key file is encrypted") + cipher = self._CIPHER_TABLE[encryption_type]["cipher"] + keysize = self._CIPHER_TABLE[encryption_type]["keysize"] + mode = self._CIPHER_TABLE[encryption_type]["mode"] salt = unhexlify(b(saltstr)) key = util.generate_key_bytes(md5, salt, password, keysize) decryptor = Cipher( @@ -351,7 +352,7 @@ class PKey(object): :raises: ``IOError`` -- if there was an error writing the file. """ - with open(filename, 'w') as f: + with open(filename, "w") as f: os.chmod(filename, o600) self._write_private_key(f, key, format, password=password) @@ -361,11 +362,11 @@ class PKey(object): else: encryption = serialization.BestAvailableEncryption(b(password)) - f.write(key.private_bytes( - serialization.Encoding.PEM, - format, - encryption - ).decode()) + f.write( + key.private_bytes( + serialization.Encoding.PEM, format, encryption + ).decode() + ) def _check_type_and_load_cert(self, msg, key_type, cert_type): """ @@ -388,7 +389,7 @@ class PKey(object): cert_types = [cert_types] # Can't do much with no message, that should've been handled elsewhere if msg is None: - raise SSHException('Key object may not be empty') + raise SSHException("Key object may not be empty") # First field is always key type, in either kind of object. (make sure # we rewind before grabbing it - sometimes caller had to do their own # introspection first!) @@ -411,7 +412,7 @@ class PKey(object): # (requires going back into per-type subclasses.) msg.get_string() else: - err = 'Invalid key (class: {}, data type: {}' + err = "Invalid key (class: {}, data type: {}" raise SSHException(err.format(self.__class__.__name__, type_)) def load_certificate(self, value): @@ -434,11 +435,11 @@ class PKey(object): successfully. """ if isinstance(value, Message): - constructor = 'from_message' + constructor = "from_message" elif os.path.isfile(value): - constructor = 'from_file' + constructor = "from_file" else: - constructor = 'from_string' + constructor = "from_string" blob = getattr(PublicBlob, constructor)(value) if not blob.key_type.startswith(self.get_name()): err = "PublicBlob type {} incompatible with key type {}" @@ -464,6 +465,7 @@ class PublicBlob(object): `from_message` for useful instantiation, the main constructor is basically "I should be using ``attrs`` for this." """ + def __init__(self, type_, blob, comment=None): """ Create a new public blob of given type and contents. @@ -505,7 +507,7 @@ class PublicBlob(object): m = Message(key_blob) blob_type = m.get_text() if blob_type != key_type: - msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa + msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa raise ValueError(msg.format(key_type, blob_type)) # All good? All good. return cls(type_=key_type, blob=key_blob, comment=comment) @@ -522,7 +524,7 @@ class PublicBlob(object): return cls(type_=type_, blob=message.asbytes()) def __str__(self): - ret = '{} public key/certificate'.format(self.key_type) + ret = "{} public key/certificate".format(self.key_type) if self.comment: ret += "- {}".format(self.comment) return ret |