summaryrefslogtreecommitdiff
path: root/tests/test_bio_ssl.py
diff options
context:
space:
mode:
authorHeikki Toivonen <heikki@heikkitoivonen.net>2006-03-14 18:36:23 +0000
committerHeikki Toivonen <heikki@heikkitoivonen.net>2006-03-14 18:36:23 +0000
commit53df409746502c4f3196c9d59d8895c0e987983d (patch)
tree31d03ef9173d281d35b4bae1cb89a914537c0b4c /tests/test_bio_ssl.py
parent3ab2470204d90997ffc893a519597960f5b68355 (diff)
downloadm2crypto-53df409746502c4f3196c9d59d8895c0e987983d.tar.gz
Bug 5381, SSLBio and related additions to help do SSL with
BIOs directly, original patch by Matt Rodriguez. git-svn-id: http://svn.osafoundation.org/m2crypto/trunk@381 2715db39-9adf-0310-9c64-84f055769b4b
Diffstat (limited to 'tests/test_bio_ssl.py')
-rw-r--r--tests/test_bio_ssl.py148
1 files changed, 148 insertions, 0 deletions
diff --git a/tests/test_bio_ssl.py b/tests/test_bio_ssl.py
new file mode 100644
index 0000000..e40fef9
--- /dev/null
+++ b/tests/test_bio_ssl.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python
+
+"""Unit tests for M2Crypto.BIO.File.
+
+Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved."""
+
+import unittest, threading, sys, socket
+
+from M2Crypto import BIO
+from M2Crypto import SSL
+from M2Crypto import Err
+from M2Crypto import Rand
+from M2Crypto import threading as m2threading
+
+from test_ssl import srv_host, srv_port
+
+class HandshakeClient(threading.Thread):
+
+ def __init__(self, host, port):
+ threading.Thread.__init__(self)
+ self.host = host
+ self.port = port
+
+ def run(self):
+ ctx = SSL.Context()
+ ctx.load_cert_chain("server.pem")
+ conn = SSL.Connection(ctx)
+ cipher_list = conn.get_cipher_list()
+ sslbio = BIO.SSLBio()
+ readbio = BIO.MemoryBuffer()
+ writebio = BIO.MemoryBuffer()
+ sslbio.set_ssl(conn)
+ conn.set_bio(readbio, writebio)
+ conn.set_connect_state()
+ sock = socket.socket()
+ sock.connect((self.host, self.port))
+
+ handshake_complete = False
+ while not handshake_complete:
+ ret = sslbio.do_handshake()
+ if ret <= 0:
+ if not sslbio.should_retry() or not sslbio.should_read():
+ err_string = Err.get_error()
+ print err_string
+ sys.exit("unrecoverable error in handshake - client")
+ else:
+ output_token = writebio.read()
+ if output_token is not None:
+ sock.sendall(output_token)
+ else:
+ input_token = sock.recv(1024)
+ readbio.write(input_token)
+ else:
+ handshake_complete = True
+
+ sock.close()
+
+
+class SSLTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.sslbio = BIO.SSLBio()
+
+ def check_set_ssl(self):
+ ctx = SSL.Context()
+ conn = SSL.Connection(ctx)
+ self.sslbio.set_ssl(conn)
+
+ def check_do_handshake_fail(self):
+ ctx = SSL.Context()
+ conn = SSL.Connection(ctx)
+ conn.set_connect_state()
+ self.sslbio.set_ssl(conn)
+ ret = self.sslbio.do_handshake()
+ assert ret == 0
+
+ def check_should_retry_fail(self):
+ ctx = SSL.Context()
+ conn = SSL.Connection(ctx)
+ self.sslbio.set_ssl(conn)
+ ret = self.sslbio.do_handshake()
+ assert ret == -1
+ ret = self.sslbio.should_retry()
+ assert ret == 0
+
+ def check_should_write_fail(self):
+ ctx = SSL.Context()
+ conn = SSL.Connection(ctx)
+ self.sslbio.set_ssl(conn)
+ ret = self.sslbio.do_handshake()
+ assert ret == -1
+ ret = self.sslbio.should_write()
+ assert ret == 0
+
+ def check_should_read_fail(self):
+ ctx = SSL.Context()
+ conn = SSL.Connection(ctx)
+ self.sslbio.set_ssl(conn)
+ ret = self.sslbio.do_handshake()
+ assert ret == -1
+ ret = self.sslbio.should_read()
+ assert ret == 0
+
+ def check_do_handshake_succeed(self):
+ ctx = SSL.Context()
+ ctx.load_cert_chain("server.pem")
+ conn = SSL.Connection(ctx)
+ self.sslbio.set_ssl(conn)
+ readbio = BIO.MemoryBuffer()
+ writebio = BIO.MemoryBuffer()
+ conn.set_bio(readbio, writebio)
+ conn.set_accept_state()
+ handshake_complete = False
+ sock = socket.socket()
+ sock.bind((srv_host, srv_port))
+ sock.listen(5)
+ handshake_client = HandshakeClient(srv_host, srv_port)
+ handshake_client.start()
+ new_sock, addr = sock.accept()
+ while not handshake_complete:
+ input_token = new_sock.recv(1024)
+ readbio.write(input_token)
+
+ ret = self.sslbio.do_handshake()
+ if ret <= 0:
+ if not self.sslbio.should_retry() or not self.sslbio.should_read():
+ sys.exit("unrecoverable error in handshake - server")
+ else:
+ handshake_complete = True
+
+ output_token = writebio.read()
+ if output_token is not None:
+ new_sock.sendall(output_token)
+
+ handshake_client.join()
+ sock.close()
+ new_sock.close()
+
+def suite():
+ return unittest.makeSuite(SSLTestCase, 'check_')
+
+
+if __name__ == '__main__':
+ Rand.load_file('randpool.dat', -1)
+ m2threading.init()
+ unittest.TextTestRunner().run(suite())
+ m2threading.cleanup()
+ Rand.save_file('randpool.dat')