summaryrefslogtreecommitdiff
path: root/example
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2016-04-16 08:52:53 +0200
committerRoland Hedberg <roland.hedberg@adm.umu.se>2016-04-16 08:52:53 +0200
commit9dd3ee910aed4fb6b322b9ab022b044c7e753ab4 (patch)
treef3c2e01e1ed0beebc440d4da02be3fefaf9d0934 /example
parent61afe88cd9d3b5f6bfdd499af6746867ee78c8d1 (diff)
downloadpysaml2-9dd3ee910aed4fb6b322b9ab022b044c7e753ab4.tar.gz
Added functionality needed by the saml2test tool.
Diffstat (limited to 'example')
-rwxr-xr-xexample/sp-wsgi/sp.py37
1 files changed, 23 insertions, 14 deletions
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py
index 77345bca..d1c182e6 100755
--- a/example/sp-wsgi/sp.py
+++ b/example/sp-wsgi/sp.py
@@ -9,6 +9,7 @@ import os
import re
import sys
import xml.dom.minidom
+from saml2.sigver import SignatureError
import six
from six.moves.http_cookies import SimpleCookie
@@ -48,7 +49,7 @@ from saml2.samlp import Extensions
logger = logging.getLogger("")
hdlr = logging.FileHandler('spx.log')
base_formatter = logging.Formatter(
- "%(asctime)s %(name)s:%(levelname)s %(message)s")
+ "%(asctime)s %(name)s:%(levelname)s %(message)s")
hdlr.setFormatter(base_formatter)
logger.addHandler(hdlr)
@@ -329,7 +330,8 @@ class User(object):
@property
def authn_statement(self):
- xml_doc = xml.dom.minidom.parseString(str(self.response.assertion.authn_statement[0]))
+ xml_doc = xml.dom.minidom.parseString(
+ str(self.response.assertion.authn_statement[0]))
return xml_doc.toprettyxml()
@@ -355,7 +357,8 @@ class ACS(Service):
try:
self.response = self.sp.parse_authn_request_response(
- response, binding, self.outstanding_queries, self.cache.outstanding_certs)
+ response, binding, self.outstanding_queries,
+ self.cache.outstanding_certs)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
@@ -367,6 +370,9 @@ class ACS(Service):
except VerificationError as err:
resp = ServiceError("Verification error: %s" % (err,))
return resp(self.environ, self.start_response)
+ except SignatureError as err:
+ resp = ServiceError("Signature error: %s" % (err,))
+ return resp(self.environ, self.start_response)
except Exception as err:
resp = ServiceError("Other error: %s" % (err,))
return resp(self.environ, self.start_response)
@@ -384,7 +390,7 @@ class ACS(Service):
def verify_attributes(self, ava):
logger.info("SP: %s", self.sp.config.entityid)
rest = POLICY.get_entity_categories(
- self.sp.config.entityid, self.sp.metadata)
+ self.sp.config.entityid, self.sp.metadata)
akeys = [k.lower() for k in ava.keys()]
@@ -469,7 +475,7 @@ class SSO(object):
_rstate = rndstr()
self.cache.relay_state[_rstate] = geturl(self.environ)
_entityid = _cli.config.ecp_endpoint(
- self.environ["REMOTE_ADDR"])
+ self.environ["REMOTE_ADDR"])
if not _entityid:
return -1, ServiceError("No IdP to talk to")
@@ -521,7 +527,7 @@ class SSO(object):
elif self.discosrv:
if query:
idp_entity_id = _cli.parse_discovery_service_response(
- query=self.environ.get("QUERY_STRING"))
+ query=self.environ.get("QUERY_STRING"))
if not idp_entity_id:
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
@@ -531,7 +537,7 @@ class SSO(object):
"sp")["discovery_response"][0][0]
ret += "?sid=%s" % sid_
loc = _cli.create_discovery_service_request(
- self.discosrv, eid, **{"return": ret})
+ self.discosrv, eid, **{"return": ret})
return -1, SeeOther(loc)
elif len(idps) == 1:
# idps is a dictionary
@@ -548,8 +554,8 @@ class SSO(object):
try:
# Picks a binding to use for sending the Request to the IDP
_binding, destination = _cli.pick_binding(
- "single_sign_on_service", self.bindings, "idpsso",
- entity_id=entity_id)
+ "single_sign_on_service", self.bindings, "idpsso",
+ entity_id=entity_id)
logger.debug("binding: %s, destination: %s", _binding,
destination)
# Binding here is the response binding that is which binding the
@@ -568,7 +574,7 @@ class SSO(object):
"key": req_key_str
}
spcertenc = SPCertEnc(x509_data=ds.X509Data(
- x509_certificate=ds.X509Certificate(text=cert_str)))
+ x509_certificate=ds.X509Certificate(text=cert_str)))
extensions = Extensions(extension_elements=[
element_to_extension_element(spcertenc)])
@@ -589,7 +595,7 @@ class SSO(object):
except Exception as exc:
logger.exception(exc)
resp = ServiceError(
- "Failed to construct the AuthnRequest: %s" % exc)
+ "Failed to construct the AuthnRequest: %s" % exc)
return resp
# remember the request
@@ -782,7 +788,8 @@ def metadata(environ, start_response):
if path[-1] != "/":
path += "/"
metadata = create_metadata_string(path + "sp_conf.py", None,
- _args.valid, _args.cert, _args.keyfile,
+ _args.valid, _args.cert,
+ _args.keyfile,
_args.id, _args.name, _args.sign)
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
@@ -851,10 +858,12 @@ if __name__ == '__main__':
_parser.add_argument("config", help="SAML client config")
_parser.add_argument('-p', dest='path', help='Path to configuration file.')
_parser.add_argument('-v', dest='valid', default="4",
- help="How long, in days, the metadata is valid from the time of creation")
+ help="How long, in days, the metadata is valid from "
+ "the time of creation")
_parser.add_argument('-c', dest='cert', help='certificate')
_parser.add_argument('-i', dest='id',
- help="The ID of the entities descriptor in the metadata")
+ help="The ID of the entities descriptor in the "
+ "metadata")
_parser.add_argument('-k', dest='keyfile',
help="A file with a key to sign the metadata with")
_parser.add_argument('-n', dest='name')