summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xexample/idp2/idp.py7
-rw-r--r--example/idp2/idp_conf.py4
-rwxr-xr-xexample/sp/sp.py37
-rw-r--r--example/sp/sp.xml91
-rw-r--r--example/sp/sp_conf.py10
-rw-r--r--src/saml2/attribute_resolver.py17
-rw-r--r--src/saml2/cache.py77
-rw-r--r--src/saml2/client.py89
-rw-r--r--src/saml2/client_base.py112
-rw-r--r--src/saml2/httpbase.py4
-rw-r--r--src/saml2/ident.py46
-rw-r--r--src/saml2/pack.py8
-rw-r--r--src/saml2/population.py54
-rw-r--r--src/saml2/response.py115
-rw-r--r--src/saml2/virtual_org.py40
-rw-r--r--tests/fakeIDP.py2
-rw-r--r--tests/test_32_cache.py59
-rw-r--r--tests/test_34_population.py82
-rw-r--r--tests/test_50_server.py56
-rw-r--r--tests/test_51_client.py247
-rw-r--r--tests/test_62_vo.py46
21 files changed, 514 insertions, 689 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index 3e5662de..e78cee96 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -31,7 +31,6 @@ from saml2.saml import AUTHN_PASSWORD
logger = logging.getLogger("saml2.idp")
-
def _expiration(timeout, tformat="%a, %d-%b-%Y %H:%M:%S GMT"):
"""
@@ -143,7 +142,9 @@ class Service(object):
"""
Single log out using HTTP_SOAP binding
"""
+ logger.debug("- SOAP -")
_dict = self.unpack_soap()
+ logger.debug("_dict: %s" % _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -424,7 +425,9 @@ class SLO(Service):
def do(self, request, binding, relay_state=""):
logger.info("--- Single Log Out Service ---")
try:
- req_info = IDP.parse_logout_request(request, binding)
+ _, body = request.split("\n")
+ logger.debug("req: '%s'" % body)
+ req_info = IDP.parse_logout_request(body, binding)
except Exception, exc:
logger.error("Bad request: %s" % exc)
resp = BadRequest("%s" % exc)
diff --git a/example/idp2/idp_conf.py b/example/idp2/idp_conf.py
index bced5766..4a291164 100644
--- a/example/idp2/idp_conf.py
+++ b/example/idp2/idp_conf.py
@@ -79,7 +79,7 @@ CONFIG={
"name_form": NAME_FORMAT_URI
},
},
- "subject_data": "./idp.subject.db",
+ "subject_data": "./idp.subject",
"name_id_format": [NAMEID_FORMAT_TRANSIENT,
NAMEID_FORMAT_PERSISTENT]
},
@@ -88,7 +88,7 @@ CONFIG={
"key_file" : "pki/mykey.pem",
"cert_file" : "pki/mycert.pem",
"metadata" : {
- "local": ["../sp.xml"],
+ "local": ["../sp/sp.xml"],
},
"organization": {
"display_name": "Rolands Identiteter",
diff --git a/example/sp/sp.py b/example/sp/sp.py
index e2683949..d57a1d63 100755
--- a/example/sp/sp.py
+++ b/example/sp/sp.py
@@ -14,6 +14,8 @@ from saml2.httputil import Redirect
logger = logging.getLogger("saml2.SP")
# -----------------------------------------------------------------------------
+
+
def dict_to_table(ava, lev=0, width=1):
txt = ['<table border=%s bordercolor="black">\n' % width]
for prop, valarr in ava.items():
@@ -29,12 +31,12 @@ def dict_to_table(ava, lev=0, width=1):
n = len(valarr)
for val in valarr:
if not i:
- txt.append("<th rowspan=%d>%s</td>\n" % (len(valarr),prop))
+ txt.append("<th rowspan=%d>%s</td>\n" % (len(valarr), prop))
else:
txt.append("<tr>\n")
if isinstance(val, dict):
txt.append("<td>\n")
- txt.extend(dict_to_table(val, lev+1, width-1))
+ txt.extend(dict_to_table(val, lev + 1, width - 1))
txt.append("</td>\n")
else:
try:
@@ -48,12 +50,13 @@ def dict_to_table(ava, lev=0, width=1):
elif isinstance(valarr, dict):
txt.append("<th>%s</th>\n" % prop)
txt.append("<td>\n")
- txt.extend(dict_to_table(valarr, lev+1, width-1))
+ txt.extend(dict_to_table(valarr, lev + 1, width - 1))
txt.append("</td>\n")
txt.append("</tr>\n")
txt.append('</table>\n')
return txt
+
def _expiration(timeout, tformat=None):
if timeout == "now":
return time_util.instant(tformat)
@@ -61,6 +64,7 @@ def _expiration(timeout, tformat=None):
# validity time should match lifetime of assertions
return time_util.in_a_while(minutes=timeout, format=tformat)
+
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
if kaka:
@@ -68,13 +72,14 @@ def delete_cookie(environ, name):
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = morsel
- cookie[name]["expires"] =\
- _expiration("now", "%a, %d-%b-%Y %H:%M:%S CET")
+ cookie[name]["expires"] = _expiration("now",
+ "%a, %d-%b-%Y %H:%M:%S CET")
return tuple(cookie.output().split(": ", 1))
return None
# ----------------------------------------------------------------------------
+
#noinspection PyUnusedLocal
def whoami(environ, start_response, user):
identity = environ["repoze.who.identity"]["user"]
@@ -86,17 +91,20 @@ def whoami(environ, start_response, user):
resp = Response(response)
return resp(environ, start_response)
+
#noinspection PyUnusedLocal
def not_found(environ, start_response):
"""Called if no URL matches."""
resp = NotFound('Not Found')
return resp(environ, start_response)
+
#noinspection PyUnusedLocal
def not_authn(environ, start_response):
resp = Unauthorized('Unknown user')
return resp(environ, start_response)
+
#noinspection PyUnusedLocal
def slo(environ, start_response, user):
# so here I might get either a LogoutResponse or a LogoutRequest
@@ -107,8 +115,8 @@ def slo(environ, start_response, user):
query = parse_qs(environ["QUERY_STRING"])
logger.info("query: %s" % query)
try:
- response = sc.parse_logout_request_response(query["SAMLResponse"][0],
- binding=BINDING_HTTP_REDIRECT)
+ response = sc.parse_logout_request_response(
+ query["SAMLResponse"][0], binding=BINDING_HTTP_REDIRECT)
if response:
logger.info("LOGOUT response parsed OK")
except KeyError:
@@ -125,6 +133,7 @@ def slo(environ, start_response, user):
resp = Redirect("Successful Logout", headers=headers)
return resp(environ, start_response)
+
#noinspection PyUnusedLocal
def logout(environ, start_response, user):
# This is where it starts when a user wants to log out
@@ -150,6 +159,7 @@ def logout(environ, start_response, user):
# start_response("500 Internal Server Error")
# return []
+
#noinspection PyUnusedLocal
def done(environ, start_response, user):
# remove cookie and stored info
@@ -157,7 +167,7 @@ def done(environ, start_response, user):
subject_id = environ["repoze.who.identity"]['repoze.who.userid']
client = environ['repoze.who.plugins']["saml2auth"]
logger.info("[logout done] remaining subjects: %s" % (
- client.saml_client.users.subjects(),))
+ client.saml_client.users.subjects(),))
start_response('200 OK', [('Content-Type', 'text/html')])
return ["<h3>You are now logged out from this service</h3>"]
@@ -175,6 +185,7 @@ urls = [
# ----------------------------------------------------------------------------
+
def application(environ, start_response):
"""
The main WSGI application. Dispatch the current request to
@@ -207,21 +218,23 @@ def application(environ, start_response):
environ['myapp.url_args'] = path
return callback(environ, start_response, user)
else:
- return not_authn(environ, start_response)
+ return not_authn(environ, start_response)
+
return not_found(environ, start_response)
# ----------------------------------------------------------------------------
from repoze.who.config import make_middleware_with_config
-app_with_auth = make_middleware_with_config(application, {"here":"."},
- './who.ini', log_file="repoze_who.log")
+app_with_auth = make_middleware_with_config(application, {"here": "."},
+ './who.ini',
+ log_file="repoze_who.log")
# ----------------------------------------------------------------------------
PORT = 8087
if __name__ == '__main__':
from wsgiref.simple_server import make_server
- srv = make_server('localhost', PORT, app_with_auth)
+ srv = make_server('', PORT, app_with_auth)
print "SP listening on port: %s" % PORT
srv.serve_forever() \ No newline at end of file
diff --git a/example/sp/sp.xml b/example/sp/sp.xml
index c4a0a4b8..1d13fa78 100644
--- a/example/sp/sp.xml
+++ b/example/sp/sp.xml
@@ -1,75 +1,18 @@
<?xml version='1.0' encoding='UTF-8'?>
-<ns0:EntitiesDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata"
- xmlns:ns1="http://www.w3.org/2000/09/xmldsig#">
- <ns0:EntityDescriptor entityID="urn:mace:umu.se:saml:roland:sp">
- <ns0:SPSSODescriptor AuthnRequestsSigned="false"
- WantAssertionsSigned="true"
- protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol">
- <ns0:KeyDescriptor>
- <ns1:KeyInfo>
- <ns1:X509Data>
- <ns1:X509Certificate>
- MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
- BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
- EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
- MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
- YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
- DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
- bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
- FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
- mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
- BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
- o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
- BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
- AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
- BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
- zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
- +vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
- </ns1:X509Certificate>
- </ns1:X509Data>
- </ns1:KeyInfo>
- </ns0:KeyDescriptor>
- <ns0:SingleLogoutService
- Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
- Location="http://localhost:8087/slo"/>
- <ns0:AssertionConsumerService
- Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
- Location="http://localhost:8087/" index="1"/>
- <ns0:AttributeConsumingService index="1">
- <ns0:ServiceName xml:lang="en">Rolands SP</ns0:ServiceName>
- <ns0:ServiceDescription xml:lang="en">My SP
- </ns0:ServiceDescription>
- <ns0:RequestedAttribute FriendlyName="surname"
- Name="urn:oid:2.5.4.4"
- NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- isRequired="true"/>
- <ns0:RequestedAttribute FriendlyName="givenname"
- Name="urn:oid:2.5.4.42"
- NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- isRequired="true"/>
- <ns0:RequestedAttribute Name="edupersonaffiliation"
- NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- isRequired="true"/>
- <ns0:RequestedAttribute FriendlyName="title"
- Name="urn:oid:2.5.4.12"
- NameFormat="urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
- isRequired="false"/>
- </ns0:AttributeConsumingService>
- </ns0:SPSSODescriptor>
- <ns0:Organization>
- <ns0:OrganizationName xml:lang="en">Exempel AB
- </ns0:OrganizationName>
- <ns0:OrganizationDisplayName xml:lang="se">Exempel AB
- </ns0:OrganizationDisplayName>
- <ns0:OrganizationDisplayName xml:lang="en">Example Co.
- </ns0:OrganizationDisplayName>
- <ns0:OrganizationURL xml:lang="en">http://www.example.com/roland
- </ns0:OrganizationURL>
- </ns0:Organization>
- <ns0:ContactPerson contactType="technical">
- <ns0:GivenName>John</ns0:GivenName>
- <ns0:SurName>Smith</ns0:SurName>
- <ns0:EmailAddress>john.smith@example.com</ns0:EmailAddress>
- </ns0:ContactPerson>
- </ns0:EntityDescriptor>
-</ns0:EntitiesDescriptor>
+<ns0:EntityDescriptor xmlns:ns0="urn:oasis:names:tc:SAML:2.0:metadata" xmlns:ns1="http://www.w3.org/2000/09/xmldsig#" entityID="http://localhost:8087/sp.xml"><ns0:SPSSODescriptor AuthnRequestsSigned="false" WantAssertionsSigned="true" protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"><ns0:KeyDescriptor use="signing"><ns1:KeyInfo><ns1:X509Data><ns1:X509Certificate>MIIC8jCCAlugAwIBAgIJAJHg2V5J31I8MA0GCSqGSIb3DQEBBQUAMFoxCzAJBgNV
+BAYTAlNFMQ0wCwYDVQQHEwRVbWVhMRgwFgYDVQQKEw9VbWVhIFVuaXZlcnNpdHkx
+EDAOBgNVBAsTB0lUIFVuaXQxEDAOBgNVBAMTB1Rlc3QgU1AwHhcNMDkxMDI2MTMz
+MTE1WhcNMTAxMDI2MTMzMTE1WjBaMQswCQYDVQQGEwJTRTENMAsGA1UEBxMEVW1l
+YTEYMBYGA1UEChMPVW1lYSBVbml2ZXJzaXR5MRAwDgYDVQQLEwdJVCBVbml0MRAw
+DgYDVQQDEwdUZXN0IFNQMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDkJWP7
+bwOxtH+E15VTaulNzVQ/0cSbM5G7abqeqSNSs0l0veHr6/ROgW96ZeQ57fzVy2MC
+FiQRw2fzBs0n7leEmDJyVVtBTavYlhAVXDNa3stgvh43qCfLx+clUlOvtnsoMiiR
+mo7qf0BoPKTj7c0uLKpDpEbAHQT4OF1HRYVxMwIDAQABo4G/MIG8MB0GA1UdDgQW
+BBQ7RgbMJFDGRBu9o3tDQDuSoBy7JjCBjAYDVR0jBIGEMIGBgBQ7RgbMJFDGRBu9
+o3tDQDuSoBy7JqFepFwwWjELMAkGA1UEBhMCU0UxDTALBgNVBAcTBFVtZWExGDAW
+BgNVBAoTD1VtZWEgVW5pdmVyc2l0eTEQMA4GA1UECxMHSVQgVW5pdDEQMA4GA1UE
+AxMHVGVzdCBTUIIJAJHg2V5J31I8MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEF
+BQADgYEAMuRwwXRnsiyWzmRikpwinnhTmbooKm5TINPE7A7gSQ710RxioQePPhZO
+zkM27NnHTrCe2rBVg0EGz7QTd1JIwLPvgoj4VTi/fSha/tXrYUaqc9AqU1kWI4WN
++vffBGQ09mo+6CffuFTZYeOhzP/2stAPwCTU4kxEoiy0KpZMANI=
+</ns1:X509Certificate></ns1:X509Data></ns1:KeyInfo></ns0:KeyDescriptor><ns0:SingleLogoutService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" Location="http://localhost:8087/slo" /><ns0:AssertionConsumerService Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" Location="http://localhost:8087" index="1" /></ns0:SPSSODescriptor><ns0:Organization><ns0:OrganizationName xml:lang="en">Exempel AB</ns0:OrganizationName><ns0:OrganizationDisplayName xml:lang="se">Exempel AB</ns0:OrganizationDisplayName><ns0:OrganizationDisplayName xml:lang="en">Example Co.</ns0:OrganizationDisplayName><ns0:OrganizationURL xml:lang="en">http://www.example.com/roland</ns0:OrganizationURL></ns0:Organization><ns0:ContactPerson contactType="technical"><ns0:GivenName>John</ns0:GivenName><ns0:SurName>Smith</ns0:SurName><ns0:EmailAddress>john.smith@example.com</ns0:EmailAddress></ns0:ContactPerson></ns0:EntityDescriptor>
diff --git a/example/sp/sp_conf.py b/example/sp/sp_conf.py
index faebdf87..84115996 100644
--- a/example/sp/sp_conf.py
+++ b/example/sp/sp_conf.py
@@ -1,23 +1,23 @@
from saml2 import BINDING_HTTP_REDIRECT
from saml2.saml import NAME_FORMAT_URI
-BASE= "http://localhost:8087/"
+BASE= "http://localhost:8087"
+#BASE= "http://lingon.catalogix.se:8087"
CONFIG = {
- "entityid" : "urn:mace:umu.se:saml:roland:sp",
+ "entityid" : "%s/sp.xml" % BASE,
"description": "My SP",
"service": {
"sp":{
"name" : "Rolands SP",
"endpoints":{
"assertion_consumer_service": [BASE],
- "single_logout_service" : [(BASE+"slo",
+ "single_logout_service" : [(BASE+"/slo",
BINDING_HTTP_REDIRECT)],
},
"required_attributes": ["surname", "givenname",
"edupersonaffiliation"],
"optional_attributes": ["title"],
- "idp": [ "urn:mace:umu.se:saml:roland:idp"],
}
},
"debug" : 1,
@@ -25,7 +25,7 @@ CONFIG = {
"cert_file" : "pki/mycert.pem",
"attribute_map_dir" : "./attributemaps",
"metadata" : {
- "local": ["../idp/idp.xml"],
+ "local": ["../idp2/idp.xml"],
},
# -- below used by make_metadata --
"organization": {
diff --git a/src/saml2/attribute_resolver.py b/src/saml2/attribute_resolver.py
index 06dbf125..dab809ce 100644
--- a/src/saml2/attribute_resolver.py
+++ b/src/saml2/attribute_resolver.py
@@ -35,16 +35,13 @@ class AttributeResolver(object):
self.saml2client = saml2client
self.metadata = saml2client.config.metadata
- def extend(self, subject_id, issuer, vo_members, name_id_format=None,
- sp_name_qualifier=None, real_id=None):
+ def extend(self, name_id, issuer, vo_members):
"""
- :param subject_id: The identifier by which the subject is know
+ :param name_id: The identifier by which the subject is know
among all the participents of the VO
:param issuer: Who am I the poses the query
:param vo_members: The entity IDs of the IdP who I'm going to ask
for extra attributes
- :param name_id_format: Used to make the IdPs aware of what's going
- on here
:return: A dictionary with all the collected information about the
subject
"""
@@ -53,17 +50,13 @@ class AttributeResolver(object):
for ass in self.metadata.attribute_consuming_service(member):
for attr_serv in ass.attribute_service:
logger.info(
- "Send attribute request to %s" % attr_serv.location)
+ "Send attribute request to %s" % attr_serv.location)
if attr_serv.binding != BINDING_SOAP:
continue
# attribute query assumes SOAP binding
session_info = self.saml2client.attribute_query(
- subject_id,
- attr_serv.location,
- issuer_id=issuer,
- sp_name_qualifier=sp_name_qualifier,
- nameid_format=name_id_format,
- real_id=real_id)
+ name_id, attr_serv.location, issuer_id=issuer,
+)
if session_info:
result.append(session_info)
return result
diff --git a/src/saml2/cache.py b/src/saml2/cache.py
index fc7b57c4..a128da3f 100644
--- a/src/saml2/cache.py
+++ b/src/saml2/cache.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import shelve
+from saml2.ident import code, decode
from saml2 import time_util
import logging
@@ -10,12 +11,15 @@ logger = logging.getLogger(__name__)
# gathered from several different sources, all with their own
# timeout time.
+
class ToOld(Exception):
pass
+
class CacheError(Exception):
pass
+
class Cache(object):
def __init__(self, filename=None):
if filename:
@@ -25,18 +29,25 @@ class Cache(object):
self._db = {}
self._sync = False
- def delete(self, subject_id):
- del self._db[subject_id]
+ def delete(self, name_id):
+ """
+
+ :param name_id: The subject identifier, a NameID instance
+ """
+ del self._db[code(name_id)]
if self._sync:
- self._db.sync()
+ try:
+ self._db.sync()
+ except AttributeError:
+ pass
- def get_identity(self, subject_id, entities=None,
+ def get_identity(self, name_id, entities=None,
check_not_on_or_after=True):
""" Get all the identity information that has been received and
are still valid about the subject.
- :param subject_id: The identifier of the subject
+ :param name_id: The subject identifier, a NameID instance
:param entities: The identifiers of the entities whoes assertions are
interesting. If the list is empty all entities are interesting.
:return: A 2-tuple consisting of the identity information (a
@@ -45,7 +56,8 @@ class Cache(object):
"""
if not entities:
try:
- entities = self._db[subject_id].keys()
+ cni = code(name_id)
+ entities = self._db[cni].keys()
except KeyError:
return {}, []
@@ -53,7 +65,7 @@ class Cache(object):
oldees = []
for entity_id in entities:
try:
- info = self.get(subject_id, entity_id, check_not_on_or_after)
+ info = self.get(name_id, entity_id, check_not_on_or_after)
except ToOld:
oldees.append(entity_id)
continue
@@ -70,74 +82,81 @@ class Cache(object):
res[key] = vals
return res, oldees
- def get(self, subject_id, entity_id, check_not_on_or_after=True):
+ def get(self, name_id, entity_id, check_not_on_or_after=True):
""" Get session information about a subject gotten from a
specified IdP/AA.
- :param subject_id: The identifier of the subject
+ :param name_id: The subject identifier, a NameID instance
:param entity_id: The identifier of the entity_id
:param check_not_on_or_after: if True it will check if this
subject is still valid or if it is too old. Otherwise it
will not check this. True by default.
:return: The session information
"""
- (timestamp, info) = self._db[subject_id][entity_id]
+ cni = code(name_id)
+ (timestamp, info) = self._db[cni][entity_id]
if check_not_on_or_after and time_util.after(timestamp):
raise ToOld("past %s" % timestamp)
return info or None
- def set(self, subject_id, entity_id, info, not_on_or_after=0):
- """ Stores session information in the cache. Assumes that the subject_id
+ def set(self, name_id, entity_id, info, not_on_or_after=0):
+ """ Stores session information in the cache. Assumes that the name_id
is unique within the context of the Service Provider.
- :param subject_id: The subject identifier
+ :param name_id: The subject identifier, a NameID instance
:param entity_id: The identifier of the entity_id/receiver of an
assertion
:param info: The session info, the assertion is part of this
:param not_on_or_after: A time after which the assertion is not valid.
"""
- if subject_id not in self._db:
- self._db[subject_id] = {}
+ cni = code(name_id)
+ if cni not in self._db:
+ self._db[cni] = {}
- self._db[subject_id][entity_id] = (not_on_or_after, info)
+ self._db[cni][entity_id] = (not_on_or_after, info)
if self._sync:
- self._db.sync()
+ try:
+ self._db.sync()
+ except AttributeError:
+ pass
- def reset(self, subject_id, entity_id):
+ def reset(self, name_id, entity_id):
""" Scrap the assertions received from a IdP or an AA about a special
subject.
- :param subject_id: The subjects identifier
+ :param name_id: The subject identifier, a NameID instance
:param entity_id: The identifier of the entity_id of the assertion
:return:
"""
- self.set(subject_id, entity_id, {}, 0)
+ self.set(name_id, entity_id, {}, 0)
- def entities(self, subject_id):
+ def entities(self, name_id):
""" Returns all the entities of assertions for a subject, disregarding
whether the assertion still is valid or not.
- :param subject_id: The identifier of the subject
+ :param name_id: The subject identifier, a NameID instance
:return: A possibly empty list of entity identifiers
"""
- return self._db[subject_id].keys()
+ cni = code(name_id)
+ return self._db[cni].keys()
- def receivers(self, subject_id):
+ def receivers(self, name_id):
""" Another name for entities() just to make it more logic in the IdP
scenario """
- return self.entities(subject_id)
+ return self.entities(name_id)
- def active(self, subject_id, entity_id):
+ def active(self, name_id, entity_id):
""" Returns the status of assertions from a specific entity_id.
- :param subject_id: The ID of the subject
+ :param name_id: The ID of the subject
:param entity_id: The entity ID of the entity_id of the assertion
:return: True or False depending on if the assertion is still
valid or not.
"""
try:
- (timestamp, info) = self._db[subject_id][entity_id]
+ cni = code(name_id)
+ (timestamp, info) = self._db[cni][entity_id]
except KeyError:
return False
@@ -151,4 +170,4 @@ class Cache(object):
:return: list of subject identifiers
"""
- return self._db.keys()
+ return [decode(c) for c in self._db.keys()]
diff --git a/src/saml2/client.py b/src/saml2/client.py
index 0e23fef2..f7a79395 100644
--- a/src/saml2/client.py
+++ b/src/saml2/client.py
@@ -20,7 +20,6 @@ to conclude its tasks.
"""
from saml2.httpbase import HTTPError
from saml2.s_utils import sid
-from saml2.samlp import logout_response_from_string
import saml2
try:
@@ -46,6 +45,7 @@ from saml2 import BINDING_SOAP
import logging
logger = logging.getLogger(__name__)
+
class Saml2Client(Base):
""" The basic pySAML2 service provider class """
@@ -81,12 +81,12 @@ class Saml2Client(Base):
return req.id, info
- def global_logout(self, subject_id, reason="", expire=None, sign=None):
+ def global_logout(self, name_id, reason="", expire=None, sign=None):
""" More or less a layer of indirection :-/
Bootstrapping the whole thing by finding all the IdPs that should
be notified.
- :param subject_id: The identifier of the subject that wants to be
+ :param name_id: The identifier of the subject that wants to be
logged out.
:param reason: Why the subject wants to log out
:param expire: The latest the log out should happen.
@@ -99,17 +99,17 @@ class Saml2Client(Base):
conversation.
"""
- logger.info("logout request for: %s" % subject_id)
+ logger.info("logout request for: %s" % name_id)
# find out which IdPs/AAs I should notify
- entity_ids = self.users.issuers_of_info(subject_id)
+ entity_ids = self.users.issuers_of_info(name_id)
- return self.do_logout(subject_id, entity_ids, reason, expire, sign)
+ return self.do_logout(name_id, entity_ids, reason, expire, sign)
- def do_logout(self, subject_id, entity_ids, reason, expire, sign=None):
+ def do_logout(self, name_id, entity_ids, reason, expire, sign=None):
"""
- :param subject_id: Identifier of the Subject
+ :param name_id: Identifier of the Subject a NameID instance
:param entity_ids: List of entity ids for the IdPs that have provided
information concerning the subject
:param reason: The reason for doing the logout
@@ -118,34 +118,33 @@ class Saml2Client(Base):
:return:
"""
# check time
- if not not_on_or_after(expire): # I've run out of time
+ if not not_on_or_after(expire): # I've run out of time
# Do the local logout anyway
- self.local_logout(subject_id)
+ self.local_logout(name_id)
return 0, "504 Gateway Timeout", [], []
- # for all where I can use the SOAP binding, do those first
not_done = entity_ids[:]
responses = {}
for entity_id in entity_ids:
- response = False
-
- for binding in [BINDING_SOAP,
- BINDING_HTTP_POST,
+ logger.debug("Logout from '%s'" % entity_id)
+ # for all where I can use the SOAP binding, do those first
+ for binding in [BINDING_SOAP, BINDING_HTTP_POST,
BINDING_HTTP_REDIRECT]:
srvs = self.metadata.single_logout_service(entity_id, binding,
"idpsso")
if not srvs:
+ logger.debug("No SLO '%s' service" % binding)
continue
destination = destinations(srvs)[0]
-
logger.info("destination to provider: %s" % destination)
request = self.create_logout_request(destination, entity_id,
- subject_id, reason=reason,
+ name_id=name_id,
+ reason=reason,
expire=expire)
- to_sign = []
+ #to_sign = []
if binding.startswith("http://"):
sign = True
@@ -160,28 +159,28 @@ class Saml2Client(Base):
relay_state = self._relay_state(request.id)
http_info = self.apply_binding(binding, srequest, destination,
- relay_state)
+ relay_state)
if binding == BINDING_SOAP:
- if response:
- logger.info("Verifying response")
- response = self.send(**http_info)
+ response = self.send(**http_info)
- if response:
+ if response and response.status_code == 200:
not_done.remove(entity_id)
- logger.info("OK response from %s" % destination)
- responses[entity_id] = logout_response_from_string(response)
+ response = response.text
+ logger.info("Response: %s" % response)
+ res = self.parse_logout_request_response(response)
+ responses[entity_id] = res
else:
logger.info("NOT OK response from %s" % destination)
else:
self.state[request.id] = {"entity_id": entity_id,
- "operation": "SLO",
- "entity_ids": entity_ids,
- "subject_id": subject_id,
- "reason": reason,
- "not_on_of_after": expire,
- "sign": sign}
+ "operation": "SLO",
+ "entity_ids": entity_ids,
+ "name_id": name_id,
+ "reason": reason,
+ "not_on_of_after": expire,
+ "sign": sign}
responses[entity_id] = (binding, http_info)
not_done.remove(entity_id)
@@ -217,9 +216,9 @@ class Saml2Client(Base):
issuer = response.issuer()
logger.info("issuer: %s" % issuer)
del self.state[response.in_response_to]
- if status["entity_ids"] == [issuer]: # done
+ if status["entity_ids"] == [issuer]: # done
self.local_logout(status["subject_id"])
- return 0, "200 Ok", [("Content-type","text/html")], []
+ return 0, "200 Ok", [("Content-type", "text/html")], []
else:
status["entity_ids"].remove(issuer)
return self.do_logout(status["subject_id"], status["entity_ids"],
@@ -277,16 +276,15 @@ class Saml2Client(Base):
consent=None, extensions=None, sign=False):
subject = saml.Subject(
- name_id = saml.NameID(text=subject_id,
- format=nameid_format,
- sp_name_qualifier=sp_name_qualifier,
- name_qualifier=name_qualifier))
+ name_id=saml.NameID(text=subject_id, format=nameid_format,
+ sp_name_qualifier=sp_name_qualifier,
+ name_qualifier=name_qualifier))
srvs = self.metadata.authz_service(entity_id, BINDING_SOAP)
for dest in destinations(srvs):
resp = self._use_soap(dest, "authz_decision_query",
- action=action, evidence=evidence,
- resource=resource, subject=subject)
+ action=action, evidence=evidence,
+ resource=resource, subject=subject)
if resp:
return resp
@@ -308,8 +306,8 @@ class Saml2Client(Base):
for destination in destinations(srvs):
res = self._use_soap(destination, "assertion_id_request",
- assertion_id_refs=_id_refs, consent=consent,
- extensions=extensions, sign=sign)
+ assertion_id_refs=_id_refs, consent=consent,
+ extensions=extensions, sign=sign)
if res:
return res
@@ -321,9 +319,8 @@ class Saml2Client(Base):
srvs = self.metadata.authn_request_service(entity_id, BINDING_SOAP)
for destination in destinations(srvs):
- resp = self._use_soap(destination, "authn_query",
- consent=consent, extensions=extensions,
- sign=sign)
+ resp = self._use_soap(destination, "authn_query", consent=consent,
+ extensions=extensions, sign=sign)
if resp:
return resp
@@ -339,7 +336,8 @@ class Saml2Client(Base):
:param entityid: To whom the query should be sent
:param subject_id: The identifier of the subject
- :param attribute: A dictionary of attributes and values that is asked for
+ :param attribute: A dictionary of attributes and values that is
+ asked for
:param sp_name_qualifier: The unique identifier of the
service provider or affiliation of providers for whom the
identifier was generated.
@@ -353,7 +351,6 @@ class Saml2Client(Base):
HTTP args if BINDING_HTT_POST was used.
"""
-
if real_id:
response_args = {"real_id": real_id}
else:
diff --git a/src/saml2/client_base.py b/src/saml2/client_base.py
index 70e7a0c6..b00abaa1 100644
--- a/src/saml2/client_base.py
+++ b/src/saml2/client_base.py
@@ -78,23 +78,28 @@ ECP_SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
ACTOR = "http://schemas.xmlsoap.org/soap/actor/next"
MIME_PAOS = "application/vnd.paos+xml"
+
class IdpUnspecified(Exception):
pass
+
class VerifyError(Exception):
pass
+
class LogoutError(Exception):
pass
+
class NoServiceDefined(Exception):
pass
+
class Base(Entity):
""" The basic pySAML2 service provider class """
def __init__(self, config=None, identity_cache=None, state_cache=None,
- virtual_organization="",config_file=""):
+ virtual_organization="", config_file=""):
"""
:param config: A saml2.config.Config instance
:param identity_cache: Where the class should store identity information
@@ -108,12 +113,12 @@ class Base(Entity):
# for server state storage
if state_cache is None:
- self.state = {} # in memory storage
+ self.state = {} # in memory storage
else:
self.state = state_cache
for foo in ["allow_unsolicited", "authn_requests_signed",
- "logout_requests_signed"]:
+ "logout_requests_signed"]:
if self.config.getattr("sp", foo) == 'true':
setattr(self, foo, True)
else:
@@ -166,25 +171,25 @@ class Base(Entity):
# Public API
#
- def add_vo_information_about_user(self, subject_id):
+ def add_vo_information_about_user(self, name_id):
""" Add information to the knowledge I have about the user. This is
for Virtual organizations.
- :param subject_id: The subject identifier
+ :param name_id: The subject identifier
:return: A possibly extended knowledge.
"""
ava = {}
try:
- (ava, _) = self.users.get_identity(subject_id)
+ (ava, _) = self.users.get_identity(name_id)
except KeyError:
pass
# is this a Virtual Organization situation
if self.vorg:
- if self.vorg.do_aggregation(subject_id):
+ if self.vorg.do_aggregation(name_id):
# Get the extended identity
- ava = self.users.get_identity(subject_id)[0]
+ ava = self.users.get_identity(name_id)[0]
return ava
#noinspection PyUnusedLocal
@@ -228,7 +233,8 @@ class Base(Entity):
args = {}
try:
- args["assertion_consumer_service_url"] = kwargs["assertion_consumer_service_url"]
+ args["assertion_consumer_service_url"] = kwargs[
+ "assertion_consumer_service_url"]
except KeyError:
if service_url_binding is None:
service_url = self.service_url(binding)
@@ -247,16 +253,17 @@ class Base(Entity):
try:
args["name_id_policy"] = kwargs["name_id_policy"]
del kwargs["name_id_policy"]
- except:
+ except KeyError:
if allow_create:
- allow_create="true"
+ allow_create = "true"
else:
- allow_create="false"
+ allow_create = "false"
# Profile stuff, should be configurable
- if nameid_format is None or nameid_format == NAMEID_FORMAT_TRANSIENT:
- name_id_policy = samlp.NameIDPolicy(allow_create=allow_create,
- format=NAMEID_FORMAT_TRANSIENT)
+ if nameid_format is None or \
+ nameid_format == NAMEID_FORMAT_TRANSIENT:
+ name_id_policy = samlp.NameIDPolicy(
+ allow_create=allow_create, format=NAMEID_FORMAT_TRANSIENT)
else:
name_id_policy = samlp.NameIDPolicy(allow_create=allow_create,
format=nameid_format)
@@ -272,28 +279,31 @@ class Base(Entity):
if kwargs:
if extensions is None:
extensions = []
- fargs = [p for p,c,r in AuthnRequest.c_attributes.values()]
- fargs.extend([p for p,c in AuthnRequest.c_children.values()])
- for key,val in kwargs.items():
+ fargs = [p for p, c, r in AuthnRequest.c_attributes.values()]
+ fargs.extend([p for p, c in AuthnRequest.c_children.values()])
+ for key, val in kwargs.items():
if key not in fargs:
# extension elements allowed
extensions.append(saml2.element_to_extension_element(val))
else:
args[key] = val
+ try:
+ del args["id"]
+ except KeyError:
+ pass
return self._message(AuthnRequest, destination, sid, consent,
extensions, sign,
protocol_binding=binding,
scoping=scoping, **args)
-
def create_attribute_query(self, destination, name_id=None,
attribute=None, sid=0, consent=None,
extensions=None, sign=False, **kwargs):
""" Constructs an AttributeQuery
:param destination: To whom the query should be sent
- :param subject_id: The identifier of the subject
+ :param name_id: The identifier of the subject
:param attribute: A dictionary of attributes and values that is
asked for. The key are one of 4 variants:
3-tuple of name_format,name and friendly_name,
@@ -333,7 +343,7 @@ class Base(Entity):
except KeyError:
pass
- subject = saml.Subject(name_id = name_id)
+ subject = saml.Subject(name_id=name_id)
if attribute:
attribute = do_attributes(attribute)
@@ -342,11 +352,9 @@ class Base(Entity):
extensions, sign, subject=subject,
attribute=attribute)
-
# MUST use SOAP for
# AssertionIDRequest, SubjectQuery,
# AuthnQuery, AttributeQuery, or AuthzDecisionQuery
-
def create_authz_decision_query(self, destination, action,
evidence=None, resource=None, subject=None,
sid=0, consent=None, extensions=None,
@@ -369,8 +377,9 @@ class Base(Entity):
extensions, sign, action=action, evidence=evidence,
resource=resource, subject=subject)
- def create_authz_decision_query_using_assertion(self, destination, assertion,
- action=None, resource=None,
+ def create_authz_decision_query_using_assertion(self, destination,
+ assertion, action=None,
+ resource=None,
subject=None, sid=0,
consent=None,
extensions=None,
@@ -397,14 +406,10 @@ class Base(Entity):
else:
_action = None
- return self.create_authz_decision_query(destination,
- _action,
- saml.Evidence(assertion=assertion),
- resource, subject,
- sid=sid,
- consent=consent,
- extensions=extensions,
- sign=sign)
+ return self.create_authz_decision_query(
+ destination, _action, saml.Evidence(assertion=assertion),
+ resource, subject, sid=sid, consent=consent, extensions=extensions,
+ sign=sign)
def create_assertion_id_request(self, assertion_id_refs, **kwargs):
"""
@@ -442,10 +447,10 @@ class Base(Entity):
requested_authn_context=authn_context)
def create_name_id_mapping_request(self, name_id_policy,
- name_id=None, base_id=None,
- encrypted_id=None, destination=None,
- sid=0, consent=None, extensions=None,
- sign=False):
+ name_id=None, base_id=None,
+ encrypted_id=None, destination=None,
+ sid=0, consent=None, extensions=None,
+ sign=False):
"""
:param name_id_policy:
@@ -464,16 +469,17 @@ class Base(Entity):
assert name_id or base_id or encrypted_id
if name_id:
- return self._message(NameIDMappingRequest, destination, sid, consent,
- extensions, sign, name_id_policy=name_id_policy,
- name_id=name_id)
+ return self._message(NameIDMappingRequest, destination, sid,
+ consent, extensions, sign,
+ name_id_policy=name_id_policy, name_id=name_id)
elif base_id:
- return self._message(NameIDMappingRequest, destination, sid, consent,
- extensions, sign, name_id_policy=name_id_policy,
- base_id=base_id)
+ return self._message(NameIDMappingRequest, destination, sid,
+ consent, extensions, sign,
+ name_id_policy=name_id_policy, base_id=base_id)
else:
- return self._message(NameIDMappingRequest, destination, sid, consent,
- extensions, sign, name_id_policy=name_id_policy,
+ return self._message(NameIDMappingRequest, destination, sid,
+ consent, extensions, sign,
+ name_id_policy=name_id_policy,
encrypted_id=encrypted_id)
# ======== response handling ===========
@@ -549,7 +555,7 @@ class Base(Entity):
"attribute_converters": self.config.attribute_converters}
res = self._parse_response(response, AssertionIDResponse, "", binding,
- **kwargs)
+ **kwargs)
return res
# ------------------------------------------------------------------------
@@ -594,7 +600,7 @@ class Base(Entity):
#
paos_request = paos.Request(must_understand="1", actor=ACTOR,
response_consumer_url=my_url,
- service = ECP_SERVICE)
+ service=ECP_SERVICE)
# ----------------------------------------
# <ecp:RelayState>
@@ -622,16 +628,16 @@ class Base(Entity):
# SingleSignOnService
_, location = self.pick_binding("single_sign_on_service",
[_binding], entity_id=entityid)
- authn_req = self.create_authn_request(location,
- service_url_binding=BINDING_PAOS,
- **kwargs)
+ authn_req = self.create_authn_request(
+ location, service_url_binding=BINDING_PAOS, **kwargs)
# ----------------------------------------
# The SOAP envelope
# ----------------------------------------
- soap_envelope = make_soap_enveloped_saml_thingy(authn_req,[paos_request,
- relay_state])
+ soap_envelope = make_soap_enveloped_saml_thingy(authn_req,
+ [paos_request,
+ relay_state])
return authn_req.id, "%s" % soap_envelope
@@ -644,7 +650,7 @@ class Base(Entity):
_relay_state = None
for item in rdict["header"]:
if item.c_tag == "RelayState" and\
- item.c_namespace == ecp.NAMESPACE:
+ item.c_namespace == ecp.NAMESPACE:
_relay_state = item
response = self.parse_authn_request_response(rdict["body"],
diff --git a/src/saml2/httpbase.py b/src/saml2/httpbase.py
index 0eb66661..9643e839 100644
--- a/src/saml2/httpbase.py
+++ b/src/saml2/httpbase.py
@@ -140,9 +140,9 @@ class HTTPBase(object):
if morsel["max-age"]:
std_attr["expires"] = _since_epoch(morsel["max-age"])
- for att, set in PAIRS.items():
+ for att, item in PAIRS.items():
if std_attr[att]:
- std_attr[set] = True
+ std_attr[item] = True
if std_attr["domain"] and std_attr["domain"].startswith("."):
std_attr["domain_initial_dot"] = True
diff --git a/src/saml2/ident.py b/src/saml2/ident.py
index c091f286..6fabe0b8 100644
--- a/src/saml2/ident.py
+++ b/src/saml2/ident.py
@@ -19,9 +19,11 @@ logger = logging.getLogger(__name__)
ATTR = ["name_qualifier", "sp_name_qualifier", "format", "sp_provided_id",
"text"]
+
class Unknown(Exception):
pass
+
def code(item):
_res = []
i = 0
@@ -32,13 +34,15 @@ def code(item):
i += 1
return ",".join(_res)
-def decode(str):
+
+def decode(txt):
_nid = NameID()
- for part in str.split(","):
+ for part in txt.split(","):
i, val = part.split("=")
setattr(_nid, ATTR[int(i)], unquote(val))
return _nid
+
class IdentDB(object):
""" A class that handles identifiers of entities
Keeps a list of all nameIDs returned per SP
@@ -51,19 +55,19 @@ class IdentDB(object):
self.domain = domain
self.name_qualifier = name_qualifier
- def _create_id(self, format, name_qualifier="", sp_name_qualifier=""):
+ def _create_id(self, nformat, name_qualifier="", sp_name_qualifier=""):
_id = sha256(rndstr(32))
- _id.update(format)
+ _id.update(nformat)
if name_qualifier:
_id.update(name_qualifier)
if sp_name_qualifier:
_id.update(sp_name_qualifier)
return _id.hexdigest()
- def create_id(self, format, name_qualifier="", sp_name_qualifier=""):
- _id = self._create_id(format, name_qualifier, sp_name_qualifier)
+ def create_id(self, nformat, name_qualifier="", sp_name_qualifier=""):
+ _id = self._create_id(nformat, name_qualifier, sp_name_qualifier)
while _id in self.db:
- _id = self._create_id(format, name_qualifier, sp_name_qualifier)
+ _id = self._create_id(nformat, name_qualifier, sp_name_qualifier)
return _id
def store(self, ident, name_id):
@@ -92,30 +96,30 @@ class IdentDB(object):
del self.db[_cn]
- def remove_local(self, id):
- if isinstance(id, unicode):
- id = id.encode("utf-8")
+ def remove_local(self, sid):
+ if isinstance(sid, unicode):
+ sid = sid.encode("utf-8")
try:
- for val in self.db[id].split(" "):
+ for val in self.db[sid].split(" "):
try:
del self.db[val]
except KeyError:
pass
- del self.db[id]
+ del self.db[sid]
except KeyError:
pass
- def get_nameid(self, userid, format, sp_name_qualifier, name_qualifier):
- _id = self.create_id(format, name_qualifier, sp_name_qualifier)
+ def get_nameid(self, userid, nformat, sp_name_qualifier, name_qualifier):
+ _id = self.create_id(nformat, name_qualifier, sp_name_qualifier)
- if format == NAMEID_FORMAT_EMAILADDRESS:
+ if nformat == NAMEID_FORMAT_EMAILADDRESS:
if not self.domain:
raise Exception("Can't issue email nameids, unknown domain")
_id = "%s@%s" % (_id, self.domain)
- nameid = NameID(format=format, sp_name_qualifier=sp_name_qualifier,
+ nameid = NameID(format=nformat, sp_name_qualifier=sp_name_qualifier,
name_qualifier=name_qualifier, text=_id)
self.store(userid, nameid)
@@ -150,8 +154,9 @@ class IdentDB(object):
if not name_qualifier:
name_qualifier = self.name_qualifier
- return {"format":nameid_format, "sp_name_qualifier": sp_name_qualifier,
- "name_qualifier":name_qualifier}
+ return {"nformat": nameid_format,
+ "sp_name_qualifier": sp_name_qualifier,
+ "name_qualifier": name_qualifier}
def construct_nameid(self, userid, local_policy=None,
sp_name_qualifier=None, name_id_policy=None,
@@ -175,7 +180,8 @@ class IdentDB(object):
return self.get_nameid(userid, NAMEID_FORMAT_TRANSIENT,
sp_name_qualifier, name_qualifier)
- def persistent_nameid(self, userid, sp_name_qualifier="", name_qualifier=""):
+ def persistent_nameid(self, userid, sp_name_qualifier="",
+ name_qualifier=""):
nameid = self.match_local_id(userid, sp_name_qualifier, name_qualifier)
if nameid:
return nameid
@@ -194,6 +200,8 @@ class IdentDB(object):
try:
return self.db[code(name_id)]
except KeyError:
+ logger.debug("name: %s" % code(name_id))
+ logger.debug("id keys: %s" % self.db.keys())
return None
def match_local_id(self, userid, sp_name_qualifier, name_qualifier):
diff --git a/src/saml2/pack.py b/src/saml2/pack.py
index d758ce9e..a985e7a4 100644
--- a/src/saml2/pack.py
+++ b/src/saml2/pack.py
@@ -154,9 +154,11 @@ def make_soap_enveloped_saml_thingy(thingy, header_parts=None):
if isinstance(thingy, basestring):
# remove the first XML version/encoding line
+ logger.debug("thingy0: %s" % thingy)
_part = thingy.split("\n")
- thingy = _part[1]
+ thingy = "".join(_part[1:])
thingy = thingy.replace(PREFIX, "")
+ logger.debug("thingy: %s" % thingy)
_child = ElementTree.Element('')
_child.tag = '{%s}FuddleMuddle' % DUMMY_NAMESPACE
body.append(_child)
@@ -165,12 +167,12 @@ def make_soap_enveloped_saml_thingy(thingy, header_parts=None):
# find an remove the namespace definition
i = _str.find(DUMMY_NAMESPACE)
j = _str.rfind("xmlns:", 0, i)
- cut1 = _str[j:i+len(DUMMY_NAMESPACE)+1]
+ cut1 = _str[j:i + len(DUMMY_NAMESPACE) + 1]
_str = _str.replace(cut1, "")
first = _str.find("<%s:FuddleMuddle" % (cut1[6:9],))
last = _str.find(">", first+14)
cut2 = _str[first:last+1]
- return _str.replace(cut2,thingy)
+ return _str.replace(cut2, thingy)
else:
thingy.become_child_element_of(body)
return ElementTree.tostring(envelope, encoding="UTF-8")
diff --git a/src/saml2/population.py b/src/saml2/population.py
index 58ee1cd4..503375a5 100644
--- a/src/saml2/population.py
+++ b/src/saml2/population.py
@@ -3,6 +3,7 @@ from saml2.cache import Cache
logger = logging.getLogger(__name__)
+
class Population(object):
def __init__(self, cache=None):
if cache:
@@ -17,45 +18,48 @@ class Population(object):
"""If there already are information from this source in the cache
this function will overwrite that information"""
- subject_id = session_info["name_id"]
+ name_id = session_info["name_id"]
issuer = session_info["issuer"]
del session_info["issuer"]
- self.cache.set(subject_id, issuer, session_info,
- session_info["not_on_or_after"])
- return subject_id
+ self.cache.set(name_id, issuer, session_info,
+ session_info["not_on_or_after"])
+ return name_id
- def stale_sources_for_person(self, subject_id, sources=None):
- if not sources: # assume that all the members has be asked
- # once before, hence they are represented in the cache
- sources = self.cache.entities(subject_id)
- sources = [m for m in sources \
- if not self.cache.active(subject_id, m)]
+ def stale_sources_for_person(self, name_id, sources=None):
+ """
+
+ :param name_id: Identifier of the subject, a NameID instance
+ :param sources: Sources for information about the subject
+ :return:
+ """
+ if not sources: # assume that all the members has be asked
+ # once before, hence they are represented in the cache
+ sources = self.cache.entities(name_id)
+ sources = [m for m in sources if not self.cache.active(name_id, m)]
return sources
- def issuers_of_info(self, subject_id):
- return self.cache.entities(subject_id)
+ def issuers_of_info(self, name_id):
+ return self.cache.entities(name_id)
- def get_identity(self, subject_id, entities=None,
- check_not_on_or_after=True):
- return self.cache.get_identity(subject_id, entities,
- check_not_on_or_after)
+ def get_identity(self, name_id, entities=None, check_not_on_or_after=True):
+ return self.cache.get_identity(name_id, entities, check_not_on_or_after)
- def get_info_from(self, subject_id, entity_id):
- return self.cache.get(subject_id, entity_id)
+ def get_info_from(self, name_id, entity_id):
+ return self.cache.get(name_id, entity_id)
def subjects(self):
"""Returns the name id's for all the persons in the cache"""
return self.cache.subjects()
- def remove_person(self, subject_id):
- self.cache.delete(subject_id)
+ def remove_person(self, name_id):
+ self.cache.delete(name_id)
- def get_entityid(self, subject_id, source_id, check_not_on_or_after=True):
+ def get_entityid(self, name_id, source_id, check_not_on_or_after=True):
try:
- return self.cache.get(subject_id, source_id,
- check_not_on_or_after)["name_id"]
+ return self.cache.get(name_id, source_id, check_not_on_or_after)[
+ "name_id"]
except (KeyError, ValueError):
return ""
- def sources(self, subject_id):
- return self.cache.entities(subject_id)
+ def sources(self, name_id):
+ return self.cache.entities(name_id)
diff --git a/src/saml2/response.py b/src/saml2/response.py
index b82a2820..b83c97da 100644
--- a/src/saml2/response.py
+++ b/src/saml2/response.py
@@ -49,17 +49,21 @@ logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
+
class IncorrectlySigned(Exception):
pass
+
class VerificationError(Exception):
pass
# ---------------------------------------------------------------------------
+
def _dummy(_):
return None
+
def for_me(condition, myself):
# Am I among the intended audiences
for restriction in condition.audience_restriction:
@@ -72,6 +76,7 @@ def for_me(condition, myself):
return False
+
def authn_response(conf, return_addr, outstanding_queries=None, timeslack=0,
asynchop=True, allow_unsolicited=False):
sec = security_context(conf)
@@ -82,8 +87,9 @@ def authn_response(conf, return_addr, outstanding_queries=None, timeslack=0,
timeslack = 0
return AuthnResponse(sec, conf.attribute_converters, conf.entityid,
- return_addr, outstanding_queries, timeslack,
- asynchop=asynchop, allow_unsolicited=allow_unsolicited)
+ return_addr, outstanding_queries, timeslack,
+ asynchop=asynchop, allow_unsolicited=allow_unsolicited)
+
# comes in over SOAP so synchronous
def attribute_response(conf, return_addr, timeslack=0, asynchop=False,
@@ -96,8 +102,9 @@ def attribute_response(conf, return_addr, timeslack=0, asynchop=False,
timeslack = 0
return AttributeResponse(sec, conf.attribute_converters, conf.entityid,
- return_addr, timeslack, asynchop=asynchop,
- test=test)
+ return_addr, timeslack, asynchop=asynchop,
+ test=test)
+
class StatusResponse(object):
msgtype = "status_response"
@@ -111,7 +118,7 @@ class StatusResponse(object):
self.request_id = request_id
self.xmlstr = ""
- self.name_id = ""
+ self.name_id = None
self.response = None
self.not_on_or_after = 0
self.in_response_to = None
@@ -121,7 +128,7 @@ class StatusResponse(object):
def _clear(self):
self.xmlstr = ""
- self.name_id = ""
+ self.name_id = None
self.response = None
self.not_on_or_after = 0
@@ -149,9 +156,10 @@ class StatusResponse(object):
# This will check signature on Assertion which is the default
try:
self.response = self.sec.check_signature(instance)
- except SignatureError: # The response as a whole might be signed or not
- self.response = self.sec.check_signature(instance,
- samlp.NAMESPACE+":Response")
+ except SignatureError:
+ # The response as a whole might be signed or not
+ self.response = self.sec.check_signature(
+ instance, samlp.NAMESPACE + ":Response")
else:
self.not_signed = True
self.response = instance
@@ -190,9 +198,9 @@ class StatusResponse(object):
def issue_instant_ok(self):
""" Check that the response was issued at a reasonable time """
upper = time_util.shift_time(time_util.time_in_a_while(days=1),
- self.timeslack).timetuple()
+ self.timeslack).timetuple()
lower = time_util.shift_time(time_util.time_a_while_ago(days=1),
- -self.timeslack).timetuple()
+ -self.timeslack).timetuple()
# print "issue_instant: %s" % self.response.issue_instant
# print "%s < x < %s" % (lower, upper)
issued_at = str_to_time(self.response.issue_instant)
@@ -200,10 +208,9 @@ class StatusResponse(object):
def _verify(self):
if self.request_id and self.in_response_to and \
- self.in_response_to != self.request_id:
+ self.in_response_to != self.request_id:
logger.error("Not the id I expected: %s != %s" % (
- self.in_response_to,
- self.request_id))
+ self.in_response_to, self.request_id))
return None
try:
@@ -217,9 +224,9 @@ class StatusResponse(object):
if self.asynchop:
if self.response.destination and \
- self.response.destination != self.return_addr:
+ self.response.destination != self.return_addr:
logger.error("%s != %s" % (self.response.destination,
- self.return_addr))
+ self.return_addr))
return None
assert self.issue_instant_ok()
@@ -244,14 +251,17 @@ class StatusResponse(object):
def issuer(self):
return self.response.issuer.text.strip()
+
class LogoutResponse(StatusResponse):
msgtype = "logout_response"
+
def __init__(self, sec_context, return_addr=None, timeslack=0,
asynchop=True):
StatusResponse.__init__(self, sec_context, return_addr, timeslack,
asynchop=asynchop)
self.signature_check = self.sec.correctly_signed_logout_response
+
class NameIDMappingResponse(StatusResponse):
msgtype = "name_id_mapping_response"
@@ -261,6 +271,7 @@ class NameIDMappingResponse(StatusResponse):
request_id, asynchop)
self.signature_check = self.sec.correctly_signed_name_id_mapping_response
+
class ManageNameIDResponse(StatusResponse):
msgtype = "manage_name_id_response"
@@ -273,15 +284,16 @@ class ManageNameIDResponse(StatusResponse):
# ----------------------------------------------------------------------------
+
class AuthnResponse(StatusResponse):
""" This is where all the profile compliance is checked.
This one does saml2int compliance. """
msgtype = "authn_response"
- def __init__(self, sec_context, attribute_converters, entity_id,
- return_addr=None, outstanding_queries=None,
- timeslack=0, asynchop=True, allow_unsolicited=False,
- test=False):
+ def __init__(self, sec_context, attribute_converters, entity_id,
+ return_addr=None, outstanding_queries=None,
+ timeslack=0, asynchop=True, allow_unsolicited=False,
+ test=False):
StatusResponse.__init__(self, sec_context, return_addr, timeslack,
asynchop=asynchop)
@@ -335,7 +347,8 @@ class AuthnResponse(StatusResponse):
if validate_on_or_after(authn_statement.session_not_on_or_after,
self.timeslack):
self.session_not_on_or_after = calendar.timegm(
- time_util.str_to_time(authn_statement.session_not_on_or_after))
+ time_util.str_to_time(
+ authn_statement.session_not_on_or_after))
else:
return False
return True
@@ -364,8 +377,7 @@ class AuthnResponse(StatusResponse):
try:
if condition.not_on_or_after:
self.not_on_or_after = validate_on_or_after(
- condition.not_on_or_after,
- self.timeslack)
+ condition.not_on_or_after, self.timeslack)
if condition.not_before:
validate_before(condition.not_before, self.timeslack)
except Exception, excp:
@@ -375,7 +387,6 @@ class AuthnResponse(StatusResponse):
else:
self.not_on_or_after = 0
-
if not for_me(condition, self.entity_id):
if not lax:
#print condition
@@ -490,7 +501,7 @@ class AuthnResponse(StatusResponse):
pass
else:
raise ValueError("Unknown subject confirmation method: %s" % (
- subject_confirmation.method,))
+ subject_confirmation.method,))
subjconf.append(subject_confirmation)
@@ -501,7 +512,7 @@ class AuthnResponse(StatusResponse):
# The subject must contain a name_id
assert subject.name_id
- self.name_id = subject.name_id.text.strip()
+ self.name_id = subject.name_id
return self.name_id
def _assertion(self, assertion):
@@ -561,7 +572,7 @@ class AuthnResponse(StatusResponse):
def parse_assertion(self):
try:
assert len(self.response.assertion) == 1 or \
- len(self.response.encrypted_assertion) == 1
+ len(self.response.encrypted_assertion) == 1
except AssertionError:
raise Exception("No assertion part")
@@ -571,8 +582,7 @@ class AuthnResponse(StatusResponse):
else:
logger.debug("***Encrypted response***")
return self._encrypted_assertion(
- self.response.encrypted_assertion[0])
-
+ self.response.encrypted_assertion[0])
def verify(self):
""" Verify that the assertion is syntactically correct and
@@ -615,7 +625,7 @@ class AuthnResponse(StatusResponse):
return res
def authz_decision_info(self):
- res = {"permit":[], "deny": [], "indeterminate":[] }
+ res = {"permit": [], "deny": [], "indeterminate": []}
for adstat in self.assertion.authz_decision_statement:
# one of 'Permit', 'Deny', 'Indeterminate'
res[adstat.decision.text.lower()] = adstat
@@ -632,19 +642,18 @@ class AuthnResponse(StatusResponse):
nooa = self.not_on_or_after
if self.context == "AuthzQuery":
- return {"name_id": self.name_id,
- "came_from": self.came_from, "issuer": self.issuer(),
- "not_on_or_after": nooa,
+ return {"name_id": self.name_id, "came_from": self.came_from,
+ "issuer": self.issuer(), "not_on_or_after": nooa,
"authz_decision_info": self.authz_decision_info() }
else:
- return { "ava": self.ava, "name_id": self.name_id,
+ return {"ava": self.ava, "name_id": self.name_id,
"came_from": self.came_from, "issuer": self.issuer(),
- "not_on_or_after": nooa,
- "authn_info": self.authn_info() }
+ "not_on_or_after": nooa, "authn_info": self.authn_info()}
def __str__(self):
return "%s" % self.xmlstr
+
class AuthnQueryResponse(AuthnResponse):
msgtype = "authn_query_response"
@@ -659,39 +668,44 @@ class AuthnQueryResponse(AuthnResponse):
self.assertion = None
self.context = "AuthnQueryResponse"
- def condition_ok(self, lax=False): # Should I care about conditions ?
+ def condition_ok(self, lax=False): # Should I care about conditions ?
return True
+
class AttributeResponse(AuthnResponse):
msgtype = "attribute_response"
def __init__(self, sec_context, attribute_converters, entity_id,
- return_addr=None, timeslack=0, asynchop=False, test=False):
+ return_addr=None, timeslack=0, asynchop=False, test=False):
AuthnResponse.__init__(self, sec_context, attribute_converters,
- entity_id, return_addr, timeslack=timeslack,
- asynchop=asynchop, test=test)
+ entity_id, return_addr, timeslack=timeslack,
+ asynchop=asynchop, test=test)
self.entity_id = entity_id
self.attribute_converters = attribute_converters
self.assertion = None
self.context = "AttrQuery"
+
class AuthzResponse(AuthnResponse):
""" A successful response will be in the form of assertions containing
authorization decision statements."""
msgtype = "authz_decision_response"
+
def __init__(self, sec_context, attribute_converters, entity_id,
- return_addr=None, timeslack=0, asynchop=False):
+ return_addr=None, timeslack=0, asynchop=False):
AuthnResponse.__init__(self, sec_context, attribute_converters,
- entity_id, return_addr,
- timeslack=timeslack, asynchop=asynchop)
+ entity_id, return_addr, timeslack=timeslack,
+ asynchop=asynchop)
self.entity_id = entity_id
self.attribute_converters = attribute_converters
self.assertion = None
self.context = "AuthzQuery"
+
class ArtifactResponse(AuthnResponse):
msgtype = "artifact_response"
+
def __init__(self, sec_context, attribute_converters, entity_id,
return_addr=None, timeslack=0, asynchop=False, test=False):
@@ -704,10 +718,9 @@ class ArtifactResponse(AuthnResponse):
self.context = "ArtifactResolve"
-def response_factory(xmlstr, conf, return_addr=None,
- outstanding_queries=None,
- timeslack=0, decode=True, request_id=0,
- origxml=None, asynchop=True, allow_unsolicited=False):
+def response_factory(xmlstr, conf, return_addr=None, outstanding_queries=None,
+ timeslack=0, decode=True, request_id=0, origxml=None,
+ asynchop=True, allow_unsolicited=False):
sec_context = security_context(conf)
if not timeslack:
try:
@@ -723,9 +736,10 @@ def response_factory(xmlstr, conf, return_addr=None,
try:
response.loads(xmlstr, decode, origxml)
if response.response.assertion or response.response.encrypted_assertion:
- authnresp = AuthnResponse(sec_context, attribute_converters,
- entity_id, return_addr, outstanding_queries,
- timeslack, asynchop, allow_unsolicited)
+ authnresp = AuthnResponse(sec_context, attribute_converters,
+ entity_id, return_addr,
+ outstanding_queries, timeslack, asynchop,
+ allow_unsolicited)
authnresp.update(response)
return authnresp
except TypeError:
@@ -741,6 +755,7 @@ def response_factory(xmlstr, conf, return_addr=None,
# ===========================================================================
# A class of it's own
+
class AssertionIDResponse(object):
msgtype = "assertion_id_response"
diff --git a/src/saml2/virtual_org.py b/src/saml2/virtual_org.py
index 9369e602..f2ffaec1 100644
--- a/src/saml2/virtual_org.py
+++ b/src/saml2/virtual_org.py
@@ -4,9 +4,10 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT
logger = logging.getLogger(__name__)
+
class VirtualOrg(object):
def __init__(self, sp, vorg, cnf):
- self.sp = sp # The parent SP client instance
+ self.sp = sp # The parent SP client instance
self._name = vorg
self.common_identifier = cnf["common_identifier"]
try:
@@ -28,7 +29,7 @@ class VirtualOrg(object):
"""
return self.sp.config.metadata.vo_members(self._name)
- def members_to_ask(self, subject_id):
+ def members_to_ask(self, name_id):
"""Find the member of the Virtual Organization that I haven't already
spoken too
"""
@@ -40,12 +41,12 @@ class VirtualOrg(object):
# Remove the ones I have cached data from about this subject
vo_members = [m for m in vo_members if not self.sp.users.cache.active(
- subject_id, m)]
+ name_id, m)]
logger.info("VO members (not cached): %s" % vo_members)
return vo_members
- def get_common_identifier(self, subject_id):
- (ava, _) = self.sp.users.get_identity(subject_id)
+ def get_common_identifier(self, name_id):
+ (ava, _) = self.sp.users.get_identity(name_id)
if ava == {}:
return None
@@ -56,36 +57,23 @@ class VirtualOrg(object):
except KeyError:
return None
- def do_aggregation(self, subject_id):
+ def do_aggregation(self, name_id):
logger.info("** Do VO aggregation **\nSubjectID: %s, VO:%s" % (
- subject_id, self._name))
+ name_id, self._name))
- to_ask = self.members_to_ask(subject_id)
+ to_ask = self.members_to_ask(name_id)
if to_ask:
- # Find the NameIDFormat and the SPNameQualifier
- if self.nameid_format:
- name_id_format = self.nameid_format
- sp_name_qualifier = ""
- else:
- sp_name_qualifier = self._name
- name_id_format = ""
-
- com_identifier = self.get_common_identifier(subject_id)
+ com_identifier = self.get_common_identifier(name_id)
resolver = AttributeResolver(self.sp)
# extends returns a list of session_infos
- for session_info in resolver.extend(com_identifier,
- self.sp.config.entityid,
- to_ask,
- name_id_format=name_id_format,
- sp_name_qualifier=sp_name_qualifier,
- real_id=subject_id):
+ for session_info in resolver.extend(
+ com_identifier, self.sp.config.entityid, to_ask):
_ = self._cache_session(session_info)
- logger.info(">Issuers: %s" % self.sp.users.issuers_of_info(
- subject_id))
- logger.info("AVA: %s" % (self.sp.users.get_identity(subject_id),))
+ logger.info(">Issuers: %s" % self.sp.users.issuers_of_info(name_id))
+ logger.info("AVA: %s" % (self.sp.users.get_identity(name_id),))
return True
else:
diff --git a/tests/fakeIDP.py b/tests/fakeIDP.py
index 224f254c..971281cd 100644
--- a/tests/fakeIDP.py
+++ b/tests/fakeIDP.py
@@ -163,7 +163,7 @@ class FakeIDP(Server):
req = logout_request_from_string(_str)
- _resp = self.create_logout_response(req, binding)
+ _resp = self.create_logout_response(req, [binding])
if binding == BINDING_SOAP:
# SOAP packing
diff --git a/tests/test_32_cache.py b/tests/test_32_cache.py
index 9fc4c410..81a413f6 100644
--- a/tests/test_32_cache.py
+++ b/tests/test_32_cache.py
@@ -2,8 +2,10 @@
import time
import py
+from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
from saml2.cache import Cache
from saml2.time_util import in_a_while, str_to_time
+from saml2.ident import code
SESSION_INFO_PATTERN = {"ava":{}, "came from":"", "not_on_or_after":0,
"issuer":"", "session_id":-1}
@@ -11,7 +13,14 @@ SESSION_INFO_PATTERN = {"ava":{}, "came from":"", "not_on_or_after":0,
def _eq(l1,l2):
return set(l1) == set(l2)
-
+
+def nid_eq(l1, l2):
+ return _eq([code(c) for c in l1], [code(c) for c in l2])
+
+nid = [
+ NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="1234"),
+ NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="9876"),
+ NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT, text="1000")]
class TestClass:
def setup_class(self):
@@ -22,10 +31,9 @@ class TestClass:
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Derek"]}
- self.cache.set("1234", "abcd", session_info,
- not_on_or_after)
+ self.cache.set(nid[0], "abcd", session_info, not_on_or_after)
- (ava, inactive) = self.cache.get_identity("1234")
+ (ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == []
assert ava.keys() == ["givenName"]
assert ava["givenName"] == ["Derek"]
@@ -34,84 +42,83 @@ class TestClass:
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"surName":["Jeter"]}
- self.cache.set("1234", "bcde", session_info,
- not_on_or_after)
+ self.cache.set(nid[0], "bcde", session_info, not_on_or_after)
- (ava, inactive) = self.cache.get_identity("1234")
+ (ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == []
assert _eq(ava.keys(), ["givenName","surName"])
assert ava["givenName"] == ["Derek"]
assert ava["surName"] == ["Jeter"]
def test_from_one_target_source(self):
- session_info = self.cache.get("1234","bcde")
+ session_info = self.cache.get(nid[0], "bcde")
ava = session_info["ava"]
assert _eq(ava.keys(), ["surName"])
assert ava["surName"] == ["Jeter"]
- session_info = self.cache.get("1234","abcd")
+ session_info = self.cache.get(nid[0], "abcd")
ava = session_info["ava"]
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Derek"]
def test_entities(self):
- assert _eq(self.cache.entities("1234"), ["abcd", "bcde"])
+ assert _eq(self.cache.entities(nid[0]), ["abcd", "bcde"])
py.test.raises(Exception, "self.cache.entities('6666')")
def test_remove_info(self):
- self.cache.reset("1234", "bcde")
- assert self.cache.active("1234", "bcde") == False
- assert self.cache.active("1234", "abcd")
+ self.cache.reset(nid[0], "bcde")
+ assert self.cache.active(nid[0], "bcde") == False
+ assert self.cache.active(nid[0], "abcd")
- (ava, inactive) = self.cache.get_identity("1234")
+ (ava, inactive) = self.cache.get_identity(nid[0])
assert inactive == ['bcde']
assert _eq(ava.keys(), ["givenName"])
assert ava["givenName"] == ["Derek"]
def test_active(self):
- assert self.cache.active("1234", "bcde") == False
- assert self.cache.active("1234", "abcd")
+ assert self.cache.active(nid[0], "bcde") == False
+ assert self.cache.active(nid[0], "abcd")
def test_subjects(self):
- assert self.cache.subjects() == ["1234"]
+ assert nid_eq(self.cache.subjects(), [nid[0]])
def test_second_subject(self):
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Ichiro"],
"surName":["Suzuki"]}
- self.cache.set("9876", "abcd", session_info,
+ self.cache.set(nid[1], "abcd", session_info,
not_on_or_after)
- (ava, inactive) = self.cache.get_identity("9876")
+ (ava, inactive) = self.cache.get_identity(nid[1])
assert inactive == []
assert _eq(ava.keys(), ["givenName","surName"])
assert ava["givenName"] == ["Ichiro"]
assert ava["surName"] == ["Suzuki"]
- assert _eq(self.cache.subjects(), ["1234","9876"])
+ assert nid_eq(self.cache.subjects(), [nid[0], nid[1]])
def test_receivers(self):
- assert _eq(self.cache.receivers("9876"), ["abcd"])
+ assert _eq(self.cache.receivers(nid[1]), ["abcd"])
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Ichiro"],
"surName":["Suzuki"]}
- self.cache.set("9876", "bcde", session_info,
+ self.cache.set(nid[1], "bcde", session_info,
not_on_or_after)
- assert _eq(self.cache.receivers("9876"), ["abcd", "bcde"])
- assert _eq(self.cache.subjects(), ["1234","9876"])
+ assert _eq(self.cache.receivers(nid[1]), ["abcd", "bcde"])
+ assert nid_eq(self.cache.subjects(), nid[0:2])
def test_timeout(self):
not_on_or_after = str_to_time(in_a_while(seconds=1))
session_info = SESSION_INFO_PATTERN.copy()
session_info["ava"] = {"givenName":["Alex"],
"surName":["Rodriguez"]}
- self.cache.set("1000", "bcde", session_info,
+ self.cache.set(nid[2], "bcde", session_info,
not_on_or_after)
time.sleep(2)
- (ava, inactive) = self.cache.get_identity("1000")
+ (ava, inactive) = self.cache.get_identity(nid[2])
assert inactive == ["bcde"]
assert ava == {}
diff --git a/tests/test_34_population.py b/tests/test_34_population.py
index 9e03f140..a4094289 100644
--- a/tests/test_34_population.py
+++ b/tests/test_34_population.py
@@ -1,4 +1,6 @@
#!/usr/bin/env python
+from saml2.ident import code
+from saml2.saml import NAMEID_FORMAT_TRANSIENT, NameID
from saml2.population import Population
from saml2.time_util import in_a_while
@@ -6,6 +8,14 @@ from saml2.time_util import in_a_while
IDP_ONE = "urn:mace:example.com:saml:one:idp"
IDP_OTHER = "urn:mace:example.com:saml:other:idp"
+nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="123456")
+nida = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="abcdef")
+
+cnid = code(nid)
+cnida = code(nida)
+
def _eq(l1, l2):
return set(l1) == set(l2)
@@ -15,7 +25,7 @@ class TestPopulationMemoryBased():
def test_add_person(self):
session_info = {
- "name_id": "123456",
+ "name_id": nid,
"issuer": IDP_ONE,
"not_on_or_after": in_a_while(minutes=15),
"ava": {
@@ -26,34 +36,34 @@ class TestPopulationMemoryBased():
}
self.population.add_information_about_person(session_info)
- issuers = self.population.issuers_of_info("123456")
+ issuers = self.population.issuers_of_info(nid)
assert issuers == [IDP_ONE]
- subjects = self.population.subjects()
- assert subjects == ["123456"]
+ subjects = [code(c) for c in self.population.subjects()]
+ assert subjects == [cnid]
# Are any of the sources gone stale
- stales = self.population.stale_sources_for_person("123456")
+ stales = self.population.stale_sources_for_person(nid)
assert stales == []
# are any of the possible sources not used or gone stale
possible = [IDP_ONE, IDP_OTHER]
- stales = self.population.stale_sources_for_person("123456", possible)
+ stales = self.population.stale_sources_for_person(nid, possible)
assert stales == [IDP_OTHER]
- (identity, stale) = self.population.get_identity("123456")
+ (identity, stale) = self.population.get_identity(nid)
assert stale == []
assert identity == {'mail': 'anders.andersson@example.com',
'givenName': 'Anders',
'surName': 'Andersson'}
- info = self.population.get_info_from("123456", IDP_ONE)
+ info = self.population.get_info_from(nid, IDP_ONE)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
- assert info["name_id"] == '123456'
+ assert info["name_id"] == nid
assert info["ava"] == {'mail': 'anders.andersson@example.com',
'givenName': 'Anders',
'surName': 'Andersson'}
def test_extend_person(self):
session_info = {
- "name_id": "123456",
+ "name_id": nid,
"issuer": IDP_OTHER,
"not_on_or_after": in_a_while(minutes=15),
"ava": {
@@ -63,33 +73,33 @@ class TestPopulationMemoryBased():
self.population.add_information_about_person(session_info)
- issuers = self.population.issuers_of_info("123456")
+ issuers = self.population.issuers_of_info(nid)
assert _eq(issuers, [IDP_ONE, IDP_OTHER])
- subjects = self.population.subjects()
- assert subjects == ["123456"]
+ subjects = [code(c) for c in self.population.subjects()]
+ assert subjects == [cnid]
# Are any of the sources gone stale
- stales = self.population.stale_sources_for_person("123456")
+ stales = self.population.stale_sources_for_person(nid)
assert stales == []
# are any of the possible sources not used or gone stale
possible = [IDP_ONE, IDP_OTHER]
- stales = self.population.stale_sources_for_person("123456", possible)
+ stales = self.population.stale_sources_for_person(nid, possible)
assert stales == []
- (identity, stale) = self.population.get_identity("123456")
+ (identity, stale) = self.population.get_identity(nid)
assert stale == []
assert identity == {'mail': 'anders.andersson@example.com',
'givenName': 'Anders',
'surName': 'Andersson',
"eduPersonEntitlement": "Anka"}
- info = self.population.get_info_from("123456", IDP_OTHER)
+ info = self.population.get_info_from(nid, IDP_OTHER)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
- assert info["name_id"] == '123456'
+ assert info["name_id"] == nid
assert info["ava"] == {"eduPersonEntitlement": "Anka"}
def test_add_another_person(self):
session_info = {
- "name_id": "abcdef",
+ "name_id": nida,
"issuer": IDP_ONE,
"not_on_or_after": in_a_while(minutes=15),
"ava": {
@@ -100,28 +110,28 @@ class TestPopulationMemoryBased():
}
self.population.add_information_about_person(session_info)
- issuers = self.population.issuers_of_info("abcdef")
+ issuers = self.population.issuers_of_info(nida)
assert issuers == [IDP_ONE]
- subjects = self.population.subjects()
- assert _eq(subjects, ["123456", "abcdef"])
+ subjects = [code(c) for c in self.population.subjects()]
+ assert _eq(subjects, [cnid, cnida])
- stales = self.population.stale_sources_for_person("abcdef")
+ stales = self.population.stale_sources_for_person(nida)
assert stales == []
# are any of the possible sources not used or gone stale
possible = [IDP_ONE, IDP_OTHER]
- stales = self.population.stale_sources_for_person("abcdef", possible)
+ stales = self.population.stale_sources_for_person(nida, possible)
assert stales == [IDP_OTHER]
- (identity, stale) = self.population.get_identity("abcdef")
+ (identity, stale) = self.population.get_identity(nida)
assert stale == []
assert identity == {"givenName": "Bertil",
"surName": "Bertilsson",
"mail": "bertil.bertilsson@example.com"
}
- info = self.population.get_info_from("abcdef", IDP_ONE)
+ info = self.population.get_info_from(nida, IDP_ONE)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
- assert info["name_id"] == 'abcdef'
+ assert info["name_id"] == nida
assert info["ava"] == {"givenName": "Bertil",
"surName": "Bertilsson",
"mail": "bertil.bertilsson@example.com"
@@ -129,7 +139,7 @@ class TestPopulationMemoryBased():
def test_modify_person(self):
session_info = {
- "name_id": "123456",
+ "name_id": nid,
"issuer": IDP_ONE,
"not_on_or_after": in_a_while(minutes=15),
"ava": {
@@ -140,26 +150,26 @@ class TestPopulationMemoryBased():
}
self.population.add_information_about_person(session_info)
- issuers = self.population.issuers_of_info("123456")
+ issuers = self.population.issuers_of_info(nid)
assert _eq(issuers, [IDP_ONE, IDP_OTHER])
- subjects = self.population.subjects()
- assert _eq(subjects, ["123456", "abcdef"])
+ subjects = [code(c) for c in self.population.subjects()]
+ assert _eq(subjects, [cnid, cnida])
# Are any of the sources gone stale
- stales = self.population.stale_sources_for_person("123456")
+ stales = self.population.stale_sources_for_person(nid)
assert stales == []
# are any of the possible sources not used or gone stale
possible = [IDP_ONE, IDP_OTHER]
- stales = self.population.stale_sources_for_person("123456", possible)
+ stales = self.population.stale_sources_for_person(nid, possible)
assert stales == []
- (identity, stale) = self.population.get_identity("123456")
+ (identity, stale) = self.population.get_identity(nid)
assert stale == []
assert identity == {'mail': 'arne.andersson@example.com',
'givenName': 'Arne',
'surName': 'Andersson',
"eduPersonEntitlement": "Anka"}
- info = self.population.get_info_from("123456", IDP_OTHER)
+ info = self.population.get_info_from(nid, IDP_OTHER)
assert info.keys() == ["not_on_or_after", "name_id", "ava"]
- assert info["name_id"] == '123456'
+ assert info["name_id"] == nid
assert info["ava"] == {"eduPersonEntitlement": "Anka"} \ No newline at end of file
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index eea9c8a1..2d395a7f 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import base64
from urlparse import parse_qs
-from saml2.saml import AUTHN_PASSWORD
+from saml2.saml import AUTHN_PASSWORD, NameID, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import response_from_string
from saml2.server import Server
@@ -18,6 +18,9 @@ from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from py.test import raises
import os
+nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="123456")
+
def _eq(l1,l2):
return set(l1) == set(l2)
@@ -150,13 +153,12 @@ class TestServer1():
def test_parse_ok_request(self):
authn_request = self.client.create_authn_request(
- id = "id1",
- destination = "http://localhost:8088/sso")
+ sid="id1", destination="http://localhost:8088/sso")
print authn_request
binding = BINDING_HTTP_REDIRECT
htargs = self.client.apply_binding(binding, "%s" % authn_request,
- "http://www.example.com", "abcd")
+ "http://www.example.com", "abcd")
_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
print _dict
@@ -176,17 +178,17 @@ class TestServer1():
"urn:mace:example.com:saml:roland:sp",
"id12")
resp = self.server.create_authn_response(
- {"eduPersonEntitlement": "Short stop",
- "surName": "Jeter",
- "givenName": "Derek",
- "mail": "derek.jeter@nyy.mlb.com",
- "title": "The man"},
- "id12", # in_response_to
- "http://localhost:8087/", # destination
- "urn:mace:example.com:saml:roland:sp", # sp_entity_id
- name_id=name_id,
- authn=(AUTHN_PASSWORD, "http://www.example.com/login")
- )
+ {"eduPersonEntitlement": "Short stop",
+ "surName": "Jeter",
+ "givenName": "Derek",
+ "mail": "derek.jeter@nyy.mlb.com",
+ "title": "The man"},
+ "id12", # in_response_to
+ "http://localhost:8087/", # destination
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ name_id=name_id,
+ authn=(AUTHN_PASSWORD, "http://www.example.com/login")
+ )
print resp.keyswv()
assert _eq(resp.keyswv(),['status', 'destination', 'assertion',
@@ -335,7 +337,7 @@ class TestServer1():
def test_slo_http_post(self):
soon = time_util.in_a_while(days=1)
sinfo = {
- "name_id": "foba0001",
+ "name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after" : soon,
"user": {
@@ -346,10 +348,9 @@ class TestServer1():
self.client.users.add_information_about_person(sinfo)
logout_request = self.client.create_logout_request(
- destination = "http://localhost:8088/slop",
- subject_id="foba0001",
- issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
- reason = "I'm tired of this")
+ destination="http://localhost:8088/slop", name_id=nid,
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
intermed = base64.b64encode("%s" % logout_request)
@@ -360,9 +361,9 @@ class TestServer1():
def test_slo_soap(self):
soon = time_util.in_a_while(days=1)
sinfo = {
- "name_id": "foba0001",
+ "name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
- "not_on_or_after" : soon,
+ "not_on_or_after": soon,
"user": {
"givenName": "Leo",
"surName": "Laport",
@@ -373,10 +374,9 @@ class TestServer1():
sp.users.add_information_about_person(sinfo)
logout_request = sp.create_logout_request(
- subject_id = "foba0001",
- destination = "http://localhost:8088/slo",
- issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
- reason = "I'm tired of this")
+ name_id=nid, destination="http://localhost:8088/slo",
+ issuer_entity_id="urn:mace:example.com:saml:roland:idp",
+ reason="I'm tired of this")
#_ = s_utils.deflate_and_base64_encode("%s" % (logout_request,))
@@ -429,7 +429,7 @@ def _logout_request(conf_file):
soon = time_util.in_a_while(days=1)
sinfo = {
- "name_id": "foba0001",
+ "name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after" : soon,
"user": {
@@ -440,7 +440,7 @@ def _logout_request(conf_file):
sp.users.add_information_about_person(sinfo)
return sp.create_logout_request(
- subject_id = "foba0001",
+ name_id = nid,
destination = "http://localhost:8088/slo",
issuer_entity_id = "urn:mace:example.com:saml:roland:idp",
reason = "I'm tired of this")
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index b2be2598..29788746 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -3,14 +3,17 @@
import base64
import urllib
+from saml2.response import LogoutResponse
from saml2.samlp import logout_request_from_string
from saml2.client import Saml2Client
from saml2 import samlp, BINDING_HTTP_POST
from saml2 import saml, config, class_name
from saml2.config import SPConfig
-from saml2.saml import NAMEID_FORMAT_PERSISTENT, NAMEID_FORMAT_TRANSIENT, \
- AUTHN_PASSWORD
+from saml2.saml import NAMEID_FORMAT_PERSISTENT
+from saml2.saml import NAMEID_FORMAT_TRANSIENT
+from saml2.saml import AUTHN_PASSWORD
+from saml2.saml import NameID
from saml2.server import Server
from saml2.time_util import in_a_while
@@ -55,6 +58,9 @@ REQ1 = { "1.2.14": """<?xml version='1.0' encoding='UTF-8'?>
AUTHN = (AUTHN_PASSWORD, "http://www.example.com/login")
+nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="123456")
+
class TestClient:
def setup_class(self):
self.server = Server("idp_conf")
@@ -68,7 +74,7 @@ class TestClient:
"https://idp.example.com/idp/",
"E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
format=saml.NAMEID_FORMAT_PERSISTENT,
- id="id1")
+ sid="id1")
reqstr = "%s" % req.to_string()
assert req.destination == "https://idp.example.com/idp/"
@@ -110,7 +116,7 @@ class TestClient:
"urn:oasis:names:tc:SAML:2.0:attrname-format:uri"):None,
},
format=saml.NAMEID_FORMAT_PERSISTENT,
- id="id1")
+ sid="id1")
print req.to_string()
assert req.destination == "https://idp.example.com/idp/"
@@ -144,7 +150,7 @@ class TestClient:
"https://aai-demo-idp.switch.ch/idp/shibboleth",
"_e7b68a04488f715cda642fbdd90099f5",
format=saml.NAMEID_FORMAT_TRANSIENT,
- id="id1")
+ sid="id1")
assert isinstance(req, samlp.AttributeQuery)
assert req.destination == "https://aai-demo-idp.switch.ch/idp/shibboleth"
@@ -178,7 +184,7 @@ class TestClient:
def test_create_auth_request_0(self):
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
- id="id1")
+ sid="id1")
ar = samlp.authn_request_from_string(ar_str)
print ar
assert ar.assertion_consumer_service_url == "http://lingon.catalogix.se:8087/"
@@ -199,7 +205,7 @@ class TestClient:
"http://www.example.com/sso",
"urn:mace:example.com:it:tek", # vo
nameid_format=NAMEID_FORMAT_PERSISTENT,
- id="666")
+ sid="666")
ar = samlp.authn_request_from_string(ar_str)
print ar
@@ -221,7 +227,7 @@ class TestClient:
ar_str = "%s" % self.client.create_authn_request(
"http://www.example.com/sso",
sign=True,
- id="id1")
+ sid="id1")
ar = samlp.authn_request_from_string(ar_str)
@@ -343,10 +349,10 @@ class TestClientWithDummy():
self.client.send = self.server.receive
def test_do_authn(self):
- id, http_args = self.client.prepare_for_authenticate(IDP,
+ sid, http_args = self.client.prepare_for_authenticate(IDP,
"http://www.example.com/relay_state")
- assert isinstance(id, basestring)
+ assert isinstance(sid, basestring)
assert len(http_args) == 4
assert http_args["headers"][0][0] == "Location"
assert http_args["data"] == []
@@ -363,7 +369,7 @@ class TestClientWithDummy():
# information about the user from an IdP
session_info = {
- "name_id": "123456",
+ "name_id": nid,
"issuer": "urn:mace:example.com:saml:roland:idp",
"not_on_or_after": in_a_while(minutes=15),
"ava": {
@@ -373,24 +379,18 @@ class TestClientWithDummy():
}
}
self.client.users.add_information_about_person(session_info)
- entity_ids = self.client.users.issuers_of_info("123456")
+ entity_ids = self.client.users.issuers_of_info(nid)
assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
- resp = self.client.global_logout("123456", "Tired", in_a_while(minutes=5))
+ resp = self.client.global_logout(nid, "Tired", in_a_while(minutes=5))
print resp
assert resp
assert len(resp) == 1
assert resp.keys() == entity_ids
- http_args = resp[entity_ids[0]]
- assert isinstance(http_args, dict)
- assert http_args["headers"] == [('Content-type', 'text/html')]
- info = unpack_form(http_args["data"][3])
- xml_str = base64.b64decode(info["SAMLRequest"])
- req = logout_request_from_string(xml_str)
- print req
- assert req.reason == "Tired"
+ response = resp[entity_ids[0]]
+ assert isinstance(response, LogoutResponse)
def test_post_sso(self):
- id, http_args = self.client.prepare_for_authenticate(
+ sid, http_args = self.client.prepare_for_authenticate(
"urn:mace:example.com:saml:roland:idp",
relay_state="really",
binding=BINDING_HTTP_POST)
@@ -403,210 +403,17 @@ class TestClientWithDummy():
http_args["data"] = urllib.urlencode(_dic)
http_args["method"] = "POST"
http_args["dummy"] = _dic["SAMLRequest"]
- http_args["headers"] = [('Content-type','application/x-www-form-urlencoded')]
+ http_args["headers"] = [('Content-type',
+ 'application/x-www-form-urlencoded')]
response = self.client.send(**http_args)
print response.text
_dic = unpack_form(response.text[3], "SAMLResponse")
resp = self.client.parse_authn_request_response(_dic["SAMLResponse"],
BINDING_HTTP_POST,
- {id: "/"})
+ {sid: "/"})
ac = resp.assertion.authn_statement[0].authn_context
- assert ac.authenticating_authority[0].text == 'http://www.example.com/login'
+ assert ac.authenticating_authority[0].text == \
+ 'http://www.example.com/login'
assert ac.authn_context_class_ref.text == AUTHN_PASSWORD
-# def test_logout_2(self):
-# """ one IdP/AA with BINDING_SOAP, can't actually send something"""
-#
-# conf = config.SPConfig()
-# conf.load_file("server2_conf")
-# client = Saml2Client(conf)
-#
-# # information about the user from an IdP
-# session_info = {
-# "name_id": "123456",
-# "issuer": "urn:mace:example.com:saml:roland:idp",
-# "not_on_or_after": in_a_while(minutes=15),
-# "ava": {
-# "givenName": "Anders",
-# "surName": "Andersson",
-# "mail": "anders.andersson@example.com"
-# }
-# }
-# client.users.add_information_about_person(session_info)
-# entity_ids = self.client.users.issuers_of_info("123456")
-# assert entity_ids == ["urn:mace:example.com:saml:roland:idp"]
-# destinations = client.config.single_logout_services(entity_ids[0],
-# BINDING_SOAP)
-# print destinations
-# assert destinations == ['http://localhost:8088/slo']
-#
-# # Will raise an error since there is noone at the other end.
-# raises(LogoutError, 'client.global_logout("123456", "Tired", in_a_while(minutes=5))')
-#
-# def test_logout_3(self):
-# """ two or more IdP/AA with BINDING_HTTP_REDIRECT"""
-#
-# conf = config.SPConfig()
-# conf.load_file("server3_conf")
-# client = Saml2Client(conf)
-#
-# # information about the user from an IdP
-# session_info_authn = {
-# "name_id": "123456",
-# "issuer": "urn:mace:example.com:saml:roland:idp",
-# "not_on_or_after": in_a_while(minutes=15),
-# "ava": {
-# "givenName": "Anders",
-# "surName": "Andersson",
-# "mail": "anders.andersson@example.com"
-# }
-# }
-# client.users.add_information_about_person(session_info_authn)
-# session_info_aa = {
-# "name_id": "123456",
-# "issuer": "urn:mace:example.com:saml:roland:aa",
-# "not_on_or_after": in_a_while(minutes=15),
-# "ava": {
-# "eduPersonEntitlement": "Foobar",
-# }
-# }
-# client.users.add_information_about_person(session_info_aa)
-# entity_ids = client.users.issuers_of_info("123456")
-# assert _leq(entity_ids, ["urn:mace:example.com:saml:roland:idp",
-# "urn:mace:example.com:saml:roland:aa"])
-# resp = client.global_logout("123456", "Tired", in_a_while(minutes=5))
-# print resp
-# assert resp
-# assert resp[0] # a session_id
-# assert resp[1] == '200 OK'
-# # HTTP POST
-# assert resp[2] == [('Content-type', 'text/html')]
-# assert resp[3][0] == '<head>'
-# assert resp[3][1] == '<title>SAML 2.0 POST</title>'
-#
-# state_info = client.state[resp[0]]
-# print state_info
-# assert state_info["entity_id"] == entity_ids[0]
-# assert state_info["subject_id"] == "123456"
-# assert state_info["reason"] == "Tired"
-# assert state_info["operation"] == "SLO"
-# assert state_info["entity_ids"] == entity_ids
-# assert state_info["sign"] == True
-#
-# def test_authz_decision_query(self):
-# conf = config.SPConfig()
-# conf.load_file("server3_conf")
-# client = Saml2Client(conf)
-#
-# AVA = {'mail': u'roland.hedberg@adm.umu.se',
-# 'eduPersonTargetedID': '95e9ae91dbe62d35198fbbd5e1fb0976',
-# 'displayName': u'Roland Hedberg',
-# 'uid': 'http://roland.hedberg.myopenid.com/'}
-#
-# sp_entity_id = "sp_entity_id"
-# in_response_to = "1234"
-# consumer_url = "http://example.com/consumer"
-# name_id = saml.NameID(saml.NAMEID_FORMAT_TRANSIENT, text="name_id")
-# policy = Policy()
-# ava = Assertion(AVA)
-# assertion = ava.construct(sp_entity_id, in_response_to,
-# consumer_url, name_id,
-# conf.attribute_converters,
-# policy, issuer=client._issuer())
-#
-# adq = client.create_authz_decision_query_using_assertion("entity_id",
-# assertion,
-# "read",
-# "http://example.com/text")
-#
-# assert adq
-# print adq
-# assert adq.keyswv() != []
-# assert adq.destination == "entity_id"
-# assert adq.resource == "http://example.com/text"
-# assert adq.action[0].text == "read"
-#
-# def test_request_to_discovery_service(self):
-# disc_url = "http://example.com/saml2/idp/disc"
-# url = discovery_service_request_url("urn:mace:example.com:saml:roland:sp",
-# disc_url)
-# print url
-# assert url == "http://example.com/saml2/idp/disc?entityID=urn%3Amace%3Aexample.com%3Asaml%3Aroland%3Asp"
-#
-# url = discovery_service_request_url(
-# self.client.config.entityid,
-# disc_url,
-# return_url= "http://example.org/saml2/sp/ds")
-#
-# print url
-# assert url == "http://example.com/saml2/idp/disc?entityID=urn%3Amace%3Aexample.com%3Asaml%3Aroland%3Asp&return=http%3A%2F%2Fexample.org%2Fsaml2%2Fsp%2Fds"
-#
-# def test_get_idp_from_discovery_service(self):
-# pdir = {"entityID": "http://example.org/saml2/idp/sso"}
-# params = urllib.urlencode(pdir)
-# redirect_url = "http://example.com/saml2/sp/disc?%s" % params
-#
-# entity_id = discovery_service_response(url=redirect_url)
-# assert entity_id == "http://example.org/saml2/idp/sso"
-#
-# pdir = {"idpID": "http://example.org/saml2/idp/sso"}
-# params = urllib.urlencode(pdir)
-# redirect_url = "http://example.com/saml2/sp/disc?%s" % params
-#
-# entity_id = discovery_service_response(url=redirect_url,
-# returnIDParam="idpID")
-#
-# assert entity_id == "http://example.org/saml2/idp/sso"
-# self.server.close_shelve_db()
-#
-# def test_unsolicited_response(self):
-# """
-#
-# """
-# self.server = Server("idp_conf")
-#
-# conf = config.SPConfig()
-# conf.load_file("server_conf")
-# self.client = Saml2Client(conf)
-#
-# for subject in self.client.users.subjects():
-# self.client.users.remove_person(subject)
-#
-# IDP = "urn:mace:example.com:saml:roland:idp"
-#
-# ava = { "givenName": ["Derek"], "surName": ["Jeter"],
-# "mail": ["derek@nyy.mlb.com"], "title": ["The man"]}
-#
-# resp_str = "%s" % self.server.create_authn_response(
-# identity=ava,
-# in_response_to="id1",
-# destination="http://lingon.catalogix.se:8087/",
-# sp_entity_id="urn:mace:example.com:saml:roland:sp",
-# name_id_policy=samlp.NameIDPolicy(
-# format=saml.NAMEID_FORMAT_PERSISTENT),
-# userid="foba0001@example.com")
-#
-# resp_str = base64.encodestring(resp_str)
-#
-# self.client.allow_unsolicited = True
-# authn_response = self.client.authn_request_response(
-# {"SAMLResponse":resp_str}, ())
-#
-# assert authn_response is not None
-# assert authn_response.issuer() == IDP
-# assert authn_response.response.assertion[0].issuer.text == IDP
-# session_info = authn_response.session_info()
-#
-# print session_info
-# assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'],
-# 'givenName': ['Derek'],
-# 'surName': ['Jeter']}
-# assert session_info["issuer"] == IDP
-# assert session_info["came_from"] == ""
-# response = samlp.response_from_string(authn_response.xmlstr)
-# assert response.destination == "http://lingon.catalogix.se:8087/"
-#
-# # One person in the cache
-# assert len(self.client.users.subjects()) == 1
-# self.server.close_shelve_db() \ No newline at end of file
diff --git a/tests/test_62_vo.py b/tests/test_62_vo.py
index 0839241f..b0ad0c22 100644
--- a/tests/test_62_vo.py
+++ b/tests/test_62_vo.py
@@ -1,22 +1,31 @@
+from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
+
__author__ = 'rolandh'
from saml2 import config
from saml2.client import Saml2Client
from saml2.time_util import str_to_time, in_a_while
-SESSION_INFO_PATTERN = {"ava":{}, "came from":"", "not_on_or_after":0,
- "issuer":"", "session_id":-1}
+SESSION_INFO_PATTERN = {"ava": {}, "came from": "", "not_on_or_after": 0,
+ "issuer": "", "session_id": -1}
+
+nid = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="abcdefgh")
+nid0 = NameID(name_qualifier="foo", format=NAMEID_FORMAT_TRANSIENT,
+ text="01234567")
+
def add_derek_info(sp):
not_on_or_after = str_to_time(in_a_while(days=1))
session_info = SESSION_INFO_PATTERN.copy()
- session_info["ava"] = {"givenName":["Derek"], "umuselin":["deje0001"]}
+ session_info["ava"] = {"givenName": ["Derek"], "umuselin": ["deje0001"]}
session_info["issuer"] = "urn:mace:example.com:saml:idp"
- session_info["name_id"] = "abcdefgh"
+ session_info["name_id"] = nid
session_info["not_on_or_after"] = not_on_or_after
# subject_id, entity_id, info, timestamp
sp.users.add_information_about_person(session_info)
+
class TestVirtualOrg():
def setup_class(self):
conf = config.SPConfig()
@@ -28,24 +37,25 @@ class TestVirtualOrg():
add_derek_info(self.sp)
def test_mta(self):
- aas = self.vo.members_to_ask("abcdefgh")
+ aas = self.vo.members_to_ask(nid)
print aas
assert len(aas) == 1
assert 'urn:mace:example.com:saml:aa' in aas
def test_unknown_subject(self):
- aas = self.vo.members_to_ask("01234567")
+ aas = self.vo.members_to_ask(nid0)
print aas
assert len(aas) == 2
def test_id(self):
- id = self.vo.get_common_identifier("abcdefgh")
- print id
- assert id == "deje0001"
+ cid = self.vo.get_common_identifier(nid)
+ print cid
+ assert cid == "deje0001"
def test_id_unknown(self):
- id = self.vo.get_common_identifier("01234567")
- assert id is None
+ cid = self.vo.get_common_identifier(nid0)
+ assert cid is None
+
class TestVirtualOrg_2():
def setup_class(self):
@@ -56,21 +66,21 @@ class TestVirtualOrg_2():
add_derek_info(self.sp)
def test_mta(self):
- aas = self.sp.vorg.members_to_ask("abcdefgh")
+ aas = self.sp.vorg.members_to_ask(nid)
print aas
assert len(aas) == 1
assert 'urn:mace:example.com:saml:aa' in aas
def test_unknown_subject(self):
- aas = self.sp.vorg.members_to_ask("01234567")
+ aas = self.sp.vorg.members_to_ask(nid0)
print aas
assert len(aas) == 2
def test_id(self):
- id = self.sp.vorg.get_common_identifier("abcdefgh")
- print id
- assert id == "deje0001"
+ cid = self.sp.vorg.get_common_identifier(nid)
+ print cid
+ assert cid == "deje0001"
def test_id_unknown(self):
- id = self.sp.vorg.get_common_identifier("01234567")
- assert id is None
+ cid = self.sp.vorg.get_common_identifier(nid0)
+ assert cid is None