diff options
author | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2018-11-19 21:01:26 +0200 |
---|---|---|
committer | Ivan Kanakarakis <ivan.kanak@gmail.com> | 2018-11-20 02:30:18 +0200 |
commit | 40187543971a34cc9663dddb9b0400a59c84cd04 (patch) | |
tree | a77239cb6e578ec43459f9e4382c385e9e9dec88 /example | |
parent | 2e239147b94eb92ae477948e61e86dbcfc84229d (diff) | |
download | pysaml2-40187543971a34cc9663dddb9b0400a59c84cd04.tar.gz |
Indent and format code
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
Diffstat (limited to 'example')
-rwxr-xr-x | example/idp2/idp.py | 374 | ||||
-rwxr-xr-x | example/sp-wsgi/sp.py | 283 |
2 files changed, 359 insertions, 298 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py index 0c3d0913..a5027e39 100755 --- a/example/idp2/idp.py +++ b/example/idp2/idp.py @@ -6,56 +6,56 @@ import logging import os import re import time - from hashlib import sha1 -try: - from cheroot.wsgi import Server as WSGIServer - from cheroot.ssl.builtin import BuiltinSSLAdapter -except ImportError: - from cherrypy.wsgiserver import CherryPyWSGIServer as WSGIServer - from cherrypy.wsgiserver.ssl_builtin import BuiltinSSLAdapter +from mako.lookup import TemplateLookup -from six.moves.urllib.parse import parse_qs from six.moves.http_cookies import SimpleCookie +from six.moves.urllib.parse import parse_qs +import saml2.xmldsig as ds from saml2 import BINDING_HTTP_ARTIFACT -from saml2 import BINDING_URI +from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_HTTP_REDIRECT from saml2 import BINDING_PAOS from saml2 import BINDING_SOAP -from saml2 import BINDING_HTTP_REDIRECT -from saml2 import BINDING_HTTP_POST +from saml2 import BINDING_URI from saml2 import server from saml2 import time_util from saml2.authn import is_equal - from saml2.authn_context import AuthnBroker from saml2.authn_context import PASSWORD from saml2.authn_context import UNSPECIFIED from saml2.authn_context import authn_context_class_ref -from saml2.httputil import Response +from saml2.httputil import BadRequest from saml2.httputil import NotFound -from saml2.httputil import geturl -from saml2.httputil import get_post from saml2.httputil import Redirect -from saml2.httputil import Unauthorized -from saml2.httputil import BadRequest +from saml2.httputil import Response from saml2.httputil import ServiceError +from saml2.httputil import Unauthorized +from saml2.httputil import get_post +from saml2.httputil import geturl from saml2.ident import Unknown from saml2.metadata import create_metadata_string from saml2.profile import ecp -from saml2.s_utils import rndstr -from saml2.s_utils import exception_trace +from saml2.s_utils import PolicyError from saml2.s_utils import UnknownPrincipal from saml2.s_utils import UnsupportedBinding -from saml2.s_utils import PolicyError -from saml2.sigver import verify_redirect_signature +from saml2.s_utils import exception_trace +from saml2.s_utils import rndstr from saml2.sigver import encrypt_cert_from_item +from saml2.sigver import verify_redirect_signature -from idp_user import USERS from idp_user import EXTRA -from mako.lookup import TemplateLookup -import saml2.xmldsig as ds +from idp_user import USERS + +try: + from cheroot.wsgi import Server as WSGIServer + from cheroot.ssl.builtin import BuiltinSSLAdapter +except ImportError: + from cherrypy.wsgiserver import CherryPyWSGIServer as WSGIServer + from cherrypy.wsgiserver.ssl_builtin import BuiltinSSLAdapter + logger = logging.getLogger("saml2.idp") logger.setLevel(logging.WARNING) @@ -134,30 +134,32 @@ class Service(object): def operation(self, saml_msg, binding): logger.debug("_operation: %s", saml_msg) - if not (saml_msg and 'SAMLRequest' in saml_msg): - resp = BadRequest('Error parsing request or no request') + if not (saml_msg and "SAMLRequest" in saml_msg): + resp = BadRequest("Error parsing request or no request") return resp(self.environ, self.start_response) else: # saml_msg may also contain Signature and SigAlg if "Signature" in saml_msg: try: - kwargs = {"signature": saml_msg["Signature"], - "sigalg": saml_msg["SigAlg"]} + kwargs = { + "signature": saml_msg["Signature"], + "sigalg": saml_msg["SigAlg"], + } except KeyError: - resp = BadRequest( - 'Signature Algorithm specification is missing') + resp = BadRequest("Signature Algorithm specification is missing") return resp(self.environ, self.start_response) else: kwargs = {} try: - kwargs['encrypt_cert'] = encrypt_cert_from_item( - saml_msg["req_info"].message) + kwargs["encrypt_cert"] = encrypt_cert_from_item( + saml_msg["req_info"].message + ) except KeyError: pass try: - kwargs['relay_state'] = saml_msg['RelayState'] + kwargs["relay_state"] = saml_msg["RelayState"] except KeyError: pass @@ -171,8 +173,7 @@ class Service(object): # exchange artifact for request request = IDP.artifact2message(saml_msg["SAMLart"], "spsso") try: - return self.do(request, BINDING_HTTP_ARTIFACT, - saml_msg["RelayState"]) + return self.do(request, BINDING_HTTP_ARTIFACT, saml_msg["RelayState"]) except KeyError: return self.do(request, BINDING_HTTP_ARTIFACT) @@ -235,14 +236,17 @@ class Service(object): kwargs["headers"] = [kaka] return do_authentication(self.environ, self.start_response, **kwargs) + # ----------------------------------------------------------------------------- + REPOZE_ID_EQUIVALENT = "uid" FORM_SPEC = """<form name="myform" method="post" action="%s"> <input type="hidden" name="SAMLResponse" value="%s" /> <input type="hidden" name="RelayState" value="%s" /> </form>""" + # ----------------------------------------------------------------------------- # === Single log in ==== # ----------------------------------------------------------------------------- @@ -273,7 +277,7 @@ class SSO(Service): resp_args = {} if not query: logger.info("Missing QUERY") - resp = Unauthorized('Unknown user') + resp = Unauthorized("Unknown user") return resp_args, resp(self.environ, self.start_response) if not self.req_info: @@ -287,29 +291,27 @@ class SSO(Service): self.binding_out, self.destination = IDP.pick_binding( "assertion_consumer_service", bindings=self.response_bindings, - entity_id=_authn_req.issuer.text, request=_authn_req) + entity_id=_authn_req.issuer.text, + request=_authn_req, + ) except Exception as err: logger.error("Couldn't find receiver endpoint: %s", err) raise - logger.debug("Binding: %s, destination: %s", self.binding_out, - self.destination) + logger.debug("Binding: %s, destination: %s", self.binding_out, self.destination) resp_args = {} try: resp_args = IDP.response_args(_authn_req) _resp = None except UnknownPrincipal as excp: - _resp = IDP.create_error_response(_authn_req.id, - self.destination, excp) + _resp = IDP.create_error_response(_authn_req.id, self.destination, excp) except UnsupportedBinding as excp: - _resp = IDP.create_error_response(_authn_req.id, - self.destination, excp) + _resp = IDP.create_error_response(_authn_req.id, self.destination, excp) return resp_args, _resp - def do(self, query, binding_in, relay_state="", encrypt_cert=None, - **kwargs): + def do(self, query, binding_in, relay_state="", encrypt_cert=None, **kwargs): """ :param query: The request @@ -345,9 +347,11 @@ class SSO(Service): resp_args["authn"] = metod _resp = IDP.create_authn_response( - identity, userid=self.user, + identity, + userid=self.user, encrypt_cert_assertion=encrypt_cert, - **resp_args) + **resp_args + ) except Exception as excp: logging.error(exception_trace(excp)) resp = ServiceError("Exception: %s" % (excp,)) @@ -355,15 +359,22 @@ class SSO(Service): logger.info("AuthNResponse: %s", _resp) if self.op_type == "ecp": - kwargs = {"soap_headers": [ - ecp.Response( - assertion_consumer_service_url=self.destination)]} + kwargs = { + "soap_headers": [ + ecp.Response(assertion_consumer_service_url=self.destination) + ] + } else: kwargs = {} - http_args = IDP.apply_binding(self.binding_out, - "%s" % _resp, self.destination, - relay_state, response=True, **kwargs) + http_args = IDP.apply_binding( + self.binding_out, + "%s" % _resp, + self.destination, + relay_state, + response=True, + **kwargs + ) logger.debug("HTTPargs: %s", http_args) return self.response(self.binding_out, http_args) @@ -389,8 +400,9 @@ class SSO(Service): del IDP.ticket[_key] except KeyError: try: - self.req_info = IDP.parse_authn_request(saml_msg["SAMLRequest"], - BINDING_HTTP_REDIRECT) + self.req_info = IDP.parse_authn_request( + saml_msg["SAMLRequest"], BINDING_HTTP_REDIRECT + ) except KeyError: resp = BadRequest("Message signature verification failure") return resp(self.environ, self.start_response) @@ -407,8 +419,7 @@ class SSO(Service): _certs = IDP.metadata.certs(issuer, "any", "signing") verified_ok = False for cert in _certs: - if verify_redirect_signature(saml_msg, IDP.sec.sec_backend, - cert): + if verify_redirect_signature(saml_msg, IDP.sec.sec_backend, cert): verified_ok = True break if not verified_ok: @@ -417,8 +428,7 @@ class SSO(Service): if self.user: saml_msg["req_info"] = self.req_info - if _req.force_authn is not None and \ - _req.force_authn.lower() == 'true': + if _req.force_authn is not None and _req.force_authn.lower() == "true": key = self._store_request(saml_msg) return self.not_authn(key, _req.requested_authn_context) else: @@ -444,11 +454,11 @@ class SSO(Service): del IDP.ticket[_key] except KeyError: self.req_info = IDP.parse_authn_request( - saml_msg["SAMLRequest"], BINDING_HTTP_POST) + saml_msg["SAMLRequest"], BINDING_HTTP_POST + ) _req = self.req_info.message if self.user: - if _req.force_authn is not None and \ - _req.force_authn.lower() == 'true': + if _req.force_authn is not None and _req.force_authn.lower() == "true": saml_msg["req_info"] = self.req_info key = self._store_request(saml_msg) return self.not_authn(key, _req.requested_authn_context) @@ -486,9 +496,9 @@ class SSO(Service): if is_equal(PASSWD[user], passwd): resp = Unauthorized() self.user = user - self.environ[ - "idp.authn"] = AUTHN_BROKER.get_authn_by_accr( - PASSWORD) + self.environ["idp.authn"] = AUTHN_BROKER.get_authn_by_accr( + PASSWORD + ) except ValueError: resp = Unauthorized() else: @@ -511,8 +521,9 @@ class SSO(Service): # ----------------------------------------------------------------------------- -def do_authentication(environ, start_response, authn_context, key, - redirect_uri, headers=None): +def do_authentication( + environ, start_response, authn_context, key, redirect_uri, headers=None +): """ Display the login form """ @@ -530,16 +541,19 @@ def do_authentication(environ, start_response, authn_context, key, # ----------------------------------------------------------------------------- + PASSWD = { "daev0001": "qwerty", "testuser": "qwerty", "roland": "dianakra", "babs": "howes", - "upper": "crust"} + "upper": "crust", +} -def username_password_authn(environ, start_response, reference, key, - redirect_uri, headers=None): +def username_password_authn( + environ, start_response, reference, key, redirect_uri, headers=None +): """ Display the login form """ @@ -557,7 +571,7 @@ def username_password_authn(environ, start_response, reference, key, "password": "", "key": key, "authn_reference": reference, - "redirect_uri": redirect_uri + "redirect_uri": redirect_uri, } logger.info("do_authentication argv: %s", argv) return resp(environ, start_response, **argv) @@ -593,8 +607,7 @@ def do_verify(environ, start_response, _): kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0]) - lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, - query["key"][0]) + lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0]) logger.debug("Redirect => %s", lox) resp = Redirect(lox, headers=[kaka], content="text/html") @@ -611,6 +624,7 @@ def not_found(environ, start_response): # === Single log out === # ----------------------------------------------------------------------------- + # def _subject_sp_info(req_info): # # look for the subject # subject = req_info.subject_id() @@ -618,6 +632,7 @@ def not_found(environ, start_response): # sp_entity_id = req_info.message.issuer.text.strip() # return subject, sp_entity_id + class SLO(Service): def do(self, request, binding, relay_state="", encrypt_cert=None, **kwargs): @@ -653,32 +668,33 @@ class SLO(Service): destination = "" response = False else: - binding, destination = IDP.pick_binding("single_logout_service", - [binding], "spsso", - req_info) + binding, destination = IDP.pick_binding( + "single_logout_service", [binding], "spsso", req_info + ) response = True try: - hinfo = IDP.apply_binding(binding, "%s" % resp, destination, - relay_state, response=response) + hinfo = IDP.apply_binding( + binding, "%s" % resp, destination, relay_state, response=response + ) except Exception as exc: logger.error("ServiceError: %s", exc) resp = ServiceError("%s" % exc) return resp(self.environ, self.start_response) - #_tlh = dict2list_of_tuples(hinfo["headers"]) + # _tlh = dict2list_of_tuples(hinfo["headers"]) delco = delete_cookie(self.environ, "idpauthn") if delco: hinfo["headers"].append(delco) logger.info("Header: %s", (hinfo["headers"],)) if binding == BINDING_HTTP_REDIRECT: - for key, value in hinfo['headers']: - if key.lower() == 'location': + for key, value in hinfo["headers"]: + if key.lower() == "location": resp = Redirect(value, headers=hinfo["headers"]) return resp(self.environ, self.start_response) - resp = ServiceError('missing Location header') + resp = ServiceError("missing Location header") return resp(self.environ, self.start_response) else: resp = Response(hinfo["data"], headers=hinfo["headers"]) @@ -698,16 +714,17 @@ class NMI(Service): # Do the necessary stuff name_id = IDP.ident.handle_manage_name_id_request( - request.name_id, request.new_id, request.new_encrypted_id, - request.terminate) + request.name_id, request.new_id, request.new_encrypted_id, request.terminate + ) logger.debug("New NameID: %s", name_id) _resp = IDP.create_manage_name_id_response(request) # It's using SOAP binding - hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", - relay_state, response=True) + hinfo = IDP.apply_binding( + BINDING_SOAP, "%s" % _resp, "", relay_state, response=True + ) resp = Response(hinfo["data"], headers=hinfo["headers"]) return resp(self.environ, self.start_response) @@ -738,7 +755,7 @@ class AIDR(Service): def operation(self, _dict, binding, **kwargs): logger.debug("_operation: %s", _dict) if not _dict or "ID" not in _dict: - resp = BadRequest('Error parsing request or no request') + resp = BadRequest("Error parsing request or no request") return resp(self.environ, self.start_response) return self.do(_dict["ID"], binding, **kwargs) @@ -748,14 +765,14 @@ class AIDR(Service): # === Artifact resolve service === # ---------------------------------------------------------------------------- + class ARS(Service): def do(self, request, binding, relay_state="", encrypt_cert=None): _req = IDP.parse_artifact_resolve(request, binding) msg = IDP.create_artifact_response(_req, _req.artifact.text) - hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", - response=True) + hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True) resp = Response(hinfo["data"], headers=hinfo["headers"]) return resp(self.environ, self.start_response) @@ -773,13 +790,12 @@ class AQS(Service): _req = IDP.parse_authn_query(request, binding) _query = _req.message - msg = IDP.create_authn_query_response(_query.subject, - _query.requested_authn_context, - _query.session_index) + msg = IDP.create_authn_query_response( + _query.subject, _query.requested_authn_context, _query.session_index + ) logger.debug("response: %s", msg) - hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", - response=True) + hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True) resp = Response(hinfo["data"], headers=hinfo["headers"]) return resp(self.environ, self.start_response) @@ -805,12 +821,10 @@ class ATTR(Service): # Comes in over SOAP so only need to construct the response args = IDP.response_args(_query, [BINDING_SOAP]) - msg = IDP.create_attribute_response(identity, - name_id=name_id, **args) + msg = IDP.create_attribute_response(identity, name_id=name_id, **args) logger.debug("response: %s", msg) - hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", - response=True) + hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "", response=True) resp = Response(hinfo["data"], headers=hinfo["headers"]) return resp(self.environ, self.start_response) @@ -832,7 +846,8 @@ class NIM(Service): # Do the necessary stuff try: name_id = IDP.ident.handle_name_id_mapping_request( - request.name_id, request.name_id_policy) + request.name_id, request.name_id_policy + ) except Unknown: resp = BadRequest("Unknown entity") return resp(self.environ, self.start_response) @@ -844,8 +859,7 @@ class NIM(Service): _resp = IDP.create_name_id_mapping_response(name_id, **info) # Only SOAP - hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "", - response=True) + hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "", response=True) resp = Response(hinfo["data"], headers=hinfo["headers"]) return resp(self.environ, self.start_response) @@ -854,6 +868,8 @@ class NIM(Service): # ---------------------------------------------------------------------------- # Cookie handling # ---------------------------------------------------------------------------- + + def info_from_cookie(kaka): logger.debug("KAKA: %s", kaka) if kaka: @@ -871,14 +887,14 @@ def info_from_cookie(kaka): def delete_cookie(environ, name): - kaka = environ.get("HTTP_COOKIE", '') + kaka = environ.get("HTTP_COOKIE", "") logger.debug("delete KAKA: %s", kaka) if kaka: cookie_obj = SimpleCookie(kaka) morsel = cookie_obj.get(name, None) cookie = SimpleCookie() cookie[name] = "" - cookie[name]['path'] = "/" + cookie[name]["path"] = "/" logger.debug("Expire: %s", morsel) cookie[name]["expires"] = _expiration("dawn") return tuple(cookie.output().split(": ", 1)) @@ -888,55 +904,58 @@ def delete_cookie(environ, name): def set_cookie(name, _, *args): cookie = SimpleCookie() cookie[name] = base64.b64encode(":".join(args)) - cookie[name]['path'] = "/" + cookie[name]["path"] = "/" cookie[name]["expires"] = _expiration(5) # 5 minutes from now logger.debug("Cookie expires: %s", cookie[name]["expires"]) return tuple(cookie.output().split(": ", 1)) + # ---------------------------------------------------------------------------- + # map urls to functions AUTHN_URLS = [ # sso - (r'sso/post$', (SSO, "post")), - (r'sso/post/(.*)$', (SSO, "post")), - (r'sso/redirect$', (SSO, "redirect")), - (r'sso/redirect/(.*)$', (SSO, "redirect")), - (r'sso/art$', (SSO, "artifact")), - (r'sso/art/(.*)$', (SSO, "artifact")), + (r"sso/post$", (SSO, "post")), + (r"sso/post/(.*)$", (SSO, "post")), + (r"sso/redirect$", (SSO, "redirect")), + (r"sso/redirect/(.*)$", (SSO, "redirect")), + (r"sso/art$", (SSO, "artifact")), + (r"sso/art/(.*)$", (SSO, "artifact")), # slo - (r'slo/redirect$', (SLO, "redirect")), - (r'slo/redirect/(.*)$', (SLO, "redirect")), - (r'slo/post$', (SLO, "post")), - (r'slo/post/(.*)$', (SLO, "post")), - (r'slo/soap$', (SLO, "soap")), - (r'slo/soap/(.*)$', (SLO, "soap")), + (r"slo/redirect$", (SLO, "redirect")), + (r"slo/redirect/(.*)$", (SLO, "redirect")), + (r"slo/post$", (SLO, "post")), + (r"slo/post/(.*)$", (SLO, "post")), + (r"slo/soap$", (SLO, "soap")), + (r"slo/soap/(.*)$", (SLO, "soap")), # - (r'airs$', (AIDR, "uri")), - (r'ars$', (ARS, "soap")), + (r"airs$", (AIDR, "uri")), + (r"ars$", (ARS, "soap")), # mni - (r'mni/post$', (NMI, "post")), - (r'mni/post/(.*)$', (NMI, "post")), - (r'mni/redirect$', (NMI, "redirect")), - (r'mni/redirect/(.*)$', (NMI, "redirect")), - (r'mni/art$', (NMI, "artifact")), - (r'mni/art/(.*)$', (NMI, "artifact")), - (r'mni/soap$', (NMI, "soap")), - (r'mni/soap/(.*)$', (NMI, "soap")), + (r"mni/post$", (NMI, "post")), + (r"mni/post/(.*)$", (NMI, "post")), + (r"mni/redirect$", (NMI, "redirect")), + (r"mni/redirect/(.*)$", (NMI, "redirect")), + (r"mni/art$", (NMI, "artifact")), + (r"mni/art/(.*)$", (NMI, "artifact")), + (r"mni/soap$", (NMI, "soap")), + (r"mni/soap/(.*)$", (NMI, "soap")), # nim - (r'nim$', (NIM, "soap")), - (r'nim/(.*)$', (NIM, "soap")), + (r"nim$", (NIM, "soap")), + (r"nim/(.*)$", (NIM, "soap")), # - (r'aqs$', (AQS, "soap")), - (r'attr$', (ATTR, "soap")) + (r"aqs$", (AQS, "soap")), + (r"attr$", (ATTR, "soap")), ] NON_AUTHN_URLS = [ - #(r'login?(.*)$', do_authentication), - (r'verify?(.*)$', do_verify), - (r'sso/ecp$', (SSO, "ecp")), + # (r'login?(.*)$', do_authentication), + (r"verify?(.*)$", do_verify), + (r"sso/ecp$", (SSO, "ecp")), ] + # ---------------------------------------------------------------------------- @@ -947,10 +966,17 @@ def metadata(environ, start_response): path = os.path.dirname(os.path.abspath(__file__)) if path[-1] != "/": path += "/" - metadata = create_metadata_string(path + args.config, IDP.config, - args.valid, args.cert, args.keyfile, - args.id, args.name, args.sign) - start_response('200 OK', [('Content-Type', "text/xml")]) + metadata = create_metadata_string( + path + args.config, + IDP.config, + args.valid, + args.cert, + args.keyfile, + args.id, + args.name, + args.sign, + ) + start_response("200 OK", [("Content-Type", "text/xml")]) return metadata except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) @@ -964,13 +990,13 @@ def staticfile(environ, start_response): path = os.path.dirname(os.path.abspath(__file__)) if path[-1] != "/": path += "/" - path += environ.get('PATH_INFO', '').lstrip('/') + path += environ.get("PATH_INFO", "").lstrip("/") path = os.path.realpath(path) if not path.startswith(args.path): resp = Unauthorized() return resp(environ, start_response) - start_response('200 OK', [('Content-Type', "text/xml")]) - return open(path, 'r').read() + start_response("200 OK", [("Content-Type", "text/xml")]) + return open(path, "r").read() except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) return not_found(environ, start_response) @@ -991,7 +1017,7 @@ def application(environ, start_response): :return: The response as a list of lines """ - path = environ.get('PATH_INFO', '').lstrip('/') + path = environ.get("PATH_INFO", "").lstrip("/") if path == "metadata": return metadata(environ, start_response) @@ -1022,9 +1048,9 @@ def application(environ, start_response): match = re.search(regex, path) if match is not None: try: - environ['myapp.url_args'] = match.groups()[0] + environ["myapp.url_args"] = match.groups()[0] except IndexError: - environ['myapp.url_args'] = path + environ["myapp.url_args"] = path logger.debug("Callback: %s", callback) if isinstance(callback, tuple): @@ -1034,46 +1060,55 @@ def application(environ, start_response): return func() return callback(environ, start_response, user) - if re.search(r'static/.*', path) is not None: + if re.search(r"static/.*", path) is not None: return staticfile(environ, start_response) return not_found(environ, start_response) + # ---------------------------------------------------------------------------- -if __name__ == '__main__': + +if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('-p', dest='path', help='Path to configuration file.', default='./idp_conf.py') - parser.add_argument('-v', dest='valid', - help="How long, in days, the metadata is valid from " - "the time of creation") - parser.add_argument('-c', dest='cert', help='certificate') - parser.add_argument('-i', dest='id', - help="The ID of the entities descriptor") - parser.add_argument('-k', dest='keyfile', - help="A file with a key to sign the metadata with") - parser.add_argument('-n', dest='name') - parser.add_argument('-s', dest='sign', action='store_true', - help="sign the metadata") - parser.add_argument('-m', dest='mako_root', default="./") + parser.add_argument( + "-p", dest="path", help="Path to configuration file.", default="./idp_conf.py" + ) + parser.add_argument( + "-v", + dest="valid", + help="How long, in days, the metadata is valid from " "the time of creation", + ) + parser.add_argument("-c", dest="cert", help="certificate") + parser.add_argument("-i", dest="id", help="The ID of the entities descriptor") + parser.add_argument( + "-k", dest="keyfile", help="A file with a key to sign the metadata with" + ) + parser.add_argument("-n", dest="name") + parser.add_argument( + "-s", dest="sign", action="store_true", help="sign the metadata" + ) + parser.add_argument("-m", dest="mako_root", default="./") parser.add_argument(dest="config") args = parser.parse_args() CONFIG = importlib.import_module(args.config) AUTHN_BROKER = AuthnBroker() - AUTHN_BROKER.add(authn_context_class_ref(PASSWORD), - username_password_authn, 10, - CONFIG.BASE) - AUTHN_BROKER.add(authn_context_class_ref(UNSPECIFIED), - "", 0, CONFIG.BASE) + AUTHN_BROKER.add( + authn_context_class_ref(PASSWORD), username_password_authn, 10, CONFIG.BASE + ) + AUTHN_BROKER.add(authn_context_class_ref(UNSPECIFIED), "", 0, CONFIG.BASE) IDP = server.Server(args.config, cache=Cache()) IDP.ticket = {} _rot = args.mako_root - LOOKUP = TemplateLookup(directories=[_rot + 'templates', _rot + 'htdocs'], - module_directory=_rot + 'modules', - input_encoding='utf-8', output_encoding='utf-8') + LOOKUP = TemplateLookup( + directories=[_rot + "templates", _rot + "htdocs"], + module_directory=_rot + "modules", + input_encoding="utf-8", + output_encoding="utf-8", + ) HOST = CONFIG.HOST PORT = CONFIG.PORT @@ -1097,9 +1132,9 @@ if __name__ == '__main__': https = "using HTTPS" # SRV.ssl_adapter = ssl_pyopenssl.pyOpenSSLAdapter( # config.SERVER_CERT, config.SERVER_KEY, config.CERT_CHAIN) - SRV.ssl_adapter = BuiltinSSLAdapter(CONFIG.SERVER_CERT, - CONFIG.SERVER_KEY, - CONFIG.CERT_CHAIN) + SRV.ssl_adapter = BuiltinSSLAdapter( + CONFIG.SERVER_CERT, CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN + ) logger.info("Server starting") print("IDP listening on %s:%s%s" % (HOST, PORT, _https)) @@ -1107,4 +1142,3 @@ if __name__ == '__main__': SRV.start() except KeyboardInterrupt: SRV.stop() - diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py index 90d7838c..4e9eab9c 100755 --- a/example/sp-wsgi/sp.py +++ b/example/sp-wsgi/sp.py @@ -49,9 +49,8 @@ from saml2.saml import NAMEID_FORMAT_PERSISTENT from saml2.samlp import Extensions logger = logging.getLogger("") -hdlr = logging.FileHandler('spx.log') -base_formatter = logging.Formatter( - "%(asctime)s %(name)s:%(levelname)s %(message)s") +hdlr = logging.FileHandler("spx.log") +base_formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s") hdlr.setFormatter(base_formatter) logger.addHandler(hdlr) @@ -93,7 +92,7 @@ def dict_to_table(ava, lev=0, width=1): txt.extend(dict_to_table(valarr, lev + 1, width - 1)) txt.append("</td>\n") txt.append("</tr>\n") - txt.append('</table>\n') + txt.append("</table>\n") return txt @@ -108,19 +107,19 @@ def handle_static(environ, start_response, path): :return: wsgi response for the static file. """ try: - data = open(path, 'rb').read() + data = open(path, "rb").read() if path.endswith(".ico"): - resp = Response(data, headers=[('Content-Type', "image/x-icon")]) + resp = Response(data, headers=[("Content-Type", "image/x-icon")]) elif path.endswith(".html"): - resp = Response(data, headers=[('Content-Type', 'text/html')]) + resp = Response(data, headers=[("Content-Type", "text/html")]) elif path.endswith(".txt"): - resp = Response(data, headers=[('Content-Type', 'text/plain')]) + resp = Response(data, headers=[("Content-Type", "text/plain")]) elif path.endswith(".css"): - resp = Response(data, headers=[('Content-Type', 'text/css')]) + resp = Response(data, headers=[("Content-Type", "text/css")]) elif path.endswith(".js"): - resp = Response(data, headers=[('Content-Type', 'text/javascript')]) + resp = Response(data, headers=[("Content-Type", "text/javascript")]) elif path.endswith(".png"): - resp = Response(data, headers=[('Content-Type', 'image/png')]) + resp = Response(data, headers=[("Content-Type", "image/png")]) else: resp = Response(data) except IOError: @@ -130,22 +129,23 @@ def handle_static(environ, start_response, path): class ECPResponse(object): code = 200 - title = 'OK' + title = "OK" def __init__(self, content): self.content = content # noinspection PyUnusedLocal def __call__(self, environ, start_response): - start_response('%s %s' % (self.code, self.title), - [('Content-Type', "text/xml")]) + start_response( + "%s %s" % (self.code, self.title), [("Content-Type", "text/xml")] + ) return [self.content] def _expiration(timeout, tformat=None): # Wed, 06-Jun-2012 01:34:34 GMT if not tformat: - tformat = '%a, %d-%b-%Y %T GMT' + tformat = "%a, %d-%b-%Y %T GMT" if timeout == "now": return time_util.instant(tformat) @@ -165,7 +165,7 @@ class Cache(object): self.result = {} def get_user(self, environ): - cookie = environ.get("HTTP_COOKIE", '') + cookie = environ.get("HTTP_COOKIE", "") logger.debug("Cookie: %s", cookie) if cookie: cookie_obj = SimpleCookie(cookie) @@ -181,7 +181,7 @@ class Cache(object): return None def delete_cookie(self, environ): - cookie = environ.get("HTTP_COOKIE", '') + cookie = environ.get("HTTP_COOKIE", "") logger.debug("delete cookie: %s", cookie) if cookie: _name = self.cookie_name @@ -189,7 +189,7 @@ class Cache(object): morsel = cookie_obj.get(_name, None) cookie = SimpleCookie() cookie[_name] = "" - cookie[_name]['path'] = "/" + cookie[_name]["path"] = "/" logger.debug("Expire: %s", morsel) cookie[_name]["expires"] = _expiration("now") return cookie.output().split(": ", 1) @@ -200,7 +200,7 @@ class Cache(object): self.uid2user[uid] = user cookie = SimpleCookie() cookie[self.cookie_name] = uid - cookie[self.cookie_name]['path'] = "/" + cookie[self.cookie_name]["path"] = "/" cookie[self.cookie_name]["expires"] = _expiration(480) logger.debug("Cookie expires: %s", cookie[self.cookie_name]["expires"]) return cookie.output().split(": ", 1) @@ -227,7 +227,7 @@ class Service(object): return None def unpack_post(self): - _dict = parse_qs(get_post(self.environ).decode('utf8')) + _dict = parse_qs(get_post(self.environ).decode("utf8")) logger.debug("unpack_post:: %s", _dict) return dict([(k, v[0]) for k, v in _dict.items()]) @@ -251,7 +251,7 @@ class Service(object): def operation(self, _dict, binding): logger.debug("_operation: %s", _dict) if not _dict: - resp = BadRequest('Error parsing request or no request') + resp = BadRequest("Error parsing request or no request") return resp(self.environ, self.start_response) else: try: @@ -259,11 +259,13 @@ class Service(object): except KeyError: _relay_state = "" if "SAMLResponse" in _dict: - return self.do(_dict["SAMLResponse"], binding, - _relay_state, mtype="response") + return self.do( + _dict["SAMLResponse"], binding, _relay_state, mtype="response" + ) elif "SAMLRequest" in _dict: - return self.do(_dict["SAMLRequest"], binding, - _relay_state, mtype="request") + return self.do( + _dict["SAMLRequest"], binding, _relay_state, mtype="request" + ) def artifact_operation(self, _dict): if not _dict: @@ -315,7 +317,7 @@ class Service(object): return self.operation(_dict, BINDING_SOAP) def not_authn(self): - resp = Unauthorized('Unknown user') + resp = Unauthorized("Unknown user") return resp(self.environ, self.start_response) @@ -333,7 +335,8 @@ class User(object): @property def authn_statement(self): xml_doc = xml.dom.minidom.parseString( - str(self.response.assertion.authn_statement[0])) + str(self.response.assertion.authn_statement[0]) + ) return xml_doc.toprettyxml() @@ -354,18 +357,24 @@ class ACS(Service): # tmp_outstanding_queries = dict(self.outstanding_queries) if not response: logger.info("Missing Response") - resp = Unauthorized('Unknown user') + resp = Unauthorized("Unknown user") return resp(self.environ, self.start_response) try: - conv_info = {'remote_addr': self.environ['REMOTE_ADDR'], - 'request_uri': self.environ['REQUEST_URI'], - 'entity_id': self.sp.config.entityid, - 'endpoints': self.sp.config.getattr('endpoints', 'sp')} + conv_info = { + "remote_addr": self.environ["REMOTE_ADDR"], + "request_uri": self.environ["REQUEST_URI"], + "entity_id": self.sp.config.entityid, + "endpoints": self.sp.config.getattr("endpoints", "sp"), + } self.response = self.sp.parse_authn_request_response( - response, binding, self.outstanding_queries, - self.cache.outstanding_certs, conv_info=conv_info) + response, + binding, + self.outstanding_queries, + self.cache.outstanding_certs, + conv_info=conv_info, + ) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) resp = ServiceError("UnknownPrincipal: %s" % (excp,)) @@ -389,15 +398,12 @@ class ACS(Service): user = User(self.response.name_id, self.response.ava, self.response) cookie = self.cache.set_cookie(user) - resp = Redirect("/", headers=[ - cookie, - ]) + resp = Redirect("/", headers=[cookie]) return resp(self.environ, self.start_response) def verify_attributes(self, ava): logger.info("SP: %s", self.sp.config.entityid) - rest = POLICY.get_entity_categories( - self.sp.config.entityid, self.sp.metadata) + rest = POLICY.get_entity_categories(self.sp.config.entityid, self.sp.metadata) akeys = [k.lower() for k in ava.keys()] @@ -421,8 +427,16 @@ class ACS(Service): class SSO(object): - def __init__(self, sp, environ, start_response, cache=None, - wayf=None, discosrv=None, bindings=None): + def __init__( + self, + sp, + environ, + start_response, + cache=None, + wayf=None, + discosrv=None, + bindings=None, + ): self.sp = sp self.environ = environ self.start_response = start_response @@ -433,8 +447,11 @@ class SSO(object): if bindings: self.bindings = bindings else: - self.bindings = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST, - BINDING_HTTP_ARTIFACT] + self.bindings = [ + BINDING_HTTP_REDIRECT, + BINDING_HTTP_POST, + BINDING_HTTP_ARTIFACT, + ] logger.debug("--- SSO ---") def response(self, binding, http_args, do_not_start_response=False): @@ -459,7 +476,7 @@ class SSO(object): sid_ = sid() self.cache.outstanding_queries[sid_] = came_from logger.debug("Redirect to WAYF function: %s", self.wayf) - return -1, SeeOther(headers=[('Location', "%s?%s" % (self.wayf, sid_))]) + return -1, SeeOther(headers=[("Location", "%s?%s" % (self.wayf, sid_))]) def _pick_idp(self, came_from): """ @@ -481,24 +498,23 @@ class SSO(object): _rstate = rndstr() self.cache.relay_state[_rstate] = geturl(self.environ) - _entityid = _cli.config.ecp_endpoint( - self.environ["REMOTE_ADDR"]) + _entityid = _cli.config.ecp_endpoint(self.environ["REMOTE_ADDR"]) if not _entityid: return -1, ServiceError("No IdP to talk to") logger.debug("IdP to talk to: %s", _entityid) return ecp.ecp_auth_request(_cli, _entityid, _rstate) else: - return -1, ServiceError('Faulty Accept header') + return -1, ServiceError("Faulty Accept header") else: - return -1, ServiceError('unknown ECP version') + return -1, ServiceError("unknown ECP version") # Find all IdPs idps = self.sp.metadata.with_descriptor("idpsso") idp_entity_id = None - kaka = self.environ.get("HTTP_COOKIE", '') + kaka = self.environ.get("HTTP_COOKIE", "") if kaka: try: (idp_entity_id, _) = parse_cookie("ve_disco", "SEED_SAW", kaka) @@ -511,8 +527,7 @@ class SSO(object): query = self.environ.get("QUERY_STRING") if not idp_entity_id and query: try: - _idp_entity_id = dict(parse_qs(query))[ - self.idp_query_param][0] + _idp_entity_id = dict(parse_qs(query))[self.idp_query_param][0] if _idp_entity_id in idps: idp_entity_id = _idp_entity_id except KeyError: @@ -524,8 +539,7 @@ class SSO(object): if self.wayf: if query: try: - wayf_selected = dict(parse_qs(query))[ - "wayf_selected"][0] + wayf_selected = dict(parse_qs(query))["wayf_selected"][0] except KeyError: return self._wayf_redirect(came_from) idp_entity_id = wayf_selected @@ -534,23 +548,26 @@ class SSO(object): elif self.discosrv: if query: idp_entity_id = _cli.parse_discovery_service_response( - query=self.environ.get("QUERY_STRING")) + query=self.environ.get("QUERY_STRING") + ) if not idp_entity_id: sid_ = sid() self.cache.outstanding_queries[sid_] = came_from logger.debug("Redirect to Discovery Service function") eid = _cli.config.entityid - ret = _cli.config.getattr("endpoints", - "sp")["discovery_response"][0][0] + ret = _cli.config.getattr("endpoints", "sp")["discovery_response"][ + 0 + ][0] ret += "?sid=%s" % sid_ loc = _cli.create_discovery_service_request( - self.discosrv, eid, **{"return": ret}) + self.discosrv, eid, **{"return": ret} + ) return -1, SeeOther(loc) elif len(idps) == 1: # idps is a dictionary idp_entity_id = list(idps.keys())[0] elif not len(idps): - return -1, ServiceError('Misconfiguration') + return -1, ServiceError("Misconfiguration") else: return -1, NotImplemented("No WAYF or DS present!") @@ -561,14 +578,12 @@ class SSO(object): try: # Picks a binding to use for sending the Request to the IDP _binding, destination = _cli.pick_binding( - "single_sign_on_service", self.bindings, "idpsso", - entity_id=entity_id) - logger.debug("binding: %s, destination: %s", _binding, - destination) + "single_sign_on_service", self.bindings, "idpsso", entity_id=entity_id + ) + logger.debug("binding: %s, destination: %s", _binding, destination) # Binding here is the response binding that is which binding the # IDP should use to return the response. - acs = _cli.config.getattr("endpoints", "sp")[ - "assertion_consumer_service"] + acs = _cli.config.getattr("endpoints", "sp")["assertion_consumer_service"] # just pick one endp, return_binding = acs[0] @@ -576,24 +591,27 @@ class SSO(object): cert = None if _cli.config.generate_cert_func is not None: cert_str, req_key_str = _cli.config.generate_cert_func() - cert = { - "cert": cert_str, - "key": req_key_str - } - spcertenc = SPCertEnc(x509_data=ds.X509Data( - x509_certificate=ds.X509Certificate(text=cert_str))) - extensions = Extensions(extension_elements=[ - element_to_extension_element(spcertenc)]) - - req_id, req = _cli.create_authn_request(destination, - binding=return_binding, - extensions=extensions, - nameid_format=NAMEID_FORMAT_PERSISTENT) + cert = {"cert": cert_str, "key": req_key_str} + spcertenc = SPCertEnc( + x509_data=ds.X509Data( + x509_certificate=ds.X509Certificate(text=cert_str) + ) + ) + extensions = Extensions( + extension_elements=[element_to_extension_element(spcertenc)] + ) + + req_id, req = _cli.create_authn_request( + destination, + binding=return_binding, + extensions=extensions, + nameid_format=NAMEID_FORMAT_PERSISTENT, + ) _rstate = rndstr() self.cache.relay_state[_rstate] = came_from - ht_args = _cli.apply_binding(_binding, "%s" % req, destination, - relay_state=_rstate, - sigalg=sigalg) + ht_args = _cli.apply_binding( + _binding, "%s" % req, destination, relay_state=_rstate, sigalg=sigalg + ) _sid = req_id if cert is not None: @@ -601,8 +619,7 @@ class SSO(object): except Exception as exc: logger.exception(exc) - resp = ServiceError( - "Failed to construct the AuthnRequest: %s" % exc) + resp = ServiceError("Failed to construct the AuthnRequest: %s" % exc) return resp # remember the request @@ -646,7 +663,7 @@ class SLO(Service): def do(self, message, binding, relay_state="", mtype="response"): try: txt = decode_base64_and_inflate(message) - is_logout_request = 'LogoutRequest' in txt.split('>', 1)[0] + is_logout_request = "LogoutRequest" in txt.split(">", 1)[0] except: # TODO: parse the XML correctly is_logout_request = False @@ -664,7 +681,7 @@ class SLO(Service): # noinspection PyUnusedLocal def not_found(environ, start_response): """Called if no URL matches.""" - resp = NotFound('Not Found') + resp = NotFound("Not Found") return resp(environ, start_response) @@ -681,7 +698,7 @@ def main(environ, start_response, sp): body = dict_to_table(user.data) authn_stmt = cgi.escape(user.authn_statement) - body.append('<br><pre>' + authn_stmt + "</pre>") + body.append("<br><pre>" + authn_stmt + "</pre>") body.append('<br><a href="/logout">logout</a>') resp = Response(body) @@ -724,19 +741,19 @@ def logout(environ, start_response, sp): binding, http_info = logout_info if binding == BINDING_HTTP_POST: - body = ''.join(http_info['data']) + body = "".join(http_info["data"]) resp = Response(body) return resp(environ, start_response) elif binding == BINDING_HTTP_REDIRECT: - for key, value in http_info['headers']: - if key.lower() == 'location': + for key, value in http_info["headers"]: + if key.lower() == "location": resp = Redirect(value) return resp(environ, start_response) - resp = ServiceError('missing Location header') + resp = ServiceError("missing Location header") return resp(environ, start_response) else: - resp = ServiceError('unknown logout binding: %s', binding) + resp = ServiceError("unknown logout binding: %s", binding) return resp(environ, start_response) else: # result from logout, should be OK pass @@ -751,9 +768,7 @@ def finish_logout(environ, start_response): # remove cookie and stored info cookie = CACHE.delete_cookie(environ) - resp = Response('You are now logged out of this service', headers=[ - cookie, - ]) + resp = Response("You are now logged out of this service", headers=[cookie]) return resp(environ, start_response) @@ -762,10 +777,10 @@ def finish_logout(environ, start_response): # map urls to functions urls = [ # Hmm, place holder, NOT used - ('place', ("holder", None)), - (r'^$', main), - (r'^disco', disco), - (r'^logout$', logout), + ("place", ("holder", None)), + (r"^$", main), + (r"^disco", disco), + (r"^logout$", logout), ] @@ -787,6 +802,7 @@ def add_urls(): # ---------------------------------------------------------------------------- + def metadata(environ, start_response): try: path = _args.path @@ -794,11 +810,17 @@ def metadata(environ, start_response): path = os.path.dirname(os.path.abspath(__file__)) if path[-1] != "/": path += "/" - metadata = create_metadata_string(path + "sp_conf.py", None, - _args.valid, _args.cert, - _args.keyfile, - _args.id, _args.name, _args.sign) - start_response('200 OK', [('Content-Type', "text/xml")]) + metadata = create_metadata_string( + path + "sp_conf.py", + None, + _args.valid, + _args.cert, + _args.keyfile, + _args.id, + _args.name, + _args.sign, + ) + start_response("200 OK", [("Content-Type", "text/xml")]) return metadata except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) @@ -817,7 +839,7 @@ def application(environ, start_response): request is done :return: The response as a list of lines """ - path = environ.get('PATH_INFO', '').lstrip('/') + path = environ.get("PATH_INFO", "").lstrip("/") logger.debug("<application> PATH: '%s'", path) if path == "metadata": @@ -850,7 +872,7 @@ def application(environ, start_response): return resp(environ, start_response) -if __name__ == '__main__': +if __name__ == "__main__": try: from cheroot.wsgi import Server as WSGIServer from cheroot.ssl import pyopenssl @@ -859,30 +881,34 @@ if __name__ == '__main__': from cherrypy.wsgiserver import ssl_pyopenssl as pyopenssl _parser = argparse.ArgumentParser() - _parser.add_argument('-d', dest='debug', action='store_true', - help="Print debug information") - _parser.add_argument('-D', dest='discosrv', - help="Which disco server to use") - _parser.add_argument('-s', dest='seed', - help="Cookie seed") - _parser.add_argument('-W', dest='wayf', action='store_true', - help="Which WAYF url to use") + _parser.add_argument( + "-d", dest="debug", action="store_true", help="Print debug information" + ) + _parser.add_argument("-D", dest="discosrv", help="Which disco server to use") + _parser.add_argument("-s", dest="seed", help="Cookie seed") + _parser.add_argument( + "-W", dest="wayf", action="store_true", help="Which WAYF url to use" + ) _parser.add_argument("config", help="SAML client config") - _parser.add_argument('-p', dest='path', help='Path to configuration file.') - _parser.add_argument('-v', dest='valid', default="4", - help="How long, in days, the metadata is valid from " - "the time of creation") - _parser.add_argument('-c', dest='cert', help='certificate') - _parser.add_argument('-i', dest='id', - help="The ID of the entities descriptor in the " - "metadata") - _parser.add_argument('-k', dest='keyfile', - help="A file with a key to sign the metadata with") - _parser.add_argument('-n', dest='name') - _parser.add_argument('-S', dest='sign', action='store_true', - help="sign the metadata") - _parser.add_argument('-C', dest='service_conf_module', - help="service config module") + _parser.add_argument("-p", dest="path", help="Path to configuration file.") + _parser.add_argument( + "-v", + dest="valid", + default="4", + help="How long, in days, the metadata is valid from " "the time of creation", + ) + _parser.add_argument("-c", dest="cert", help="certificate") + _parser.add_argument( + "-i", dest="id", help="The ID of the entities descriptor in the " "metadata" + ) + _parser.add_argument( + "-k", dest="keyfile", help="A file with a key to sign the metadata with" + ) + _parser.add_argument("-n", dest="name") + _parser.add_argument( + "-S", dest="sign", action="store_true", help="sign the metadata" + ) + _parser.add_argument("-C", dest="service_conf_module", help="service config module") ARGS = {} _args = _parser.parse_args() @@ -935,7 +961,8 @@ if __name__ == '__main__': _https = "" if service_conf.HTTPS: SRV.ssl_adapter = pyopenssl.pyOpenSSLAdapter( - SERVER_CERT, SERVER_KEY, CERT_CHAIN) + SERVER_CERT, SERVER_KEY, CERT_CHAIN + ) _https = " using SSL/TLS" logger.info("Server starting") print("SP listening on %s:%s%s" % (HOST, PORT, _https)) |