diff options
author | Vlastimil Zíma <vlastimil.zima@nic.cz> | 2018-04-10 15:29:26 +0200 |
---|---|---|
committer | Vlastimil Zíma <vlastimil.zima@nic.cz> | 2018-04-10 15:29:26 +0200 |
commit | c99f3da49c6075a273aa6d3141ee6af5e6c6e655 (patch) | |
tree | c15fde7ca021747414e3c68e55dfdf1ed15a038b | |
parent | 54d0a90d9125c36202e70c7dd79c3d9411863f86 (diff) | |
download | openid-c99f3da49c6075a273aa6d3141ee6af5e6c6e655.tar.gz |
Deprecate pape drafts
-rw-r--r-- | openid/extensions/__init__.py | 2 | ||||
-rw-r--r-- | openid/extensions/draft/pape2.py | 4 | ||||
-rw-r--r-- | openid/extensions/draft/pape5.py | 460 | ||||
-rw-r--r-- | openid/extensions/pape.py | 473 | ||||
-rw-r--r-- | openid/test/test_pape.py | 390 | ||||
-rw-r--r-- | openid/test/test_pape_draft5.py | 392 |
6 files changed, 874 insertions, 847 deletions
diff --git a/openid/extensions/__init__.py b/openid/extensions/__init__.py index 710b200..5394e7a 100644 --- a/openid/extensions/__init__.py +++ b/openid/extensions/__init__.py @@ -1,5 +1,3 @@ """OpenID Extension modules.""" __all__ = ['ax', 'pape', 'sreg'] - -from openid.extensions.draft import pape5 as pape diff --git a/openid/extensions/draft/pape2.py b/openid/extensions/draft/pape2.py index d1790a8..a26ddfc 100644 --- a/openid/extensions/draft/pape2.py +++ b/openid/extensions/draft/pape2.py @@ -16,9 +16,13 @@ __all__ = [ ] import re +import warnings from openid.extension import Extension +warnings.warn("Module 'openid.extensions.draft.pape2' is deprecated. Use 'openid.extensions.pape' instead.", + DeprecationWarning) + ns_uri = "http://specs.openid.net/extensions/pape/1.0" AUTH_MULTI_FACTOR_PHYSICAL = \ diff --git a/openid/extensions/draft/pape5.py b/openid/extensions/draft/pape5.py index 7065500..47cf9b2 100644 --- a/openid/extensions/draft/pape5.py +++ b/openid/extensions/draft/pape5.py @@ -5,6 +5,10 @@ Extension 1.0, Draft 5 @since: 2.1.0 """ +import warnings + +from openid.extensions.pape import (AUTH_MULTI_FACTOR, AUTH_MULTI_FACTOR_PHYSICAL, AUTH_PHISHING_RESISTANT, LEVELS_JISA, + LEVELS_NIST, Request, Response, ns_uri) __all__ = [ 'Request', @@ -17,457 +21,5 @@ __all__ = [ 'LEVELS_JISA', ] -import re -import warnings - -from openid.extension import Extension - -ns_uri = "http://specs.openid.net/extensions/pape/1.0" - -AUTH_MULTI_FACTOR_PHYSICAL = \ - 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical' -AUTH_MULTI_FACTOR = \ - 'http://schemas.openid.net/pape/policies/2007/06/multi-factor' -AUTH_PHISHING_RESISTANT = \ - 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant' -AUTH_NONE = \ - 'http://schemas.openid.net/pape/policies/2007/06/none' - -TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$') - -LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf' -LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html' - - -class PAPEExtension(Extension): - _default_auth_level_aliases = { - 'nist': LEVELS_NIST, - 'jisa': LEVELS_JISA, - } - - def __init__(self): - self.auth_level_aliases = self._default_auth_level_aliases.copy() - - def _addAuthLevelAlias(self, auth_level_uri, alias=None): - """Add an auth level URI alias to this request. - - @param auth_level_uri: The auth level URI to send in the - request. - - @param alias: The namespace alias to use for this auth level - in this message. May be None if the alias is not - important. - """ - if alias is None: - try: - alias = self._getAlias(auth_level_uri) - except KeyError: - alias = self._generateAlias() - else: - existing_uri = self.auth_level_aliases.get(alias) - if existing_uri is not None and existing_uri != auth_level_uri: - raise KeyError('Attempting to redefine alias %r from %r to %r', - alias, existing_uri, auth_level_uri) - - self.auth_level_aliases[alias] = auth_level_uri - - def _generateAlias(self): - """Return an unused auth level alias""" - for i in xrange(1000): - alias = 'cust%d' % (i,) - if alias not in self.auth_level_aliases: - return alias - - raise RuntimeError('Could not find an unused alias (tried 1000!)') - - def _getAlias(self, auth_level_uri): - """Return the alias for the specified auth level URI. - - @raises KeyError: if no alias is defined - """ - for (alias, existing_uri) in self.auth_level_aliases.iteritems(): - if auth_level_uri == existing_uri: - return alias - - raise KeyError(auth_level_uri) - - -class Request(PAPEExtension): - """A Provider Authentication Policy request, sent from a relying - party to a provider - - @ivar preferred_auth_policies: The authentication policies that - the relying party prefers - @type preferred_auth_policies: [str] - - @ivar max_auth_age: The maximum time, in seconds, that the relying - party wants to allow to have elapsed before the user must - re-authenticate - @type max_auth_age: int or NoneType - - @ivar preferred_auth_level_types: Ordered list of authentication - level namespace URIs - - @type preferred_auth_level_types: [str] - """ - - ns_alias = 'pape' - - def __init__(self, preferred_auth_policies=None, max_auth_age=None, - preferred_auth_level_types=None): - super(Request, self).__init__() - if preferred_auth_policies is None: - preferred_auth_policies = [] - - self.preferred_auth_policies = preferred_auth_policies - self.max_auth_age = max_auth_age - self.preferred_auth_level_types = [] - - if preferred_auth_level_types is not None: - for auth_level in preferred_auth_level_types: - self.addAuthLevel(auth_level) - - def __nonzero__(self): - return bool(self.preferred_auth_policies or - self.max_auth_age is not None or - self.preferred_auth_level_types) - - def addPolicyURI(self, policy_uri): - """Add an acceptable authentication policy URI to this request - - This method is intended to be used by the relying party to add - acceptable authentication types to the request. - - @param policy_uri: The identifier for the preferred type of - authentication. - @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies - """ - if policy_uri not in self.preferred_auth_policies: - self.preferred_auth_policies.append(policy_uri) - - def addAuthLevel(self, auth_level_uri, alias=None): - self._addAuthLevelAlias(auth_level_uri, alias) - if auth_level_uri not in self.preferred_auth_level_types: - self.preferred_auth_level_types.append(auth_level_uri) - - def getExtensionArgs(self): - """@see: C{L{Extension.getExtensionArgs}} - """ - ns_args = { - 'preferred_auth_policies': ' '.join(self.preferred_auth_policies), - } - - if self.max_auth_age is not None: - ns_args['max_auth_age'] = str(self.max_auth_age) - - if self.preferred_auth_level_types: - preferred_types = [] - - for auth_level_uri in self.preferred_auth_level_types: - alias = self._getAlias(auth_level_uri) - ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri - preferred_types.append(alias) - - ns_args['preferred_auth_level_types'] = ' '.join(preferred_types) - - return ns_args - - @classmethod - def fromOpenIDRequest(cls, request): - """Instantiate a Request object from the arguments in a - C{checkid_*} OpenID message - """ - self = cls() - args = request.message.getArgs(self.ns_uri) - is_openid1 = request.message.isOpenID1() - - if args == {}: - return None - - self.parseExtensionArgs(args, is_openid1) - return self - - def parseExtensionArgs(self, args, is_openid1, strict=False): - """Set the state of this request to be that expressed in these - PAPE arguments - - @param args: The PAPE arguments without a namespace - - @param strict: Whether to raise an exception if the input is - out of spec or otherwise malformed. If strict is false, - malformed input will be ignored. - - @param is_openid1: Whether the input should be treated as part - of an OpenID1 request - - @rtype: None - - @raises ValueError: When the max_auth_age is not parseable as - an integer - """ - - # preferred_auth_policies is a space-separated list of policy URIs - self.preferred_auth_policies = [] - - policies_str = args.get('preferred_auth_policies') - if policies_str: - for uri in policies_str.split(' '): - if uri not in self.preferred_auth_policies: - self.preferred_auth_policies.append(uri) - - # max_auth_age is base-10 integer number of seconds - max_auth_age_str = args.get('max_auth_age') - self.max_auth_age = None - - if max_auth_age_str: - try: - self.max_auth_age = int(max_auth_age_str) - except ValueError: - if strict: - raise - - # Parse auth level information - preferred_auth_level_types = args.get('preferred_auth_level_types') - if preferred_auth_level_types: - aliases = preferred_auth_level_types.strip().split() - - for alias in aliases: - key = 'auth_level.ns.%s' % (alias,) - try: - uri = args[key] - except KeyError: - if is_openid1: - uri = self._default_auth_level_aliases.get(alias) - else: - uri = None - - if uri is None: - if strict: - raise ValueError('preferred auth level %r is not ' - 'defined in this message' % (alias,)) - else: - self.addAuthLevel(uri, alias) - - def preferredTypes(self, supported_types): - """Given a list of authentication policy URIs that a provider - supports, this method returns the subsequence of those types - that are preferred by the relying party. - - @param supported_types: A sequence of authentication policy - type URIs that are supported by a provider - - @returns: The sub-sequence of the supported types that are - preferred by the relying party. This list will be ordered - in the order that the types appear in the supported_types - sequence, and may be empty if the provider does not prefer - any of the supported authentication types. - - @returntype: [str] - """ - return [i for i in supported_types if i in self.preferred_auth_policies] - - -Request.ns_uri = ns_uri - - -class Response(PAPEExtension): - """A Provider Authentication Policy response, sent from a provider - to a relying party - - @ivar auth_policies: List of authentication policies conformed to - by this OpenID assertion, represented as policy URIs - """ - - ns_alias = 'pape' - - def __init__(self, auth_policies=None, auth_time=None, - auth_levels=None): - super(Response, self).__init__() - if auth_policies: - self.auth_policies = auth_policies - else: - self.auth_policies = [] - - self.auth_time = auth_time - self.auth_levels = {} - - if auth_levels is None: - auth_levels = {} - - for uri, level in auth_levels.iteritems(): - self.setAuthLevel(uri, level) - - def setAuthLevel(self, level_uri, level, alias=None): - """Set the value for the given auth level type. - - @param level: string representation of an authentication level - valid for level_uri - - @param alias: An optional namespace alias for the given auth - level URI. May be omitted if the alias is not - significant. The library will use a reasonable default for - widely-used auth level types. - """ - self._addAuthLevelAlias(level_uri, alias) - self.auth_levels[level_uri] = level - - def getAuthLevel(self, level_uri): - """Return the auth level for the specified auth level - identifier - - @returns: A string that should map to the auth levels defined - for the auth level type - - @raises KeyError: If the auth level type is not present in - this message - """ - return self.auth_levels[level_uri] - - @property - def nist_auth_level(self): - """Backward-compatibility accessor for the NIST auth level.""" - try: - return int(self.getAuthLevel(LEVELS_NIST)) - except KeyError: - return None - - def addPolicyURI(self, policy_uri): - """Add a authentication policy to this response - - This method is intended to be used by the provider to add a - policy that the provider conformed to when authenticating the user. - - @param policy_uri: The identifier for the preferred type of - authentication. - @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies - """ - if policy_uri == AUTH_NONE: - raise RuntimeError( - 'To send no policies, do not set any on the response.') - - if policy_uri not in self.auth_policies: - self.auth_policies.append(policy_uri) - - @classmethod - def fromSuccessResponse(cls, success_response): - """Create a C{L{Response}} object from a successful OpenID - library response - (C{L{openid.consumer.consumer.SuccessResponse}}) response - message - - @param success_response: A SuccessResponse from consumer.complete() - @type success_response: C{L{openid.consumer.consumer.SuccessResponse}} - - @rtype: Response or None - @returns: A provider authentication policy response from the - data that was supplied with the C{id_res} response or None - if the provider sent no signed PAPE response arguments. - """ - self = cls() - - # PAPE requires that the args be signed. - args = success_response.getSignedNS(self.ns_uri) - is_openid1 = success_response.isOpenID1() - - # Only try to construct a PAPE response if the arguments were - # signed in the OpenID response. If not, return None. - if args is not None: - self.parseExtensionArgs(args, is_openid1) - return self - else: - return None - - def parseExtensionArgs(self, args, is_openid1, strict=False): - """Parse the provider authentication policy arguments into the - internal state of this object - - @param args: unqualified provider authentication policy - arguments - - @param strict: Whether to raise an exception when bad data is - encountered - - @returns: None. The data is parsed into the internal fields of - this object. - """ - policies_str = args.get('auth_policies') - if policies_str: - auth_policies = policies_str.split(' ') - elif strict: - raise ValueError('Missing auth_policies') - else: - auth_policies = [] - - if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies): - raise ValueError('Got some auth policies, as well as the special ' - '"none" URI: %r' % (auth_policies,)) - - if 'none' in auth_policies: - msg = '"none" used as a policy URI (see PAPE draft < 5)' - if strict: - raise ValueError(msg) - else: - warnings.warn(msg, stacklevel=2) - - auth_policies = [u for u in auth_policies - if u not in ['none', AUTH_NONE]] - - self.auth_policies = auth_policies - - for (key, val) in args.iteritems(): - if key.startswith('auth_level.'): - alias = key[11:] - - # skip the already-processed namespace declarations - if alias.startswith('ns.'): - continue - - try: - uri = args['auth_level.ns.%s' % (alias,)] - except KeyError: - if is_openid1: - uri = self._default_auth_level_aliases.get(alias) - else: - uri = None - - if uri is None: - if strict: - raise ValueError( - 'Undefined auth level alias: %r' % (alias,)) - else: - self.setAuthLevel(uri, val, alias) - - auth_time = args.get('auth_time') - if auth_time: - if TIME_VALIDATOR.match(auth_time): - self.auth_time = auth_time - elif strict: - raise ValueError("auth_time must be in RFC3339 format") - - def getExtensionArgs(self): - """@see: C{L{Extension.getExtensionArgs}} - """ - if len(self.auth_policies) == 0: - ns_args = { - 'auth_policies': AUTH_NONE, - } - else: - ns_args = { - 'auth_policies': ' '.join(self.auth_policies), - } - - for level_type, level in self.auth_levels.iteritems(): - alias = self._getAlias(level_type) - ns_args['auth_level.ns.%s' % (alias,)] = level_type - ns_args['auth_level.%s' % (alias,)] = str(level) - - if self.auth_time is not None: - if not TIME_VALIDATOR.match(self.auth_time): - raise ValueError('auth_time must be in RFC3339 format') - - ns_args['auth_time'] = self.auth_time - - return ns_args - - -Response.ns_uri = ns_uri +warnings.warn("Module 'openid.extensions.draft.pape5' is deprecated in favor of 'openid.extensions.pape'.", + DeprecationWarning) diff --git a/openid/extensions/pape.py b/openid/extensions/pape.py new file mode 100644 index 0000000..7065500 --- /dev/null +++ b/openid/extensions/pape.py @@ -0,0 +1,473 @@ +"""An implementation of the OpenID Provider Authentication Policy +Extension 1.0, Draft 5 + +@see: http://openid.net/developers/specs/ + +@since: 2.1.0 +""" + +__all__ = [ + 'Request', + 'Response', + 'ns_uri', + 'AUTH_PHISHING_RESISTANT', + 'AUTH_MULTI_FACTOR', + 'AUTH_MULTI_FACTOR_PHYSICAL', + 'LEVELS_NIST', + 'LEVELS_JISA', +] + +import re +import warnings + +from openid.extension import Extension + +ns_uri = "http://specs.openid.net/extensions/pape/1.0" + +AUTH_MULTI_FACTOR_PHYSICAL = \ + 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical' +AUTH_MULTI_FACTOR = \ + 'http://schemas.openid.net/pape/policies/2007/06/multi-factor' +AUTH_PHISHING_RESISTANT = \ + 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant' +AUTH_NONE = \ + 'http://schemas.openid.net/pape/policies/2007/06/none' + +TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$') + +LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf' +LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html' + + +class PAPEExtension(Extension): + _default_auth_level_aliases = { + 'nist': LEVELS_NIST, + 'jisa': LEVELS_JISA, + } + + def __init__(self): + self.auth_level_aliases = self._default_auth_level_aliases.copy() + + def _addAuthLevelAlias(self, auth_level_uri, alias=None): + """Add an auth level URI alias to this request. + + @param auth_level_uri: The auth level URI to send in the + request. + + @param alias: The namespace alias to use for this auth level + in this message. May be None if the alias is not + important. + """ + if alias is None: + try: + alias = self._getAlias(auth_level_uri) + except KeyError: + alias = self._generateAlias() + else: + existing_uri = self.auth_level_aliases.get(alias) + if existing_uri is not None and existing_uri != auth_level_uri: + raise KeyError('Attempting to redefine alias %r from %r to %r', + alias, existing_uri, auth_level_uri) + + self.auth_level_aliases[alias] = auth_level_uri + + def _generateAlias(self): + """Return an unused auth level alias""" + for i in xrange(1000): + alias = 'cust%d' % (i,) + if alias not in self.auth_level_aliases: + return alias + + raise RuntimeError('Could not find an unused alias (tried 1000!)') + + def _getAlias(self, auth_level_uri): + """Return the alias for the specified auth level URI. + + @raises KeyError: if no alias is defined + """ + for (alias, existing_uri) in self.auth_level_aliases.iteritems(): + if auth_level_uri == existing_uri: + return alias + + raise KeyError(auth_level_uri) + + +class Request(PAPEExtension): + """A Provider Authentication Policy request, sent from a relying + party to a provider + + @ivar preferred_auth_policies: The authentication policies that + the relying party prefers + @type preferred_auth_policies: [str] + + @ivar max_auth_age: The maximum time, in seconds, that the relying + party wants to allow to have elapsed before the user must + re-authenticate + @type max_auth_age: int or NoneType + + @ivar preferred_auth_level_types: Ordered list of authentication + level namespace URIs + + @type preferred_auth_level_types: [str] + """ + + ns_alias = 'pape' + + def __init__(self, preferred_auth_policies=None, max_auth_age=None, + preferred_auth_level_types=None): + super(Request, self).__init__() + if preferred_auth_policies is None: + preferred_auth_policies = [] + + self.preferred_auth_policies = preferred_auth_policies + self.max_auth_age = max_auth_age + self.preferred_auth_level_types = [] + + if preferred_auth_level_types is not None: + for auth_level in preferred_auth_level_types: + self.addAuthLevel(auth_level) + + def __nonzero__(self): + return bool(self.preferred_auth_policies or + self.max_auth_age is not None or + self.preferred_auth_level_types) + + def addPolicyURI(self, policy_uri): + """Add an acceptable authentication policy URI to this request + + This method is intended to be used by the relying party to add + acceptable authentication types to the request. + + @param policy_uri: The identifier for the preferred type of + authentication. + @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies + """ + if policy_uri not in self.preferred_auth_policies: + self.preferred_auth_policies.append(policy_uri) + + def addAuthLevel(self, auth_level_uri, alias=None): + self._addAuthLevelAlias(auth_level_uri, alias) + if auth_level_uri not in self.preferred_auth_level_types: + self.preferred_auth_level_types.append(auth_level_uri) + + def getExtensionArgs(self): + """@see: C{L{Extension.getExtensionArgs}} + """ + ns_args = { + 'preferred_auth_policies': ' '.join(self.preferred_auth_policies), + } + + if self.max_auth_age is not None: + ns_args['max_auth_age'] = str(self.max_auth_age) + + if self.preferred_auth_level_types: + preferred_types = [] + + for auth_level_uri in self.preferred_auth_level_types: + alias = self._getAlias(auth_level_uri) + ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri + preferred_types.append(alias) + + ns_args['preferred_auth_level_types'] = ' '.join(preferred_types) + + return ns_args + + @classmethod + def fromOpenIDRequest(cls, request): + """Instantiate a Request object from the arguments in a + C{checkid_*} OpenID message + """ + self = cls() + args = request.message.getArgs(self.ns_uri) + is_openid1 = request.message.isOpenID1() + + if args == {}: + return None + + self.parseExtensionArgs(args, is_openid1) + return self + + def parseExtensionArgs(self, args, is_openid1, strict=False): + """Set the state of this request to be that expressed in these + PAPE arguments + + @param args: The PAPE arguments without a namespace + + @param strict: Whether to raise an exception if the input is + out of spec or otherwise malformed. If strict is false, + malformed input will be ignored. + + @param is_openid1: Whether the input should be treated as part + of an OpenID1 request + + @rtype: None + + @raises ValueError: When the max_auth_age is not parseable as + an integer + """ + + # preferred_auth_policies is a space-separated list of policy URIs + self.preferred_auth_policies = [] + + policies_str = args.get('preferred_auth_policies') + if policies_str: + for uri in policies_str.split(' '): + if uri not in self.preferred_auth_policies: + self.preferred_auth_policies.append(uri) + + # max_auth_age is base-10 integer number of seconds + max_auth_age_str = args.get('max_auth_age') + self.max_auth_age = None + + if max_auth_age_str: + try: + self.max_auth_age = int(max_auth_age_str) + except ValueError: + if strict: + raise + + # Parse auth level information + preferred_auth_level_types = args.get('preferred_auth_level_types') + if preferred_auth_level_types: + aliases = preferred_auth_level_types.strip().split() + + for alias in aliases: + key = 'auth_level.ns.%s' % (alias,) + try: + uri = args[key] + except KeyError: + if is_openid1: + uri = self._default_auth_level_aliases.get(alias) + else: + uri = None + + if uri is None: + if strict: + raise ValueError('preferred auth level %r is not ' + 'defined in this message' % (alias,)) + else: + self.addAuthLevel(uri, alias) + + def preferredTypes(self, supported_types): + """Given a list of authentication policy URIs that a provider + supports, this method returns the subsequence of those types + that are preferred by the relying party. + + @param supported_types: A sequence of authentication policy + type URIs that are supported by a provider + + @returns: The sub-sequence of the supported types that are + preferred by the relying party. This list will be ordered + in the order that the types appear in the supported_types + sequence, and may be empty if the provider does not prefer + any of the supported authentication types. + + @returntype: [str] + """ + return [i for i in supported_types if i in self.preferred_auth_policies] + + +Request.ns_uri = ns_uri + + +class Response(PAPEExtension): + """A Provider Authentication Policy response, sent from a provider + to a relying party + + @ivar auth_policies: List of authentication policies conformed to + by this OpenID assertion, represented as policy URIs + """ + + ns_alias = 'pape' + + def __init__(self, auth_policies=None, auth_time=None, + auth_levels=None): + super(Response, self).__init__() + if auth_policies: + self.auth_policies = auth_policies + else: + self.auth_policies = [] + + self.auth_time = auth_time + self.auth_levels = {} + + if auth_levels is None: + auth_levels = {} + + for uri, level in auth_levels.iteritems(): + self.setAuthLevel(uri, level) + + def setAuthLevel(self, level_uri, level, alias=None): + """Set the value for the given auth level type. + + @param level: string representation of an authentication level + valid for level_uri + + @param alias: An optional namespace alias for the given auth + level URI. May be omitted if the alias is not + significant. The library will use a reasonable default for + widely-used auth level types. + """ + self._addAuthLevelAlias(level_uri, alias) + self.auth_levels[level_uri] = level + + def getAuthLevel(self, level_uri): + """Return the auth level for the specified auth level + identifier + + @returns: A string that should map to the auth levels defined + for the auth level type + + @raises KeyError: If the auth level type is not present in + this message + """ + return self.auth_levels[level_uri] + + @property + def nist_auth_level(self): + """Backward-compatibility accessor for the NIST auth level.""" + try: + return int(self.getAuthLevel(LEVELS_NIST)) + except KeyError: + return None + + def addPolicyURI(self, policy_uri): + """Add a authentication policy to this response + + This method is intended to be used by the provider to add a + policy that the provider conformed to when authenticating the user. + + @param policy_uri: The identifier for the preferred type of + authentication. + @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies + """ + if policy_uri == AUTH_NONE: + raise RuntimeError( + 'To send no policies, do not set any on the response.') + + if policy_uri not in self.auth_policies: + self.auth_policies.append(policy_uri) + + @classmethod + def fromSuccessResponse(cls, success_response): + """Create a C{L{Response}} object from a successful OpenID + library response + (C{L{openid.consumer.consumer.SuccessResponse}}) response + message + + @param success_response: A SuccessResponse from consumer.complete() + @type success_response: C{L{openid.consumer.consumer.SuccessResponse}} + + @rtype: Response or None + @returns: A provider authentication policy response from the + data that was supplied with the C{id_res} response or None + if the provider sent no signed PAPE response arguments. + """ + self = cls() + + # PAPE requires that the args be signed. + args = success_response.getSignedNS(self.ns_uri) + is_openid1 = success_response.isOpenID1() + + # Only try to construct a PAPE response if the arguments were + # signed in the OpenID response. If not, return None. + if args is not None: + self.parseExtensionArgs(args, is_openid1) + return self + else: + return None + + def parseExtensionArgs(self, args, is_openid1, strict=False): + """Parse the provider authentication policy arguments into the + internal state of this object + + @param args: unqualified provider authentication policy + arguments + + @param strict: Whether to raise an exception when bad data is + encountered + + @returns: None. The data is parsed into the internal fields of + this object. + """ + policies_str = args.get('auth_policies') + if policies_str: + auth_policies = policies_str.split(' ') + elif strict: + raise ValueError('Missing auth_policies') + else: + auth_policies = [] + + if (len(auth_policies) > 1 and strict and AUTH_NONE in auth_policies): + raise ValueError('Got some auth policies, as well as the special ' + '"none" URI: %r' % (auth_policies,)) + + if 'none' in auth_policies: + msg = '"none" used as a policy URI (see PAPE draft < 5)' + if strict: + raise ValueError(msg) + else: + warnings.warn(msg, stacklevel=2) + + auth_policies = [u for u in auth_policies + if u not in ['none', AUTH_NONE]] + + self.auth_policies = auth_policies + + for (key, val) in args.iteritems(): + if key.startswith('auth_level.'): + alias = key[11:] + + # skip the already-processed namespace declarations + if alias.startswith('ns.'): + continue + + try: + uri = args['auth_level.ns.%s' % (alias,)] + except KeyError: + if is_openid1: + uri = self._default_auth_level_aliases.get(alias) + else: + uri = None + + if uri is None: + if strict: + raise ValueError( + 'Undefined auth level alias: %r' % (alias,)) + else: + self.setAuthLevel(uri, val, alias) + + auth_time = args.get('auth_time') + if auth_time: + if TIME_VALIDATOR.match(auth_time): + self.auth_time = auth_time + elif strict: + raise ValueError("auth_time must be in RFC3339 format") + + def getExtensionArgs(self): + """@see: C{L{Extension.getExtensionArgs}} + """ + if len(self.auth_policies) == 0: + ns_args = { + 'auth_policies': AUTH_NONE, + } + else: + ns_args = { + 'auth_policies': ' '.join(self.auth_policies), + } + + for level_type, level in self.auth_levels.iteritems(): + alias = self._getAlias(level_type) + ns_args['auth_level.ns.%s' % (alias,)] = level_type + ns_args['auth_level.%s' % (alias,)] = str(level) + + if self.auth_time is not None: + if not TIME_VALIDATOR.match(self.auth_time): + raise ValueError('auth_time must be in RFC3339 format') + + ns_args['auth_time'] = self.auth_time + + return ns_args + + +Response.ns_uri = ns_uri diff --git a/openid/test/test_pape.py b/openid/test/test_pape.py index 0507b2c..056fb89 100644 --- a/openid/test/test_pape.py +++ b/openid/test/test_pape.py @@ -1,10 +1,390 @@ - import unittest +import warnings from openid.extensions import pape +from openid.message import OPENID2_NS, Message +from openid.server import server + +warnings.filterwarnings('ignore', module=__name__, + message='"none" used as a policy URI') + + +class PapeRequestTestCase(unittest.TestCase): + def setUp(self): + self.req = pape.Request() + + def test_construct(self): + self.assertEqual(self.req.preferred_auth_policies, []) + self.assertIsNone(self.req.max_auth_age) + self.assertEqual(self.req.ns_alias, 'pape') + self.assertFalse(self.req.preferred_auth_level_types) + + bogus_levels = ['http://janrain.com/our_levels'] + req2 = pape.Request( + [pape.AUTH_MULTI_FACTOR], 1000, bogus_levels) + self.assertEqual(req2.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.assertEqual(req2.max_auth_age, 1000) + self.assertEqual(req2.preferred_auth_level_types, bogus_levels) + + def test_addAuthLevel(self): + self.req.addAuthLevel('http://example.com/', 'example') + self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/']) + self.assertEqual(self.req.auth_level_aliases['example'], 'http://example.com/') + + self.req.addAuthLevel('http://example.com/1', 'example1') + self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) + + self.req.addAuthLevel('http://example.com/', 'exmpl') + self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) + + self.req.addAuthLevel('http://example.com/', 'example') + self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) + + self.assertRaises(KeyError, self.req.addAuthLevel, 'http://example.com/2', 'example') + + # alias is None; we expect a new one to be generated. + uri = 'http://another.example.com/' + self.req.addAuthLevel(uri) + self.assert_(uri in self.req.auth_level_aliases.values()) + + # We don't expect a new alias to be generated if one already + # exists. + before_aliases = self.req.auth_level_aliases.keys() + self.req.addAuthLevel(uri) + after_aliases = self.req.auth_level_aliases.keys() + self.assertEqual(after_aliases, before_aliases) + + def test_add_policy_uri(self): + self.assertEqual(self.req.preferred_auth_policies, []) + self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.req.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) + self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) + self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) + + def test_getExtensionArgs(self): + self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': ''}) + self.req.addPolicyURI('http://uri') + self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': 'http://uri'}) + self.req.addPolicyURI('http://zig') + self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': 'http://uri http://zig'}) + self.req.max_auth_age = 789 + self.assertEqual(self.req.getExtensionArgs(), + {'preferred_auth_policies': 'http://uri http://zig', 'max_auth_age': '789'}) + + def test_getExtensionArgsWithAuthLevels(self): + uri = 'http://example.com/auth_level' + alias = 'my_level' + self.req.addAuthLevel(uri, alias) + + uri2 = 'http://example.com/auth_level_2' + alias2 = 'my_level_2' + self.req.addAuthLevel(uri2, alias2) + + expected_args = { + ('auth_level.ns.%s' % alias): uri, + ('auth_level.ns.%s' % alias2): uri2, + 'preferred_auth_level_types': ' '.join([alias, alias2]), + 'preferred_auth_policies': '', + } + + self.assertEqual(self.req.getExtensionArgs(), expected_args) + + def test_parseExtensionArgsWithAuthLevels(self): + uri = 'http://example.com/auth_level' + alias = 'my_level' + + uri2 = 'http://example.com/auth_level_2' + alias2 = 'my_level_2' + + request_args = { + ('auth_level.ns.%s' % alias): uri, + ('auth_level.ns.%s' % alias2): uri2, + 'preferred_auth_level_types': ' '.join([alias, alias2]), + 'preferred_auth_policies': '', + } + + # Check request object state + self.req.parseExtensionArgs(request_args, is_openid1=False, strict=False) + + expected_auth_levels = [uri, uri2] + + self.assertEqual(self.req.preferred_auth_level_types, expected_auth_levels) + self.assertEqual(self.req.auth_level_aliases[alias], uri) + self.assertEqual(self.req.auth_level_aliases[alias2], uri2) + + def test_parseExtensionArgsWithAuthLevels_openID1(self): + request_args = { + 'preferred_auth_level_types': 'nist jisa', + } + expected_auth_levels = [pape.LEVELS_NIST, pape.LEVELS_JISA] + self.req.parseExtensionArgs(request_args, is_openid1=True) + self.assertEqual(self.req.preferred_auth_level_types, expected_auth_levels) + + self.req = pape.Request() + self.req.parseExtensionArgs(request_args, is_openid1=False) + self.assertEqual(self.req.preferred_auth_level_types, []) + + self.req = pape.Request() + self.assertRaises(ValueError, self.req.parseExtensionArgs, request_args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_ignoreBadAuthLevels(self): + request_args = {'preferred_auth_level_types': 'monkeys'} + self.req.parseExtensionArgs(request_args, False) + self.assertEqual(self.req.preferred_auth_level_types, []) + + def test_parseExtensionArgs_strictBadAuthLevels(self): + request_args = {'preferred_auth_level_types': 'monkeys'} + self.assertRaises(ValueError, self.req.parseExtensionArgs, request_args, is_openid1=False, strict=True) + + def test_parseExtensionArgs(self): + args = {'preferred_auth_policies': 'http://foo http://bar', + 'max_auth_age': '9'} + self.req.parseExtensionArgs(args, False) + self.assertEqual(self.req.max_auth_age, 9) + self.assertEqual(self.req.preferred_auth_policies, ['http://foo', 'http://bar']) + self.assertEqual(self.req.preferred_auth_level_types, []) + + def test_parseExtensionArgs_strict_bad_auth_age(self): + args = {'max_auth_age': 'not an int'} + self.assertRaises(ValueError, self.req.parseExtensionArgs, args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_empty(self): + self.req.parseExtensionArgs({}, False) + self.assertIsNone(self.req.max_auth_age) + self.assertEqual(self.req.preferred_auth_policies, []) + self.assertEqual(self.req.preferred_auth_level_types, []) + + def test_fromOpenIDRequest(self): + policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] + openid_req_msg = Message.fromOpenIDArgs({ + 'mode': 'checkid_setup', + 'ns': OPENID2_NS, + 'ns.pape': pape.ns_uri, + 'pape.preferred_auth_policies': ' '.join(policy_uris), + 'pape.max_auth_age': '5476' + }) + oid_req = server.OpenIDRequest() + oid_req.message = openid_req_msg + req = pape.Request.fromOpenIDRequest(oid_req) + self.assertEqual(req.preferred_auth_policies, policy_uris) + self.assertEqual(req.max_auth_age, 5476) + + def test_fromOpenIDRequest_no_pape(self): + message = Message() + openid_req = server.OpenIDRequest() + openid_req.message = message + pape_req = pape.Request.fromOpenIDRequest(openid_req) + assert(pape_req is None) + + def test_preferred_types(self): + self.req.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) + self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) + pt = self.req.preferredTypes([pape.AUTH_MULTI_FACTOR, + pape.AUTH_MULTI_FACTOR_PHYSICAL]) + self.assertEqual(pt, [pape.AUTH_MULTI_FACTOR]) + + +class DummySuccessResponse: + def __init__(self, message, signed_stuff): + self.message = message + self.signed_stuff = signed_stuff + + def isOpenID1(self): + return False + + def getSignedNS(self, ns_uri): + return self.signed_stuff + + +class PapeResponseTestCase(unittest.TestCase): + def setUp(self): + self.resp = pape.Response() + + def test_construct(self): + self.assertEqual(self.resp.auth_policies, []) + self.assertIsNone(self.resp.auth_time) + self.assertEqual(self.resp.ns_alias, 'pape') + self.assertIsNone(self.resp.nist_auth_level) + + req2 = pape.Response([pape.AUTH_MULTI_FACTOR], + "2004-12-11T10:30:44Z", {pape.LEVELS_NIST: 3}) + self.assertEqual(req2.auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.assertEqual(req2.auth_time, "2004-12-11T10:30:44Z") + self.assertEqual(req2.nist_auth_level, 3) + + def test_add_policy_uri(self): + self.assertEqual(self.resp.auth_policies, []) + self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR]) + self.resp.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) + self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) + self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) + self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) + + self.assertRaises(RuntimeError, self.resp.addPolicyURI, pape.AUTH_NONE) + + def test_getExtensionArgs(self): + self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': pape.AUTH_NONE}) + self.resp.addPolicyURI('http://uri') + self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': 'http://uri'}) + self.resp.addPolicyURI('http://zig') + self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': 'http://uri http://zig'}) + self.resp.auth_time = "1776-07-04T14:43:12Z" + self.assertEqual(self.resp.getExtensionArgs(), + {'auth_policies': 'http://uri http://zig', 'auth_time': "1776-07-04T14:43:12Z"}) + self.resp.setAuthLevel(pape.LEVELS_NIST, '3') + nist_args = {'auth_policies': 'http://uri http://zig', 'auth_time': "1776-07-04T14:43:12Z", + 'auth_level.nist': '3', 'auth_level.ns.nist': pape.LEVELS_NIST} + self.assertEqual(self.resp.getExtensionArgs(), nist_args) + + def test_getExtensionArgs_error_auth_age(self): + self.resp.auth_time = "long ago" + self.assertRaises(ValueError, self.resp.getExtensionArgs) + + def test_parseExtensionArgs(self): + args = {'auth_policies': 'http://foo http://bar', + 'auth_time': '1970-01-01T00:00:00Z'} + self.resp.parseExtensionArgs(args, is_openid1=False) + self.assertEqual(self.resp.auth_time, '1970-01-01T00:00:00Z') + self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) + + def test_parseExtensionArgs_valid_none(self): + args = {'auth_policies': pape.AUTH_NONE} + self.resp.parseExtensionArgs(args, is_openid1=False) + self.assertEqual(self.resp.auth_policies, []) + + def test_parseExtensionArgs_old_none(self): + args = {'auth_policies': 'none'} + self.resp.parseExtensionArgs(args, is_openid1=False) + self.assertEqual(self.resp.auth_policies, []) + + def test_parseExtensionArgs_old_none_strict(self): + args = {'auth_policies': 'none'} + self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_empty(self): + self.resp.parseExtensionArgs({}, is_openid1=False) + self.assertIsNone(self.resp.auth_time) + self.assertEqual(self.resp.auth_policies, []) + + def test_parseExtensionArgs_empty_strict(self): + self.assertRaises(ValueError, self.resp.parseExtensionArgs, {}, is_openid1=False, strict=True) + + def test_parseExtensionArgs_ignore_superfluous_none(self): + policies = [pape.AUTH_NONE, pape.AUTH_MULTI_FACTOR_PHYSICAL] + + args = { + 'auth_policies': ' '.join(policies), + } + + self.resp.parseExtensionArgs(args, is_openid1=False, strict=False) + + self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR_PHYSICAL]) + + def test_parseExtensionArgs_none_strict(self): + policies = [pape.AUTH_NONE, pape.AUTH_MULTI_FACTOR_PHYSICAL] + + args = { + 'auth_policies': ' '.join(policies), + } + + self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_strict_bogus1(self): + args = {'auth_policies': 'http://foo http://bar', + 'auth_time': 'yesterday'} + self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_openid1_strict(self): + args = {'auth_level.nist': '0', + 'auth_policies': pape.AUTH_NONE, + } + self.resp.parseExtensionArgs(args, strict=True, is_openid1=True) + self.assertEqual(self.resp.getAuthLevel(pape.LEVELS_NIST), '0') + self.assertEqual(self.resp.auth_policies, []) + + def test_parseExtensionArgs_strict_no_namespace_decl_openid2(self): + # Test the case where the namespace is not declared for an + # auth level. + args = {'auth_policies': pape.AUTH_NONE, + 'auth_level.nist': '0', + } + self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) + + def test_parseExtensionArgs_nostrict_no_namespace_decl_openid2(self): + # Test the case where the namespace is not declared for an + # auth level. + args = {'auth_policies': pape.AUTH_NONE, + 'auth_level.nist': '0', + } + self.resp.parseExtensionArgs(args, is_openid1=False, strict=False) + + # There is no namespace declaration for this auth level. + self.assertRaises(KeyError, self.resp.getAuthLevel, pape.LEVELS_NIST) + + def test_parseExtensionArgs_strict_good(self): + args = {'auth_policies': 'http://foo http://bar', + 'auth_time': '1970-01-01T00:00:00Z', + 'auth_level.nist': '0', + 'auth_level.ns.nist': pape.LEVELS_NIST} + self.resp.parseExtensionArgs(args, is_openid1=False, strict=True) + self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) + self.assertEqual(self.resp.auth_time, '1970-01-01T00:00:00Z') + self.assertEqual(self.resp.nist_auth_level, 0) + + def test_parseExtensionArgs_nostrict_bogus(self): + args = {'auth_policies': 'http://foo http://bar', + 'auth_time': 'when the cows come home', + 'nist_auth_level': 'some'} + self.resp.parseExtensionArgs(args, is_openid1=False) + self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) + self.assertIsNone(self.resp.auth_time) + self.assertIsNone(self.resp.nist_auth_level) + + def test_fromSuccessResponse(self): + policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] + openid_req_msg = Message.fromOpenIDArgs({ + 'mode': 'id_res', + 'ns': OPENID2_NS, + 'ns.pape': pape.ns_uri, + 'pape.auth_policies': ' '.join(policy_uris), + 'pape.auth_time': '1970-01-01T00:00:00Z' + }) + signed_stuff = { + 'auth_policies': ' '.join(policy_uris), + 'auth_time': '1970-01-01T00:00:00Z' + } + oid_req = DummySuccessResponse(openid_req_msg, signed_stuff) + req = pape.Response.fromSuccessResponse(oid_req) + self.assertEqual(req.auth_policies, policy_uris) + self.assertEqual(req.auth_time, '1970-01-01T00:00:00Z') + + def test_fromSuccessResponseNoSignedArgs(self): + policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] + openid_req_msg = Message.fromOpenIDArgs({ + 'mode': 'id_res', + 'ns': OPENID2_NS, + 'ns.pape': pape.ns_uri, + 'pape.auth_policies': ' '.join(policy_uris), + 'pape.auth_time': '1970-01-01T00:00:00Z' + }) + + signed_stuff = {} + + class NoSigningDummyResponse(DummySuccessResponse): + def getSignedNS(self, ns_uri): + return None + + oid_req = NoSigningDummyResponse(openid_req_msg, signed_stuff) + resp = pape.Response.fromSuccessResponse(oid_req) + self.assertIsNone(resp) -class PapeImportTestCase(unittest.TestCase): - def test_version(self): - from openid.extensions.draft import pape5 - self.assert_(pape is pape5) +if __name__ == '__main__': + unittest.main() diff --git a/openid/test/test_pape_draft5.py b/openid/test/test_pape_draft5.py index 104411a..9585206 100644 --- a/openid/test/test_pape_draft5.py +++ b/openid/test/test_pape_draft5.py @@ -1,390 +1,10 @@ import unittest -import warnings -from openid.extensions.draft import pape5 as pape -from openid.message import OPENID2_NS, Message -from openid.server import server +from openid.extensions import pape -warnings.filterwarnings('ignore', module=__name__, - message='"none" used as a policy URI') - -class PapeRequestTestCase(unittest.TestCase): - def setUp(self): - self.req = pape.Request() - - def test_construct(self): - self.assertEqual(self.req.preferred_auth_policies, []) - self.assertIsNone(self.req.max_auth_age) - self.assertEqual(self.req.ns_alias, 'pape') - self.assertFalse(self.req.preferred_auth_level_types) - - bogus_levels = ['http://janrain.com/our_levels'] - req2 = pape.Request( - [pape.AUTH_MULTI_FACTOR], 1000, bogus_levels) - self.assertEqual(req2.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.assertEqual(req2.max_auth_age, 1000) - self.assertEqual(req2.preferred_auth_level_types, bogus_levels) - - def test_addAuthLevel(self): - self.req.addAuthLevel('http://example.com/', 'example') - self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/']) - self.assertEqual(self.req.auth_level_aliases['example'], 'http://example.com/') - - self.req.addAuthLevel('http://example.com/1', 'example1') - self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) - - self.req.addAuthLevel('http://example.com/', 'exmpl') - self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) - - self.req.addAuthLevel('http://example.com/', 'example') - self.assertEqual(self.req.preferred_auth_level_types, ['http://example.com/', 'http://example.com/1']) - - self.assertRaises(KeyError, self.req.addAuthLevel, 'http://example.com/2', 'example') - - # alias is None; we expect a new one to be generated. - uri = 'http://another.example.com/' - self.req.addAuthLevel(uri) - self.assert_(uri in self.req.auth_level_aliases.values()) - - # We don't expect a new alias to be generated if one already - # exists. - before_aliases = self.req.auth_level_aliases.keys() - self.req.addAuthLevel(uri) - after_aliases = self.req.auth_level_aliases.keys() - self.assertEqual(after_aliases, before_aliases) - - def test_add_policy_uri(self): - self.assertEqual(self.req.preferred_auth_policies, []) - self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.req.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) - self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) - self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.req.preferred_auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) - - def test_getExtensionArgs(self): - self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': ''}) - self.req.addPolicyURI('http://uri') - self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': 'http://uri'}) - self.req.addPolicyURI('http://zig') - self.assertEqual(self.req.getExtensionArgs(), {'preferred_auth_policies': 'http://uri http://zig'}) - self.req.max_auth_age = 789 - self.assertEqual(self.req.getExtensionArgs(), - {'preferred_auth_policies': 'http://uri http://zig', 'max_auth_age': '789'}) - - def test_getExtensionArgsWithAuthLevels(self): - uri = 'http://example.com/auth_level' - alias = 'my_level' - self.req.addAuthLevel(uri, alias) - - uri2 = 'http://example.com/auth_level_2' - alias2 = 'my_level_2' - self.req.addAuthLevel(uri2, alias2) - - expected_args = { - ('auth_level.ns.%s' % alias): uri, - ('auth_level.ns.%s' % alias2): uri2, - 'preferred_auth_level_types': ' '.join([alias, alias2]), - 'preferred_auth_policies': '', - } - - self.assertEqual(self.req.getExtensionArgs(), expected_args) - - def test_parseExtensionArgsWithAuthLevels(self): - uri = 'http://example.com/auth_level' - alias = 'my_level' - - uri2 = 'http://example.com/auth_level_2' - alias2 = 'my_level_2' - - request_args = { - ('auth_level.ns.%s' % alias): uri, - ('auth_level.ns.%s' % alias2): uri2, - 'preferred_auth_level_types': ' '.join([alias, alias2]), - 'preferred_auth_policies': '', - } - - # Check request object state - self.req.parseExtensionArgs(request_args, is_openid1=False, strict=False) - - expected_auth_levels = [uri, uri2] - - self.assertEqual(self.req.preferred_auth_level_types, expected_auth_levels) - self.assertEqual(self.req.auth_level_aliases[alias], uri) - self.assertEqual(self.req.auth_level_aliases[alias2], uri2) - - def test_parseExtensionArgsWithAuthLevels_openID1(self): - request_args = { - 'preferred_auth_level_types': 'nist jisa', - } - expected_auth_levels = [pape.LEVELS_NIST, pape.LEVELS_JISA] - self.req.parseExtensionArgs(request_args, is_openid1=True) - self.assertEqual(self.req.preferred_auth_level_types, expected_auth_levels) - - self.req = pape.Request() - self.req.parseExtensionArgs(request_args, is_openid1=False) - self.assertEqual(self.req.preferred_auth_level_types, []) - - self.req = pape.Request() - self.assertRaises(ValueError, self.req.parseExtensionArgs, request_args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_ignoreBadAuthLevels(self): - request_args = {'preferred_auth_level_types': 'monkeys'} - self.req.parseExtensionArgs(request_args, False) - self.assertEqual(self.req.preferred_auth_level_types, []) - - def test_parseExtensionArgs_strictBadAuthLevels(self): - request_args = {'preferred_auth_level_types': 'monkeys'} - self.assertRaises(ValueError, self.req.parseExtensionArgs, request_args, is_openid1=False, strict=True) - - def test_parseExtensionArgs(self): - args = {'preferred_auth_policies': 'http://foo http://bar', - 'max_auth_age': '9'} - self.req.parseExtensionArgs(args, False) - self.assertEqual(self.req.max_auth_age, 9) - self.assertEqual(self.req.preferred_auth_policies, ['http://foo', 'http://bar']) - self.assertEqual(self.req.preferred_auth_level_types, []) - - def test_parseExtensionArgs_strict_bad_auth_age(self): - args = {'max_auth_age': 'not an int'} - self.assertRaises(ValueError, self.req.parseExtensionArgs, args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_empty(self): - self.req.parseExtensionArgs({}, False) - self.assertIsNone(self.req.max_auth_age) - self.assertEqual(self.req.preferred_auth_policies, []) - self.assertEqual(self.req.preferred_auth_level_types, []) - - def test_fromOpenIDRequest(self): - policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] - openid_req_msg = Message.fromOpenIDArgs({ - 'mode': 'checkid_setup', - 'ns': OPENID2_NS, - 'ns.pape': pape.ns_uri, - 'pape.preferred_auth_policies': ' '.join(policy_uris), - 'pape.max_auth_age': '5476' - }) - oid_req = server.OpenIDRequest() - oid_req.message = openid_req_msg - req = pape.Request.fromOpenIDRequest(oid_req) - self.assertEqual(req.preferred_auth_policies, policy_uris) - self.assertEqual(req.max_auth_age, 5476) - - def test_fromOpenIDRequest_no_pape(self): - message = Message() - openid_req = server.OpenIDRequest() - openid_req.message = message - pape_req = pape.Request.fromOpenIDRequest(openid_req) - assert(pape_req is None) - - def test_preferred_types(self): - self.req.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) - self.req.addPolicyURI(pape.AUTH_MULTI_FACTOR) - pt = self.req.preferredTypes([pape.AUTH_MULTI_FACTOR, - pape.AUTH_MULTI_FACTOR_PHYSICAL]) - self.assertEqual(pt, [pape.AUTH_MULTI_FACTOR]) - - -class DummySuccessResponse: - def __init__(self, message, signed_stuff): - self.message = message - self.signed_stuff = signed_stuff - - def isOpenID1(self): - return False - - def getSignedNS(self, ns_uri): - return self.signed_stuff - - -class PapeResponseTestCase(unittest.TestCase): - def setUp(self): - self.resp = pape.Response() - - def test_construct(self): - self.assertEqual(self.resp.auth_policies, []) - self.assertIsNone(self.resp.auth_time) - self.assertEqual(self.resp.ns_alias, 'pape') - self.assertIsNone(self.resp.nist_auth_level) - - req2 = pape.Response([pape.AUTH_MULTI_FACTOR], - "2004-12-11T10:30:44Z", {pape.LEVELS_NIST: 3}) - self.assertEqual(req2.auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.assertEqual(req2.auth_time, "2004-12-11T10:30:44Z") - self.assertEqual(req2.nist_auth_level, 3) - - def test_add_policy_uri(self): - self.assertEqual(self.resp.auth_policies, []) - self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR]) - self.resp.addPolicyURI(pape.AUTH_PHISHING_RESISTANT) - self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) - self.resp.addPolicyURI(pape.AUTH_MULTI_FACTOR) - self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT]) - - self.assertRaises(RuntimeError, self.resp.addPolicyURI, pape.AUTH_NONE) - - def test_getExtensionArgs(self): - self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': pape.AUTH_NONE}) - self.resp.addPolicyURI('http://uri') - self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': 'http://uri'}) - self.resp.addPolicyURI('http://zig') - self.assertEqual(self.resp.getExtensionArgs(), {'auth_policies': 'http://uri http://zig'}) - self.resp.auth_time = "1776-07-04T14:43:12Z" - self.assertEqual(self.resp.getExtensionArgs(), - {'auth_policies': 'http://uri http://zig', 'auth_time': "1776-07-04T14:43:12Z"}) - self.resp.setAuthLevel(pape.LEVELS_NIST, '3') - nist_args = {'auth_policies': 'http://uri http://zig', 'auth_time': "1776-07-04T14:43:12Z", - 'auth_level.nist': '3', 'auth_level.ns.nist': pape.LEVELS_NIST} - self.assertEqual(self.resp.getExtensionArgs(), nist_args) - - def test_getExtensionArgs_error_auth_age(self): - self.resp.auth_time = "long ago" - self.assertRaises(ValueError, self.resp.getExtensionArgs) - - def test_parseExtensionArgs(self): - args = {'auth_policies': 'http://foo http://bar', - 'auth_time': '1970-01-01T00:00:00Z'} - self.resp.parseExtensionArgs(args, is_openid1=False) - self.assertEqual(self.resp.auth_time, '1970-01-01T00:00:00Z') - self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) - - def test_parseExtensionArgs_valid_none(self): - args = {'auth_policies': pape.AUTH_NONE} - self.resp.parseExtensionArgs(args, is_openid1=False) - self.assertEqual(self.resp.auth_policies, []) - - def test_parseExtensionArgs_old_none(self): - args = {'auth_policies': 'none'} - self.resp.parseExtensionArgs(args, is_openid1=False) - self.assertEqual(self.resp.auth_policies, []) - - def test_parseExtensionArgs_old_none_strict(self): - args = {'auth_policies': 'none'} - self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_empty(self): - self.resp.parseExtensionArgs({}, is_openid1=False) - self.assertIsNone(self.resp.auth_time) - self.assertEqual(self.resp.auth_policies, []) - - def test_parseExtensionArgs_empty_strict(self): - self.assertRaises(ValueError, self.resp.parseExtensionArgs, {}, is_openid1=False, strict=True) - - def test_parseExtensionArgs_ignore_superfluous_none(self): - policies = [pape.AUTH_NONE, pape.AUTH_MULTI_FACTOR_PHYSICAL] - - args = { - 'auth_policies': ' '.join(policies), - } - - self.resp.parseExtensionArgs(args, is_openid1=False, strict=False) - - self.assertEqual(self.resp.auth_policies, [pape.AUTH_MULTI_FACTOR_PHYSICAL]) - - def test_parseExtensionArgs_none_strict(self): - policies = [pape.AUTH_NONE, pape.AUTH_MULTI_FACTOR_PHYSICAL] - - args = { - 'auth_policies': ' '.join(policies), - } - - self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_strict_bogus1(self): - args = {'auth_policies': 'http://foo http://bar', - 'auth_time': 'yesterday'} - self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_openid1_strict(self): - args = {'auth_level.nist': '0', - 'auth_policies': pape.AUTH_NONE, - } - self.resp.parseExtensionArgs(args, strict=True, is_openid1=True) - self.assertEqual(self.resp.getAuthLevel(pape.LEVELS_NIST), '0') - self.assertEqual(self.resp.auth_policies, []) - - def test_parseExtensionArgs_strict_no_namespace_decl_openid2(self): - # Test the case where the namespace is not declared for an - # auth level. - args = {'auth_policies': pape.AUTH_NONE, - 'auth_level.nist': '0', - } - self.assertRaises(ValueError, self.resp.parseExtensionArgs, args, is_openid1=False, strict=True) - - def test_parseExtensionArgs_nostrict_no_namespace_decl_openid2(self): - # Test the case where the namespace is not declared for an - # auth level. - args = {'auth_policies': pape.AUTH_NONE, - 'auth_level.nist': '0', - } - self.resp.parseExtensionArgs(args, is_openid1=False, strict=False) - - # There is no namespace declaration for this auth level. - self.assertRaises(KeyError, self.resp.getAuthLevel, pape.LEVELS_NIST) - - def test_parseExtensionArgs_strict_good(self): - args = {'auth_policies': 'http://foo http://bar', - 'auth_time': '1970-01-01T00:00:00Z', - 'auth_level.nist': '0', - 'auth_level.ns.nist': pape.LEVELS_NIST} - self.resp.parseExtensionArgs(args, is_openid1=False, strict=True) - self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) - self.assertEqual(self.resp.auth_time, '1970-01-01T00:00:00Z') - self.assertEqual(self.resp.nist_auth_level, 0) - - def test_parseExtensionArgs_nostrict_bogus(self): - args = {'auth_policies': 'http://foo http://bar', - 'auth_time': 'when the cows come home', - 'nist_auth_level': 'some'} - self.resp.parseExtensionArgs(args, is_openid1=False) - self.assertEqual(self.resp.auth_policies, ['http://foo', 'http://bar']) - self.assertIsNone(self.resp.auth_time) - self.assertIsNone(self.resp.nist_auth_level) - - def test_fromSuccessResponse(self): - policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] - openid_req_msg = Message.fromOpenIDArgs({ - 'mode': 'id_res', - 'ns': OPENID2_NS, - 'ns.pape': pape.ns_uri, - 'pape.auth_policies': ' '.join(policy_uris), - 'pape.auth_time': '1970-01-01T00:00:00Z' - }) - signed_stuff = { - 'auth_policies': ' '.join(policy_uris), - 'auth_time': '1970-01-01T00:00:00Z' - } - oid_req = DummySuccessResponse(openid_req_msg, signed_stuff) - req = pape.Response.fromSuccessResponse(oid_req) - self.assertEqual(req.auth_policies, policy_uris) - self.assertEqual(req.auth_time, '1970-01-01T00:00:00Z') - - def test_fromSuccessResponseNoSignedArgs(self): - policy_uris = [pape.AUTH_MULTI_FACTOR, pape.AUTH_PHISHING_RESISTANT] - openid_req_msg = Message.fromOpenIDArgs({ - 'mode': 'id_res', - 'ns': OPENID2_NS, - 'ns.pape': pape.ns_uri, - 'pape.auth_policies': ' '.join(policy_uris), - 'pape.auth_time': '1970-01-01T00:00:00Z' - }) - - signed_stuff = {} - - class NoSigningDummyResponse(DummySuccessResponse): - def getSignedNS(self, ns_uri): - return None - - oid_req = NoSigningDummyResponse(openid_req_msg, signed_stuff) - resp = pape.Response.fromSuccessResponse(oid_req) - self.assertIsNone(resp) - - -if __name__ == '__main__': - unittest.main() +class PapeImportTestCase(unittest.TestCase): + def test_version(self): + from openid.extensions.draft import pape5 + self.assertEqual(pape.Request, pape5.Request) + self.assertEqual(pape.Response, pape5.Response) |