summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2012-12-21 11:18:39 +0100
committerRoland Hedberg <roland.hedberg@adm.umu.se>2012-12-21 11:18:39 +0100
commit77b30122a4594f6e6768f24c36343fa1a557bbff (patch)
tree5e1cfa57d23fcffc991d1fa73fa7ec5777414421 /tools
parentdbe2ba207baa48321ae15b6055182cbf70d409f6 (diff)
downloadpysaml2-77b30122a4594f6e6768f24c36343fa1a557bbff.tar.gz
The examples are also working (at least for me) now.
Diffstat (limited to 'tools')
-rwxr-xr-xtools/make_metadata.py630
1 files changed, 64 insertions, 566 deletions
diff --git a/tools/make_metadata.py b/tools/make_metadata.py
index 9a3a64a9..962ae3a8 100755
--- a/tools/make_metadata.py
+++ b/tools/make_metadata.py
@@ -2,578 +2,76 @@
import argparse
import os
import sys
-from saml2.time_util import in_a_while
-from saml2.extension import mdui, idpdisc, shibmd
-from saml2.saml import NAME_FORMAT_URI
-from saml2.attribute_converter import from_local_name
-from saml2 import md, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT, BINDING_SOAP,\
- samlp, class_name
-import xmldsig as ds
+from saml2.metadata import entity_descriptor
+from saml2.metadata import entities_descriptor
+from saml2.metadata import sign_entity_descriptor
-from saml2.sigver import SecurityContext, pre_signature_part
+from saml2.sigver import SecurityContext
from saml2.sigver import get_xmlsec_binary
from saml2.validate import valid_instance
from saml2.config import Config
-from saml2.s_utils import factory
-from saml2.s_utils import sid
-
-NSPAIR = {
- "saml2p":"urn:oasis:names:tc:SAML:2.0:protocol",
- "saml2":"urn:oasis:names:tc:SAML:2.0:assertion",
- "soap11":"http://schemas.xmlsoap.org/soap/envelope/",
- "meta": "urn:oasis:names:tc:SAML:2.0:metadata",
- "xsi":"http://www.w3.org/2001/XMLSchema-instance",
- "ds":"http://www.w3.org/2000/09/xmldsig#",
- "shibmd":"urn:mace:shibboleth:metadata:1.0",
- "md":"urn:oasis:names:tc:SAML:2.0:metadata",
-}
-
-DEFAULTS = {
- "want_assertions_signed": "true",
- "authn_requests_signed": "false",
- "want_authn_requests_signed": "true",
- }
-
-ORG_ATTR_TRANSL = {
- "organization_name": ("name", md.OrganizationName),
- "organization_display_name": ("display_name", md.OrganizationDisplayName),
- "organization_url": ("url", md.OrganizationURL)
-}
-
-def _localized_name(val, klass):
- """If no language is defined 'en' is the default"""
- try:
- (text, lang) = val
- return klass(text=text, lang=lang)
- except ValueError:
- return klass(text=val, lang="en")
-
-def do_organization_info(ava):
- """ decription of an organization in the configuration is
- a dictionary of keys and values, where the values might be tuples:
-
- "organization": {
- "name": ("AB Exempel", "se"),
- "display_name": ("AB Exempel", "se"),
- "url": "http://www.example.org"
- }
-
- """
-
- if ava is None:
- return None
-
- org = md.Organization()
- for dkey, (ckey, klass) in ORG_ATTR_TRANSL.items():
- if ckey not in ava:
- continue
- if isinstance(ava[ckey], basestring):
- setattr(org, dkey, [_localized_name(ava[ckey], klass)])
- elif isinstance(ava[ckey], list):
- setattr(org, dkey,
- [_localized_name(n, klass) for n in ava[ckey]])
- else:
- setattr(org, dkey, [_localized_name(ava[ckey], klass)])
- return org
-
-def do_contact_person_info(lava):
- """ Creates a ContactPerson instance from configuration information"""
-
- cps = []
- if lava is None:
- return cps
-
- contact_person = md.ContactPerson
- for ava in lava:
- cper = md.ContactPerson()
- for (key, classpec) in contact_person.c_children.values():
- try:
- value = ava[key]
- data = []
- if isinstance(classpec, list):
- # What if value is not a list ?
- if isinstance(value, basestring):
- data = [classpec[0](text=value)]
- else:
- for val in value:
- data.append(classpec[0](text=val))
- else:
- data = classpec(text=value)
- setattr(cper, key, data)
- except KeyError:
- pass
- for (prop, classpec, _) in contact_person.c_attributes.values():
- try:
- # should do a check for valid value
- setattr(cper, prop, ava[prop])
- except KeyError:
- pass
-
- # ContactType must have a value
- typ = getattr(cper, "contact_type")
- if not typ:
- setattr(cper, "contact_type", "technical")
-
- cps.append(cper)
-
- return cps
-
-
-def do_key_descriptor(cert, use="signing"):
- return md.KeyDescriptor(
- key_info = ds.KeyInfo(
- x509_data=ds.X509Data(
- x509_certificate=ds.X509Certificate(text=cert)
- )
- ),
- use=use
- )
-
-def do_requested_attribute(attributes, acs, is_required="false"):
- lista = []
- for attr in attributes:
- attr = from_local_name(acs, attr, NAME_FORMAT_URI)
- args = {}
- for key in attr.keyswv():
- args[key] = getattr(attr, key)
- args["is_required"] = is_required
- args["name_format"] = NAME_FORMAT_URI
- lista.append(md.RequestedAttribute(**args))
- return lista
-
-def do_uiinfo(_uiinfo):
- uii = mdui.UIInfo()
- for attr in ['display_name', 'description', "information_url",
- 'privacy_statement_url']:
- try:
- val = _uiinfo[attr]
- except KeyError:
- continue
-
- aclass = uii.child_class(attr)
- inst = getattr(uii, attr)
- if isinstance(val, basestring):
- ainst = aclass(text=val)
- inst.append(ainst)
- elif isinstance(val, dict):
- ainst = aclass()
- ainst.text = val["text"]
- ainst.lang = val["lang"]
- inst.append(ainst)
- else :
- for value in val:
- if isinstance(value, basestring):
- ainst = aclass(text=value)
- inst.append(ainst)
- elif isinstance(value, dict):
- ainst = aclass()
- ainst.text = value["text"]
- ainst.lang = value["lang"]
- inst.append(ainst)
-
- try:
- _attr = "logo"
- val = _uiinfo[_attr]
- inst = getattr(uii, _attr)
- # dictionary or list of dictionaries
- if isinstance(val, dict):
- logo = mdui.Logo()
- for attr, value in val.items():
- if attr in logo.keys():
- setattr(logo, attr, value)
- inst.append(logo)
- elif isinstance(val, list):
- for logga in val:
- if not isinstance(logga, dict):
- raise Exception("Configuration error !!")
- logo = mdui.Logo()
- for attr, value in logga.items():
- if attr in logo.keys():
- setattr(logo, attr, value)
- inst.append(logo)
- except KeyError:
- pass
-
- try:
- _attr = "keywords"
- val = _uiinfo[_attr]
- inst = getattr(uii, _attr)
- # list of basestrings, dictionary or list of dictionaries
- if isinstance(val, list):
- for value in val:
- keyw = mdui.Keywords()
- if isinstance(value, basestring):
- keyw.text = " ".join(value)
- elif isinstance(value, dict):
- keyw.text = " ".join(value["text"])
- try:
- keyw.lang = value["lang"]
- except KeyError:
- pass
- else:
- raise Exception("Configuration error: ui_info logo")
- inst.append(keyw)
- elif isinstance(val, dict):
- keyw = mdui.Keywords()
- keyw.text = " ".join(val["text"])
- try:
- keyw.lang = val["lang"]
- except KeyError:
- pass
- inst.append(keyw)
- else:
- raise Exception("Configuration Error: ui_info logo")
- except KeyError:
- pass
-
- return uii
-
-def do_idpdisc(discovery_response):
- return idpdisc.DiscoveryResponse(index="0", location=discovery_response,
- binding=idpdisc.NAMESPACE)
-
-ENDPOINTS = {
- "sp": {
- "artifact_resolution_service": (md.ArtifactResolutionService, True),
- "single_logout_service": (md.SingleLogoutService, False),
- "manage_name_id_service": (md.ManageNameIDService, False),
- "assertion_consumer_service": (md.AssertionConsumerService, True),
- },
- "idp":{
- "artifact_resolution_service": (md.ArtifactResolutionService, True),
- "single_logout_service": (md.SingleLogoutService, False),
- "manage_name_id_service": (md.ManageNameIDService, False),
- "single_sign_on_service": (md.SingleSignOnService, False),
- "name_id_mapping_service": (md.NameIDMappingService, False),
- "assertion_id_request_service": (md.AssertionIDRequestService, False),
- },
- "aa":{
- "artifact_resolution_service": (md.ArtifactResolutionService, True),
- "single_logout_service": (md.SingleLogoutService, False),
- "manage_name_id_service": (md.ManageNameIDService, False),
-
- "assertion_id_request_service": (md.AssertionIDRequestService, False),
-
- "attribute_service": (md.AttributeService, False)
- },
- "pdp": {
- "authz_service": (md.AuthzService, True)
- }
-}
-
-DEFAULT_BINDING = {
- "assertion_consumer_service": BINDING_HTTP_POST,
- "single_sign_on_service": BINDING_HTTP_REDIRECT,
- "single_logout_service": BINDING_HTTP_POST,
- "attribute_service": BINDING_SOAP,
- "artifact_resolution_service": BINDING_SOAP
-}
-
-def do_endpoints(conf, endpoints):
- service = {}
-
- for endpoint, (eclass, indexed) in endpoints.items():
- try:
- servs = []
- i = 1
- for args in conf[endpoint]:
- if isinstance(args, basestring): # Assume it's the location
- args = {"location":args,
- "binding": DEFAULT_BINDING[endpoint]}
- elif isinstance(args, tuple): # (location, binding)
- args = {"location":args[0], "binding": args[1]}
- if indexed and "index" not in args:
- args["index"] = "%d" % i
- servs.append(factory(eclass, **args))
- i += 1
- service[endpoint] = servs
- except KeyError:
- pass
- return service
-
-DEFAULT = {
- "want_assertions_signed": "true",
- "authn_requests_signed": "false",
- "want_authn_requests_signed": "false",
- }
-
-def do_spsso_descriptor(conf, cert=None):
- spsso = md.SPSSODescriptor()
- spsso.protocol_support_enumeration = samlp.NAMESPACE
-
- endps = conf.getattr("endpoints", "sp")
- if endps:
- for (endpoint, instlist) in do_endpoints(endps,
- ENDPOINTS["sp"]).items():
- setattr(spsso, endpoint, instlist)
-
- if cert:
- spsso.key_descriptor = do_key_descriptor(cert)
-
- for key in ["want_assertions_signed", "authn_requests_signed"]:
- try:
- val = conf.getattr(key, "sp")
- if val is None:
- setattr(spsso, key, DEFAULT[key]) #default ?!
- else:
- strval = "{0:>s}".format(val)
- setattr(spsso, key, strval.lower())
- except KeyError:
- setattr(spsso, key, DEFAULTS[key])
-
- requested_attributes = []
- acs = conf.attribute_converters
- req = conf.getattr("required_attributes", "sp")
- if req:
- requested_attributes.extend(do_requested_attribute(req, acs,
- is_required="true"))
-
- opt=conf.getattr("optional_attributes", "sp")
- if opt:
- requested_attributes.extend(do_requested_attribute(opt, acs))
-
- if requested_attributes:
- spsso.attribute_consuming_service = [md.AttributeConsumingService(
- requested_attribute=requested_attributes,
- service_name= [md.ServiceName(lang="en",text=conf.name)],
- index="1",
- )]
- try:
- if conf.description:
- try:
- (text, lang) = conf.description
- except ValueError:
- text = conf.description
- lang = "en"
- spsso.attribute_consuming_service[0].service_description = [
- md.ServiceDescription(text=text,
- lang=lang)]
- except KeyError:
- pass
-
- dresp = conf.getattr("discovery_response", "sp")
- if dresp:
- if spsso.extensions is None:
- spsso.extensions = md.Extensions()
- spsso.extensions.add_extension_element(do_idpdisc(dresp))
-
- return spsso
-
-def do_idpsso_descriptor(conf, cert=None):
- idpsso = md.IDPSSODescriptor()
- idpsso.protocol_support_enumeration = samlp.NAMESPACE
-
- endps = conf.getattr("endpoints", "idp")
- if endps:
- for (endpoint, instlist) in do_endpoints(endps,
- ENDPOINTS["idp"]).items():
- setattr(idpsso, endpoint, instlist)
-
- scopes = conf.getattr("scope", "idp")
- if scopes:
- if idpsso.extensions is None:
- idpsso.extensions = md.Extensions()
- for scope in scopes:
- mdscope = shibmd.Scope()
- mdscope.text = scope
- # unless scope contains '*'/'+'/'?' assume non regexp ?
- mdscope.regexp = "false"
- idpsso.extensions.add_extension_element(mdscope)
-
- ui_info = conf.getattr("ui_info", "idp")
- if ui_info:
- if idpsso.extensions is None:
- idpsso.extensions = md.Extensions()
- idpsso.extensions.add_extension_element(do_uiinfo(ui_info))
-
- if cert:
- idpsso.key_descriptor = do_key_descriptor(cert)
-
- for key in ["want_authn_requests_signed"]:
- try:
- val = conf.getattr(key, "idp")
- if val is None:
- setattr(idpsso, key, DEFAULT["want_authn_requests_signed"])
- else:
- setattr(idpsso, key, "%s" % val)
- except KeyError:
- setattr(idpsso, key, DEFAULTS[key])
-
- return idpsso
-
-def do_aa_descriptor(conf, cert):
- aad = md.AttributeAuthorityDescriptor()
- aad.protocol_support_enumeration = samlp.NAMESPACE
-
- endps = conf.getattr("endpoints", "aa")
-
- if endps:
- for (endpoint, instlist) in do_endpoints(endps,
- ENDPOINTS["aa"]).items():
- setattr(aad, endpoint, instlist)
-
- if cert:
- aad.key_descriptor = do_key_descriptor(cert)
-
- return aad
-
-def do_pdp_descriptor(conf, cert):
- """ Create a Policy Decision Point descriptor """
- pdp = md.PDPDescriptor()
-
- pdp.protocol_support_enumeration = samlp.NAMESPACE
-
- endps = conf.getattr("endpoints", "pdp")
-
- if endps:
- for (endpoint, instlist) in do_endpoints(endps,
- ENDPOINTS["pdp"]).items():
- setattr(pdp, endpoint, instlist)
-
- namef = conf.getattr("name_form", "pdp")
- if namef:
- if isinstance(namef, basestring):
- ids = [md.NameIDFormat(namef)]
+# =============================================================================
+# Script that creates a SAML2 metadata file from a pysaml2 entity configuration
+# file
+# =============================================================================
+
+parser = argparse.ArgumentParser()
+parser.add_argument('-v', dest='valid', action='store_true',
+ help="How long, in days, the metadata is valid from the time of creation")
+parser.add_argument('-c', dest='cert', help='certificate')
+parser.add_argument('-e', dest='ed', action='store_true',
+ help="Wrap the whole thing in an EntitiesDescriptor")
+parser.add_argument('-i', dest='id',
+ help="The ID of the entities descriptor")
+parser.add_argument('-k', dest='keyfile',
+ help="A file with a key to sign the metadata with")
+parser.add_argument('-n', dest='name', default="")
+parser.add_argument('-p', dest='path',
+ help="path to the configuration file")
+parser.add_argument('-s', dest='sign', action='store_true',
+ help="sign the metadata")
+parser.add_argument('-x', dest='xmlsec',
+ help="xmlsec binaries to be used for the signing")
+parser.add_argument('-w', dest='wellknown',
+ help="Use wellknown namespace prefixes")
+parser.add_argument(dest="config", nargs="+")
+args = parser.parse_args()
+
+valid_for = 0
+nspair = None
+paths = [".", "/opt/local/bin"]
+
+if args.valid:
+ # translate into hours
+ valid_for = int(args.valid) * 24
+if args.xmlsec:
+ xmlsec = args.xmlsec
+else:
+ xmlsec = get_xmlsec_binary(paths)
+
+eds = []
+for filespec in args.config:
+ bas, fil = os.path.split(filespec)
+ if bas != "":
+ sys.path.insert(0, bas)
+ if fil.endswith(".py"):
+ fil = fil[:-3]
+ cnf = Config().load_file(fil, metadata_construction=True)
+ eds.append(entity_descriptor(cnf))
+
+secc = SecurityContext(xmlsec, args.keyfile, cert_file=args.cert)
+if args.id:
+ desc = entities_descriptor(eds, valid_for, args.name, args.id,
+ args.sign, secc)
+ valid_instance(desc)
+ print desc.to_string(nspair)
+else:
+ for eid in eds:
+ if args.sign:
+ desc = sign_entity_descriptor(eid, id, secc)
else:
- ids = [md.NameIDFormat(text=form) for form in namef]
- setattr(pdp, "name_id_format", ids)
-
- if cert:
- pdp.key_descriptor = do_key_descriptor(cert)
-
- return pdp
-
-def entity_descriptor(confd):
- mycert = "".join(open(confd.cert_file).readlines()[1:-1])
-
- entd = md.EntityDescriptor()
- entd.entity_id = confd.entityid
-
- if confd.valid_for:
- entd.valid_until = in_a_while(hours=int(confd.valid_for))
-
- if confd.organization is not None:
- entd.organization = do_organization_info(confd.organization)
- if confd.contact_person is not None:
- entd.contact_person = do_contact_person_info(confd.contact_person)
-
- serves = confd.serves
- if not serves:
- raise Exception(
- 'No service type ("sp","idp","aa") provided in the configuration')
-
- if "sp" in serves:
- confd.context = "sp"
- entd.spsso_descriptor = do_spsso_descriptor(confd, mycert)
- if "idp" in serves:
- confd.context = "idp"
- entd.idpsso_descriptor = do_idpsso_descriptor(confd, mycert)
- if "aa" in serves:
- confd.context = "aa"
- entd.attribute_authority_descriptor = do_aa_descriptor(confd, mycert)
- if "pdp" in serves:
- confd.context = "pdp"
- entd.pdp_descriptor = do_pdp_descriptor(confd, mycert)
-
- return entd
-
-def entities_descriptor(eds, valid_for, name, ident, sign, secc):
- entities = md.EntitiesDescriptor(entity_descriptor= eds)
- if valid_for:
- entities.valid_until = in_a_while(hours=valid_for)
- if name:
- entities.name = name
- if ident:
- entities.id = ident
-
- if sign:
- if not ident:
- ident = sid()
-
- if not secc.key_file:
- raise Exception("If you want to do signing you should define " +
- "a key to sign with")
-
- if not secc.my_cert:
- raise Exception("If you want to do signing you should define " +
- "where your public key are")
-
- entities.signature = pre_signature_part(ident, secc.my_cert, 1)
- entities.id = ident
- xmldoc = secc.sign_statement_using_xmlsec("%s" % entities,
- class_name(entities))
- entities = md.entities_descriptor_from_string(xmldoc)
- return entities
-
-def sign_entity_descriptor(edesc, ident, secc):
- if not ident:
- ident = sid()
-
- edesc.signature = pre_signature_part(ident, secc.my_cert, 1)
- edesc.id = ident
- xmldoc = secc.sign_statement_using_xmlsec("%s" % edesc, class_name(edesc))
- return md.entity_descriptor_from_string(xmldoc)
-
-if __name__ == "__main__":
- import sys
-
- parser = argparse.ArgumentParser()
- parser.add_argument('-v', dest='valid', action='store_true',
- help="How long, in days, the metadata is valid from the time of creation")
- parser.add_argument('-c', dest='cert', help='certificate')
- parser.add_argument('-e', dest='ed', action='store_true',
- help="Wrap the whole thing in an EntitiesDescriptor")
- parser.add_argument('-i', dest='id',
- help="The ID of the entities descriptor")
- parser.add_argument('-k', dest='keyfile',
- help="A file with a key to sign the metadata with")
- parser.add_argument('-n', dest='name', default="")
- parser.add_argument('-p', dest='path',
- help="path to the configuration file")
- parser.add_argument('-s', dest='sign', action='store_true',
- help="sign the metadata")
- parser.add_argument('-x', dest='xmlsec',
- help="xmlsec binaries to be used for the signing")
- parser.add_argument('-w', dest='wellknown',
- help="Use wellknown namespace prefixes")
- parser.add_argument(dest="config", nargs="+")
- args = parser.parse_args()
-
- valid_for = 0
- nspair = None
- paths = [".", "/opt/local/bin"]
-
- if args.valid:
- # translate into hours
- valid_for = int(args.valid) * 24
- if args.xmlsec:
- xmlsec = args.xmlsec
- else:
- xmlsec = get_xmlsec_binary(paths)
-
- eds = []
- for filespec in args.config:
- bas, fil = os.path.split(filespec)
- if bas != "":
- sys.path.insert(0, bas)
- if fil.endswith(".py"):
- fil = fil[:-3]
- cnf = Config().load_file(fil, metadata_construction=True)
- eds.append(entity_descriptor(cnf))
-
- secc = SecurityContext(xmlsec, args.keyfile, cert_file=args.cert)
- if args.id:
- desc = entities_descriptor(eds, valid_for, args.name, args.id,
- args.sign, secc)
+ desc = eid
valid_instance(desc)
print desc.to_string(nspair)
- else:
- for eid in eds:
- if args.sign:
- desc = sign_entity_descriptor(eid, id, secc)
- else:
- desc = eid
- valid_instance(desc)
- print desc.to_string(nspair)