From 0656a84f886455b108d0fb3a5424989f41faee5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C4=9Bj=20Cepl?= Date: Tue, 13 Oct 2015 13:05:26 +0200 Subject: Clean up whitespace etc. --- tests/test_ssl.py | 229 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 131 insertions(+), 98 deletions(-) (limited to 'tests/test_ssl.py') diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 7918609..0747de8 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -20,13 +20,18 @@ Others: - ThreadingSSLServer """ -import os, signal, socket, sys, tempfile, thread, time +import os +import signal +import socket +import sys +import tempfile +import time try: import unittest2 as unittest except ImportError: import unittest -from M2Crypto import Rand, SSL, m2, Err +from M2Crypto import Err, Rand, SSL, m2 from fips import fips_mode @@ -58,7 +63,7 @@ def verify_cb_new_function(ok, store): stack = store.get1_chain() assert len(stack) == 1 assert stack[0].as_pem() == x509.as_pem() - except AssertionError, e: + except AssertionError: # If we let exceptions propagate from here the # caller may see strange errors. This is cleaner. return 0 @@ -93,7 +98,7 @@ class BaseSSLClientTestCase(unittest.TestCase): def start_server(self, args): if not self.openssl_in_path: raise Exception('openssl command not in PATH') - + pid = os.fork() if pid == 0: # openssl must be started in the tests directory for it @@ -103,7 +108,7 @@ class BaseSSLClientTestCase(unittest.TestCase): os.execvp('openssl', args) finally: os.chdir('..') - + else: time.sleep(sleepTime) return pid @@ -120,7 +125,7 @@ class BaseSSLClientTestCase(unittest.TestCase): r = s.recv(4096) if not r: break - except SSL.SSLError: # s_server throws an 'unexpected eof'... + except SSL.SSLError: # s_server throws an 'unexpected eof'... break resp = resp + r return resp @@ -136,7 +141,7 @@ class BaseSSLClientTestCase(unittest.TestCase): class PassSSLClientTestCase(BaseSSLClientTestCase): - + def test_pass(self): pass @@ -161,22 +166,26 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): ctx = SSL.Context() ctx.load_verify_locations(cafile='tests/ca.pem') ctx.load_cert('tests/x509.pem') - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 1) ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT) - c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx) + c = httpslib.HTTPSConnection(srv_host, self.srv_port, + ssl_context=ctx) c.request('GET', '/') ses = c.get_session() t = ses.as_text() data = c.getresponse().read() # Appearently closing connection here screws session; Ali Polatel? # c.close() - + ctx2 = SSL.Context() ctx2.load_verify_locations(cafile='tests/ca.pem') ctx2.load_cert('tests/x509.pem') - ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1) + ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 1) ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT) - c2 = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx2) + c2 = httpslib.HTTPSConnection(srv_host, self.srv_port, + ssl_context=ctx2) c2.set_session(ses) c2.request('GET', '/') ses2 = c2.get_session() @@ -187,16 +196,18 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): assert t == t2, "Sessions did not match" finally: self.stop_server(pid) - self.failIf(date.find('s_server -quiet -www') == -1) + self.failIf(data.find('s_server -quiet -www') == -1) def test_HTTPSConnection_secure_context(self): pid = self.start_server(self.args) try: from M2Crypto import httpslib ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') - c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx) + c = httpslib.HTTPSConnection(srv_host, self.srv_port, + ssl_context=ctx) c.request('GET', '/') data = c.getresponse().read() c.close() @@ -209,9 +220,11 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): try: from M2Crypto import httpslib ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/server.pem') - c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx) + c = httpslib.HTTPSConnection(srv_host, self.srv_port, + ssl_context=ctx) self.assertRaises(SSL.SSLError, c.request, 'GET', '/') c.close() finally: @@ -240,7 +253,8 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): try: from M2Crypto import httpslib ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') c = httpslib.HTTPS(srv_host, self.srv_port, ssl_context=ctx) c.putrequest('GET', '/') @@ -261,7 +275,8 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): try: from M2Crypto import httpslib ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/server.pem') c = httpslib.HTTPS(srv_host, self.srv_port, ssl_context=ctx) c.putrequest('GET', '/') @@ -271,7 +286,7 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase): c.close() finally: self.stop_server(pid) - + def test_HTTPSConnection_illegalkeywordarg(self): from M2Crypto import httpslib self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org', @@ -282,8 +297,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): def test_no_connection(self): ctx = SSL.Context() - s = SSL.Connection(ctx) - + SSL.Connection(ctx) + def test_server_simple(self): pid = self.start_server(self.args) try: @@ -302,7 +317,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') s = SSL.Connection(ctx) s.connect(self.srv_addr) @@ -316,7 +332,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/server.pem') s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) @@ -330,7 +347,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): self.assertRaises(ValueError, SSL.Context, 'tlsv5') ctx = SSL.Context() s = SSL.Connection(ctx) - + r = s.get_socket_read_timeout() w = s.get_socket_write_timeout() assert r.sec == 0, r.sec @@ -339,14 +356,14 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): assert w.microsec == 0, w.microsec s.set_socket_read_timeout(SSL.timeout()) - s.set_socket_write_timeout(SSL.timeout(909,9)) + s.set_socket_write_timeout(SSL.timeout(909, 9)) r = s.get_socket_read_timeout() w = s.get_socket_write_timeout() assert r.sec == 600, r.sec assert r.microsec == 0, r.microsec assert w.sec == 909, w.sec #assert w.microsec == 9, w.microsec XXX 4000 - + s.connect(self.srv_addr) data = self.http_get(s) s.close() @@ -418,7 +435,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): try: ctx = SSL.Context('sslv23', weak_crypto=1) s = SSL.Connection(ctx) - if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL + # SSLv2 ciphers disabled by default in newer OpenSSL + if m2.OPENSSL_VERSION_NUMBER < 0x10000000: s.connect(self.srv_addr) self.failUnlessEqual(s.get_version(), 'SSLv2') else: @@ -441,7 +459,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): s.close() finally: self.stop_server(pid) - + def test_no_such_cipher(self): self.args = self.args + ['-cipher', 'AES128-SHA'] pid = self.start_server(self.args) @@ -456,7 +474,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): s.close() finally: self.stop_server(pid) - + # TLS is required in FIPS mode @unittest.skipIf(fips_mode, "Can't be run in FIPS mode") def test_no_weak_cipher(self): @@ -472,7 +490,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): s.close() finally: self.stop_server(pid) - + # TLS is required in FIPS mode @unittest.skipIf(fips_mode, "Can't be run in FIPS mode") def test_use_weak_cipher(self): @@ -487,7 +505,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): finally: self.stop_server(pid) self.failIf(data.find('s_server -quiet -www') == -1) - + def test_cipher_ok(self): self.args = self.args + ['-cipher', 'AES128-SHA'] pid = self.start_server(self.args) @@ -497,16 +515,17 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): s.set_cipher_list('AES128-SHA') s.connect(self.srv_addr) data = self.http_get(s) - + assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name() - + cipher_stack = s.get_ciphers() - assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name() + assert cipher_stack[0].name() == 'AES128-SHA', \ + cipher_stack[0].name() self.assertRaises(IndexError, cipher_stack.__getitem__, 2) # For some reason there are 2 entries in the stack #assert len(cipher_stack) == 1, len(cipher_stack) assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list() - + # Test Cipher_Stack iterator i = 0 for cipher in cipher_stack: @@ -516,12 +535,12 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): # For some reason there are 2 entries in the stack #assert i == 1, i self.assertEqual(i, len(cipher_stack)) - + s.close() finally: self.stop_server(pid) self.failIf(data.find('s_server -quiet -www') == -1) - + def verify_cb_new(self, ok, store): return verify_cb_new_function(ok, store) @@ -529,8 +548,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - self.verify_cb_new) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, self.verify_cb_new) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -546,8 +565,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - VerifyCB()) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, VerifyCB()) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -563,8 +582,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - verify_cb_new_function) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, verify_cb_new_function) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -580,8 +599,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - lambda ok, store: 1) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, lambda ok, store: 1) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -594,14 +613,14 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): self.failIf(data.find('s_server -quiet -www') == -1) def verify_cb_exception(self, ok, store): - raise Exception, 'We should fail verification' + self.fail('We should fail verification') def test_verify_cb_exception(self): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - self.verify_cb_exception) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, self.verify_cb_exception) s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) s.close() @@ -620,8 +639,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - lambda _: '') + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, lambda _: '') s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) s.close() @@ -633,9 +652,9 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): from M2Crypto import X509 assert not ok assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \ - err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \ - err == m2.X509_V_ERR_CERT_UNTRUSTED or \ - err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE + err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \ + err == m2.X509_V_ERR_CERT_UNTRUSTED or \ + err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE assert m2.ssl_ctx_get_cert_store(ctx_ptr) assert X509.X509(x509_ptr).as_pem() except AssertionError: @@ -648,8 +667,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - self.verify_cb_old) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, self.verify_cb_old) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -665,8 +684,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - SSL.cb.ssl_verify_callback) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, SSL.cb.ssl_verify_callback) ctx.set_allow_unknown_ca(1) s = SSL.Connection(ctx) try: @@ -683,8 +702,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9, - SSL.cb.ssl_verify_callback_allow_unknown_ca) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9, SSL.cb.ssl_verify_callback_allow_unknown_ca) s = SSL.Connection(ctx) try: s.connect(self.srv_addr) @@ -700,7 +719,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') s = SSL.Connection(ctx) try: @@ -717,7 +737,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/server.pem') s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) @@ -730,7 +751,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') ctx.load_cert('tests/x509.pem') s = SSL.Connection(ctx) @@ -749,7 +771,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') ctx.load_cert('tests/x509.pem') s = SSL.Connection(ctx) @@ -768,7 +791,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) @@ -777,11 +801,12 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase): self.stop_server(pid) def test_verify_nocert_fail(self): - self.args.extend(['-nocert']) + self.args.extend(['-nocert']) pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, + 9) ctx.load_verify_locations('tests/ca.pem') s = SSL.Connection(ctx) self.assertRaises(SSL.SSLError, s.connect, self.srv_addr) @@ -898,7 +923,7 @@ class UrllibSSLClientTestCase(BaseSSLClientTestCase): class Urllib2SSLClientTestCase(BaseSSLClientTestCase): - if sys.version_info >= (2,4): + if sys.version_info >= (2, 4): def test_urllib2(self): pid = self.start_server(self.args) try: @@ -911,14 +936,15 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase): finally: self.stop_server(pid) self.failIf(data.find('s_server -quiet -www') == -1) - + def test_urllib2_secure_context(self): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify( + SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) ctx.load_verify_locations('tests/ca.pem') - + from M2Crypto import m2urllib2 opener = m2urllib2.build_opener(ctx) opener.addheaders = [('Connection', 'close')] @@ -928,18 +954,20 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase): finally: self.stop_server(pid) self.failIf(data.find('s_server -quiet -www') == -1) - + def test_urllib2_secure_context_fail(self): pid = self.start_server(self.args) try: ctx = SSL.Context() - ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) + ctx.set_verify( + SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9) ctx.load_verify_locations('tests/server.pem') - + from M2Crypto import m2urllib2 opener = m2urllib2.build_opener(ctx) opener.addheaders = [('Connection', 'close')] - self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, self.srv_port)) + self.assertRaises(SSL.SSLError, opener.open, + 'https://%s:%s/' % (srv_host, self.srv_port)) finally: self.stop_server(pid) @@ -949,9 +977,11 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase): ctx = SSL.Context() from M2Crypto import m2urllib2 - opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler()) + opener = m2urllib2.build_opener( + ctx, m2urllib2.HTTPBasicAuthHandler()) m2urllib2.install_opener(opener) - req = m2urllib2.Request('https://%s:%s/' % (srv_host, self.srv_port)) + req = m2urllib2.Request('https://%s:%s/' % + (srv_host, self.srv_port)) u = m2urllib2.urlopen(req) data = u.read() u.close() @@ -963,8 +993,7 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase): ctx = SSL.Context() from M2Crypto import m2urllib2 - opener = m2urllib2.build_opener(ctx, - m2urllib2.HTTPBasicAuthHandler()) + m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler()) def test_urllib2_leak(self): pid = self.start_server(self.args) @@ -1015,12 +1044,12 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase): self.failIf(data.find('s_server -quiet -www') == -1) def test_makefile_timeout_fires(self): - # This is convoluted because (openssl s_server -www) starts writing the - # response as soon as it receives the first line of the request, so it's - # possible for it to send the response before the request is sent and - # there would be no timeout. So, let the server spend time reading from - # an empty pipe - FIFO_NAME = 'test_makefile_timeout_fires_fifo' + # This is convoluted because (openssl s_server -www) starts + # writing the response as soon as it receives the first line of + # the request, so it's possible for it to send the response + # before the request is sent and there would be no timeout. So, + # let the server spend time reading from an empty pipe + FIFO_NAME = 'test_makefile_timeout_fires_fifo' # noqa os.mkfifo('tests/' + FIFO_NAME) pipe_pid = os.fork() try: @@ -1062,9 +1091,10 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase): import M2Crypto.SSL.TwistedProtocolWrapper as wrapper except ImportError: import warnings - warnings.warn('Skipping twisted wrapper test because twisted not found') + warnings.warn( + 'Skipping twisted wrapper test because twisted not found') return - + class EchoClient(LineReceiver): def connectionMade(self): self.sendLine('GET / HTTP/1.0\n\n') @@ -1075,14 +1105,14 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase): class EchoClientFactory(ClientFactory): protocol = EchoClient - + def clientConnectionFailed(self, connector, reason): reactor.stop() assert 0, reason - + def clientConnectionLost(self, connector, reason): reactor.stop() - + pid = self.start_server(self.args) class ContextFactory: @@ -1092,11 +1122,13 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase): try: global twisted_data twisted_data = '' - - contextFactory = ContextFactory() + + context_factory = ContextFactory() factory = EchoClientFactory() - wrapper.connectSSL(srv_host, self.srv_port, factory, contextFactory) - reactor.run() # This will block until reactor.stop() is called + wrapper.connectSSL(srv_host, self.srv_port, factory, + context_factory) + # This will block until reactor.stop() is called + reactor.run() finally: self.stop_server(pid) self.failIf(twisted_data.find('s_server -quiet -www') == -1) @@ -1129,7 +1161,8 @@ class FtpslibTestCase(unittest.TestCase): from M2Crypto import ftpslib f = ftpslib.FTP_TLS() # 2.6 used to raise AttributeError: - self.assertRaises(socket.gaierror, f.connect, 'no-such-host-dfgHJK56789', 990) + self.assertRaises(socket.gaierror, f.connect, + 'no-such-host-dfgHJK56789', 990) def suite(): @@ -1144,7 +1177,7 @@ def suite(): suite.addTest(unittest.makeSuite(MiscSSLClientTestCase)) suite.addTest(unittest.makeSuite(FtpslibTestCase)) try: - import M2Crypto.SSL.TwistedProtocolWrapper as wrapper + import M2Crypto.SSL.TwistedProtocolWrapper as wrapper # noqa suite.addTest(unittest.makeSuite(TwistedSSLClientTestCase)) except ImportError: pass @@ -1171,12 +1204,12 @@ def zap_servers(): if __name__ == '__main__': report_leaks = 0 - + if report_leaks: import gc gc.enable() gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL) - + try: Rand.load_file('randpool.dat', -1) unittest.TextTestRunner().run(suite()) -- cgit v1.2.1