#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import re
from collections import OrderedDict
from six.moves.urllib import parse
from saml2.config import Config
from saml2.mdstore import MetadataStore, MetaDataExtern
from saml2.mdstore import MetaDataMDX
from saml2.mdstore import SAML_METADATA_CONTENT_TYPE
from saml2.mdstore import destinations
from saml2.mdstore import name
from saml2 import sigver
from saml2.httpbase import HTTPBase
from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import config
from saml2.attribute_converter import ac_factory
from saml2.attribute_converter import d_to_local_name
from saml2.s_utils import UnknownPrincipal
from pathutils import full_path
import responses
sec_config = config.Config()
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
TEST_CERT = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDJg2cms7MqjniT8Fi/XkNHZNPbNVQyMUMXE9tXOdqwYCA1cc8vQdzkihscQMXy
3iPw2cMggBu6gjMTOSOxECkuvX5ZCclKr8pXAJM5cY6gVOaVO2PdTZcvDBKGbiaN
efiEw5hnoZomqZGp8wHNLAUkwtH9vjqqvxyS/vclc6k2ewIDAQABo4GnMIGkMB0G
A1UdDgQWBBRePsKHKYJsiojE78ZWXccK9K4aJTB1BgNVHSMEbjBsgBRePsKHKYJs
iojE78ZWXccK9K4aJaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAJrzqSSw
mDY9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAJSrKOEzHO7TL5cy6
h3qh+3+JAk8HbGBW+cbX6KBCAw/mzU8flK25vnWwXS3dv2FF3Aod0/S7AWNfKib5
U/SA9nJaz/mWeF9S0farz9AQFc8/NSzAzaVq7YbM4F6f6N2FRl7GikdXRCed45j6
mrPzGzk3ECbupFnqyREH3+ZPSdk="""
TEST_METADATA_STRING = """
{cert_data}
urn:oasis:names:tc:SAML:2.0:nameid-format:transient
Catalogix
Catalogix
http://www.catalogix.se
Hedberg
datordrift@catalogix.se
""".format(cert_data=TEST_CERT)
ATTRCONV = ac_factory(full_path("attributemaps"))
METADATACONF = {
"1": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("swamid-1.0.xml"),)],
}],
"2": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("InCommon-metadata.xml"),)],
}],
"3": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("extended.xml"),)],
}],
# "7": [{
# "class": "saml2.mdstore.MetaDataFile",
# "metadata": [(full_path("metadata_sp_1.xml"), ),
# (full_path("InCommon-metadata.xml"), )], },
# {
# "class": "saml2.mdstore.MetaDataExtern",
# "metadata": [
# ("https://kalmar2.org/simplesaml/module.php/aggregator/?id
# =kalmarcentral2&set=saml2",
# full_path("kalmar2.pem")), ],
# }],
"4": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("metadata_example.xml"),)],
}],
"5": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("metadata.aaitest.xml"),)],
}],
"8": [{
"class": "saml2.mdstore.MetaDataMD",
"metadata": [(full_path("swamid.md"),)],
}],
"9": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("metadata"),)]
}],
"10": [{
"class": "saml2.mdstore.MetaDataExtern",
"metadata": [
("http://md.incommon.org/InCommon/InCommon-metadata-export.xml",
full_path("inc-md-cert.pem"))]
}],
"11": [{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [(TEST_METADATA_STRING,)]
}],
"12": [{
"class": "saml2.mdstore.MetaDataFile",
"metadata": [(full_path("uu.xml"),)],
}],
}
def _eq(l1, l2):
return set(l1) == set(l2)
def _fix_valid_until(xmlstring):
new_date = datetime.datetime.now() + datetime.timedelta(days=1)
new_date = new_date.strftime("%Y-%m-%dT%H:%M:%SZ")
return re.sub(r' validUntil=".*?"', ' validUntil="%s"' % new_date,
xmlstring)
def test_swami_1():
UMU_IDP = 'https://idp.umu.se/saml2/idp/metadata.php'
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["1"])
assert len(mds) == 1 # One source
idps = mds.with_descriptor("idpsso")
assert idps.keys()
idpsso = mds.single_sign_on_service(UMU_IDP)
assert len(idpsso) == 1
assert destinations(idpsso) == [
'https://idp.umu.se/saml2/idp/SSOService.php']
_name = name(mds[UMU_IDP])
assert _name == u'UmeƄ University (SAML2)'
certs = mds.certs(UMU_IDP, "idpsso", "signing")
assert len(certs) == 1
sps = mds.with_descriptor("spsso")
assert len(sps) == 108
wants = mds.attribute_requirement('https://connect8.sunet.se/shibboleth')
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
'eduPersonScopedAffiliation'])
wants = mds.attribute_requirement('https://beta.lobber.se/shibboleth')
assert wants["required"] == []
lnamn = [d_to_local_name(mds.attrc, attr) for attr in wants["optional"]]
assert _eq(lnamn, ['eduPersonPrincipalName', 'mail', 'givenName', 'sn',
'eduPersonScopedAffiliation', 'eduPersonEntitlement'])
def test_incommon_1():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["2"])
print(mds.entities())
assert mds.entities() > 1700
idps = mds.with_descriptor("idpsso")
print(idps.keys())
assert len(idps) > 300 # ~ 18%
try:
_ = mds.single_sign_on_service('urn:mace:incommon:uiuc.edu')
except UnknownPrincipal:
pass
idpsso = mds.single_sign_on_service('urn:mace:incommon:alaska.edu')
assert len(idpsso) == 1
print(idpsso)
assert destinations(idpsso) == [
'https://idp.alaska.edu/idp/profile/SAML2/Redirect/SSO']
sps = mds.with_descriptor("spsso")
acs_sp = []
for nam, desc in sps.items():
if "attribute_consuming_service" in desc:
acs_sp.append(nam)
assert len(acs_sp) == 0
# Look for attribute authorities
aas = mds.with_descriptor("attribute_authority")
print(aas.keys())
assert len(aas) == 180
def test_ext_2():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["3"])
# No specific binding defined
ents = mds.with_descriptor("spsso")
for binding in [BINDING_SOAP, BINDING_HTTP_POST, BINDING_HTTP_ARTIFACT,
BINDING_HTTP_REDIRECT]:
assert mds.single_logout_service(list(ents.keys())[0], binding, "spsso")
def test_example():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["4"])
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
"idpsso", "signing")
assert len(certs) == 1
def test_switch_1():
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["5"])
assert len(mds.keys()) > 160
idps = mds.with_descriptor("idpsso")
print(idps.keys())
idpsso = mds.single_sign_on_service(
'https://aai-demo-idp.switch.ch/idp/shibboleth')
assert len(idpsso) == 1
print(idpsso)
assert destinations(idpsso) == [
'https://aai-demo-idp.switch.ch/idp/profile/SAML2/Redirect/SSO']
assert len(idps) > 30
aas = mds.with_descriptor("attribute_authority")
print(aas.keys())
aad = aas['https://aai-demo-idp.switch.ch/idp/shibboleth']
print(aad.keys())
assert len(aad["attribute_authority_descriptor"]) == 1
assert len(aad["idpsso_descriptor"]) == 1
sps = mds.with_descriptor("spsso")
dual = [eid for eid, ent in idps.items() if eid in sps]
print(len(dual))
assert len(dual) == 0
def test_metadata_file():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["8"])
print(len(mds.keys()))
assert len(mds.keys()) == 560
@responses.activate
def test_mdx_service():
entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
url = "http://mdx.example.com/entities/{}".format(
parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)))
responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200,
content_type=SAML_METADATA_CONTENT_TYPE)
mdx = MetaDataMDX("http://mdx.example.com")
sso_loc = mdx.service(entity_id, "idpsso_descriptor", "single_sign_on_service")
assert sso_loc[BINDING_HTTP_REDIRECT][0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
certs = mdx.certs(entity_id, "idpsso")
assert len(certs) == 1
@responses.activate
def test_mdx_single_sign_on_service():
entity_id = "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
url = "http://mdx.example.com/entities/{}".format(
parse.quote_plus(MetaDataMDX.sha1_entity_transform(entity_id)))
responses.add(responses.GET, url, body=TEST_METADATA_STRING, status=200,
content_type=SAML_METADATA_CONTENT_TYPE)
mdx = MetaDataMDX("http://mdx.example.com")
sso_loc = mdx.single_sign_on_service(entity_id, BINDING_HTTP_REDIRECT)
assert sso_loc[0]["location"] == "http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php"
# pyff-test not available
# def test_mdx_service():
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
# http = HTTPBase(verify=False, ca_bundle=None)
#
# mdx = MetaDataMDX(quote_plus, ATTRCONV,
# "http://pyff-test.nordu.net",
# sec_config, None, http)
# foo = mdx.service("https://idp.umu.se/saml2/idp/metadata.php",
# "idpsso_descriptor", "single_sign_on_service")
#
# assert len(foo) == 1
# assert foo.keys()[0] == BINDING_HTTP_REDIRECT
#
#
# def test_mdx_certs():
# sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
# http = HTTPBase(verify=False, ca_bundle=None)
#
# mdx = MetaDataMDX(quote_plus, ATTRCONV,
# "http://pyff-test.nordu.net",
# sec_config, None, http)
# foo = mdx.certs("https://idp.umu.se/saml2/idp/metadata.php", "idpsso")
#
# assert len(foo) == 1
def test_load_local_dir():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["9"])
print(mds)
assert len(mds) == 3 # Three sources
assert len(mds.keys()) == 4 # number of idps
def test_load_extern_incommon():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["10"])
print(mds)
assert mds
assert len(mds.keys())
def test_load_local():
# string representation of XML idp definition
with open(full_path("metadata.xml")) as fp:
idp_metadata = fp.read()
saml_config = Config()
config_dict = {
"metadata": {"inline": [idp_metadata]}
}
cfg = saml_config.load(config_dict)
assert cfg
def test_load_remote_encoding():
crypto = sigver._get_xmlsec_cryptobackend()
sc = sigver.SecurityContext(crypto, key_type="", cert_type="")
httpc = HTTPBase()
mds = MetaDataExtern(ATTRCONV, 'http://metadata.aai.switch.ch/metadata.aaitest.xml', sc, full_path('SWITCHaaiRootCA.crt.pem'), httpc)
mds.load()
def test_load_string():
sec_config.xmlsec_binary = sigver.get_xmlsec_binary(["/opt/local/bin"])
mds = MetadataStore(ATTRCONV, sec_config,
disable_ssl_certificate_validation=True)
mds.imp(METADATACONF["11"])
# print(mds)
assert len(mds.keys()) == 1
idps = mds.with_descriptor("idpsso")
assert list(idps.keys()) == [
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php']
certs = mds.certs(
'http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php',
"idpsso", "signing")
assert len(certs) == 1
def test_get_certs_from_metadata():
mds = MetadataStore(ATTRCONV, None)
mds.imp(METADATACONF["11"])
certs1 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "any")
certs2 = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert certs1[0] == certs2[0] == TEST_CERT
def test_get_certs_from_metadata_without_keydescriptor():
mds = MetadataStore(ATTRCONV, None)
mds.imp([{
"class": "saml2.mdstore.InMemoryMetaData",
"metadata": [("""
urn:oasis:names:tc:SAML:2.0:nameid-format:transient
Catalogix
Catalogix
http://www.catalogix.se
Hedberg
datordrift@catalogix.se
""",)]
}])
certs = mds.certs("http://xenosmilus.umdc.umu.se/simplesaml/saml2/idp/metadata.php", "idpsso")
assert len(certs) == 0
def test_metadata_extension_algsupport():
mds = MetadataStore(ATTRCONV, None)
mds.imp(METADATACONF["12"])
mdf = mds.metadata[full_path("uu.xml")]
assert mds
def test_extension():
mds = MetadataStore(ATTRCONV, None)
# use ordered dict to force expected entity to be last
metadata = OrderedDict()
metadata["1"] = {"entity1": {}}
metadata["2"] = {"entity2": {"idpsso_descriptor": [{"extensions": {"extension_elements": [{"__class__": "test"}]}}]}}
mds.metadata = metadata
assert mds.extension("entity2", "idpsso_descriptor", "test")
if __name__ == "__main__":
test_metadata_extension_algsupport()