summaryrefslogtreecommitdiff
path: root/example
diff options
context:
space:
mode:
authorIvan Kanakarakis <ivan.kanak@gmail.com>2018-11-19 21:01:26 +0200
committerIvan Kanakarakis <ivan.kanak@gmail.com>2018-11-20 02:30:18 +0200
commit40187543971a34cc9663dddb9b0400a59c84cd04 (patch)
treea77239cb6e578ec43459f9e4382c385e9e9dec88 /example
parent2e239147b94eb92ae477948e61e86dbcfc84229d (diff)
downloadpysaml2-40187543971a34cc9663dddb9b0400a59c84cd04.tar.gz
Indent and format code
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
Diffstat (limited to 'example')
-rwxr-xr-xexample/idp2/idp.py374
-rwxr-xr-xexample/sp-wsgi/sp.py283
2 files changed, 359 insertions, 298 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index 0c3d0913..a5027e39 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -6,56 +6,56 @@ import logging
import os
import re
import time
-
from hashlib import sha1
-try:
- from cheroot.wsgi import Server as WSGIServer
- from cheroot.ssl.builtin import BuiltinSSLAdapter
-except ImportError:
- from cherrypy.wsgiserver import CherryPyWSGIServer as WSGIServer
- from cherrypy.wsgiserver.ssl_builtin import BuiltinSSLAdapter
+from mako.lookup import TemplateLookup
-from six.moves.urllib.parse import parse_qs
from six.moves.http_cookies import SimpleCookie
+from six.moves.urllib.parse import parse_qs
+import saml2.xmldsig as ds
from saml2 import BINDING_HTTP_ARTIFACT
-from saml2 import BINDING_URI
+from saml2 import BINDING_HTTP_POST
+from saml2 import BINDING_HTTP_REDIRECT
from saml2 import BINDING_PAOS
from saml2 import BINDING_SOAP
-from saml2 import BINDING_HTTP_REDIRECT
-from saml2 import BINDING_HTTP_POST
+from saml2 import BINDING_URI
from saml2 import server
from saml2 import time_util
from saml2.authn import is_equal
-
from saml2.authn_context import AuthnBroker
from saml2.authn_context import PASSWORD
from saml2.authn_context import UNSPECIFIED
from saml2.authn_context import authn_context_class_ref
-from saml2.httputil import Response
+from saml2.httputil import BadRequest
from saml2.httputil import NotFound
-from saml2.httputil import geturl
-from saml2.httputil import get_post
from saml2.httputil import Redirect
-from saml2.httputil import Unauthorized
-from saml2.httputil import BadRequest
+from saml2.httputil import Response
from saml2.httputil import ServiceError
+from saml2.httputil import Unauthorized
+from saml2.httputil import get_post
+from saml2.httputil import geturl
from saml2.ident import Unknown
from saml2.metadata import create_metadata_string
from saml2.profile import ecp
-from saml2.s_utils import rndstr
-from saml2.s_utils import exception_trace
+from saml2.s_utils import PolicyError
from saml2.s_utils import UnknownPrincipal
from saml2.s_utils import UnsupportedBinding
-from saml2.s_utils import PolicyError
-from saml2.sigver import verify_redirect_signature
+from saml2.s_utils import exception_trace
+from saml2.s_utils import rndstr
from saml2.sigver import encrypt_cert_from_item
+from saml2.sigver import verify_redirect_signature
-from idp_user import USERS
from idp_user import EXTRA
-from mako.lookup import TemplateLookup
-import saml2.xmldsig as ds
+from idp_user import USERS
+
+try:
+ from cheroot.wsgi import Server as WSGIServer
+ from cheroot.ssl.builtin import BuiltinSSLAdapter
+except ImportError:
+ from cherrypy.wsgiserver import CherryPyWSGIServer as WSGIServer
+ from cherrypy.wsgiserver.ssl_builtin import BuiltinSSLAdapter
+
logger = logging.getLogger("saml2.idp")
logger.setLevel(logging.WARNING)
@@ -134,30 +134,32 @@ class Service(object):
def operation(self, saml_msg, binding):
logger.debug("_operation: %s", saml_msg)
- if not (saml_msg and 'SAMLRequest' in saml_msg):
- resp = BadRequest('Error parsing request or no request')
+ if not (saml_msg and "SAMLRequest" in saml_msg):
+ resp = BadRequest("Error parsing request or no request")
return resp(self.environ, self.start_response)
else:
# saml_msg may also contain Signature and SigAlg
if "Signature" in saml_msg:
try:
- kwargs = {"signature": saml_msg["Signature"],
- "sigalg": saml_msg["SigAlg"]}
+ kwargs = {
+ "signature": saml_msg["Signature"],
+ "sigalg": saml_msg["SigAlg"],
+ }
except KeyError:
- resp = BadRequest(
- 'Signature Algorithm specification is missing')
+ resp = BadRequest("Signature Algorithm specification is missing")
return resp(self.environ, self.start_response)
else:
kwargs = {}
try:
- kwargs['encrypt_cert'] = encrypt_cert_from_item(
- saml_msg["req_info"].message)
+ kwargs["encrypt_cert"] = encrypt_cert_from_item(
+ saml_msg["req_info"].message
+ )
except KeyError:
pass
try:
- kwargs['relay_state'] = saml_msg['RelayState']
+ kwargs["relay_state"] = saml_msg["RelayState"]
except KeyError:
pass
@@ -171,8 +173,7 @@ class Service(object):
# exchange artifact for request
request = IDP.artifact2message(saml_msg["SAMLart"], "spsso")
try:
- return self.do(request, BINDING_HTTP_ARTIFACT,
- saml_msg["RelayState"])
+ return self.do(request, BINDING_HTTP_ARTIFACT, saml_msg["RelayState"])
except KeyError:
return self.do(request, BINDING_HTTP_ARTIFACT)
@@ -235,14 +236,17 @@ class Service(object):
kwargs["headers"] = [kaka]
return do_authentication(self.environ, self.start_response, **kwargs)
+
# -----------------------------------------------------------------------------
+
REPOZE_ID_EQUIVALENT = "uid"
FORM_SPEC = """<form name="myform" method="post" action="%s">
<input type="hidden" name="SAMLResponse" value="%s" />
<input type="hidden" name="RelayState" value="%s" />
</form>"""
+
# -----------------------------------------------------------------------------
# === Single log in ====
# -----------------------------------------------------------------------------
@@ -273,7 +277,7 @@ class SSO(Service):
resp_args = {}
if not query:
logger.info("Missing QUERY")
- resp = Unauthorized('Unknown user')
+ resp = Unauthorized("Unknown user")
return resp_args, resp(self.environ, self.start_response)
if not self.req_info:
@@ -287,29 +291,27 @@ class SSO(Service):
self.binding_out, self.destination = IDP.pick_binding(
"assertion_consumer_service",
bindings=self.response_bindings,
- entity_id=_authn_req.issuer.text, request=_authn_req)
+ entity_id=_authn_req.issuer.text,
+ request=_authn_req,
+ )
except Exception as err:
logger.error("Couldn't find receiver endpoint: %s", err)
raise
- logger.debug("Binding: %s, destination: %s", self.binding_out,
- self.destination)
+ logger.debug("Binding: %s, destination: %s", self.binding_out, self.destination)
resp_args = {}
try:
resp_args = IDP.response_args(_authn_req)
_resp = None
except UnknownPrincipal as excp:
- _resp = IDP.create_error_response(_authn_req.id,
- self.destination, excp)
+ _resp = IDP.create_error_response(_authn_req.id, self.destination, excp)
except UnsupportedBinding as excp:
- _resp = IDP.create_error_response(_authn_req.id,
- self.destination, excp)
+ _resp = IDP.create_error_response(_authn_req.id, self.destination, excp)
return resp_args, _resp
- def do(self, query, binding_in, relay_state="", encrypt_cert=None,
- **kwargs):
+ def do(self, query, binding_in, relay_state="", encrypt_cert=None, **kwargs):
"""
:param query: The request
@@ -345,9 +347,11 @@ class SSO(Service):
resp_args["authn"] = metod
_resp = IDP.create_authn_response(
- identity, userid=self.user,
+ identity,
+ userid=self.user,
encrypt_cert_assertion=encrypt_cert,
- **resp_args)
+ **resp_args
+ )
except Exception as excp:
logging.error(exception_trace(excp))
resp = ServiceError("Exception: %s" % (excp,))
@@ -355,15 +359,22 @@ class SSO(Service):
logger.info("AuthNResponse: %s", _resp)
if self.op_type == "ecp":
- kwargs = {"soap_headers": [
- ecp.Response(
- assertion_consumer_service_url=self.destination)]}
+ kwargs = {
+ "soap_headers": [
+ ecp.Response(assertion_consumer_service_url=self.destination)
+ ]
+ }
else:
kwargs = {}
- http_args = IDP.apply_binding(self.binding_out,
- "%s" % _resp, self.destination,
- relay_state, response=True, **kwargs)
+ http_args = IDP.apply_binding(
+ self.binding_out,
+ "%s" % _resp,
+ self.destination,
+ relay_state,
+ response=True,
+ **kwargs
+ )
logger.debug("HTTPargs: %s", http_args)
return self.response(self.binding_out, http_args)
@@ -389,8 +400,9 @@ class SSO(Service):
del IDP.ticket[_key]
except KeyError:
try:
- self.req_info = IDP.parse_authn_request(saml_msg["SAMLRequest"],
- BINDING_HTTP_REDIRECT)
+ self.req_info = IDP.parse_authn_request(
+ saml_msg["SAMLRequest"], BINDING_HTTP_REDIRECT
+ )
except KeyError:
resp = BadRequest("Message signature verification failure")
return resp(self.environ, self.start_response)
@@ -407,8 +419,7 @@ class SSO(Service):
_certs = IDP.metadata.certs(issuer, "any", "signing")
verified_ok = False
for cert in _certs:
- if verify_redirect_signature(saml_msg, IDP.sec.sec_backend,
- cert):
+ if verify_redirect_signature(saml_msg, IDP.sec.sec_backend, cert):
verified_ok = True
break
if not verified_ok:
@@ -417,8 +428,7 @@ class SSO(Service):
if self.user:
saml_msg["req_info"] = self.req_info
- if _req.force_authn is not None and \
- _req.force_authn.lower() == 'true':
+ if _req.force_authn is not None and _req.force_authn.lower() == "true":
key = self._store_request(saml_msg)
return self.not_authn(key, _req.requested_authn_context)
else:
@@ -444,11 +454,11 @@ class SSO(Service):
del IDP.ticket[_key]
except KeyError:
self.req_info = IDP.parse_authn_request(
- saml_msg["SAMLRequest"], BINDING_HTTP_POST)
+ saml_msg["SAMLRequest"], BINDING_HTTP_POST
+ )
_req = self.req_info.message
if self.user:
- if _req.force_authn is not None and \
- _req.force_authn.lower() == 'true':
+ if _req.force_authn is not None and _req.force_authn.lower() == "true":
saml_msg["req_info"] = self.req_info
key = self._store_request(saml_msg)
return self.not_authn(key, _req.requested_authn_context)
@@ -486,9 +496,9 @@ class SSO(Service):
if is_equal(PASSWD[user], passwd):
resp = Unauthorized()
self.user = user
- self.environ[
- "idp.authn"] = AUTHN_BROKER.get_authn_by_accr(
- PASSWORD)
+ self.environ["idp.authn"] = AUTHN_BROKER.get_authn_by_accr(
+ PASSWORD
+ )
except ValueError:
resp = Unauthorized()
else:
@@ -511,8 +521,9 @@ class SSO(Service):
# -----------------------------------------------------------------------------
-def do_authentication(environ, start_response, authn_context, key,
- redirect_uri, headers=None):
+def do_authentication(
+ environ, start_response, authn_context, key, redirect_uri, headers=None
+):
"""
Display the login form
"""
@@ -530,16 +541,19 @@ def do_authentication(environ, start_response, authn_context, key,
# -----------------------------------------------------------------------------
+
PASSWD = {
"daev0001": "qwerty",
"testuser": "qwerty",
"roland": "dianakra",
"babs": "howes",
- "upper": "crust"}
+ "upper": "crust",
+}
-def username_password_authn(environ, start_response, reference, key,
- redirect_uri, headers=None):
+def username_password_authn(
+ environ, start_response, reference, key, redirect_uri, headers=None
+):
"""
Display the login form
"""
@@ -557,7 +571,7 @@ def username_password_authn(environ, start_response, reference, key,
"password": "",
"key": key,
"authn_reference": reference,
- "redirect_uri": redirect_uri
+ "redirect_uri": redirect_uri,
}
logger.info("do_authentication argv: %s", argv)
return resp(environ, start_response, **argv)
@@ -593,8 +607,7 @@ def do_verify(environ, start_response, _):
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
- lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid,
- query["key"][0])
+ lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0])
logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
@@ -611,6 +624,7 @@ def not_found(environ, start_response):
# === Single log out ===
# -----------------------------------------------------------------------------
+
# def _subject_sp_info(req_info):
# # look for the subject
# subject = req_info.subject_id()
@@ -618,6 +632,7 @@ def not_found(environ, start_response):
# sp_entity_id = req_info.message.issuer.text.strip()
# return subject, sp_entity_id
+
class SLO(Service):
def do(self, request, binding, relay_state="", encrypt_cert=None, **kwargs):
@@ -653,32 +668,33 @@ class SLO(Service):
destination = ""
response = False
else:
- binding, destination = IDP.pick_binding("single_logout_service",
- [binding], "spsso",
- req_info)
+ binding, destination = IDP.pick_binding(
+ "single_logout_service", [binding], "spsso", req_info
+ )
response = True
try:
- hinfo = IDP.apply_binding(binding, "%s" % resp, destination,
- relay_state, response=response)
+ hinfo = IDP.apply_binding(
+ binding, "%s" % resp, destination, relay_state, response=response
+ )
except Exception as exc:
logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
- #_tlh = dict2list_of_tuples(hinfo["headers"])
+ # _tlh = dict2list_of_tuples(hinfo["headers"])
delco = delete_cookie(self.environ, "idpauthn")
if delco:
hinfo["headers"].append(delco)
logger.info("Header: %s", (hinfo["headers"],))
if binding == BINDING_HTTP_REDIRECT:
- for key, value in hinfo['headers']:
- if key.lower() == 'location':
+ for key, value in hinfo["headers"]:
+ if key.lower() == "location":
resp = Redirect(value, headers=hinfo["headers"])
return resp(self.environ, self.start_response)
- resp = ServiceError('missing Location header')
+ resp = ServiceError("missing Location header")
return resp(self.environ, self.start_response)
else:
resp = Response(hinfo["data"], headers=hinfo["headers"])
@@ -698,16 +714,17 @@ class NMI(Service):
# Do the necessary stuff
name_id = IDP.ident.handle_manage_name_id_request(
- request.name_id, request.new_id, request.new_encrypted_id,
- request.terminate)
+ request.name_id, request.new_id, request.new_encrypted_id, request.terminate
+ )
logger.debug("New NameID: %s", name_id)
_resp = IDP.create_manage_name_id_response(request)
# It's using SOAP binding
- hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "",
- relay_state, response=True)
+ hinfo = IDP.apply_binding(
+ BINDING_SOAP, "%s" % _resp, "", relay_state, response=True
+ )
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -738,7 +755,7 @@ class AIDR(Service):
def operation(self, _dict, binding, **kwargs):
logger.debug("_operation: %s", _dict)
if not _dict or "ID" not in _dict:
- resp = BadRequest('Error parsing request or no request')
+ resp = BadRequest("Error parsing request or no request")
return resp(self.environ, self.start_response)
return self.do(_dict["ID"], binding, **kwargs)
@@ -748,14 +765,14 @@ class AIDR(Service):
# === Artifact resolve service ===
# ----------------------------------------------------------------------------
+
class ARS(Service):
def do(self, request, binding, relay_state="", encrypt_cert=None):
_req = IDP.parse_artifact_resolve(request, binding)
msg = IDP.create_artifact_response(_req, _req.artifact.text)
- hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
- response=True)
+ hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -773,13 +790,12 @@ class AQS(Service):
_req = IDP.parse_authn_query(request, binding)
_query = _req.message
- msg = IDP.create_authn_query_response(_query.subject,
- _query.requested_authn_context,
- _query.session_index)
+ msg = IDP.create_authn_query_response(
+ _query.subject, _query.requested_authn_context, _query.session_index
+ )
logger.debug("response: %s", msg)
- hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
- response=True)
+ hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -805,12 +821,10 @@ class ATTR(Service):
# Comes in over SOAP so only need to construct the response
args = IDP.response_args(_query, [BINDING_SOAP])
- msg = IDP.create_attribute_response(identity,
- name_id=name_id, **args)
+ msg = IDP.create_attribute_response(identity, name_id=name_id, **args)
logger.debug("response: %s", msg)
- hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
- response=True)
+ hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -832,7 +846,8 @@ class NIM(Service):
# Do the necessary stuff
try:
name_id = IDP.ident.handle_name_id_mapping_request(
- request.name_id, request.name_id_policy)
+ request.name_id, request.name_id_policy
+ )
except Unknown:
resp = BadRequest("Unknown entity")
return resp(self.environ, self.start_response)
@@ -844,8 +859,7 @@ class NIM(Service):
_resp = IDP.create_name_id_mapping_response(name_id, **info)
# Only SOAP
- hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "",
- response=True)
+ hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "", response=True)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -854,6 +868,8 @@ class NIM(Service):
# ----------------------------------------------------------------------------
# Cookie handling
# ----------------------------------------------------------------------------
+
+
def info_from_cookie(kaka):
logger.debug("KAKA: %s", kaka)
if kaka:
@@ -871,14 +887,14 @@ def info_from_cookie(kaka):
def delete_cookie(environ, name):
- kaka = environ.get("HTTP_COOKIE", '')
+ kaka = environ.get("HTTP_COOKIE", "")
logger.debug("delete KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = ""
- cookie[name]['path'] = "/"
+ cookie[name]["path"] = "/"
logger.debug("Expire: %s", morsel)
cookie[name]["expires"] = _expiration("dawn")
return tuple(cookie.output().split(": ", 1))
@@ -888,55 +904,58 @@ def delete_cookie(environ, name):
def set_cookie(name, _, *args):
cookie = SimpleCookie()
cookie[name] = base64.b64encode(":".join(args))
- cookie[name]['path'] = "/"
+ cookie[name]["path"] = "/"
cookie[name]["expires"] = _expiration(5) # 5 minutes from now
logger.debug("Cookie expires: %s", cookie[name]["expires"])
return tuple(cookie.output().split(": ", 1))
+
# ----------------------------------------------------------------------------
+
# map urls to functions
AUTHN_URLS = [
# sso
- (r'sso/post$', (SSO, "post")),
- (r'sso/post/(.*)$', (SSO, "post")),
- (r'sso/redirect$', (SSO, "redirect")),
- (r'sso/redirect/(.*)$', (SSO, "redirect")),
- (r'sso/art$', (SSO, "artifact")),
- (r'sso/art/(.*)$', (SSO, "artifact")),
+ (r"sso/post$", (SSO, "post")),
+ (r"sso/post/(.*)$", (SSO, "post")),
+ (r"sso/redirect$", (SSO, "redirect")),
+ (r"sso/redirect/(.*)$", (SSO, "redirect")),
+ (r"sso/art$", (SSO, "artifact")),
+ (r"sso/art/(.*)$", (SSO, "artifact")),
# slo
- (r'slo/redirect$', (SLO, "redirect")),
- (r'slo/redirect/(.*)$', (SLO, "redirect")),
- (r'slo/post$', (SLO, "post")),
- (r'slo/post/(.*)$', (SLO, "post")),
- (r'slo/soap$', (SLO, "soap")),
- (r'slo/soap/(.*)$', (SLO, "soap")),
+ (r"slo/redirect$", (SLO, "redirect")),
+ (r"slo/redirect/(.*)$", (SLO, "redirect")),
+ (r"slo/post$", (SLO, "post")),
+ (r"slo/post/(.*)$", (SLO, "post")),
+ (r"slo/soap$", (SLO, "soap")),
+ (r"slo/soap/(.*)$", (SLO, "soap")),
#
- (r'airs$', (AIDR, "uri")),
- (r'ars$', (ARS, "soap")),
+ (r"airs$", (AIDR, "uri")),
+ (r"ars$", (ARS, "soap")),
# mni
- (r'mni/post$', (NMI, "post")),
- (r'mni/post/(.*)$', (NMI, "post")),
- (r'mni/redirect$', (NMI, "redirect")),
- (r'mni/redirect/(.*)$', (NMI, "redirect")),
- (r'mni/art$', (NMI, "artifact")),
- (r'mni/art/(.*)$', (NMI, "artifact")),
- (r'mni/soap$', (NMI, "soap")),
- (r'mni/soap/(.*)$', (NMI, "soap")),
+ (r"mni/post$", (NMI, "post")),
+ (r"mni/post/(.*)$", (NMI, "post")),
+ (r"mni/redirect$", (NMI, "redirect")),
+ (r"mni/redirect/(.*)$", (NMI, "redirect")),
+ (r"mni/art$", (NMI, "artifact")),
+ (r"mni/art/(.*)$", (NMI, "artifact")),
+ (r"mni/soap$", (NMI, "soap")),
+ (r"mni/soap/(.*)$", (NMI, "soap")),
# nim
- (r'nim$', (NIM, "soap")),
- (r'nim/(.*)$', (NIM, "soap")),
+ (r"nim$", (NIM, "soap")),
+ (r"nim/(.*)$", (NIM, "soap")),
#
- (r'aqs$', (AQS, "soap")),
- (r'attr$', (ATTR, "soap"))
+ (r"aqs$", (AQS, "soap")),
+ (r"attr$", (ATTR, "soap")),
]
NON_AUTHN_URLS = [
- #(r'login?(.*)$', do_authentication),
- (r'verify?(.*)$', do_verify),
- (r'sso/ecp$', (SSO, "ecp")),
+ # (r'login?(.*)$', do_authentication),
+ (r"verify?(.*)$", do_verify),
+ (r"sso/ecp$", (SSO, "ecp")),
]
+
# ----------------------------------------------------------------------------
@@ -947,10 +966,17 @@ def metadata(environ, start_response):
path = os.path.dirname(os.path.abspath(__file__))
if path[-1] != "/":
path += "/"
- metadata = create_metadata_string(path + args.config, IDP.config,
- args.valid, args.cert, args.keyfile,
- args.id, args.name, args.sign)
- start_response('200 OK', [('Content-Type', "text/xml")])
+ metadata = create_metadata_string(
+ path + args.config,
+ IDP.config,
+ args.valid,
+ args.cert,
+ args.keyfile,
+ args.id,
+ args.name,
+ args.sign,
+ )
+ start_response("200 OK", [("Content-Type", "text/xml")])
return metadata
except Exception as ex:
logger.error("An error occured while creating metadata: %s", ex.message)
@@ -964,13 +990,13 @@ def staticfile(environ, start_response):
path = os.path.dirname(os.path.abspath(__file__))
if path[-1] != "/":
path += "/"
- path += environ.get('PATH_INFO', '').lstrip('/')
+ path += environ.get("PATH_INFO", "").lstrip("/")
path = os.path.realpath(path)
if not path.startswith(args.path):
resp = Unauthorized()
return resp(environ, start_response)
- start_response('200 OK', [('Content-Type', "text/xml")])
- return open(path, 'r').read()
+ start_response("200 OK", [("Content-Type", "text/xml")])
+ return open(path, "r").read()
except Exception as ex:
logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -991,7 +1017,7 @@ def application(environ, start_response):
:return: The response as a list of lines
"""
- path = environ.get('PATH_INFO', '').lstrip('/')
+ path = environ.get("PATH_INFO", "").lstrip("/")
if path == "metadata":
return metadata(environ, start_response)
@@ -1022,9 +1048,9 @@ def application(environ, start_response):
match = re.search(regex, path)
if match is not None:
try:
- environ['myapp.url_args'] = match.groups()[0]
+ environ["myapp.url_args"] = match.groups()[0]
except IndexError:
- environ['myapp.url_args'] = path
+ environ["myapp.url_args"] = path
logger.debug("Callback: %s", callback)
if isinstance(callback, tuple):
@@ -1034,46 +1060,55 @@ def application(environ, start_response):
return func()
return callback(environ, start_response, user)
- if re.search(r'static/.*', path) is not None:
+ if re.search(r"static/.*", path) is not None:
return staticfile(environ, start_response)
return not_found(environ, start_response)
+
# ----------------------------------------------------------------------------
-if __name__ == '__main__':
+
+if __name__ == "__main__":
parser = argparse.ArgumentParser()
- parser.add_argument('-p', dest='path', help='Path to configuration file.', default='./idp_conf.py')
- parser.add_argument('-v', dest='valid',
- help="How long, in days, the metadata is valid from "
- "the time of creation")
- parser.add_argument('-c', dest='cert', help='certificate')
- parser.add_argument('-i', dest='id',
- help="The ID of the entities descriptor")
- parser.add_argument('-k', dest='keyfile',
- help="A file with a key to sign the metadata with")
- parser.add_argument('-n', dest='name')
- parser.add_argument('-s', dest='sign', action='store_true',
- help="sign the metadata")
- parser.add_argument('-m', dest='mako_root', default="./")
+ parser.add_argument(
+ "-p", dest="path", help="Path to configuration file.", default="./idp_conf.py"
+ )
+ parser.add_argument(
+ "-v",
+ dest="valid",
+ help="How long, in days, the metadata is valid from " "the time of creation",
+ )
+ parser.add_argument("-c", dest="cert", help="certificate")
+ parser.add_argument("-i", dest="id", help="The ID of the entities descriptor")
+ parser.add_argument(
+ "-k", dest="keyfile", help="A file with a key to sign the metadata with"
+ )
+ parser.add_argument("-n", dest="name")
+ parser.add_argument(
+ "-s", dest="sign", action="store_true", help="sign the metadata"
+ )
+ parser.add_argument("-m", dest="mako_root", default="./")
parser.add_argument(dest="config")
args = parser.parse_args()
CONFIG = importlib.import_module(args.config)
AUTHN_BROKER = AuthnBroker()
- AUTHN_BROKER.add(authn_context_class_ref(PASSWORD),
- username_password_authn, 10,
- CONFIG.BASE)
- AUTHN_BROKER.add(authn_context_class_ref(UNSPECIFIED),
- "", 0, CONFIG.BASE)
+ AUTHN_BROKER.add(
+ authn_context_class_ref(PASSWORD), username_password_authn, 10, CONFIG.BASE
+ )
+ AUTHN_BROKER.add(authn_context_class_ref(UNSPECIFIED), "", 0, CONFIG.BASE)
IDP = server.Server(args.config, cache=Cache())
IDP.ticket = {}
_rot = args.mako_root
- LOOKUP = TemplateLookup(directories=[_rot + 'templates', _rot + 'htdocs'],
- module_directory=_rot + 'modules',
- input_encoding='utf-8', output_encoding='utf-8')
+ LOOKUP = TemplateLookup(
+ directories=[_rot + "templates", _rot + "htdocs"],
+ module_directory=_rot + "modules",
+ input_encoding="utf-8",
+ output_encoding="utf-8",
+ )
HOST = CONFIG.HOST
PORT = CONFIG.PORT
@@ -1097,9 +1132,9 @@ if __name__ == '__main__':
https = "using HTTPS"
# SRV.ssl_adapter = ssl_pyopenssl.pyOpenSSLAdapter(
# config.SERVER_CERT, config.SERVER_KEY, config.CERT_CHAIN)
- SRV.ssl_adapter = BuiltinSSLAdapter(CONFIG.SERVER_CERT,
- CONFIG.SERVER_KEY,
- CONFIG.CERT_CHAIN)
+ SRV.ssl_adapter = BuiltinSSLAdapter(
+ CONFIG.SERVER_CERT, CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN
+ )
logger.info("Server starting")
print("IDP listening on %s:%s%s" % (HOST, PORT, _https))
@@ -1107,4 +1142,3 @@ if __name__ == '__main__':
SRV.start()
except KeyboardInterrupt:
SRV.stop()
-
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py
index 90d7838c..4e9eab9c 100755
--- a/example/sp-wsgi/sp.py
+++ b/example/sp-wsgi/sp.py
@@ -49,9 +49,8 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.samlp import Extensions
logger = logging.getLogger("")
-hdlr = logging.FileHandler('spx.log')
-base_formatter = logging.Formatter(
- "%(asctime)s %(name)s:%(levelname)s %(message)s")
+hdlr = logging.FileHandler("spx.log")
+base_formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s")
hdlr.setFormatter(base_formatter)
logger.addHandler(hdlr)
@@ -93,7 +92,7 @@ def dict_to_table(ava, lev=0, width=1):
txt.extend(dict_to_table(valarr, lev + 1, width - 1))
txt.append("</td>\n")
txt.append("</tr>\n")
- txt.append('</table>\n')
+ txt.append("</table>\n")
return txt
@@ -108,19 +107,19 @@ def handle_static(environ, start_response, path):
:return: wsgi response for the static file.
"""
try:
- data = open(path, 'rb').read()
+ data = open(path, "rb").read()
if path.endswith(".ico"):
- resp = Response(data, headers=[('Content-Type', "image/x-icon")])
+ resp = Response(data, headers=[("Content-Type", "image/x-icon")])
elif path.endswith(".html"):
- resp = Response(data, headers=[('Content-Type', 'text/html')])
+ resp = Response(data, headers=[("Content-Type", "text/html")])
elif path.endswith(".txt"):
- resp = Response(data, headers=[('Content-Type', 'text/plain')])
+ resp = Response(data, headers=[("Content-Type", "text/plain")])
elif path.endswith(".css"):
- resp = Response(data, headers=[('Content-Type', 'text/css')])
+ resp = Response(data, headers=[("Content-Type", "text/css")])
elif path.endswith(".js"):
- resp = Response(data, headers=[('Content-Type', 'text/javascript')])
+ resp = Response(data, headers=[("Content-Type", "text/javascript")])
elif path.endswith(".png"):
- resp = Response(data, headers=[('Content-Type', 'image/png')])
+ resp = Response(data, headers=[("Content-Type", "image/png")])
else:
resp = Response(data)
except IOError:
@@ -130,22 +129,23 @@ def handle_static(environ, start_response, path):
class ECPResponse(object):
code = 200
- title = 'OK'
+ title = "OK"
def __init__(self, content):
self.content = content
# noinspection PyUnusedLocal
def __call__(self, environ, start_response):
- start_response('%s %s' % (self.code, self.title),
- [('Content-Type', "text/xml")])
+ start_response(
+ "%s %s" % (self.code, self.title), [("Content-Type", "text/xml")]
+ )
return [self.content]
def _expiration(timeout, tformat=None):
# Wed, 06-Jun-2012 01:34:34 GMT
if not tformat:
- tformat = '%a, %d-%b-%Y %T GMT'
+ tformat = "%a, %d-%b-%Y %T GMT"
if timeout == "now":
return time_util.instant(tformat)
@@ -165,7 +165,7 @@ class Cache(object):
self.result = {}
def get_user(self, environ):
- cookie = environ.get("HTTP_COOKIE", '')
+ cookie = environ.get("HTTP_COOKIE", "")
logger.debug("Cookie: %s", cookie)
if cookie:
cookie_obj = SimpleCookie(cookie)
@@ -181,7 +181,7 @@ class Cache(object):
return None
def delete_cookie(self, environ):
- cookie = environ.get("HTTP_COOKIE", '')
+ cookie = environ.get("HTTP_COOKIE", "")
logger.debug("delete cookie: %s", cookie)
if cookie:
_name = self.cookie_name
@@ -189,7 +189,7 @@ class Cache(object):
morsel = cookie_obj.get(_name, None)
cookie = SimpleCookie()
cookie[_name] = ""
- cookie[_name]['path'] = "/"
+ cookie[_name]["path"] = "/"
logger.debug("Expire: %s", morsel)
cookie[_name]["expires"] = _expiration("now")
return cookie.output().split(": ", 1)
@@ -200,7 +200,7 @@ class Cache(object):
self.uid2user[uid] = user
cookie = SimpleCookie()
cookie[self.cookie_name] = uid
- cookie[self.cookie_name]['path'] = "/"
+ cookie[self.cookie_name]["path"] = "/"
cookie[self.cookie_name]["expires"] = _expiration(480)
logger.debug("Cookie expires: %s", cookie[self.cookie_name]["expires"])
return cookie.output().split(": ", 1)
@@ -227,7 +227,7 @@ class Service(object):
return None
def unpack_post(self):
- _dict = parse_qs(get_post(self.environ).decode('utf8'))
+ _dict = parse_qs(get_post(self.environ).decode("utf8"))
logger.debug("unpack_post:: %s", _dict)
return dict([(k, v[0]) for k, v in _dict.items()])
@@ -251,7 +251,7 @@ class Service(object):
def operation(self, _dict, binding):
logger.debug("_operation: %s", _dict)
if not _dict:
- resp = BadRequest('Error parsing request or no request')
+ resp = BadRequest("Error parsing request or no request")
return resp(self.environ, self.start_response)
else:
try:
@@ -259,11 +259,13 @@ class Service(object):
except KeyError:
_relay_state = ""
if "SAMLResponse" in _dict:
- return self.do(_dict["SAMLResponse"], binding,
- _relay_state, mtype="response")
+ return self.do(
+ _dict["SAMLResponse"], binding, _relay_state, mtype="response"
+ )
elif "SAMLRequest" in _dict:
- return self.do(_dict["SAMLRequest"], binding,
- _relay_state, mtype="request")
+ return self.do(
+ _dict["SAMLRequest"], binding, _relay_state, mtype="request"
+ )
def artifact_operation(self, _dict):
if not _dict:
@@ -315,7 +317,7 @@ class Service(object):
return self.operation(_dict, BINDING_SOAP)
def not_authn(self):
- resp = Unauthorized('Unknown user')
+ resp = Unauthorized("Unknown user")
return resp(self.environ, self.start_response)
@@ -333,7 +335,8 @@ class User(object):
@property
def authn_statement(self):
xml_doc = xml.dom.minidom.parseString(
- str(self.response.assertion.authn_statement[0]))
+ str(self.response.assertion.authn_statement[0])
+ )
return xml_doc.toprettyxml()
@@ -354,18 +357,24 @@ class ACS(Service):
# tmp_outstanding_queries = dict(self.outstanding_queries)
if not response:
logger.info("Missing Response")
- resp = Unauthorized('Unknown user')
+ resp = Unauthorized("Unknown user")
return resp(self.environ, self.start_response)
try:
- conv_info = {'remote_addr': self.environ['REMOTE_ADDR'],
- 'request_uri': self.environ['REQUEST_URI'],
- 'entity_id': self.sp.config.entityid,
- 'endpoints': self.sp.config.getattr('endpoints', 'sp')}
+ conv_info = {
+ "remote_addr": self.environ["REMOTE_ADDR"],
+ "request_uri": self.environ["REQUEST_URI"],
+ "entity_id": self.sp.config.entityid,
+ "endpoints": self.sp.config.getattr("endpoints", "sp"),
+ }
self.response = self.sp.parse_authn_request_response(
- response, binding, self.outstanding_queries,
- self.cache.outstanding_certs, conv_info=conv_info)
+ response,
+ binding,
+ self.outstanding_queries,
+ self.cache.outstanding_certs,
+ conv_info=conv_info,
+ )
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
@@ -389,15 +398,12 @@ class ACS(Service):
user = User(self.response.name_id, self.response.ava, self.response)
cookie = self.cache.set_cookie(user)
- resp = Redirect("/", headers=[
- cookie,
- ])
+ resp = Redirect("/", headers=[cookie])
return resp(self.environ, self.start_response)
def verify_attributes(self, ava):
logger.info("SP: %s", self.sp.config.entityid)
- rest = POLICY.get_entity_categories(
- self.sp.config.entityid, self.sp.metadata)
+ rest = POLICY.get_entity_categories(self.sp.config.entityid, self.sp.metadata)
akeys = [k.lower() for k in ava.keys()]
@@ -421,8 +427,16 @@ class ACS(Service):
class SSO(object):
- def __init__(self, sp, environ, start_response, cache=None,
- wayf=None, discosrv=None, bindings=None):
+ def __init__(
+ self,
+ sp,
+ environ,
+ start_response,
+ cache=None,
+ wayf=None,
+ discosrv=None,
+ bindings=None,
+ ):
self.sp = sp
self.environ = environ
self.start_response = start_response
@@ -433,8 +447,11 @@ class SSO(object):
if bindings:
self.bindings = bindings
else:
- self.bindings = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST,
- BINDING_HTTP_ARTIFACT]
+ self.bindings = [
+ BINDING_HTTP_REDIRECT,
+ BINDING_HTTP_POST,
+ BINDING_HTTP_ARTIFACT,
+ ]
logger.debug("--- SSO ---")
def response(self, binding, http_args, do_not_start_response=False):
@@ -459,7 +476,7 @@ class SSO(object):
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
logger.debug("Redirect to WAYF function: %s", self.wayf)
- return -1, SeeOther(headers=[('Location', "%s?%s" % (self.wayf, sid_))])
+ return -1, SeeOther(headers=[("Location", "%s?%s" % (self.wayf, sid_))])
def _pick_idp(self, came_from):
"""
@@ -481,24 +498,23 @@ class SSO(object):
_rstate = rndstr()
self.cache.relay_state[_rstate] = geturl(self.environ)
- _entityid = _cli.config.ecp_endpoint(
- self.environ["REMOTE_ADDR"])
+ _entityid = _cli.config.ecp_endpoint(self.environ["REMOTE_ADDR"])
if not _entityid:
return -1, ServiceError("No IdP to talk to")
logger.debug("IdP to talk to: %s", _entityid)
return ecp.ecp_auth_request(_cli, _entityid, _rstate)
else:
- return -1, ServiceError('Faulty Accept header')
+ return -1, ServiceError("Faulty Accept header")
else:
- return -1, ServiceError('unknown ECP version')
+ return -1, ServiceError("unknown ECP version")
# Find all IdPs
idps = self.sp.metadata.with_descriptor("idpsso")
idp_entity_id = None
- kaka = self.environ.get("HTTP_COOKIE", '')
+ kaka = self.environ.get("HTTP_COOKIE", "")
if kaka:
try:
(idp_entity_id, _) = parse_cookie("ve_disco", "SEED_SAW", kaka)
@@ -511,8 +527,7 @@ class SSO(object):
query = self.environ.get("QUERY_STRING")
if not idp_entity_id and query:
try:
- _idp_entity_id = dict(parse_qs(query))[
- self.idp_query_param][0]
+ _idp_entity_id = dict(parse_qs(query))[self.idp_query_param][0]
if _idp_entity_id in idps:
idp_entity_id = _idp_entity_id
except KeyError:
@@ -524,8 +539,7 @@ class SSO(object):
if self.wayf:
if query:
try:
- wayf_selected = dict(parse_qs(query))[
- "wayf_selected"][0]
+ wayf_selected = dict(parse_qs(query))["wayf_selected"][0]
except KeyError:
return self._wayf_redirect(came_from)
idp_entity_id = wayf_selected
@@ -534,23 +548,26 @@ class SSO(object):
elif self.discosrv:
if query:
idp_entity_id = _cli.parse_discovery_service_response(
- query=self.environ.get("QUERY_STRING"))
+ query=self.environ.get("QUERY_STRING")
+ )
if not idp_entity_id:
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
logger.debug("Redirect to Discovery Service function")
eid = _cli.config.entityid
- ret = _cli.config.getattr("endpoints",
- "sp")["discovery_response"][0][0]
+ ret = _cli.config.getattr("endpoints", "sp")["discovery_response"][
+ 0
+ ][0]
ret += "?sid=%s" % sid_
loc = _cli.create_discovery_service_request(
- self.discosrv, eid, **{"return": ret})
+ self.discosrv, eid, **{"return": ret}
+ )
return -1, SeeOther(loc)
elif len(idps) == 1:
# idps is a dictionary
idp_entity_id = list(idps.keys())[0]
elif not len(idps):
- return -1, ServiceError('Misconfiguration')
+ return -1, ServiceError("Misconfiguration")
else:
return -1, NotImplemented("No WAYF or DS present!")
@@ -561,14 +578,12 @@ class SSO(object):
try:
# Picks a binding to use for sending the Request to the IDP
_binding, destination = _cli.pick_binding(
- "single_sign_on_service", self.bindings, "idpsso",
- entity_id=entity_id)
- logger.debug("binding: %s, destination: %s", _binding,
- destination)
+ "single_sign_on_service", self.bindings, "idpsso", entity_id=entity_id
+ )
+ logger.debug("binding: %s, destination: %s", _binding, destination)
# Binding here is the response binding that is which binding the
# IDP should use to return the response.
- acs = _cli.config.getattr("endpoints", "sp")[
- "assertion_consumer_service"]
+ acs = _cli.config.getattr("endpoints", "sp")["assertion_consumer_service"]
# just pick one
endp, return_binding = acs[0]
@@ -576,24 +591,27 @@ class SSO(object):
cert = None
if _cli.config.generate_cert_func is not None:
cert_str, req_key_str = _cli.config.generate_cert_func()
- cert = {
- "cert": cert_str,
- "key": req_key_str
- }
- spcertenc = SPCertEnc(x509_data=ds.X509Data(
- x509_certificate=ds.X509Certificate(text=cert_str)))
- extensions = Extensions(extension_elements=[
- element_to_extension_element(spcertenc)])
-
- req_id, req = _cli.create_authn_request(destination,
- binding=return_binding,
- extensions=extensions,
- nameid_format=NAMEID_FORMAT_PERSISTENT)
+ cert = {"cert": cert_str, "key": req_key_str}
+ spcertenc = SPCertEnc(
+ x509_data=ds.X509Data(
+ x509_certificate=ds.X509Certificate(text=cert_str)
+ )
+ )
+ extensions = Extensions(
+ extension_elements=[element_to_extension_element(spcertenc)]
+ )
+
+ req_id, req = _cli.create_authn_request(
+ destination,
+ binding=return_binding,
+ extensions=extensions,
+ nameid_format=NAMEID_FORMAT_PERSISTENT,
+ )
_rstate = rndstr()
self.cache.relay_state[_rstate] = came_from
- ht_args = _cli.apply_binding(_binding, "%s" % req, destination,
- relay_state=_rstate,
- sigalg=sigalg)
+ ht_args = _cli.apply_binding(
+ _binding, "%s" % req, destination, relay_state=_rstate, sigalg=sigalg
+ )
_sid = req_id
if cert is not None:
@@ -601,8 +619,7 @@ class SSO(object):
except Exception as exc:
logger.exception(exc)
- resp = ServiceError(
- "Failed to construct the AuthnRequest: %s" % exc)
+ resp = ServiceError("Failed to construct the AuthnRequest: %s" % exc)
return resp
# remember the request
@@ -646,7 +663,7 @@ class SLO(Service):
def do(self, message, binding, relay_state="", mtype="response"):
try:
txt = decode_base64_and_inflate(message)
- is_logout_request = 'LogoutRequest' in txt.split('>', 1)[0]
+ is_logout_request = "LogoutRequest" in txt.split(">", 1)[0]
except: # TODO: parse the XML correctly
is_logout_request = False
@@ -664,7 +681,7 @@ class SLO(Service):
# noinspection PyUnusedLocal
def not_found(environ, start_response):
"""Called if no URL matches."""
- resp = NotFound('Not Found')
+ resp = NotFound("Not Found")
return resp(environ, start_response)
@@ -681,7 +698,7 @@ def main(environ, start_response, sp):
body = dict_to_table(user.data)
authn_stmt = cgi.escape(user.authn_statement)
- body.append('<br><pre>' + authn_stmt + "</pre>")
+ body.append("<br><pre>" + authn_stmt + "</pre>")
body.append('<br><a href="/logout">logout</a>')
resp = Response(body)
@@ -724,19 +741,19 @@ def logout(environ, start_response, sp):
binding, http_info = logout_info
if binding == BINDING_HTTP_POST:
- body = ''.join(http_info['data'])
+ body = "".join(http_info["data"])
resp = Response(body)
return resp(environ, start_response)
elif binding == BINDING_HTTP_REDIRECT:
- for key, value in http_info['headers']:
- if key.lower() == 'location':
+ for key, value in http_info["headers"]:
+ if key.lower() == "location":
resp = Redirect(value)
return resp(environ, start_response)
- resp = ServiceError('missing Location header')
+ resp = ServiceError("missing Location header")
return resp(environ, start_response)
else:
- resp = ServiceError('unknown logout binding: %s', binding)
+ resp = ServiceError("unknown logout binding: %s", binding)
return resp(environ, start_response)
else: # result from logout, should be OK
pass
@@ -751,9 +768,7 @@ def finish_logout(environ, start_response):
# remove cookie and stored info
cookie = CACHE.delete_cookie(environ)
- resp = Response('You are now logged out of this service', headers=[
- cookie,
- ])
+ resp = Response("You are now logged out of this service", headers=[cookie])
return resp(environ, start_response)
@@ -762,10 +777,10 @@ def finish_logout(environ, start_response):
# map urls to functions
urls = [
# Hmm, place holder, NOT used
- ('place', ("holder", None)),
- (r'^$', main),
- (r'^disco', disco),
- (r'^logout$', logout),
+ ("place", ("holder", None)),
+ (r"^$", main),
+ (r"^disco", disco),
+ (r"^logout$", logout),
]
@@ -787,6 +802,7 @@ def add_urls():
# ----------------------------------------------------------------------------
+
def metadata(environ, start_response):
try:
path = _args.path
@@ -794,11 +810,17 @@ def metadata(environ, start_response):
path = os.path.dirname(os.path.abspath(__file__))
if path[-1] != "/":
path += "/"
- metadata = create_metadata_string(path + "sp_conf.py", None,
- _args.valid, _args.cert,
- _args.keyfile,
- _args.id, _args.name, _args.sign)
- start_response('200 OK', [('Content-Type', "text/xml")])
+ metadata = create_metadata_string(
+ path + "sp_conf.py",
+ None,
+ _args.valid,
+ _args.cert,
+ _args.keyfile,
+ _args.id,
+ _args.name,
+ _args.sign,
+ )
+ start_response("200 OK", [("Content-Type", "text/xml")])
return metadata
except Exception as ex:
logger.error("An error occured while creating metadata: %s", ex.message)
@@ -817,7 +839,7 @@ def application(environ, start_response):
request is done
:return: The response as a list of lines
"""
- path = environ.get('PATH_INFO', '').lstrip('/')
+ path = environ.get("PATH_INFO", "").lstrip("/")
logger.debug("<application> PATH: '%s'", path)
if path == "metadata":
@@ -850,7 +872,7 @@ def application(environ, start_response):
return resp(environ, start_response)
-if __name__ == '__main__':
+if __name__ == "__main__":
try:
from cheroot.wsgi import Server as WSGIServer
from cheroot.ssl import pyopenssl
@@ -859,30 +881,34 @@ if __name__ == '__main__':
from cherrypy.wsgiserver import ssl_pyopenssl as pyopenssl
_parser = argparse.ArgumentParser()
- _parser.add_argument('-d', dest='debug', action='store_true',
- help="Print debug information")
- _parser.add_argument('-D', dest='discosrv',
- help="Which disco server to use")
- _parser.add_argument('-s', dest='seed',
- help="Cookie seed")
- _parser.add_argument('-W', dest='wayf', action='store_true',
- help="Which WAYF url to use")
+ _parser.add_argument(
+ "-d", dest="debug", action="store_true", help="Print debug information"
+ )
+ _parser.add_argument("-D", dest="discosrv", help="Which disco server to use")
+ _parser.add_argument("-s", dest="seed", help="Cookie seed")
+ _parser.add_argument(
+ "-W", dest="wayf", action="store_true", help="Which WAYF url to use"
+ )
_parser.add_argument("config", help="SAML client config")
- _parser.add_argument('-p', dest='path', help='Path to configuration file.')
- _parser.add_argument('-v', dest='valid', default="4",
- help="How long, in days, the metadata is valid from "
- "the time of creation")
- _parser.add_argument('-c', dest='cert', help='certificate')
- _parser.add_argument('-i', dest='id',
- help="The ID of the entities descriptor in the "
- "metadata")
- _parser.add_argument('-k', dest='keyfile',
- help="A file with a key to sign the metadata with")
- _parser.add_argument('-n', dest='name')
- _parser.add_argument('-S', dest='sign', action='store_true',
- help="sign the metadata")
- _parser.add_argument('-C', dest='service_conf_module',
- help="service config module")
+ _parser.add_argument("-p", dest="path", help="Path to configuration file.")
+ _parser.add_argument(
+ "-v",
+ dest="valid",
+ default="4",
+ help="How long, in days, the metadata is valid from " "the time of creation",
+ )
+ _parser.add_argument("-c", dest="cert", help="certificate")
+ _parser.add_argument(
+ "-i", dest="id", help="The ID of the entities descriptor in the " "metadata"
+ )
+ _parser.add_argument(
+ "-k", dest="keyfile", help="A file with a key to sign the metadata with"
+ )
+ _parser.add_argument("-n", dest="name")
+ _parser.add_argument(
+ "-S", dest="sign", action="store_true", help="sign the metadata"
+ )
+ _parser.add_argument("-C", dest="service_conf_module", help="service config module")
ARGS = {}
_args = _parser.parse_args()
@@ -935,7 +961,8 @@ if __name__ == '__main__':
_https = ""
if service_conf.HTTPS:
SRV.ssl_adapter = pyopenssl.pyOpenSSLAdapter(
- SERVER_CERT, SERVER_KEY, CERT_CHAIN)
+ SERVER_CERT, SERVER_KEY, CERT_CHAIN
+ )
_https = " using SSL/TLS"
logger.info("Server starting")
print("SP listening on %s:%s%s" % (HOST, PORT, _https))