diff options
-rwxr-xr-x | example/idp/idp.app.py | 8 | ||||
-rw-r--r-- | example/idp/idp.conf | 2 | ||||
-rw-r--r-- | src/s2repoze/plugins/challenge_decider.py | 53 | ||||
-rw-r--r-- | src/s2repoze/plugins/ini.py | 35 | ||||
-rw-r--r-- | src/s2repoze/plugins/sp.py | 161 | ||||
-rw-r--r-- | src/saml2/attribute_resolver.py | 30 | ||||
-rw-r--r-- | src/saml2/client.py | 176 | ||||
-rw-r--r-- | src/saml2/config.py | 42 | ||||
-rw-r--r-- | src/saml2/samlp.py | 5 | ||||
-rw-r--r-- | src/saml2/server.py | 375 | ||||
-rw-r--r-- | src/saml2/soap.py | 18 | ||||
-rw-r--r-- | src/saml2/time_util.py | 14 | ||||
-rw-r--r-- | tests/metadata.xml | 6 | ||||
-rw-r--r-- | tests/server.config | 24 | ||||
-rw-r--r-- | tests/test_client.py | 44 | ||||
-rw-r--r-- | tests/test_config.py | 30 | ||||
-rw-r--r-- | tests/test_server.py | 418 | ||||
-rwxr-xr-x | tools/make_metadata.py | 98 |
18 files changed, 935 insertions, 604 deletions
diff --git a/example/idp/idp.app.py b/example/idp/idp.app.py index ed026a9b..5c6acab6 100755 --- a/example/idp/idp.app.py +++ b/example/idp/idp.app.py @@ -2,9 +2,9 @@ import re import base64 -from cgi import escape +from cgi import escape, parse_qs import urllib -import urlparse +#import urlparse from saml2 import server from saml2.utils import make_instance, sid, decode_base64_and_inflate @@ -76,7 +76,7 @@ def sso(environ, start_response, user, logger): logger and logger.info("Environ keys: %s" % environ.keys()) if "QUERY_STRING" in environ: logger and logger.info("Query string: %s" % environ["QUERY_STRING"]) - query = urlparse.parse_qs(environ["QUERY_STRING"]) + query = parse_qs(environ["QUERY_STRING"]) elif "s2repoze.qinfo" in environ: query = environ["s2repoze.qinfo"] # base 64 encoded request @@ -125,7 +125,7 @@ def not_found(environ, start_response, logger): def not_authn(environ, start_response, logger): if "QUERY_STRING" in environ: - query = urlparse.parse_qs(environ["QUERY_STRING"]) + query = parse_qs(environ["QUERY_STRING"]) logger and logger.info("query: %s" % query) start_response('401 Unauthorized', [('Content-Type', 'text/plain')]) return ['Unknown user'] diff --git a/example/idp/idp.conf b/example/idp/idp.conf index c7c0d55c..04aaa90d 100644 --- a/example/idp/idp.conf +++ b/example/idp/idp.conf @@ -1,5 +1,5 @@ { - "entityid" : "urn:mace:umu.se:saml:roland:idp", + "entityid" : "urn:mace:example.com:saml:roland:idp", "service": ["idp"], "my_name" : "Rolands IdP", "debug" : 1, diff --git a/src/s2repoze/plugins/challenge_decider.py b/src/s2repoze/plugins/challenge_decider.py index 6656ec9e..0ec64aa7 100644 --- a/src/s2repoze/plugins/challenge_decider.py +++ b/src/s2repoze/plugins/challenge_decider.py @@ -1,7 +1,60 @@ from paste.request import construct_url +import zope.interface +from repoze.who.interfaces import IRequestClassifier +from repoze.who.interfaces import IChallengeDecider + +from paste.httpheaders import REQUEST_METHOD +from paste.httpheaders import CONTENT_TYPE +from paste.httpheaders import USER_AGENT +from paste.httpheaders import WWW_AUTHENTICATE import re +_DAV_METHODS = ( + 'OPTIONS', + 'PROPFIND', + 'PROPPATCH', + 'MKCOL', + 'LOCK', + 'UNLOCK', + 'TRACE', + 'DELETE', + 'COPY', + 'MOVE' + ) + +_DAV_USERAGENTS = ( + 'Microsoft Data Access Internet Publishing Provider', + 'WebDrive', + 'Zope External Editor', + 'WebDAVFS', + 'Goliath', + 'neon', + 'davlib', + 'wsAPI', + 'Microsoft-WebDAV' + ) + +def my_request_classifier(environ): + """ Returns one of the classifiers 'dav', 'xmlpost', or 'browser', + depending on the imperative logic below""" + request_method = REQUEST_METHOD(environ) + if request_method in _DAV_METHODS: + return 'dav' + useragent = USER_AGENT(environ) + if useragent: + for agent in _DAV_USERAGENTS: + if useragent.find(agent) != -1: + return 'dav' + if request_method == 'POST': + if CONTENT_TYPE(environ) == 'text/xml': + return 'xmlpost' + elif CONTENT_TYPE(environ) == "application/soap+xml": + return 'soap' + return 'browser' + +zope.interface.directlyProvides(my_request_classifier, IRequestClassifier) + class my_challenge_decider: def __init__(self,path_login=""): self.path_login = path_login diff --git a/src/s2repoze/plugins/ini.py b/src/s2repoze/plugins/ini.py index 35f314f4..d3e4eb77 100644 --- a/src/s2repoze/plugins/ini.py +++ b/src/s2repoze/plugins/ini.py @@ -2,39 +2,36 @@ import ConfigParser, os from zope.interface import implements -from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator +#from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator from repoze.who.interfaces import IMetadataProvider class INIMetadataProvider(object): implements(IMetadataProvider) - def __init__(self, ini_file): + def __init__(self, ini_file, key_attribute): self.users = ConfigParser.ConfigParser() self.users.readfp(open(ini_file)) + self.key_attribute = key_attribute -# def authenticate(self, environ, identity): -# try: -# username = identity['login'] -# password = identity['password'] -# except KeyError: -# return None -# -# success = User.authenticate(username, password) -# -# return success - def add_metadata(self, environ, identity): logger = environ.get('repoze.who.logger','') - username = identity.get('repoze.who.userid') - logger and logger.info("Identity: %s (before)" % (identity.items(),)) + key = identity.get('repoze.who.userid') + #logger and logger.info("Identity: %s (before)" % (identity.items(),)) try: - identity["user"] = self.users.items(username) - logger and logger.info("Identity: %s (after)" % (identity.items(),)) + if self.key_attribute: + for sec in self.users.sections(): + if self.users.has_option(sec,self.key_attribute): + if key in self.users.get(sec, self.key_attribute): + identity["user"] = dict(self.users.items(sec)) + break + else: + identity["user"] = dict(self.users.items(key)) + #logger and logger.info("Identity: %s (after)" % (identity.items(),)) except ValueError: pass -def make_plugin(ini_file): - return INIMetadataProvider(ini_file) +def make_plugin(ini_file, key_attribute=""): + return INIMetadataProvider(ini_file, key_attribute) diff --git a/src/s2repoze/plugins/sp.py b/src/s2repoze/plugins/sp.py index c4357dcc..f0c2acb8 100644 --- a/src/s2repoze/plugins/sp.py +++ b/src/s2repoze/plugins/sp.py @@ -23,6 +23,7 @@ import urlparse import urllib import cgi import os +import time from paste.httpheaders import CONTENT_LENGTH from paste.httpheaders import CONTENT_TYPE @@ -48,6 +49,7 @@ from saml2.attribute_resolver import AttributeResolver from saml2.metadata import MetaData from saml2.saml import NAMEID_FORMAT_TRANSIENT from saml2.config import Config +from saml2.cache import Cache def construct_came_from(environ): """ The URL that the user used when the process where interupted @@ -65,8 +67,8 @@ class SAML2Plugin(FormPluginBase): implements(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider) - def __init__(self, rememberer_name, saml_conf_file, store, - path_logout, path_toskip, debug): + def __init__(self, rememberer_name, saml_conf_file, virtual_organization, + cache, path_logout, path_toskip, debug): self.rememberer_name = rememberer_name self.path_logout = path_logout @@ -75,7 +77,17 @@ class SAML2Plugin(FormPluginBase): self.conf = Config() self.conf.load_file(saml_conf_file) - + self.sp = self.conf["service"]["sp"] + if virtual_organization: + self.vo = virtual_organization + try: + self.vo_conf = self.conf[ + "virtual_organization"][virtual_organization] + except KeyError: + self.vo = None + else: + self.vo = None + try: self.metadata = self.conf["metadata"] except KeyError: @@ -83,10 +95,10 @@ class SAML2Plugin(FormPluginBase): self.outstanding_authn = {} self.iam = os.uname()[1] - if store==u"file": - self.store = shelve.open(store_filename) - elif store==u"mem": - self.store = {} + if cache: + self.cache = Cache(cache) + else: + self.cache = Cache() #### IChallenger #### def challenge(self, environ, status, app_headers, forget_headers): @@ -104,11 +116,20 @@ class SAML2Plugin(FormPluginBase): came_from = construct_came_from(environ) if self.debug: logger and logger.info("RelayState >> %s" % came_from) + + try: + vo = environ["myapp.vo"] + except KeyError: + vo = self.vo + logger and logger.info("VO: %s" % vo) + # If more than one idp, I have to do wayf (sid, result) = cl.authenticate(self.conf["entityid"], - self.conf["idp_url"], - self.conf["service_url"], - self.conf["my_name"], - relay_state=came_from, log=logger) + self.conf["idp"]["url"][0], + self.sp["url"], + self.sp["my_name"], + relay_state=came_from, + log=logger, + vo=vo) self.outstanding_authn[sid] = came_from if self.debug: @@ -116,7 +137,7 @@ class SAML2Plugin(FormPluginBase): if isinstance(result, tuple): return HTTPTemporaryRedirect(headers=[result]) else : - # possible to normally not used + # possible though normally not used body = "\n".join(result) def auth_form(environ, start_response): content_length = CONTENT_LENGTH.tuples(str(len(result))) @@ -133,8 +154,8 @@ class SAML2Plugin(FormPluginBase): uri = environ.get('REQUEST_URI',construct_url(environ)) if self.debug: - logger and logger.info("environ.keys(): %s" % environ.keys()) - logger and logger.info("Environment: %s" % environ) + #logger and logger.info("environ.keys(): %s" % environ.keys()) + #logger and logger.info("Environment: %s" % environ) logger and logger.info('identify uri: %s' % (uri,)) query = parse_dict_querystring(environ) @@ -166,6 +187,13 @@ class SAML2Plugin(FormPluginBase): post_env = environ.copy() post_env['QUERY_STRING'] = '' + + if environ["CONTENT_LENGTH"]: + body = environ["wsgi.input"].read(int(environ["CONTENT_LENGTH"])) + from StringIO import StringIO + environ['wsgi.input'] = StringIO(body) + environ['s2repoze.body'] = body + post = cgi.FieldStorage( fp=environ['wsgi.input'], environ=post_env, @@ -173,18 +201,26 @@ class SAML2Plugin(FormPluginBase): ) if self.debug: - logger and logger.info('identify post keys: %s' % (post.keys(),)) + logger and logger.info('identify post: %s' % (post,)) + try: + if not post.has_key("SAMLResponse"): + environ["post.fieldstorage"] = post + return {} + except TypeError: + environ["post.fieldstorage"] = post + return {} + # check for SAML2 authN cl = Saml2Client(environ, self.conf) try: - (ava, came_from) = cl.response(post, + (ava, came_from, issuer, not_on_or_after) = cl.response(post, self.conf["entityid"], self.outstanding_authn, logger) name_id = ava["__userid"] del ava["__userid"] - self.store[name_id] = ava + self.cache.set( name_id, issuer, ava, not_on_or_after) if self.debug: logger and logger.info("stored %s with key %s" % (ava, name_id)) except TypeError: @@ -205,62 +241,94 @@ class SAML2Plugin(FormPluginBase): identity["password"] = "" identity['repoze.who.userid'] = name_id identity["user"] = ava + #identity["issuer"] = issuer if self.debug: logger and logger.info("Identity: %s" % identity) return identity # IMetadataProvider def add_metadata(self, environ, identity): + subject_id = identity['repoze.who.userid'] + if self.debug: logger = environ.get('repoze.who.logger','') - logger and logger.info( - "add_metadata for %s" % identity['repoze.who.userid']) + if logger: + logger.info( + "add_metadata for %s" % subject_id) + logger.info( + "Known subjects: %s" % self.cache.subjects()) + try: + logger.info( + "Issuers: %s" % self.cache.issuers(subject_id)) + except KeyError: + pass + + if "user" not in identity: + identity["user"] = {} try: - ava = self.store[identity['repoze.who.userid']] + (ava, old) = self.cache.get_all(subject_id) + now = time.gmtime() if self.debug: logger and logger.info("Adding %s" % ava) identity["user"].update(ava) - self.store[identity['repoze.who.userid']] = identity except KeyError: pass if "pysaml2_vo_expanded" not in identity: # is this a Virtual Organization situation - if "virtual_organization" in self.conf: + if self.vo: logger and logger.info("** Do VO aggregation **") - try: - subject_id = identity["user"][ - self.conf["common_identifier"]][0] - except KeyError: - return - logger and logger.info("SubjectID: %s" % subject_id) - ar = AttributeResolver(environ, self.metadata, - self.conf["xmlsec_binary"], - self.conf["key_file"], - self.conf["cert_file"]) + #try: + # This ought to be caseignore + #subject_id = identity["user"][ + # self.vo_conf["common_identifier"]][0] + #except KeyError: + # logger and logger.error("** No common identifier **") + # return + logger and logger.info( + "SubjectID: %s, VO:%s" % (subject_id, self.vo)) + vo_members = [ - member for member in self.metadata.vo_members( - self.conf["virtual_organization"])\ - if member != self.conf["md_idp"]] + member for member in self.metadata.vo_members(self.vo)\ + if member not in self.conf["idp"]["entity_id"]] + logger and logger.info("VO members: %s" % vo_members) + vo_members = [m for m in vo_members \ + if not self.cache.active(subject_id, m)] + logger and logger.info( + "VO members (not cached): %s" % vo_members) if vo_members: + ar = AttributeResolver(environ, self.metadata, self.conf) + + if "name_id_format" in self.vo_conf: + name_id_format = self.vo_conf["name_id_format"] + sp_name_qualifier="" + else: + sp_name_qualifier=self.vo + name_id_format = "" + extra = ar.extend(subject_id, self.conf["entityid"], vo_members, - self.conf["nameid_format"], + name_id_format=name_id_format, + sp_name_qualifier=sp_name_qualifier, log=logger) - for attr,val in extra.items(): - try: - # might lead to duplicates ! - identity["user"][attr].extend(val) - except KeyError: - identity["user"][attr] = val + for issuer, tup in extra.items(): + (not_on_or_after, resp) = tup + self.cache.set(subject_id, issuer, resp, + not_on_or_after) + logger.info( + ">Issuers: %s" % self.cache.issuers(subject_id)) + logger.info( + "AVA: %s" % (self.cache.get_all(subject_id),)) + identity["user"] = self.cache.get_all(subject_id)[0] # Only do this once identity["pysaml2_vo_expanded"] = 1 - self.store[identity['repoze.who.userid']] = identity + #self.store[identity['repoze.who.userid']] = ( + # not_on_or_after, identity) # @return # used 2 times : one to get the ticket, the other to validate it @@ -277,7 +345,9 @@ class SAML2Plugin(FormPluginBase): def make_plugin(rememberer_name=None, # plugin for remember - store= "mem", # store for remember + cache= "", # cache + # Which virtual organization to support + virtual_organization="", path_logout='', # regex url to logout path_toskip='', # regex url to skip saml_conf="", @@ -294,7 +364,8 @@ def make_plugin(rememberer_name=None, # plugin for remember path_logout = path_logout.lstrip().split('\n'); path_toskip = path_toskip.lstrip().splitlines() - plugin = SAML2Plugin(rememberer_name, saml_conf, store, + plugin = SAML2Plugin(rememberer_name, saml_conf, + virtual_organization, cache, path_logout, path_toskip, debug) return plugin diff --git a/src/saml2/attribute_resolver.py b/src/saml2/attribute_resolver.py index 3ce58a1c..4d44fa42 100644 --- a/src/saml2/attribute_resolver.py +++ b/src/saml2/attribute_resolver.py @@ -33,16 +33,16 @@ DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT class AttributeResolver(object): - def __init__(self, environ, metadata=None, xmlsec_binary=None, - key_file=None, cert_file=None): + def __init__(self, environ, metadata=None, config=None, saml2client=None): self.metadata = metadata - self.saml2client = Saml2Client(environ, metadata=metadata, - xmlsec_binary=xmlsec_binary, - key_file=key_file, - cert_file=cert_file) + + if saml2client: + self.saml2client = saml2client + else: + self.saml2client = Saml2Client(environ, config) - def extend(self, subject_id, issuer, vo_members, nameid_format, - log=None): + def extend(self, subject_id, issuer, vo_members, name_id_format=None, + sp_name_qualifier=None, log=None): """ :param subject_id: The identifier by which the subject is know among all the participents of the VO @@ -61,17 +61,15 @@ class AttributeResolver(object): for attr_serv in ass.attribute_service: log and log.info("Send attribute request to %s" % \ attr_serv.location) - resp = self.saml2client.attribute_query(subject_id, + (resp, issuer, + not_on_or_after) = self.saml2client.attribute_query( + subject_id, issuer, attr_serv.location, - format=nameid_format, log=log) + sp_name_qualifier=sp_name_qualifier, + format=name_id_format, log=log) if resp: # unnecessary del resp["__userid"] - for attr,val in resp.items(): - try: - extended_identity[attr].extend(val) - except KeyError: - extended_identity[attr] = val - + extended_identity[issuer] = (not_on_or_after, resp) return extended_identity
\ No newline at end of file diff --git a/src/saml2/client.py b/src/saml2/client.py index 9f9b8876..c743a38a 100644 --- a/src/saml2/client.py +++ b/src/saml2/client.py @@ -46,6 +46,10 @@ LAX = True class Saml2Client: def __init__(self, environ, config=None): + """ + :param environ: + :param config: A saml2.config.Config instance + """ self.environ = environ if config: self.config = config @@ -82,47 +86,7 @@ class Saml2Client: def scoping_from_metadata(self, entityid, location): name = metadata.name(entityid) return make_instance(self.scoping([self.idp_entry(name, location)])) - - def create_authn_request(self, query_id, destination, service_url, - spentityid, my_name, sp_name_qualifier=None, - scoping=None): - """ Creates an Authenication Request - - :param query_id: Query identifier - :param destination: Where to send the request - :param service_url: The page to where the response MUST be sent. - :param spentityid: My official name - :param my_name: Who I am - :param sp_name_qualifier: The domain in which the name should be - valid - :param scoping: For which IdPs this query are aimed. - - :return: An authentication request - """ - - authn_request = self._init_request(samlp.AuthnRequest(query_id), - destination) - - authn_request.assertion_consumer_service_url = service_url - authn_request.protocol_binding = saml2.BINDING_HTTP_POST - authn_request.provider_name = my_name - if scoping: - authn_request.scoping = scoping - - name_id_policy = samlp.NameIDPolicy() - name_id_policy.allow_create = 'true' - if sp_name_qualifier: - name_id_policy.format = saml.NAMEID_FORMAT_PERSISTENT - name_id_policy.sp_name_qualifier = sp_name_qualifier - else: - name_id_policy.format = saml.NAMEID_FORMAT_TRANSIENT - - - authn_request.name_id_policy = name_id_policy - authn_request.issuer = saml.Issuer(text=spentityid) - - return authn_request - + def response(self, post, requestor, outstanding, log=None): """ Deal with the AuthnResponse @@ -141,19 +105,54 @@ class Saml2Client: if post.has_key("SAMLResponse"): saml_response = post['SAMLResponse'].value if saml_response: - (identity, came_from) = self.verify_response( + (identity, came_from, + not_on_or_after, response_issuer) = self.verify_response( saml_response, requestor, outstanding, log, context="AuthNReq") #relay_state = post["RelayState"].value - return (identity, came_from) + return (identity, came_from, response_issuer, not_on_or_after) else: return None + def authn_request(self, query_id, destination, service_url, spentityid, + my_name, vo="", scoping=None): + + res = { + "id": query_id, + "version": VERSION, + "issue_instant": instant(), + "destination": destination, + "assertion_consumer_service_url": service_url, + "protocol_binding": saml2.BINDING_HTTP_POST, + "provider_name": my_name, + } + + if scoping: + res["scoping"] = scoping + + name_id_policy = { + "allow_create": "true" + } + + name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT + if vo: + try: + #if vo in self.config["virtual_organization"]: + name_id_policy["sp_name_qualifier"] = vo + name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT + except KeyError: + pass + + res["name_id_policy"] = name_id_policy + res["issuer"] = { "text": spentityid } + + return make_instance(samlp.AuthnRequest, res) + def authenticate(self, spentityid, location="", service_url="", my_name="", relay_state="", binding=saml2.BINDING_HTTP_REDIRECT, log=None, - scoping=None): + vo="", scoping=None): """ Either verifies an authentication Response or if none is present send an authentication request. @@ -161,15 +160,16 @@ class Saml2Client: :param binding: How the authentication request should be sent to the IdP :param location: Where the IdP is. - :param service_url: The service URL + :param service_url: The SP's service URL :param my_name: The providers name :param relay_state: To where the user should be returned after successfull log in. :param binding: Which binding to use for sending the request :param log: Where to write log messages + :param vo: The entity_id of the virtual organization I'm a member of :param scoping: For which IdPs this query are aimed. - :return: AuthnRequest reponse + :return: AuthnRequest response """ if log: @@ -178,8 +178,8 @@ class Saml2Client: log.info("service_url: %s" % service_url) log.info("my_name: %s" % my_name) session_id = sid() - authen_req = "%s" % self.create_authn_request(session_id, location, - service_url, spentityid, my_name, scoping) + authen_req = "%s" % self.authn_request(session_id, location, + service_url, spentityid, my_name, vo, scoping) log and log.info("AuthNReq: %s" % authen_req) if binding == saml2.BINDING_HTTP_POST: @@ -237,29 +237,32 @@ class Saml2Client: response = correctly_signed_response(decoded_xml, self.config["xmlsec_binary"], log=log) if not response: - log and log.error("Response was not correctly signed") - print "Response was not correctly signed" + if log: + log.error("Response was not correctly signed") + log.info(decoded_xml) return ({}, "") else: log and log.error("Response was correctly signed or nor signed") log and log.info("response: %s" % (response,)) try: - (ava, name_id, came_from) = self.do_response(response, + (ava, name_id, came_from, not_on_or_after) = self.do_response( + response, requestor, outstanding=outstanding, xmlstr=xmlstr, log=log, context=context) + issuer = response.issuer.text except AttributeError, exc: log and log.error("AttributeError: %s" % (exc,)) - return ({}, "") + return ({}, "", 0, "") except Exception, exc: log and log.error("Exception: %s" % (exc,)) - return ({}, "") + return ({}, "", 0, "") # should return userid and attribute value assertions ava["__userid"] = name_id - return (ava, came_from) + return (ava, came_from, not_on_or_after, issuer) def _verify_condition(self, assertion, requestor, log): # The Identity Provider MUST include a <saml:Conditions> element @@ -283,7 +286,9 @@ class Saml2Client: if not for_me(condition, requestor): if not LAX: raise Exception("Not for me!!!") - + + return not_on_or_after + def _websso(self, assertion, outstanding, requestor, log): # the assertion MUST contain one AuthNStatement assert len(assertion.authn_statement) == 1 @@ -305,7 +310,7 @@ class Saml2Client: #print "Conditions",assertion.conditions assert assertion.conditions log and log.info("verify_condition") - self._verify_condition(assertion, requestor, log) + not_on_or_after = self._verify_condition(assertion, requestor, log) # The assertion can contain zero or one attributeStatements assert len(assertion.attribute_statement) <= 1 @@ -334,7 +339,7 @@ class Saml2Client: assert subject.name_id name_id = subject.name_id.text.strip() - return (ava, name_id, came_from) + return (ava, name_id, came_from, not_on_or_after) def _encrypted_assertion(self, xmlstr, outstanding, requestor, log=None, context=""): @@ -388,7 +393,7 @@ class Saml2Client: else: log and log.info("Session id I don't recall using") raise Exception("Session id I don't recall using") - + # MUST contain *one* assertion try: assert len(response.assertion) == 1 or \ @@ -509,13 +514,15 @@ class Saml2Client: log and log.info("SOAP request sent and got response: %s" % response) if response: log and log.info("Verifying response") - (identity, came_from) = self.verify_response(response, + (identity, came_from, not_on_or_after, + response_issuer) = self.verify_response( + response, issuer, outstanding={session_id:""}, log=log, decode=False, context="AttrReq") log and log.info("identity: %s" % identity) - return identity + return (identity, response_issuer, not_on_or_after) else: log and log.info("No response") return None @@ -609,51 +616,4 @@ def _print_statements(states): def print_response(resp): print _print_statement(resp) print resp.to_string() - - -def d_init_request(id, destination): - return { - "id": id, - "version": VERSION, - "issue_instant": instant(), - "destination": destination, - } - -def d_authn_request(query_id, destination, service_url, - spentityid, my_name, sp_name_qualifier=None, - scoping=None): - """ Creates an Authenication Request - - :param query_id: Query identifier - :param destination: Where to send the request - :param service_url: The page to where the response MUST be sent. - :param spentityid: My official name - :param my_name: Who I am - :param sp_name_qualifier: The domain in which the name should be - valid - :param scoping: For which IdPs this query are aimed. - - :return: An authentication request - """ - - authn_request = d_init_request(query_id, destination) - authn_request["assertion_consumer_service_url"] = service_url - authn_request["protocol_binding"] = saml2.BINDING_HTTP_POST - authn_request["provider_name"] = my_name - if scoping: - authn_request["scoping"] = scoping - - name_id_policy = { - "allow_create": 'true' - } - if sp_name_qualifier: - name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT - name_id_policy["sp_name_qualifier"] = sp_name_qualifier - else: - name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT - - - authn_request["name_id_policy"] = name_id_policy - authn_request["issuer"] = spentityid - - return make_instance(samlp.AuthnRequest,authn_request) +
\ No newline at end of file diff --git a/src/saml2/config.py b/src/saml2/config.py index c36c9675..e7ef54ec 100644 --- a/src/saml2/config.py +++ b/src/saml2/config.py @@ -4,26 +4,39 @@ from saml2 import metadata +def entity_id2url(md, entity_id): + try: + # grab the first one + return md.single_sign_on_services(entity_id)[0] + except Exception: + print "idp_entity_id",entity_id + print ("idps in metadata", + [e for e,d in md.entity.items() if "idp_sso" in d]) + print "metadata entities", md.entity.keys() + for ent, dic in md.entity.items(): + print ent, dic.keys() + return None + class Config(dict): def sp_check(self, config): + assert "idp" in config + if "metadata" in config: md = config["metadata"] - if "idp_entity_id" in config: - try: - config["idp_url"] = md.single_sign_on_services( - config["idp_entity_id"])[0] - except Exception: - print "idp_entity_id",config["idp_entity_id"] - print ("idps in metadata", - [e for e,d in md.entity.items() if "idp_sso" in d]) - print "metadata entities", md.entity.keys() - for ent, dic in md.entity.items(): - print ent, dic.keys() - raise + if "entity_id" in config["idp"]: + if not "url" in config["idp"]: + config["idp"]["url"] = [] + urls = config["idp"]["url"] + for eid in config["idp"]["entity_id"]: + url = entity_id2url(md, eid) + if url: + if url not in urls: + urls.append(url) - assert config["idp_url"] - + assert "sp" in config["service"] + assert "url" in config["service"]["sp"] + def idp_check(self, config): pass @@ -50,7 +63,6 @@ class Config(dict): assert "xmlsec_binary" in config assert "service" in config assert "entityid" in config - assert "service_url" in config if "key_file" in config: # If you have a key file you have to have a cert file diff --git a/src/saml2/samlp.py b/src/saml2/samlp.py index 44d8bfba..9a8365d3 100644 --- a/src/saml2/samlp.py +++ b/src/saml2/samlp.py @@ -897,9 +897,8 @@ class AuthnRequest(AbstractRequest): c_children['{%s}Scoping' % NAMESPACE] = ('scoping', Scoping) c_child_order = AbstractRequest.c_child_order[:] - c_child_order.extend(['issuer', 'signature', 'extensions', 'subject', - 'name_id_policy', 'conditions', 'requested_authn_context', - 'scoping']) + c_child_order.extend(['subject', 'name_id_policy', 'conditions', + 'requested_authn_context', 'scoping']) def __init__(self, id=None, version=None, issue_instant=None, destination=None, consent=None, issuer=None, signature=None, diff --git a/src/saml2/server.py b/src/saml2/server.py index 7bd51640..26c7bda1 100644 --- a/src/saml2/server.py +++ b/src/saml2/server.py @@ -18,6 +18,8 @@ """Contains classes and functions that a SAML2.0 Identity provider (IdP) or attribute authority (AA) may use to conclude its tasks. """ +import shelve + from saml2 import saml, samlp, VERSION from saml2.utils import sid, decode_base64_and_inflate, make_instance from saml2.time_util import instant, in_a_while @@ -60,95 +62,219 @@ def klassdict(klass, text=None, **kwargs): spec[key] = val return spec -class Server(object): - def __init__(self, config_file, log=None): - if config_file: - self.conf = Config() - self.conf.load_file(config_file) - self.metadata = self.conf["metadata"] - self.log = log - - def issuer(self): - return klassdict( saml.Issuer, self.conf["entityid"], - format=saml.NAMEID_FORMAT_ENTITY) - - def status_from_exception(self, exception): - return klassdict(samlp.Status, +def kd_status_from_exception(exception): + return klassdict(samlp.Status, + status_code=klassdict(samlp.StatusCode, + value=samlp.STATUS_RESPONDER, status_code=klassdict(samlp.StatusCode, - value=samlp.STATUS_RESPONDER, - status_code=klassdict(samlp.StatusCode, - value=EXCEPTION2STATUS[exception.__class__]) - ), - status_message=exception.args[0], - ) - - def name_id(self, text="", **kwargs): - return klassdict(saml.NameID, text, **kwargs) + value=EXCEPTION2STATUS[exception.__class__]) + ), + status_message=exception.args[0], + ) + +def kd_name_id(text="", **kwargs): + return klassdict(saml.NameID, text, **kwargs) - def status_message(self, text="", **kwargs): - return klassdict(samlp.StatusMessage, text, **kwargs) +def kd_status_message(text="", **kwargs): + return klassdict(samlp.StatusMessage, text, **kwargs) - def status_code(self, text="", **kwargs): - return klassdict(samlp.StatusCode, text, **kwargs) +def kd_status_code(text="", **kwargs): + return klassdict(samlp.StatusCode, text, **kwargs) - def status(self, text="", **kwargs): - return klassdict(samlp.Status, text, **kwargs) - - def success_status(self): - return self.status(status_code=self.status_code( - value=samlp.STATUS_SUCCESS)) - - def audience(self, text="", **kwargs): - return klassdict(saml.Audience, text, **kwargs) +def kd_status(text="", **kwargs): + return klassdict(samlp.Status, text, **kwargs) + +def kd_success_status(): + return kd_status(status_code=kd_status_code(value=samlp.STATUS_SUCCESS)) + +def kd_audience(text="", **kwargs): + return klassdict(saml.Audience, text, **kwargs) + +def kd_audience_restriction(text="", **kwargs): + return klassdict(saml.AudienceRestriction, text, **kwargs) - def audience_restriction(self, text="", **kwargs): - return klassdict(saml.AudienceRestriction, text, **kwargs) +def kd_conditions(text="", **kwargs): + return klassdict(saml.Conditions, text, **kwargs) + +def kd_attribute(text="", **kwargs): + return klassdict(saml.Attribute, text, **kwargs) - def conditions(self, text="", **kwargs): - return klassdict(saml.Conditions, text, **kwargs) +def kd_attribute_value(text="", **kwargs): + return klassdict(saml.AttributeValue, text, **kwargs) - def attribute(self, text="", **kwargs): - return klassdict(saml.Attribute, text, **kwargs) +def kd_attribute_statement(text="", **kwargs): + return klassdict(saml.AttributeStatement, text, **kwargs) - def attribute_value(self, text="", **kwargs): - return klassdict(saml.AttributeValue, text, **kwargs) - - def attribute_statement(self, text="", **kwargs): - return klassdict(saml.AttributeStatement, text, **kwargs) +def kd_subject_confirmation_data(text="", **kwargs): + return klassdict(saml.SubjectConfirmationData, text, **kwargs) - def subject_confirmation_data(self, text="", **kwargs): - return klassdict(saml.SubjectConfirmationData, text, **kwargs) - - def subject_confirmation(self, text="", **kwargs): - return klassdict(saml.SubjectConfirmation, text, **kwargs) +def kd_subject_confirmation(text="", **kwargs): + return klassdict(saml.SubjectConfirmation, text, **kwargs) + +def kd_subject(text="", **kwargs): + return klassdict(saml.Subject, text, **kwargs) + +def kd_authn_statement(text="", **kwargs): + return klassdict(saml.Subject, text, **kwargs) + +def kd_assertion(text="", **kwargs): + kwargs.update({ + "version": VERSION, + "id" : sid(), + "issue_instant" : instant(), + }) + return klassdict(saml.Assertion, text, **kwargs) + +def kd_response(signature=False, encrypt=False, **kwargs): + + kwargs.update({ + "id" : sid(), + "version": VERSION, + "issue_instant" : instant(), + }) + if signature: + kwargs["signature"] = sigver.pre_signature_part(kwargs["id"]) + + return kwargs + +def do_attribute_statement(identity): + """ + :param identity: A dictionary with fiendly names as keys + :return: + """ + attrs = [] + for key, val in identity.items(): + dic = {} + if isinstance(val,basestring): + attrval = kd_attribute_value(val) + elif isinstance(val,list): + attrval = [kd_attribute_value(v) for v in val] + else: + raise OtherError("strange value type on: %s" % val) + dic["attribute_value"] = attrval + if isinstance(key, basestring): + dic["name"] = key + elif isinstance(key, tuple): # 3-tuple + (name,format,friendly) = key + if name: + dic["name"] = name + if format: + dic["name_format"] = format + if friendly: + dic["friendly_name"] = friendly + attrs.append(kd_attribute(**dic)) + + return kd_attribute_statement(attribute=attrs) + +def kd_issuer(text, **kwargs): + return klassdict(saml.Issuer, text, **kwargs) + +def do_aa_response(consumer_url, in_response_to, + sp_entity_id, identity, name_id_policies=None, + name_id=None, ip_address="", issuer=None ): + + attr_statement = do_attribute_statement(identity) + + # start using now and for a hour + conds = kd_conditions( + not_before=instant(), + # an hour from now + not_on_or_after=in_a_while(hours=1), + audience_restriction=kd_audience_restriction( + audience=kd_audience(sp_entity_id))) + + # temporary identifier or ?? + if not name_id: + name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT) - def subject(self, text="", **kwargs): - return klassdict(saml.Subject, text, **kwargs) + tmp = kd_response( + issuer=issuer, + in_response_to=in_response_to, + destination=consumer_url, + status=kd_success_status(), + assertion=kd_assertion( + subject = kd_subject( + name_id=name_id, + method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER, + subject_confirmation=kd_subject_confirmation( + subject_confirmation_data=kd_subject_confirmation_data( + in_response_to=in_response_to, + not_on_or_after=in_a_while(hours=1), + address=ip_address, + recipient=consumer_url))), + attribute_statement = attr_statement, + authn_statement= kd_authn_statement( + authn_instant=instant(), + session_index=sid()), + conditions=conds, + ), + ) + + return make_instance(samlp.Response, tmp) + +class Server(object): + def __init__(self, config_file="", config=None, log=None, debug=0): + if config_file: + self.conf = Config() + self.conf.load_file(config_file) + self.metadata = self.conf["metadata"] + if "subject_data" in self.conf: + self.id_map = shelve.open(self.conf["subject_data"], + writeback=True) + else: + self.id_map = None + elif config: + self.conf = config + self.metadata = self.conf["metadata"] - def authn_statement(self, text="", **kwargs): - return klassdict(saml.Subject, text, **kwargs) + self.log = log + self.debug = debug - def assertion(self, text="", **kwargs): - kwargs.update({ - "version": VERSION, - "id" : sid(), - "issue_instant" : instant(), - }) - return klassdict(saml.Assertion, text, **kwargs) + def issuer(self): + return kd_issuer( self.conf["entityid"], + format=saml.NAMEID_FORMAT_ENTITY) - def response(self, signature=False, encrypt=False, **kwargs): + def persistent_id(self, entity_id, subject_id): + """ + :param entity_id: SP entity ID or VO entity ID + :param subject_id: The local identifier of the subject + :return: A arbitrary identifier for the subject unique to the + entity_id + """ + if self.debug: + self.log and self.log.debug("Id map keys: %s" % self.id_map.keys()) + + try: + map = self.id_map[entity_id] + except KeyError: + map = self.id_map[entity_id] = {"forward":{}, "backward":{}} - kwargs.update({ - "id" : sid(), - "version": VERSION, - "issue_instant" : instant(), - }) - if signature: - kwargs["signature"] = sigver.pre_signature_part(kwargs["id"]) + try: + if self.debug: + self.log.debug("map forward keys: %s" % map["forward"].keys()) + return map["forward"][subject_id] + except KeyError: + while True: + temp_id = sid() + if temp_id not in map["backward"]: + break + map["forward"][subject_id] = temp_id + map["backward"][temp_id] = subject_id + self.id_map[entity_id]= map + self.id_map.sync() + + return temp_id - return kwargs - def parse_authn_request(self, enc_request): + """Parse a Authentication Request + + :param enc_request: The request in its transport format + :return: A tuple of + consumer_url - as gotten from the SPs entity_id and the metadata + id - the id of the request + name_id_policy - how to chose the subjects identifier + spentityid - the entity id of the SP + """ request_xml = decode_base64_and_inflate(enc_request) request = samlp.authn_request_from_string(request_xml) @@ -175,92 +301,77 @@ class Server(object): return_destination)) print "%s != %s" % (consumer_url, return_destination) raise OtherError("ConsumerURL and return destination mismatch") - - policy = request.name_id_policy - if policy.allow_create.lower() == "true" and \ - policy.format == saml.NAMEID_FORMAT_TRANSIENT: - name_id_policies = policy.format - - return (consumer_url, id, name_id_policies, spentityid) + + self.log and self.log.info("AuthNRequest: %s" % request) + return (consumer_url, id, request.name_id_policy, spentityid) def allowed_issuer(self, issuer): + """ """ return True def parse_attribute_query(self, xml_string): query = samlp.attribute_query_from_string(xml_string) assert query.version == VERSION - assert query.destination == self.conf["service_url"] + self.log and self.log.info( + "%s ?= %s" % (query.destination,self.conf["service"]["aa"]["url"])) + assert query.destination == self.conf["service"]["aa"]["url"] self.allowed_issuer(query.issuer) # verify signature - return (subject, attribute) + subject = query.subject.name_id.text + if query.attribute: + attribute = query.attribute + else: + attribute = None + return (subject, attribute, query) def find_subject(self, subject, attribute=None): pass - - def do_attribute_statement(self, identity): - """ - :param identity: A dictionary with fiendly names as keys - :return: - """ - attrs = [] - for key, val in identity.items(): - dic = {} - if isinstance(val,basestring): - attrval = self.attribute_value(val) - elif isinstance(val,list): - attrval = [self.attribute_value(v) for v in val] - else: - raise OtherError("strange value type on: %s" % val) - dic["attribute_value"] = attrval - if isinstance(key, basestring): - dic["name"] = key - elif isinstance(key, tuple): # 3-tuple - (name,format,friendly) = key - if name: - dic["name"] = name - if format: - dic["name_format"] = format - if friendly: - dic["friendly_name"] = friendly - attrs.append(self.attribute(**dic)) - - return self.attribute_statement(attribute=attrs) - + def do_sso_response(self, consumer_url, in_response_to, - sp_entity_id, identity, name_id_policies=None, - subject_id=None ): + sp_entity_id, identity, name_id=None ): - attribute_statement = self.do_attribute_statement(identity) + attr_statement = do_attribute_statement(identity) # start using now and for a hour - conditions = self.conditions( + conds = kd_conditions( not_before=instant(), # an hour from now not_on_or_after=in_a_while(0,0,0,0,0,1), - audience_restriction=self.audience_restriction( - audience=self.audience(sp_entity_id))) + audience_restriction=kd_audience_restriction( + audience=kd_audience(sp_entity_id))) # temporary identifier or ?? - subject_id = sid() - tmp = self.response( + if not name_id: + name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT) + + tmp = kd_response( + issuer=self.issuer(), in_response_to=in_response_to, destination=consumer_url, - status=self.success_status(), - assertion=self.assertion( - subject = self.subject( - name_id=self.name_id(subject_id, - format=saml.NAMEID_FORMAT_TRANSIENT), - method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER, - subject_confirmation=self.subject_confirmation( - subject_confirmation_data=self.subject_confirmation_data( - in_response_to=in_response_to))), - attribute_statement = attribute_statement, - authn_statement= self.authn_statement( + status=kd_success_status(), + assertion=kd_assertion( + attribute_statement = attr_statement, + authn_statement= kd_authn_statement( authn_instant=instant(), session_index=sid()), - conditions=conditions, + conditions=conds, + subject=kd_subject( + name_id=name_id, + method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER, + subject_confirmation=kd_subject_confirmation( + subject_confirmation_data=kd_subject_confirmation_data( + in_response_to=in_response_to))), ), ) return make_instance(samlp.Response, tmp) + + def do_aa_response(self, consumer_url, in_response_to, + sp_entity_id, identity, name_id_policies=None, + subject_id=None, ip_address=""): + + return do_aa_response(consumer_url, in_response_to, + sp_entity_id, identity, name_id_policies, + subject_id, ip_address, self.issuer()) + diff --git a/src/saml2/soap.py b/src/saml2/soap.py index 97b3bb25..4ac99890 100644 --- a/src/saml2/soap.py +++ b/src/saml2/soap.py @@ -36,9 +36,13 @@ NAMESPACE = "http://schemas.xmlsoap.org/soap/envelope/" def parse_soap_enveloped_saml_response(text): expected_tag = '{%s}Response' % SAMLP_NAMESPACE - return parse_soap_enveloped_saml_thing(text, expected_tag) - -def parse_soap_enveloped_saml_thing(text, expected_tag): + return parse_soap_enveloped_saml_thingy(text, expected_tag) + +def parse_soap_enveloped_saml_attribute_query(text): + expected_tag = '{%s}AttributeQuery' % SAMLP_NAMESPACE + return parse_soap_enveloped_saml_thingy(text, expected_tag) + +def parse_soap_enveloped_saml_thingy(text, expected_tag): """Parses a SOAP enveloped SAML thing and returns the thing as a string. @@ -59,7 +63,7 @@ def parse_soap_enveloped_saml_thing(text, expected_tag): else: return "" -def make_soap_enveloped_saml_thingy(self, thingy): +def make_soap_enveloped_saml_thingy(thingy): """ Returns a soap envelope containing a SAML request as a text string. @@ -86,7 +90,9 @@ class _Http(object): self.server.add_certificate(keyfile, certfile, "") def write(self, data): - (response, content) = self.server.request(self.path, "POST", data) + (response, content) = self.server.request(self.path, + "POST", data, + headers={"content-type": "application/soap+xml"}) if response.status == 200: return content else: @@ -98,7 +104,7 @@ class SOAPClient(object): self.server = _Http(server_url, keyfile, certfile) def send(self, request): - soap_message = make_soap_enveloped_saml_request(request) + soap_message = make_soap_enveloped_saml_thingy(request) response = self.server.write(soap_message) if response: return parse_soap_enveloped_saml_response(response) diff --git a/src/saml2/time_util.py b/src/saml2/time_util.py index 64d89125..53576760 100644 --- a/src/saml2/time_util.py +++ b/src/saml2/time_util.py @@ -162,7 +162,7 @@ def add_duration(tid, duration): # --------------------------------------------------------------------------- -def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0, +def time_in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0): """ format of timedelta: @@ -173,7 +173,17 @@ def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0, t = timedelta(*[days,seconds,microseconds,milliseconds,minutes, hours,weeks]) soon = now + t - return soon.strftime(TIME_FORMAT) + return soon + +def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0, + minutes=0, hours=0, weeks=0): + """ + format of timedelta: + timedelta([days[, seconds[, microseconds[, milliseconds[, + minutes[, hours[, weeks]]]]]]]) + """ + return time_in_a_while(days, seconds, microseconds, milliseconds, + minutes, hours, weeks).strftime(TIME_FORMAT) # --------------------------------------------------------------------------- diff --git a/tests/metadata.xml b/tests/metadata.xml index 1766c0a3..2d2c4b0e 100644 --- a/tests/metadata.xml +++ b/tests/metadata.xml @@ -1,5 +1,5 @@ <?xml version='1.0' encoding='UTF-8'?> -<ns0:EntitiesDescriptor name="urn:mace:umu.se:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV +<ns0:EntitiesDescriptor name="urn:mace:example.com:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l @@ -15,7 +15,7 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN +vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI= -</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV +</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l @@ -31,4 +31,4 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN +vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI= -</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor> +</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor> diff --git a/tests/server.config b/tests/server.config index 7ba28cd7..e2905b89 100644 --- a/tests/server.config +++ b/tests/server.config @@ -1,14 +1,26 @@ { - "entityid" : "urn:mace:umu.se:saml:roland:sp", - "my_name" : "urn:mace:umu.se:saml:roland:sp", - "service_url" : "http://lingon.catalogix.se:8087/", - "service": ["sp"], + "entityid" : "urn:mace:example.com:saml:roland:sp", + "service": { + "sp":{ + "my_name" : "urn:mace:example.com:saml:roland:sp", + "url": "http://lingon.catalogix.se:8087/", + } + }, "debug" : 1, "my_key" : "./mykey.pem", "my_cert" : "./mycert.pem", "xmlsec_binary" : "/opt/local/bin/xmlsec1", "metadata": { - "local": ["/Users/rolandh/code/pysaml2/tests/metadata.xml"], + "local": ["./tests/metadata.xml", "./tests/vo_metadata.xml"], + }, + "idp":{ + "entity_id": ["urn:mace:example.com:saml:roland:idp"], + }, + "virtual_organization" : { + "urn:mace:example.com:it:tek":{ + "nameid_format" : "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID", + "common_identifier": "umuselin", + } }, - "idp_entity_id": "urn:mace:umu.se:saml:roland:idp", + "subject_data": "subject_data.db" }
\ No newline at end of file diff --git a/tests/test_client.py b/tests/test_client.py index d838b1e4..138141f0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- from saml2.client import Saml2Client -from saml2 import samlp, client +from saml2 import samlp, client, BINDING_HTTP_POST from saml2 import saml, utils, config XML_RESPONSE_FILE = "tests/saml_signed.xml" @@ -190,3 +190,45 @@ class TestClient: idp_entry = scope.idp_list.idp_entry[0] assert idp_entry.name == "UmeƄ Universitet" assert idp_entry.loc == "https://idp.umu.se/" + + def test_create_auth_request_0(self): + ar = self.client.authn_request("1", + "http://www.example.com/sso", + "http://www.example.org/service", + "urn:mace:example.org:saml:sp", + "My Name") + + print ar + assert ar.assertion_consumer_service_url == "http://www.example.org/service" + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "My Name" + assert ar.issuer.text == "urn:mace:example.org:saml:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "true" + assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT + + def test_create_auth_request_vo(self): + assert self.client.config["virtual_organization"].keys() == [ + "urn:mace:example.com:it:tek"] + + ar = self.client.authn_request("1", + "http://www.example.com/sso", + "http://www.example.org/service", + "urn:mace:example.org:saml:sp", + "My Name", + vo="urn:mace:example.com:it:tek") + + print ar + assert ar.assertion_consumer_service_url == "http://www.example.org/service" + assert ar.destination == "http://www.example.com/sso" + assert ar.protocol_binding == BINDING_HTTP_POST + assert ar.version == "2.0" + assert ar.provider_name == "My Name" + assert ar.issuer.text == "urn:mace:example.org:saml:sp" + nid_policy = ar.name_id_policy + assert nid_policy.allow_create == "true" + assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT + assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek" + diff --git a/tests/test_config.py b/tests/test_config.py index 2391a297..f3ab82cd 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -4,26 +4,30 @@ from saml2.config import Config -c1 = { - "service": ["sp"], +sp1 = { "entityid" : "urn:mace:umu.se:saml:roland:sp", -# "my_name" : "urn:mace:umu.se:saml:roland:sp", - "service_url" : "http://lingon.catalogix.se:8087/", -# "debug" : 1, + "service": { + "sp": { + "url" : "http://lingon.catalogix.se:8087/", + "name": "test", + } + }, "key_file" : "tests/mykey.pem", "cert_file" : "tests/mycert.pem", "xmlsec_binary" : "/opt/local/bin/xmlsec1", "metadata": { "local": ["tests/metadata.xml", "tests/urn-mace-swami.se-swamid-test-1.0-metadata.xml"], -# "remote":{ -# "edugain":{ -# "url": "https://www.example.com/?id=edugain&set=saml2", -# "cert": "./edugain.pem", -# } -# } + # "remote":{ + # "edugain":{ + # "url": "https://www.example.com/?id=edugain&set=saml2", + # "cert": "./edugain.pem", + # } + # } + }, + "idp" : { + "entity_id": ["urn:mace:umu.se:saml:roland:idp"], }, - "idp_entity_id": "urn:mace:umu.se:saml:roland:idp", "virtual_organization" : { "http://vo.example.org/biomed":{ "nameid_format" : "urn:oid:2.16.756.1.2.5.1.1.1-NameID", @@ -34,4 +38,4 @@ c1 = { def test_1(): c = Config() - c.load(c1)
\ No newline at end of file + c.load(sp1) diff --git a/tests/test_server.py b/tests/test_server.py index ad3d9e7b..dd041df9 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -2,9 +2,11 @@ # -*- coding: utf-8 -*- from saml2.server import Server, OtherError, UnknownPricipal +from saml2 import server from saml2 import samlp, saml, client, utils from saml2.utils import make_instance from py.test import raises +import shelve SUCCESS_STATUS = """<?xml version=\'1.0\' encoding=\'UTF-8\'?> <ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>""" @@ -15,67 +17,193 @@ ERROR_STATUS = """<?xml version='1.0' encoding='UTF-8'?> def _eq(l1,l2): return set(l1) == set(l2) -class TestServer(): - def setup_class(self): - self.server = Server("tests/server.config") +def test_status_success(): + stat = server.kd_status( + status_code=server.kd_status_code( + value=samlp.STATUS_SUCCESS)) + status = make_instance( samlp.Status, stat) + status_text = "%s" % status + assert status_text == SUCCESS_STATUS + assert status.status_code.value == samlp.STATUS_SUCCESS + +def test_success_status(): + stat = server.kd_success_status() + status = make_instance(samlp.Status, stat) + status_text = "%s" % status + assert status_text == SUCCESS_STATUS + assert status.status_code.value == samlp.STATUS_SUCCESS - def test_status_success(self): - stat = self.server.status( - status_code=self.server.status_code( - value=samlp.STATUS_SUCCESS)) - status = make_instance( samlp.Status, stat) - status_text = "%s" % status - assert status_text == SUCCESS_STATUS - assert status.status_code.value == samlp.STATUS_SUCCESS +def test_error_status(): + stat = server.kd_status( + status_message=server.kd_status_message( + "Error resolving principal"), + status_code=server.kd_status_code( + value=samlp.STATUS_RESPONDER, + status_code=server.kd_status_code( + value=samlp.STATUS_UNKNOWN_PRINCIPAL))) - def test_success_status(self): - stat = self.server.success_status() - status = make_instance(samlp.Status, stat) - status_text = "%s" % status - assert status_text == SUCCESS_STATUS - assert status.status_code.value == samlp.STATUS_SUCCESS + status_text = "%s" % make_instance( samlp.Status, stat ) + print status_text + assert status_text == ERROR_STATUS + +def test_status_from_exception(): + e = UnknownPricipal("Error resolving principal") + stat = server.kd_status_from_exception(e) + status_text = "%s" % make_instance( samlp.Status, stat ) - def test_error_status(self): - stat = self.server.status( - status_message=self.server.status_message( - "Error resolving principal"), - status_code=self.server.status_code( - value=samlp.STATUS_RESPONDER, - status_code=self.server.status_code( - value=samlp.STATUS_UNKNOWN_PRINCIPAL))) + assert status_text == ERROR_STATUS + +def test_attribute_statement(): + astat = server.do_attribute_statement({"surName":"Jeter", + "givenName":"Derek"}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 2 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + assert len(attr0.attribute_value) == 1 + attr1 = statement.attribute[1] + assert _eq(attr1.keyswv(), ["name","attribute_value"]) + assert len(attr1.attribute_value) == 1 + if attr0.name == "givenName": + assert attr0.attribute_value[0].text == "Derek" + assert attr1.name == "surName" + assert attr1.attribute_value[0].text == "Jeter" + else: + assert attr0.name == "surName" + assert attr0.attribute_value[0].text == "Jeter" + assert attr1.name == "givenName" + assert attr1.attribute_value[0].text == "Derek" + +def test_audience(): + aud_restr = make_instance( saml.AudienceRestriction, + server.kd_audience_restriction( + audience=server.kd_audience("urn:foo:bar"))) - status_text = "%s" % make_instance( samlp.Status, stat ) - print status_text - assert status_text == ERROR_STATUS + assert aud_restr.keyswv() == ["audience"] + assert aud_restr.audience.text == "urn:foo:bar" + +def test_conditions(): + conds_dict = server.kd_conditions( + not_before="2009-10-30T07:58:10.852Z", + not_on_or_after="2009-10-30T08:03:10.852Z", + audience_restriction=server.kd_audience_restriction( + audience=server.kd_audience("urn:foo:bar"))) + + conditions = make_instance(saml.Conditions, conds_dict) + assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after", + "audience_restriction"]) + assert conditions.not_before == "2009-10-30T07:58:10.852Z" + assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z" + assert conditions.audience_restriction[0].audience.text == "urn:foo:bar" + +def test_value_1(): + #FriendlyName="givenName" Name="urn:oid:2.5.4.42" + # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri" + adict = server.kd_attribute(name="urn:oid:2.5.4.42", + name_format=saml.NAME_FORMAT_URI) + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name","name_format"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == saml.NAME_FORMAT_URI - def test_status_from_exception(self): - e = UnknownPricipal("Error resolving principal") - stat = self.server.status_from_exception(e) - status_text = "%s" % make_instance( samlp.Status, stat ) - - assert status_text == ERROR_STATUS - - def test_attribute_statement(self): - astat = self.server.do_attribute_statement({"surName":"Jeter", - "givenName":"Derek"}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) == 2 - attr0 = statement.attribute[0] - assert _eq(attr0.keyswv(), ["name","attribute_value"]) - assert len(attr0.attribute_value) == 1 - attr1 = statement.attribute[1] - assert _eq(attr1.keyswv(), ["name","attribute_value"]) +def test_value_2(): + adict = server.kd_attribute(name="urn:oid:2.5.4.42", + name_format=saml.NAME_FORMAT_URI, + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name","name_format","friendly_name"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + +def test_value_3(): + adict = server.kd_attribute(attribute_value="Derek", + name="urn:oid:2.5.4.42", + name_format=saml.NAME_FORMAT_URI, + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["name", "name_format", + "friendly_name", "attribute_value"]) + assert attribute.name == "urn:oid:2.5.4.42" + assert attribute.name_format == saml.NAME_FORMAT_URI + assert attribute.friendly_name == "givenName" + assert len(attribute.attribute_value) == 1 + assert attribute.attribute_value[0].text == "Derek" + +def test_value_4(): + adict = server.kd_attribute(attribute_value="Derek", + friendly_name="givenName") + attribute = make_instance(saml.Attribute, adict) + assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"]) + assert attribute.friendly_name == "givenName" + assert len(attribute.attribute_value) == 1 + assert attribute.attribute_value[0].text == "Derek" + +def test_do_attribute_statement_0(): + astat = server.do_attribute_statement({"vo_attr":"foobar"}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 1 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + assert attr0.name == "vo_attr" + assert len(attr0.attribute_value) == 1 + assert attr0.attribute_value[0].text == "foobar" + +def test_do_attribute_statement(): + astat = server.do_attribute_statement({"surName":"Jeter", + "givenName":["Derek","Sanderson"]}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) == 2 + attr0 = statement.attribute[0] + assert _eq(attr0.keyswv(), ["name","attribute_value"]) + attr1 = statement.attribute[1] + assert _eq(attr1.keyswv(), ["name","attribute_value"]) + if attr0.name == "givenName": + assert len(attr0.attribute_value) == 2 + assert _eq([av.text for av in attr0.attribute_value], + ["Derek","Sanderson"]) + assert attr1.name == "surName" + assert attr1.attribute_value[0].text == "Jeter" assert len(attr1.attribute_value) == 1 - if attr0.name == "givenName": - assert attr0.attribute_value[0].text == "Derek" - assert attr1.name == "surName" - assert attr1.attribute_value[0].text == "Jeter" - else: - assert attr0.name == "surName" - assert attr0.attribute_value[0].text == "Jeter" - assert attr1.name == "givenName" - assert attr1.attribute_value[0].text == "Derek" + else: + assert attr0.name == "surName" + assert attr0.attribute_value[0].text == "Jeter" + assert len(attr0.attribute_value) == 1 + assert attr1.name == "givenName" + assert len(attr1.attribute_value) == 2 + assert _eq([av.text for av in attr1.attribute_value], + ["Derek","Sanderson"]) + +def test_do_attribute_statement_multi(): + astat = server.do_attribute_statement( + {("urn:oid:1.3.6.1.4.1.5923.1.1.1.7", + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", + "eduPersonEntitlement"):"Jeter"}) + statement = make_instance(saml.AttributeStatement,astat) + assert statement.keyswv() == ["attribute"] + assert len(statement.attribute) + assert _eq(statement.attribute[0].keyswv(), + ["name","name_format","friendly_name","attribute_value"]) + attribute = statement.attribute[0] + assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" + assert attribute.name_format == ( + "urn:oasis:names:tc:SAML:2.0:attrname-format:uri") + assert attribute.friendly_name == "eduPersonEntitlement" + +def test_subject(): + adict = server.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT) + subject = make_instance(saml.Subject, adict) + assert _eq(subject.keyswv(),["text", "name_id"]) + assert subject.text == "_aaa" + assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT + + +class TestServer(): + def setup_class(self): + self.server = Server("tests/server.config") def test_issuer(self): issuer = make_instance( saml.Issuer, self.server.issuer()) @@ -84,130 +212,16 @@ class TestServer(): assert issuer.format == saml.NAMEID_FORMAT_ENTITY assert issuer.text == self.server.conf["entityid"] - def test_audience(self): - aud_restr = make_instance( saml.AudienceRestriction, - self.server.audience_restriction( - audience=self.server.audience("urn:foo:bar"))) - - assert aud_restr.keyswv() == ["audience"] - assert aud_restr.audience.text == "urn:foo:bar" - - def test_conditions(self): - conds_dict = self.server.conditions( - not_before="2009-10-30T07:58:10.852Z", - not_on_or_after="2009-10-30T08:03:10.852Z", - audience_restriction=self.server.audience_restriction( - audience=self.server.audience("urn:foo:bar"))) - - conditions = make_instance(saml.Conditions, conds_dict) - assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after", - "audience_restriction"]) - assert conditions.not_before == "2009-10-30T07:58:10.852Z" - assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z" - assert conditions.audience_restriction[0].audience.text == "urn:foo:bar" - - def test_value_1(self): - #FriendlyName="givenName" Name="urn:oid:2.5.4.42" - # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri" - adict = self.server.attribute(name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI) - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name","name_format"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - - def test_value_2(self): - adict = self.server.attribute(name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI, - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name","name_format","friendly_name"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - assert attribute.friendly_name == "givenName" - def test_value_3(self): - adict = self.server.attribute(attribute_value="Derek", - name="urn:oid:2.5.4.42", - name_format=saml.NAME_FORMAT_URI, - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["name", "name_format", - "friendly_name", "attribute_value"]) - assert attribute.name == "urn:oid:2.5.4.42" - assert attribute.name_format == saml.NAME_FORMAT_URI - assert attribute.friendly_name == "givenName" - assert len(attribute.attribute_value) == 1 - assert attribute.attribute_value[0].text == "Derek" - - def test_value_4(self): - adict = self.server.attribute(attribute_value="Derek", - friendly_name="givenName") - attribute = make_instance(saml.Attribute, adict) - assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"]) - assert attribute.friendly_name == "givenName" - assert len(attribute.attribute_value) == 1 - assert attribute.attribute_value[0].text == "Derek" - - def test_do_attribute_statement(self): - astat = self.server.do_attribute_statement({"surName":"Jeter", - "givenName":["Derek","Sanderson"]}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) == 2 - attr0 = statement.attribute[0] - assert _eq(attr0.keyswv(), ["name","attribute_value"]) - attr1 = statement.attribute[1] - assert _eq(attr1.keyswv(), ["name","attribute_value"]) - if attr0.name == "givenName": - assert len(attr0.attribute_value) == 2 - assert _eq([av.text for av in attr0.attribute_value], - ["Derek","Sanderson"]) - assert attr1.name == "surName" - assert attr1.attribute_value[0].text == "Jeter" - assert len(attr1.attribute_value) == 1 - else: - assert attr0.name == "surName" - assert attr0.attribute_value[0].text == "Jeter" - assert len(attr0.attribute_value) == 1 - assert attr1.name == "givenName" - assert len(attr1.attribute_value) == 2 - assert _eq([av.text for av in attr1.attribute_value], - ["Derek","Sanderson"]) - - def test_do_attribute_statement_multi(self): - astat = self.server.do_attribute_statement( - {("urn:oid:1.3.6.1.4.1.5923.1.1.1.7", - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", - "eduPersonEntitlement"):"Jeter"}) - statement = make_instance(saml.AttributeStatement,astat) - assert statement.keyswv() == ["attribute"] - assert len(statement.attribute) - assert _eq(statement.attribute[0].keyswv(), - ["name","name_format","friendly_name","attribute_value"]) - attribute = statement.attribute[0] - assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7" - assert attribute.name_format == ( - "urn:oasis:names:tc:SAML:2.0:attrname-format:uri") - assert attribute.friendly_name == "eduPersonEntitlement" - - def test_subject(self): - adict = self.server.subject("_aaa", - name_id=saml.NAMEID_FORMAT_TRANSIENT) - subject = make_instance(saml.Subject, adict) - assert _eq(subject.keyswv(),["text", "name_id"]) - assert subject.text == "_aaa" - assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT - def test_assertion(self): - tmp = self.server.assertion( - subject= self.server.subject("_aaa", + tmp = server.kd_assertion( + subject= server.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT), - attribute_statement = self.server.attribute_statement( + attribute_statement = server.kd_attribute_statement( attribute=[ - self.server.attribute(attribute_value="Derek", + server.kd_attribute(attribute_value="Derek", friendly_name="givenName"), - self.server.attribute(attribute_value="Jeter", + server.kd_attribute(attribute_value="Jeter", friendly_name="surName"), ]), issuer=self.server.issuer(), @@ -217,7 +231,7 @@ class TestServer(): assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id', 'subject', 'issue_instant', 'version']) assert assertion.version == "2.0" - assert assertion.issuer.text == "urn:mace:umu.se:saml:roland:sp" + assert assertion.issuer.text == "urn:mace:example.com:saml:roland:sp" # assert len(assertion.attribute_statement) == 1 attribute_statement = assertion.attribute_statement[0] @@ -240,17 +254,17 @@ class TestServer(): assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT def test_response(self): - tmp = self.server.response( + tmp = server.kd_response( in_response_to="_012345", destination="https:#www.example.com", - status=self.server.success_status(), - assertion=self.server.assertion( - subject = self.server.subject("_aaa", + status=server.kd_success_status(), + assertion=server.kd_assertion( + subject = server.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT), - attribute_statement = self.server.attribute_statement([ - self.server.attribute(attribute_value="Derek", + attribute_statement = server.kd_attribute_statement([ + server.kd_attribute(attribute_value="Derek", friendly_name="givenName"), - self.server.attribute(attribute_value="Jeter", + server.kd_attribute(attribute_value="Jeter", friendly_name="surName"), ]), issuer=self.server.issuer(), @@ -264,7 +278,7 @@ class TestServer(): 'in_response_to', 'issue_instant', 'version', 'issuer', 'id']) assert response.version == "2.0" - assert response.issuer.text == "urn:mace:umu.se:saml:roland:sp" + assert response.issuer.text == "urn:mace:example.com:saml:roland:sp" assert response.destination == "https:#www.example.com" assert response.in_response_to == "_012345" # @@ -273,11 +287,12 @@ class TestServer(): assert status.status_code.value == samlp.STATUS_SUCCESS def test_parse_faulty_request(self): - authn_request = client.d_authn_request( + sc = client.Saml2Client({},None) + authn_request = sc.authn_request( query_id = "1", destination = "http://www.example.com", service_url = "http://www.example.org", - spentityid = "urn:mace:umu.se:saml:roland:sp", + spentityid = "urn:mace:example.com:saml:roland:sp", my_name = "My real name", ) @@ -286,11 +301,12 @@ class TestServer(): raises(OtherError,self.server.parse_authn_request,intermed) def test_parse_faulty_request_to_err_status(self): - authn_request = client.d_authn_request( + sc = client.Saml2Client({},None) + authn_request = sc.authn_request( query_id = "1", destination = "http://www.example.com", service_url = "http://www.example.org", - spentityid = "urn:mace:umu.se:saml:roland:sp", + spentityid = "urn:mace:example.com:saml:roland:sp", my_name = "My real name", ) @@ -301,7 +317,7 @@ class TestServer(): except OtherError, oe: print oe.args status = utils.make_instance(samlp.Status, - self.server.status_from_exception(oe)) + server.kd_status_from_exception(oe)) assert status print status @@ -314,28 +330,31 @@ class TestServer(): assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL def test_parse_ok_request(self): - authn_request = client.d_authn_request( + sc = client.Saml2Client({},None) + authn_request = sc.authn_request( query_id = "1", destination = "http://www.example.com", service_url = "http://localhost:8087/", - spentityid = "urn:mace:umu.se:saml:roland:sp", + spentityid = "urn:mace:example.com:saml:roland:sp", my_name = "My real name", ) + print authn_request intermed = utils.deflate_and_base64_encode("%s" % authn_request) - (consumer_url, id, name_id_policies, + (consumer_url, id, name_id_policy, sp) = self.server.parse_authn_request(intermed) assert consumer_url == "http://localhost:8087/" assert id == "1" - assert name_id_policies == saml.NAMEID_FORMAT_TRANSIENT - assert sp == "urn:mace:umu.se:saml:roland:sp" + assert _eq(name_id_policy.keyswv(), ["format", "allow_create"]) + assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT + assert sp == "urn:mace:example.com:saml:roland:sp" def test_sso_response(self): resp = self.server.do_sso_response( "http://localhost:8087/", # consumer_url "12", # in_response_to - "urn:mace:umu.se:saml:roland:sp", # sp_entity_id + "urn:mace:example.com:saml:roland:sp", # sp_entity_id {("urn:oid:1.3.6.1.4.1.5923.1.1.1.7", "urn:oasis:names:tc:SAML:2.0:attrname-format:uri", "eduPersonEntitlement"):"Jeter"} @@ -344,7 +363,7 @@ class TestServer(): print resp.keyswv() assert _eq(resp.keyswv(),['status', 'destination', 'assertion', 'in_response_to', 'issue_instant', - 'version', 'id']) + 'version', 'id', 'issuer']) assert resp.destination == "http://localhost:8087/" assert resp.in_response_to == "12" assert resp.status @@ -363,3 +382,12 @@ class TestServer(): print confirmation.subject_confirmation_data assert confirmation.subject_confirmation_data.in_response_to == "12" + def test_persistence_0(self): + pid1 = self.server.persistent_id( + "urn:mace:example.com:saml:roland:sp", "jeter") + + pid2 = self.server.persistent_id( + "urn:mace:example.com:saml:roland:sp", "jeter") + + print pid1, pid2 + assert pid1 == pid2 diff --git a/tools/make_metadata.py b/tools/make_metadata.py index 1d79b19e..a20169d2 100755 --- a/tools/make_metadata.py +++ b/tools/make_metadata.py @@ -1,15 +1,68 @@ #!/usr/bin/env python import os from saml2 import utils, md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT +from saml2 import BINDING_SOAP from saml2.time_util import in_a_while +def do_sp_sso_descriptor(sp, cert): + return { + "protocol_support_enumeration": samlp.NAMESPACE, + "want_assertions_signed": True, + "authn_requests_signed": False, + "assertion_consumer_service": { + "binding": BINDING_HTTP_POST , + "location": sp["url"], + "index": 0, + }, + "key_descriptor":{ + "key_info": { + "x509_data": { + "x509_certificate": cert + } + } + }, + } + +def do_idp_sso_descriptor(idp, cert): + return { + "protocol_support_enumeration": samlp.NAMESPACE, + "want_authn_requests_signed": True, + "single_sign_on_service": { + "binding": BINDING_HTTP_REDIRECT , + "location": idp["url"], + }, + "key_descriptor":{ + "key_info": { + "x509_data": { + "x509_certificate": cert + } + } + }, + } + +def do_aa_descriptor(aa, cert): + return { + "protocol_support_enumeration": samlp.NAMESPACE, + "attribute_service": { + "binding": BINDING_SOAP , + "location": aa["url"], + }, + "key_descriptor":{ + "key_info": { + "x509_data": { + "x509_certificate": cert + } + } + }, + } + def entity_descriptor(confd): mycert = "".join(open(confd["cert_file"]).readlines()[1:-1]) ed = { "name": "http://%s/saml/test" % os.uname()[1], - "valid_until": in_a_while(days=30), + "valid_until": in_a_while(hours=96), "entity_id": confd["entityid"], } @@ -33,46 +86,21 @@ def entity_descriptor(confd): if "sp" in confd["service"]: # The SP - ed["sp_sso_descriptor"] = { - "protocol_support_enumeration": samlp.NAMESPACE, - "want_assertions_signed": True, - "authn_requests_signed": False, - "assertion_consumer_service": { - "binding": BINDING_HTTP_POST , - "location": confd["service_url"], - "index": 0, - }, - "key_descriptor":{ - "key_info": { - "x509_data": { - "x509_certificate": mycert - } - } - }, - } - elif "idp" in confd["service"]: - ed["idp_sso_descriptor"] = { - "protocol_support_enumeration": samlp.NAMESPACE, - "want_authn_requests_signed": True, - "single_sign_on_service": { - "binding": BINDING_HTTP_REDIRECT , - "location": confd["service_url"], - }, - "key_descriptor":{ - "key_info": { - "x509_data": { - "x509_certificate": mycert - } - } - }, - } + ed["sp_sso_descriptor"] = do_sp_sso_descriptor(confd["service"]["sp"], + mycert) + if "idp" in confd["service"]: + ed["idp_sso_descriptor"] = do_idp_sso_descriptor( + confd["service"]["idp"],mycert) + if "aa" in confd["service"]: + ed["attribute_authority_descriptor"] = do_aa_descriptor( + confd["service"]["aa"],mycert) return ed def entities_descriptor(eds): return utils.make_instance(md.EntitiesDescriptor,{ "name": "urn:mace:umu.se:saml:test", - "valid_until": in_a_while(30), + "valid_until": in_a_while(hours=96), "entity_descriptor": eds}) if __name__ == "__main__": |