diff options
Diffstat (limited to 'example')
-rwxr-xr-x | example/idp2/idp.py | 32 | ||||
-rwxr-xr-x | example/idp2/idp_uwsgi.py | 20 | ||||
-rwxr-xr-x | example/idp2_repoze/idp.py | 22 | ||||
-rw-r--r-- | example/idp2_repoze/modules/login.mako.py | 1 | ||||
-rw-r--r-- | example/idp2_repoze/modules/root.mako.py | 1 | ||||
-rw-r--r-- | example/sp-repoze/pki/certgeneration.py | 1 | ||||
-rwxr-xr-x | example/sp-repoze/sp.py | 2 | ||||
-rwxr-xr-x | example/sp-wsgi/sp.py | 43 |
8 files changed, 59 insertions, 63 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py index d8d71399..fd310a53 100755 --- a/example/idp2/idp.py +++ b/example/idp2/idp.py @@ -2,18 +2,18 @@ import argparse import base64 from hashlib import sha1 +from http.cookies import SimpleCookie import importlib import logging import os import re import time +from urllib.parse import parse_qs from idp_user import EXTRA from idp_user import USERS from mako.lookup import TemplateLookup import six -from six.moves.http_cookies import SimpleCookie -from six.moves.urllib.parse import parse_qs from saml2 import BINDING_HTTP_ARTIFACT from saml2 import BINDING_HTTP_POST @@ -61,7 +61,7 @@ logger = logging.getLogger("saml2.idp") logger.setLevel(logging.WARNING) -class Cache(object): +class Cache: def __init__(self): self.user2uid = {} self.uid2user = {} @@ -93,7 +93,7 @@ def dict2list_of_tuples(d): # ----------------------------------------------------------------------------- -class Service(object): +class Service: def __init__(self, environ, start_response, user=None): self.environ = environ logger.debug("ENVIRON: %s", environ) @@ -103,7 +103,7 @@ class Service(object): def unpack_redirect(self): if "QUERY_STRING" in self.environ: _qs = self.environ["QUERY_STRING"] - return dict([(k, v[0]) for k, v in parse_qs(_qs).items()]) + return {k: v[0] for k, v in parse_qs(_qs).items()} else: return None @@ -112,7 +112,7 @@ class Service(object): _dict = parse_qs(post_data if isinstance(post_data, str) else post_data.decode("utf-8")) logger.debug("unpack_post:: %s", _dict) try: - return dict([(k, v[0]) for k, v in _dict.items()]) + return {k: v[0] for k, v in _dict.items()} except Exception: return None @@ -323,11 +323,11 @@ class SSO(Service): resp_args, _resp = self.verify_request(query, binding_in) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) - resp = ServiceError("UnknownPrincipal: %s" % (excp,)) + resp = ServiceError(f"UnknownPrincipal: {excp}") return resp(self.environ, self.start_response) except UnsupportedBinding as excp: logger.error("UnsupportedBinding: %s", excp) - resp = ServiceError("UnsupportedBinding: %s" % (excp,)) + resp = ServiceError(f"UnsupportedBinding: {excp}") return resp(self.environ, self.start_response) if not _resp: @@ -350,7 +350,7 @@ class SSO(Service): ) except Exception as excp: logging.error(exception_trace(excp)) - resp = ServiceError("Exception: %s" % (excp,)) + resp = ServiceError(f"Exception: {excp}") return resp(self.environ, self.start_response) logger.info("AuthNResponse: %s", _resp) @@ -566,7 +566,7 @@ def verify_username_and_password(dic): def do_verify(environ, start_response, _): query_str = get_post(environ) - if not isinstance(query_str, six.string_types): + if not isinstance(query_str, str): query_str = query_str.decode("ascii") query = parse_qs(query_str) @@ -588,7 +588,7 @@ def do_verify(environ, start_response, _): kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0]) - lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0]) + lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0]) logger.debug("Redirect => %s", lox) resp = Redirect(lox, headers=[kaka], content="text/html") @@ -849,7 +849,7 @@ def info_from_cookie(kaka): if morsel: try: data = base64.b64decode(morsel.value) - if not isinstance(data, six.string_types): + if not isinstance(data, str): data = data.decode("ascii") key, ref = data.split(":", 1) return IDP.cache.uid2user[key], ref @@ -879,11 +879,11 @@ def set_cookie(name, _, *args): cookie = SimpleCookie() data = ":".join(args) - if not isinstance(data, six.binary_type): + if not isinstance(data, bytes): data = data.encode("ascii") data64 = base64.b64encode(data) - if not isinstance(data64, six.string_types): + if not isinstance(data64, str): data64 = data64.decode("ascii") cookie[name] = data64 @@ -979,7 +979,7 @@ def staticfile(environ, start_response): resp = Unauthorized() return resp(environ, start_response) start_response("200 OK", [("Content-Type", "text/xml")]) - return open(path, "r").read() + return open(path).read() except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) return not_found(environ, start_response) @@ -1110,7 +1110,7 @@ if __name__ == "__main__": SRV.ssl_adapter = BuiltinSSLAdapter(CONFIG.SERVER_CERT, CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN) logger.info("Server starting") - print("IDP listening on %s:%s%s" % (HOST, PORT, _https)) + print(f"IDP listening on {HOST}:{PORT}{_https}") try: SRV.start() except KeyboardInterrupt: diff --git a/example/idp2/idp_uwsgi.py b/example/idp2/idp_uwsgi.py index 89f3bade..442cbae4 100755 --- a/example/idp2/idp_uwsgi.py +++ b/example/idp2/idp_uwsgi.py @@ -47,7 +47,7 @@ from saml2.sigver import verify_redirect_signature logger = logging.getLogger("saml2.idp") -class Cache(object): +class Cache: def __init__(self): self.user2uid = {} self.uid2user = {} @@ -83,7 +83,7 @@ def dict2list_of_tuples(d): # ----------------------------------------------------------------------------- -class Service(object): +class Service: def __init__(self, environ, start_response, user=None): self.environ = environ logger.debug("ENVIRON: %s", environ) @@ -93,7 +93,7 @@ class Service(object): def unpack_redirect(self): if "QUERY_STRING" in self.environ: _qs = self.environ["QUERY_STRING"] - return dict([(k, v[0]) for k, v in parse_qs(_qs).items()]) + return {k: v[0] for k, v in parse_qs(_qs).items()} else: return None @@ -101,7 +101,7 @@ class Service(object): _dict = parse_qs(get_post(self.environ)) logger.debug("unpack_post:: %s", _dict) try: - return dict([(k, v[0]) for k, v in _dict.items()]) + return {k: v[0] for k, v in _dict.items()} except Exception: return None @@ -276,11 +276,11 @@ class SSO(Service): resp_args, _resp = self.verify_request(query, binding_in) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) - resp = ServiceError("UnknownPrincipal: %s" % (excp,)) + resp = ServiceError(f"UnknownPrincipal: {excp}") return resp(self.environ, self.start_response) except UnsupportedBinding as excp: logger.error("UnsupportedBinding: %s", excp) - resp = ServiceError("UnsupportedBinding: %s" % (excp,)) + resp = ServiceError(f"UnsupportedBinding: {excp}") return resp(self.environ, self.start_response) if not _resp: @@ -301,7 +301,7 @@ class SSO(Service): _resp = IDP.create_authn_response(identity, userid=self.user, encrypt_cert=encrypt_cert, **resp_args) except Exception as excp: logging.error(exception_trace(excp)) - resp = ServiceError("Exception: %s" % (excp,)) + resp = ServiceError(f"Exception: {excp}") return resp(self.environ, self.start_response) logger.info("AuthNResponse: %s", _resp) @@ -511,7 +511,7 @@ def do_verify(environ, start_response, _): kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0]) - lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0]) + lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0]) logger.debug("Redirect => %s", lox) resp = Redirect(lox, headers=[kaka], content="text/html") @@ -861,7 +861,7 @@ def staticfile(environ, start_response): resp = Unauthorized() return resp(environ, start_response) start_response("200 OK", [("Content-Type", "text/xml")]) - return open(path, "r").read() + return open(path).read() except Exception as ex: logger.error("An error occured while creating metadata:", ex.message) return not_found(environ, start_response) @@ -985,7 +985,7 @@ if __name__ == "__main__": PORT = CONFIG.PORT SRV = make_server(HOST, PORT, application) - print("IdP listening on %s:%s" % (HOST, PORT)) + print(f"IdP listening on {HOST}:{PORT}") SRV.serve_forever() else: _rot = args.mako_root diff --git a/example/idp2_repoze/idp.py b/example/idp2_repoze/idp.py index aae2a3f1..9be36b71 100755 --- a/example/idp2_repoze/idp.py +++ b/example/idp2_repoze/idp.py @@ -44,7 +44,7 @@ from saml2.sigver import verify_redirect_signature logger = logging.getLogger("saml2.idp") -class Cache(object): +class Cache: def __init__(self): self.user2uid = {} self.uid2user = {} @@ -80,7 +80,7 @@ def dict2list_of_tuples(d): # ----------------------------------------------------------------------------- -class Service(object): +class Service: def __init__(self, environ, start_response, user=None): self.environ = environ logger.debug("ENVIRON: %s", environ) @@ -90,7 +90,7 @@ class Service(object): def unpack_redirect(self): if "QUERY_STRING" in self.environ: _qs = self.environ["QUERY_STRING"] - return dict([(k, v[0]) for k, v in parse_qs(_qs).items()]) + return {k: v[0] for k, v in parse_qs(_qs).items()} else: return None @@ -98,7 +98,7 @@ class Service(object): _dict = parse_qs(get_post(self.environ)) logger.debug("unpack_post:: %s", _dict) try: - return dict([(k, v[0]) for k, v in _dict.items()]) + return {k: v[0] for k, v in _dict.items()} except Exception: return None @@ -277,11 +277,11 @@ class SSO(Service): resp_args, _resp = self.verify_request(query, binding_in) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) - resp = ServiceError("UnknownPrincipal: %s" % (excp,)) + resp = ServiceError(f"UnknownPrincipal: {excp}") return resp(self.environ, self.start_response) except UnsupportedBinding as excp: logger.error("UnsupportedBinding: %s", excp) - resp = ServiceError("UnsupportedBinding: %s" % (excp,)) + resp = ServiceError(f"UnsupportedBinding: {excp}") return resp(self.environ, self.start_response) if not _resp: @@ -301,11 +301,11 @@ class SSO(Service): authn=AUTHN_BROKER[self.environ["idp.authn_ref"]], sign_assertion=sign_assertion, sign_response=False, - **resp_args + **resp_args, ) except Exception as excp: logging.error(exception_trace(excp)) - resp = ServiceError("Exception: %s" % (excp,)) + resp = ServiceError(f"Exception: {excp}") return resp(self.environ, self.start_response) logger.info("AuthNResponse: %s", _resp) @@ -505,7 +505,7 @@ def do_verify(environ, start_response, _): kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0]) - lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0]) + lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0]) logger.debug("Redirect => %s", lox) resp = Redirect(lox, headers=[kaka], content="text/html") @@ -851,7 +851,7 @@ def staticfile(environ, start_response): path += "/" path += environ.get("PATH_INFO", "").lstrip("/") start_response("200 OK", [("Content-Type", "text/xml")]) - return open(path, "r").read() + return open(path).read() except Exception as ex: logger.error("An error occured while creating metadata: %s", ex.message) return not_found(environ, start_response) @@ -972,7 +972,7 @@ if __name__ == "__main__": PORT = 8088 SRV = make_server(HOST, PORT, application) - print("IdP listening on %s:%s" % (HOST, PORT)) + print(f"IdP listening on {HOST}:{PORT}") SRV.serve_forever() else: _rot = args.mako_root diff --git a/example/idp2_repoze/modules/login.mako.py b/example/idp2_repoze/modules/login.mako.py index 4603f6fb..abc10236 100644 --- a/example/idp2_repoze/modules/login.mako.py +++ b/example/idp2_repoze/modules/login.mako.py @@ -1,4 +1,3 @@ -# -*- encoding:utf-8 -*- from mako import runtime, filters, cache UNDEFINED = runtime.UNDEFINED diff --git a/example/idp2_repoze/modules/root.mako.py b/example/idp2_repoze/modules/root.mako.py index 9b21c4b3..16eceec6 100644 --- a/example/idp2_repoze/modules/root.mako.py +++ b/example/idp2_repoze/modules/root.mako.py @@ -1,4 +1,3 @@ -# -*- encoding:utf-8 -*- from mako import runtime, filters, cache UNDEFINED = runtime.UNDEFINED diff --git a/example/sp-repoze/pki/certgeneration.py b/example/sp-repoze/pki/certgeneration.py index 4b61feed..84061c38 100644 --- a/example/sp-repoze/pki/certgeneration.py +++ b/example/sp-repoze/pki/certgeneration.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- from saml2.cert import OpenSSLWrapper diff --git a/example/sp-repoze/sp.py b/example/sp-repoze/sp.py index e3a59a89..7fd1dde5 100755 --- a/example/sp-repoze/sp.py +++ b/example/sp-repoze/sp.py @@ -297,5 +297,5 @@ if __name__ == "__main__": from wsgiref.simple_server import make_server srv = make_server(HOST, PORT, app_with_auth) - print("SP listening on %s:%s" % (HOST, PORT)) + print(f"SP listening on {HOST}:{PORT}") srv.serve_forever() diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py index 9687d581..0486a787 100755 --- a/example/sp-wsgi/sp.py +++ b/example/sp-wsgi/sp.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -from __future__ import print_function import argparse @@ -9,16 +8,16 @@ try: except: import cgi as html +from http.cookies import SimpleCookie import importlib import logging import os import re import sys +from urllib.parse import parse_qs import xml.dom.minidom import six -from six.moves.http_cookies import SimpleCookie -from six.moves.urllib.parse import parse_qs from saml2 import BINDING_HTTP_ARTIFACT from saml2 import BINDING_HTTP_POST @@ -78,7 +77,7 @@ def dict_to_table(ava, lev=0, width=1): txt = ['<table border=%s bordercolor="black">\n' % width] for prop, valarr in ava.items(): txt.append("<tr>\n") - if isinstance(valarr, six.string_types): + if isinstance(valarr, str): txt.append("<th>%s</th>\n" % str(prop)) txt.append("<td>%s</td>\n" % valarr) elif isinstance(valarr, list): @@ -135,12 +134,12 @@ def handle_static(environ, start_response, path): resp = Response(data, headers=[("Content-Type", "image/png")]) else: resp = Response(data) - except IOError: + except OSError: resp = NotFound() return resp(environ, start_response) -class ECPResponse(object): +class ECPResponse: code = 200 title = "OK" @@ -149,7 +148,7 @@ class ECPResponse(object): # noinspection PyUnusedLocal def __call__(self, environ, start_response): - start_response("%s %s" % (self.code, self.title), [("Content-Type", "text/xml")]) + start_response(f"{self.code} {self.title}", [("Content-Type", "text/xml")]) return [self.content] @@ -165,7 +164,7 @@ def _expiration(timeout, tformat=None): return time_util.in_a_while(minutes=timeout, format=tformat) -class Cache(object): +class Cache: def __init__(self): self.uid2user = {} self.cookie_name = "spauthn" @@ -222,7 +221,7 @@ class Cache(object): # ----------------------------------------------------------------------------- -class Service(object): +class Service: def __init__(self, environ, start_response, user=None): self.environ = environ logger.debug("ENVIRON: %s", environ) @@ -233,14 +232,14 @@ class Service(object): def unpack_redirect(self): if "QUERY_STRING" in self.environ: _qs = self.environ["QUERY_STRING"] - return dict([(k, v[0]) for k, v in parse_qs(_qs).items()]) + return {k: v[0] for k, v in parse_qs(_qs).items()} else: return None def unpack_post(self): _dict = parse_qs(get_post(self.environ).decode("utf8")) logger.debug("unpack_post:: %s", _dict) - return dict([(k, v[0]) for k, v in _dict.items()]) + return {k: v[0] for k, v in _dict.items()} def unpack_soap(self): try: @@ -333,7 +332,7 @@ class Service(object): # ----------------------------------------------------------------------------- -class User(object): +class User: def __init__(self, name_id, data, saml_response): self.name_id = name_id self.data = data @@ -382,20 +381,20 @@ class ACS(Service): ) except UnknownPrincipal as excp: logger.error("UnknownPrincipal: %s", excp) - resp = ServiceError("UnknownPrincipal: %s" % (excp,)) + resp = ServiceError(f"UnknownPrincipal: {excp}") return resp(self.environ, self.start_response) except UnsupportedBinding as excp: logger.error("UnsupportedBinding: %s", excp) - resp = ServiceError("UnsupportedBinding: %s" % (excp,)) + resp = ServiceError(f"UnsupportedBinding: {excp}") return resp(self.environ, self.start_response) except VerificationError as err: - resp = ServiceError("Verification error: %s" % (err,)) + resp = ServiceError(f"Verification error: {err}") return resp(self.environ, self.start_response) except SignatureError as err: - resp = ServiceError("Signature error: %s" % (err,)) + resp = ServiceError(f"Signature error: {err}") return resp(self.environ, self.start_response) except Exception as err: - resp = ServiceError("Other error: %s" % (err,)) + resp = ServiceError(f"Other error: {err}") return resp(self.environ, self.start_response) logger.info("AVA: %s", self.response.ava) @@ -431,7 +430,7 @@ class ACS(Service): # ----------------------------------------------------------------------------- -class SSO(object): +class SSO: def __init__( self, sp, @@ -481,7 +480,7 @@ class SSO(object): sid_ = sid() self.cache.outstanding_queries[sid_] = came_from logger.debug("Redirect to WAYF function: %s", self.wayf) - return -1, SeeOther(headers=[("Location", "%s?%s" % (self.wayf, sid_))]) + return -1, SeeOther(headers=[("Location", f"{self.wayf}?{sid_}")]) def _pick_idp(self, came_from): """ @@ -688,7 +687,7 @@ def main(environ, start_response, sp): return sso.do() body = dict_to_table(user.data) - body.append("<br><pre>{authn_stmt}</pre>".format(authn_stmt=_html_escape(user.authn_statement))) + body.append(f"<br><pre>{_html_escape(user.authn_statement)}</pre>") body.append("<br><a href='/logout'>logout</a>") resp = Response(body) @@ -862,7 +861,7 @@ def application(environ, start_response): return resp(environ, start_response) -class ToBytesMiddleware(object): +class ToBytesMiddleware: """Converts a message to bytes to be sent by WSGI server.""" def __init__(self, app): @@ -960,7 +959,7 @@ if __name__ == "__main__": SRV.ssl_adapter = pyopenssl.pyOpenSSLAdapter(SERVER_CERT, SERVER_KEY, CERT_CHAIN) _https = " using SSL/TLS" logger.info("Server starting") - print("SP listening on %s:%s%s" % (HOST, PORT, _https)) + print(f"SP listening on {HOST}:{PORT}{_https}") try: SRV.start() except KeyboardInterrupt: |