summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--admin/runtests2
-rw-r--r--openid/consumer/consumer.py5
-rw-r--r--openid/message.py2
-rw-r--r--openid/test/test_association_response.py333
-rw-r--r--openid/test/test_consumer.py138
5 files changed, 340 insertions, 140 deletions
diff --git a/admin/runtests b/admin/runtests
index 610334d..4586566 100644
--- a/admin/runtests
+++ b/admin/runtests
@@ -61,6 +61,7 @@ def pyunitTests():
from openid.test import test_fetchers
from openid.test import test_urinorm
from openid.test import test_nonce
+ from openid.test import test_association_response
# yadis tests
from openid.test import test_parsehtml
from openid.test import test_yadis_discover
@@ -77,6 +78,7 @@ def pyunitTests():
test_etxrd,
test_xri,
test_xrires,
+ test_association_response,
]
# Some modules have data-driven tests, and they use custom methods
diff --git a/openid/consumer/consumer.py b/openid/consumer/consumer.py
index 5274bfd..d0ecb04 100644
--- a/openid/consumer/consumer.py
+++ b/openid/consumer/consumer.py
@@ -407,7 +407,8 @@ class PlainTextConsumerSession(object):
return {}
def extractSecret(self, response):
- return oidutil.fromBase64(response.getArg(OPENID_NS, 'mac_key'))
+ mac_key64 = response.getArg(OPENID_NS, 'mac_key', no_default)
+ return oidutil.fromBase64(mac_key64)
class UnsupportedAssocType(Exception):
"""Exception raised when the server tells us that the session type
@@ -858,8 +859,6 @@ class GenericConsumer(object):
# 'no-encryption' so that it can be handled in the same
# way as OpenID 2 'no-encryption' respones.
elif session_type == '' or session_type is None:
- oidutil.log('Falling back to no-encryption association '
- 'session from %s' % assoc_session.session_type)
session_type = 'no-encryption'
return session_type
diff --git a/openid/message.py b/openid/message.py
index 76477f5..7fd7583 100644
--- a/openid/message.py
+++ b/openid/message.py
@@ -378,7 +378,7 @@ class Message(object):
return self.args[args_key]
except KeyError:
if default is no_default:
- raise
+ raise KeyError((namespace, key))
else:
return default
diff --git a/openid/test/test_association_response.py b/openid/test/test_association_response.py
new file mode 100644
index 0000000..6b809c3
--- /dev/null
+++ b/openid/test/test_association_response.py
@@ -0,0 +1,333 @@
+"""Tests for consumer handling of association responses
+
+This duplicates some things that are covered by test_consumer, but
+this works for now.
+"""
+from openid import oidutil
+from openid.test.test_consumer import CatchLogs
+from openid.message import Message, OPENID2_NS, OPENID_NS
+from openid.server.server import DiffieHellmanSHA1ServerSession
+from openid.consumer.consumer import GenericConsumer, \
+ DiffieHellmanSHA1ConsumerSession
+from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_1_1_TYPE, OPENID_2_0_TYPE
+import _memstore
+import unittest
+
+# Some values we can use for convenience (see mkAssocResponse)
+association_response_values = {
+ 'expires_in': 'a time',
+ 'assoc_handle':'a handle',
+ 'assoc_type':'a type',
+ 'session_type':'a session type',
+ 'ns':OPENID2_NS,
+ }
+
+def mkAssocResponse(*keys):
+ """Build an association response message that contains the
+ specified subset of keys. The values come from
+ `association_response_values`.
+
+ This is useful for testing for missing keys and other times that
+ we don't care what the values are."""
+ args = dict([(key, association_response_values[key]) for key in keys])
+ return Message.fromOpenIDArgs(args)
+
+class BaseTestParseAssociationMissingFields(CatchLogs, unittest.TestCase):
+ """
+ According to 'Association Session Response' subsection 'Common
+ Response Parameters', the following fields are required for OpenID
+ 2.0:
+
+ * ns
+ * session_type
+ * assoc_handle
+ * assoc_type
+ * expires_in
+
+ If 'ns' is missing, it will fall back to OpenID 1 checking. In
+ OpenID 1, everything except 'session_type' and 'ns' are required.
+ """
+
+ def mkTest(keys):
+ """Make a test that ensures that an association response that
+ is missing required fields will short-circuit return None."""
+
+ def test(self):
+ msg = mkAssocResponse(*keys)
+
+ # Store should not be needed
+ consumer = GenericConsumer(store=None)
+
+ result = consumer._parseAssociation(msg, None, 'dummy.url')
+ self.failUnless(result is None)
+ self.failUnlessEqual(len(self.messages), 1)
+ self.failUnless(self.messages[0].startswith(
+ 'Getting association: missing key'))
+
+ return test
+
+ mkTest = staticmethod(mkTest)
+
+class TestParseAssociationMissingFieldsOpenID2(
+ BaseTestParseAssociationMissingFields):
+ """Test for returning an error upon missing fields in association
+ responses for OpenID 2"""
+ mkTest = BaseTestParseAssociationMissingFields.mkTest
+
+ test_noFields_openid2 = mkTest(['ns'])
+
+ test_missingExpires_openid2 = mkTest(
+ ['assoc_handle', 'assoc_type', 'session_type', 'ns'])
+
+ test_missingHandle_openid2 = mkTest(
+ ['expires_in', 'assoc_type', 'session_type', 'ns'])
+
+ test_missingAssocType_openid2 = mkTest(
+ ['expires_in', 'assoc_handle', 'session_type', 'ns'])
+
+ test_missingSessionType_openid2 = mkTest(
+ ['expires_in', 'assoc_handle', 'assoc_type', 'ns'])
+
+class TestParseAssociationMissingFieldsOpenID1(
+ BaseTestParseAssociationMissingFields):
+ """Test for returning an error upon missing fields in association
+ responses for OpenID 2"""
+ mkTest = BaseTestParseAssociationMissingFields.mkTest
+
+ test_noFields_openid1 = mkTest([])
+
+ test_missingExpires_openid1 = mkTest(['assoc_handle', 'assoc_type'])
+
+ test_missingHandle_openid1 = mkTest(['expires_in', 'assoc_type'])
+
+ test_missingAssocType_openid1 = mkTest(['expires_in', 'assoc_handle'])
+
+class DummyAssocationSession(object):
+ def __init__(self, session_type, allowed_assoc_types=()):
+ self.session_type = session_type
+ self.allowed_assoc_types = allowed_assoc_types
+
+class ParseAssociationSessionTypeMismatch(unittest.TestCase):
+ def mkTest(requested_session_type, response_session_type, openid1=False):
+ def test(self):
+ assoc_session = DummyAssocationSession(requested_session_type)
+ consumer = GenericConsumer(store=None)
+ keys = association_response_values.keys()
+ if openid1:
+ keys.remove('ns')
+ msg = mkAssocResponse(keys)
+ msg.setArg(OPENID_NS, 'session_type', response_session_type)
+ result = consumer._parseAssociation(
+ msg, assoc_session, server_url='dummy.url')
+ self.failUnless(result is None)
+
+ test_typeMismatch = mkTest(
+ requested_session_type='no-encryption',
+ response_session_type='',
+ )
+
+ test_typeMismatch = mkTest(
+ requested_session_type='DH-SHA1',
+ response_session_type='no-encryption',
+ )
+
+ test_typeMismatch = mkTest(
+ requested_session_type='DH-SHA256',
+ response_session_type='no-encryption',
+ )
+
+ test_typeMismatch = mkTest(
+ requested_session_type='no-encryption',
+ response_session_type='DH-SHA1',
+ )
+
+
+class TestOpenID1AssociationResponseSessionType(CatchLogs, unittest.TestCase):
+ def mkTest(expected_session_type, session_type_value):
+ """Return a test method that will check what session type will
+ be used if the OpenID 1 response to an associate call sets the
+ 'session_type' field to `session_type_value`
+ """
+ def test(self):
+ self._doTest(expected_session_type, session_type_value)
+ self.failUnlessEqual(0, len(self.messages))
+
+ return test
+
+ def _doTest(self, expected_session_type, session_type_value):
+ # Create a Message with just 'session_type' in it, since
+ # that's all this function will use. 'session_type' may be
+ # absent if it's set to None.
+ args = {}
+ if session_type_value is not None:
+ args['session_type'] = session_type_value
+ message = Message.fromOpenIDArgs(args)
+ self.failUnless(message.isOpenID1())
+
+ # Store should not be needed
+ consumer = GenericConsumer(store=None)
+
+ actual_session_type = consumer._getOpenID1SessionType(message)
+ error_message = ('Returned sesion type parameter %r was expected '
+ 'to yield session type %r, but yielded %r' %
+ (session_type_value, expected_session_type,
+ actual_session_type))
+ self.failUnlessEqual(
+ expected_session_type, actual_session_type, error_message)
+
+ test_none = mkTest(
+ session_type_value=None,
+ expected_session_type='no-encryption',
+ )
+
+ test_empty = mkTest(
+ session_type_value='',
+ expected_session_type='no-encryption',
+ )
+
+ # This one's different because it expects log messages
+ def test_explicitNoEncryption(self):
+ self._doTest(
+ session_type_value='no-encryption',
+ expected_session_type='no-encryption',
+ )
+ self.failUnlessEqual(1, len(self.messages))
+ self.failUnless(self.messages[0].startswith(
+ 'WARNING: OpenID server sent "no-encryption"'))
+
+ test_dhSHA1 = mkTest(
+ session_type_value='DH-SHA1',
+ expected_session_type='DH-SHA1',
+ )
+
+ # DH-SHA256 is not a valid session type for OpenID1, but this
+ # function does not test that. This is mostly just to make sure
+ # that it will pass-through stuff that is not explicitly handled,
+ # so it will get handled the same way as it is handled for OpenID
+ # 2
+ test_dhSHA256 = mkTest(
+ session_type_value='DH-SHA256',
+ expected_session_type='DH-SHA256',
+ )
+
+class TestAssocTypeInvalidForSession(CatchLogs, unittest.TestCase):
+ def _setup(self, assoc_type):
+ no_encryption_session = DummyAssocationSession('matching-session-type',
+ ['good-assoc-type'])
+ msg = mkAssocResponse(*association_response_values.keys())
+ msg.setArg(OPENID2_NS, 'session_type', 'matching-session-type')
+ msg.setArg(OPENID2_NS, 'assoc_type', assoc_type)
+
+ # Store should not be needed
+ consumer = GenericConsumer(store=None)
+
+ result = consumer._parseAssociation(
+ msg, no_encryption_session, 'dummy.url')
+
+
+ def test_badAssocType(self):
+ self._setup('unsupported')
+ self.failUnlessEqual(1, len(self.messages))
+ self.failUnless(self.messages[0].startswith(
+ 'Unsupported assoc_type for session'))
+
+ def test_badExpiresIn(self):
+ self._setup('good-assoc-type')
+ self.failUnlessEqual(1, len(self.messages))
+ self.failUnless(self.messages[0].startswith(
+ 'Getting Association: invalid expires_in'))
+
+
+# XXX: This is what causes most of the imports in this file. It is
+# sort of a unit test and sort of a functional test. I'm not terribly
+# fond of it.
+class TestParseAssociation(unittest.TestCase):
+ secret = 'x' * 20
+
+ def setUp(self):
+ self.store = _memstore.MemoryStore()
+ self.consumer = GenericConsumer(self.store)
+ self.endpoint = OpenIDServiceEndpoint()
+
+ def _setUpDH(self):
+ sess, args = self.consumer._createAssociateRequest(
+ self.endpoint, 'HMAC-SHA1', 'DH-SHA1')
+
+ assert self.endpoint.compatibilityMode() == \
+ (args.get('openid.ns') is None), \
+ "Endpoint compat mode %r != (openid.ns in args)" % \
+ (self.endpoint.compatibilityMode())
+
+ message = Message.fromPostArgs(args)
+ server_sess = DiffieHellmanSHA1ServerSession.fromMessage(message)
+ server_resp = server_sess.answer(self.secret)
+ server_resp['assoc_type'] = 'HMAC-SHA1'
+ server_resp['assoc_handle'] = 'handle'
+ server_resp['expires_in'] = '1000'
+ server_resp['session_type'] = 'DH-SHA1'
+ return sess, Message.fromOpenIDArgs(server_resp)
+
+ def test_success(self):
+ sess, server_resp = self._setUpDH()
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failIf(ret is None)
+ self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
+ self.failUnlessEqual(ret.secret, self.secret)
+ self.failUnlessEqual(ret.handle, 'handle')
+ self.failUnlessEqual(ret.lifetime, 1000)
+
+ def test_openid2success(self):
+ # Use openid 2 type in endpoint so _setUpDH checks
+ # compatibility mode state properly
+ self.endpoint.type_uris = [OPENID_2_0_TYPE, OPENID_1_1_TYPE]
+ self.test_success()
+
+ def test_badAssocType(self):
+ sess, server_resp = self._setUpDH()
+ server_resp.setArg(OPENID_NS, 'assoc_type', 'Crazy Low Prices!!!')
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failUnless(ret is None)
+
+ def test_badExpiresIn(self):
+ sess, server_resp = self._setUpDH()
+ server_resp.setArg(OPENID_NS, 'expires_in', 'Crazy Low Prices!!!')
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failUnless(ret is None)
+
+ def test_badSessionType(self):
+ sess, server_resp = self._setUpDH()
+ server_resp.setArg(OPENID_NS, 'session_type', '|/iA6rA')
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failUnless(ret is None)
+
+ def test_plainFallback(self):
+ sess = DiffieHellmanSHA1ConsumerSession()
+ server_resp = Message.fromOpenIDArgs({
+ 'assoc_type': 'HMAC-SHA1',
+ 'assoc_handle': 'handle',
+ 'expires_in': '1000',
+ 'mac_key': oidutil.toBase64(self.secret),
+ })
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failIf(ret is None)
+ self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
+ self.failUnlessEqual(ret.secret, self.secret)
+ self.failUnlessEqual(ret.handle, 'handle')
+ self.failUnlessEqual(ret.lifetime, 1000)
+
+ def test_plainFallbackFailure(self):
+ sess = DiffieHellmanSHA1ConsumerSession()
+ # missing mac_key
+ server_resp = Message.fromOpenIDArgs({
+ 'assoc_type': 'HMAC-SHA1',
+ 'assoc_handle': 'handle',
+ 'expires_in': '1000',
+ })
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failUnless(ret is None)
+
+ def test_badDHValues(self):
+ sess, server_resp = self._setUpDH()
+ server_resp.setArg(OPENID_NS, 'enc_mac_key', '\x00\x00\x00')
+ ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
+ self.failUnless(ret is None)
diff --git a/openid/test/test_consumer.py b/openid/test/test_consumer.py
index 34daa16..1256328 100644
--- a/openid/test/test_consumer.py
+++ b/openid/test/test_consumer.py
@@ -2,7 +2,8 @@ import urlparse
import cgi
import time
-from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT
+from openid.message import Message, OPENID_NS, OPENID2_NS, IDENTIFIER_SELECT, \
+ OPENID1_NS
from openid import cryptutil, dh, oidutil, kvform
from openid.store.nonce import mkNonce, split as splitNonce
from openid.consumer.discover import OpenIDServiceEndpoint, OPENID_2_0_TYPE, \
@@ -1028,99 +1029,6 @@ class TestSuccessResponse(unittest.TestCase):
resp = mkSuccess(self.endpoint, {'openid.return_to':'return_to'})
self.failUnlessEqual(resp.getReturnTo(), 'return_to')
-class TestParseAssociation(TestIdRes):
- secret = 'x' * 20
-
- def test_missing(self):
- # Missing required arguments
- result = self.consumer._parseAssociation({}, None, 'server_url')
- self.failUnless(result is None)
-
- def _setUpDH(self):
- sess, args = \
- self.consumer._createAssociateRequest(self.endpoint,
- 'HMAC-SHA1',
- 'DH-SHA1')
-
- assert self.endpoint.compatibilityMode() == \
- (args.get('openid.ns') is None), \
- "Endpoint compat mode %r != (openid.ns in args)" % \
- (self.endpoint.compatibilityMode())
-
- message = Message.fromPostArgs(args)
- server_sess = DiffieHellmanSHA1ServerSession.fromMessage(message)
- server_resp = server_sess.answer(self.secret)
- server_resp['assoc_type'] = 'HMAC-SHA1'
- server_resp['assoc_handle'] = 'handle'
- server_resp['expires_in'] = '1000'
- server_resp['session_type'] = 'DH-SHA1'
- return sess, server_resp
-
- def test_success(self):
- sess, server_resp = self._setUpDH()
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failIf(ret is None)
- self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
- self.failUnlessEqual(ret.secret, self.secret)
- self.failUnlessEqual(ret.handle, 'handle')
- self.failUnlessEqual(ret.lifetime, 1000)
-
- def test_openid2success(self):
- # Use openid 2 type in endpoint so _setUpDH checks
- # compatibility mode state properly
- self.endpoint.type_uris = [OPENID_2_0_TYPE, OPENID_1_1_TYPE]
- self.test_success()
-
- def test_badAssocType(self):
- sess, server_resp = self._setUpDH()
- server_resp['assoc_type'] = 'Crazy Low Prices!!!'
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failUnless(ret is None)
-
- def test_badExpiresIn(self):
- sess, server_resp = self._setUpDH()
- server_resp['expires_in'] = 'Crazy Low Prices!!!'
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failUnless(ret is None)
-
- def test_badSessionType(self):
- sess, server_resp = self._setUpDH()
- server_resp['session_type'] = '|/iA6rA'
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failUnless(ret is None)
-
- def test_plainFallback(self):
- sess = DiffieHellmanSHA1ConsumerSession()
- server_resp = {
- 'assoc_type': 'HMAC-SHA1',
- 'assoc_handle': 'handle',
- 'expires_in': '1000',
- 'mac_key': oidutil.toBase64(self.secret),
- }
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failIf(ret is None)
- self.failUnlessEqual(ret.assoc_type, 'HMAC-SHA1')
- self.failUnlessEqual(ret.secret, self.secret)
- self.failUnlessEqual(ret.handle, 'handle')
- self.failUnlessEqual(ret.lifetime, 1000)
-
- def test_plainFallbackFailure(self):
- sess = DiffieHellmanSHA1ConsumerSession()
- # missing mac_key
- server_resp = {
- 'assoc_type': 'HMAC-SHA1',
- 'assoc_handle': 'handle',
- 'expires_in': '1000',
- }
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failUnless(ret is None)
-
- def test_badDHValues(self):
- sess, server_resp = self._setUpDH()
- server_resp['enc_mac_key'] = '\x00\x00\x00'
- ret = self.consumer._parseAssociation(server_resp, sess, 'server_url')
- self.failUnless(ret is None)
-
class StubConsumer(object):
def __init__(self):
self.assoc = object()
@@ -1460,47 +1368,5 @@ class TestCreateAssociationRequest(unittest.TestCase):
}, args)
# XXX: test the other types
-
-# XXX: NOT TO CHECK IN
-class TestParseAssociation(unittest.TestCase):
- def setUp(self):
- self.args = {
- }
- self.msg = Message.fromOpenIDArgs()
-
-class TestOpenID1AssociationResponseSessionType(unittest.TestCase):
- def mkTest(expected_session_type, session_type_value=None):
- def test(self):
- # Create a Message with just 'session_type' in it, since
- # that's all this function will use. 'session_type' may be
- # absent if it's set to None.
- args = {}
- if session_type_value is not None:
- args['session_type'] = session_type_value
- message = Message.fromOpenIDArgs(args)
- self.failUnless(message.isOpenID1())
-
- # Store should not be needed
- consumer = GenericConsumer(store=None)
-
- actual_session_type = consumer._getOpenID1SessionType(message)
- error_message = ('Returned sesion type parameter %r was expected '
- 'to yield session type %r, but yielded %r' %
- (session_type_value, expected_session_type,
- actual_session_type))
- self.failUnlessEqual(
- expected_session_type, actual_session_type, error_message)
-
- test_none = mkTest('no-encryption', None)
- test_empty = mkTest('no-encryption', '')
- test_explicitNoEncryption = mkTest('no-encryption', 'no-encryption')
- test_dhSHA1 = mkTest('DH-SHA1', 'DH-SHA1')
-
- # This is not a valid session type for OpenID1, but this function
- # does not test that. This is mostly just to make sure that it
- # will pass-through stuff that is not explicitly handled, so it
- # will get handled the same way as it is handled for OpenID 2
- test_dhSHA256 = mkTest('DH-SHA256', 'DH-SHA256')
-
if __name__ == '__main__':
unittest.main()