diff options
-rw-r--r-- | admin/runtests | 2 | ||||
-rw-r--r-- | openid/consumer/consumer.py | 5 | ||||
-rw-r--r-- | openid/message.py | 2 | ||||
-rw-r--r-- | openid/test/test_association_response.py | 333 | ||||
-rw-r--r-- | openid/test/test_consumer.py | 138 |
5 files changed, 340 insertions, 140 deletions
diff --git a/admin/runtests b/admin/runtests index 610334d..4586566 100644 --- a/admin/runtests +++ b/admin/runtests @@ -61,6 +61,7 @@ def pyunitTests(): from openid.test import test_fetchers from openid.test import test_urinorm from openid.test import test_nonce + from openid.test import test_association_response # yadis tests from openid.test import test_parsehtml from openid.test import test_yadis_discover @@ -77,6 +78,7 @@ def pyunitTests(): test_etxrd, test_xri, test_xrires, + test_association_response, ] # Some modules have data-driven tests, and they use custom methods diff --git a/openid/consumer/consumer.py b/openid/consumer/consumer.py index 5274bfd..d0ecb04 100644 --- a/openid/consumer/consumer.py +++ b/openid/consumer/consumer.py @@ -407,7 +407,8 @@ class PlainTextConsumerSession(object): return {} def extractSecret(self, response): - return oidutil.fromBase64(response.getArg(OPENID_NS, 'mac_key')) + mac_key64 = response.getArg(OPENID_NS, 'mac_key', no_default) + return oidutil.fromBase64(mac_key64) class UnsupportedAssocType(Exception): """Exception raised when the server tells us that the session type @@ -858,8 +859,6 @@ class GenericConsumer(object): # 'no-encryption' so that it can be handled in the same # way as OpenID 2 'no-encryption' respones. elif session_type == '' or session_type is None: - oidutil.log('Falling back to no-encryption association ' - 'session from %s' % assoc_session.session_type) session_type = 'no-encryption' return session_type diff --git a/openid/message.py b/openid/message.py index 76477f5..7fd7583 100644 --- a/openid/message.py +++ b/openid/message.py @@ -378,7 +378,7 @@ class Message(object): return self.args[args_key] except KeyError: if default is no_default: - raise + raise KeyError((namespace, key)) else: return default diff --git a/openid/test/test_association_response.py b/openid/test/test_association_response.py new file mode 100644 index 0000000..6b809c3 --- /dev/null +++ b/openid/test/test_association_response.py @@ -0,0 +1,333 @@ +"""Tests for consumer handling of association responses + +This duplicates some things that are covered by test_consumer, but +this works for now. +""" +from openid import oidutil +from openid.test.test_consumer import CatchLogs +from openid.message import Message, OPENID2_NS, OPENID_NS +from openid.server.server import DiffieHellmanSHA1ServerSession +from openid.consumer.consumer import GenericConsumer, \ + DiffieHellmanSHA1ConsumerSession +from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_1_1_TYPE, OPENID_2_0_TYPE +import _memstore +import unittest + +# Some values we can use for convenience (see mkAssocResponse) +association_response_values = { + 'expires_in': 'a time', + 'assoc_handle':'a handle', + 'assoc_type':'a type', + 'session_type':'a session type', + 'ns':OPENID2_NS, + } + +def mkAssocResponse(*keys): + """Build an association response message that contains the + specified subset of keys. The values come from + `association_response_values`. + + This is useful for testing for missing keys and other times that + we don't care what the values are.""" + args = dict([(key, association_response_values[key]) for key in keys]) + return Message.fromOpenIDArgs(args) + +class BaseTestParseAssociationMissingFields(CatchLogs, unittest.TestCase): + """ + According to 'Association Session Response' subsection 'Common + Response Parameters', the following fields are required for OpenID + 2.0: + + * ns + * session_type + * assoc_handle + * assoc_type + * expires_in + + If 'ns' is missing, it will fall back to OpenID 1 checking. In + OpenID 1, everything except 'session_type' and 'ns' are required. + """ + + def mkTest(keys): + """Make a test that ensures that an association response that + is missing required fields will short-circuit return None.""" + + def test(self): + msg = mkAssocResponse(*keys) + + # Store should not be needed + consumer = GenericConsumer(store=None) + + result = consumer._parseAssociation(msg, None, 'dummy.url') + self.failUnless(result is None) + self.failUnlessEqual(len(self.messages), 1) + self.failUnless(self.messages[0].startswith( + 'Getting association: missing key')) + + return test + + mkTest = staticmethod(mkTest) + +class TestParseAssociationMissingFieldsOpenID2( + BaseTestParseAssociationMissingFields): + """Test for returning an error upon missing fields in association + responses for OpenID 2""" + mkTest = BaseTestParseAssociationMissingFields.mkTest + + test_noFields_openid2 = mkTest(['ns']) + + test_missingExpires_openid2 = mkTest( + ['assoc_handle', 'assoc_type', 'session_type', 'ns']) + + test_missingHandle_openid2 = mkTest( + ['expires_in', 'assoc_type', 'session_type', 'ns']) + + test_missingAssocType_openid2 = mkTest( + ['expires_in', 'assoc_handle', 'session_type', 'ns']) + + test_missingSessionType_openid2 = mkTest( + ['expires_in', 'assoc_handle', 'assoc_type', 'ns']) + +class TestParseAssociationMissingFieldsOpenID1( + BaseTestParseAssociationMissingFields): + """Test for returning an error upon missing fields in association + responses for OpenID 2""" + mkTest = BaseTestParseAssociationMissingFields.mkTest + + test_noFields_openid1 = mkTest([]) + + test_missingExpires_openid1 = mkTest(['assoc_handle', 'assoc_type']) + + test_missingHandle_openid1 = mkTest(['expires_in', 'assoc_type']) + + test_missingAssocType_openid1 = mkTest(['expires_in', 'assoc_handle']) + +class DummyAssocationSession(object): + def __init__(self, session_type, allowed_assoc_types=()): + self.session_type = session_type + self.allowed_assoc_types = allowed_assoc_types + +class ParseAssociationSessionTypeMismatch(unittest.TestCase): + def mkTest(requested_session_type, response_session_type, openid1=False): + def test(self): + assoc_session = DummyAssocationSession(requested_session_type) + consumer = GenericConsumer(store=None) + keys = association_response_values.keys() + if openid1: + keys.remove('ns') + msg = mkAssocResponse(keys) + msg.setArg(OPENID_NS, 'session_type', response_session_type) + result = consumer._parseAssociation( + msg, assoc_session, server_url='dummy.url') + self.failUnless(result is None) + + test_typeMismatch = mkTest( + requested_session_type='no-encryption', + response_session_type='', + ) + + test_typeMismatch = mkTest( + requested_session_type='DH-SHA1', + response_session_type='no-encryption', + ) + + test_typeMismatch = mkTest( + requested_session_type='DH-SHA256', + response_session_type='no-encryption', + ) + + test_typeMismatch = mkTest( + requested_session_type='no-encryption', + response_session_type='DH-SHA1', + ) + + +class TestOpenID1AssociationResponseSessionType(CatchLogs, unittest.TestCase): + def mkTest(expected_session_type, session_type_value): + """Return a test method that will check what session type will + be used if the OpenID 1 response to an associate call sets the + 'session_type' field to `session_type_value` + """ + def test(self): + self._doTest(expected_session_type, session_type_value) + self.failUnlessEqual(0, len(self.messages)) + + return test + + def _doTest(self, expected_session_type, session_type_value): + # Create a Message with just 'session_type' in it, since + # that's all this function will use. 'session_type' may be + # absent if it's set to None. + args = {} + if session_type_value is not None: + args['session_type'] = session_type_value + message = Message.fromOpenIDArgs(args) + self.failUnless(message.isOpenID1()) + + # Store should not be needed + consumer = GenericConsumer(store=None) + + actual_session_type = consumer._getOpenID1SessionType(message) + error_message = ('Returned sesion type parameter %r was expected ' + 'to yield session type %r, but yielded %r' % + (session_type_value, expected_session_type, + actual_session_type)) + self.failUnlessEqual( + expected_session_type, actual_session_type, error_message) + + test_none = mkTest( + session_type_value=None, + expected_session_type='no-encryption', + ) + + test_empty = mkTest( + session_type_value='', + expected_session_type='no-encryption', + ) + + # This one's different because it expects log messages + def test_explicitNoEncryption(self): + self._doTest( + session_type_value='no-encryption', + expected_session_type='no-encryption', + ) + self.failUnlessEqual(1, len(self.messages)) + self.failUnless(self.messages[0].startswith( + 'WARNING: OpenID server sent "no-encryption"')) + + test_dhSHA1 = mkTest( + session_type_value='DH-SHA1', + expected_session_type='DH-SHA1', + ) + + # DH-SHA256 is not a valid session type for OpenID1, but this + # function does not test that. This is mostly just to make sure + # that it will pass-through stuff that is not explicitly handled, + # so it will get handled the same way as it is handled for OpenID + # 2 + test_dhSHA256 = mkTest( + session_type_value='DH-SHA256', + expected_session_type='DH-SHA256', + ) + +class TestAssocTypeInvalidForSession(CatchLogs, unittest.TestCase): + def _setup(self, assoc_type): + no_encryption_session = DummyAssocationSession('matching-session-type', + ['good-assoc-type']) + msg = mkAssocResponse(*association_response_values.keys()) + msg.setArg(OPENID2_NS, 'session_type', 'matching-session-type') + msg.setArg(OPENID2_NS, 'assoc_type', assoc_type) + + # Store should not be needed + consumer = GenericConsumer(store=None) + + result = consumer._parseAssociation( + msg, no_encryption_session, 'dummy.url') + + + def test_badAssocType(self): + self._setup('unsupported') + self.failUnlessEqual(1, len(self.messages)) + self.failUnless(self.messages[0].startswith( + 'Unsupported assoc_type for session')) + + def test_badExpiresIn(self): + self._setup('good-assoc-type') + self.failUnlessEqual(1, len(self.messages)) + self.failUnless(self.messages[0].startswith( + 'Getting Association: invalid expires_in')) + + +# XXX: This is what causes most of the imports in this file. It is +# sort of a unit test and sort of a functional test. I'm not terribly +# fond of it. +class TestParseAssociation(unittest.TestCase): + secret = 'x' * 20 + + def setUp(self): + self.store = _memstore.MemoryStore() + self.consumer = GenericConsumer(self.store) + self.endpoint = OpenIDServiceEndpoint() + + def _setUpDH(self): + sess, args = self.consumer._createAssociateRequest( + self.endpoint, 'HMAC-SHA1', 'DH-SHA1') + + assert self.endpoint.compatibilityMode() == \ + (args.get('openid.ns') is None), \ + "Endpoint compat mode %r != (openid.ns in args)" % \ + (self.endpoint.compatibilityMode()) + + message = Message.fromPostArgs(args) + server_sess = DiffieHellmanSHA1ServerSession.fromMessage(message) + server_resp = server_sess.answer(self.secret) + server_resp['assoc_type'] = 'HMAC-SHA1' + server_resp['assoc_handle'] = 'handle' + server_resp['expires_in'] = '1000' + server_resp['session_type'] = 'DH-SHA1' + return sess, Message.fromOpenIDArgs(server_resp) + + def test_success(self): + sess, server_resp = self._setUpDH() + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failIf(ret is None) + self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1') + self.failUnlessEqual(ret.secret, self.secret) + self.failUnlessEqual(ret.handle, 'handle') + self.failUnlessEqual(ret.lifetime, 1000) + + def test_openid2success(self): + # Use openid 2 type in endpoint so _setUpDH checks + # compatibility mode state properly + self.endpoint.type_uris = [OPENID_2_0_TYPE, OPENID_1_1_TYPE] + self.test_success() + + def test_badAssocType(self): + sess, server_resp = self._setUpDH() + server_resp.setArg(OPENID_NS, 'assoc_type', 'Crazy Low Prices!!!') + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failUnless(ret is None) + + def test_badExpiresIn(self): + sess, server_resp = self._setUpDH() + server_resp.setArg(OPENID_NS, 'expires_in', 'Crazy Low Prices!!!') + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failUnless(ret is None) + + def test_badSessionType(self): + sess, server_resp = self._setUpDH() + server_resp.setArg(OPENID_NS, 'session_type', '|/iA6rA') + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failUnless(ret is None) + + def test_plainFallback(self): + sess = DiffieHellmanSHA1ConsumerSession() + server_resp = Message.fromOpenIDArgs({ + 'assoc_type': 'HMAC-SHA1', + 'assoc_handle': 'handle', + 'expires_in': '1000', + 'mac_key': oidutil.toBase64(self.secret), + }) + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failIf(ret is None) + self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1') + self.failUnlessEqual(ret.secret, self.secret) + self.failUnlessEqual(ret.handle, 'handle') + self.failUnlessEqual(ret.lifetime, 1000) + + def test_plainFallbackFailure(self): + sess = DiffieHellmanSHA1ConsumerSession() + # missing mac_key + server_resp = Message.fromOpenIDArgs({ + 'assoc_type': 'HMAC-SHA1', + 'assoc_handle': 'handle', + 'expires_in': '1000', + }) + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failUnless(ret is None) + + def test_badDHValues(self): + sess, server_resp = self._setUpDH() + server_resp.setArg(OPENID_NS, 'enc_mac_key', '\x00\x00\x00') + ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') + self.failUnless(ret is None) diff --git a/openid/test/test_consumer.py b/openid/test/test_consumer.py index 34daa16..1256328 100644 --- a/openid/test/test_consumer.py +++ b/openid/test/test_consumer.py @@ -2,7 +2,8 @@ import urlparse import cgi import time -from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT +from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT, \ + OPENID1_NS from openid import cryptutil, dh, oidutil, kvform from openid.store.nonce import mkNonce, split as splitNonce from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE, \ @@ -1028,99 +1029,6 @@ class TestSuccessResponse(unittest.TestCase): resp = mkSuccess(self.endpoint, {'openid.return_to':'return_to'}) self.failUnlessEqual(resp.getReturnTo(), 'return_to') -class TestParseAssociation(TestIdRes): - secret = 'x' * 20 - - def test_missing(self): - # Missing required arguments - result = self.consumer._parseAssociation({}, None, 'server_url') - self.failUnless(result is None) - - def _setUpDH(self): - sess, args = \ - self.consumer._createAssociateRequest(self.endpoint, - 'HMAC-SHA1', - 'DH-SHA1') - - assert self.endpoint.compatibilityMode() == \ - (args.get('openid.ns') is None), \ - "Endpoint compat mode %r != (openid.ns in args)" % \ - (self.endpoint.compatibilityMode()) - - message = Message.fromPostArgs(args) - server_sess = DiffieHellmanSHA1ServerSession.fromMessage(message) - server_resp = server_sess.answer(self.secret) - server_resp['assoc_type'] = 'HMAC-SHA1' - server_resp['assoc_handle'] = 'handle' - server_resp['expires_in'] = '1000' - server_resp['session_type'] = 'DH-SHA1' - return sess, server_resp - - def test_success(self): - sess, server_resp = self._setUpDH() - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failIf(ret is None) - self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1') - self.failUnlessEqual(ret.secret, self.secret) - self.failUnlessEqual(ret.handle, 'handle') - self.failUnlessEqual(ret.lifetime, 1000) - - def test_openid2success(self): - # Use openid 2 type in endpoint so _setUpDH checks - # compatibility mode state properly - self.endpoint.type_uris = [OPENID_2_0_TYPE, OPENID_1_1_TYPE] - self.test_success() - - def test_badAssocType(self): - sess, server_resp = self._setUpDH() - server_resp['assoc_type'] = 'Crazy Low Prices!!!' - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failUnless(ret is None) - - def test_badExpiresIn(self): - sess, server_resp = self._setUpDH() - server_resp['expires_in'] = 'Crazy Low Prices!!!' - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failUnless(ret is None) - - def test_badSessionType(self): - sess, server_resp = self._setUpDH() - server_resp['session_type'] = '|/iA6rA' - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failUnless(ret is None) - - def test_plainFallback(self): - sess = DiffieHellmanSHA1ConsumerSession() - server_resp = { - 'assoc_type': 'HMAC-SHA1', - 'assoc_handle': 'handle', - 'expires_in': '1000', - 'mac_key': oidutil.toBase64(self.secret), - } - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failIf(ret is None) - self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1') - self.failUnlessEqual(ret.secret, self.secret) - self.failUnlessEqual(ret.handle, 'handle') - self.failUnlessEqual(ret.lifetime, 1000) - - def test_plainFallbackFailure(self): - sess = DiffieHellmanSHA1ConsumerSession() - # missing mac_key - server_resp = { - 'assoc_type': 'HMAC-SHA1', - 'assoc_handle': 'handle', - 'expires_in': '1000', - } - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failUnless(ret is None) - - def test_badDHValues(self): - sess, server_resp = self._setUpDH() - server_resp['enc_mac_key'] = '\x00\x00\x00' - ret = self.consumer._parseAssociation(server_resp, sess, 'server_url') - self.failUnless(ret is None) - class StubConsumer(object): def __init__(self): self.assoc = object() @@ -1460,47 +1368,5 @@ class TestCreateAssociationRequest(unittest.TestCase): }, args) # XXX: test the other types - -# XXX: NOT TO CHECK IN -class TestParseAssociation(unittest.TestCase): - def setUp(self): - self.args = { - } - self.msg = Message.fromOpenIDArgs() - -class TestOpenID1AssociationResponseSessionType(unittest.TestCase): - def mkTest(expected_session_type, session_type_value=None): - def test(self): - # Create a Message with just 'session_type' in it, since - # that's all this function will use. 'session_type' may be - # absent if it's set to None. - args = {} - if session_type_value is not None: - args['session_type'] = session_type_value - message = Message.fromOpenIDArgs(args) - self.failUnless(message.isOpenID1()) - - # Store should not be needed - consumer = GenericConsumer(store=None) - - actual_session_type = consumer._getOpenID1SessionType(message) - error_message = ('Returned sesion type parameter %r was expected ' - 'to yield session type %r, but yielded %r' % - (session_type_value, expected_session_type, - actual_session_type)) - self.failUnlessEqual( - expected_session_type, actual_session_type, error_message) - - test_none = mkTest('no-encryption', None) - test_empty = mkTest('no-encryption', '') - test_explicitNoEncryption = mkTest('no-encryption', 'no-encryption') - test_dhSHA1 = mkTest('DH-SHA1', 'DH-SHA1') - - # This is not a valid session type for OpenID1, but this function - # does not test that. This is mostly just to make sure that it - # will pass-through stuff that is not explicitly handled, so it - # will get handled the same way as it is handled for OpenID 2 - test_dhSHA256 = mkTest('DH-SHA256', 'DH-SHA256') - if __name__ == '__main__': unittest.main() |