From 65997b1761b3ccbaef84cbc4ffc24585717e8b02 Mon Sep 17 00:00:00 2001 From: Casey Deccio Date: Thu, 19 Nov 2020 09:51:29 -0700 Subject: Add support for DigestSign* and DigestVerify* Add support for DigestSign* and DigestVerify* OpenSSL functions, for use with ED25519, etc. Allow PKey to support non-digest algorithms, such as ED25519, but keep default behaviors. Include tests for both digest and non-digest algorithms. --- M2Crypto/EVP.py | 112 +++++++++++++++++++++++++++++++++++++++++-- SWIG/_evp.i | 128 ++++++++++++++++++++++++++++++++++++++++++++++++- tests/ec.pub2.pem | 5 ++ tests/ed25519.priv.pem | 3 ++ tests/ed25519.pub.pem | 3 ++ tests/ed25519.pub2.pem | 3 ++ tests/test_evp.py | 54 +++++++++++++++++++++ 7 files changed, 303 insertions(+), 5 deletions(-) create mode 100644 tests/ec.pub2.pem create mode 100644 tests/ed25519.priv.pem create mode 100644 tests/ed25519.pub.pem create mode 100644 tests/ed25519.pub2.pem diff --git a/M2Crypto/EVP.py b/M2Crypto/EVP.py index b21dec6..6e7d508 100644 --- a/M2Crypto/EVP.py +++ b/M2Crypto/EVP.py @@ -187,10 +187,13 @@ class PKey(object): def _set_context(self, md): # type: (str) -> None - mda = getattr(m2, md, None) # type: Optional[Callable] - if mda is None: - raise ValueError('unknown message digest', md) - self.md = mda() + if not md: + self.md = None + else: + mda = getattr(m2, md, None) # type: Optional[Callable] + if mda is None: + raise ValueError('unknown message digest', md) + self.md = mda() self.ctx = m2.md_ctx_new() # type: Context def reset_context(self, md='sha1'): @@ -259,6 +262,86 @@ class PKey(object): """ return m2.verify_final(self.ctx, sign, self.pkey) + def digest_sign_init(self): + # type: () -> None + """ + Initialise digest signing operation with self. + """ + if self.md is None: + m2.digest_sign_init(self.ctx, self.pkey) + else: + m2.digest_sign_init(self.ctx, None, self.md, None, self.pkey) + + def digest_sign_update(self, data): + # type: (bytes) -> None + """ + Feed data to digest signing operation. + + :param data: Data to be signed. + """ + m2.digest_sign_update(self.ctx, data) + + def digest_sign_final(self): + # type: () -> bytes + """ + Return signature. + + :return: The signature. + """ + return m2.digest_sign_final(self.ctx) + + def digest_sign(self, data): + # type: () -> bytes + """ + Return signature. + + :return: The signature. + """ + return m2.digest_sign(self.ctx, data) + + def digest_verify_init(self): + # type: () -> None + """ + Initialise verification operation with self. + """ + if self.md is None: + m2.digest_verify_init(self.ctx, self.pkey) + else: + m2.digest_verify_init(self.ctx, None, self.md, None, self.pkey) + + def digest_verify_update(self, data): + # type: (bytes) -> int + """ + Feed data to verification operation. + + :param data: Data to be verified. + :return: -1 on Python error, 1 for success, 0 for OpenSSL error + """ + return m2.digest_verify_update(self.ctx, data) + + def digest_verify_final(self, sign): + # type: (bytes) -> int + """ + Feed data to digest verification operation. + + :param sign: Signature to use for verification + :return: Result of verification: 1 for success, 0 for failure, -1 on + other error. + """ + return m2.digest_verify_final(self.ctx, sign) + + def digest_verify(self, sign, data): + # type: (bytes) -> int + """ + Return result of verification. + + :param sign: Signature to use for verification + :param data: Data to be verified. + :return: Result of verification: 1 for success, 0 for failure, -1 on + other error. + """ + return m2.digest_verify(self.ctx, sign, data) + def assign_rsa(self, rsa, capture=1): # type: (RSA.RSA, int) -> int """ @@ -397,6 +480,27 @@ def load_key(file, callback=util.passphrase_callback): return PKey(cptr, 1) +def load_key_pubkey(file, callback=util.passphrase_callback): + # type: (AnyStr, Callable) -> PKey + """ + Load an M2Crypto.EVP.PKey from a public key as a file. + + :param file: Name of file containing the key in PEM format. + + :param callback: A Python callable object that is invoked + to acquire a passphrase with which to protect the + key. + + :return: M2Crypto.EVP.PKey object. + """ + + with BIO.openfile(file, 'r') as bio: + cptr = m2.pkey_read_pem_pubkey(bio._ptr(), callback) + if cptr is None: + raise EVPError(Err.get_error()) + return PKey(cptr, 1) + + def load_key_bio(bio, callback=util.passphrase_callback): # type: (BIO.BIO, Callable) -> PKey """ diff --git a/SWIG/_evp.i b/SWIG/_evp.i index 9a3b67b..61f0f23 100644 --- a/SWIG/_evp.i +++ b/SWIG/_evp.i @@ -184,6 +184,10 @@ extern int EVP_PKEY_set1_RSA(EVP_PKEY *, RSA *); extern int EVP_SignInit(EVP_MD_CTX *, const EVP_MD *); %rename(verify_init) EVP_VerifyInit; extern int EVP_VerifyInit(EVP_MD_CTX *, const EVP_MD *); +%rename(digest_sign_init) EVP_DigestSignInit; +extern int EVP_DigestSignInit(EVP_MD_CTX *, EVP_PKEY_CTX **, const EVP_MD *, ENGINE *, EVP_PKEY *); +%rename(digest_verify_init) EVP_DigestVerifyInit; +extern int EVP_DigestVerifyInit(EVP_MD_CTX *, EVP_PKEY_CTX **, const EVP_MD *, ENGINE *, EVP_PKEY *); %rename(pkey_size) EVP_PKEY_size; extern int EVP_PKEY_size(EVP_PKEY *); @@ -546,7 +550,6 @@ int verify_update(EVP_MD_CTX *ctx, PyObject *blob) { return EVP_VerifyUpdate(ctx, buf, len); } - int verify_final(EVP_MD_CTX *ctx, PyObject *blob, EVP_PKEY *pkey) { unsigned char *kbuf; int len = 0; @@ -556,6 +559,129 @@ int verify_final(EVP_MD_CTX *ctx, PyObject *blob, EVP_PKEY *pkey) { return EVP_VerifyFinal(ctx, kbuf, len, pkey); } + +int digest_sign_init(EVP_MD_CTX *ctx, EVP_PKEY *pkey) { + return EVP_DigestSignInit(ctx, NULL, NULL, NULL, pkey); +} + +PyObject *digest_sign_update(EVP_MD_CTX *ctx, PyObject *blob) { + const void *buf; + int len = 0; + + if (m2_PyObject_AsReadBufferInt(blob, (const void **)&buf, &len) == -1) + return NULL; + + if (!EVP_DigestSignUpdate(ctx, buf, len)) { + m2_PyErr_Msg(_evp_err); + return NULL; + } + Py_RETURN_NONE; +} + +PyObject *digest_sign_final(EVP_MD_CTX *ctx) { + PyObject *ret; + unsigned char *sigbuf; + size_t siglen; + + if (!EVP_DigestSignFinal(ctx, NULL, &siglen)) { + m2_PyErr_Msg(_evp_err); + return NULL; + } + + sigbuf = (unsigned char*)OPENSSL_malloc(siglen); + if (!sigbuf) { + PyErr_SetString(PyExc_MemoryError, "digest_sign_final"); + return NULL; + } + + if (!EVP_DigestSignFinal(ctx, sigbuf, &siglen)) { + m2_PyErr_Msg(_evp_err); + OPENSSL_cleanse(sigbuf, siglen); + OPENSSL_free(sigbuf); + return NULL; + } + + ret = PyBytes_FromStringAndSize((char*)sigbuf, siglen); + + OPENSSL_cleanse(sigbuf, siglen); + OPENSSL_free(sigbuf); + return ret; +} + +PyObject *digest_sign(EVP_MD_CTX *ctx, PyObject *msg) { + PyObject *ret; + const void *msgbuf; + unsigned char *sigbuf; + int msglen = 0; + size_t siglen = 0; + + if (m2_PyObject_AsReadBufferInt(msg, (const void **)&msgbuf, &msglen) == -1) + return NULL; + + if (!EVP_DigestSign(ctx, NULL, &siglen, msgbuf, msglen)) { + m2_PyErr_Msg(_evp_err); + return NULL; + } + + sigbuf = (unsigned char*)OPENSSL_malloc(siglen); + if (!sigbuf) { + PyErr_SetString(PyExc_MemoryError, "digest_sign"); + return NULL; + } + + if (!EVP_DigestSign(ctx, sigbuf, &siglen, msgbuf, msglen)) { + m2_PyErr_Msg(_evp_err); + OPENSSL_cleanse(sigbuf, siglen); + OPENSSL_free(sigbuf); + return NULL; + } + + ret = PyBytes_FromStringAndSize((char*)sigbuf, siglen); + + OPENSSL_cleanse(sigbuf, siglen); + OPENSSL_free(sigbuf); + return ret; + +} + +int digest_verify_init(EVP_MD_CTX *ctx, EVP_PKEY *pkey) { + return EVP_DigestVerifyInit(ctx, NULL, NULL, NULL, pkey); +} + +int digest_verify_update(EVP_MD_CTX *ctx, PyObject *blob) { + const void *buf; + int len = 0; + + if (m2_PyObject_AsReadBufferInt(blob, (const void **)&buf, &len) == -1) + return -1; + + return EVP_DigestVerifyUpdate(ctx, buf, len); +} + +int digest_verify_final(EVP_MD_CTX *ctx, PyObject *blob) { + unsigned char *sigbuf; + int len = 0; + + if (m2_PyObject_AsReadBufferInt(blob, (const void **)&sigbuf, &len) == -1) + return -1; + + return EVP_DigestVerifyFinal(ctx, sigbuf, len); +} + +int digest_verify(EVP_MD_CTX *ctx, PyObject *sig, PyObject *msg) { + unsigned char *sigbuf; + unsigned char *msgbuf; + int siglen = 0; + int msglen = 0; + + if (m2_PyObject_AsReadBufferInt(sig, (const void **)&sigbuf, &siglen) == -1) + return -1; + + if (m2_PyObject_AsReadBufferInt(msg, (const void **)&msgbuf, &msglen) == -1) + return -1; + + return EVP_DigestVerify(ctx, sigbuf, siglen, msgbuf, msglen); +} %} %typemap(out) EVP_MD * { diff --git a/tests/ec.pub2.pem b/tests/ec.pub2.pem new file mode 100644 index 0000000..80e740f --- /dev/null +++ b/tests/ec.pub2.pem @@ -0,0 +1,5 @@ +-----BEGIN PUBLIC KEY----- +MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAEQdk1A7xe7OelJhoFfE7x0u3gFSACecUC +lM/9SqYiPusEpxI9XOe7te/8N/uavHd7z/GxnWSS7cTWuKUzkBgmggZkeVwz1a0a ++trEhzxxiEN50Puuc0ExtNFQcgvIa6en +-----END PUBLIC KEY----- diff --git a/tests/ed25519.priv.pem b/tests/ed25519.priv.pem new file mode 100644 index 0000000..6f764c4 --- /dev/null +++ b/tests/ed25519.priv.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIHYXZmLr4mS9wVx+PZVTNalGrxvuuS2SwrJqEmCUsYaC +-----END PRIVATE KEY----- diff --git a/tests/ed25519.pub.pem b/tests/ed25519.pub.pem new file mode 100644 index 0000000..e3c5d01 --- /dev/null +++ b/tests/ed25519.pub.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEAqVvZ55o7SU/yNEr/07AwMzxtm7teY6xhnhR9bDoWm2k= +-----END PUBLIC KEY----- diff --git a/tests/ed25519.pub2.pem b/tests/ed25519.pub2.pem new file mode 100644 index 0000000..557f28e --- /dev/null +++ b/tests/ed25519.pub2.pem @@ -0,0 +1,3 @@ +-----BEGIN PUBLIC KEY----- +MCowBQYDK2VwAyEANSn/xYZB3kS30u9Gxb7RriRu80oJYsQd6jmA3u7TaVY= +-----END PUBLIC KEY----- diff --git a/tests/test_evp.py b/tests/test_evp.py index fff7127..c98b50c 100644 --- a/tests/test_evp.py +++ b/tests/test_evp.py @@ -248,6 +248,60 @@ class EVPTestCase(unittest.TestCase): pubkey.verify_update(b'test message not') self.assertEqual(pubkey.verify_final(sig), 0) + @unittest.skipIf(m2.OPENSSL_VERSION_NUMBER < 0x10101000, + 'Relies on support for Ed25519 which was introduced in OpenSSL 1.1.1') + def test_digest_verify(self): + pkey = EVP.load_key('tests/ed25519.priv.pem') + pkey.reset_context(None) + pkey.digest_sign_init() + sig = pkey.digest_sign(b'test message') + + # OK + pkey = EVP.load_key_pubkey('tests/ed25519.pub.pem') + pkey.reset_context(None) + pkey.digest_verify_init() + self.assertEqual(pkey.digest_verify(sig, b'test message'), 1) + + # wrong public key + pkey = EVP.load_key_pubkey('tests/ed25519.pub2.pem') + pkey.reset_context(None) + pkey.digest_verify_init() + self.assertEqual(pkey.digest_verify(sig, b'test message'), 0) + + # wrong message + pkey = EVP.load_key_pubkey('tests/ed25519.pub.pem') + pkey.reset_context(None) + pkey.digest_verify_init() + self.assertEqual(pkey.digest_verify(sig, b'test message not'), 0) + + def test_digest_verify_final(self): + pkey = EVP.load_key('tests/ec.priv.pem') + pkey.reset_context('sha256') + pkey.digest_sign_init() + pkey.digest_sign_update(b'test message') + sig = pkey.digest_sign_final() + + # OK + pkey = EVP.load_key_pubkey('tests/ec.pub.pem') + pkey.reset_context('sha256') + pkey.digest_verify_init() + pkey.digest_verify_update(b'test message') + self.assertEqual(pkey.digest_verify_final(sig), 1) + + # wrong public key + pkey = EVP.load_key_pubkey('tests/ec.pub2.pem') + pkey.reset_context('sha256') + pkey.digest_verify_init() + pkey.digest_verify_update(b'test message') + self.assertEqual(pkey.digest_verify_final(sig), 0) + + # wrong message + pkey = EVP.load_key_pubkey('tests/ec.pub.pem') + pkey.reset_context('sha256') + pkey.digest_verify_init() + pkey.digest_verify_update(b'test message not') + self.assertEqual(pkey.digest_verify_final(sig), 0) + def test_load_bad(self): with self.assertRaises(BIO.BIOError): EVP.load_key('thisdoesnotexist-dfgh56789') -- cgit v1.2.1