summaryrefslogtreecommitdiff
path: root/OpenSSL/test
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@divmod.com>2011-05-20 20:10:39 -0400
committerJean-Paul Calderone <exarkun@divmod.com>2011-05-20 20:10:39 -0400
commitee6532d8a4045f34fe77552ccdaa68dd43ccd2f6 (patch)
treef4c5b76696228e0f46cd84435b8c58063741586b /OpenSSL/test
parent9c77167dee628f276ed495a570985e5c68e347cc (diff)
parent13d190b5294a3795edd4bcba5c9a2d0559c2ba0f (diff)
downloadpyopenssl-ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6.tar.gz
Add a Connection method for inspecting the certificate chain.
Diffstat (limited to 'OpenSSL/test')
-rw-r--r--OpenSSL/test/test_ssl.py196
1 files changed, 133 insertions, 63 deletions
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index ff2e725..2761cec 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -12,7 +12,7 @@ from os import makedirs
from os.path import join
from unittest import main
-from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1
+from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
from OpenSSL.crypto import PKey, X509, X509Extension
from OpenSSL.crypto import dump_privatekey, load_privatekey
from OpenSSL.crypto import dump_certificate, load_certificate
@@ -22,14 +22,19 @@ from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
-from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
-from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version
+from OpenSSL.SSL import (
+ VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE)
+from OpenSSL.SSL import (
+ Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version)
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType
from OpenSSL.test.util import TestCase, bytes, b
-from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
-from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem
-from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem
+from OpenSSL.test.test_crypto import (
+ cleartextCertificatePEM, cleartextPrivateKeyPEM)
+from OpenSSL.test.test_crypto import (
+ client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
+ root_cert_pem)
+
try:
from OpenSSL.SSL import OP_NO_QUERY_MTU
except ImportError:
@@ -63,6 +68,7 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
def verify_cb(conn, cert, errnum, depth, ok):
return ok
+
def socket_pair():
"""
Establish and return a pair of network sockets connected to each other.
@@ -105,6 +111,60 @@ def handshake(client, server):
conns.remove(conn)
+def _create_certificate_chain():
+ """
+ Construct and return a chain of certificates.
+
+ 1. A new self-signed certificate authority certificate (cacert)
+ 2. A new intermediate certificate signed by cacert (icert)
+ 3. A new server certificate signed by icert (scert)
+ """
+ caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
+
+ # Step 1
+ cakey = PKey()
+ cakey.generate_key(TYPE_RSA, 512)
+ cacert = X509()
+ cacert.get_subject().commonName = "Authority Certificate"
+ cacert.set_issuer(cacert.get_subject())
+ cacert.set_pubkey(cakey)
+ cacert.set_notBefore(b("20000101000000Z"))
+ cacert.set_notAfter(b("20200101000000Z"))
+ cacert.add_extensions([caext])
+ cacert.set_serial_number(0)
+ cacert.sign(cakey, "sha1")
+
+ # Step 2
+ ikey = PKey()
+ ikey.generate_key(TYPE_RSA, 512)
+ icert = X509()
+ icert.get_subject().commonName = "Intermediate Certificate"
+ icert.set_issuer(cacert.get_subject())
+ icert.set_pubkey(ikey)
+ icert.set_notBefore(b("20000101000000Z"))
+ icert.set_notAfter(b("20200101000000Z"))
+ icert.add_extensions([caext])
+ icert.set_serial_number(0)
+ icert.sign(cakey, "sha1")
+
+ # Step 3
+ skey = PKey()
+ skey.generate_key(TYPE_RSA, 512)
+ scert = X509()
+ scert.get_subject().commonName = "Server Certificate"
+ scert.set_issuer(icert.get_subject())
+ scert.set_pubkey(skey)
+ scert.set_notBefore(b("20000101000000Z"))
+ scert.set_notAfter(b("20200101000000Z"))
+ scert.add_extensions([
+ X509Extension(b('basicConstraints'), True, b('CA:false'))])
+ scert.set_serial_number(0)
+ scert.sign(ikey, "sha1")
+
+ return [(cakey, cacert), (ikey, icert), (skey, scert)]
+
+
+
class _LoopbackMixin:
"""
Helper mixin which defines methods for creating a connected socket pair and
@@ -150,7 +210,7 @@ class _LoopbackMixin:
# Give the side a chance to generate some more bytes, or
# succeed.
try:
- bytes = read.recv(2 ** 16)
+ data = read.recv(2 ** 16)
except WantReadError:
# It didn't succeed, so we'll hope it generated some
# output.
@@ -158,7 +218,7 @@ class _LoopbackMixin:
else:
# It did succeed, so we'll stop now and let the caller deal
# with it.
- return (read, bytes)
+ return (read, data)
while True:
# Keep copying as long as there's more stuff there.
@@ -175,6 +235,7 @@ class _LoopbackMixin:
write.bio_write(dirty)
+
class VersionTests(TestCase):
"""
Tests for version information exposed by
@@ -638,59 +699,6 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
- def _create_certificate_chain(self):
- """
- Construct and return a chain of certificates.
-
- 1. A new self-signed certificate authority certificate (cacert)
- 2. A new intermediate certificate signed by cacert (icert)
- 3. A new server certificate signed by icert (scert)
- """
- caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
-
- # Step 1
- cakey = PKey()
- cakey.generate_key(TYPE_RSA, 512)
- cacert = X509()
- cacert.get_subject().commonName = "Authority Certificate"
- cacert.set_issuer(cacert.get_subject())
- cacert.set_pubkey(cakey)
- cacert.set_notBefore(b("20000101000000Z"))
- cacert.set_notAfter(b("20200101000000Z"))
- cacert.add_extensions([caext])
- cacert.set_serial_number(0)
- cacert.sign(cakey, "sha1")
-
- # Step 2
- ikey = PKey()
- ikey.generate_key(TYPE_RSA, 512)
- icert = X509()
- icert.get_subject().commonName = "Intermediate Certificate"
- icert.set_issuer(cacert.get_subject())
- icert.set_pubkey(ikey)
- icert.set_notBefore(b("20000101000000Z"))
- icert.set_notAfter(b("20200101000000Z"))
- icert.add_extensions([caext])
- icert.set_serial_number(0)
- icert.sign(cakey, "sha1")
-
- # Step 3
- skey = PKey()
- skey.generate_key(TYPE_RSA, 512)
- scert = X509()
- scert.get_subject().commonName = "Server Certificate"
- scert.set_issuer(icert.get_subject())
- scert.set_pubkey(skey)
- scert.set_notBefore(b("20000101000000Z"))
- scert.set_notAfter(b("20200101000000Z"))
- scert.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:false'))])
- scert.set_serial_number(0)
- scert.sign(ikey, "sha1")
-
- return [(cakey, cacert), (ikey, icert), (skey, scert)]
-
-
def _handshake_test(self, serverContext, clientContext):
"""
Verify that a client and server created with the given contexts can
@@ -726,7 +734,7 @@ class ContextTests(TestCase, _LoopbackMixin):
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Dump the CA certificate to a file because that's the only way to load
@@ -767,7 +775,7 @@ class ContextTests(TestCase, _LoopbackMixin):
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Write out the chain file.
@@ -1095,6 +1103,68 @@ class ConnectionTests(TestCase, _LoopbackMixin):
self.assertRaises(NotImplementedError, conn.makefile)
+ def test_get_peer_cert_chain_wrong_args(self):
+ """
+ L{Connection.get_peer_cert_chain} raises L{TypeError} if called with any
+ arguments.
+ """
+ conn = Connection(Context(TLSv1_METHOD), None)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, 1)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, "foo")
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, object())
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, [])
+
+
+ def test_get_peer_cert_chain(self):
+ """
+ L{Connection.get_peer_cert_chain} returns a list of certificates which
+ the connected server returned for the certification verification.
+ """
+ chain = _create_certificate_chain()
+ [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
+
+ serverContext = Context(TLSv1_METHOD)
+ serverContext.use_privatekey(skey)
+ serverContext.use_certificate(scert)
+ serverContext.add_extra_chain_cert(icert)
+ serverContext.add_extra_chain_cert(cacert)
+ server = Connection(serverContext, None)
+ server.set_accept_state()
+
+ # Create the client
+ clientContext = Context(TLSv1_METHOD)
+ clientContext.set_verify(VERIFY_NONE, verify_cb)
+ client = Connection(clientContext, None)
+ client.set_connect_state()
+
+ self._interactInMemory(client, server)
+
+ chain = client.get_peer_cert_chain()
+ self.assertEqual(len(chain), 3)
+ self.assertEqual(
+ "Server Certificate", chain[0].get_subject().CN)
+ self.assertEqual(
+ "Intermediate Certificate", chain[1].get_subject().CN)
+ self.assertEqual(
+ "Authority Certificate", chain[2].get_subject().CN)
+
+
+ def test_get_peer_cert_chain_none(self):
+ """
+ L{Connection.get_peer_cert_chain} returns C{None} if the peer sends no
+ certificate chain.
+ """
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
+ ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
+ server = Connection(ctx, None)
+ server.set_accept_state()
+ client = Connection(Context(TLSv1_METHOD), None)
+ client.set_connect_state()
+ self._interactInMemory(client, server)
+ self.assertIdentical(None, server.get_peer_cert_chain())
+
+
class ConnectionGetCipherListTests(TestCase):
"""