diff options
author | Jean-Paul Calderone <exarkun@divmod.com> | 2011-05-20 20:10:39 -0400 |
---|---|---|
committer | Jean-Paul Calderone <exarkun@divmod.com> | 2011-05-20 20:10:39 -0400 |
commit | ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6 (patch) | |
tree | f4c5b76696228e0f46cd84435b8c58063741586b /OpenSSL/test | |
parent | 9c77167dee628f276ed495a570985e5c68e347cc (diff) | |
parent | 13d190b5294a3795edd4bcba5c9a2d0559c2ba0f (diff) | |
download | pyopenssl-ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6.tar.gz |
Add a Connection method for inspecting the certificate chain.
Diffstat (limited to 'OpenSSL/test')
-rw-r--r-- | OpenSSL/test/test_ssl.py | 196 |
1 files changed, 133 insertions, 63 deletions
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py index ff2e725..2761cec 100644 --- a/OpenSSL/test/test_ssl.py +++ b/OpenSSL/test/test_ssl.py @@ -12,7 +12,7 @@ from os import makedirs from os.path import join from unittest import main -from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1 +from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM from OpenSSL.crypto import PKey, X509, X509Extension from OpenSSL.crypto import dump_privatekey, load_privatekey from OpenSSL.crypto import dump_certificate, load_certificate @@ -22,14 +22,19 @@ from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE -from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE -from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version +from OpenSSL.SSL import ( + VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE) +from OpenSSL.SSL import ( + Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version) from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType from OpenSSL.test.util import TestCase, bytes, b -from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM -from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem -from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem +from OpenSSL.test.test_crypto import ( + cleartextCertificatePEM, cleartextPrivateKeyPEM) +from OpenSSL.test.test_crypto import ( + client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, + root_cert_pem) + try: from OpenSSL.SSL import OP_NO_QUERY_MTU except ImportError: @@ -63,6 +68,7 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC def verify_cb(conn, cert, errnum, depth, ok): return ok + def socket_pair(): """ Establish and return a pair of network sockets connected to each other. @@ -105,6 +111,60 @@ def handshake(client, server): conns.remove(conn) +def _create_certificate_chain(): + """ + Construct and return a chain of certificates. + + 1. A new self-signed certificate authority certificate (cacert) + 2. A new intermediate certificate signed by cacert (icert) + 3. A new server certificate signed by icert (scert) + """ + caext = X509Extension(b('basicConstraints'), False, b('CA:true')) + + # Step 1 + cakey = PKey() + cakey.generate_key(TYPE_RSA, 512) + cacert = X509() + cacert.get_subject().commonName = "Authority Certificate" + cacert.set_issuer(cacert.get_subject()) + cacert.set_pubkey(cakey) + cacert.set_notBefore(b("20000101000000Z")) + cacert.set_notAfter(b("20200101000000Z")) + cacert.add_extensions([caext]) + cacert.set_serial_number(0) + cacert.sign(cakey, "sha1") + + # Step 2 + ikey = PKey() + ikey.generate_key(TYPE_RSA, 512) + icert = X509() + icert.get_subject().commonName = "Intermediate Certificate" + icert.set_issuer(cacert.get_subject()) + icert.set_pubkey(ikey) + icert.set_notBefore(b("20000101000000Z")) + icert.set_notAfter(b("20200101000000Z")) + icert.add_extensions([caext]) + icert.set_serial_number(0) + icert.sign(cakey, "sha1") + + # Step 3 + skey = PKey() + skey.generate_key(TYPE_RSA, 512) + scert = X509() + scert.get_subject().commonName = "Server Certificate" + scert.set_issuer(icert.get_subject()) + scert.set_pubkey(skey) + scert.set_notBefore(b("20000101000000Z")) + scert.set_notAfter(b("20200101000000Z")) + scert.add_extensions([ + X509Extension(b('basicConstraints'), True, b('CA:false'))]) + scert.set_serial_number(0) + scert.sign(ikey, "sha1") + + return [(cakey, cacert), (ikey, icert), (skey, scert)] + + + class _LoopbackMixin: """ Helper mixin which defines methods for creating a connected socket pair and @@ -150,7 +210,7 @@ class _LoopbackMixin: # Give the side a chance to generate some more bytes, or # succeed. try: - bytes = read.recv(2 ** 16) + data = read.recv(2 ** 16) except WantReadError: # It didn't succeed, so we'll hope it generated some # output. @@ -158,7 +218,7 @@ class _LoopbackMixin: else: # It did succeed, so we'll stop now and let the caller deal # with it. - return (read, bytes) + return (read, data) while True: # Keep copying as long as there's more stuff there. @@ -175,6 +235,7 @@ class _LoopbackMixin: write.bio_write(dirty) + class VersionTests(TestCase): """ Tests for version information exposed by @@ -638,59 +699,6 @@ class ContextTests(TestCase, _LoopbackMixin): self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object()) - def _create_certificate_chain(self): - """ - Construct and return a chain of certificates. - - 1. A new self-signed certificate authority certificate (cacert) - 2. A new intermediate certificate signed by cacert (icert) - 3. A new server certificate signed by icert (scert) - """ - caext = X509Extension(b('basicConstraints'), False, b('CA:true')) - - # Step 1 - cakey = PKey() - cakey.generate_key(TYPE_RSA, 512) - cacert = X509() - cacert.get_subject().commonName = "Authority Certificate" - cacert.set_issuer(cacert.get_subject()) - cacert.set_pubkey(cakey) - cacert.set_notBefore(b("20000101000000Z")) - cacert.set_notAfter(b("20200101000000Z")) - cacert.add_extensions([caext]) - cacert.set_serial_number(0) - cacert.sign(cakey, "sha1") - - # Step 2 - ikey = PKey() - ikey.generate_key(TYPE_RSA, 512) - icert = X509() - icert.get_subject().commonName = "Intermediate Certificate" - icert.set_issuer(cacert.get_subject()) - icert.set_pubkey(ikey) - icert.set_notBefore(b("20000101000000Z")) - icert.set_notAfter(b("20200101000000Z")) - icert.add_extensions([caext]) - icert.set_serial_number(0) - icert.sign(cakey, "sha1") - - # Step 3 - skey = PKey() - skey.generate_key(TYPE_RSA, 512) - scert = X509() - scert.get_subject().commonName = "Server Certificate" - scert.set_issuer(icert.get_subject()) - scert.set_pubkey(skey) - scert.set_notBefore(b("20000101000000Z")) - scert.set_notAfter(b("20200101000000Z")) - scert.add_extensions([ - X509Extension(b('basicConstraints'), True, b('CA:false'))]) - scert.set_serial_number(0) - scert.sign(ikey, "sha1") - - return [(cakey, cacert), (ikey, icert), (skey, scert)] - - def _handshake_test(self, serverContext, clientContext): """ Verify that a client and server created with the given contexts can @@ -726,7 +734,7 @@ class ContextTests(TestCase, _LoopbackMixin): to it with a client which trusts cacert and requires verification to succeed. """ - chain = self._create_certificate_chain() + chain = _create_certificate_chain() [(cakey, cacert), (ikey, icert), (skey, scert)] = chain # Dump the CA certificate to a file because that's the only way to load @@ -767,7 +775,7 @@ class ContextTests(TestCase, _LoopbackMixin): to it with a client which trusts cacert and requires verification to succeed. """ - chain = self._create_certificate_chain() + chain = _create_certificate_chain() [(cakey, cacert), (ikey, icert), (skey, scert)] = chain # Write out the chain file. @@ -1095,6 +1103,68 @@ class ConnectionTests(TestCase, _LoopbackMixin): self.assertRaises(NotImplementedError, conn.makefile) + def test_get_peer_cert_chain_wrong_args(self): + """ + L{Connection.get_peer_cert_chain} raises L{TypeError} if called with any + arguments. + """ + conn = Connection(Context(TLSv1_METHOD), None) + self.assertRaises(TypeError, conn.get_peer_cert_chain, 1) + self.assertRaises(TypeError, conn.get_peer_cert_chain, "foo") + self.assertRaises(TypeError, conn.get_peer_cert_chain, object()) + self.assertRaises(TypeError, conn.get_peer_cert_chain, []) + + + def test_get_peer_cert_chain(self): + """ + L{Connection.get_peer_cert_chain} returns a list of certificates which + the connected server returned for the certification verification. + """ + chain = _create_certificate_chain() + [(cakey, cacert), (ikey, icert), (skey, scert)] = chain + + serverContext = Context(TLSv1_METHOD) + serverContext.use_privatekey(skey) + serverContext.use_certificate(scert) + serverContext.add_extra_chain_cert(icert) + serverContext.add_extra_chain_cert(cacert) + server = Connection(serverContext, None) + server.set_accept_state() + + # Create the client + clientContext = Context(TLSv1_METHOD) + clientContext.set_verify(VERIFY_NONE, verify_cb) + client = Connection(clientContext, None) + client.set_connect_state() + + self._interactInMemory(client, server) + + chain = client.get_peer_cert_chain() + self.assertEqual(len(chain), 3) + self.assertEqual( + "Server Certificate", chain[0].get_subject().CN) + self.assertEqual( + "Intermediate Certificate", chain[1].get_subject().CN) + self.assertEqual( + "Authority Certificate", chain[2].get_subject().CN) + + + def test_get_peer_cert_chain_none(self): + """ + L{Connection.get_peer_cert_chain} returns C{None} if the peer sends no + certificate chain. + """ + ctx = Context(TLSv1_METHOD) + ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) + ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) + server = Connection(ctx, None) + server.set_accept_state() + client = Connection(Context(TLSv1_METHOD), None) + client.set_connect_state() + self._interactInMemory(client, server) + self.assertIdentical(None, server.get_peer_cert_chain()) + + class ConnectionGetCipherListTests(TestCase): """ |