summaryrefslogtreecommitdiff
path: root/paramiko/pkey.py
diff options
context:
space:
mode:
authorChris Rose <offline@offby1.net>2018-05-17 10:13:38 -0400
committerChris Rose <offline@offby1.net>2018-05-17 10:13:38 -0400
commit7f2c35052183b400827d9949a68b41c90f90a32d (patch)
treefea4a1ec04b7ee3ced14d61e8b6cf3f479e22704 /paramiko/pkey.py
parent52551321a2297bdb966869fa719e584c868dd857 (diff)
downloadparamiko-7f2c35052183b400827d9949a68b41c90f90a32d.tar.gz
Blacken Paramiko on 2.4
Diffstat (limited to 'paramiko/pkey.py')
-rw-r--r--paramiko/pkey.py100
1 files changed, 51 insertions, 49 deletions
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 808215f8..a01d4fd8 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -43,23 +43,23 @@ class PKey(object):
# known encryption types for private key files:
_CIPHER_TABLE = {
- 'AES-128-CBC': {
- 'cipher': algorithms.AES,
- 'keysize': 16,
- 'blocksize': 16,
- 'mode': modes.CBC
+ "AES-128-CBC": {
+ "cipher": algorithms.AES,
+ "keysize": 16,
+ "blocksize": 16,
+ "mode": modes.CBC,
},
- 'AES-256-CBC': {
- 'cipher': algorithms.AES,
- 'keysize': 32,
- 'blocksize': 16,
- 'mode': modes.CBC
+ "AES-256-CBC": {
+ "cipher": algorithms.AES,
+ "keysize": 32,
+ "blocksize": 16,
+ "mode": modes.CBC,
},
- 'DES-EDE3-CBC': {
- 'cipher': algorithms.TripleDES,
- 'keysize': 24,
- 'blocksize': 8,
- 'mode': modes.CBC
+ "DES-EDE3-CBC": {
+ "cipher": algorithms.TripleDES,
+ "keysize": 24,
+ "blocksize": 8,
+ "mode": modes.CBC,
},
}
@@ -107,7 +107,7 @@ class PKey(object):
hs = hash(self)
ho = hash(other)
if hs != ho:
- return cmp(hs, ho) # noqa
+ return cmp(hs, ho) # noqa
return cmp(self.asbytes(), other.asbytes()) # noqa
def __eq__(self, other):
@@ -121,7 +121,7 @@ class PKey(object):
name of this private key type, in SSH terminology, as a `str` (for
example, ``"ssh-rsa"``).
"""
- return ''
+ return ""
def get_bits(self):
"""
@@ -158,7 +158,7 @@ class PKey(object):
:return: a base64 `string <str>` containing the public part of the key.
"""
- return u(encodebytes(self.asbytes())).replace('\n', '')
+ return u(encodebytes(self.asbytes())).replace("\n", "")
def sign_ssh_data(self, data):
"""
@@ -239,7 +239,7 @@ class PKey(object):
:raises: ``IOError`` -- if there was an error writing the file
:raises: `.SSHException` -- if the key is invalid
"""
- raise Exception('Not implemented in PKey')
+ raise Exception("Not implemented in PKey")
def write_private_key(self, file_obj, password=None):
"""
@@ -252,7 +252,7 @@ class PKey(object):
:raises: ``IOError`` -- if there was an error writing to the file
:raises: `.SSHException` -- if the key is invalid
"""
- raise Exception('Not implemented in PKey')
+ raise Exception("Not implemented in PKey")
def _read_private_key_file(self, tag, filename, password=None):
"""
@@ -275,60 +275,61 @@ class PKey(object):
encrypted, and ``password`` is ``None``.
:raises: `.SSHException` -- if the key file is invalid.
"""
- with open(filename, 'r') as f:
+ with open(filename, "r") as f:
data = self._read_private_key(tag, f, password)
return data
def _read_private_key(self, tag, f, password=None):
lines = f.readlines()
start = 0
- beginning_of_key = '-----BEGIN ' + tag + ' PRIVATE KEY-----'
+ beginning_of_key = "-----BEGIN " + tag + " PRIVATE KEY-----"
while start < len(lines) and lines[start].strip() != beginning_of_key:
start += 1
if start >= len(lines):
- raise SSHException('not a valid ' + tag + ' private key file')
+ raise SSHException("not a valid " + tag + " private key file")
# parse any headers first
headers = {}
start += 1
while start < len(lines):
- l = lines[start].split(': ')
+ l = lines[start].split(": ")
if len(l) == 1:
break
headers[l[0].lower()] = l[1].strip()
start += 1
# find end
end = start
- ending_of_key = '-----END ' + tag + ' PRIVATE KEY-----'
+ ending_of_key = "-----END " + tag + " PRIVATE KEY-----"
while end < len(lines) and lines[end].strip() != ending_of_key:
end += 1
# if we trudged to the end of the file, just try to cope.
try:
- data = decodebytes(b(''.join(lines[start:end])))
+ data = decodebytes(b("".join(lines[start:end])))
except base64.binascii.Error as e:
- raise SSHException('base64 decoding error: ' + str(e))
- if 'proc-type' not in headers:
+ raise SSHException("base64 decoding error: " + str(e))
+ if "proc-type" not in headers:
# unencryped: done
return data
# encrypted keyfile: will need a password
- proc_type = headers['proc-type']
- if proc_type != '4,ENCRYPTED':
+ proc_type = headers["proc-type"]
+ if proc_type != "4,ENCRYPTED":
raise SSHException(
'Unknown private key structure "{}"'.format(proc_type)
)
try:
- encryption_type, saltstr = headers['dek-info'].split(',')
+ encryption_type, saltstr = headers["dek-info"].split(",")
except:
raise SSHException("Can't parse DEK-info in private key file")
if encryption_type not in self._CIPHER_TABLE:
raise SSHException(
- 'Unknown private key cipher "{}"'.format(encryption_type))
+ 'Unknown private key cipher "{}"'.format(encryption_type)
+ )
# if no password was passed in,
# raise an exception pointing out that we need one
if password is None:
- raise PasswordRequiredException('Private key file is encrypted')
- cipher = self._CIPHER_TABLE[encryption_type]['cipher']
- keysize = self._CIPHER_TABLE[encryption_type]['keysize']
- mode = self._CIPHER_TABLE[encryption_type]['mode']
+ raise PasswordRequiredException("Private key file is encrypted")
+ cipher = self._CIPHER_TABLE[encryption_type]["cipher"]
+ keysize = self._CIPHER_TABLE[encryption_type]["keysize"]
+ mode = self._CIPHER_TABLE[encryption_type]["mode"]
salt = unhexlify(b(saltstr))
key = util.generate_key_bytes(md5, salt, password, keysize)
decryptor = Cipher(
@@ -351,7 +352,7 @@ class PKey(object):
:raises: ``IOError`` -- if there was an error writing the file.
"""
- with open(filename, 'w') as f:
+ with open(filename, "w") as f:
os.chmod(filename, o600)
self._write_private_key(f, key, format, password=password)
@@ -361,11 +362,11 @@ class PKey(object):
else:
encryption = serialization.BestAvailableEncryption(b(password))
- f.write(key.private_bytes(
- serialization.Encoding.PEM,
- format,
- encryption
- ).decode())
+ f.write(
+ key.private_bytes(
+ serialization.Encoding.PEM, format, encryption
+ ).decode()
+ )
def _check_type_and_load_cert(self, msg, key_type, cert_type):
"""
@@ -388,7 +389,7 @@ class PKey(object):
cert_types = [cert_types]
# Can't do much with no message, that should've been handled elsewhere
if msg is None:
- raise SSHException('Key object may not be empty')
+ raise SSHException("Key object may not be empty")
# First field is always key type, in either kind of object. (make sure
# we rewind before grabbing it - sometimes caller had to do their own
# introspection first!)
@@ -411,7 +412,7 @@ class PKey(object):
# (requires going back into per-type subclasses.)
msg.get_string()
else:
- err = 'Invalid key (class: {}, data type: {}'
+ err = "Invalid key (class: {}, data type: {}"
raise SSHException(err.format(self.__class__.__name__, type_))
def load_certificate(self, value):
@@ -434,11 +435,11 @@ class PKey(object):
successfully.
"""
if isinstance(value, Message):
- constructor = 'from_message'
+ constructor = "from_message"
elif os.path.isfile(value):
- constructor = 'from_file'
+ constructor = "from_file"
else:
- constructor = 'from_string'
+ constructor = "from_string"
blob = getattr(PublicBlob, constructor)(value)
if not blob.key_type.startswith(self.get_name()):
err = "PublicBlob type {} incompatible with key type {}"
@@ -464,6 +465,7 @@ class PublicBlob(object):
`from_message` for useful instantiation, the main constructor is
basically "I should be using ``attrs`` for this."
"""
+
def __init__(self, type_, blob, comment=None):
"""
Create a new public blob of given type and contents.
@@ -505,7 +507,7 @@ class PublicBlob(object):
m = Message(key_blob)
blob_type = m.get_text()
if blob_type != key_type:
- msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa
+ msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa
raise ValueError(msg.format(key_type, blob_type))
# All good? All good.
return cls(type_=key_type, blob=key_blob, comment=comment)
@@ -522,7 +524,7 @@ class PublicBlob(object):
return cls(type_=type_, blob=message.asbytes())
def __str__(self):
- ret = '{} public key/certificate'.format(self.key_type)
+ ret = "{} public key/certificate".format(self.key_type)
if self.comment:
ret += "- {}".format(self.comment)
return ret