summaryrefslogtreecommitdiff
path: root/example
diff options
context:
space:
mode:
authorAarni Koskela <akx@iki.fi>2022-10-27 15:13:08 +0300
committerIvan Kanakarakis <ivan.kanak@gmail.com>2022-11-15 13:06:41 +0200
commit88feeba03c2f891a31a86cbb24b210070aab1fdc (patch)
tree4bf48f6b2ca8e86aac23e825fbc6e84b134c400c /example
parent4fa20a92a9d7fccc2ca34f1f6ad777cc0fd36ef7 (diff)
downloadpysaml2-88feeba03c2f891a31a86cbb24b210070aab1fdc.tar.gz
Run pyupgrade --py36-plus + black + isort
Diffstat (limited to 'example')
-rwxr-xr-xexample/idp2/idp.py32
-rwxr-xr-xexample/idp2/idp_uwsgi.py20
-rwxr-xr-xexample/idp2_repoze/idp.py22
-rw-r--r--example/idp2_repoze/modules/login.mako.py1
-rw-r--r--example/idp2_repoze/modules/root.mako.py1
-rw-r--r--example/sp-repoze/pki/certgeneration.py1
-rwxr-xr-xexample/sp-repoze/sp.py2
-rwxr-xr-xexample/sp-wsgi/sp.py43
8 files changed, 59 insertions, 63 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index d8d71399..fd310a53 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -2,18 +2,18 @@
import argparse
import base64
from hashlib import sha1
+from http.cookies import SimpleCookie
import importlib
import logging
import os
import re
import time
+from urllib.parse import parse_qs
from idp_user import EXTRA
from idp_user import USERS
from mako.lookup import TemplateLookup
import six
-from six.moves.http_cookies import SimpleCookie
-from six.moves.urllib.parse import parse_qs
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_HTTP_POST
@@ -61,7 +61,7 @@ logger = logging.getLogger("saml2.idp")
logger.setLevel(logging.WARNING)
-class Cache(object):
+class Cache:
def __init__(self):
self.user2uid = {}
self.uid2user = {}
@@ -93,7 +93,7 @@ def dict2list_of_tuples(d):
# -----------------------------------------------------------------------------
-class Service(object):
+class Service:
def __init__(self, environ, start_response, user=None):
self.environ = environ
logger.debug("ENVIRON: %s", environ)
@@ -103,7 +103,7 @@ class Service(object):
def unpack_redirect(self):
if "QUERY_STRING" in self.environ:
_qs = self.environ["QUERY_STRING"]
- return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
+ return {k: v[0] for k, v in parse_qs(_qs).items()}
else:
return None
@@ -112,7 +112,7 @@ class Service(object):
_dict = parse_qs(post_data if isinstance(post_data, str) else post_data.decode("utf-8"))
logger.debug("unpack_post:: %s", _dict)
try:
- return dict([(k, v[0]) for k, v in _dict.items()])
+ return {k: v[0] for k, v in _dict.items()}
except Exception:
return None
@@ -323,11 +323,11 @@ class SSO(Service):
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
- resp = ServiceError("UnknownPrincipal: %s" % (excp,))
+ resp = ServiceError(f"UnknownPrincipal: {excp}")
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s", excp)
- resp = ServiceError("UnsupportedBinding: %s" % (excp,))
+ resp = ServiceError(f"UnsupportedBinding: {excp}")
return resp(self.environ, self.start_response)
if not _resp:
@@ -350,7 +350,7 @@ class SSO(Service):
)
except Exception as excp:
logging.error(exception_trace(excp))
- resp = ServiceError("Exception: %s" % (excp,))
+ resp = ServiceError(f"Exception: {excp}")
return resp(self.environ, self.start_response)
logger.info("AuthNResponse: %s", _resp)
@@ -566,7 +566,7 @@ def verify_username_and_password(dic):
def do_verify(environ, start_response, _):
query_str = get_post(environ)
- if not isinstance(query_str, six.string_types):
+ if not isinstance(query_str, str):
query_str = query_str.decode("ascii")
query = parse_qs(query_str)
@@ -588,7 +588,7 @@ def do_verify(environ, start_response, _):
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
- lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0])
+ lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0])
logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
@@ -849,7 +849,7 @@ def info_from_cookie(kaka):
if morsel:
try:
data = base64.b64decode(morsel.value)
- if not isinstance(data, six.string_types):
+ if not isinstance(data, str):
data = data.decode("ascii")
key, ref = data.split(":", 1)
return IDP.cache.uid2user[key], ref
@@ -879,11 +879,11 @@ def set_cookie(name, _, *args):
cookie = SimpleCookie()
data = ":".join(args)
- if not isinstance(data, six.binary_type):
+ if not isinstance(data, bytes):
data = data.encode("ascii")
data64 = base64.b64encode(data)
- if not isinstance(data64, six.string_types):
+ if not isinstance(data64, str):
data64 = data64.decode("ascii")
cookie[name] = data64
@@ -979,7 +979,7 @@ def staticfile(environ, start_response):
resp = Unauthorized()
return resp(environ, start_response)
start_response("200 OK", [("Content-Type", "text/xml")])
- return open(path, "r").read()
+ return open(path).read()
except Exception as ex:
logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -1110,7 +1110,7 @@ if __name__ == "__main__":
SRV.ssl_adapter = BuiltinSSLAdapter(CONFIG.SERVER_CERT, CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN)
logger.info("Server starting")
- print("IDP listening on %s:%s%s" % (HOST, PORT, _https))
+ print(f"IDP listening on {HOST}:{PORT}{_https}")
try:
SRV.start()
except KeyboardInterrupt:
diff --git a/example/idp2/idp_uwsgi.py b/example/idp2/idp_uwsgi.py
index 89f3bade..442cbae4 100755
--- a/example/idp2/idp_uwsgi.py
+++ b/example/idp2/idp_uwsgi.py
@@ -47,7 +47,7 @@ from saml2.sigver import verify_redirect_signature
logger = logging.getLogger("saml2.idp")
-class Cache(object):
+class Cache:
def __init__(self):
self.user2uid = {}
self.uid2user = {}
@@ -83,7 +83,7 @@ def dict2list_of_tuples(d):
# -----------------------------------------------------------------------------
-class Service(object):
+class Service:
def __init__(self, environ, start_response, user=None):
self.environ = environ
logger.debug("ENVIRON: %s", environ)
@@ -93,7 +93,7 @@ class Service(object):
def unpack_redirect(self):
if "QUERY_STRING" in self.environ:
_qs = self.environ["QUERY_STRING"]
- return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
+ return {k: v[0] for k, v in parse_qs(_qs).items()}
else:
return None
@@ -101,7 +101,7 @@ class Service(object):
_dict = parse_qs(get_post(self.environ))
logger.debug("unpack_post:: %s", _dict)
try:
- return dict([(k, v[0]) for k, v in _dict.items()])
+ return {k: v[0] for k, v in _dict.items()}
except Exception:
return None
@@ -276,11 +276,11 @@ class SSO(Service):
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
- resp = ServiceError("UnknownPrincipal: %s" % (excp,))
+ resp = ServiceError(f"UnknownPrincipal: {excp}")
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s", excp)
- resp = ServiceError("UnsupportedBinding: %s" % (excp,))
+ resp = ServiceError(f"UnsupportedBinding: {excp}")
return resp(self.environ, self.start_response)
if not _resp:
@@ -301,7 +301,7 @@ class SSO(Service):
_resp = IDP.create_authn_response(identity, userid=self.user, encrypt_cert=encrypt_cert, **resp_args)
except Exception as excp:
logging.error(exception_trace(excp))
- resp = ServiceError("Exception: %s" % (excp,))
+ resp = ServiceError(f"Exception: {excp}")
return resp(self.environ, self.start_response)
logger.info("AuthNResponse: %s", _resp)
@@ -511,7 +511,7 @@ def do_verify(environ, start_response, _):
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
- lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0])
+ lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0])
logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
@@ -861,7 +861,7 @@ def staticfile(environ, start_response):
resp = Unauthorized()
return resp(environ, start_response)
start_response("200 OK", [("Content-Type", "text/xml")])
- return open(path, "r").read()
+ return open(path).read()
except Exception as ex:
logger.error("An error occured while creating metadata:", ex.message)
return not_found(environ, start_response)
@@ -985,7 +985,7 @@ if __name__ == "__main__":
PORT = CONFIG.PORT
SRV = make_server(HOST, PORT, application)
- print("IdP listening on %s:%s" % (HOST, PORT))
+ print(f"IdP listening on {HOST}:{PORT}")
SRV.serve_forever()
else:
_rot = args.mako_root
diff --git a/example/idp2_repoze/idp.py b/example/idp2_repoze/idp.py
index aae2a3f1..9be36b71 100755
--- a/example/idp2_repoze/idp.py
+++ b/example/idp2_repoze/idp.py
@@ -44,7 +44,7 @@ from saml2.sigver import verify_redirect_signature
logger = logging.getLogger("saml2.idp")
-class Cache(object):
+class Cache:
def __init__(self):
self.user2uid = {}
self.uid2user = {}
@@ -80,7 +80,7 @@ def dict2list_of_tuples(d):
# -----------------------------------------------------------------------------
-class Service(object):
+class Service:
def __init__(self, environ, start_response, user=None):
self.environ = environ
logger.debug("ENVIRON: %s", environ)
@@ -90,7 +90,7 @@ class Service(object):
def unpack_redirect(self):
if "QUERY_STRING" in self.environ:
_qs = self.environ["QUERY_STRING"]
- return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
+ return {k: v[0] for k, v in parse_qs(_qs).items()}
else:
return None
@@ -98,7 +98,7 @@ class Service(object):
_dict = parse_qs(get_post(self.environ))
logger.debug("unpack_post:: %s", _dict)
try:
- return dict([(k, v[0]) for k, v in _dict.items()])
+ return {k: v[0] for k, v in _dict.items()}
except Exception:
return None
@@ -277,11 +277,11 @@ class SSO(Service):
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
- resp = ServiceError("UnknownPrincipal: %s" % (excp,))
+ resp = ServiceError(f"UnknownPrincipal: {excp}")
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s", excp)
- resp = ServiceError("UnsupportedBinding: %s" % (excp,))
+ resp = ServiceError(f"UnsupportedBinding: {excp}")
return resp(self.environ, self.start_response)
if not _resp:
@@ -301,11 +301,11 @@ class SSO(Service):
authn=AUTHN_BROKER[self.environ["idp.authn_ref"]],
sign_assertion=sign_assertion,
sign_response=False,
- **resp_args
+ **resp_args,
)
except Exception as excp:
logging.error(exception_trace(excp))
- resp = ServiceError("Exception: %s" % (excp,))
+ resp = ServiceError(f"Exception: {excp}")
return resp(self.environ, self.start_response)
logger.info("AuthNResponse: %s", _resp)
@@ -505,7 +505,7 @@ def do_verify(environ, start_response, _):
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
- lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid, query["key"][0])
+ lox = "{}?id={}&key={}".format(query["redirect_uri"][0], uid, query["key"][0])
logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
@@ -851,7 +851,7 @@ def staticfile(environ, start_response):
path += "/"
path += environ.get("PATH_INFO", "").lstrip("/")
start_response("200 OK", [("Content-Type", "text/xml")])
- return open(path, "r").read()
+ return open(path).read()
except Exception as ex:
logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -972,7 +972,7 @@ if __name__ == "__main__":
PORT = 8088
SRV = make_server(HOST, PORT, application)
- print("IdP listening on %s:%s" % (HOST, PORT))
+ print(f"IdP listening on {HOST}:{PORT}")
SRV.serve_forever()
else:
_rot = args.mako_root
diff --git a/example/idp2_repoze/modules/login.mako.py b/example/idp2_repoze/modules/login.mako.py
index 4603f6fb..abc10236 100644
--- a/example/idp2_repoze/modules/login.mako.py
+++ b/example/idp2_repoze/modules/login.mako.py
@@ -1,4 +1,3 @@
-# -*- encoding:utf-8 -*-
from mako import runtime, filters, cache
UNDEFINED = runtime.UNDEFINED
diff --git a/example/idp2_repoze/modules/root.mako.py b/example/idp2_repoze/modules/root.mako.py
index 9b21c4b3..16eceec6 100644
--- a/example/idp2_repoze/modules/root.mako.py
+++ b/example/idp2_repoze/modules/root.mako.py
@@ -1,4 +1,3 @@
-# -*- encoding:utf-8 -*-
from mako import runtime, filters, cache
UNDEFINED = runtime.UNDEFINED
diff --git a/example/sp-repoze/pki/certgeneration.py b/example/sp-repoze/pki/certgeneration.py
index 4b61feed..84061c38 100644
--- a/example/sp-repoze/pki/certgeneration.py
+++ b/example/sp-repoze/pki/certgeneration.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python
-# -*- coding: utf-8 -*-
from saml2.cert import OpenSSLWrapper
diff --git a/example/sp-repoze/sp.py b/example/sp-repoze/sp.py
index e3a59a89..7fd1dde5 100755
--- a/example/sp-repoze/sp.py
+++ b/example/sp-repoze/sp.py
@@ -297,5 +297,5 @@ if __name__ == "__main__":
from wsgiref.simple_server import make_server
srv = make_server(HOST, PORT, app_with_auth)
- print("SP listening on %s:%s" % (HOST, PORT))
+ print(f"SP listening on {HOST}:{PORT}")
srv.serve_forever()
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py
index 9687d581..0486a787 100755
--- a/example/sp-wsgi/sp.py
+++ b/example/sp-wsgi/sp.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python
-from __future__ import print_function
import argparse
@@ -9,16 +8,16 @@ try:
except:
import cgi as html
+from http.cookies import SimpleCookie
import importlib
import logging
import os
import re
import sys
+from urllib.parse import parse_qs
import xml.dom.minidom
import six
-from six.moves.http_cookies import SimpleCookie
-from six.moves.urllib.parse import parse_qs
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_HTTP_POST
@@ -78,7 +77,7 @@ def dict_to_table(ava, lev=0, width=1):
txt = ['<table border=%s bordercolor="black">\n' % width]
for prop, valarr in ava.items():
txt.append("<tr>\n")
- if isinstance(valarr, six.string_types):
+ if isinstance(valarr, str):
txt.append("<th>%s</th>\n" % str(prop))
txt.append("<td>%s</td>\n" % valarr)
elif isinstance(valarr, list):
@@ -135,12 +134,12 @@ def handle_static(environ, start_response, path):
resp = Response(data, headers=[("Content-Type", "image/png")])
else:
resp = Response(data)
- except IOError:
+ except OSError:
resp = NotFound()
return resp(environ, start_response)
-class ECPResponse(object):
+class ECPResponse:
code = 200
title = "OK"
@@ -149,7 +148,7 @@ class ECPResponse(object):
# noinspection PyUnusedLocal
def __call__(self, environ, start_response):
- start_response("%s %s" % (self.code, self.title), [("Content-Type", "text/xml")])
+ start_response(f"{self.code} {self.title}", [("Content-Type", "text/xml")])
return [self.content]
@@ -165,7 +164,7 @@ def _expiration(timeout, tformat=None):
return time_util.in_a_while(minutes=timeout, format=tformat)
-class Cache(object):
+class Cache:
def __init__(self):
self.uid2user = {}
self.cookie_name = "spauthn"
@@ -222,7 +221,7 @@ class Cache(object):
# -----------------------------------------------------------------------------
-class Service(object):
+class Service:
def __init__(self, environ, start_response, user=None):
self.environ = environ
logger.debug("ENVIRON: %s", environ)
@@ -233,14 +232,14 @@ class Service(object):
def unpack_redirect(self):
if "QUERY_STRING" in self.environ:
_qs = self.environ["QUERY_STRING"]
- return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
+ return {k: v[0] for k, v in parse_qs(_qs).items()}
else:
return None
def unpack_post(self):
_dict = parse_qs(get_post(self.environ).decode("utf8"))
logger.debug("unpack_post:: %s", _dict)
- return dict([(k, v[0]) for k, v in _dict.items()])
+ return {k: v[0] for k, v in _dict.items()}
def unpack_soap(self):
try:
@@ -333,7 +332,7 @@ class Service(object):
# -----------------------------------------------------------------------------
-class User(object):
+class User:
def __init__(self, name_id, data, saml_response):
self.name_id = name_id
self.data = data
@@ -382,20 +381,20 @@ class ACS(Service):
)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s", excp)
- resp = ServiceError("UnknownPrincipal: %s" % (excp,))
+ resp = ServiceError(f"UnknownPrincipal: {excp}")
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s", excp)
- resp = ServiceError("UnsupportedBinding: %s" % (excp,))
+ resp = ServiceError(f"UnsupportedBinding: {excp}")
return resp(self.environ, self.start_response)
except VerificationError as err:
- resp = ServiceError("Verification error: %s" % (err,))
+ resp = ServiceError(f"Verification error: {err}")
return resp(self.environ, self.start_response)
except SignatureError as err:
- resp = ServiceError("Signature error: %s" % (err,))
+ resp = ServiceError(f"Signature error: {err}")
return resp(self.environ, self.start_response)
except Exception as err:
- resp = ServiceError("Other error: %s" % (err,))
+ resp = ServiceError(f"Other error: {err}")
return resp(self.environ, self.start_response)
logger.info("AVA: %s", self.response.ava)
@@ -431,7 +430,7 @@ class ACS(Service):
# -----------------------------------------------------------------------------
-class SSO(object):
+class SSO:
def __init__(
self,
sp,
@@ -481,7 +480,7 @@ class SSO(object):
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
logger.debug("Redirect to WAYF function: %s", self.wayf)
- return -1, SeeOther(headers=[("Location", "%s?%s" % (self.wayf, sid_))])
+ return -1, SeeOther(headers=[("Location", f"{self.wayf}?{sid_}")])
def _pick_idp(self, came_from):
"""
@@ -688,7 +687,7 @@ def main(environ, start_response, sp):
return sso.do()
body = dict_to_table(user.data)
- body.append("<br><pre>{authn_stmt}</pre>".format(authn_stmt=_html_escape(user.authn_statement)))
+ body.append(f"<br><pre>{_html_escape(user.authn_statement)}</pre>")
body.append("<br><a href='/logout'>logout</a>")
resp = Response(body)
@@ -862,7 +861,7 @@ def application(environ, start_response):
return resp(environ, start_response)
-class ToBytesMiddleware(object):
+class ToBytesMiddleware:
"""Converts a message to bytes to be sent by WSGI server."""
def __init__(self, app):
@@ -960,7 +959,7 @@ if __name__ == "__main__":
SRV.ssl_adapter = pyopenssl.pyOpenSSLAdapter(SERVER_CERT, SERVER_KEY, CERT_CHAIN)
_https = " using SSL/TLS"
logger.info("Server starting")
- print("SP listening on %s:%s%s" % (HOST, PORT, _https))
+ print(f"SP listening on {HOST}:{PORT}{_https}")
try:
SRV.start()
except KeyboardInterrupt: