diff options
author | Roland Hedberg <roland.hedberg@adm.umu.se> | 2016-04-16 08:52:53 +0200 |
---|---|---|
committer | Roland Hedberg <roland.hedberg@adm.umu.se> | 2016-04-16 08:52:53 +0200 |
commit | 9dd3ee910aed4fb6b322b9ab022b044c7e753ab4 (patch) | |
tree | f3c2e01e1ed0beebc440d4da02be3fefaf9d0934 /example | |
parent | 61afe88cd9d3b5f6bfdd499af6746867ee78c8d1 (diff) | |
download | pysaml2-9dd3ee910aed4fb6b322b9ab022b044c7e753ab4.tar.gz |
Added functionality needed by the saml2test tool.
Diffstat (limited to 'example')
-rwxr-xr-x | example/sp-wsgi/sp.py | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py index 77345bca..d1c182e6 100755 --- a/example/sp-wsgi/sp.py +++ b/example/sp-wsgi/sp.py @@ -9,6 +9,7 @@ import os import re import sys import xml.dom.minidom +from saml2.sigver import SignatureError import six from six.moves.http_cookies import SimpleCookie @@ -48,7 +49,7 @@ from saml2.samlp import Extensions logger = logging.getLogger("") hdlr = logging.FileHandler('spx.log') base_formatter = logging.Formatter( - "%(asctime)s %(name)s:%(levelname)s %(message)s") + "%(asctime)s %(name)s:%(levelname)s %(message)s") hdlr.setFormatter(base_formatter) logger.addHandler(hdlr) @@ -329,7 +330,8 @@ class User(object): @property def authn_statement(self): - xml_doc = xml.dom.minidom.parseString(str(self.response.assertion.authn_statement[0])) + xml_doc = xml.dom.minidom.parseString( + str(self.response.assertion.authn_statement[0])) return xml_doc.toprettyxml() @@ -355,7 +357,8 @@ class ACS(Service): try: self.response = self.sp.parse_authn_request_response( - response, binding, self.outstanding_queries, self.cache.outstanding_certs) + response, binding, self.outstanding_queries, + self.cache.outstanding_certs) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) resp = ServiceError("UnknownPrincipal: %s" % (excp,)) @@ -367,6 +370,9 @@ class ACS(Service): except VerificationError as err: resp = ServiceError("Verification error: %s" % (err,)) return resp(self.environ, self.start_response) + except SignatureError as err: + resp = ServiceError("Signature error: %s" % (err,)) + return resp(self.environ, self.start_response) except Exception as err: resp = ServiceError("Other error: %s" % (err,)) return resp(self.environ, self.start_response) @@ -384,7 +390,7 @@ class ACS(Service): def verify_attributes(self, ava): logger.info("SP: %s", self.sp.config.entityid) rest = POLICY.get_entity_categories( - self.sp.config.entityid, self.sp.metadata) + self.sp.config.entityid, self.sp.metadata) akeys = [k.lower() for k in ava.keys()] @@ -469,7 +475,7 @@ class SSO(object): _rstate = rndstr() self.cache.relay_state[_rstate] = geturl(self.environ) _entityid = _cli.config.ecp_endpoint( - self.environ["REMOTE_ADDR"]) + self.environ["REMOTE_ADDR"]) if not _entityid: return -1, ServiceError("No IdP to talk to") @@ -521,7 +527,7 @@ class SSO(object): elif self.discosrv: if query: idp_entity_id = _cli.parse_discovery_service_response( - query=self.environ.get("QUERY_STRING")) + query=self.environ.get("QUERY_STRING")) if not idp_entity_id: sid_ = sid() self.cache.outstanding_queries[sid_] = came_from @@ -531,7 +537,7 @@ class SSO(object): "sp")["discovery_response"][0][0] ret += "?sid=%s" % sid_ loc = _cli.create_discovery_service_request( - self.discosrv, eid, **{"return": ret}) + self.discosrv, eid, **{"return": ret}) return -1, SeeOther(loc) elif len(idps) == 1: # idps is a dictionary @@ -548,8 +554,8 @@ class SSO(object): try: # Picks a binding to use for sending the Request to the IDP _binding, destination = _cli.pick_binding( - "single_sign_on_service", self.bindings, "idpsso", - entity_id=entity_id) + "single_sign_on_service", self.bindings, "idpsso", + entity_id=entity_id) logger.debug("binding: %s, destination: %s", _binding, destination) # Binding here is the response binding that is which binding the @@ -568,7 +574,7 @@ class SSO(object): "key": req_key_str } spcertenc = SPCertEnc(x509_data=ds.X509Data( - x509_certificate=ds.X509Certificate(text=cert_str))) + x509_certificate=ds.X509Certificate(text=cert_str))) extensions = Extensions(extension_elements=[ element_to_extension_element(spcertenc)]) @@ -589,7 +595,7 @@ class SSO(object): except Exception as exc: logger.exception(exc) resp = ServiceError( - "Failed to construct the AuthnRequest: %s" % exc) + "Failed to construct the AuthnRequest: %s" % exc) return resp # remember the request @@ -782,7 +788,8 @@ def metadata(environ, start_response): if path[-1] != "/": path += "/" metadata = create_metadata_string(path + "sp_conf.py", None, - _args.valid, _args.cert, _args.keyfile, + _args.valid, _args.cert, + _args.keyfile, _args.id, _args.name, _args.sign) start_response('200 OK', [('Content-Type', "text/xml")]) return metadata @@ -851,10 +858,12 @@ if __name__ == '__main__': _parser.add_argument("config", help="SAML client config") _parser.add_argument('-p', dest='path', help='Path to configuration file.') _parser.add_argument('-v', dest='valid', default="4", - help="How long, in days, the metadata is valid from the time of creation") + help="How long, in days, the metadata is valid from " + "the time of creation") _parser.add_argument('-c', dest='cert', help='certificate') _parser.add_argument('-i', dest='id', - help="The ID of the entities descriptor in the metadata") + help="The ID of the entities descriptor in the " + "metadata") _parser.add_argument('-k', dest='keyfile', help="A file with a key to sign the metadata with") _parser.add_argument('-n', dest='name') |