diff options
author | Jean-Paul Calderone <exarkun@divmod.com> | 2011-05-20 20:10:39 -0400 |
---|---|---|
committer | Jean-Paul Calderone <exarkun@divmod.com> | 2011-05-20 20:10:39 -0400 |
commit | ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6 (patch) | |
tree | f4c5b76696228e0f46cd84435b8c58063741586b | |
parent | 9c77167dee628f276ed495a570985e5c68e347cc (diff) | |
parent | 13d190b5294a3795edd4bcba5c9a2d0559c2ba0f (diff) | |
download | pyopenssl-ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6.tar.gz |
Add a Connection method for inspecting the certificate chain.
-rw-r--r-- | ChangeLog | 6 | ||||
-rwxr-xr-x | OpenSSL/ssl/connection.c | 39 | ||||
-rw-r--r-- | OpenSSL/test/test_ssl.py | 196 | ||||
-rw-r--r-- | OpenSSL/tsafe.py | 2 | ||||
-rw-r--r-- | doc/pyOpenSSL.tex | 4 |
5 files changed, 183 insertions, 64 deletions
@@ -1,3 +1,9 @@ +2011-05-20 Jean-Paul Calderone <exarkun@twistedmatrix.com> + + * OpenSSL/ssl/connection.c, OpenSSL/test/test_ssl.py: Add a new + method to the Connection type, get_peer_cert_chain, for retrieving + the peer's certificate chain. + 2011-05-19 Jean-Paul Calderone <exarkun@twistedmatrix.com> * OpenSSL/crypto/x509.c, OpenSSL/test/test_crypto.py: Add a new diff --git a/OpenSSL/ssl/connection.c b/OpenSSL/ssl/connection.c index 5b304b1..a3ec0f0 100755 --- a/OpenSSL/ssl/connection.c +++ b/OpenSSL/ssl/connection.c @@ -1098,6 +1098,44 @@ ssl_Connection_get_peer_certificate(ssl_ConnectionObj *self, PyObject *args) } } +static char ssl_Connection_get_peer_cert_chain_doc[] = "\n\ +Retrieve the other side's certificate (if any)\n\ +\n\ +@return: A list of X509 instances giving the peer's certificate chain,\n\ + or None if it does not have one.\n\ +"; +static PyObject * +ssl_Connection_get_peer_cert_chain(ssl_ConnectionObj *self, PyObject *args) { + STACK_OF(X509) *sk; + PyObject *chain; + crypto_X509Obj *cert; + Py_ssize_t i; + + if (!PyArg_ParseTuple(args, ":get_peer_cert_chain")) { + return NULL; + } + + sk = SSL_get_peer_cert_chain(self->ssl); + if (sk != NULL) { + chain = PyList_New(sk_X509_num(sk)); + for (i = 0; i < sk_X509_num(sk); i++) { + cert = new_x509(sk_X509_value(sk, i), 1); + if (!cert) { + /* XXX Untested */ + Py_DECREF(chain); + return NULL; + } + CRYPTO_add(&cert->x509->references, 1, CRYPTO_LOCK_X509); + PyList_SET_ITEM(chain, i, (PyObject *)cert); + } + return chain; + } else { + Py_INCREF(Py_None); + return Py_None; + } + +} + static char ssl_Connection_want_read_doc[] = "\n\ Checks if more data has to be read from the transport layer to complete an\n\ operation.\n\ @@ -1175,6 +1213,7 @@ static PyMethodDef ssl_Connection_methods[] = ADD_METHOD(master_key), ADD_METHOD(sock_shutdown), ADD_METHOD(get_peer_certificate), + ADD_METHOD(get_peer_cert_chain), ADD_METHOD(want_read), ADD_METHOD(want_write), ADD_METHOD(set_accept_state), diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py index ff2e725..2761cec 100644 --- a/OpenSSL/test/test_ssl.py +++ b/OpenSSL/test/test_ssl.py @@ -12,7 +12,7 @@ from os import makedirs from os.path import join from unittest import main -from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1 +from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM from OpenSSL.crypto import PKey, X509, X509Extension from OpenSSL.crypto import dump_privatekey, load_privatekey from OpenSSL.crypto import dump_certificate, load_certificate @@ -22,14 +22,19 @@ from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE -from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE -from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version +from OpenSSL.SSL import ( + VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE) +from OpenSSL.SSL import ( + Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version) from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType from OpenSSL.test.util import TestCase, bytes, b -from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM -from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem -from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem +from OpenSSL.test.test_crypto import ( + cleartextCertificatePEM, cleartextPrivateKeyPEM) +from OpenSSL.test.test_crypto import ( + client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, + root_cert_pem) + try: from OpenSSL.SSL import OP_NO_QUERY_MTU except ImportError: @@ -63,6 +68,7 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC def verify_cb(conn, cert, errnum, depth, ok): return ok + def socket_pair(): """ Establish and return a pair of network sockets connected to each other. @@ -105,6 +111,60 @@ def handshake(client, server): conns.remove(conn) +def _create_certificate_chain(): + """ + Construct and return a chain of certificates. + + 1. A new self-signed certificate authority certificate (cacert) + 2. A new intermediate certificate signed by cacert (icert) + 3. A new server certificate signed by icert (scert) + """ + caext = X509Extension(b('basicConstraints'), False, b('CA:true')) + + # Step 1 + cakey = PKey() + cakey.generate_key(TYPE_RSA, 512) + cacert = X509() + cacert.get_subject().commonName = "Authority Certificate" + cacert.set_issuer(cacert.get_subject()) + cacert.set_pubkey(cakey) + cacert.set_notBefore(b("20000101000000Z")) + cacert.set_notAfter(b("20200101000000Z")) + cacert.add_extensions([caext]) + cacert.set_serial_number(0) + cacert.sign(cakey, "sha1") + + # Step 2 + ikey = PKey() + ikey.generate_key(TYPE_RSA, 512) + icert = X509() + icert.get_subject().commonName = "Intermediate Certificate" + icert.set_issuer(cacert.get_subject()) + icert.set_pubkey(ikey) + icert.set_notBefore(b("20000101000000Z")) + icert.set_notAfter(b("20200101000000Z")) + icert.add_extensions([caext]) + icert.set_serial_number(0) + icert.sign(cakey, "sha1") + + # Step 3 + skey = PKey() + skey.generate_key(TYPE_RSA, 512) + scert = X509() + scert.get_subject().commonName = "Server Certificate" + scert.set_issuer(icert.get_subject()) + scert.set_pubkey(skey) + scert.set_notBefore(b("20000101000000Z")) + scert.set_notAfter(b("20200101000000Z")) + scert.add_extensions([ + X509Extension(b('basicConstraints'), True, b('CA:false'))]) + scert.set_serial_number(0) + scert.sign(ikey, "sha1") + + return [(cakey, cacert), (ikey, icert), (skey, scert)] + + + class _LoopbackMixin: """ Helper mixin which defines methods for creating a connected socket pair and @@ -150,7 +210,7 @@ class _LoopbackMixin: # Give the side a chance to generate some more bytes, or # succeed. try: - bytes = read.recv(2 ** 16) + data = read.recv(2 ** 16) except WantReadError: # It didn't succeed, so we'll hope it generated some # output. @@ -158,7 +218,7 @@ class _LoopbackMixin: else: # It did succeed, so we'll stop now and let the caller deal # with it. - return (read, bytes) + return (read, data) while True: # Keep copying as long as there's more stuff there. @@ -175,6 +235,7 @@ class _LoopbackMixin: write.bio_write(dirty) + class VersionTests(TestCase): """ Tests for version information exposed by @@ -638,59 +699,6 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object()) - def _create_certificate_chain(self): - """ - Construct and return a chain of certificates. - - 1. A new self-signed certificate authority certificate (cacert) - 2. A new intermediate certificate signed by cacert (icert) - 3. A new server certificate signed by icert (scert) - """ - caext = X509Extension(b('basicConstraints'), False, b('CA:true')) - - # Step 1 - cakey = PKey() - cakey.generate_key(TYPE_RSA, 512) - cacert = X509() - cacert.get_subject().commonName = "Authority Certificate" - cacert.set_issuer(cacert.get_subject()) - cacert.set_pubkey(cakey) - cacert.set_notBefore(b("20000101000000Z")) - cacert.set_notAfter(b("20200101000000Z")) - cacert.add_extensions([caext]) - cacert.set_serial_number(0) - cacert.sign(cakey, "sha1") - - # Step 2 - ikey = PKey() - ikey.generate_key(TYPE_RSA, 512) - icert = X509() - icert.get_subject().commonName = "Intermediate Certificate" - icert.set_issuer(cacert.get_subject()) - icert.set_pubkey(ikey) - icert.set_notBefore(b("20000101000000Z")) - icert.set_notAfter(b("20200101000000Z")) - icert.add_extensions([caext]) - icert.set_serial_number(0) - icert.sign(cakey, "sha1") - - # Step 3 - skey = PKey() - skey.generate_key(TYPE_RSA, 512) - scert = X509() - scert.get_subject().commonName = "Server Certificate" - scert.set_issuer(icert.get_subject()) - scert.set_pubkey(skey) - scert.set_notBefore(b("20000101000000Z")) - scert.set_notAfter(b("20200101000000Z")) - scert.add_extensions([ - X509Extension(b('basicConstraints'), True, b('CA:false'))]) - scert.set_serial_number(0) - scert.sign(ikey, "sha1") - - return [(cakey, cacert), (ikey, icert), (skey, scert)] - - def _handshake_test(self, serverContext, clientContext): """ Verify that a client and server created with the given contexts can @@ -726,7 +734,7 @@ class ContextTests(TestCase, _LoopbackMixin): to it with a client which trusts cacert and requires verification to succeed. """ - chain = self._create_certificate_chain() + chain = _create_certificate_chain() [(cakey, cacert), (ikey, icert), (skey, scert)] = chain # Dump the CA certificate to a file because that's the only way to load @@ -767,7 +775,7 @@ class ContextTests(TestCase, _LoopbackMixin): to it with a client which trusts cacert and requires verification to succeed. """ - chain = self._create_certificate_chain() + chain = _create_certificate_chain() [(cakey, cacert), (ikey, icert), (skey, scert)] = chain # Write out the chain file. @@ -1095,6 +1103,68 @@ class ConnectionTests(TestCase, _LoopbackMixin): self.assertRaises(NotImplementedError, conn.makefile) + def test_get_peer_cert_chain_wrong_args(self): + """ + L{Connection.get_peer_cert_chain} raises L{TypeError} if called with any + arguments. + """ + conn = Connection(Context(TLSv1_METHOD), None) + self.assertRaises(TypeError, conn.get_peer_cert_chain, 1) + self.assertRaises(TypeError, conn.get_peer_cert_chain, "foo") + self.assertRaises(TypeError, conn.get_peer_cert_chain, object()) + self.assertRaises(TypeError, conn.get_peer_cert_chain, []) + + + def test_get_peer_cert_chain(self): + """ + L{Connection.get_peer_cert_chain} returns a list of certificates which + the connected server returned for the certification verification. + """ + chain = _create_certificate_chain() + [(cakey, cacert), (ikey, icert), (skey, scert)] = chain + + serverContext = Context(TLSv1_METHOD) + serverContext.use_privatekey(skey) + serverContext.use_certificate(scert) + serverContext.add_extra_chain_cert(icert) + serverContext.add_extra_chain_cert(cacert) + server = Connection(serverContext, None) + server.set_accept_state() + + # Create the client + clientContext = Context(TLSv1_METHOD) + clientContext.set_verify(VERIFY_NONE, verify_cb) + client = Connection(clientContext, None) + client.set_connect_state() + + self._interactInMemory(client, server) + + chain = client.get_peer_cert_chain() + self.assertEqual(len(chain), 3) + self.assertEqual( + "Server Certificate", chain[0].get_subject().CN) + self.assertEqual( + "Intermediate Certificate", chain[1].get_subject().CN) + self.assertEqual( + "Authority Certificate", chain[2].get_subject().CN) + + + def test_get_peer_cert_chain_none(self): + """ + L{Connection.get_peer_cert_chain} returns C{None} if the peer sends no + certificate chain. + """ + ctx = Context(TLSv1_METHOD) + ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) + ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) + server = Connection(ctx, None) + server.set_accept_state() + client = Connection(Context(TLSv1_METHOD), None) + client.set_connect_state() + self._interactInMemory(client, server) + self.assertIdentical(None, server.get_peer_cert_chain()) + + class ConnectionGetCipherListTests(TestCase): """ diff --git a/OpenSSL/tsafe.py b/OpenSSL/tsafe.py index fe4b75f..9d7ad2f 100644 --- a/OpenSSL/tsafe.py +++ b/OpenSSL/tsafe.py @@ -16,7 +16,7 @@ class Connection: 'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list', 'getpeername', 'getsockname', 'getsockopt', 'setsockopt', 'makefile', 'get_app_data', 'set_app_data', 'state_string', - 'sock_shutdown', 'get_peer_certificate', 'want_read', + 'sock_shutdown', 'get_peer_certificate', 'get_peer_cert_chain', 'want_read', 'want_write', 'set_connect_state', 'set_accept_state', 'connect_ex', 'sendall'): exec("""def %s(self, *args): diff --git a/doc/pyOpenSSL.tex b/doc/pyOpenSSL.tex index fa7638d..93213f9 100644 --- a/doc/pyOpenSSL.tex +++ b/doc/pyOpenSSL.tex @@ -1203,6 +1203,10 @@ Retrieve the Context object associated with this Connection. Retrieve the other side's certificate (if any) \end{methoddesc} +\begin{methoddesc}[Connection]{get_peer_cert_chain}{} +Retrieve the tuple of the other side's certificate chain (if any) +\end{methoddesc} + \begin{methoddesc}[Connection]{getpeername}{} Call the \method{getpeername} method of the underlying socket. \end{methoddesc} |