summaryrefslogtreecommitdiff
path: root/tests/test_ssl.py
diff options
context:
space:
mode:
authorMatěj Cepl <mcepl@cepl.eu>2015-10-13 13:05:26 +0200
committerMatěj Cepl <mcepl@cepl.eu>2015-10-13 15:52:36 +0200
commit0656a84f886455b108d0fb3a5424989f41faee5a (patch)
tree577d485cef4f34927c5d0e1320155199dceec0f9 /tests/test_ssl.py
parentf8813cb81e670494c71357b4118fb0480e811c8d (diff)
downloadm2crypto-0656a84f886455b108d0fb3a5424989f41faee5a.tar.gz
Clean up whitespace etc.
Diffstat (limited to 'tests/test_ssl.py')
-rw-r--r--tests/test_ssl.py229
1 files changed, 131 insertions, 98 deletions
diff --git a/tests/test_ssl.py b/tests/test_ssl.py
index 7918609..0747de8 100644
--- a/tests/test_ssl.py
+++ b/tests/test_ssl.py
@@ -20,13 +20,18 @@ Others:
- ThreadingSSLServer
"""
-import os, signal, socket, sys, tempfile, thread, time
+import os
+import signal
+import socket
+import sys
+import tempfile
+import time
try:
import unittest2 as unittest
except ImportError:
import unittest
-from M2Crypto import Rand, SSL, m2, Err
+from M2Crypto import Err, Rand, SSL, m2
from fips import fips_mode
@@ -58,7 +63,7 @@ def verify_cb_new_function(ok, store):
stack = store.get1_chain()
assert len(stack) == 1
assert stack[0].as_pem() == x509.as_pem()
- except AssertionError, e:
+ except AssertionError:
# If we let exceptions propagate from here the
# caller may see strange errors. This is cleaner.
return 0
@@ -93,7 +98,7 @@ class BaseSSLClientTestCase(unittest.TestCase):
def start_server(self, args):
if not self.openssl_in_path:
raise Exception('openssl command not in PATH')
-
+
pid = os.fork()
if pid == 0:
# openssl must be started in the tests directory for it
@@ -103,7 +108,7 @@ class BaseSSLClientTestCase(unittest.TestCase):
os.execvp('openssl', args)
finally:
os.chdir('..')
-
+
else:
time.sleep(sleepTime)
return pid
@@ -120,7 +125,7 @@ class BaseSSLClientTestCase(unittest.TestCase):
r = s.recv(4096)
if not r:
break
- except SSL.SSLError: # s_server throws an 'unexpected eof'...
+ except SSL.SSLError: # s_server throws an 'unexpected eof'...
break
resp = resp + r
return resp
@@ -136,7 +141,7 @@ class BaseSSLClientTestCase(unittest.TestCase):
class PassSSLClientTestCase(BaseSSLClientTestCase):
-
+
def test_pass(self):
pass
@@ -161,22 +166,26 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
ctx = SSL.Context()
ctx.load_verify_locations(cafile='tests/ca.pem')
ctx.load_cert('tests/x509.pem')
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 1)
ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
- c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx)
+ c = httpslib.HTTPSConnection(srv_host, self.srv_port,
+ ssl_context=ctx)
c.request('GET', '/')
ses = c.get_session()
t = ses.as_text()
data = c.getresponse().read()
# Appearently closing connection here screws session; Ali Polatel?
# c.close()
-
+
ctx2 = SSL.Context()
ctx2.load_verify_locations(cafile='tests/ca.pem')
ctx2.load_cert('tests/x509.pem')
- ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)
+ ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 1)
ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)
- c2 = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx2)
+ c2 = httpslib.HTTPSConnection(srv_host, self.srv_port,
+ ssl_context=ctx2)
c2.set_session(ses)
c2.request('GET', '/')
ses2 = c2.get_session()
@@ -187,16 +196,18 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
assert t == t2, "Sessions did not match"
finally:
self.stop_server(pid)
- self.failIf(date.find('s_server -quiet -www') == -1)
+ self.failIf(data.find('s_server -quiet -www') == -1)
def test_HTTPSConnection_secure_context(self):
pid = self.start_server(self.args)
try:
from M2Crypto import httpslib
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
- c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx)
+ c = httpslib.HTTPSConnection(srv_host, self.srv_port,
+ ssl_context=ctx)
c.request('GET', '/')
data = c.getresponse().read()
c.close()
@@ -209,9 +220,11 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
try:
from M2Crypto import httpslib
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/server.pem')
- c = httpslib.HTTPSConnection(srv_host, self.srv_port, ssl_context=ctx)
+ c = httpslib.HTTPSConnection(srv_host, self.srv_port,
+ ssl_context=ctx)
self.assertRaises(SSL.SSLError, c.request, 'GET', '/')
c.close()
finally:
@@ -240,7 +253,8 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
try:
from M2Crypto import httpslib
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
c = httpslib.HTTPS(srv_host, self.srv_port, ssl_context=ctx)
c.putrequest('GET', '/')
@@ -261,7 +275,8 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
try:
from M2Crypto import httpslib
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/server.pem')
c = httpslib.HTTPS(srv_host, self.srv_port, ssl_context=ctx)
c.putrequest('GET', '/')
@@ -271,7 +286,7 @@ class HttpslibSSLClientTestCase(BaseSSLClientTestCase):
c.close()
finally:
self.stop_server(pid)
-
+
def test_HTTPSConnection_illegalkeywordarg(self):
from M2Crypto import httpslib
self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',
@@ -282,8 +297,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
def test_no_connection(self):
ctx = SSL.Context()
- s = SSL.Connection(ctx)
-
+ SSL.Connection(ctx)
+
def test_server_simple(self):
pid = self.start_server(self.args)
try:
@@ -302,7 +317,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
s.connect(self.srv_addr)
@@ -316,7 +332,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/server.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
@@ -330,7 +347,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
self.assertRaises(ValueError, SSL.Context, 'tlsv5')
ctx = SSL.Context()
s = SSL.Connection(ctx)
-
+
r = s.get_socket_read_timeout()
w = s.get_socket_write_timeout()
assert r.sec == 0, r.sec
@@ -339,14 +356,14 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
assert w.microsec == 0, w.microsec
s.set_socket_read_timeout(SSL.timeout())
- s.set_socket_write_timeout(SSL.timeout(909,9))
+ s.set_socket_write_timeout(SSL.timeout(909, 9))
r = s.get_socket_read_timeout()
w = s.get_socket_write_timeout()
assert r.sec == 600, r.sec
assert r.microsec == 0, r.microsec
assert w.sec == 909, w.sec
#assert w.microsec == 9, w.microsec XXX 4000
-
+
s.connect(self.srv_addr)
data = self.http_get(s)
s.close()
@@ -418,7 +435,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
try:
ctx = SSL.Context('sslv23', weak_crypto=1)
s = SSL.Connection(ctx)
- if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL
+ # SSLv2 ciphers disabled by default in newer OpenSSL
+ if m2.OPENSSL_VERSION_NUMBER < 0x10000000:
s.connect(self.srv_addr)
self.failUnlessEqual(s.get_version(), 'SSLv2')
else:
@@ -441,7 +459,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
s.close()
finally:
self.stop_server(pid)
-
+
def test_no_such_cipher(self):
self.args = self.args + ['-cipher', 'AES128-SHA']
pid = self.start_server(self.args)
@@ -456,7 +474,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
s.close()
finally:
self.stop_server(pid)
-
+
# TLS is required in FIPS mode
@unittest.skipIf(fips_mode, "Can't be run in FIPS mode")
def test_no_weak_cipher(self):
@@ -472,7 +490,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
s.close()
finally:
self.stop_server(pid)
-
+
# TLS is required in FIPS mode
@unittest.skipIf(fips_mode, "Can't be run in FIPS mode")
def test_use_weak_cipher(self):
@@ -487,7 +505,7 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
finally:
self.stop_server(pid)
self.failIf(data.find('s_server -quiet -www') == -1)
-
+
def test_cipher_ok(self):
self.args = self.args + ['-cipher', 'AES128-SHA']
pid = self.start_server(self.args)
@@ -497,16 +515,17 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
s.set_cipher_list('AES128-SHA')
s.connect(self.srv_addr)
data = self.http_get(s)
-
+
assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()
-
+
cipher_stack = s.get_ciphers()
- assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()
+ assert cipher_stack[0].name() == 'AES128-SHA', \
+ cipher_stack[0].name()
self.assertRaises(IndexError, cipher_stack.__getitem__, 2)
# For some reason there are 2 entries in the stack
#assert len(cipher_stack) == 1, len(cipher_stack)
assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()
-
+
# Test Cipher_Stack iterator
i = 0
for cipher in cipher_stack:
@@ -516,12 +535,12 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
# For some reason there are 2 entries in the stack
#assert i == 1, i
self.assertEqual(i, len(cipher_stack))
-
+
s.close()
finally:
self.stop_server(pid)
self.failIf(data.find('s_server -quiet -www') == -1)
-
+
def verify_cb_new(self, ok, store):
return verify_cb_new_function(ok, store)
@@ -529,8 +548,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- self.verify_cb_new)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, self.verify_cb_new)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -546,8 +565,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- VerifyCB())
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, VerifyCB())
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -563,8 +582,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- verify_cb_new_function)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, verify_cb_new_function)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -580,8 +599,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- lambda ok, store: 1)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, lambda ok, store: 1)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -594,14 +613,14 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
self.failIf(data.find('s_server -quiet -www') == -1)
def verify_cb_exception(self, ok, store):
- raise Exception, 'We should fail verification'
+ self.fail('We should fail verification')
def test_verify_cb_exception(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- self.verify_cb_exception)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, self.verify_cb_exception)
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
@@ -620,8 +639,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- lambda _: '')
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, lambda _: '')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
s.close()
@@ -633,9 +652,9 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
from M2Crypto import X509
assert not ok
assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \
- err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
- err == m2.X509_V_ERR_CERT_UNTRUSTED or \
- err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
+ err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \
+ err == m2.X509_V_ERR_CERT_UNTRUSTED or \
+ err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE
assert m2.ssl_ctx_get_cert_store(ctx_ptr)
assert X509.X509(x509_ptr).as_pem()
except AssertionError:
@@ -648,8 +667,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- self.verify_cb_old)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, self.verify_cb_old)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -665,8 +684,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- SSL.cb.ssl_verify_callback)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, SSL.cb.ssl_verify_callback)
ctx.set_allow_unknown_ca(1)
s = SSL.Connection(ctx)
try:
@@ -683,8 +702,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,
- SSL.cb.ssl_verify_callback_allow_unknown_ca)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9, SSL.cb.ssl_verify_callback_allow_unknown_ca)
s = SSL.Connection(ctx)
try:
s.connect(self.srv_addr)
@@ -700,7 +719,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
try:
@@ -717,7 +737,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/server.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
@@ -730,7 +751,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
ctx.load_cert('tests/x509.pem')
s = SSL.Connection(ctx)
@@ -749,7 +771,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
ctx.load_cert('tests/x509.pem')
s = SSL.Connection(ctx)
@@ -768,7 +791,8 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
@@ -777,11 +801,12 @@ class MiscSSLClientTestCase(BaseSSLClientTestCase):
self.stop_server(pid)
def test_verify_nocert_fail(self):
- self.args.extend(['-nocert'])
+ self.args.extend(['-nocert'])
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,
+ 9)
ctx.load_verify_locations('tests/ca.pem')
s = SSL.Connection(ctx)
self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)
@@ -898,7 +923,7 @@ class UrllibSSLClientTestCase(BaseSSLClientTestCase):
class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
- if sys.version_info >= (2,4):
+ if sys.version_info >= (2, 4):
def test_urllib2(self):
pid = self.start_server(self.args)
try:
@@ -911,14 +936,15 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
finally:
self.stop_server(pid)
self.failIf(data.find('s_server -quiet -www') == -1)
-
+
def test_urllib2_secure_context(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(
+ SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/ca.pem')
-
+
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
@@ -928,18 +954,20 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
finally:
self.stop_server(pid)
self.failIf(data.find('s_server -quiet -www') == -1)
-
+
def test_urllib2_secure_context_fail(self):
pid = self.start_server(self.args)
try:
ctx = SSL.Context()
- ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
+ ctx.set_verify(
+ SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)
ctx.load_verify_locations('tests/server.pem')
-
+
from M2Crypto import m2urllib2
opener = m2urllib2.build_opener(ctx)
opener.addheaders = [('Connection', 'close')]
- self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, self.srv_port))
+ self.assertRaises(SSL.SSLError, opener.open,
+ 'https://%s:%s/' % (srv_host, self.srv_port))
finally:
self.stop_server(pid)
@@ -949,9 +977,11 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
ctx = SSL.Context()
from M2Crypto import m2urllib2
- opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())
+ opener = m2urllib2.build_opener(
+ ctx, m2urllib2.HTTPBasicAuthHandler())
m2urllib2.install_opener(opener)
- req = m2urllib2.Request('https://%s:%s/' % (srv_host, self.srv_port))
+ req = m2urllib2.Request('https://%s:%s/' %
+ (srv_host, self.srv_port))
u = m2urllib2.urlopen(req)
data = u.read()
u.close()
@@ -963,8 +993,7 @@ class Urllib2SSLClientTestCase(BaseSSLClientTestCase):
ctx = SSL.Context()
from M2Crypto import m2urllib2
- opener = m2urllib2.build_opener(ctx,
- m2urllib2.HTTPBasicAuthHandler())
+ m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())
def test_urllib2_leak(self):
pid = self.start_server(self.args)
@@ -1015,12 +1044,12 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase):
self.failIf(data.find('s_server -quiet -www') == -1)
def test_makefile_timeout_fires(self):
- # This is convoluted because (openssl s_server -www) starts writing the
- # response as soon as it receives the first line of the request, so it's
- # possible for it to send the response before the request is sent and
- # there would be no timeout. So, let the server spend time reading from
- # an empty pipe
- FIFO_NAME = 'test_makefile_timeout_fires_fifo'
+ # This is convoluted because (openssl s_server -www) starts
+ # writing the response as soon as it receives the first line of
+ # the request, so it's possible for it to send the response
+ # before the request is sent and there would be no timeout. So,
+ # let the server spend time reading from an empty pipe
+ FIFO_NAME = 'test_makefile_timeout_fires_fifo' # noqa
os.mkfifo('tests/' + FIFO_NAME)
pipe_pid = os.fork()
try:
@@ -1062,9 +1091,10 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase):
import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
except ImportError:
import warnings
- warnings.warn('Skipping twisted wrapper test because twisted not found')
+ warnings.warn(
+ 'Skipping twisted wrapper test because twisted not found')
return
-
+
class EchoClient(LineReceiver):
def connectionMade(self):
self.sendLine('GET / HTTP/1.0\n\n')
@@ -1075,14 +1105,14 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase):
class EchoClientFactory(ClientFactory):
protocol = EchoClient
-
+
def clientConnectionFailed(self, connector, reason):
reactor.stop()
assert 0, reason
-
+
def clientConnectionLost(self, connector, reason):
reactor.stop()
-
+
pid = self.start_server(self.args)
class ContextFactory:
@@ -1092,11 +1122,13 @@ class TwistedSSLClientTestCase(BaseSSLClientTestCase):
try:
global twisted_data
twisted_data = ''
-
- contextFactory = ContextFactory()
+
+ context_factory = ContextFactory()
factory = EchoClientFactory()
- wrapper.connectSSL(srv_host, self.srv_port, factory, contextFactory)
- reactor.run() # This will block until reactor.stop() is called
+ wrapper.connectSSL(srv_host, self.srv_port, factory,
+ context_factory)
+ # This will block until reactor.stop() is called
+ reactor.run()
finally:
self.stop_server(pid)
self.failIf(twisted_data.find('s_server -quiet -www') == -1)
@@ -1129,7 +1161,8 @@ class FtpslibTestCase(unittest.TestCase):
from M2Crypto import ftpslib
f = ftpslib.FTP_TLS()
# 2.6 used to raise AttributeError:
- self.assertRaises(socket.gaierror, f.connect, 'no-such-host-dfgHJK56789', 990)
+ self.assertRaises(socket.gaierror, f.connect,
+ 'no-such-host-dfgHJK56789', 990)
def suite():
@@ -1144,7 +1177,7 @@ def suite():
suite.addTest(unittest.makeSuite(MiscSSLClientTestCase))
suite.addTest(unittest.makeSuite(FtpslibTestCase))
try:
- import M2Crypto.SSL.TwistedProtocolWrapper as wrapper
+ import M2Crypto.SSL.TwistedProtocolWrapper as wrapper # noqa
suite.addTest(unittest.makeSuite(TwistedSSLClientTestCase))
except ImportError:
pass
@@ -1171,12 +1204,12 @@ def zap_servers():
if __name__ == '__main__':
report_leaks = 0
-
+
if report_leaks:
import gc
gc.enable()
gc.set_debug(gc.DEBUG_LEAK & ~gc.DEBUG_SAVEALL)
-
+
try:
Rand.load_file('randpool.dat', -1)
unittest.TextTestRunner().run(suite())