summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2009-11-17 10:43:42 +0100
committerRoland Hedberg <roland.hedberg@adm.umu.se>2009-11-17 10:43:42 +0100
commit8b3be05ddf980186b7cc52f1da2bc7331c30d586 (patch)
tree42593cb161a35c4e3b6ad1a3557ec4522220ef82
parentb9a48efbfa61f258900ece2a7dcec6e9ea432ce4 (diff)
downloadpysaml2-8b3be05ddf980186b7cc52f1da2bc7331c30d586.tar.gz
Lots and lots of changes, sorry should be separated but that won't happen
-rwxr-xr-xexample/idp/idp.app.py8
-rw-r--r--example/idp/idp.conf2
-rw-r--r--src/s2repoze/plugins/challenge_decider.py53
-rw-r--r--src/s2repoze/plugins/ini.py35
-rw-r--r--src/s2repoze/plugins/sp.py161
-rw-r--r--src/saml2/attribute_resolver.py30
-rw-r--r--src/saml2/client.py176
-rw-r--r--src/saml2/config.py42
-rw-r--r--src/saml2/samlp.py5
-rw-r--r--src/saml2/server.py375
-rw-r--r--src/saml2/soap.py18
-rw-r--r--src/saml2/time_util.py14
-rw-r--r--tests/metadata.xml6
-rw-r--r--tests/server.config24
-rw-r--r--tests/test_client.py44
-rw-r--r--tests/test_config.py30
-rw-r--r--tests/test_server.py418
-rwxr-xr-xtools/make_metadata.py98
18 files changed, 935 insertions, 604 deletions
diff --git a/example/idp/idp.app.py b/example/idp/idp.app.py
index ed026a9b..5c6acab6 100755
--- a/example/idp/idp.app.py
+++ b/example/idp/idp.app.py
@@ -2,9 +2,9 @@
import re
import base64
-from cgi import escape
+from cgi import escape, parse_qs
import urllib
-import urlparse
+#import urlparse
from saml2 import server
from saml2.utils import make_instance, sid, decode_base64_and_inflate
@@ -76,7 +76,7 @@ def sso(environ, start_response, user, logger):
logger and logger.info("Environ keys: %s" % environ.keys())
if "QUERY_STRING" in environ:
logger and logger.info("Query string: %s" % environ["QUERY_STRING"])
- query = urlparse.parse_qs(environ["QUERY_STRING"])
+ query = parse_qs(environ["QUERY_STRING"])
elif "s2repoze.qinfo" in environ:
query = environ["s2repoze.qinfo"]
# base 64 encoded request
@@ -125,7 +125,7 @@ def not_found(environ, start_response, logger):
def not_authn(environ, start_response, logger):
if "QUERY_STRING" in environ:
- query = urlparse.parse_qs(environ["QUERY_STRING"])
+ query = parse_qs(environ["QUERY_STRING"])
logger and logger.info("query: %s" % query)
start_response('401 Unauthorized', [('Content-Type', 'text/plain')])
return ['Unknown user']
diff --git a/example/idp/idp.conf b/example/idp/idp.conf
index c7c0d55c..04aaa90d 100644
--- a/example/idp/idp.conf
+++ b/example/idp/idp.conf
@@ -1,5 +1,5 @@
{
- "entityid" : "urn:mace:umu.se:saml:roland:idp",
+ "entityid" : "urn:mace:example.com:saml:roland:idp",
"service": ["idp"],
"my_name" : "Rolands IdP",
"debug" : 1,
diff --git a/src/s2repoze/plugins/challenge_decider.py b/src/s2repoze/plugins/challenge_decider.py
index 6656ec9e..0ec64aa7 100644
--- a/src/s2repoze/plugins/challenge_decider.py
+++ b/src/s2repoze/plugins/challenge_decider.py
@@ -1,7 +1,60 @@
from paste.request import construct_url
+import zope.interface
+from repoze.who.interfaces import IRequestClassifier
+from repoze.who.interfaces import IChallengeDecider
+
+from paste.httpheaders import REQUEST_METHOD
+from paste.httpheaders import CONTENT_TYPE
+from paste.httpheaders import USER_AGENT
+from paste.httpheaders import WWW_AUTHENTICATE
import re
+_DAV_METHODS = (
+ 'OPTIONS',
+ 'PROPFIND',
+ 'PROPPATCH',
+ 'MKCOL',
+ 'LOCK',
+ 'UNLOCK',
+ 'TRACE',
+ 'DELETE',
+ 'COPY',
+ 'MOVE'
+ )
+
+_DAV_USERAGENTS = (
+ 'Microsoft Data Access Internet Publishing Provider',
+ 'WebDrive',
+ 'Zope External Editor',
+ 'WebDAVFS',
+ 'Goliath',
+ 'neon',
+ 'davlib',
+ 'wsAPI',
+ 'Microsoft-WebDAV'
+ )
+
+def my_request_classifier(environ):
+ """ Returns one of the classifiers 'dav', 'xmlpost', or 'browser',
+ depending on the imperative logic below"""
+ request_method = REQUEST_METHOD(environ)
+ if request_method in _DAV_METHODS:
+ return 'dav'
+ useragent = USER_AGENT(environ)
+ if useragent:
+ for agent in _DAV_USERAGENTS:
+ if useragent.find(agent) != -1:
+ return 'dav'
+ if request_method == 'POST':
+ if CONTENT_TYPE(environ) == 'text/xml':
+ return 'xmlpost'
+ elif CONTENT_TYPE(environ) == "application/soap+xml":
+ return 'soap'
+ return 'browser'
+
+zope.interface.directlyProvides(my_request_classifier, IRequestClassifier)
+
class my_challenge_decider:
def __init__(self,path_login=""):
self.path_login = path_login
diff --git a/src/s2repoze/plugins/ini.py b/src/s2repoze/plugins/ini.py
index 35f314f4..d3e4eb77 100644
--- a/src/s2repoze/plugins/ini.py
+++ b/src/s2repoze/plugins/ini.py
@@ -2,39 +2,36 @@ import ConfigParser, os
from zope.interface import implements
-from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator
+#from repoze.who.interfaces import IChallenger, IIdentifier, IAuthenticator
from repoze.who.interfaces import IMetadataProvider
class INIMetadataProvider(object):
implements(IMetadataProvider)
- def __init__(self, ini_file):
+ def __init__(self, ini_file, key_attribute):
self.users = ConfigParser.ConfigParser()
self.users.readfp(open(ini_file))
+ self.key_attribute = key_attribute
-# def authenticate(self, environ, identity):
-# try:
-# username = identity['login']
-# password = identity['password']
-# except KeyError:
-# return None
-#
-# success = User.authenticate(username, password)
-#
-# return success
-
def add_metadata(self, environ, identity):
logger = environ.get('repoze.who.logger','')
- username = identity.get('repoze.who.userid')
- logger and logger.info("Identity: %s (before)" % (identity.items(),))
+ key = identity.get('repoze.who.userid')
+ #logger and logger.info("Identity: %s (before)" % (identity.items(),))
try:
- identity["user"] = self.users.items(username)
- logger and logger.info("Identity: %s (after)" % (identity.items(),))
+ if self.key_attribute:
+ for sec in self.users.sections():
+ if self.users.has_option(sec,self.key_attribute):
+ if key in self.users.get(sec, self.key_attribute):
+ identity["user"] = dict(self.users.items(sec))
+ break
+ else:
+ identity["user"] = dict(self.users.items(key))
+ #logger and logger.info("Identity: %s (after)" % (identity.items(),))
except ValueError:
pass
-def make_plugin(ini_file):
- return INIMetadataProvider(ini_file)
+def make_plugin(ini_file, key_attribute=""):
+ return INIMetadataProvider(ini_file, key_attribute)
diff --git a/src/s2repoze/plugins/sp.py b/src/s2repoze/plugins/sp.py
index c4357dcc..f0c2acb8 100644
--- a/src/s2repoze/plugins/sp.py
+++ b/src/s2repoze/plugins/sp.py
@@ -23,6 +23,7 @@ import urlparse
import urllib
import cgi
import os
+import time
from paste.httpheaders import CONTENT_LENGTH
from paste.httpheaders import CONTENT_TYPE
@@ -48,6 +49,7 @@ from saml2.attribute_resolver import AttributeResolver
from saml2.metadata import MetaData
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.config import Config
+from saml2.cache import Cache
def construct_came_from(environ):
""" The URL that the user used when the process where interupted
@@ -65,8 +67,8 @@ class SAML2Plugin(FormPluginBase):
implements(IChallenger, IIdentifier, IAuthenticator, IMetadataProvider)
- def __init__(self, rememberer_name, saml_conf_file, store,
- path_logout, path_toskip, debug):
+ def __init__(self, rememberer_name, saml_conf_file, virtual_organization,
+ cache, path_logout, path_toskip, debug):
self.rememberer_name = rememberer_name
self.path_logout = path_logout
@@ -75,7 +77,17 @@ class SAML2Plugin(FormPluginBase):
self.conf = Config()
self.conf.load_file(saml_conf_file)
-
+ self.sp = self.conf["service"]["sp"]
+ if virtual_organization:
+ self.vo = virtual_organization
+ try:
+ self.vo_conf = self.conf[
+ "virtual_organization"][virtual_organization]
+ except KeyError:
+ self.vo = None
+ else:
+ self.vo = None
+
try:
self.metadata = self.conf["metadata"]
except KeyError:
@@ -83,10 +95,10 @@ class SAML2Plugin(FormPluginBase):
self.outstanding_authn = {}
self.iam = os.uname()[1]
- if store==u"file":
- self.store = shelve.open(store_filename)
- elif store==u"mem":
- self.store = {}
+ if cache:
+ self.cache = Cache(cache)
+ else:
+ self.cache = Cache()
#### IChallenger ####
def challenge(self, environ, status, app_headers, forget_headers):
@@ -104,11 +116,20 @@ class SAML2Plugin(FormPluginBase):
came_from = construct_came_from(environ)
if self.debug:
logger and logger.info("RelayState >> %s" % came_from)
+
+ try:
+ vo = environ["myapp.vo"]
+ except KeyError:
+ vo = self.vo
+ logger and logger.info("VO: %s" % vo)
+ # If more than one idp, I have to do wayf
(sid, result) = cl.authenticate(self.conf["entityid"],
- self.conf["idp_url"],
- self.conf["service_url"],
- self.conf["my_name"],
- relay_state=came_from, log=logger)
+ self.conf["idp"]["url"][0],
+ self.sp["url"],
+ self.sp["my_name"],
+ relay_state=came_from,
+ log=logger,
+ vo=vo)
self.outstanding_authn[sid] = came_from
if self.debug:
@@ -116,7 +137,7 @@ class SAML2Plugin(FormPluginBase):
if isinstance(result, tuple):
return HTTPTemporaryRedirect(headers=[result])
else :
- # possible to normally not used
+ # possible though normally not used
body = "\n".join(result)
def auth_form(environ, start_response):
content_length = CONTENT_LENGTH.tuples(str(len(result)))
@@ -133,8 +154,8 @@ class SAML2Plugin(FormPluginBase):
uri = environ.get('REQUEST_URI',construct_url(environ))
if self.debug:
- logger and logger.info("environ.keys(): %s" % environ.keys())
- logger and logger.info("Environment: %s" % environ)
+ #logger and logger.info("environ.keys(): %s" % environ.keys())
+ #logger and logger.info("Environment: %s" % environ)
logger and logger.info('identify uri: %s' % (uri,))
query = parse_dict_querystring(environ)
@@ -166,6 +187,13 @@ class SAML2Plugin(FormPluginBase):
post_env = environ.copy()
post_env['QUERY_STRING'] = ''
+
+ if environ["CONTENT_LENGTH"]:
+ body = environ["wsgi.input"].read(int(environ["CONTENT_LENGTH"]))
+ from StringIO import StringIO
+ environ['wsgi.input'] = StringIO(body)
+ environ['s2repoze.body'] = body
+
post = cgi.FieldStorage(
fp=environ['wsgi.input'],
environ=post_env,
@@ -173,18 +201,26 @@ class SAML2Plugin(FormPluginBase):
)
if self.debug:
- logger and logger.info('identify post keys: %s' % (post.keys(),))
+ logger and logger.info('identify post: %s' % (post,))
+ try:
+ if not post.has_key("SAMLResponse"):
+ environ["post.fieldstorage"] = post
+ return {}
+ except TypeError:
+ environ["post.fieldstorage"] = post
+ return {}
+
# check for SAML2 authN
cl = Saml2Client(environ, self.conf)
try:
- (ava, came_from) = cl.response(post,
+ (ava, came_from, issuer, not_on_or_after) = cl.response(post,
self.conf["entityid"],
self.outstanding_authn,
logger)
name_id = ava["__userid"]
del ava["__userid"]
- self.store[name_id] = ava
+ self.cache.set( name_id, issuer, ava, not_on_or_after)
if self.debug:
logger and logger.info("stored %s with key %s" % (ava, name_id))
except TypeError:
@@ -205,62 +241,94 @@ class SAML2Plugin(FormPluginBase):
identity["password"] = ""
identity['repoze.who.userid'] = name_id
identity["user"] = ava
+ #identity["issuer"] = issuer
if self.debug:
logger and logger.info("Identity: %s" % identity)
return identity
# IMetadataProvider
def add_metadata(self, environ, identity):
+ subject_id = identity['repoze.who.userid']
+
if self.debug:
logger = environ.get('repoze.who.logger','')
- logger and logger.info(
- "add_metadata for %s" % identity['repoze.who.userid'])
+ if logger:
+ logger.info(
+ "add_metadata for %s" % subject_id)
+ logger.info(
+ "Known subjects: %s" % self.cache.subjects())
+ try:
+ logger.info(
+ "Issuers: %s" % self.cache.issuers(subject_id))
+ except KeyError:
+ pass
+
+ if "user" not in identity:
+ identity["user"] = {}
try:
- ava = self.store[identity['repoze.who.userid']]
+ (ava, old) = self.cache.get_all(subject_id)
+ now = time.gmtime()
if self.debug:
logger and logger.info("Adding %s" % ava)
identity["user"].update(ava)
- self.store[identity['repoze.who.userid']] = identity
except KeyError:
pass
if "pysaml2_vo_expanded" not in identity:
# is this a Virtual Organization situation
- if "virtual_organization" in self.conf:
+ if self.vo:
logger and logger.info("** Do VO aggregation **")
- try:
- subject_id = identity["user"][
- self.conf["common_identifier"]][0]
- except KeyError:
- return
- logger and logger.info("SubjectID: %s" % subject_id)
- ar = AttributeResolver(environ, self.metadata,
- self.conf["xmlsec_binary"],
- self.conf["key_file"],
- self.conf["cert_file"])
+ #try:
+ # This ought to be caseignore
+ #subject_id = identity["user"][
+ # self.vo_conf["common_identifier"]][0]
+ #except KeyError:
+ # logger and logger.error("** No common identifier **")
+ # return
+ logger and logger.info(
+ "SubjectID: %s, VO:%s" % (subject_id, self.vo))
+
vo_members = [
- member for member in self.metadata.vo_members(
- self.conf["virtual_organization"])\
- if member != self.conf["md_idp"]]
+ member for member in self.metadata.vo_members(self.vo)\
+ if member not in self.conf["idp"]["entity_id"]]
+
logger and logger.info("VO members: %s" % vo_members)
+ vo_members = [m for m in vo_members \
+ if not self.cache.active(subject_id, m)]
+ logger and logger.info(
+ "VO members (not cached): %s" % vo_members)
if vo_members:
+ ar = AttributeResolver(environ, self.metadata, self.conf)
+
+ if "name_id_format" in self.vo_conf:
+ name_id_format = self.vo_conf["name_id_format"]
+ sp_name_qualifier=""
+ else:
+ sp_name_qualifier=self.vo
+ name_id_format = ""
+
extra = ar.extend(subject_id,
self.conf["entityid"],
vo_members,
- self.conf["nameid_format"],
+ name_id_format=name_id_format,
+ sp_name_qualifier=sp_name_qualifier,
log=logger)
- for attr,val in extra.items():
- try:
- # might lead to duplicates !
- identity["user"][attr].extend(val)
- except KeyError:
- identity["user"][attr] = val
+ for issuer, tup in extra.items():
+ (not_on_or_after, resp) = tup
+ self.cache.set(subject_id, issuer, resp,
+ not_on_or_after)
+ logger.info(
+ ">Issuers: %s" % self.cache.issuers(subject_id))
+ logger.info(
+ "AVA: %s" % (self.cache.get_all(subject_id),))
+ identity["user"] = self.cache.get_all(subject_id)[0]
# Only do this once
identity["pysaml2_vo_expanded"] = 1
- self.store[identity['repoze.who.userid']] = identity
+ #self.store[identity['repoze.who.userid']] = (
+ # not_on_or_after, identity)
# @return
# used 2 times : one to get the ticket, the other to validate it
@@ -277,7 +345,9 @@ class SAML2Plugin(FormPluginBase):
def make_plugin(rememberer_name=None, # plugin for remember
- store= "mem", # store for remember
+ cache= "", # cache
+ # Which virtual organization to support
+ virtual_organization="",
path_logout='', # regex url to logout
path_toskip='', # regex url to skip
saml_conf="",
@@ -294,7 +364,8 @@ def make_plugin(rememberer_name=None, # plugin for remember
path_logout = path_logout.lstrip().split('\n');
path_toskip = path_toskip.lstrip().splitlines()
- plugin = SAML2Plugin(rememberer_name, saml_conf, store,
+ plugin = SAML2Plugin(rememberer_name, saml_conf,
+ virtual_organization, cache,
path_logout, path_toskip, debug)
return plugin
diff --git a/src/saml2/attribute_resolver.py b/src/saml2/attribute_resolver.py
index 3ce58a1c..4d44fa42 100644
--- a/src/saml2/attribute_resolver.py
+++ b/src/saml2/attribute_resolver.py
@@ -33,16 +33,16 @@ DEFAULT_BINDING = saml2.BINDING_HTTP_REDIRECT
class AttributeResolver(object):
- def __init__(self, environ, metadata=None, xmlsec_binary=None,
- key_file=None, cert_file=None):
+ def __init__(self, environ, metadata=None, config=None, saml2client=None):
self.metadata = metadata
- self.saml2client = Saml2Client(environ, metadata=metadata,
- xmlsec_binary=xmlsec_binary,
- key_file=key_file,
- cert_file=cert_file)
+
+ if saml2client:
+ self.saml2client = saml2client
+ else:
+ self.saml2client = Saml2Client(environ, config)
- def extend(self, subject_id, issuer, vo_members, nameid_format,
- log=None):
+ def extend(self, subject_id, issuer, vo_members, name_id_format=None,
+ sp_name_qualifier=None, log=None):
"""
:param subject_id: The identifier by which the subject is know
among all the participents of the VO
@@ -61,17 +61,15 @@ class AttributeResolver(object):
for attr_serv in ass.attribute_service:
log and log.info("Send attribute request to %s" % \
attr_serv.location)
- resp = self.saml2client.attribute_query(subject_id,
+ (resp, issuer,
+ not_on_or_after) = self.saml2client.attribute_query(
+ subject_id,
issuer,
attr_serv.location,
- format=nameid_format, log=log)
+ sp_name_qualifier=sp_name_qualifier,
+ format=name_id_format, log=log)
if resp:
# unnecessary
del resp["__userid"]
- for attr,val in resp.items():
- try:
- extended_identity[attr].extend(val)
- except KeyError:
- extended_identity[attr] = val
-
+ extended_identity[issuer] = (not_on_or_after, resp)
return extended_identity \ No newline at end of file
diff --git a/src/saml2/client.py b/src/saml2/client.py
index 9f9b8876..c743a38a 100644
--- a/src/saml2/client.py
+++ b/src/saml2/client.py
@@ -46,6 +46,10 @@ LAX = True
class Saml2Client:
def __init__(self, environ, config=None):
+ """
+ :param environ:
+ :param config: A saml2.config.Config instance
+ """
self.environ = environ
if config:
self.config = config
@@ -82,47 +86,7 @@ class Saml2Client:
def scoping_from_metadata(self, entityid, location):
name = metadata.name(entityid)
return make_instance(self.scoping([self.idp_entry(name, location)]))
-
- def create_authn_request(self, query_id, destination, service_url,
- spentityid, my_name, sp_name_qualifier=None,
- scoping=None):
- """ Creates an Authenication Request
-
- :param query_id: Query identifier
- :param destination: Where to send the request
- :param service_url: The page to where the response MUST be sent.
- :param spentityid: My official name
- :param my_name: Who I am
- :param sp_name_qualifier: The domain in which the name should be
- valid
- :param scoping: For which IdPs this query are aimed.
-
- :return: An authentication request
- """
-
- authn_request = self._init_request(samlp.AuthnRequest(query_id),
- destination)
-
- authn_request.assertion_consumer_service_url = service_url
- authn_request.protocol_binding = saml2.BINDING_HTTP_POST
- authn_request.provider_name = my_name
- if scoping:
- authn_request.scoping = scoping
-
- name_id_policy = samlp.NameIDPolicy()
- name_id_policy.allow_create = 'true'
- if sp_name_qualifier:
- name_id_policy.format = saml.NAMEID_FORMAT_PERSISTENT
- name_id_policy.sp_name_qualifier = sp_name_qualifier
- else:
- name_id_policy.format = saml.NAMEID_FORMAT_TRANSIENT
-
-
- authn_request.name_id_policy = name_id_policy
- authn_request.issuer = saml.Issuer(text=spentityid)
-
- return authn_request
-
+
def response(self, post, requestor, outstanding, log=None):
""" Deal with the AuthnResponse
@@ -141,19 +105,54 @@ class Saml2Client:
if post.has_key("SAMLResponse"):
saml_response = post['SAMLResponse'].value
if saml_response:
- (identity, came_from) = self.verify_response(
+ (identity, came_from,
+ not_on_or_after, response_issuer) = self.verify_response(
saml_response, requestor,
outstanding, log,
context="AuthNReq")
#relay_state = post["RelayState"].value
- return (identity, came_from)
+ return (identity, came_from, response_issuer, not_on_or_after)
else:
return None
+ def authn_request(self, query_id, destination, service_url, spentityid,
+ my_name, vo="", scoping=None):
+
+ res = {
+ "id": query_id,
+ "version": VERSION,
+ "issue_instant": instant(),
+ "destination": destination,
+ "assertion_consumer_service_url": service_url,
+ "protocol_binding": saml2.BINDING_HTTP_POST,
+ "provider_name": my_name,
+ }
+
+ if scoping:
+ res["scoping"] = scoping
+
+ name_id_policy = {
+ "allow_create": "true"
+ }
+
+ name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT
+ if vo:
+ try:
+ #if vo in self.config["virtual_organization"]:
+ name_id_policy["sp_name_qualifier"] = vo
+ name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT
+ except KeyError:
+ pass
+
+ res["name_id_policy"] = name_id_policy
+ res["issuer"] = { "text": spentityid }
+
+ return make_instance(samlp.AuthnRequest, res)
+
def authenticate(self, spentityid, location="", service_url="",
my_name="", relay_state="",
binding=saml2.BINDING_HTTP_REDIRECT, log=None,
- scoping=None):
+ vo="", scoping=None):
""" Either verifies an authentication Response or if none is present
send an authentication request.
@@ -161,15 +160,16 @@ class Saml2Client:
:param binding: How the authentication request should be sent to the
IdP
:param location: Where the IdP is.
- :param service_url: The service URL
+ :param service_url: The SP's service URL
:param my_name: The providers name
:param relay_state: To where the user should be returned after
successfull log in.
:param binding: Which binding to use for sending the request
:param log: Where to write log messages
+ :param vo: The entity_id of the virtual organization I'm a member of
:param scoping: For which IdPs this query are aimed.
- :return: AuthnRequest reponse
+ :return: AuthnRequest response
"""
if log:
@@ -178,8 +178,8 @@ class Saml2Client:
log.info("service_url: %s" % service_url)
log.info("my_name: %s" % my_name)
session_id = sid()
- authen_req = "%s" % self.create_authn_request(session_id, location,
- service_url, spentityid, my_name, scoping)
+ authen_req = "%s" % self.authn_request(session_id, location,
+ service_url, spentityid, my_name, vo, scoping)
log and log.info("AuthNReq: %s" % authen_req)
if binding == saml2.BINDING_HTTP_POST:
@@ -237,29 +237,32 @@ class Saml2Client:
response = correctly_signed_response(decoded_xml,
self.config["xmlsec_binary"], log=log)
if not response:
- log and log.error("Response was not correctly signed")
- print "Response was not correctly signed"
+ if log:
+ log.error("Response was not correctly signed")
+ log.info(decoded_xml)
return ({}, "")
else:
log and log.error("Response was correctly signed or nor signed")
log and log.info("response: %s" % (response,))
try:
- (ava, name_id, came_from) = self.do_response(response,
+ (ava, name_id, came_from, not_on_or_after) = self.do_response(
+ response,
requestor,
outstanding=outstanding,
xmlstr=xmlstr,
log=log, context=context)
+ issuer = response.issuer.text
except AttributeError, exc:
log and log.error("AttributeError: %s" % (exc,))
- return ({}, "")
+ return ({}, "", 0, "")
except Exception, exc:
log and log.error("Exception: %s" % (exc,))
- return ({}, "")
+ return ({}, "", 0, "")
# should return userid and attribute value assertions
ava["__userid"] = name_id
- return (ava, came_from)
+ return (ava, came_from, not_on_or_after, issuer)
def _verify_condition(self, assertion, requestor, log):
# The Identity Provider MUST include a <saml:Conditions> element
@@ -283,7 +286,9 @@ class Saml2Client:
if not for_me(condition, requestor):
if not LAX:
raise Exception("Not for me!!!")
-
+
+ return not_on_or_after
+
def _websso(self, assertion, outstanding, requestor, log):
# the assertion MUST contain one AuthNStatement
assert len(assertion.authn_statement) == 1
@@ -305,7 +310,7 @@ class Saml2Client:
#print "Conditions",assertion.conditions
assert assertion.conditions
log and log.info("verify_condition")
- self._verify_condition(assertion, requestor, log)
+ not_on_or_after = self._verify_condition(assertion, requestor, log)
# The assertion can contain zero or one attributeStatements
assert len(assertion.attribute_statement) <= 1
@@ -334,7 +339,7 @@ class Saml2Client:
assert subject.name_id
name_id = subject.name_id.text.strip()
- return (ava, name_id, came_from)
+ return (ava, name_id, came_from, not_on_or_after)
def _encrypted_assertion(self, xmlstr, outstanding, requestor,
log=None, context=""):
@@ -388,7 +393,7 @@ class Saml2Client:
else:
log and log.info("Session id I don't recall using")
raise Exception("Session id I don't recall using")
-
+
# MUST contain *one* assertion
try:
assert len(response.assertion) == 1 or \
@@ -509,13 +514,15 @@ class Saml2Client:
log and log.info("SOAP request sent and got response: %s" % response)
if response:
log and log.info("Verifying response")
- (identity, came_from) = self.verify_response(response,
+ (identity, came_from, not_on_or_after,
+ response_issuer) = self.verify_response(
+ response,
issuer,
outstanding={session_id:""},
log=log, decode=False,
context="AttrReq")
log and log.info("identity: %s" % identity)
- return identity
+ return (identity, response_issuer, not_on_or_after)
else:
log and log.info("No response")
return None
@@ -609,51 +616,4 @@ def _print_statements(states):
def print_response(resp):
print _print_statement(resp)
print resp.to_string()
-
-
-def d_init_request(id, destination):
- return {
- "id": id,
- "version": VERSION,
- "issue_instant": instant(),
- "destination": destination,
- }
-
-def d_authn_request(query_id, destination, service_url,
- spentityid, my_name, sp_name_qualifier=None,
- scoping=None):
- """ Creates an Authenication Request
-
- :param query_id: Query identifier
- :param destination: Where to send the request
- :param service_url: The page to where the response MUST be sent.
- :param spentityid: My official name
- :param my_name: Who I am
- :param sp_name_qualifier: The domain in which the name should be
- valid
- :param scoping: For which IdPs this query are aimed.
-
- :return: An authentication request
- """
-
- authn_request = d_init_request(query_id, destination)
- authn_request["assertion_consumer_service_url"] = service_url
- authn_request["protocol_binding"] = saml2.BINDING_HTTP_POST
- authn_request["provider_name"] = my_name
- if scoping:
- authn_request["scoping"] = scoping
-
- name_id_policy = {
- "allow_create": 'true'
- }
- if sp_name_qualifier:
- name_id_policy["format"] = saml.NAMEID_FORMAT_PERSISTENT
- name_id_policy["sp_name_qualifier"] = sp_name_qualifier
- else:
- name_id_policy["format"] = saml.NAMEID_FORMAT_TRANSIENT
-
-
- authn_request["name_id_policy"] = name_id_policy
- authn_request["issuer"] = spentityid
-
- return make_instance(samlp.AuthnRequest,authn_request)
+ \ No newline at end of file
diff --git a/src/saml2/config.py b/src/saml2/config.py
index c36c9675..e7ef54ec 100644
--- a/src/saml2/config.py
+++ b/src/saml2/config.py
@@ -4,26 +4,39 @@
from saml2 import metadata
+def entity_id2url(md, entity_id):
+ try:
+ # grab the first one
+ return md.single_sign_on_services(entity_id)[0]
+ except Exception:
+ print "idp_entity_id",entity_id
+ print ("idps in metadata",
+ [e for e,d in md.entity.items() if "idp_sso" in d])
+ print "metadata entities", md.entity.keys()
+ for ent, dic in md.entity.items():
+ print ent, dic.keys()
+ return None
+
class Config(dict):
def sp_check(self, config):
+ assert "idp" in config
+
if "metadata" in config:
md = config["metadata"]
- if "idp_entity_id" in config:
- try:
- config["idp_url"] = md.single_sign_on_services(
- config["idp_entity_id"])[0]
- except Exception:
- print "idp_entity_id",config["idp_entity_id"]
- print ("idps in metadata",
- [e for e,d in md.entity.items() if "idp_sso" in d])
- print "metadata entities", md.entity.keys()
- for ent, dic in md.entity.items():
- print ent, dic.keys()
- raise
+ if "entity_id" in config["idp"]:
+ if not "url" in config["idp"]:
+ config["idp"]["url"] = []
+ urls = config["idp"]["url"]
+ for eid in config["idp"]["entity_id"]:
+ url = entity_id2url(md, eid)
+ if url:
+ if url not in urls:
+ urls.append(url)
- assert config["idp_url"]
-
+ assert "sp" in config["service"]
+ assert "url" in config["service"]["sp"]
+
def idp_check(self, config):
pass
@@ -50,7 +63,6 @@ class Config(dict):
assert "xmlsec_binary" in config
assert "service" in config
assert "entityid" in config
- assert "service_url" in config
if "key_file" in config:
# If you have a key file you have to have a cert file
diff --git a/src/saml2/samlp.py b/src/saml2/samlp.py
index 44d8bfba..9a8365d3 100644
--- a/src/saml2/samlp.py
+++ b/src/saml2/samlp.py
@@ -897,9 +897,8 @@ class AuthnRequest(AbstractRequest):
c_children['{%s}Scoping' % NAMESPACE] = ('scoping', Scoping)
c_child_order = AbstractRequest.c_child_order[:]
- c_child_order.extend(['issuer', 'signature', 'extensions', 'subject',
- 'name_id_policy', 'conditions', 'requested_authn_context',
- 'scoping'])
+ c_child_order.extend(['subject', 'name_id_policy', 'conditions',
+ 'requested_authn_context', 'scoping'])
def __init__(self, id=None, version=None, issue_instant=None,
destination=None, consent=None, issuer=None, signature=None,
diff --git a/src/saml2/server.py b/src/saml2/server.py
index 7bd51640..26c7bda1 100644
--- a/src/saml2/server.py
+++ b/src/saml2/server.py
@@ -18,6 +18,8 @@
"""Contains classes and functions that a SAML2.0 Identity provider (IdP)
or attribute authority (AA) may use to conclude its tasks.
"""
+import shelve
+
from saml2 import saml, samlp, VERSION
from saml2.utils import sid, decode_base64_and_inflate, make_instance
from saml2.time_util import instant, in_a_while
@@ -60,95 +62,219 @@ def klassdict(klass, text=None, **kwargs):
spec[key] = val
return spec
-class Server(object):
- def __init__(self, config_file, log=None):
- if config_file:
- self.conf = Config()
- self.conf.load_file(config_file)
- self.metadata = self.conf["metadata"]
- self.log = log
-
- def issuer(self):
- return klassdict( saml.Issuer, self.conf["entityid"],
- format=saml.NAMEID_FORMAT_ENTITY)
-
- def status_from_exception(self, exception):
- return klassdict(samlp.Status,
+def kd_status_from_exception(exception):
+ return klassdict(samlp.Status,
+ status_code=klassdict(samlp.StatusCode,
+ value=samlp.STATUS_RESPONDER,
status_code=klassdict(samlp.StatusCode,
- value=samlp.STATUS_RESPONDER,
- status_code=klassdict(samlp.StatusCode,
- value=EXCEPTION2STATUS[exception.__class__])
- ),
- status_message=exception.args[0],
- )
-
- def name_id(self, text="", **kwargs):
- return klassdict(saml.NameID, text, **kwargs)
+ value=EXCEPTION2STATUS[exception.__class__])
+ ),
+ status_message=exception.args[0],
+ )
+
+def kd_name_id(text="", **kwargs):
+ return klassdict(saml.NameID, text, **kwargs)
- def status_message(self, text="", **kwargs):
- return klassdict(samlp.StatusMessage, text, **kwargs)
+def kd_status_message(text="", **kwargs):
+ return klassdict(samlp.StatusMessage, text, **kwargs)
- def status_code(self, text="", **kwargs):
- return klassdict(samlp.StatusCode, text, **kwargs)
+def kd_status_code(text="", **kwargs):
+ return klassdict(samlp.StatusCode, text, **kwargs)
- def status(self, text="", **kwargs):
- return klassdict(samlp.Status, text, **kwargs)
-
- def success_status(self):
- return self.status(status_code=self.status_code(
- value=samlp.STATUS_SUCCESS))
-
- def audience(self, text="", **kwargs):
- return klassdict(saml.Audience, text, **kwargs)
+def kd_status(text="", **kwargs):
+ return klassdict(samlp.Status, text, **kwargs)
+
+def kd_success_status():
+ return kd_status(status_code=kd_status_code(value=samlp.STATUS_SUCCESS))
+
+def kd_audience(text="", **kwargs):
+ return klassdict(saml.Audience, text, **kwargs)
+
+def kd_audience_restriction(text="", **kwargs):
+ return klassdict(saml.AudienceRestriction, text, **kwargs)
- def audience_restriction(self, text="", **kwargs):
- return klassdict(saml.AudienceRestriction, text, **kwargs)
+def kd_conditions(text="", **kwargs):
+ return klassdict(saml.Conditions, text, **kwargs)
+
+def kd_attribute(text="", **kwargs):
+ return klassdict(saml.Attribute, text, **kwargs)
- def conditions(self, text="", **kwargs):
- return klassdict(saml.Conditions, text, **kwargs)
+def kd_attribute_value(text="", **kwargs):
+ return klassdict(saml.AttributeValue, text, **kwargs)
- def attribute(self, text="", **kwargs):
- return klassdict(saml.Attribute, text, **kwargs)
+def kd_attribute_statement(text="", **kwargs):
+ return klassdict(saml.AttributeStatement, text, **kwargs)
- def attribute_value(self, text="", **kwargs):
- return klassdict(saml.AttributeValue, text, **kwargs)
-
- def attribute_statement(self, text="", **kwargs):
- return klassdict(saml.AttributeStatement, text, **kwargs)
+def kd_subject_confirmation_data(text="", **kwargs):
+ return klassdict(saml.SubjectConfirmationData, text, **kwargs)
- def subject_confirmation_data(self, text="", **kwargs):
- return klassdict(saml.SubjectConfirmationData, text, **kwargs)
-
- def subject_confirmation(self, text="", **kwargs):
- return klassdict(saml.SubjectConfirmation, text, **kwargs)
+def kd_subject_confirmation(text="", **kwargs):
+ return klassdict(saml.SubjectConfirmation, text, **kwargs)
+
+def kd_subject(text="", **kwargs):
+ return klassdict(saml.Subject, text, **kwargs)
+
+def kd_authn_statement(text="", **kwargs):
+ return klassdict(saml.Subject, text, **kwargs)
+
+def kd_assertion(text="", **kwargs):
+ kwargs.update({
+ "version": VERSION,
+ "id" : sid(),
+ "issue_instant" : instant(),
+ })
+ return klassdict(saml.Assertion, text, **kwargs)
+
+def kd_response(signature=False, encrypt=False, **kwargs):
+
+ kwargs.update({
+ "id" : sid(),
+ "version": VERSION,
+ "issue_instant" : instant(),
+ })
+ if signature:
+ kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
+
+ return kwargs
+
+def do_attribute_statement(identity):
+ """
+ :param identity: A dictionary with fiendly names as keys
+ :return:
+ """
+ attrs = []
+ for key, val in identity.items():
+ dic = {}
+ if isinstance(val,basestring):
+ attrval = kd_attribute_value(val)
+ elif isinstance(val,list):
+ attrval = [kd_attribute_value(v) for v in val]
+ else:
+ raise OtherError("strange value type on: %s" % val)
+ dic["attribute_value"] = attrval
+ if isinstance(key, basestring):
+ dic["name"] = key
+ elif isinstance(key, tuple): # 3-tuple
+ (name,format,friendly) = key
+ if name:
+ dic["name"] = name
+ if format:
+ dic["name_format"] = format
+ if friendly:
+ dic["friendly_name"] = friendly
+ attrs.append(kd_attribute(**dic))
+
+ return kd_attribute_statement(attribute=attrs)
+
+def kd_issuer(text, **kwargs):
+ return klassdict(saml.Issuer, text, **kwargs)
+
+def do_aa_response(consumer_url, in_response_to,
+ sp_entity_id, identity, name_id_policies=None,
+ name_id=None, ip_address="", issuer=None ):
+
+ attr_statement = do_attribute_statement(identity)
+
+ # start using now and for a hour
+ conds = kd_conditions(
+ not_before=instant(),
+ # an hour from now
+ not_on_or_after=in_a_while(hours=1),
+ audience_restriction=kd_audience_restriction(
+ audience=kd_audience(sp_entity_id)))
+
+ # temporary identifier or ??
+ if not name_id:
+ name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
- def subject(self, text="", **kwargs):
- return klassdict(saml.Subject, text, **kwargs)
+ tmp = kd_response(
+ issuer=issuer,
+ in_response_to=in_response_to,
+ destination=consumer_url,
+ status=kd_success_status(),
+ assertion=kd_assertion(
+ subject = kd_subject(
+ name_id=name_id,
+ method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
+ subject_confirmation=kd_subject_confirmation(
+ subject_confirmation_data=kd_subject_confirmation_data(
+ in_response_to=in_response_to,
+ not_on_or_after=in_a_while(hours=1),
+ address=ip_address,
+ recipient=consumer_url))),
+ attribute_statement = attr_statement,
+ authn_statement= kd_authn_statement(
+ authn_instant=instant(),
+ session_index=sid()),
+ conditions=conds,
+ ),
+ )
+
+ return make_instance(samlp.Response, tmp)
+
+class Server(object):
+ def __init__(self, config_file="", config=None, log=None, debug=0):
+ if config_file:
+ self.conf = Config()
+ self.conf.load_file(config_file)
+ self.metadata = self.conf["metadata"]
+ if "subject_data" in self.conf:
+ self.id_map = shelve.open(self.conf["subject_data"],
+ writeback=True)
+ else:
+ self.id_map = None
+ elif config:
+ self.conf = config
+ self.metadata = self.conf["metadata"]
- def authn_statement(self, text="", **kwargs):
- return klassdict(saml.Subject, text, **kwargs)
+ self.log = log
+ self.debug = debug
- def assertion(self, text="", **kwargs):
- kwargs.update({
- "version": VERSION,
- "id" : sid(),
- "issue_instant" : instant(),
- })
- return klassdict(saml.Assertion, text, **kwargs)
+ def issuer(self):
+ return kd_issuer( self.conf["entityid"],
+ format=saml.NAMEID_FORMAT_ENTITY)
- def response(self, signature=False, encrypt=False, **kwargs):
+ def persistent_id(self, entity_id, subject_id):
+ """
+ :param entity_id: SP entity ID or VO entity ID
+ :param subject_id: The local identifier of the subject
+ :return: A arbitrary identifier for the subject unique to the
+ entity_id
+ """
+ if self.debug:
+ self.log and self.log.debug("Id map keys: %s" % self.id_map.keys())
+
+ try:
+ map = self.id_map[entity_id]
+ except KeyError:
+ map = self.id_map[entity_id] = {"forward":{}, "backward":{}}
- kwargs.update({
- "id" : sid(),
- "version": VERSION,
- "issue_instant" : instant(),
- })
- if signature:
- kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
+ try:
+ if self.debug:
+ self.log.debug("map forward keys: %s" % map["forward"].keys())
+ return map["forward"][subject_id]
+ except KeyError:
+ while True:
+ temp_id = sid()
+ if temp_id not in map["backward"]:
+ break
+ map["forward"][subject_id] = temp_id
+ map["backward"][temp_id] = subject_id
+ self.id_map[entity_id]= map
+ self.id_map.sync()
+
+ return temp_id
- return kwargs
-
def parse_authn_request(self, enc_request):
+ """Parse a Authentication Request
+
+ :param enc_request: The request in its transport format
+ :return: A tuple of
+ consumer_url - as gotten from the SPs entity_id and the metadata
+ id - the id of the request
+ name_id_policy - how to chose the subjects identifier
+ spentityid - the entity id of the SP
+ """
request_xml = decode_base64_and_inflate(enc_request)
request = samlp.authn_request_from_string(request_xml)
@@ -175,92 +301,77 @@ class Server(object):
return_destination))
print "%s != %s" % (consumer_url, return_destination)
raise OtherError("ConsumerURL and return destination mismatch")
-
- policy = request.name_id_policy
- if policy.allow_create.lower() == "true" and \
- policy.format == saml.NAMEID_FORMAT_TRANSIENT:
- name_id_policies = policy.format
-
- return (consumer_url, id, name_id_policies, spentityid)
+
+ self.log and self.log.info("AuthNRequest: %s" % request)
+ return (consumer_url, id, request.name_id_policy, spentityid)
def allowed_issuer(self, issuer):
+ """ """
return True
def parse_attribute_query(self, xml_string):
query = samlp.attribute_query_from_string(xml_string)
assert query.version == VERSION
- assert query.destination == self.conf["service_url"]
+ self.log and self.log.info(
+ "%s ?= %s" % (query.destination,self.conf["service"]["aa"]["url"]))
+ assert query.destination == self.conf["service"]["aa"]["url"]
self.allowed_issuer(query.issuer)
# verify signature
- return (subject, attribute)
+ subject = query.subject.name_id.text
+ if query.attribute:
+ attribute = query.attribute
+ else:
+ attribute = None
+ return (subject, attribute, query)
def find_subject(self, subject, attribute=None):
pass
-
- def do_attribute_statement(self, identity):
- """
- :param identity: A dictionary with fiendly names as keys
- :return:
- """
- attrs = []
- for key, val in identity.items():
- dic = {}
- if isinstance(val,basestring):
- attrval = self.attribute_value(val)
- elif isinstance(val,list):
- attrval = [self.attribute_value(v) for v in val]
- else:
- raise OtherError("strange value type on: %s" % val)
- dic["attribute_value"] = attrval
- if isinstance(key, basestring):
- dic["name"] = key
- elif isinstance(key, tuple): # 3-tuple
- (name,format,friendly) = key
- if name:
- dic["name"] = name
- if format:
- dic["name_format"] = format
- if friendly:
- dic["friendly_name"] = friendly
- attrs.append(self.attribute(**dic))
-
- return self.attribute_statement(attribute=attrs)
-
+
def do_sso_response(self, consumer_url, in_response_to,
- sp_entity_id, identity, name_id_policies=None,
- subject_id=None ):
+ sp_entity_id, identity, name_id=None ):
- attribute_statement = self.do_attribute_statement(identity)
+ attr_statement = do_attribute_statement(identity)
# start using now and for a hour
- conditions = self.conditions(
+ conds = kd_conditions(
not_before=instant(),
# an hour from now
not_on_or_after=in_a_while(0,0,0,0,0,1),
- audience_restriction=self.audience_restriction(
- audience=self.audience(sp_entity_id)))
+ audience_restriction=kd_audience_restriction(
+ audience=kd_audience(sp_entity_id)))
# temporary identifier or ??
- subject_id = sid()
- tmp = self.response(
+ if not name_id:
+ name_id = kd_name_id(sid(), format=saml.NAMEID_FORMAT_TRANSIENT)
+
+ tmp = kd_response(
+ issuer=self.issuer(),
in_response_to=in_response_to,
destination=consumer_url,
- status=self.success_status(),
- assertion=self.assertion(
- subject = self.subject(
- name_id=self.name_id(subject_id,
- format=saml.NAMEID_FORMAT_TRANSIENT),
- method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
- subject_confirmation=self.subject_confirmation(
- subject_confirmation_data=self.subject_confirmation_data(
- in_response_to=in_response_to))),
- attribute_statement = attribute_statement,
- authn_statement= self.authn_statement(
+ status=kd_success_status(),
+ assertion=kd_assertion(
+ attribute_statement = attr_statement,
+ authn_statement= kd_authn_statement(
authn_instant=instant(),
session_index=sid()),
- conditions=conditions,
+ conditions=conds,
+ subject=kd_subject(
+ name_id=name_id,
+ method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
+ subject_confirmation=kd_subject_confirmation(
+ subject_confirmation_data=kd_subject_confirmation_data(
+ in_response_to=in_response_to))),
),
)
return make_instance(samlp.Response, tmp)
+
+ def do_aa_response(self, consumer_url, in_response_to,
+ sp_entity_id, identity, name_id_policies=None,
+ subject_id=None, ip_address=""):
+
+ return do_aa_response(consumer_url, in_response_to,
+ sp_entity_id, identity, name_id_policies,
+ subject_id, ip_address, self.issuer())
+
diff --git a/src/saml2/soap.py b/src/saml2/soap.py
index 97b3bb25..4ac99890 100644
--- a/src/saml2/soap.py
+++ b/src/saml2/soap.py
@@ -36,9 +36,13 @@ NAMESPACE = "http://schemas.xmlsoap.org/soap/envelope/"
def parse_soap_enveloped_saml_response(text):
expected_tag = '{%s}Response' % SAMLP_NAMESPACE
- return parse_soap_enveloped_saml_thing(text, expected_tag)
-
-def parse_soap_enveloped_saml_thing(text, expected_tag):
+ return parse_soap_enveloped_saml_thingy(text, expected_tag)
+
+def parse_soap_enveloped_saml_attribute_query(text):
+ expected_tag = '{%s}AttributeQuery' % SAMLP_NAMESPACE
+ return parse_soap_enveloped_saml_thingy(text, expected_tag)
+
+def parse_soap_enveloped_saml_thingy(text, expected_tag):
"""Parses a SOAP enveloped SAML thing and returns the thing as
a string.
@@ -59,7 +63,7 @@ def parse_soap_enveloped_saml_thing(text, expected_tag):
else:
return ""
-def make_soap_enveloped_saml_thingy(self, thingy):
+def make_soap_enveloped_saml_thingy(thingy):
""" Returns a soap envelope containing a SAML request
as a text string.
@@ -86,7 +90,9 @@ class _Http(object):
self.server.add_certificate(keyfile, certfile, "")
def write(self, data):
- (response, content) = self.server.request(self.path, "POST", data)
+ (response, content) = self.server.request(self.path,
+ "POST", data,
+ headers={"content-type": "application/soap+xml"})
if response.status == 200:
return content
else:
@@ -98,7 +104,7 @@ class SOAPClient(object):
self.server = _Http(server_url, keyfile, certfile)
def send(self, request):
- soap_message = make_soap_enveloped_saml_request(request)
+ soap_message = make_soap_enveloped_saml_thingy(request)
response = self.server.write(soap_message)
if response:
return parse_soap_enveloped_saml_response(response)
diff --git a/src/saml2/time_util.py b/src/saml2/time_util.py
index 64d89125..53576760 100644
--- a/src/saml2/time_util.py
+++ b/src/saml2/time_util.py
@@ -162,7 +162,7 @@ def add_duration(tid, duration):
# ---------------------------------------------------------------------------
-def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
+def time_in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
minutes=0, hours=0, weeks=0):
"""
format of timedelta:
@@ -173,7 +173,17 @@ def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
t = timedelta(*[days,seconds,microseconds,milliseconds,minutes,
hours,weeks])
soon = now + t
- return soon.strftime(TIME_FORMAT)
+ return soon
+
+def in_a_while(days=0, seconds=0, microseconds=0, milliseconds=0,
+ minutes=0, hours=0, weeks=0):
+ """
+ format of timedelta:
+ timedelta([days[, seconds[, microseconds[, milliseconds[,
+ minutes[, hours[, weeks]]]]]]])
+ """
+ return time_in_a_while(days, seconds, microseconds, milliseconds,
+ minutes, hours, weeks).strftime(TIME_FORMAT)
# ---------------------------------------------------------------------------
diff --git a/tests/metadata.xml b/tests/metadata.xml
index 1766c0a3..2d2c4b0e 100644
--- a/tests/metadata.xml
+++ b/tests/metadata.xml
@@ -1,5 +1,5 @@
<?xml version='1.0' encoding='UTF-8'?>
-<ns0:EntitiesDescriptor name="urn:mace:umu.se:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
+<ns0:EntitiesDescriptor name="urn:mace:example.com:saml:test" validUntil="2009-12-04T17:31:07Z" xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:sp"><ns0:SPSSODescriptor AuthnRequestsSigned="False" WantAssertionsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
@@ -15,7 +15,7 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
+</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087/" index="0" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor><ns0:EntityDescriptor entityID="urn:mace:example.com:saml:roland:idp"><ns0:IDPSSODescriptor WantAuthnRequestsSigned="True" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor><ns1:KeyInfo xmlns:ns1="http://www.w3.org/2000/09/xmldsig#"><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
@@ -31,4 +31,4 @@ AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
+vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
-</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.umu.se/</ns0:OrganizationURL><ns0:OrganizationName>Umea University</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@adm.umu.se</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor>
+</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleSignOnService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8088/sso/" /></ns0:IDPSSODescriptor><ns0:Organization><ns0:OrganizationURL>http://www.example.com/</ns0:OrganizationURL><ns0:OrganizationName>Example Co</ns0:OrganizationName></ns0:Organization><ns0:ContactPerson><ns0:GivenName>Roland</ns0:GivenName><ns0:SurName>Hedberg</ns0:SurName><ns0:EmailAddress>roland.hedberg@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor></ns0:EntitiesDescriptor>
diff --git a/tests/server.config b/tests/server.config
index 7ba28cd7..e2905b89 100644
--- a/tests/server.config
+++ b/tests/server.config
@@ -1,14 +1,26 @@
{
- "entityid" : "urn:mace:umu.se:saml:roland:sp",
- "my_name" : "urn:mace:umu.se:saml:roland:sp",
- "service_url" : "http://lingon.catalogix.se:8087/",
- "service": ["sp"],
+ "entityid" : "urn:mace:example.com:saml:roland:sp",
+ "service": {
+ "sp":{
+ "my_name" : "urn:mace:example.com:saml:roland:sp",
+ "url": "http://lingon.catalogix.se:8087/",
+ }
+ },
"debug" : 1,
"my_key" : "./mykey.pem",
"my_cert" : "./mycert.pem",
"xmlsec_binary" : "/opt/local/bin/xmlsec1",
"metadata": {
- "local": ["/Users/rolandh/code/pysaml2/tests/metadata.xml"],
+ "local": ["./tests/metadata.xml", "./tests/vo_metadata.xml"],
+ },
+ "idp":{
+ "entity_id": ["urn:mace:example.com:saml:roland:idp"],
+ },
+ "virtual_organization" : {
+ "urn:mace:example.com:it:tek":{
+ "nameid_format" : "urn:oid:1.3.6.1.4.1.1466.115.121.1.15-NameID",
+ "common_identifier": "umuselin",
+ }
},
- "idp_entity_id": "urn:mace:umu.se:saml:roland:idp",
+ "subject_data": "subject_data.db"
} \ No newline at end of file
diff --git a/tests/test_client.py b/tests/test_client.py
index d838b1e4..138141f0 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from saml2.client import Saml2Client
-from saml2 import samlp, client
+from saml2 import samlp, client, BINDING_HTTP_POST
from saml2 import saml, utils, config
XML_RESPONSE_FILE = "tests/saml_signed.xml"
@@ -190,3 +190,45 @@ class TestClient:
idp_entry = scope.idp_list.idp_entry[0]
assert idp_entry.name == "UmeƄ Universitet"
assert idp_entry.loc == "https://idp.umu.se/"
+
+ def test_create_auth_request_0(self):
+ ar = self.client.authn_request("1",
+ "http://www.example.com/sso",
+ "http://www.example.org/service",
+ "urn:mace:example.org:saml:sp",
+ "My Name")
+
+ print ar
+ assert ar.assertion_consumer_service_url == "http://www.example.org/service"
+ assert ar.destination == "http://www.example.com/sso"
+ assert ar.protocol_binding == BINDING_HTTP_POST
+ assert ar.version == "2.0"
+ assert ar.provider_name == "My Name"
+ assert ar.issuer.text == "urn:mace:example.org:saml:sp"
+ nid_policy = ar.name_id_policy
+ assert nid_policy.allow_create == "true"
+ assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
+
+ def test_create_auth_request_vo(self):
+ assert self.client.config["virtual_organization"].keys() == [
+ "urn:mace:example.com:it:tek"]
+
+ ar = self.client.authn_request("1",
+ "http://www.example.com/sso",
+ "http://www.example.org/service",
+ "urn:mace:example.org:saml:sp",
+ "My Name",
+ vo="urn:mace:example.com:it:tek")
+
+ print ar
+ assert ar.assertion_consumer_service_url == "http://www.example.org/service"
+ assert ar.destination == "http://www.example.com/sso"
+ assert ar.protocol_binding == BINDING_HTTP_POST
+ assert ar.version == "2.0"
+ assert ar.provider_name == "My Name"
+ assert ar.issuer.text == "urn:mace:example.org:saml:sp"
+ nid_policy = ar.name_id_policy
+ assert nid_policy.allow_create == "true"
+ assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
+ assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"
+
diff --git a/tests/test_config.py b/tests/test_config.py
index 2391a297..f3ab82cd 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -4,26 +4,30 @@
from saml2.config import Config
-c1 = {
- "service": ["sp"],
+sp1 = {
"entityid" : "urn:mace:umu.se:saml:roland:sp",
-# "my_name" : "urn:mace:umu.se:saml:roland:sp",
- "service_url" : "http://lingon.catalogix.se:8087/",
-# "debug" : 1,
+ "service": {
+ "sp": {
+ "url" : "http://lingon.catalogix.se:8087/",
+ "name": "test",
+ }
+ },
"key_file" : "tests/mykey.pem",
"cert_file" : "tests/mycert.pem",
"xmlsec_binary" : "/opt/local/bin/xmlsec1",
"metadata": {
"local": ["tests/metadata.xml",
"tests/urn-mace-swami.se-swamid-test-1.0-metadata.xml"],
-# "remote":{
-# "edugain":{
-# "url": "https://www.example.com/?id=edugain&set=saml2",
-# "cert": "./edugain.pem",
-# }
-# }
+ # "remote":{
+ # "edugain":{
+ # "url": "https://www.example.com/?id=edugain&set=saml2",
+ # "cert": "./edugain.pem",
+ # }
+ # }
+ },
+ "idp" : {
+ "entity_id": ["urn:mace:umu.se:saml:roland:idp"],
},
- "idp_entity_id": "urn:mace:umu.se:saml:roland:idp",
"virtual_organization" : {
"http://vo.example.org/biomed":{
"nameid_format" : "urn:oid:2.16.756.1.2.5.1.1.1-NameID",
@@ -34,4 +38,4 @@ c1 = {
def test_1():
c = Config()
- c.load(c1) \ No newline at end of file
+ c.load(sp1)
diff --git a/tests/test_server.py b/tests/test_server.py
index ad3d9e7b..dd041df9 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -2,9 +2,11 @@
# -*- coding: utf-8 -*-
from saml2.server import Server, OtherError, UnknownPricipal
+from saml2 import server
from saml2 import samlp, saml, client, utils
from saml2.utils import make_instance
from py.test import raises
+import shelve
SUCCESS_STATUS = """<?xml version=\'1.0\' encoding=\'UTF-8\'?>
<ns0:Status xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:StatusCode Value="urn:oasis:names:tc:SAML:2.0:status:Success" /></ns0:Status>"""
@@ -15,67 +17,193 @@ ERROR_STATUS = """<?xml version='1.0' encoding='UTF-8'?>
def _eq(l1,l2):
return set(l1) == set(l2)
-class TestServer():
- def setup_class(self):
- self.server = Server("tests/server.config")
+def test_status_success():
+ stat = server.kd_status(
+ status_code=server.kd_status_code(
+ value=samlp.STATUS_SUCCESS))
+ status = make_instance( samlp.Status, stat)
+ status_text = "%s" % status
+ assert status_text == SUCCESS_STATUS
+ assert status.status_code.value == samlp.STATUS_SUCCESS
+
+def test_success_status():
+ stat = server.kd_success_status()
+ status = make_instance(samlp.Status, stat)
+ status_text = "%s" % status
+ assert status_text == SUCCESS_STATUS
+ assert status.status_code.value == samlp.STATUS_SUCCESS
- def test_status_success(self):
- stat = self.server.status(
- status_code=self.server.status_code(
- value=samlp.STATUS_SUCCESS))
- status = make_instance( samlp.Status, stat)
- status_text = "%s" % status
- assert status_text == SUCCESS_STATUS
- assert status.status_code.value == samlp.STATUS_SUCCESS
+def test_error_status():
+ stat = server.kd_status(
+ status_message=server.kd_status_message(
+ "Error resolving principal"),
+ status_code=server.kd_status_code(
+ value=samlp.STATUS_RESPONDER,
+ status_code=server.kd_status_code(
+ value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
- def test_success_status(self):
- stat = self.server.success_status()
- status = make_instance(samlp.Status, stat)
- status_text = "%s" % status
- assert status_text == SUCCESS_STATUS
- assert status.status_code.value == samlp.STATUS_SUCCESS
+ status_text = "%s" % make_instance( samlp.Status, stat )
+ print status_text
+ assert status_text == ERROR_STATUS
+
+def test_status_from_exception():
+ e = UnknownPricipal("Error resolving principal")
+ stat = server.kd_status_from_exception(e)
+ status_text = "%s" % make_instance( samlp.Status, stat )
- def test_error_status(self):
- stat = self.server.status(
- status_message=self.server.status_message(
- "Error resolving principal"),
- status_code=self.server.status_code(
- value=samlp.STATUS_RESPONDER,
- status_code=self.server.status_code(
- value=samlp.STATUS_UNKNOWN_PRINCIPAL)))
+ assert status_text == ERROR_STATUS
+
+def test_attribute_statement():
+ astat = server.do_attribute_statement({"surName":"Jeter",
+ "givenName":"Derek"})
+ statement = make_instance(saml.AttributeStatement,astat)
+ assert statement.keyswv() == ["attribute"]
+ assert len(statement.attribute) == 2
+ attr0 = statement.attribute[0]
+ assert _eq(attr0.keyswv(), ["name","attribute_value"])
+ assert len(attr0.attribute_value) == 1
+ attr1 = statement.attribute[1]
+ assert _eq(attr1.keyswv(), ["name","attribute_value"])
+ assert len(attr1.attribute_value) == 1
+ if attr0.name == "givenName":
+ assert attr0.attribute_value[0].text == "Derek"
+ assert attr1.name == "surName"
+ assert attr1.attribute_value[0].text == "Jeter"
+ else:
+ assert attr0.name == "surName"
+ assert attr0.attribute_value[0].text == "Jeter"
+ assert attr1.name == "givenName"
+ assert attr1.attribute_value[0].text == "Derek"
+
+def test_audience():
+ aud_restr = make_instance( saml.AudienceRestriction,
+ server.kd_audience_restriction(
+ audience=server.kd_audience("urn:foo:bar")))
- status_text = "%s" % make_instance( samlp.Status, stat )
- print status_text
- assert status_text == ERROR_STATUS
+ assert aud_restr.keyswv() == ["audience"]
+ assert aud_restr.audience.text == "urn:foo:bar"
+
+def test_conditions():
+ conds_dict = server.kd_conditions(
+ not_before="2009-10-30T07:58:10.852Z",
+ not_on_or_after="2009-10-30T08:03:10.852Z",
+ audience_restriction=server.kd_audience_restriction(
+ audience=server.kd_audience("urn:foo:bar")))
+
+ conditions = make_instance(saml.Conditions, conds_dict)
+ assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
+ "audience_restriction"])
+ assert conditions.not_before == "2009-10-30T07:58:10.852Z"
+ assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
+ assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
+
+def test_value_1():
+ #FriendlyName="givenName" Name="urn:oid:2.5.4.42"
+ # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
+ adict = server.kd_attribute(name="urn:oid:2.5.4.42",
+ name_format=saml.NAME_FORMAT_URI)
+ attribute = make_instance(saml.Attribute, adict)
+ assert _eq(attribute.keyswv(),["name","name_format"])
+ assert attribute.name == "urn:oid:2.5.4.42"
+ assert attribute.name_format == saml.NAME_FORMAT_URI
- def test_status_from_exception(self):
- e = UnknownPricipal("Error resolving principal")
- stat = self.server.status_from_exception(e)
- status_text = "%s" % make_instance( samlp.Status, stat )
-
- assert status_text == ERROR_STATUS
-
- def test_attribute_statement(self):
- astat = self.server.do_attribute_statement({"surName":"Jeter",
- "givenName":"Derek"})
- statement = make_instance(saml.AttributeStatement,astat)
- assert statement.keyswv() == ["attribute"]
- assert len(statement.attribute) == 2
- attr0 = statement.attribute[0]
- assert _eq(attr0.keyswv(), ["name","attribute_value"])
- assert len(attr0.attribute_value) == 1
- attr1 = statement.attribute[1]
- assert _eq(attr1.keyswv(), ["name","attribute_value"])
+def test_value_2():
+ adict = server.kd_attribute(name="urn:oid:2.5.4.42",
+ name_format=saml.NAME_FORMAT_URI,
+ friendly_name="givenName")
+ attribute = make_instance(saml.Attribute, adict)
+ assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
+ assert attribute.name == "urn:oid:2.5.4.42"
+ assert attribute.name_format == saml.NAME_FORMAT_URI
+ assert attribute.friendly_name == "givenName"
+
+def test_value_3():
+ adict = server.kd_attribute(attribute_value="Derek",
+ name="urn:oid:2.5.4.42",
+ name_format=saml.NAME_FORMAT_URI,
+ friendly_name="givenName")
+ attribute = make_instance(saml.Attribute, adict)
+ assert _eq(attribute.keyswv(),["name", "name_format",
+ "friendly_name", "attribute_value"])
+ assert attribute.name == "urn:oid:2.5.4.42"
+ assert attribute.name_format == saml.NAME_FORMAT_URI
+ assert attribute.friendly_name == "givenName"
+ assert len(attribute.attribute_value) == 1
+ assert attribute.attribute_value[0].text == "Derek"
+
+def test_value_4():
+ adict = server.kd_attribute(attribute_value="Derek",
+ friendly_name="givenName")
+ attribute = make_instance(saml.Attribute, adict)
+ assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
+ assert attribute.friendly_name == "givenName"
+ assert len(attribute.attribute_value) == 1
+ assert attribute.attribute_value[0].text == "Derek"
+
+def test_do_attribute_statement_0():
+ astat = server.do_attribute_statement({"vo_attr":"foobar"})
+ statement = make_instance(saml.AttributeStatement,astat)
+ assert statement.keyswv() == ["attribute"]
+ assert len(statement.attribute) == 1
+ attr0 = statement.attribute[0]
+ assert _eq(attr0.keyswv(), ["name","attribute_value"])
+ assert attr0.name == "vo_attr"
+ assert len(attr0.attribute_value) == 1
+ assert attr0.attribute_value[0].text == "foobar"
+
+def test_do_attribute_statement():
+ astat = server.do_attribute_statement({"surName":"Jeter",
+ "givenName":["Derek","Sanderson"]})
+ statement = make_instance(saml.AttributeStatement,astat)
+ assert statement.keyswv() == ["attribute"]
+ assert len(statement.attribute) == 2
+ attr0 = statement.attribute[0]
+ assert _eq(attr0.keyswv(), ["name","attribute_value"])
+ attr1 = statement.attribute[1]
+ assert _eq(attr1.keyswv(), ["name","attribute_value"])
+ if attr0.name == "givenName":
+ assert len(attr0.attribute_value) == 2
+ assert _eq([av.text for av in attr0.attribute_value],
+ ["Derek","Sanderson"])
+ assert attr1.name == "surName"
+ assert attr1.attribute_value[0].text == "Jeter"
assert len(attr1.attribute_value) == 1
- if attr0.name == "givenName":
- assert attr0.attribute_value[0].text == "Derek"
- assert attr1.name == "surName"
- assert attr1.attribute_value[0].text == "Jeter"
- else:
- assert attr0.name == "surName"
- assert attr0.attribute_value[0].text == "Jeter"
- assert attr1.name == "givenName"
- assert attr1.attribute_value[0].text == "Derek"
+ else:
+ assert attr0.name == "surName"
+ assert attr0.attribute_value[0].text == "Jeter"
+ assert len(attr0.attribute_value) == 1
+ assert attr1.name == "givenName"
+ assert len(attr1.attribute_value) == 2
+ assert _eq([av.text for av in attr1.attribute_value],
+ ["Derek","Sanderson"])
+
+def test_do_attribute_statement_multi():
+ astat = server.do_attribute_statement(
+ {("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
+ "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
+ "eduPersonEntitlement"):"Jeter"})
+ statement = make_instance(saml.AttributeStatement,astat)
+ assert statement.keyswv() == ["attribute"]
+ assert len(statement.attribute)
+ assert _eq(statement.attribute[0].keyswv(),
+ ["name","name_format","friendly_name","attribute_value"])
+ attribute = statement.attribute[0]
+ assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
+ assert attribute.name_format == (
+ "urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
+ assert attribute.friendly_name == "eduPersonEntitlement"
+
+def test_subject():
+ adict = server.kd_subject("_aaa", name_id=saml.NAMEID_FORMAT_TRANSIENT)
+ subject = make_instance(saml.Subject, adict)
+ assert _eq(subject.keyswv(),["text", "name_id"])
+ assert subject.text == "_aaa"
+ assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
+
+
+class TestServer():
+ def setup_class(self):
+ self.server = Server("tests/server.config")
def test_issuer(self):
issuer = make_instance( saml.Issuer, self.server.issuer())
@@ -84,130 +212,16 @@ class TestServer():
assert issuer.format == saml.NAMEID_FORMAT_ENTITY
assert issuer.text == self.server.conf["entityid"]
- def test_audience(self):
- aud_restr = make_instance( saml.AudienceRestriction,
- self.server.audience_restriction(
- audience=self.server.audience("urn:foo:bar")))
-
- assert aud_restr.keyswv() == ["audience"]
- assert aud_restr.audience.text == "urn:foo:bar"
-
- def test_conditions(self):
- conds_dict = self.server.conditions(
- not_before="2009-10-30T07:58:10.852Z",
- not_on_or_after="2009-10-30T08:03:10.852Z",
- audience_restriction=self.server.audience_restriction(
- audience=self.server.audience("urn:foo:bar")))
-
- conditions = make_instance(saml.Conditions, conds_dict)
- assert _eq(conditions.keyswv(), ["not_before", "not_on_or_after",
- "audience_restriction"])
- assert conditions.not_before == "2009-10-30T07:58:10.852Z"
- assert conditions.not_on_or_after == "2009-10-30T08:03:10.852Z"
- assert conditions.audience_restriction[0].audience.text == "urn:foo:bar"
-
- def test_value_1(self):
- #FriendlyName="givenName" Name="urn:oid:2.5.4.42"
- # NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- adict = self.server.attribute(name="urn:oid:2.5.4.42",
- name_format=saml.NAME_FORMAT_URI)
- attribute = make_instance(saml.Attribute, adict)
- assert _eq(attribute.keyswv(),["name","name_format"])
- assert attribute.name == "urn:oid:2.5.4.42"
- assert attribute.name_format == saml.NAME_FORMAT_URI
-
- def test_value_2(self):
- adict = self.server.attribute(name="urn:oid:2.5.4.42",
- name_format=saml.NAME_FORMAT_URI,
- friendly_name="givenName")
- attribute = make_instance(saml.Attribute, adict)
- assert _eq(attribute.keyswv(),["name","name_format","friendly_name"])
- assert attribute.name == "urn:oid:2.5.4.42"
- assert attribute.name_format == saml.NAME_FORMAT_URI
- assert attribute.friendly_name == "givenName"
- def test_value_3(self):
- adict = self.server.attribute(attribute_value="Derek",
- name="urn:oid:2.5.4.42",
- name_format=saml.NAME_FORMAT_URI,
- friendly_name="givenName")
- attribute = make_instance(saml.Attribute, adict)
- assert _eq(attribute.keyswv(),["name", "name_format",
- "friendly_name", "attribute_value"])
- assert attribute.name == "urn:oid:2.5.4.42"
- assert attribute.name_format == saml.NAME_FORMAT_URI
- assert attribute.friendly_name == "givenName"
- assert len(attribute.attribute_value) == 1
- assert attribute.attribute_value[0].text == "Derek"
-
- def test_value_4(self):
- adict = self.server.attribute(attribute_value="Derek",
- friendly_name="givenName")
- attribute = make_instance(saml.Attribute, adict)
- assert _eq(attribute.keyswv(),["friendly_name", "attribute_value"])
- assert attribute.friendly_name == "givenName"
- assert len(attribute.attribute_value) == 1
- assert attribute.attribute_value[0].text == "Derek"
-
- def test_do_attribute_statement(self):
- astat = self.server.do_attribute_statement({"surName":"Jeter",
- "givenName":["Derek","Sanderson"]})
- statement = make_instance(saml.AttributeStatement,astat)
- assert statement.keyswv() == ["attribute"]
- assert len(statement.attribute) == 2
- attr0 = statement.attribute[0]
- assert _eq(attr0.keyswv(), ["name","attribute_value"])
- attr1 = statement.attribute[1]
- assert _eq(attr1.keyswv(), ["name","attribute_value"])
- if attr0.name == "givenName":
- assert len(attr0.attribute_value) == 2
- assert _eq([av.text for av in attr0.attribute_value],
- ["Derek","Sanderson"])
- assert attr1.name == "surName"
- assert attr1.attribute_value[0].text == "Jeter"
- assert len(attr1.attribute_value) == 1
- else:
- assert attr0.name == "surName"
- assert attr0.attribute_value[0].text == "Jeter"
- assert len(attr0.attribute_value) == 1
- assert attr1.name == "givenName"
- assert len(attr1.attribute_value) == 2
- assert _eq([av.text for av in attr1.attribute_value],
- ["Derek","Sanderson"])
-
- def test_do_attribute_statement_multi(self):
- astat = self.server.do_attribute_statement(
- {("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
- "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
- "eduPersonEntitlement"):"Jeter"})
- statement = make_instance(saml.AttributeStatement,astat)
- assert statement.keyswv() == ["attribute"]
- assert len(statement.attribute)
- assert _eq(statement.attribute[0].keyswv(),
- ["name","name_format","friendly_name","attribute_value"])
- attribute = statement.attribute[0]
- assert attribute.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
- assert attribute.name_format == (
- "urn:oasis:names:tc:SAML:2.0:attrname-format:uri")
- assert attribute.friendly_name == "eduPersonEntitlement"
-
- def test_subject(self):
- adict = self.server.subject("_aaa",
- name_id=saml.NAMEID_FORMAT_TRANSIENT)
- subject = make_instance(saml.Subject, adict)
- assert _eq(subject.keyswv(),["text", "name_id"])
- assert subject.text == "_aaa"
- assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
-
def test_assertion(self):
- tmp = self.server.assertion(
- subject= self.server.subject("_aaa",
+ tmp = server.kd_assertion(
+ subject= server.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
- attribute_statement = self.server.attribute_statement(
+ attribute_statement = server.kd_attribute_statement(
attribute=[
- self.server.attribute(attribute_value="Derek",
+ server.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
- self.server.attribute(attribute_value="Jeter",
+ server.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -217,7 +231,7 @@ class TestServer():
assert _eq(assertion.keyswv(),['attribute_statement', 'issuer', 'id',
'subject', 'issue_instant', 'version'])
assert assertion.version == "2.0"
- assert assertion.issuer.text == "urn:mace:umu.se:saml:roland:sp"
+ assert assertion.issuer.text == "urn:mace:example.com:saml:roland:sp"
#
assert len(assertion.attribute_statement) == 1
attribute_statement = assertion.attribute_statement[0]
@@ -240,17 +254,17 @@ class TestServer():
assert subject.name_id.text == saml.NAMEID_FORMAT_TRANSIENT
def test_response(self):
- tmp = self.server.response(
+ tmp = server.kd_response(
in_response_to="_012345",
destination="https:#www.example.com",
- status=self.server.success_status(),
- assertion=self.server.assertion(
- subject = self.server.subject("_aaa",
+ status=server.kd_success_status(),
+ assertion=server.kd_assertion(
+ subject = server.kd_subject("_aaa",
name_id=saml.NAMEID_FORMAT_TRANSIENT),
- attribute_statement = self.server.attribute_statement([
- self.server.attribute(attribute_value="Derek",
+ attribute_statement = server.kd_attribute_statement([
+ server.kd_attribute(attribute_value="Derek",
friendly_name="givenName"),
- self.server.attribute(attribute_value="Jeter",
+ server.kd_attribute(attribute_value="Jeter",
friendly_name="surName"),
]),
issuer=self.server.issuer(),
@@ -264,7 +278,7 @@ class TestServer():
'in_response_to', 'issue_instant',
'version', 'issuer', 'id'])
assert response.version == "2.0"
- assert response.issuer.text == "urn:mace:umu.se:saml:roland:sp"
+ assert response.issuer.text == "urn:mace:example.com:saml:roland:sp"
assert response.destination == "https:#www.example.com"
assert response.in_response_to == "_012345"
#
@@ -273,11 +287,12 @@ class TestServer():
assert status.status_code.value == samlp.STATUS_SUCCESS
def test_parse_faulty_request(self):
- authn_request = client.d_authn_request(
+ sc = client.Saml2Client({},None)
+ authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
- spentityid = "urn:mace:umu.se:saml:roland:sp",
+ spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
@@ -286,11 +301,12 @@ class TestServer():
raises(OtherError,self.server.parse_authn_request,intermed)
def test_parse_faulty_request_to_err_status(self):
- authn_request = client.d_authn_request(
+ sc = client.Saml2Client({},None)
+ authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://www.example.org",
- spentityid = "urn:mace:umu.se:saml:roland:sp",
+ spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
@@ -301,7 +317,7 @@ class TestServer():
except OtherError, oe:
print oe.args
status = utils.make_instance(samlp.Status,
- self.server.status_from_exception(oe))
+ server.kd_status_from_exception(oe))
assert status
print status
@@ -314,28 +330,31 @@ class TestServer():
assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
- authn_request = client.d_authn_request(
+ sc = client.Saml2Client({},None)
+ authn_request = sc.authn_request(
query_id = "1",
destination = "http://www.example.com",
service_url = "http://localhost:8087/",
- spentityid = "urn:mace:umu.se:saml:roland:sp",
+ spentityid = "urn:mace:example.com:saml:roland:sp",
my_name = "My real name",
)
+ print authn_request
intermed = utils.deflate_and_base64_encode("%s" % authn_request)
- (consumer_url, id, name_id_policies,
+ (consumer_url, id, name_id_policy,
sp) = self.server.parse_authn_request(intermed)
assert consumer_url == "http://localhost:8087/"
assert id == "1"
- assert name_id_policies == saml.NAMEID_FORMAT_TRANSIENT
- assert sp == "urn:mace:umu.se:saml:roland:sp"
+ assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
+ assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
+ assert sp == "urn:mace:example.com:saml:roland:sp"
def test_sso_response(self):
resp = self.server.do_sso_response(
"http://localhost:8087/", # consumer_url
"12", # in_response_to
- "urn:mace:umu.se:saml:roland:sp", # sp_entity_id
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
{("urn:oid:1.3.6.1.4.1.5923.1.1.1.7",
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
"eduPersonEntitlement"):"Jeter"}
@@ -344,7 +363,7 @@ class TestServer():
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
'in_response_to', 'issue_instant',
- 'version', 'id'])
+ 'version', 'id', 'issuer'])
assert resp.destination == "http://localhost:8087/"
assert resp.in_response_to == "12"
assert resp.status
@@ -363,3 +382,12 @@ class TestServer():
print confirmation.subject_confirmation_data
assert confirmation.subject_confirmation_data.in_response_to == "12"
+ def test_persistence_0(self):
+ pid1 = self.server.persistent_id(
+ "urn:mace:example.com:saml:roland:sp", "jeter")
+
+ pid2 = self.server.persistent_id(
+ "urn:mace:example.com:saml:roland:sp", "jeter")
+
+ print pid1, pid2
+ assert pid1 == pid2
diff --git a/tools/make_metadata.py b/tools/make_metadata.py
index 1d79b19e..a20169d2 100755
--- a/tools/make_metadata.py
+++ b/tools/make_metadata.py
@@ -1,15 +1,68 @@
#!/usr/bin/env python
import os
from saml2 import utils, md, samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
+from saml2 import BINDING_SOAP
from saml2.time_util import in_a_while
+def do_sp_sso_descriptor(sp, cert):
+ return {
+ "protocol_support_enumeration": samlp.NAMESPACE,
+ "want_assertions_signed": True,
+ "authn_requests_signed": False,
+ "assertion_consumer_service": {
+ "binding": BINDING_HTTP_POST ,
+ "location": sp["url"],
+ "index": 0,
+ },
+ "key_descriptor":{
+ "key_info": {
+ "x509_data": {
+ "x509_certificate": cert
+ }
+ }
+ },
+ }
+
+def do_idp_sso_descriptor(idp, cert):
+ return {
+ "protocol_support_enumeration": samlp.NAMESPACE,
+ "want_authn_requests_signed": True,
+ "single_sign_on_service": {
+ "binding": BINDING_HTTP_REDIRECT ,
+ "location": idp["url"],
+ },
+ "key_descriptor":{
+ "key_info": {
+ "x509_data": {
+ "x509_certificate": cert
+ }
+ }
+ },
+ }
+
+def do_aa_descriptor(aa, cert):
+ return {
+ "protocol_support_enumeration": samlp.NAMESPACE,
+ "attribute_service": {
+ "binding": BINDING_SOAP ,
+ "location": aa["url"],
+ },
+ "key_descriptor":{
+ "key_info": {
+ "x509_data": {
+ "x509_certificate": cert
+ }
+ }
+ },
+ }
+
def entity_descriptor(confd):
mycert = "".join(open(confd["cert_file"]).readlines()[1:-1])
ed = {
"name": "http://%s/saml/test" % os.uname()[1],
- "valid_until": in_a_while(days=30),
+ "valid_until": in_a_while(hours=96),
"entity_id": confd["entityid"],
}
@@ -33,46 +86,21 @@ def entity_descriptor(confd):
if "sp" in confd["service"]:
# The SP
- ed["sp_sso_descriptor"] = {
- "protocol_support_enumeration": samlp.NAMESPACE,
- "want_assertions_signed": True,
- "authn_requests_signed": False,
- "assertion_consumer_service": {
- "binding": BINDING_HTTP_POST ,
- "location": confd["service_url"],
- "index": 0,
- },
- "key_descriptor":{
- "key_info": {
- "x509_data": {
- "x509_certificate": mycert
- }
- }
- },
- }
- elif "idp" in confd["service"]:
- ed["idp_sso_descriptor"] = {
- "protocol_support_enumeration": samlp.NAMESPACE,
- "want_authn_requests_signed": True,
- "single_sign_on_service": {
- "binding": BINDING_HTTP_REDIRECT ,
- "location": confd["service_url"],
- },
- "key_descriptor":{
- "key_info": {
- "x509_data": {
- "x509_certificate": mycert
- }
- }
- },
- }
+ ed["sp_sso_descriptor"] = do_sp_sso_descriptor(confd["service"]["sp"],
+ mycert)
+ if "idp" in confd["service"]:
+ ed["idp_sso_descriptor"] = do_idp_sso_descriptor(
+ confd["service"]["idp"],mycert)
+ if "aa" in confd["service"]:
+ ed["attribute_authority_descriptor"] = do_aa_descriptor(
+ confd["service"]["aa"],mycert)
return ed
def entities_descriptor(eds):
return utils.make_instance(md.EntitiesDescriptor,{
"name": "urn:mace:umu.se:saml:test",
- "valid_until": in_a_while(30),
+ "valid_until": in_a_while(hours=96),
"entity_descriptor": eds})
if __name__ == "__main__":