diff options
author | Sybren A. Stüvel <sybren@stuvel.eu> | 2021-03-29 23:17:55 +0200 |
---|---|---|
committer | Sybren A. Stüvel <sybren@stuvel.eu> | 2021-03-29 23:17:55 +0200 |
commit | 35e962d9ce424ef5ea35a9787b7b165fc034712d (patch) | |
tree | d2f93be0eb8e288ce4c2021527ddd9e57489ca7c /rsa | |
parent | 7bdbdaa16ee644d00845b94eaa3008061dc5ec79 (diff) | |
download | rsa-git-35e962d9ce424ef5ea35a9787b7b165fc034712d.tar.gz |
Reformatting with Black
No functional changes.
Diffstat (limited to 'rsa')
-rw-r--r-- | rsa/__init__.py | 34 | ||||
-rw-r--r-- | rsa/asn1.py | 19 | ||||
-rw-r--r-- | rsa/cli.py | 207 | ||||
-rw-r--r-- | rsa/common.py | 9 | ||||
-rw-r--r-- | rsa/core.py | 16 | ||||
-rw-r--r-- | rsa/key.py | 215 | ||||
-rw-r--r-- | rsa/parallel.py | 13 | ||||
-rw-r--r-- | rsa/pem.py | 22 | ||||
-rw-r--r-- | rsa/pkcs1.py | 117 | ||||
-rw-r--r-- | rsa/pkcs1_v2.py | 20 | ||||
-rw-r--r-- | rsa/prime.py | 10 | ||||
-rw-r--r-- | rsa/randnum.py | 5 | ||||
-rw-r--r-- | rsa/transform.py | 8 | ||||
-rw-r--r-- | rsa/util.py | 74 |
14 files changed, 434 insertions, 335 deletions
diff --git a/rsa/__init__.py b/rsa/__init__.py index 4bc164e..8d23986 100644 --- a/rsa/__init__.py +++ b/rsa/__init__.py @@ -22,12 +22,21 @@ prevent repetitions, or other common security improvements. Use with care. """ from rsa.key import newkeys, PrivateKey, PublicKey -from rsa.pkcs1 import encrypt, decrypt, sign, verify, DecryptionError, \ - VerificationError, find_signature_hash, sign_hash, compute_hash +from rsa.pkcs1 import ( + encrypt, + decrypt, + sign, + verify, + DecryptionError, + VerificationError, + find_signature_hash, + sign_hash, + compute_hash, +) __author__ = "Sybren Stuvel, Barry Mead and Yesudeep Mangalapilly" -__date__ = '2021-02-24' -__version__ = '4.8-dev0' +__date__ = "2021-02-24" +__version__ = "4.8-dev0" # Do doctest if we're run directly if __name__ == "__main__": @@ -35,6 +44,17 @@ if __name__ == "__main__": doctest.testmod() -__all__ = ["newkeys", "encrypt", "decrypt", "sign", "verify", 'PublicKey', - 'PrivateKey', 'DecryptionError', 'VerificationError', - 'find_signature_hash', 'compute_hash', 'sign_hash'] +__all__ = [ + "newkeys", + "encrypt", + "decrypt", + "sign", + "verify", + "PublicKey", + "PrivateKey", + "DecryptionError", + "VerificationError", + "find_signature_hash", + "compute_hash", + "sign_hash", +] diff --git a/rsa/asn1.py b/rsa/asn1.py index 32b1eb4..4cc4dd3 100644 --- a/rsa/asn1.py +++ b/rsa/asn1.py @@ -22,18 +22,19 @@ from pyasn1.type import univ, namedtype, tag class PubKeyHeader(univ.Sequence): componentType = namedtype.NamedTypes( - namedtype.NamedType('oid', univ.ObjectIdentifier()), - namedtype.NamedType('parameters', univ.Null()), + namedtype.NamedType("oid", univ.ObjectIdentifier()), + namedtype.NamedType("parameters", univ.Null()), ) class OpenSSLPubKey(univ.Sequence): componentType = namedtype.NamedTypes( - namedtype.NamedType('header', PubKeyHeader()), - - # This little hack (the implicit tag) allows us to get a Bit String as Octet String - namedtype.NamedType('key', univ.OctetString().subtype( - implicitTag=tag.Tag(tagClass=0, tagFormat=0, tagId=3))), + namedtype.NamedType("header", PubKeyHeader()), + # This little hack (the implicit tag) allows us to get a Bit String as Octet String + namedtype.NamedType( + "key", + univ.OctetString().subtype(implicitTag=tag.Tag(tagClass=0, tagFormat=0, tagId=3)), + ), ) @@ -46,6 +47,6 @@ class AsnPubKey(univ.Sequence): """ componentType = namedtype.NamedTypes( - namedtype.NamedType('modulus', univ.Integer()), - namedtype.NamedType('publicExponent', univ.Integer()), + namedtype.NamedType("modulus", univ.Integer()), + namedtype.NamedType("publicExponent", univ.Integer()), ) @@ -34,21 +34,33 @@ def keygen() -> None: """Key generator.""" # Parse the CLI options - parser = optparse.OptionParser(usage='usage: %prog [options] keysize', - description='Generates a new RSA keypair of "keysize" bits.') - - parser.add_option('--pubout', type='string', - help='Output filename for the public key. The public key is ' - 'not saved if this option is not present. You can use ' - 'pyrsa-priv2pub to create the public key file later.') - - parser.add_option('-o', '--out', type='string', - help='Output filename for the private key. The key is ' - 'written to stdout if this option is not present.') - - parser.add_option('--form', - help='key format of the private and public keys - default PEM', - choices=('PEM', 'DER'), default='PEM') + parser = optparse.OptionParser( + usage="usage: %prog [options] keysize", + description='Generates a new RSA keypair of "keysize" bits.', + ) + + parser.add_option( + "--pubout", + type="string", + help="Output filename for the public key. The public key is " + "not saved if this option is not present. You can use " + "pyrsa-priv2pub to create the public key file later.", + ) + + parser.add_option( + "-o", + "--out", + type="string", + help="Output filename for the private key. The key is " + "written to stdout if this option is not present.", + ) + + parser.add_option( + "--form", + help="key format of the private and public keys - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) (cli, cli_args) = parser.parse_args(sys.argv[1:]) @@ -60,44 +72,45 @@ def keygen() -> None: keysize = int(cli_args[0]) except ValueError as ex: parser.print_help() - print('Not a valid number: %s' % cli_args[0], file=sys.stderr) + print("Not a valid number: %s" % cli_args[0], file=sys.stderr) raise SystemExit(1) from ex - print('Generating %i-bit key' % keysize, file=sys.stderr) + print("Generating %i-bit key" % keysize, file=sys.stderr) (pub_key, priv_key) = rsa.newkeys(keysize) # Save public key if cli.pubout: - print('Writing public key to %s' % cli.pubout, file=sys.stderr) + print("Writing public key to %s" % cli.pubout, file=sys.stderr) data = pub_key.save_pkcs1(format=cli.form) - with open(cli.pubout, 'wb') as outfile: + with open(cli.pubout, "wb") as outfile: outfile.write(data) # Save private key data = priv_key.save_pkcs1(format=cli.form) if cli.out: - print('Writing private key to %s' % cli.out, file=sys.stderr) - with open(cli.out, 'wb') as outfile: + print("Writing private key to %s" % cli.out, file=sys.stderr) + with open(cli.out, "wb") as outfile: outfile.write(data) else: - print('Writing private key to stdout', file=sys.stderr) + print("Writing private key to stdout", file=sys.stderr) sys.stdout.buffer.write(data) class CryptoOperation(metaclass=abc.ABCMeta): """CLI callable that operates with input, output, and a key.""" - keyname = 'public' # or 'private' - usage = 'usage: %%prog [options] %(keyname)s_key' - description = '' - operation = 'decrypt' - operation_past = 'decrypted' - operation_progressive = 'decrypting' - input_help = 'Name of the file to %(operation)s. Reads from stdin if ' \ - 'not specified.' - output_help = 'Name of the file to write the %(operation_past)s file ' \ - 'to. Written to stdout if this option is not present.' + keyname = "public" # or 'private' + usage = "usage: %%prog [options] %(keyname)s_key" + description = "" + operation = "decrypt" + operation_past = "decrypted" + operation_progressive = "decrypting" + input_help = "Name of the file to %(operation)s. Reads from stdin if " "not specified." + output_help = ( + "Name of the file to write the %(operation_past)s file " + "to. Written to stdout if this option is not present." + ) expected_cli_args = 1 has_output = True @@ -109,8 +122,9 @@ class CryptoOperation(metaclass=abc.ABCMeta): self.output_help = self.output_help % self.__class__.__dict__ @abc.abstractmethod - def perform_operation(self, indata: bytes, key: rsa.key.AbstractKey, - cli_args: Indexable) -> typing.Any: + def perform_operation( + self, indata: bytes, key: rsa.key.AbstractKey, cli_args: Indexable + ) -> typing.Any: """Performs the program's operation. Implement in a subclass. @@ -141,14 +155,17 @@ class CryptoOperation(metaclass=abc.ABCMeta): parser = optparse.OptionParser(usage=self.usage, description=self.description) - parser.add_option('-i', '--input', type='string', help=self.input_help) + parser.add_option("-i", "--input", type="string", help=self.input_help) if self.has_output: - parser.add_option('-o', '--output', type='string', help=self.output_help) + parser.add_option("-o", "--output", type="string", help=self.output_help) - parser.add_option('--keyform', - help='Key format of the %s key - default PEM' % self.keyname, - choices=('PEM', 'DER'), default='PEM') + parser.add_option( + "--keyform", + help="Key format of the %s key - default PEM" % self.keyname, + choices=("PEM", "DER"), + default="PEM", + ) (cli, cli_args) = parser.parse_args(sys.argv[1:]) @@ -161,8 +178,8 @@ class CryptoOperation(metaclass=abc.ABCMeta): def read_key(self, filename: str, keyform: str) -> rsa.key.AbstractKey: """Reads a public or private key.""" - print('Reading %s key from %s' % (self.keyname, filename), file=sys.stderr) - with open(filename, 'rb') as keyfile: + print("Reading %s key from %s" % (self.keyname, filename), file=sys.stderr) + with open(filename, "rb") as keyfile: keydata = keyfile.read() return self.key_class.load_pkcs1(keydata, keyform) @@ -171,37 +188,39 @@ class CryptoOperation(metaclass=abc.ABCMeta): """Read the input file""" if inname: - print('Reading input from %s' % inname, file=sys.stderr) - with open(inname, 'rb') as infile: + print("Reading input from %s" % inname, file=sys.stderr) + with open(inname, "rb") as infile: return infile.read() - print('Reading input from stdin', file=sys.stderr) + print("Reading input from stdin", file=sys.stderr) return sys.stdin.buffer.read() def write_outfile(self, outdata: bytes, outname: str) -> None: """Write the output file""" if outname: - print('Writing output to %s' % outname, file=sys.stderr) - with open(outname, 'wb') as outfile: + print("Writing output to %s" % outname, file=sys.stderr) + with open(outname, "wb") as outfile: outfile.write(outdata) else: - print('Writing output to stdout', file=sys.stderr) + print("Writing output to stdout", file=sys.stderr) sys.stdout.buffer.write(outdata) class EncryptOperation(CryptoOperation): """Encrypts a file.""" - keyname = 'public' - description = ('Encrypts a file. The file must be shorter than the key ' - 'length in order to be encrypted.') - operation = 'encrypt' - operation_past = 'encrypted' - operation_progressive = 'encrypting' - - def perform_operation(self, indata: bytes, pub_key: rsa.key.AbstractKey, - cli_args: Indexable = ()) -> bytes: + keyname = "public" + description = ( + "Encrypts a file. The file must be shorter than the key " "length in order to be encrypted." + ) + operation = "encrypt" + operation_past = "encrypted" + operation_progressive = "encrypting" + + def perform_operation( + self, indata: bytes, pub_key: rsa.key.AbstractKey, cli_args: Indexable = () + ) -> bytes: """Encrypts files.""" assert isinstance(pub_key, rsa.key.PublicKey) return rsa.encrypt(indata, pub_key) @@ -210,16 +229,19 @@ class EncryptOperation(CryptoOperation): class DecryptOperation(CryptoOperation): """Decrypts a file.""" - keyname = 'private' - description = ('Decrypts a file. The original file must be shorter than ' - 'the key length in order to have been encrypted.') - operation = 'decrypt' - operation_past = 'decrypted' - operation_progressive = 'decrypting' + keyname = "private" + description = ( + "Decrypts a file. The original file must be shorter than " + "the key length in order to have been encrypted." + ) + operation = "decrypt" + operation_past = "decrypted" + operation_progressive = "decrypting" key_class = rsa.PrivateKey - def perform_operation(self, indata: bytes, priv_key: rsa.key.AbstractKey, - cli_args: Indexable = ()) -> bytes: + def perform_operation( + self, indata: bytes, priv_key: rsa.key.AbstractKey, cli_args: Indexable = () + ) -> bytes: """Decrypts files.""" assert isinstance(priv_key, rsa.key.PrivateKey) return rsa.decrypt(indata, priv_key) @@ -228,28 +250,32 @@ class DecryptOperation(CryptoOperation): class SignOperation(CryptoOperation): """Signs a file.""" - keyname = 'private' - usage = 'usage: %%prog [options] private_key hash_method' - description = ('Signs a file, outputs the signature. Choose the hash ' - 'method from %s' % ', '.join(HASH_METHODS)) - operation = 'sign' - operation_past = 'signature' - operation_progressive = 'Signing' + keyname = "private" + usage = "usage: %%prog [options] private_key hash_method" + description = ( + "Signs a file, outputs the signature. Choose the hash " + "method from %s" % ", ".join(HASH_METHODS) + ) + operation = "sign" + operation_past = "signature" + operation_progressive = "Signing" key_class = rsa.PrivateKey expected_cli_args = 2 - output_help = ('Name of the file to write the signature to. Written ' - 'to stdout if this option is not present.') + output_help = ( + "Name of the file to write the signature to. Written " + "to stdout if this option is not present." + ) - def perform_operation(self, indata: bytes, priv_key: rsa.key.AbstractKey, - cli_args: Indexable) -> bytes: + def perform_operation( + self, indata: bytes, priv_key: rsa.key.AbstractKey, cli_args: Indexable + ) -> bytes: """Signs files.""" assert isinstance(priv_key, rsa.key.PrivateKey) hash_method = cli_args[1] if hash_method not in HASH_METHODS: - raise SystemExit('Invalid hash method, choose one of %s' % - ', '.join(HASH_METHODS)) + raise SystemExit("Invalid hash method, choose one of %s" % ", ".join(HASH_METHODS)) return rsa.sign(indata, priv_key, hash_method) @@ -257,33 +283,36 @@ class SignOperation(CryptoOperation): class VerifyOperation(CryptoOperation): """Verify a signature.""" - keyname = 'public' - usage = 'usage: %%prog [options] public_key signature_file' - description = ('Verifies a signature, exits with status 0 upon success, ' - 'prints an error message and exits with status 1 upon error.') - operation = 'verify' - operation_past = 'verified' - operation_progressive = 'Verifying' + keyname = "public" + usage = "usage: %%prog [options] public_key signature_file" + description = ( + "Verifies a signature, exits with status 0 upon success, " + "prints an error message and exits with status 1 upon error." + ) + operation = "verify" + operation_past = "verified" + operation_progressive = "Verifying" key_class = rsa.PublicKey expected_cli_args = 2 has_output = False - def perform_operation(self, indata: bytes, pub_key: rsa.key.AbstractKey, - cli_args: Indexable) -> None: + def perform_operation( + self, indata: bytes, pub_key: rsa.key.AbstractKey, cli_args: Indexable + ) -> None: """Verifies files.""" assert isinstance(pub_key, rsa.key.PublicKey) signature_file = cli_args[1] - with open(signature_file, 'rb') as sigfile: + with open(signature_file, "rb") as sigfile: signature = sigfile.read() try: rsa.verify(indata, signature, pub_key) except rsa.VerificationError as ex: - raise SystemExit('Verification failed.') from ex + raise SystemExit("Verification failed.") from ex - print('Verification OK', file=sys.stderr) + print("Verification OK", file=sys.stderr) encrypt = EncryptOperation() diff --git a/rsa/common.py b/rsa/common.py index b5a966a..59d5e62 100644 --- a/rsa/common.py +++ b/rsa/common.py @@ -18,7 +18,7 @@ import typing class NotRelativePrimeError(ValueError): - def __init__(self, a: int, b: int, d: int, msg: str = '') -> None: + def __init__(self, a: int, b: int, d: int, msg: str = "") -> None: super().__init__(msg or "%d and %d are not relatively prime, divider=%i" % (a, b, d)) self.a = a self.b = b @@ -50,7 +50,7 @@ def bit_size(num: int) -> int: try: return num.bit_length() except AttributeError as ex: - raise TypeError('bit_size(num) only supports integers, not %r' % type(num)) from ex + raise TypeError("bit_size(num) only supports integers, not %r" % type(num)) from ex def byte_size(number: int) -> int: @@ -103,8 +103,7 @@ def ceil_div(num: int, div: int) -> int: def extended_gcd(a: int, b: int) -> typing.Tuple[int, int, int]: - """Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb - """ + """Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb""" # r = gcd(a,b) i = multiplicitive inverse of a mod b # or j = multiplicitive inverse of b mod a # Neg return values for i or j are made positive mod b or a respectively @@ -179,7 +178,7 @@ def crt(a_values: typing.Iterable[int], modulo_values: typing.Iterable[int]) -> return x -if __name__ == '__main__': +if __name__ == "__main__": import doctest doctest.testmod() diff --git a/rsa/core.py b/rsa/core.py index 23032e3..84ed3f8 100644 --- a/rsa/core.py +++ b/rsa/core.py @@ -23,18 +23,18 @@ def assert_int(var: int, name: str) -> None: if isinstance(var, int): return - raise TypeError('%s should be an integer, not %s' % (name, var.__class__)) + raise TypeError("%s should be an integer, not %s" % (name, var.__class__)) def encrypt_int(message: int, ekey: int, n: int) -> int: """Encrypts a message using encryption key 'ekey', working modulo n""" - assert_int(message, 'message') - assert_int(ekey, 'ekey') - assert_int(n, 'n') + assert_int(message, "message") + assert_int(ekey, "ekey") + assert_int(n, "n") if message < 0: - raise ValueError('Only non-negative numbers are supported') + raise ValueError("Only non-negative numbers are supported") if message > n: raise OverflowError("The message %i is too long for n=%i" % (message, n)) @@ -45,9 +45,9 @@ def encrypt_int(message: int, ekey: int, n: int) -> int: def decrypt_int(cyphertext: int, dkey: int, n: int) -> int: """Decrypts a cypher text using the decryption key 'dkey', working modulo n""" - assert_int(cyphertext, 'cyphertext') - assert_int(dkey, 'dkey') - assert_int(n, 'n') + assert_int(cyphertext, "cyphertext") + assert_int(dkey, "dkey") + assert_int(n, "n") message = pow(cyphertext, dkey, n) return message @@ -50,7 +50,7 @@ DEFAULT_EXPONENT = 65537 class AbstractKey: """Abstract superclass for private and public keys.""" - __slots__ = ('n', 'e', 'blindfac', 'blindfac_inverse', 'mutex') + __slots__ = ("n", "e", "blindfac", "blindfac_inverse", "mutex") def __init__(self, n: int, e: int) -> None: self.n = n @@ -64,7 +64,7 @@ class AbstractKey: self.mutex = threading.Lock() @classmethod - def _load_pkcs1_pem(cls, keyfile: bytes) -> 'AbstractKey': + def _load_pkcs1_pem(cls, keyfile: bytes) -> "AbstractKey": """Loads a key in PKCS#1 PEM format, implement in a subclass. :param keyfile: contents of a PEM-encoded file that contains @@ -76,7 +76,7 @@ class AbstractKey: """ @classmethod - def _load_pkcs1_der(cls, keyfile: bytes) -> 'AbstractKey': + def _load_pkcs1_der(cls, keyfile: bytes) -> "AbstractKey": """Loads a key in PKCS#1 PEM format, implement in a subclass. :param keyfile: contents of a DER-encoded file that contains @@ -102,7 +102,7 @@ class AbstractKey: """ @classmethod - def load_pkcs1(cls, keyfile: bytes, format: str = 'PEM') -> 'AbstractKey': + def load_pkcs1(cls, keyfile: bytes, format: str = "PEM") -> "AbstractKey": """Loads a key in PKCS#1 DER or PEM format. :param keyfile: contents of a DER- or PEM-encoded file that contains @@ -116,27 +116,28 @@ class AbstractKey: """ methods = { - 'PEM': cls._load_pkcs1_pem, - 'DER': cls._load_pkcs1_der, + "PEM": cls._load_pkcs1_pem, + "DER": cls._load_pkcs1_der, } method = cls._assert_format_exists(format, methods) return method(keyfile) @staticmethod - def _assert_format_exists(file_format: str, methods: typing.Mapping[str, typing.Callable]) \ - -> typing.Callable: - """Checks whether the given file format exists in 'methods'. - """ + def _assert_format_exists( + file_format: str, methods: typing.Mapping[str, typing.Callable] + ) -> typing.Callable: + """Checks whether the given file format exists in 'methods'.""" try: return methods[file_format] except KeyError as ex: - formats = ', '.join(sorted(methods.keys())) - raise ValueError('Unsupported format: %r, try one of %s' % (file_format, - formats)) from ex + formats = ", ".join(sorted(methods.keys())) + raise ValueError( + "Unsupported format: %r, try one of %s" % (file_format, formats) + ) from ex - def save_pkcs1(self, format: str = 'PEM') -> bytes: + def save_pkcs1(self, format: str = "PEM") -> bytes: """Saves the key in PKCS#1 DER or PEM format. :param format: the format to save; 'PEM' or 'DER' @@ -146,8 +147,8 @@ class AbstractKey: """ methods = { - 'PEM': self._save_pkcs1_pem, - 'DER': self._save_pkcs1_der, + "PEM": self._save_pkcs1_pem, + "DER": self._save_pkcs1_der, } method = self._assert_format_exists(format, methods) @@ -186,7 +187,7 @@ class AbstractKey: blind_r = rsa.randnum.randint(self.n - 1) if rsa.prime.are_relatively_prime(self.n, blind_r): return blind_r - raise RuntimeError('unable to find blinding factor') + raise RuntimeError("unable to find blinding factor") def _update_blinding_factor(self) -> typing.Tuple[int, int]: """Update blinding factors. @@ -212,6 +213,7 @@ class AbstractKey: return self.blindfac, self.blindfac_inverse + class PublicKey(AbstractKey): """Represents a public RSA key. @@ -236,13 +238,13 @@ class PublicKey(AbstractKey): """ - __slots__ = ('n', 'e') + __slots__ = ("n", "e") def __getitem__(self, key: str) -> int: return getattr(self, key) def __repr__(self) -> str: - return 'PublicKey(%i, %i)' % (self.n, self.e) + return "PublicKey(%i, %i)" % (self.n, self.e) def __getstate__(self) -> typing.Tuple[int, int]: """Returns the key as tuple for pickling.""" @@ -269,7 +271,7 @@ class PublicKey(AbstractKey): return hash((self.n, self.e)) @classmethod - def _load_pkcs1_der(cls, keyfile: bytes) -> 'PublicKey': + def _load_pkcs1_der(cls, keyfile: bytes) -> "PublicKey": """Loads a key in PKCS#1 DER format. :param keyfile: contents of a DER-encoded file that contains the public @@ -293,7 +295,7 @@ class PublicKey(AbstractKey): from rsa.asn1 import AsnPubKey (priv, _) = decoder.decode(keyfile, asn1Spec=AsnPubKey()) - return cls(n=int(priv['modulus']), e=int(priv['publicExponent'])) + return cls(n=int(priv["modulus"]), e=int(priv["publicExponent"])) def _save_pkcs1_der(self) -> bytes: """Saves the public key in PKCS#1 DER format. @@ -307,13 +309,13 @@ class PublicKey(AbstractKey): # Create the ASN object asn_key = AsnPubKey() - asn_key.setComponentByName('modulus', self.n) - asn_key.setComponentByName('publicExponent', self.e) + asn_key.setComponentByName("modulus", self.n) + asn_key.setComponentByName("publicExponent", self.e) return encoder.encode(asn_key) @classmethod - def _load_pkcs1_pem(cls, keyfile: bytes) -> 'PublicKey': + def _load_pkcs1_pem(cls, keyfile: bytes) -> "PublicKey": """Loads a PKCS#1 PEM-encoded public key file. The contents of the file before the "-----BEGIN RSA PUBLIC KEY-----" and @@ -324,7 +326,7 @@ class PublicKey(AbstractKey): :return: a PublicKey object """ - der = rsa.pem.load_pem(keyfile, 'RSA PUBLIC KEY') + der = rsa.pem.load_pem(keyfile, "RSA PUBLIC KEY") return cls._load_pkcs1_der(der) def _save_pkcs1_pem(self) -> bytes: @@ -335,10 +337,10 @@ class PublicKey(AbstractKey): """ der = self._save_pkcs1_der() - return rsa.pem.save_pem(der, 'RSA PUBLIC KEY') + return rsa.pem.save_pem(der, "RSA PUBLIC KEY") @classmethod - def load_pkcs1_openssl_pem(cls, keyfile: bytes) -> 'PublicKey': + def load_pkcs1_openssl_pem(cls, keyfile: bytes) -> "PublicKey": """Loads a PKCS#1.5 PEM-encoded public key file from OpenSSL. These files can be recognised in that they start with BEGIN PUBLIC KEY @@ -353,11 +355,11 @@ class PublicKey(AbstractKey): :return: a PublicKey object """ - der = rsa.pem.load_pem(keyfile, 'PUBLIC KEY') + der = rsa.pem.load_pem(keyfile, "PUBLIC KEY") return cls.load_pkcs1_openssl_der(der) @classmethod - def load_pkcs1_openssl_der(cls, keyfile: bytes) -> 'PublicKey': + def load_pkcs1_openssl_der(cls, keyfile: bytes) -> "PublicKey": """Loads a PKCS#1 DER-encoded public key file from OpenSSL. :param keyfile: contents of a DER-encoded file that contains the public @@ -371,10 +373,10 @@ class PublicKey(AbstractKey): (keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey()) - if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'): + if keyinfo["header"]["oid"] != univ.ObjectIdentifier("1.2.840.113549.1.1.1"): raise TypeError("This is not a DER-encoded OpenSSL-compatible public key") - return cls._load_pkcs1_der(keyinfo['key'][1:]) + return cls._load_pkcs1_der(keyinfo["key"][1:]) class PrivateKey(AbstractKey): @@ -401,7 +403,7 @@ class PrivateKey(AbstractKey): """ - __slots__ = ('n', 'e', 'd', 'p', 'q', 'exp1', 'exp2', 'coef') + __slots__ = ("n", "e", "d", "p", "q", "exp1", "exp2", "coef") def __init__(self, n: int, e: int, d: int, p: int, q: int) -> None: AbstractKey.__init__(self, n, e) @@ -418,7 +420,13 @@ class PrivateKey(AbstractKey): return getattr(self, key) def __repr__(self) -> str: - return 'PrivateKey(%i, %i, %i, %i, %i)' % (self.n, self.e, self.d, self.p, self.q) + return "PrivateKey(%i, %i, %i, %i, %i)" % ( + self.n, + self.e, + self.d, + self.p, + self.q, + ) def __getstate__(self) -> typing.Tuple[int, int, int, int, int, int, int, int]: """Returns the key as tuple for pickling.""" @@ -436,14 +444,16 @@ class PrivateKey(AbstractKey): if not isinstance(other, PrivateKey): return False - return (self.n == other.n and - self.e == other.e and - self.d == other.d and - self.p == other.p and - self.q == other.q and - self.exp1 == other.exp1 and - self.exp2 == other.exp2 and - self.coef == other.coef) + return ( + self.n == other.n + and self.e == other.e + and self.d == other.d + and self.p == other.p + and self.q == other.q + and self.exp1 == other.exp1 + and self.exp2 == other.exp2 + and self.coef == other.coef + ) def __ne__(self, other: typing.Any) -> bool: return not (self == other) @@ -481,7 +491,7 @@ class PrivateKey(AbstractKey): return self.unblind(encrypted, blindfac_inverse) @classmethod - def _load_pkcs1_der(cls, keyfile: bytes) -> 'PrivateKey': + def _load_pkcs1_der(cls, keyfile: bytes) -> "PrivateKey": """Loads a key in PKCS#1 DER format. :param keyfile: contents of a DER-encoded file that contains the private @@ -503,6 +513,7 @@ class PrivateKey(AbstractKey): """ from pyasn1.codec.der import decoder + (priv, _) = decoder.decode(keyfile) # ASN.1 contents of DER encoded private key: @@ -521,7 +532,7 @@ class PrivateKey(AbstractKey): # } if priv[0] != 0: - raise ValueError('Unable to read this file, version %s != 0' % priv[0]) + raise ValueError("Unable to read this file, version %s != 0" % priv[0]) as_ints = map(int, priv[1:6]) key = cls(*as_ints) @@ -530,9 +541,9 @@ class PrivateKey(AbstractKey): if (key.exp1, key.exp2, key.coef) != (exp1, exp2, coef): warnings.warn( - 'You have provided a malformed keyfile. Either the exponents ' - 'or the coefficient are incorrect. Using the correct values ' - 'instead.', + "You have provided a malformed keyfile. Either the exponents " + "or the coefficient are incorrect. Using the correct values " + "instead.", UserWarning, ) @@ -550,33 +561,33 @@ class PrivateKey(AbstractKey): class AsnPrivKey(univ.Sequence): componentType = namedtype.NamedTypes( - namedtype.NamedType('version', univ.Integer()), - namedtype.NamedType('modulus', univ.Integer()), - namedtype.NamedType('publicExponent', univ.Integer()), - namedtype.NamedType('privateExponent', univ.Integer()), - namedtype.NamedType('prime1', univ.Integer()), - namedtype.NamedType('prime2', univ.Integer()), - namedtype.NamedType('exponent1', univ.Integer()), - namedtype.NamedType('exponent2', univ.Integer()), - namedtype.NamedType('coefficient', univ.Integer()), + namedtype.NamedType("version", univ.Integer()), + namedtype.NamedType("modulus", univ.Integer()), + namedtype.NamedType("publicExponent", univ.Integer()), + namedtype.NamedType("privateExponent", univ.Integer()), + namedtype.NamedType("prime1", univ.Integer()), + namedtype.NamedType("prime2", univ.Integer()), + namedtype.NamedType("exponent1", univ.Integer()), + namedtype.NamedType("exponent2", univ.Integer()), + namedtype.NamedType("coefficient", univ.Integer()), ) # Create the ASN object asn_key = AsnPrivKey() - asn_key.setComponentByName('version', 0) - asn_key.setComponentByName('modulus', self.n) - asn_key.setComponentByName('publicExponent', self.e) - asn_key.setComponentByName('privateExponent', self.d) - asn_key.setComponentByName('prime1', self.p) - asn_key.setComponentByName('prime2', self.q) - asn_key.setComponentByName('exponent1', self.exp1) - asn_key.setComponentByName('exponent2', self.exp2) - asn_key.setComponentByName('coefficient', self.coef) + asn_key.setComponentByName("version", 0) + asn_key.setComponentByName("modulus", self.n) + asn_key.setComponentByName("publicExponent", self.e) + asn_key.setComponentByName("privateExponent", self.d) + asn_key.setComponentByName("prime1", self.p) + asn_key.setComponentByName("prime2", self.q) + asn_key.setComponentByName("exponent1", self.exp1) + asn_key.setComponentByName("exponent2", self.exp2) + asn_key.setComponentByName("coefficient", self.coef) return encoder.encode(asn_key) @classmethod - def _load_pkcs1_pem(cls, keyfile: bytes) -> 'PrivateKey': + def _load_pkcs1_pem(cls, keyfile: bytes) -> "PrivateKey": """Loads a PKCS#1 PEM-encoded private key file. The contents of the file before the "-----BEGIN RSA PRIVATE KEY-----" and @@ -588,7 +599,7 @@ class PrivateKey(AbstractKey): :return: a PrivateKey object """ - der = rsa.pem.load_pem(keyfile, b'RSA PRIVATE KEY') + der = rsa.pem.load_pem(keyfile, b"RSA PRIVATE KEY") return cls._load_pkcs1_der(der) def _save_pkcs1_pem(self) -> bytes: @@ -599,12 +610,14 @@ class PrivateKey(AbstractKey): """ der = self._save_pkcs1_der() - return rsa.pem.save_pem(der, b'RSA PRIVATE KEY') + return rsa.pem.save_pem(der, b"RSA PRIVATE KEY") -def find_p_q(nbits: int, - getprime_func: typing.Callable[[int], int] = rsa.prime.getprime, - accurate: bool = True) -> typing.Tuple[int, int]: +def find_p_q( + nbits: int, + getprime_func: typing.Callable[[int], int] = rsa.prime.getprime, + accurate: bool = True, +) -> typing.Tuple[int, int]: """Returns a tuple of two different primes of nbits bits each. The resulting p * q has exacty 2 * nbits bits, and the returned p and q @@ -644,16 +657,16 @@ def find_p_q(nbits: int, qbits = nbits - shift # Choose the two initial primes - log.debug('find_p_q(%i): Finding p', nbits) + log.debug("find_p_q(%i): Finding p", nbits) p = getprime_func(pbits) - log.debug('find_p_q(%i): Finding q', nbits) + log.debug("find_p_q(%i): Finding q", nbits) q = getprime_func(qbits) def is_acceptable(p: int, q: int) -> bool: """Returns True iff p and q are acceptable: - - p and q differ - - (p * q) has the right nr of bits (when accurate=True) + - p and q differ + - (p * q) has the right nr of bits (when accurate=True) """ if p == q: @@ -701,13 +714,17 @@ def calculate_keys_custom_exponent(p: int, q: int, exponent: int) -> typing.Tupl d = rsa.common.inverse(exponent, phi_n) except rsa.common.NotRelativePrimeError as ex: raise rsa.common.NotRelativePrimeError( - exponent, phi_n, ex.d, - msg="e (%d) and phi_n (%d) are not relatively prime (divider=%i)" % - (exponent, phi_n, ex.d)) from ex + exponent, + phi_n, + ex.d, + msg="e (%d) and phi_n (%d) are not relatively prime (divider=%i)" + % (exponent, phi_n, ex.d), + ) from ex if (exponent * d) % phi_n != 1: - raise ValueError("e (%d) and d (%d) are not mult. inv. modulo " - "phi_n (%d)" % (exponent, d, phi_n)) + raise ValueError( + "e (%d) and d (%d) are not mult. inv. modulo " "phi_n (%d)" % (exponent, d, phi_n) + ) return exponent, d @@ -725,10 +742,12 @@ def calculate_keys(p: int, q: int) -> typing.Tuple[int, int]: return calculate_keys_custom_exponent(p, q, DEFAULT_EXPONENT) -def gen_keys(nbits: int, - getprime_func: typing.Callable[[int], int], - accurate: bool = True, - exponent: int = DEFAULT_EXPONENT) -> typing.Tuple[int, int, int, int]: +def gen_keys( + nbits: int, + getprime_func: typing.Callable[[int], int], + accurate: bool = True, + exponent: int = DEFAULT_EXPONENT, +) -> typing.Tuple[int, int, int, int]: """Generate RSA keys of nbits bits. Returns (p, q, e, d). Note: this can take a long time, depending on the key size. @@ -756,10 +775,12 @@ def gen_keys(nbits: int, return p, q, e, d -def newkeys(nbits: int, - accurate: bool = True, - poolsize: int = 1, - exponent: int = DEFAULT_EXPONENT) -> typing.Tuple[PublicKey, PrivateKey]: +def newkeys( + nbits: int, + accurate: bool = True, + poolsize: int = 1, + exponent: int = DEFAULT_EXPONENT, +) -> typing.Tuple[PublicKey, PrivateKey]: """Generates public and private keys, and returns them as (pub, priv). The public key is also known as the 'encryption key', and is a @@ -786,10 +807,10 @@ def newkeys(nbits: int, """ if nbits < 16: - raise ValueError('Key too small') + raise ValueError("Key too small") if poolsize < 1: - raise ValueError('Pool size (%i) should be >= 1' % poolsize) + raise ValueError("Pool size (%i) should be >= 1" % poolsize) # Determine which getprime function to use if poolsize > 1: @@ -797,6 +818,7 @@ def newkeys(nbits: int, def getprime_func(nbits: int) -> int: return parallel.getprime(nbits, poolsize=poolsize) + else: getprime_func = rsa.prime.getprime @@ -806,15 +828,12 @@ def newkeys(nbits: int, # Create the key objects n = p * q - return ( - PublicKey(n, e), - PrivateKey(n, e, d, p, q) - ) + return (PublicKey(n, e), PrivateKey(n, e, d, p, q)) -__all__ = ['PublicKey', 'PrivateKey', 'newkeys'] +__all__ = ["PublicKey", "PrivateKey", "newkeys"] -if __name__ == '__main__': +if __name__ == "__main__": import doctest try: @@ -824,8 +843,8 @@ if __name__ == '__main__': break if (count % 10 == 0 and count) or count == 1: - print('%i times' % count) + print("%i times" % count) except KeyboardInterrupt: - print('Aborted') + print("Aborted") else: - print('Doctests done') + print("Doctests done") diff --git a/rsa/parallel.py b/rsa/parallel.py index f9afedb..5020edb 100644 --- a/rsa/parallel.py +++ b/rsa/parallel.py @@ -62,8 +62,7 @@ def getprime(nbits: int, poolsize: int) -> int: # Create processes try: - procs = [mp.Process(target=_find_prime, args=(nbits, pipe_send)) - for _ in range(poolsize)] + procs = [mp.Process(target=_find_prime, args=(nbits, pipe_send)) for _ in range(poolsize)] # Start processes for p in procs: p.start() @@ -80,10 +79,10 @@ def getprime(nbits: int, poolsize: int) -> int: return result -__all__ = ['getprime'] +__all__ = ["getprime"] -if __name__ == '__main__': - print('Running doctests 1000x or until failure') +if __name__ == "__main__": + print("Running doctests 1000x or until failure") import doctest for count in range(100): @@ -92,6 +91,6 @@ if __name__ == '__main__': break if count % 10 == 0 and count: - print('%i times' % count) + print("%i times" % count) - print('Doctests done') + print("Doctests done") @@ -27,10 +27,12 @@ def _markers(pem_marker: FlexiText) -> typing.Tuple[bytes, bytes]: """ if not isinstance(pem_marker, bytes): - pem_marker = pem_marker.encode('ascii') + pem_marker = pem_marker.encode("ascii") - return (b'-----BEGIN ' + pem_marker + b'-----', - b'-----END ' + pem_marker + b'-----') + return ( + b"-----BEGIN " + pem_marker + b"-----", + b"-----END " + pem_marker + b"-----", + ) def _pem_lines(contents: bytes, pem_start: bytes, pem_end: bytes) -> typing.Iterator[bytes]: @@ -65,7 +67,7 @@ def _pem_lines(contents: bytes, pem_start: bytes, pem_end: bytes) -> typing.Iter break # Load fields - if b':' in line: + if b":" in line: continue yield line @@ -95,13 +97,13 @@ def load_pem(contents: FlexiText, pem_marker: FlexiText) -> bytes: # We want bytes, not text. If it's text, it can be converted to ASCII bytes. if not isinstance(contents, bytes): - contents = contents.encode('ascii') + contents = contents.encode("ascii") (pem_start, pem_end) = _markers(pem_marker) pem_lines = [line for line in _pem_lines(contents, pem_start, pem_end)] # Base64-decode the contents - pem = b''.join(pem_lines) + pem = b"".join(pem_lines) return base64.standard_b64decode(pem) @@ -119,14 +121,14 @@ def save_pem(contents: bytes, pem_marker: FlexiText) -> bytes: (pem_start, pem_end) = _markers(pem_marker) - b64 = base64.standard_b64encode(contents).replace(b'\n', b'') + b64 = base64.standard_b64encode(contents).replace(b"\n", b"") pem_lines = [pem_start] for block_start in range(0, len(b64), 64): - block = b64[block_start:block_start + 64] + block = b64[block_start : block_start + 64] pem_lines.append(block) pem_lines.append(pem_end) - pem_lines.append(b'') + pem_lines.append(b"") - return b'\n'.join(pem_lines) + return b"\n".join(pem_lines) diff --git a/rsa/pkcs1.py b/rsa/pkcs1.py index 9adad90..5992c7f 100644 --- a/rsa/pkcs1.py +++ b/rsa/pkcs1.py @@ -41,37 +41,41 @@ else: # ASN.1 codes that describe the hash algorithm used. HASH_ASN1 = { - 'MD5': b'\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10', - 'SHA-1': b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14', - 'SHA-224': b'\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c', - 'SHA-256': b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20', - 'SHA-384': b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30', - 'SHA-512': b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40', + "MD5": b"\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10", + "SHA-1": b"\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14", + "SHA-224": b"\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c", + "SHA-256": b"\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20", + "SHA-384": b"\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30", + "SHA-512": b"\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40", } HASH_METHODS: typing.Dict[str, typing.Callable[[], HashType]] = { - 'MD5': hashlib.md5, - 'SHA-1': hashlib.sha1, - 'SHA-224': hashlib.sha224, - 'SHA-256': hashlib.sha256, - 'SHA-384': hashlib.sha384, - 'SHA-512': hashlib.sha512, + "MD5": hashlib.md5, + "SHA-1": hashlib.sha1, + "SHA-224": hashlib.sha224, + "SHA-256": hashlib.sha256, + "SHA-384": hashlib.sha384, + "SHA-512": hashlib.sha512, } if sys.version_info >= (3, 6): # Python 3.6 introduced SHA3 support. - HASH_ASN1.update({ - 'SHA3-256': b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x08\x05\x00\x04\x20', - 'SHA3-384': b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x09\x05\x00\x04\x30', - 'SHA3-512': b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x0a\x05\x00\x04\x40', - }) - - HASH_METHODS.update({ - 'SHA3-256': hashlib.sha3_256, - 'SHA3-384': hashlib.sha3_384, - 'SHA3-512': hashlib.sha3_512, - }) + HASH_ASN1.update( + { + "SHA3-256": b"\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x08\x05\x00\x04\x20", + "SHA3-384": b"\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x09\x05\x00\x04\x30", + "SHA3-512": b"\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x0a\x05\x00\x04\x40", + } + ) + + HASH_METHODS.update( + { + "SHA3-256": hashlib.sha3_256, + "SHA3-384": hashlib.sha3_384, + "SHA3-512": hashlib.sha3_512, + } + ) class CryptoError(Exception): @@ -105,11 +109,13 @@ def _pad_for_encryption(message: bytes, target_length: int) -> bytes: msglength = len(message) if msglength > max_msglength: - raise OverflowError('%i bytes needed for message, but there is only' - ' space for %i' % (msglength, max_msglength)) + raise OverflowError( + "%i bytes needed for message, but there is only" + " space for %i" % (msglength, max_msglength) + ) # Get random padding - padding = b'' + padding = b"" padding_length = target_length - msglength - 3 # We remove 0-bytes, so we'll end up with less padding than we've asked for, @@ -121,15 +127,12 @@ def _pad_for_encryption(message: bytes, target_length: int) -> bytes: # after removing the 0-bytes. This increases the chance of getting # enough bytes, especially when needed_bytes is small new_padding = os.urandom(needed_bytes + 5) - new_padding = new_padding.replace(b'\x00', b'') + new_padding = new_padding.replace(b"\x00", b"") padding = padding + new_padding[:needed_bytes] assert len(padding) == padding_length - return b''.join([b'\x00\x02', - padding, - b'\x00', - message]) + return b"".join([b"\x00\x02", padding, b"\x00", message]) def _pad_for_signing(message: bytes, target_length: int) -> bytes: @@ -155,15 +158,14 @@ def _pad_for_signing(message: bytes, target_length: int) -> bytes: msglength = len(message) if msglength > max_msglength: - raise OverflowError('%i bytes needed for message, but there is only' - ' space for %i' % (msglength, max_msglength)) + raise OverflowError( + "%i bytes needed for message, but there is only" + " space for %i" % (msglength, max_msglength) + ) padding_length = target_length - msglength - 3 - return b''.join([b'\x00\x01', - padding_length * b'\xff', - b'\x00', - message]) + return b"".join([b"\x00\x01", padding_length * b"\xff", b"\x00", message]) def encrypt(message: bytes, pub_key: key.PublicKey) -> bytes: @@ -259,13 +261,13 @@ def decrypt(crypto: bytes, priv_key: key.PrivateKey) -> bytes: # integer). This fixes CVE-2020-13757. if len(crypto) > blocksize: # This is operating on public information, so doesn't need to be constant-time. - raise DecryptionError('Decryption failed') + raise DecryptionError("Decryption failed") # If we can't find the cleartext marker, decryption failed. - cleartext_marker_bad = not compare_digest(cleartext[:2], b'\x00\x02') + cleartext_marker_bad = not compare_digest(cleartext[:2], b"\x00\x02") # Find the 00 separator between the padding and the message - sep_idx = cleartext.find(b'\x00', 2) + sep_idx = cleartext.find(b"\x00", 2) # sep_idx indicates the position of the `\x00` separator that separates the # padding from the actual message. The padding should be at least 8 bytes @@ -276,9 +278,9 @@ def decrypt(crypto: bytes, priv_key: key.PrivateKey) -> bytes: anything_bad = cleartext_marker_bad | sep_idx_bad if anything_bad: - raise DecryptionError('Decryption failed') + raise DecryptionError("Decryption failed") - return cleartext[sep_idx + 1:] + return cleartext[sep_idx + 1 :] def sign_hash(hash_value: bytes, priv_key: key.PrivateKey, hash_method: str) -> bytes: @@ -299,7 +301,7 @@ def sign_hash(hash_value: bytes, priv_key: key.PrivateKey, hash_method: str) -> # Get the ASN1 code for this hash method if hash_method not in HASH_ASN1: - raise ValueError('Invalid hash method: %s' % hash_method) + raise ValueError("Invalid hash method: %s" % hash_method) asn1code = HASH_ASN1[hash_method] # Encrypt the hash with the private key @@ -365,11 +367,11 @@ def verify(message: bytes, signature: bytes, pub_key: key.PublicKey) -> str: expected = _pad_for_signing(cleartext, keylength) if len(signature) != keylength: - raise VerificationError('Verification failed') + raise VerificationError("Verification failed") # Compare with the signed one if expected != clearsig: - raise VerificationError('Verification failed') + raise VerificationError("Verification failed") return method_name @@ -426,7 +428,7 @@ def compute_hash(message: typing.Union[bytes, typing.BinaryIO], method_name: str """ if method_name not in HASH_METHODS: - raise ValueError('Invalid hash method: %s' % method_name) + raise ValueError("Invalid hash method: %s" % method_name) method = HASH_METHODS[method_name] hasher = method() @@ -434,7 +436,7 @@ def compute_hash(message: typing.Union[bytes, typing.BinaryIO], method_name: str if isinstance(message, bytes): hasher.update(message) else: - assert hasattr(message, 'read') and hasattr(message.read, '__call__') + assert hasattr(message, "read") and hasattr(message.read, "__call__") # read as 1K blocks for block in yield_fixedblocks(message, 1024): hasher.update(block) @@ -454,14 +456,21 @@ def _find_method_hash(clearsig: bytes) -> str: if asn1code in clearsig: return hashname - raise VerificationError('Verification failed') + raise VerificationError("Verification failed") -__all__ = ['encrypt', 'decrypt', 'sign', 'verify', - 'DecryptionError', 'VerificationError', 'CryptoError'] +__all__ = [ + "encrypt", + "decrypt", + "sign", + "verify", + "DecryptionError", + "VerificationError", + "CryptoError", +] -if __name__ == '__main__': - print('Running doctests 1000x or until failure') +if __name__ == "__main__": + print("Running doctests 1000x or until failure") import doctest for count in range(1000): @@ -470,6 +479,6 @@ if __name__ == '__main__': break if count % 100 == 0 and count: - print('%i times' % count) + print("%i times" % count) - print('Doctests done') + print("Doctests done") diff --git a/rsa/pkcs1_v2.py b/rsa/pkcs1_v2.py index ed616f5..d68b907 100644 --- a/rsa/pkcs1_v2.py +++ b/rsa/pkcs1_v2.py @@ -25,7 +25,7 @@ from rsa import ( ) -def mgf1(seed: bytes, length: int, hasher: str = 'SHA-1') -> bytes: +def mgf1(seed: bytes, length: int, hasher: str = "SHA-1") -> bytes: """ MGF1 is a Mask Generation Function based on a hash function. @@ -51,13 +51,13 @@ def mgf1(seed: bytes, length: int, hasher: str = 'SHA-1') -> bytes: hash_length = pkcs1.HASH_METHODS[hasher]().digest_size except KeyError as ex: raise ValueError( - 'Invalid `hasher` specified. Please select one of: {hash_list}'.format( - hash_list=', '.join(sorted(pkcs1.HASH_METHODS.keys())) + "Invalid `hasher` specified. Please select one of: {hash_list}".format( + hash_list=", ".join(sorted(pkcs1.HASH_METHODS.keys())) ) ) from ex # If l > 2^32(hLen), output "mask too long" and stop. - if length > (2**32 * hash_length): + if length > (2 ** 32 * hash_length): raise OverflowError( "Desired length should be at most 2**32 times the hasher's output " "length ({hash_length} for {hasher} function)".format( @@ -69,7 +69,7 @@ def mgf1(seed: bytes, length: int, hasher: str = 'SHA-1') -> bytes: # Looping `counter` from 0 to ceil(l / hLen)-1, build `output` based on the # hashes formed by (`seed` + C), being `C` an octet string of length 4 # generated by converting `counter` with the primitive I2OSP - output = b''.join( + output = b"".join( pkcs1.compute_hash( seed + transform.int2bytes(counter, fill_size=4), method_name=hasher, @@ -82,11 +82,11 @@ def mgf1(seed: bytes, length: int, hasher: str = 'SHA-1') -> bytes: __all__ = [ - 'mgf1', + "mgf1", ] -if __name__ == '__main__': - print('Running doctests 1000x or until failure') +if __name__ == "__main__": + print("Running doctests 1000x or until failure") import doctest for count in range(1000): @@ -95,6 +95,6 @@ if __name__ == '__main__': break if count % 100 == 0 and count: - print('%i times' % count) + print("%i times" % count) - print('Doctests done') + print("Doctests done") diff --git a/rsa/prime.py b/rsa/prime.py index 853aca5..d8980d0 100644 --- a/rsa/prime.py +++ b/rsa/prime.py @@ -21,7 +21,7 @@ Roberto Tamassia, 2002. import rsa.common import rsa.randnum -__all__ = ['getprime', 'are_relatively_prime'] +__all__ = ["getprime", "are_relatively_prime"] def gcd(p: int, q: int) -> int: @@ -183,8 +183,8 @@ def are_relatively_prime(a: int, b: int) -> bool: return d == 1 -if __name__ == '__main__': - print('Running doctests 1000x or until failure') +if __name__ == "__main__": + print("Running doctests 1000x or until failure") import doctest for count in range(1000): @@ -193,6 +193,6 @@ if __name__ == '__main__': break if count % 100 == 0 and count: - print('%i times' % count) + print("%i times" % count) - print('Doctests done') + print("Doctests done") diff --git a/rsa/randnum.py b/rsa/randnum.py index a5bb850..c65facd 100644 --- a/rsa/randnum.py +++ b/rsa/randnum.py @@ -37,15 +37,14 @@ def read_random_bits(nbits: int) -> bytes: # Add the remaining random bits if rbits > 0: randomvalue = ord(os.urandom(1)) - randomvalue >>= (8 - rbits) + randomvalue >>= 8 - rbits randomdata = struct.pack("B", randomvalue) + randomdata return randomdata def read_random_int(nbits: int) -> int: - """Reads a random integer of approximately nbits bits. - """ + """Reads a random integer of approximately nbits bits.""" randomdata = read_random_bits(nbits) value = transform.bytes2int(randomdata) diff --git a/rsa/transform.py b/rsa/transform.py index 03c4a77..c609b65 100644 --- a/rsa/transform.py +++ b/rsa/transform.py @@ -31,7 +31,7 @@ def bytes2int(raw_bytes: bytes) -> int: 8405007 """ - return int.from_bytes(raw_bytes, 'big', signed=False) + return int.from_bytes(raw_bytes, "big", signed=False) def int2bytes(number: int, fill_size: int = 0) -> bytes: @@ -61,12 +61,12 @@ def int2bytes(number: int, fill_size: int = 0) -> bytes: bytes_required = max(1, math.ceil(number.bit_length() / 8)) if fill_size > 0: - return number.to_bytes(fill_size, 'big') + return number.to_bytes(fill_size, "big") - return number.to_bytes(bytes_required, 'big') + return number.to_bytes(bytes_required, "big") -if __name__ == '__main__': +if __name__ == "__main__": import doctest doctest.testmod() diff --git a/rsa/util.py b/rsa/util.py index cb31c46..087caf8 100644 --- a/rsa/util.py +++ b/rsa/util.py @@ -24,36 +24,57 @@ def private_to_public() -> None: """Reads a private key and outputs the corresponding public key.""" # Parse the CLI options - parser = OptionParser(usage='usage: %prog [options]', - description='Reads a private key and outputs the ' - 'corresponding public key. Both private and public keys use ' - 'the format described in PKCS#1 v1.5') + parser = OptionParser( + usage="usage: %prog [options]", + description="Reads a private key and outputs the " + "corresponding public key. Both private and public keys use " + "the format described in PKCS#1 v1.5", + ) - parser.add_option('-i', '--input', dest='infilename', type='string', - help='Input filename. Reads from stdin if not specified') - parser.add_option('-o', '--output', dest='outfilename', type='string', - help='Output filename. Writes to stdout of not specified') + parser.add_option( + "-i", + "--input", + dest="infilename", + type="string", + help="Input filename. Reads from stdin if not specified", + ) + parser.add_option( + "-o", + "--output", + dest="outfilename", + type="string", + help="Output filename. Writes to stdout of not specified", + ) - parser.add_option('--inform', dest='inform', - help='key format of input - default PEM', - choices=('PEM', 'DER'), default='PEM') + parser.add_option( + "--inform", + dest="inform", + help="key format of input - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) - parser.add_option('--outform', dest='outform', - help='key format of output - default PEM', - choices=('PEM', 'DER'), default='PEM') + parser.add_option( + "--outform", + dest="outform", + help="key format of output - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) (cli, cli_args) = parser.parse_args(sys.argv) # Read the input data if cli.infilename: - print('Reading private key from %s in %s format' % - (cli.infilename, cli.inform), file=sys.stderr) - with open(cli.infilename, 'rb') as infile: + print( + "Reading private key from %s in %s format" % (cli.infilename, cli.inform), + file=sys.stderr, + ) + with open(cli.infilename, "rb") as infile: in_data = infile.read() else: - print('Reading private key from stdin in %s format' % cli.inform, - file=sys.stderr) - in_data = sys.stdin.read().encode('ascii') + print("Reading private key from stdin in %s format" % cli.inform, file=sys.stderr) + in_data = sys.stdin.read().encode("ascii") assert type(in_data) == bytes, type(in_data) @@ -65,11 +86,12 @@ def private_to_public() -> None: out_data = pub_key.save_pkcs1(cli.outform) if cli.outfilename: - print('Writing public key to %s in %s format' % - (cli.outfilename, cli.outform), file=sys.stderr) - with open(cli.outfilename, 'wb') as outfile: + print( + "Writing public key to %s in %s format" % (cli.outfilename, cli.outform), + file=sys.stderr, + ) + with open(cli.outfilename, "wb") as outfile: outfile.write(out_data) else: - print('Writing public key to stdout in %s format' % cli.outform, - file=sys.stderr) - sys.stdout.write(out_data.decode('ascii')) + print("Writing public key to stdout in %s format" % cli.outform, file=sys.stderr) + sys.stdout.write(out_data.decode("ascii")) |