summaryrefslogtreecommitdiff
path: root/openid/extensions
diff options
context:
space:
mode:
authortailor <cygnus@janrain.com>2008-10-13 19:18:24 +0000
committertailor <cygnus@janrain.com>2008-10-13 19:18:24 +0000
commit55fd91f77f840d8d10eace04a61a04f0df93eee7 (patch)
tree8655bd503f64e4cc911939bee5a137a1c55085d0 /openid/extensions
parent8baecbf399aad60b0034b9e6356443f7a8862010 (diff)
downloadopenid-55fd91f77f840d8d10eace04a61a04f0df93eee7.tar.gz
[project @ Start of PAPE draft 5 implementation]
Diffstat (limited to 'openid/extensions')
-rw-r--r--openid/extensions/draft/pape5.py350
1 files changed, 350 insertions, 0 deletions
diff --git a/openid/extensions/draft/pape5.py b/openid/extensions/draft/pape5.py
new file mode 100644
index 0000000..400c76c
--- /dev/null
+++ b/openid/extensions/draft/pape5.py
@@ -0,0 +1,350 @@
+"""An implementation of the OpenID Provider Authentication Policy
+Extension 1.0, Draft 5
+
+@see: http://openid.net/developers/specs/
+
+@since: 2.1.0
+"""
+
+__all__ = [
+ 'Request',
+ 'Response',
+ 'ns_uri',
+ 'AUTH_PHISHING_RESISTANT',
+ 'AUTH_MULTI_FACTOR',
+ 'AUTH_MULTI_FACTOR_PHYSICAL',
+ 'AUTH_NONE',
+ 'LEVELS_NIST',
+ 'LEVELS_JISA',
+ ]
+
+from openid.extension import Extension
+import re
+
+ns_uri = "http://specs.openid.net/extensions/pape/1.0"
+
+AUTH_MULTI_FACTOR_PHYSICAL = \
+ 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical'
+AUTH_MULTI_FACTOR = \
+ 'http://schemas.openid.net/pape/policies/2007/06/multi-factor'
+AUTH_PHISHING_RESISTANT = \
+ 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant'
+AUTH_NONE = \
+ 'http://schemas.openid.net/pape/policies/2007/06/none'
+
+TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$')
+
+LEVELS_NIST = 'http://csrc.nist.gov/publications/nistpubs/800-63/SP800-63V1_0_2.pdf'
+LEVELS_JISA = 'http://www.jisa.or.jp/spec/auth_level.html'
+
+_default_auth_level_aliases = {
+ 'nist': LEVELS_NIST,
+ 'jisa': LEVELS_JISA,
+ }
+
+class Request(Extension):
+ """A Provider Authentication Policy request, sent from a relying
+ party to a provider
+
+ @ivar preferred_auth_policies: The authentication policies that
+ the relying party prefers
+ @type preferred_auth_policies: [str]
+
+ @ivar max_auth_age: The maximum time, in seconds, that the relying
+ party wants to allow to have elapsed before the user must
+ re-authenticate
+ @type max_auth_age: int or NoneType
+
+ @ivar auth_levels: Ordered list of authentication level namespace
+ URIs
+ @type auth_levels: [str]
+ """
+
+ ns_alias = 'pape'
+
+ def __init__(self, preferred_auth_policies=None, max_auth_age=None, auth_levels=None):
+ super(Request, self).__init__(self)
+ if preferred_auth_policies is None:
+ preferred_auth_policies = []
+
+ self.preferred_auth_policies = preferred_auth_policies
+ self.max_auth_age = max_auth_age
+ self.auth_levels = []
+ self.auth_level_aliases = _default_auth_level_aliases.copy()
+
+ if auth_levels is not None:
+ for auth_level in auth_levels:
+ self.addAuthLevel(auth_level)
+
+ def __nonzero__(self):
+ return bool(self.preferred_auth_policies or
+ self.max_auth_age is not None or
+ self.auth_levels)
+
+ def addPolicyURI(self, policy_uri):
+ """Add an acceptable authentication policy URI to this request
+
+ This method is intended to be used by the relying party to add
+ acceptable authentication types to the request.
+
+ @param policy_uri: The identifier for the preferred type of
+ authentication.
+ @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-05.html#auth_policies
+ """
+ if policy_uri not in self.preferred_auth_policies:
+ self.preferred_auth_policies.append(policy_uri)
+
+ def addAuthLevel(self, auth_level_uri, alias=None):
+ """Add an auth level URI and alias to this request.
+
+ @param auth_level_uri: The auth level URI to send in the
+ request.
+
+ @param alias: The namespace alias to use for this auth level
+ in the request. May be None if the alias is not important.
+ """
+ if alias is None:
+ try:
+ alias = self._getAlias(auth_level_uri)
+ except KeyError:
+ alias = self._generateAlias()
+ else:
+ existing_uri = self.auth_level_aliases.get(alias)
+ if existing_uri is not None and existing_uri != auth_level_uri:
+ raise KeyError('Attempting to redefine alias %r from %r to %r',
+ alias, existing_uri, auth_level_uri)
+
+ self.auth_level_aliases[alias] = auth_level_uri
+ if auth_level_uri not in self.auth_levels:
+ self.auth_levels.append(auth_level_uri)
+
+ def _generateAlias(self):
+ for i in xrange(1000):
+ alias = 'cust%d' % (i,)
+ if alias not in self.auth_level_aliases:
+ return alias
+
+ raise RuntimeError('Could not find an unused alias (tried 1000!)')
+
+ def _getAlias(self, auth_level_uri):
+ for (alias, existing_uri) in self.auth_level_aliases.iteritems():
+ if auth_level_uri == existing_uri:
+ return alias
+
+ raise KeyError(auth_level_uri)
+
+ def getExtensionArgs(self):
+ """@see: C{L{Extension.getExtensionArgs}}
+ """
+ ns_args = {
+ 'preferred_auth_policies':' '.join(self.preferred_auth_policies),
+ }
+
+ if self.max_auth_age is not None:
+ ns_args['max_auth_age'] = str(self.max_auth_age)
+
+ if self.auth_levels:
+ preferred_types = []
+
+ for auth_level_uri in self.auth_levels:
+ alias = self._getAlias(auth_level_uri)
+ ns_args['auth_level.ns.%s' % (alias,)] = auth_level_uri
+ preferred_types.append(alias)
+
+ ns_args['preferred_auth_level_types'] = ' '.join(preferred_types)
+
+ return ns_args
+
+ def fromOpenIDRequest(cls, request):
+ """Instantiate a Request object from the arguments in a
+ C{checkid_*} OpenID message
+ """
+ self = cls()
+ args = request.message.getArgs(self.ns_uri)
+
+ if args == {}:
+ return None
+
+ self.parseExtensionArgs(args)
+ return self
+
+ fromOpenIDRequest = classmethod(fromOpenIDRequest)
+
+ def parseExtensionArgs(self, args):
+ """Set the state of this request to be that expressed in these
+ PAPE arguments
+
+ @param args: The PAPE arguments without a namespace
+
+ @rtype: None
+
+ @raises ValueError: When the max_auth_age is not parseable as
+ an integer
+ """
+
+ # preferred_auth_policies is a space-separated list of policy URIs
+ self.preferred_auth_policies = []
+
+ policies_str = args.get('preferred_auth_policies')
+ if policies_str:
+ for uri in policies_str.split(' '):
+ if uri not in self.preferred_auth_policies:
+ self.preferred_auth_policies.append(uri)
+
+ # max_auth_age is base-10 integer number of seconds
+ max_auth_age_str = args.get('max_auth_age')
+ self.max_auth_age = None
+
+ if max_auth_age_str:
+ try:
+ self.max_auth_age = int(max_auth_age_str)
+ except ValueError:
+ pass
+
+ def preferredTypes(self, supported_types):
+ """Given a list of authentication policy URIs that a provider
+ supports, this method returns the subsequence of those types
+ that are preferred by the relying party.
+
+ @param supported_types: A sequence of authentication policy
+ type URIs that are supported by a provider
+
+ @returns: The sub-sequence of the supported types that are
+ preferred by the relying party. This list will be ordered
+ in the order that the types appear in the supported_types
+ sequence, and may be empty if the provider does not prefer
+ any of the supported authentication types.
+
+ @returntype: [str]
+ """
+ return filter(self.preferred_auth_policies.__contains__,
+ supported_types)
+
+Request.ns_uri = ns_uri
+
+
+class Response(Extension):
+ """A Provider Authentication Policy response, sent from a provider
+ to a relying party
+ """
+
+ ns_alias = 'pape'
+
+ def __init__(self, auth_policies=None, auth_time=None,
+ nist_auth_level=None):
+ super(Response, self).__init__(self)
+ if auth_policies:
+ self.auth_policies = auth_policies
+ else:
+ self.auth_policies = []
+
+ self.auth_time = auth_time
+ self.nist_auth_level = nist_auth_level
+
+ def addPolicyURI(self, policy_uri):
+ """Add a authentication policy to this response
+
+ This method is intended to be used by the provider to add a
+ policy that the provider conformed to when authenticating the user.
+
+ @param policy_uri: The identifier for the preferred type of
+ authentication.
+ @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies
+ """
+ if policy_uri not in self.auth_policies:
+ self.auth_policies.append(policy_uri)
+
+ def fromSuccessResponse(cls, success_response):
+ """Create a C{L{Response}} object from a successful OpenID
+ library response
+ (C{L{openid.consumer.consumer.SuccessResponse}}) response
+ message
+
+ @param success_response: A SuccessResponse from consumer.complete()
+ @type success_response: C{L{openid.consumer.consumer.SuccessResponse}}
+
+ @rtype: Response or None
+ @returns: A provider authentication policy response from the
+ data that was supplied with the C{id_res} response or None
+ if the provider sent no signed PAPE response arguments.
+ """
+ self = cls()
+
+ # PAPE requires that the args be signed.
+ args = success_response.getSignedNS(self.ns_uri)
+
+ # Only try to construct a PAPE response if the arguments were
+ # signed in the OpenID response. If not, return None.
+ if args is not None:
+ self.parseExtensionArgs(args)
+ return self
+ else:
+ return None
+
+ def parseExtensionArgs(self, args, strict=False):
+ """Parse the provider authentication policy arguments into the
+ internal state of this object
+
+ @param args: unqualified provider authentication policy
+ arguments
+
+ @param strict: Whether to raise an exception when bad data is
+ encountered
+
+ @returns: None. The data is parsed into the internal fields of
+ this object.
+ """
+ policies_str = args.get('auth_policies')
+ if policies_str and policies_str != 'none':
+ self.auth_policies = policies_str.split(' ')
+
+ nist_level_str = args.get('nist_auth_level')
+ if nist_level_str:
+ try:
+ nist_level = int(nist_level_str)
+ except ValueError:
+ if strict:
+ raise ValueError('nist_auth_level must be an integer between '
+ 'zero and four, inclusive')
+ else:
+ self.nist_auth_level = None
+ else:
+ if 0 <= nist_level < 5:
+ self.nist_auth_level = nist_level
+
+ auth_time = args.get('auth_time')
+ if auth_time:
+ if TIME_VALIDATOR.match(auth_time):
+ self.auth_time = auth_time
+ elif strict:
+ raise ValueError("auth_time must be in RFC3339 format")
+
+ fromSuccessResponse = classmethod(fromSuccessResponse)
+
+ def getExtensionArgs(self):
+ """@see: C{L{Extension.getExtensionArgs}}
+ """
+ if len(self.auth_policies) == 0:
+ ns_args = {
+ 'auth_policies':'none',
+ }
+ else:
+ ns_args = {
+ 'auth_policies':' '.join(self.auth_policies),
+ }
+
+ if self.nist_auth_level is not None:
+ if self.nist_auth_level not in range(0, 5):
+ raise ValueError('nist_auth_level must be an integer between '
+ 'zero and four, inclusive')
+ ns_args['nist_auth_level'] = str(self.nist_auth_level)
+
+ if self.auth_time is not None:
+ if not TIME_VALIDATOR.match(self.auth_time):
+ raise ValueError('auth_time must be in RFC3339 format')
+
+ ns_args['auth_time'] = self.auth_time
+
+ return ns_args
+
+Response.ns_uri = ns_uri