summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Paul Calderone <exarkun@divmod.com>2011-05-20 20:10:39 -0400
committerJean-Paul Calderone <exarkun@divmod.com>2011-05-20 20:10:39 -0400
commitee6532d8a4045f34fe77552ccdaa68dd43ccd2f6 (patch)
treef4c5b76696228e0f46cd84435b8c58063741586b
parent9c77167dee628f276ed495a570985e5c68e347cc (diff)
parent13d190b5294a3795edd4bcba5c9a2d0559c2ba0f (diff)
downloadpyopenssl-ee6532d8a4045f34fe77552ccdaa68dd43ccd2f6.tar.gz
Add a Connection method for inspecting the certificate chain.
-rw-r--r--ChangeLog6
-rwxr-xr-xOpenSSL/ssl/connection.c39
-rw-r--r--OpenSSL/test/test_ssl.py196
-rw-r--r--OpenSSL/tsafe.py2
-rw-r--r--doc/pyOpenSSL.tex4
5 files changed, 183 insertions, 64 deletions
diff --git a/ChangeLog b/ChangeLog
index 5f60a16..2a4c6ec 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2011-05-20 Jean-Paul Calderone <exarkun@twistedmatrix.com>
+
+ * OpenSSL/ssl/connection.c, OpenSSL/test/test_ssl.py: Add a new
+ method to the Connection type, get_peer_cert_chain, for retrieving
+ the peer's certificate chain.
+
2011-05-19 Jean-Paul Calderone <exarkun@twistedmatrix.com>
* OpenSSL/crypto/x509.c, OpenSSL/test/test_crypto.py: Add a new
diff --git a/OpenSSL/ssl/connection.c b/OpenSSL/ssl/connection.c
index 5b304b1..a3ec0f0 100755
--- a/OpenSSL/ssl/connection.c
+++ b/OpenSSL/ssl/connection.c
@@ -1098,6 +1098,44 @@ ssl_Connection_get_peer_certificate(ssl_ConnectionObj *self, PyObject *args)
}
}
+static char ssl_Connection_get_peer_cert_chain_doc[] = "\n\
+Retrieve the other side's certificate (if any)\n\
+\n\
+@return: A list of X509 instances giving the peer's certificate chain,\n\
+ or None if it does not have one.\n\
+";
+static PyObject *
+ssl_Connection_get_peer_cert_chain(ssl_ConnectionObj *self, PyObject *args) {
+ STACK_OF(X509) *sk;
+ PyObject *chain;
+ crypto_X509Obj *cert;
+ Py_ssize_t i;
+
+ if (!PyArg_ParseTuple(args, ":get_peer_cert_chain")) {
+ return NULL;
+ }
+
+ sk = SSL_get_peer_cert_chain(self->ssl);
+ if (sk != NULL) {
+ chain = PyList_New(sk_X509_num(sk));
+ for (i = 0; i < sk_X509_num(sk); i++) {
+ cert = new_x509(sk_X509_value(sk, i), 1);
+ if (!cert) {
+ /* XXX Untested */
+ Py_DECREF(chain);
+ return NULL;
+ }
+ CRYPTO_add(&cert->x509->references, 1, CRYPTO_LOCK_X509);
+ PyList_SET_ITEM(chain, i, (PyObject *)cert);
+ }
+ return chain;
+ } else {
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
+
+}
+
static char ssl_Connection_want_read_doc[] = "\n\
Checks if more data has to be read from the transport layer to complete an\n\
operation.\n\
@@ -1175,6 +1213,7 @@ static PyMethodDef ssl_Connection_methods[] =
ADD_METHOD(master_key),
ADD_METHOD(sock_shutdown),
ADD_METHOD(get_peer_certificate),
+ ADD_METHOD(get_peer_cert_chain),
ADD_METHOD(want_read),
ADD_METHOD(want_write),
ADD_METHOD(set_accept_state),
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index ff2e725..2761cec 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -12,7 +12,7 @@ from os import makedirs
from os.path import join
from unittest import main
-from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1
+from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
from OpenSSL.crypto import PKey, X509, X509Extension
from OpenSSL.crypto import dump_privatekey, load_privatekey
from OpenSSL.crypto import dump_certificate, load_certificate
@@ -22,14 +22,19 @@ from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
-from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
-from OpenSSL.SSL import Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version
+from OpenSSL.SSL import (
+ VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE)
+from OpenSSL.SSL import (
+ Error, SysCallError, WantReadError, ZeroReturnError, SSLeay_version)
from OpenSSL.SSL import Context, ContextType, Connection, ConnectionType
from OpenSSL.test.util import TestCase, bytes, b
-from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
-from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem
-from OpenSSL.test.test_crypto import server_cert_pem, server_key_pem, root_cert_pem
+from OpenSSL.test.test_crypto import (
+ cleartextCertificatePEM, cleartextPrivateKeyPEM)
+from OpenSSL.test.test_crypto import (
+ client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
+ root_cert_pem)
+
try:
from OpenSSL.SSL import OP_NO_QUERY_MTU
except ImportError:
@@ -63,6 +68,7 @@ MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
def verify_cb(conn, cert, errnum, depth, ok):
return ok
+
def socket_pair():
"""
Establish and return a pair of network sockets connected to each other.
@@ -105,6 +111,60 @@ def handshake(client, server):
conns.remove(conn)
+def _create_certificate_chain():
+ """
+ Construct and return a chain of certificates.
+
+ 1. A new self-signed certificate authority certificate (cacert)
+ 2. A new intermediate certificate signed by cacert (icert)
+ 3. A new server certificate signed by icert (scert)
+ """
+ caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
+
+ # Step 1
+ cakey = PKey()
+ cakey.generate_key(TYPE_RSA, 512)
+ cacert = X509()
+ cacert.get_subject().commonName = "Authority Certificate"
+ cacert.set_issuer(cacert.get_subject())
+ cacert.set_pubkey(cakey)
+ cacert.set_notBefore(b("20000101000000Z"))
+ cacert.set_notAfter(b("20200101000000Z"))
+ cacert.add_extensions([caext])
+ cacert.set_serial_number(0)
+ cacert.sign(cakey, "sha1")
+
+ # Step 2
+ ikey = PKey()
+ ikey.generate_key(TYPE_RSA, 512)
+ icert = X509()
+ icert.get_subject().commonName = "Intermediate Certificate"
+ icert.set_issuer(cacert.get_subject())
+ icert.set_pubkey(ikey)
+ icert.set_notBefore(b("20000101000000Z"))
+ icert.set_notAfter(b("20200101000000Z"))
+ icert.add_extensions([caext])
+ icert.set_serial_number(0)
+ icert.sign(cakey, "sha1")
+
+ # Step 3
+ skey = PKey()
+ skey.generate_key(TYPE_RSA, 512)
+ scert = X509()
+ scert.get_subject().commonName = "Server Certificate"
+ scert.set_issuer(icert.get_subject())
+ scert.set_pubkey(skey)
+ scert.set_notBefore(b("20000101000000Z"))
+ scert.set_notAfter(b("20200101000000Z"))
+ scert.add_extensions([
+ X509Extension(b('basicConstraints'), True, b('CA:false'))])
+ scert.set_serial_number(0)
+ scert.sign(ikey, "sha1")
+
+ return [(cakey, cacert), (ikey, icert), (skey, scert)]
+
+
+
class _LoopbackMixin:
"""
Helper mixin which defines methods for creating a connected socket pair and
@@ -150,7 +210,7 @@ class _LoopbackMixin:
# Give the side a chance to generate some more bytes, or
# succeed.
try:
- bytes = read.recv(2 ** 16)
+ data = read.recv(2 ** 16)
except WantReadError:
# It didn't succeed, so we'll hope it generated some
# output.
@@ -158,7 +218,7 @@ class _LoopbackMixin:
else:
# It did succeed, so we'll stop now and let the caller deal
# with it.
- return (read, bytes)
+ return (read, data)
while True:
# Keep copying as long as there's more stuff there.
@@ -175,6 +235,7 @@ class _LoopbackMixin:
write.bio_write(dirty)
+
class VersionTests(TestCase):
"""
Tests for version information exposed by
@@ -638,59 +699,6 @@ class ContextTests(TestCase, _LoopbackMixin):
self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
- def _create_certificate_chain(self):
- """
- Construct and return a chain of certificates.
-
- 1. A new self-signed certificate authority certificate (cacert)
- 2. A new intermediate certificate signed by cacert (icert)
- 3. A new server certificate signed by icert (scert)
- """
- caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
-
- # Step 1
- cakey = PKey()
- cakey.generate_key(TYPE_RSA, 512)
- cacert = X509()
- cacert.get_subject().commonName = "Authority Certificate"
- cacert.set_issuer(cacert.get_subject())
- cacert.set_pubkey(cakey)
- cacert.set_notBefore(b("20000101000000Z"))
- cacert.set_notAfter(b("20200101000000Z"))
- cacert.add_extensions([caext])
- cacert.set_serial_number(0)
- cacert.sign(cakey, "sha1")
-
- # Step 2
- ikey = PKey()
- ikey.generate_key(TYPE_RSA, 512)
- icert = X509()
- icert.get_subject().commonName = "Intermediate Certificate"
- icert.set_issuer(cacert.get_subject())
- icert.set_pubkey(ikey)
- icert.set_notBefore(b("20000101000000Z"))
- icert.set_notAfter(b("20200101000000Z"))
- icert.add_extensions([caext])
- icert.set_serial_number(0)
- icert.sign(cakey, "sha1")
-
- # Step 3
- skey = PKey()
- skey.generate_key(TYPE_RSA, 512)
- scert = X509()
- scert.get_subject().commonName = "Server Certificate"
- scert.set_issuer(icert.get_subject())
- scert.set_pubkey(skey)
- scert.set_notBefore(b("20000101000000Z"))
- scert.set_notAfter(b("20200101000000Z"))
- scert.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:false'))])
- scert.set_serial_number(0)
- scert.sign(ikey, "sha1")
-
- return [(cakey, cacert), (ikey, icert), (skey, scert)]
-
-
def _handshake_test(self, serverContext, clientContext):
"""
Verify that a client and server created with the given contexts can
@@ -726,7 +734,7 @@ class ContextTests(TestCase, _LoopbackMixin):
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Dump the CA certificate to a file because that's the only way to load
@@ -767,7 +775,7 @@ class ContextTests(TestCase, _LoopbackMixin):
to it with a client which trusts cacert and requires verification to
succeed.
"""
- chain = self._create_certificate_chain()
+ chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
# Write out the chain file.
@@ -1095,6 +1103,68 @@ class ConnectionTests(TestCase, _LoopbackMixin):
self.assertRaises(NotImplementedError, conn.makefile)
+ def test_get_peer_cert_chain_wrong_args(self):
+ """
+ L{Connection.get_peer_cert_chain} raises L{TypeError} if called with any
+ arguments.
+ """
+ conn = Connection(Context(TLSv1_METHOD), None)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, 1)
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, "foo")
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, object())
+ self.assertRaises(TypeError, conn.get_peer_cert_chain, [])
+
+
+ def test_get_peer_cert_chain(self):
+ """
+ L{Connection.get_peer_cert_chain} returns a list of certificates which
+ the connected server returned for the certification verification.
+ """
+ chain = _create_certificate_chain()
+ [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
+
+ serverContext = Context(TLSv1_METHOD)
+ serverContext.use_privatekey(skey)
+ serverContext.use_certificate(scert)
+ serverContext.add_extra_chain_cert(icert)
+ serverContext.add_extra_chain_cert(cacert)
+ server = Connection(serverContext, None)
+ server.set_accept_state()
+
+ # Create the client
+ clientContext = Context(TLSv1_METHOD)
+ clientContext.set_verify(VERIFY_NONE, verify_cb)
+ client = Connection(clientContext, None)
+ client.set_connect_state()
+
+ self._interactInMemory(client, server)
+
+ chain = client.get_peer_cert_chain()
+ self.assertEqual(len(chain), 3)
+ self.assertEqual(
+ "Server Certificate", chain[0].get_subject().CN)
+ self.assertEqual(
+ "Intermediate Certificate", chain[1].get_subject().CN)
+ self.assertEqual(
+ "Authority Certificate", chain[2].get_subject().CN)
+
+
+ def test_get_peer_cert_chain_none(self):
+ """
+ L{Connection.get_peer_cert_chain} returns C{None} if the peer sends no
+ certificate chain.
+ """
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
+ ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
+ server = Connection(ctx, None)
+ server.set_accept_state()
+ client = Connection(Context(TLSv1_METHOD), None)
+ client.set_connect_state()
+ self._interactInMemory(client, server)
+ self.assertIdentical(None, server.get_peer_cert_chain())
+
+
class ConnectionGetCipherListTests(TestCase):
"""
diff --git a/OpenSSL/tsafe.py b/OpenSSL/tsafe.py
index fe4b75f..9d7ad2f 100644
--- a/OpenSSL/tsafe.py
+++ b/OpenSSL/tsafe.py
@@ -16,7 +16,7 @@ class Connection:
'setblocking', 'fileno', 'shutdown', 'close', 'get_cipher_list',
'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
'makefile', 'get_app_data', 'set_app_data', 'state_string',
- 'sock_shutdown', 'get_peer_certificate', 'want_read',
+ 'sock_shutdown', 'get_peer_certificate', 'get_peer_cert_chain', 'want_read',
'want_write', 'set_connect_state', 'set_accept_state',
'connect_ex', 'sendall'):
exec("""def %s(self, *args):
diff --git a/doc/pyOpenSSL.tex b/doc/pyOpenSSL.tex
index fa7638d..93213f9 100644
--- a/doc/pyOpenSSL.tex
+++ b/doc/pyOpenSSL.tex
@@ -1203,6 +1203,10 @@ Retrieve the Context object associated with this Connection.
Retrieve the other side's certificate (if any)
\end{methoddesc}
+\begin{methoddesc}[Connection]{get_peer_cert_chain}{}
+Retrieve the tuple of the other side's certificate chain (if any)
+\end{methoddesc}
+
\begin{methoddesc}[Connection]{getpeername}{}
Call the \method{getpeername} method of the underlying socket.
\end{methoddesc}