diff options
author | Heikki Toivonen <heikki@heikkitoivonen.net> | 2006-03-14 18:36:23 +0000 |
---|---|---|
committer | Heikki Toivonen <heikki@heikkitoivonen.net> | 2006-03-14 18:36:23 +0000 |
commit | 53df409746502c4f3196c9d59d8895c0e987983d (patch) | |
tree | 31d03ef9173d281d35b4bae1cb89a914537c0b4c /tests/test_bio_ssl.py | |
parent | 3ab2470204d90997ffc893a519597960f5b68355 (diff) | |
download | m2crypto-53df409746502c4f3196c9d59d8895c0e987983d.tar.gz |
Bug 5381, SSLBio and related additions to help do SSL with
BIOs directly, original patch by Matt Rodriguez.
git-svn-id: http://svn.osafoundation.org/m2crypto/trunk@381 2715db39-9adf-0310-9c64-84f055769b4b
Diffstat (limited to 'tests/test_bio_ssl.py')
-rw-r--r-- | tests/test_bio_ssl.py | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/tests/test_bio_ssl.py b/tests/test_bio_ssl.py new file mode 100644 index 0000000..e40fef9 --- /dev/null +++ b/tests/test_bio_ssl.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python + +"""Unit tests for M2Crypto.BIO.File. + +Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved.""" + +import unittest, threading, sys, socket + +from M2Crypto import BIO +from M2Crypto import SSL +from M2Crypto import Err +from M2Crypto import Rand +from M2Crypto import threading as m2threading + +from test_ssl import srv_host, srv_port + +class HandshakeClient(threading.Thread): + + def __init__(self, host, port): + threading.Thread.__init__(self) + self.host = host + self.port = port + + def run(self): + ctx = SSL.Context() + ctx.load_cert_chain("server.pem") + conn = SSL.Connection(ctx) + cipher_list = conn.get_cipher_list() + sslbio = BIO.SSLBio() + readbio = BIO.MemoryBuffer() + writebio = BIO.MemoryBuffer() + sslbio.set_ssl(conn) + conn.set_bio(readbio, writebio) + conn.set_connect_state() + sock = socket.socket() + sock.connect((self.host, self.port)) + + handshake_complete = False + while not handshake_complete: + ret = sslbio.do_handshake() + if ret <= 0: + if not sslbio.should_retry() or not sslbio.should_read(): + err_string = Err.get_error() + print err_string + sys.exit("unrecoverable error in handshake - client") + else: + output_token = writebio.read() + if output_token is not None: + sock.sendall(output_token) + else: + input_token = sock.recv(1024) + readbio.write(input_token) + else: + handshake_complete = True + + sock.close() + + +class SSLTestCase(unittest.TestCase): + + def setUp(self): + self.sslbio = BIO.SSLBio() + + def check_set_ssl(self): + ctx = SSL.Context() + conn = SSL.Connection(ctx) + self.sslbio.set_ssl(conn) + + def check_do_handshake_fail(self): + ctx = SSL.Context() + conn = SSL.Connection(ctx) + conn.set_connect_state() + self.sslbio.set_ssl(conn) + ret = self.sslbio.do_handshake() + assert ret == 0 + + def check_should_retry_fail(self): + ctx = SSL.Context() + conn = SSL.Connection(ctx) + self.sslbio.set_ssl(conn) + ret = self.sslbio.do_handshake() + assert ret == -1 + ret = self.sslbio.should_retry() + assert ret == 0 + + def check_should_write_fail(self): + ctx = SSL.Context() + conn = SSL.Connection(ctx) + self.sslbio.set_ssl(conn) + ret = self.sslbio.do_handshake() + assert ret == -1 + ret = self.sslbio.should_write() + assert ret == 0 + + def check_should_read_fail(self): + ctx = SSL.Context() + conn = SSL.Connection(ctx) + self.sslbio.set_ssl(conn) + ret = self.sslbio.do_handshake() + assert ret == -1 + ret = self.sslbio.should_read() + assert ret == 0 + + def check_do_handshake_succeed(self): + ctx = SSL.Context() + ctx.load_cert_chain("server.pem") + conn = SSL.Connection(ctx) + self.sslbio.set_ssl(conn) + readbio = BIO.MemoryBuffer() + writebio = BIO.MemoryBuffer() + conn.set_bio(readbio, writebio) + conn.set_accept_state() + handshake_complete = False + sock = socket.socket() + sock.bind((srv_host, srv_port)) + sock.listen(5) + handshake_client = HandshakeClient(srv_host, srv_port) + handshake_client.start() + new_sock, addr = sock.accept() + while not handshake_complete: + input_token = new_sock.recv(1024) + readbio.write(input_token) + + ret = self.sslbio.do_handshake() + if ret <= 0: + if not self.sslbio.should_retry() or not self.sslbio.should_read(): + sys.exit("unrecoverable error in handshake - server") + else: + handshake_complete = True + + output_token = writebio.read() + if output_token is not None: + new_sock.sendall(output_token) + + handshake_client.join() + sock.close() + new_sock.close() + +def suite(): + return unittest.makeSuite(SSLTestCase, 'check_') + + +if __name__ == '__main__': + Rand.load_file('randpool.dat', -1) + m2threading.init() + unittest.TextTestRunner().run(suite()) + m2threading.cleanup() + Rand.save_file('randpool.dat') |