summaryrefslogtreecommitdiff
path: root/example
diff options
context:
space:
mode:
authorHans Hörberg <hans.horberg@umu.se>2015-11-06 13:01:21 +0100
committerHans Hörberg <hans.horberg@umu.se>2015-11-06 13:01:21 +0100
commitbc93176fa6b4f15090a7d2d335727d60c6ffe2c3 (patch)
tree5f5fcc7252eec5520777495c3fc2da45960c967e /example
parent8c901a8f84748a2a2e273a729b16077d8dbcd606 (diff)
parent31f8ce0e14894252dc9c5b538e1b208ed97233b5 (diff)
downloadpysaml2-bc93176fa6b4f15090a7d2d335727d60c6ffe2c3.tar.gz
Merge remote-tracking branch 'upstream/master'
# Conflicts: # src/saml2/entity.py digest algorithm added to the same functions as sign alg.
Diffstat (limited to 'example')
-rwxr-xr-xexample/idp2/idp.py93
-rwxr-xr-xexample/idp2/idp_uwsgi.py86
-rwxr-xr-xexample/idp2_repoze/idp.py132
-rwxr-xr-xexample/sp-repoze/sp.py32
-rwxr-xr-xexample/sp-wsgi/sp.py67
5 files changed, 206 insertions, 204 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index 4d67a8d9..b86e2990 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -5,9 +5,7 @@ import importlib
import logging
import os
import re
-import socket
import time
-import ssl
from Cookie import SimpleCookie
from hashlib import sha1
@@ -92,7 +90,7 @@ def dict2list_of_tuples(d):
class Service(object):
def __init__(self, environ, start_response, user=None):
self.environ = environ
- logger.debug("ENVIRON: %s" % environ)
+ logger.debug("ENVIRON: %s", environ)
self.start_response = start_response
self.user = user
@@ -105,7 +103,7 @@ class Service(object):
def unpack_post(self):
_dict = parse_qs(get_post(self.environ))
- logger.debug("unpack_post:: %s" % _dict)
+ logger.debug("unpack_post:: %s", _dict)
try:
return dict([(k, v[0]) for k, v in _dict.items()])
except Exception:
@@ -125,11 +123,11 @@ class Service(object):
_dict = self.unpack_post()
else:
_dict = None
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return _dict
def operation(self, saml_msg, binding):
- logger.debug("_operation: %s" % saml_msg)
+ logger.debug("_operation: %s", saml_msg)
if not (saml_msg and 'SAMLRequest' in saml_msg):
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -211,7 +209,7 @@ class Service(object):
"""
logger.debug("- SOAP -")
_dict = self.unpack_soap()
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -274,7 +272,7 @@ class SSO(Service):
logger.info("parsed OK")
_authn_req = self.req_info.message
- logger.debug("%s" % _authn_req)
+ logger.debug("%s", _authn_req)
try:
self.binding_out, self.destination = IDP.pick_binding(
@@ -282,11 +280,11 @@ class SSO(Service):
bindings=self.response_bindings,
entity_id=_authn_req.issuer.text, request=_authn_req)
except Exception as err:
- logger.error("Couldn't find receiver endpoint: %s" % err)
+ logger.error("Couldn't find receiver endpoint: %s", err)
raise
- logger.debug("Binding: %s, destination: %s" % (self.binding_out,
- self.destination))
+ logger.debug("Binding: %s, destination: %s", self.binding_out,
+ self.destination)
resp_args = {}
try:
@@ -314,18 +312,18 @@ class SSO(Service):
try:
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
- logger.error("UnknownPrincipal: %s" % (excp,))
+ logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
- logger.error("UnsupportedBinding: %s" % (excp,))
+ logger.error("UnsupportedBinding: %s", excp)
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
if not _resp:
identity = USERS[self.user].copy()
# identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
- logger.info("Identity: %s" % (identity,))
+ logger.info("Identity: %s", identity)
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
@@ -346,7 +344,7 @@ class SSO(Service):
resp = ServiceError("Exception: %s" % (excp,))
return resp(self.environ, self.start_response)
- logger.info("AuthNResponse: %s" % _resp)
+ logger.info("AuthNResponse: %s", _resp)
if self.op_type == "ecp":
kwargs = {"soap_headers": [
ecp.Response(
@@ -358,12 +356,12 @@ class SSO(Service):
"%s" % _resp, self.destination,
relay_state, response=True, **kwargs)
- logger.debug("HTTPargs: %s" % http_args)
+ logger.debug("HTTPargs: %s", http_args)
return self.response(self.binding_out, http_args)
@staticmethod
def _store_request(saml_msg):
- logger.debug("_store_request: %s" % saml_msg)
+ logger.debug("_store_request: %s", saml_msg)
key = sha1(saml_msg["SAMLRequest"]).hexdigest()
# store the AuthnRequest
IDP.ticket[key] = saml_msg
@@ -509,7 +507,7 @@ def do_authentication(environ, start_response, authn_context, key,
if len(auth_info):
method, reference = auth_info[0]
- logger.debug("Authn chosen: %s (ref=%s)" % (method, reference))
+ logger.debug("Authn chosen: %s (ref=%s)", method, reference)
return method(environ, start_response, reference, key, redirect_uri, headers)
else:
resp = Unauthorized("No usable authentication method")
@@ -547,7 +545,7 @@ def username_password_authn(environ, start_response, reference, key,
"authn_reference": reference,
"redirect_uri": redirect_uri
}
- logger.info("do_authentication argv: %s" % argv)
+ logger.info("do_authentication argv: %s", argv)
return resp(environ, start_response, **argv)
@@ -563,7 +561,7 @@ def verify_username_and_password(dic):
def do_verify(environ, start_response, _):
query = parse_qs(get_post(environ))
- logger.debug("do_verify: %s" % query)
+ logger.debug("do_verify: %s", query)
try:
_ok, user = verify_username_and_password(query)
@@ -577,13 +575,13 @@ def do_verify(environ, start_response, _):
uid = rndstr(24)
IDP.cache.uid2user[uid] = user
IDP.cache.user2uid[user] = uid
- logger.debug("Register %s under '%s'" % (user, uid))
+ logger.debug("Register %s under '%s'", user, uid)
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid,
query["key"][0])
- logger.debug("Redirect => %s" % lox)
+ logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
return resp(environ, start_response)
@@ -611,17 +609,17 @@ class SLO(Service):
logger.info("--- Single Log Out Service ---")
try:
- logger.debug("req: '%s'" % request)
+ logger.debug("req: '%s'", request)
req_info = IDP.parse_logout_request(request, binding)
except Exception as exc:
- logger.error("Bad request: %s" % exc)
+ logger.error("Bad request: %s", exc)
resp = BadRequest("%s" % exc)
return resp(self.environ, self.start_response)
msg = req_info.message
if msg.name_id:
lid = IDP.ident.find_local_id(msg.name_id)
- logger.info("local identifier: %s" % lid)
+ logger.info("local identifier: %s", lid)
if lid in IDP.cache.user2uid:
uid = IDP.cache.user2uid[lid]
if uid in IDP.cache.uid2user:
@@ -631,8 +629,8 @@ class SLO(Service):
try:
IDP.session_db.remove_authn_statements(msg.name_id)
except KeyError as exc:
- logger.error("Unknown session: %s" % exc)
- resp = ServiceError("Unknown session: %s" % exc)
+ logger.error("Unknown session: %s", exc)
+ resp = ServiceError("Unknown session: %s", exc)
return resp(self.environ, self.start_response)
resp = IDP.create_logout_response(msg, [binding])
@@ -650,7 +648,7 @@ class SLO(Service):
hinfo = IDP.apply_binding(binding, "%s" % resp, destination,
relay_state, response=response)
except Exception as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@@ -658,7 +656,7 @@ class SLO(Service):
delco = delete_cookie(self.environ, "idpauthn")
if delco:
hinfo["headers"].append(delco)
- logger.info("Header: %s" % (hinfo["headers"],))
+ logger.info("Header: %s", (hinfo["headers"],))
if binding == BINDING_HTTP_REDIRECT:
for key, value in hinfo['headers']:
@@ -689,7 +687,7 @@ class NMI(Service):
request.name_id, request.new_id, request.new_encrypted_id,
request.terminate)
- logger.debug("New NameID: %s" % name_id)
+ logger.debug("New NameID: %s", name_id)
_resp = IDP.create_manage_name_id_response(request)
@@ -719,12 +717,12 @@ class AIDR(Service):
hinfo = IDP.apply_binding(BINDING_URI, "%s" % assertion, response=True)
- logger.debug("HINFO: %s" % hinfo)
+ logger.debug("HINFO: %s", hinfo)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
def operation(self, _dict, binding, **kwargs):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict or "ID" not in _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -765,7 +763,7 @@ class AQS(Service):
_query.requested_authn_context,
_query.session_index)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -788,7 +786,7 @@ class ATTR(Service):
name_id = _query.subject.name_id
uid = name_id.text
- logger.debug("Local uid: %s" % uid)
+ logger.debug("Local uid: %s", uid)
identity = EXTRA[uid]
# Comes in over SOAP so only need to construct the response
@@ -796,7 +794,7 @@ class ATTR(Service):
msg = IDP.create_attribute_response(identity,
name_id=name_id, **args)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -843,7 +841,7 @@ class NIM(Service):
# Cookie handling
# ----------------------------------------------------------------------------
def info_from_cookie(kaka):
- logger.debug("KAKA: %s" % kaka)
+ logger.debug("KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get("idpauthn", None)
@@ -860,14 +858,14 @@ def info_from_cookie(kaka):
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
- logger.debug("delete KAKA: %s" % kaka)
+ logger.debug("delete KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = ""
cookie[name]['path'] = "/"
- logger.debug("Expire: %s" % morsel)
+ logger.debug("Expire: %s", morsel)
cookie[name]["expires"] = _expiration("dawn")
return tuple(cookie.output().split(": ", 1))
return None
@@ -878,7 +876,7 @@ def set_cookie(name, _, *args):
cookie[name] = base64.b64encode(":".join(args))
cookie[name]['path'] = "/"
cookie[name]["expires"] = _expiration(5) # 5 minutes from now
- logger.debug("Cookie expires: %s" % cookie[name]["expires"])
+ logger.debug("Cookie expires: %s", cookie[name]["expires"])
return tuple(cookie.output().split(": ", 1))
# ----------------------------------------------------------------------------
@@ -941,7 +939,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -960,7 +958,7 @@ def staticfile(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return open(path, 'r').read()
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -972,9 +970,9 @@ def application(environ, start_response):
the functions from above can access the url placeholders.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
@@ -985,7 +983,7 @@ def application(environ, start_response):
return metadata(environ, start_response)
kaka = environ.get("HTTP_COOKIE", None)
- logger.info("<application> PATH: %s" % path)
+ logger.info("<application> PATH: %s", path)
if kaka:
logger.info("= KAKA =")
@@ -995,7 +993,7 @@ def application(environ, start_response):
else:
try:
query = parse_qs(environ["QUERY_STRING"])
- logger.debug("QUERY: %s" % query)
+ logger.debug("QUERY: %s", query)
user = IDP.cache.uid2user[query["id"][0]]
except KeyError:
user = None
@@ -1014,7 +1012,7 @@ def application(environ, start_response):
except IndexError:
environ['myapp.url_args'] = path
- logger.debug("Callback: %s" % (callback,))
+ logger.debug("Callback: %s", callback)
if isinstance(callback, tuple):
cls = callback[0](environ, start_response, user)
func = getattr(cls, callback[1])
@@ -1085,7 +1083,8 @@ if __name__ == '__main__':
_https = ""
if CONFIG.HTTPS:
SRV.ssl_adapter = ssl_pyopenssl.pyOpenSSLAdapter(CONFIG.SERVER_CERT,
- CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN)
+ CONFIG.SERVER_KEY,
+ CONFIG.CERT_CHAIN)
_https = " using SSL/TLS"
logger.info("Server starting")
print("IDP listening on %s:%s%s" % (HOST, PORT, _https))
diff --git a/example/idp2/idp_uwsgi.py b/example/idp2/idp_uwsgi.py
index dcc01216..30e4b26a 100755
--- a/example/idp2/idp_uwsgi.py
+++ b/example/idp2/idp_uwsgi.py
@@ -88,7 +88,7 @@ def dict2list_of_tuples(d):
class Service(object):
def __init__(self, environ, start_response, user=None):
self.environ = environ
- logger.debug("ENVIRON: %s" % environ)
+ logger.debug("ENVIRON: %s", environ)
self.start_response = start_response
self.user = user
@@ -101,7 +101,7 @@ class Service(object):
def unpack_post(self):
_dict = parse_qs(get_post(self.environ))
- logger.debug("unpack_post:: %s" % _dict)
+ logger.debug("unpack_post:: %s", _dict)
try:
return dict([(k, v[0]) for k, v in _dict.items()])
except Exception:
@@ -121,11 +121,11 @@ class Service(object):
_dict = self.unpack_post()
else:
_dict = None
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return _dict
def operation(self, saml_msg, binding):
- logger.debug("_operation: %s" % saml_msg)
+ logger.debug("_operation: %s", saml_msg)
if not saml_msg or not 'SAMLRequest' in saml_msg:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -186,7 +186,7 @@ class Service(object):
"""
logger.debug("- SOAP -")
_dict = self.unpack_soap()
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -246,7 +246,7 @@ class SSO(Service):
logger.info("parsed OK")
_authn_req = self.req_info.message
- logger.debug("%s" % _authn_req)
+ logger.debug("%s", _authn_req)
try:
self.binding_out, self.destination = IDP.pick_binding(
@@ -254,11 +254,11 @@ class SSO(Service):
bindings=self.response_bindings,
entity_id=_authn_req.issuer.text)
except Exception as err:
- logger.error("Couldn't find receiver endpoint: %s" % err)
+ logger.error("Couldn't find receiver endpoint: %s", err)
raise
- logger.debug("Binding: %s, destination: %s" % (self.binding_out,
- self.destination))
+ logger.debug("Binding: %s, destination: %s", self.binding_out,
+ self.destination)
resp_args = {}
try:
@@ -285,18 +285,18 @@ class SSO(Service):
try:
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
- logger.error("UnknownPrincipal: %s" % (excp,))
+ logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
- logger.error("UnsupportedBinding: %s" % (excp,))
+ logger.error("UnsupportedBinding: %s", excp)
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
if not _resp:
identity = USERS[self.user].copy()
# identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
- logger.info("Identity: %s" % (identity,))
+ logger.info("Identity: %s", identity)
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
@@ -317,7 +317,7 @@ class SSO(Service):
resp = ServiceError("Exception: %s" % (excp,))
return resp(self.environ, self.start_response)
- logger.info("AuthNResponse: %s" % _resp)
+ logger.info("AuthNResponse: %s", _resp)
if self.op_type == "ecp":
kwargs = {"soap_headers": [
ecp.Response(
@@ -329,11 +329,11 @@ class SSO(Service):
"%s" % _resp, self.destination,
relay_state, response=True, **kwargs)
- logger.debug("HTTPargs: %s" % http_args)
+ logger.debug("HTTPargs: %s", http_args)
return self.response(self.binding_out, http_args)
def _store_request(self, saml_msg):
- logger.debug("_store_request: %s" % saml_msg)
+ logger.debug("_store_request: %s", saml_msg)
key = sha1(saml_msg["SAMLRequest"]).hexdigest()
# store the AuthnRequest
IDP.ticket[key] = saml_msg
@@ -468,7 +468,7 @@ def do_authentication(environ, start_response, authn_context, key,
if len(auth_info):
method, reference = auth_info[0]
- logger.debug("Authn chosen: %s (ref=%s)" % (method, reference))
+ logger.debug("Authn chosen: %s (ref=%s)", method, reference)
return method(environ, start_response, reference, key, redirect_uri)
else:
resp = Unauthorized("No usable authentication method")
@@ -504,7 +504,7 @@ def username_password_authn(environ, start_response, reference, key,
"authn_reference": reference,
"redirect_uri": redirect_uri
}
- logger.info("do_authentication argv: %s" % argv)
+ logger.info("do_authentication argv: %s", argv)
return resp(environ, start_response, **argv)
@@ -520,7 +520,7 @@ def verify_username_and_password(dic):
def do_verify(environ, start_response, _):
query = parse_qs(get_post(environ))
- logger.debug("do_verify: %s" % query)
+ logger.debug("do_verify: %s", query)
try:
_ok, user = verify_username_and_password(query)
@@ -534,13 +534,13 @@ def do_verify(environ, start_response, _):
uid = rndstr(24)
IDP.cache.uid2user[uid] = user
IDP.cache.user2uid[user] = uid
- logger.debug("Register %s under '%s'" % (user, uid))
+ logger.debug("Register %s under '%s'", user, uid)
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid,
query["key"][0])
- logger.debug("Redirect => %s" % lox)
+ logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
return resp(environ, start_response)
@@ -568,17 +568,17 @@ class SLO(Service):
logger.info("--- Single Log Out Service ---")
try:
_, body = request.split("\n")
- logger.debug("req: '%s'" % body)
+ logger.debug("req: '%s'", body)
req_info = IDP.parse_logout_request(body, binding)
except Exception as exc:
- logger.error("Bad request: %s" % exc)
+ logger.error("Bad request: %s", exc)
resp = BadRequest("%s" % exc)
return resp(self.environ, self.start_response)
msg = req_info.message
if msg.name_id:
lid = IDP.ident.find_local_id(msg.name_id)
- logger.info("local identifier: %s" % lid)
+ logger.info("local identifier: %s", lid)
if lid in IDP.cache.user2uid:
uid = IDP.cache.user2uid[lid]
if uid in IDP.cache.uid2user:
@@ -588,7 +588,7 @@ class SLO(Service):
try:
IDP.session_db.remove_authn_statements(msg.name_id)
except KeyError as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@@ -597,7 +597,7 @@ class SLO(Service):
try:
hinfo = IDP.apply_binding(binding, "%s" % resp, "", relay_state)
except Exception as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@@ -605,7 +605,7 @@ class SLO(Service):
delco = delete_cookie(self.environ, "idpauthn")
if delco:
hinfo["headers"].append(delco)
- logger.info("Header: %s" % (hinfo["headers"],))
+ logger.info("Header: %s", (hinfo["headers"],))
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
@@ -626,7 +626,7 @@ class NMI(Service):
request.name_id, request.new_id, request.new_encrypted_id,
request.terminate)
- logger.debug("New NameID: %s" % name_id)
+ logger.debug("New NameID: %s", name_id)
_resp = IDP.create_manage_name_id_response(request)
@@ -656,12 +656,12 @@ class AIDR(Service):
hinfo = IDP.apply_binding(BINDING_URI, "%s" % assertion, response=True)
- logger.debug("HINFO: %s" % hinfo)
+ logger.debug("HINFO: %s", hinfo)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
def operation(self, _dict, binding, **kwargs):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict or "ID" not in _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -702,7 +702,7 @@ class AQS(Service):
_query.requested_authn_context,
_query.session_index)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -725,7 +725,7 @@ class ATTR(Service):
name_id = _query.subject.name_id
uid = name_id.text
- logger.debug("Local uid: %s" % uid)
+ logger.debug("Local uid: %s", uid)
identity = EXTRA[self.user]
# Comes in over SOAP so only need to construct the response
@@ -733,7 +733,7 @@ class ATTR(Service):
msg = IDP.create_attribute_response(identity,
name_id=name_id, **args)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -780,7 +780,7 @@ class NIM(Service):
# Cookie handling
# ----------------------------------------------------------------------------
def info_from_cookie(kaka):
- logger.debug("KAKA: %s" % kaka)
+ logger.debug("KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get("idpauthn", None)
@@ -797,14 +797,14 @@ def info_from_cookie(kaka):
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
- logger.debug("delete KAKA: %s" % kaka)
+ logger.debug("delete KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = ""
cookie[name]['path'] = "/"
- logger.debug("Expire: %s" % morsel)
+ logger.debug("Expire: %s", morsel)
cookie[name]["expires"] = _expiration("dawn")
return tuple(cookie.output().split(": ", 1))
return None
@@ -815,7 +815,7 @@ def set_cookie(name, _, *args):
cookie[name] = base64.b64encode(":".join(args))
cookie[name]['path'] = "/"
cookie[name]["expires"] = _expiration(5) # 5 minutes from now
- logger.debug("Cookie expires: %s" % cookie[name]["expires"])
+ logger.debug("Cookie expires: %s", cookie[name]["expires"])
return tuple(cookie.output().split(": ", 1))
# ----------------------------------------------------------------------------
@@ -878,7 +878,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata:", ex.message)
return not_found(environ, start_response)
@@ -897,7 +897,7 @@ def staticfile(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return open(path, 'r').read()
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata:", ex.message)
return not_found(environ, start_response)
@@ -909,9 +909,9 @@ def application(environ, start_response):
the functions from above can access the url placeholders.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
@@ -922,7 +922,7 @@ def application(environ, start_response):
return metadata(environ, start_response)
kaka = environ.get("HTTP_COOKIE", None)
- logger.info("<application> PATH: %s" % path)
+ logger.info("<application> PATH: %s", path)
if kaka:
logger.info("= KAKA =")
@@ -932,7 +932,7 @@ def application(environ, start_response):
else:
try:
query = parse_qs(environ["QUERY_STRING"])
- logger.debug("QUERY: %s" % query)
+ logger.debug("QUERY: %s", query)
user = IDP.cache.uid2user[query["id"][0]]
except KeyError:
user = None
@@ -951,7 +951,7 @@ def application(environ, start_response):
except IndexError:
environ['myapp.url_args'] = path
- logger.debug("Callback: %s" % (callback,))
+ logger.debug("Callback: %s", callback)
if isinstance(callback, tuple):
cls = callback[0](environ, start_response, user)
func = getattr(cls, callback[1])
diff --git a/example/idp2_repoze/idp.py b/example/idp2_repoze/idp.py
index cd6b486d..9512fca0 100755
--- a/example/idp2_repoze/idp.py
+++ b/example/idp2_repoze/idp.py
@@ -83,7 +83,7 @@ def dict2list_of_tuples(d):
class Service(object):
def __init__(self, environ, start_response, user=None):
self.environ = environ
- logger.debug("ENVIRON: %s" % environ)
+ logger.debug("ENVIRON: %s", environ)
self.start_response = start_response
self.user = user
@@ -93,22 +93,22 @@ class Service(object):
return dict([(k, v[0]) for k, v in parse_qs(_qs).items()])
else:
return None
-
+
def unpack_post(self):
_dict = parse_qs(get_post(self.environ))
- logger.debug("unpack_post:: %s" % _dict)
+ logger.debug("unpack_post:: %s", _dict)
try:
return dict([(k, v[0]) for k, v in _dict.items()])
except Exception:
return None
-
+
def unpack_soap(self):
try:
query = get_post(self.environ)
return {"SAMLRequest": query, "RelayState": ""}
except Exception:
return None
-
+
def unpack_either(self):
if self.environ["REQUEST_METHOD"] == "GET":
_dict = self.unpack_redirect()
@@ -116,11 +116,11 @@ class Service(object):
_dict = self.unpack_post()
else:
_dict = None
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return _dict
def operation(self, _dict, binding):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict or not 'SAMLRequest' in _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -178,7 +178,7 @@ class Service(object):
"""
logger.debug("- SOAP -")
_dict = self.unpack_soap()
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -196,8 +196,8 @@ class Service(object):
# "PATH_INFO"], "key": key})
# headers = [('Content-Type', 'text/plain')]
#
- # logger.debug("location: %s" % loc)
- # logger.debug("headers: %s" % headers)
+ # logger.debug("location: %s", loc)
+ # logger.debug("headers: %s", headers)
#
# resp = Redirect(loc, headers=headers)
#
@@ -255,15 +255,15 @@ class SSO(Service):
logger.info("parsed OK")
_authn_req = self.req_info.message
- logger.debug("%s" % _authn_req)
+ logger.debug("%s", _authn_req)
self.binding_out, self.destination = IDP.pick_binding(
"assertion_consumer_service",
bindings=self.response_bindings,
entity_id=_authn_req.issuer.text)
- logger.debug("Binding: %s, destination: %s" % (self.binding_out,
- self.destination))
+ logger.debug("Binding: %s, destination: %s", self.binding_out,
+ self.destination)
resp_args = {}
try:
@@ -282,18 +282,18 @@ class SSO(Service):
try:
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
- logger.error("UnknownPrincipal: %s" % (excp,))
+ logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
- logger.error("UnsupportedBinding: %s" % (excp,))
+ logger.error("UnsupportedBinding: %s", excp)
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
if not _resp:
identity = USERS[self.user].copy()
#identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
- logger.info("Identity: %s" % (identity,))
+ logger.info("Identity: %s", identity)
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
@@ -310,15 +310,15 @@ class SSO(Service):
resp = ServiceError("Exception: %s" % (excp,))
return resp(self.environ, self.start_response)
- logger.info("AuthNResponse: %s" % _resp)
+ logger.info("AuthNResponse: %s", _resp)
http_args = IDP.apply_binding(self.binding_out,
"%s" % _resp, self.destination,
relay_state, response=True)
- logger.debug("HTTPargs: %s" % http_args)
+ logger.debug("HTTPargs: %s", http_args)
return self.response(self.binding_out, http_args)
def _store_request(self, _dict):
- logger.debug("_store_request: %s" % _dict)
+ logger.debug("_store_request: %s", _dict)
key = sha1(_dict["SAMLRequest"]).hexdigest()
# store the AuthnRequest
IDP.ticket[key] = _dict
@@ -412,7 +412,7 @@ class SSO(Service):
except TypeError:
resp = Unauthorized()
else:
- logger.debug("Authz_info: %s" % _info)
+ logger.debug("Authz_info: %s", _info)
try:
(user, passwd) = _info.split(":")
if is_equal(PASSWD[user], passwd):
@@ -448,7 +448,7 @@ def do_authentication(environ, start_response, authn_context, key,
if len(auth_info):
method, reference = auth_info[0]
- logger.debug("Authn chosen: %s (ref=%s)" % (method, reference))
+ logger.debug("Authn chosen: %s (ref=%s)", method, reference)
return method(environ, start_response, reference, key, redirect_uri)
else:
resp = Unauthorized("No usable authentication method")
@@ -482,7 +482,7 @@ def username_password_authn(environ, start_response, reference, key,
"authn_reference": reference,
"redirect_uri": redirect_uri
}
- logger.info("do_authentication argv: %s" % argv)
+ logger.info("do_authentication argv: %s", argv)
return resp(environ, start_response, **argv)
@@ -498,7 +498,7 @@ def verify_username_and_password(dic):
def do_verify(environ, start_response, _):
query = parse_qs(get_post(environ))
- logger.debug("do_verify: %s" % query)
+ logger.debug("do_verify: %s", query)
try:
_ok, user = verify_username_and_password(query)
@@ -512,13 +512,13 @@ def do_verify(environ, start_response, _):
uid = rndstr(24)
IDP.cache.uid2user[uid] = user
IDP.cache.user2uid[user] = uid
- logger.debug("Register %s under '%s'" % (user, uid))
+ logger.debug("Register %s under '%s'", user, uid)
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid,
query["key"][0])
- logger.debug("Redirect => %s" % lox)
+ logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
return resp(environ, start_response)
@@ -546,17 +546,17 @@ class SLO(Service):
logger.info("--- Single Log Out Service ---")
try:
_, body = request.split("\n")
- logger.debug("req: '%s'" % body)
+ logger.debug("req: '%s'", body)
req_info = IDP.parse_logout_request(body, binding)
except Exception as exc:
- logger.error("Bad request: %s" % exc)
+ logger.error("Bad request: %s", exc)
resp = BadRequest("%s" % exc)
return resp(self.environ, self.start_response)
-
+
msg = req_info.message
if msg.name_id:
lid = IDP.ident.find_local_id(msg.name_id)
- logger.info("local identifier: %s" % lid)
+ logger.info("local identifier: %s", lid)
if lid in IDP.cache.user2uid:
uid = IDP.cache.user2uid[lid]
if uid in IDP.cache.uid2user:
@@ -566,55 +566,55 @@ class SLO(Service):
try:
IDP.session_db.remove_authn_statements(msg.name_id)
except KeyError as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
-
+
resp = IDP.create_logout_response(msg, [binding])
-
+
try:
hinfo = IDP.apply_binding(binding, "%s" % resp, "", relay_state)
except Exception as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
-
+
#_tlh = dict2list_of_tuples(hinfo["headers"])
delco = delete_cookie(self.environ, "idpauthn")
if delco:
hinfo["headers"].append(delco)
- logger.info("Header: %s" % (hinfo["headers"],))
+ logger.info("Header: %s", hinfo["headers"])
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
-
+
# ----------------------------------------------------------------------------
# Manage Name ID service
# ----------------------------------------------------------------------------
class NMI(Service):
-
+
def do(self, query, binding, relay_state=""):
logger.info("--- Manage Name ID Service ---")
req = IDP.parse_manage_name_id_request(query, binding)
request = req.message
-
+
# Do the necessary stuff
name_id = IDP.ident.handle_manage_name_id_request(
request.name_id, request.new_id, request.new_encrypted_id,
request.terminate)
-
- logger.debug("New NameID: %s" % name_id)
-
+
+ logger.debug("New NameID: %s", name_id)
+
_resp = IDP.create_manage_name_id_response(request)
-
+
# It's using SOAP binding
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "",
relay_state, response=True)
-
+
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
-
+
# ----------------------------------------------------------------------------
# === Assertion ID request ===
# ----------------------------------------------------------------------------
@@ -630,15 +630,15 @@ class AIDR(Service):
except Unknown:
resp = NotFound(aid)
return resp(self.environ, self.start_response)
-
+
hinfo = IDP.apply_binding(BINDING_URI, "%s" % assertion, response=True)
-
- logger.debug("HINFO: %s" % hinfo)
+
+ logger.debug("HINFO: %s", hinfo)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
def operation(self, _dict, binding, **kwargs):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict or "ID" not in _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -678,7 +678,7 @@ class AQS(Service):
_query.requested_authn_context,
_query.session_index)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -701,7 +701,7 @@ class ATTR(Service):
name_id = _query.subject.name_id
uid = name_id.text
- logger.debug("Local uid: %s" % uid)
+ logger.debug("Local uid: %s", uid)
identity = EXTRA[uid]
# Comes in over SOAP so only need to construct the response
@@ -709,7 +709,7 @@ class ATTR(Service):
msg = IDP.create_attribute_response(identity,
name_id=name_id, **args)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -739,23 +739,23 @@ class NIM(Service):
except PolicyError:
resp = BadRequest("Unknown entity")
return resp(self.environ, self.start_response)
-
+
info = IDP.response_args(request)
_resp = IDP.create_name_id_mapping_response(name_id, **info)
-
+
# Only SOAP
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % _resp, "", "",
response=True)
-
+
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
-
+
# ----------------------------------------------------------------------------
# Cookie handling
# ----------------------------------------------------------------------------
def info_from_cookie(kaka):
- logger.debug("KAKA: %s" % kaka)
+ logger.debug("KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get("idpauthn", None)
@@ -772,14 +772,14 @@ def info_from_cookie(kaka):
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
- logger.debug("delete KAKA: %s" % kaka)
+ logger.debug("delete KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = ""
cookie[name]['path'] = "/"
- logger.debug("Expire: %s" % morsel)
+ logger.debug("Expire: %s", morsel)
cookie[name]["expires"] = _expiration("dawn")
return tuple(cookie.output().split(": ", 1))
return None
@@ -790,7 +790,7 @@ def set_cookie(name, _, *args):
cookie[name] = base64.b64encode(":".join(args))
cookie[name]['path'] = "/"
cookie[name]["expires"] = _expiration(5) # 5 minutes from now
- logger.debug("Cookie expires: %s" % cookie[name]["expires"])
+ logger.debug("Cookie expires: %s", cookie[name]["expires"])
return tuple(cookie.output().split(": ", 1))
# ----------------------------------------------------------------------------
@@ -853,7 +853,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
def staticfile(environ, start_response):
@@ -867,7 +867,7 @@ def staticfile(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return open(path, 'r').read()
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
def application(environ, start_response):
@@ -878,9 +878,9 @@ def application(environ, start_response):
the functions from above can access the url placeholders.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
@@ -891,7 +891,7 @@ def application(environ, start_response):
return metadata(environ, start_response)
kaka = environ.get("HTTP_COOKIE", None)
- logger.info("<application> PATH: %s" % path)
+ logger.info("<application> PATH: %s", path)
if kaka:
logger.info("= KAKA =")
@@ -900,7 +900,7 @@ def application(environ, start_response):
else:
try:
query = parse_qs(environ["QUERY_STRING"])
- logger.debug("QUERY: %s" % query)
+ logger.debug("QUERY: %s", query)
user = IDP.cache.uid2user[query["id"][0]]
except KeyError:
user = None
@@ -919,7 +919,7 @@ def application(environ, start_response):
except IndexError:
environ['myapp.url_args'] = path
- logger.debug("Callback: %s" % (callback,))
+ logger.debug("Callback: %s", (callback,))
if isinstance(callback, tuple):
cls = callback[0](environ, start_response, user)
func = getattr(cls, callback[1])
diff --git a/example/sp-repoze/sp.py b/example/sp-repoze/sp.py
index 49e38dc6..b6539e03 100755
--- a/example/sp-repoze/sp.py
+++ b/example/sp-repoze/sp.py
@@ -37,7 +37,7 @@ def dict_to_table(ava, lev=0, width=1):
txt.append("<td>%s</td>\n" % valarr)
elif isinstance(valarr, list):
i = 0
- n = len(valarr)
+ n = len(valarr)
for val in valarr:
if not i:
txt.append("<th rowspan=%d>%s</td>\n" % (len(valarr), prop))
@@ -105,7 +105,7 @@ def whoami(environ, start_response, user):
response.extend("<a href='logout'>Logout</a>")
resp = Response(response)
return resp(environ, start_response)
-
+
#noinspection PyUnusedLocal
def not_found(environ, start_response):
@@ -128,7 +128,7 @@ def slo(environ, start_response, user):
if "QUERY_STRING" in environ:
query = parse_qs(environ["QUERY_STRING"])
- logger.info("query: %s" % query)
+ logger.info("query: %s", query)
try:
response = sc.parse_logout_request_response(
query["SAMLResponse"][0], binding=BINDING_HTTP_REDIRECT)
@@ -147,19 +147,19 @@ def slo(environ, start_response, user):
headers.append(delco)
resp = Redirect("/done", headers=headers)
return resp(environ, start_response)
-
+
#noinspection PyUnusedLocal
def logout(environ, start_response, user):
# This is where it starts when a user wants to log out
client = environ['repoze.who.plugins']["saml2auth"]
subject_id = environ["repoze.who.identity"]['repoze.who.userid']
- logger.info("[logout] subject_id: '%s'" % (subject_id,))
+ logger.info("[logout] subject_id: '%s'", subject_id)
target = "/done"
# What if more than one
_dict = client.saml_client.global_logout(subject_id)
- logger.info("[logout] global_logout > %s" % (_dict,))
+ logger.info("[logout] global_logout > %s", _dict)
rem = environ['repoze.who.plugins'][client.rememberer_name]
rem.forget(environ, subject_id)
@@ -180,15 +180,15 @@ def logout(environ, start_response, user):
#noinspection PyUnusedLocal
def done(environ, start_response, user):
# remove cookie and stored info
- logger.info("[done] environ: %s" % environ)
+ logger.info("[done] environ: %s", environ)
subject_id = environ["repoze.who.identity"]['repoze.who.userid']
client = environ['repoze.who.plugins']["saml2auth"]
- logger.info("[logout done] remaining subjects: %s" % (
- client.saml_client.users.subjects(),))
+ logger.info("[logout done] remaining subjects: %s",
+ client.saml_client.users.subjects())
start_response('200 OK', [('Content-Type', 'text/html')])
return ["<h3>You are now logged out from this service</h3>"]
-
+
# ----------------------------------------------------------------------------
# map urls to functions
@@ -215,7 +215,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
def application(environ, start_response):
@@ -226,14 +226,14 @@ def application(environ, start_response):
the functions from above can access the url placeholders.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
path = environ.get('PATH_INFO', '').lstrip('/')
- logger.info("<application> PATH: %s" % path)
+ logger.info("<application> PATH: %s", path)
if path == "metadata":
return metadata(environ, start_response)
@@ -241,9 +241,9 @@ def application(environ, start_response):
user = environ.get("REMOTE_USER", "")
if not user:
user = environ.get("repoze.who.identity", "")
- logger.info("repoze.who.identity: '%s'" % user)
+ logger.info("repoze.who.identity: '%s'", user)
else:
- logger.info("REMOTE_USER: '%s'" % user)
+ logger.info("REMOTE_USER: '%s'", user)
#logger.info(logging.Logger.manager.loggerDict)
for regex, callback in urls:
if user:
diff --git a/example/sp-wsgi/sp.py b/example/sp-wsgi/sp.py
index 27759594..278b108c 100755
--- a/example/sp-wsgi/sp.py
+++ b/example/sp-wsgi/sp.py
@@ -4,7 +4,10 @@ import logging
import re
import argparse
import os
-from future.backports.http.cookies import SimpleCookie
+try:
+ from future.backports.http.cookies import SimpleCookie
+except:
+ from Cookie import SimpleCookie
import six
from saml2.extension.pefim import SPCertEnc
@@ -169,7 +172,7 @@ class Cache(object):
def get_user(self, environ):
cookie = environ.get("HTTP_COOKIE", '')
cookie = cookie.decode("UTF-8")
- logger.debug("Cookie: %s" % cookie)
+ logger.debug("Cookie: %s", cookie)
if cookie:
cookie_obj = SimpleCookie(cookie)
morsel = cookie_obj.get(self.cookie_name, None)
@@ -185,7 +188,7 @@ class Cache(object):
def delete_cookie(self, environ):
cookie = environ.get("HTTP_COOKIE", '')
- logger.debug("delete cookie: %s" % cookie)
+ logger.debug("delete cookie: %s", cookie)
if cookie:
_name = self.cookie_name
cookie_obj = SimpleCookie(cookie)
@@ -193,7 +196,7 @@ class Cache(object):
cookie = SimpleCookie()
cookie[_name] = ""
cookie[_name]['path'] = "/"
- logger.debug("Expire: %s" % morsel)
+ logger.debug("Expire: %s", morsel)
cookie[_name]["expires"] = _expiration("now")
return cookie.output().split(": ", 1)
return None
@@ -205,7 +208,7 @@ class Cache(object):
cookie[self.cookie_name] = uid
cookie[self.cookie_name]['path'] = "/"
cookie[self.cookie_name]["expires"] = _expiration(480)
- logger.debug("Cookie expires: %s" % cookie[self.cookie_name]["expires"])
+ logger.debug("Cookie expires: %s", cookie[self.cookie_name]["expires"])
return cookie.output().encode("UTF-8").split(": ", 1)
@@ -217,11 +220,11 @@ class Cache(object):
class Service(object):
def __init__(self, environ, start_response, user=None):
self.environ = environ
- logger.debug("ENVIRON: %s" % environ)
+ logger.debug("ENVIRON: %s", environ)
self.start_response = start_response
self.user = user
self.sp = None
-
+
def unpack_redirect(self):
if "QUERY_STRING" in self.environ:
_qs = self.environ["QUERY_STRING"]
@@ -231,7 +234,7 @@ class Service(object):
def unpack_post(self):
_dict = parse_qs(get_post(self.environ))
- logger.debug("unpack_post:: %s" % _dict)
+ logger.debug("unpack_post:: %s", _dict)
try:
return dict([(k, v[0]) for k, v in _dict.items()])
except Exception:
@@ -251,11 +254,11 @@ class Service(object):
_dict = self.unpack_post()
else:
_dict = None
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return _dict
def operation(self, _dict, binding):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -313,7 +316,7 @@ class Service(object):
"""
logger.debug("- SOAP -")
_dict = self.unpack_soap()
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -360,11 +363,11 @@ class ACS(Service):
self.response = self.sp.parse_authn_request_response(
response, binding, self.outstanding_queries, self.cache.outstanding_certs)
except UnknownPrincipal as excp:
- logger.error("UnknownPrincipal: %s" % (excp,))
+ logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
- logger.error("UnsupportedBinding: %s" % (excp,))
+ logger.error("UnsupportedBinding: %s", excp)
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
except VerificationError as err:
@@ -374,7 +377,7 @@ class ACS(Service):
resp = ServiceError("Other error: %s" % (err,))
return resp(self.environ, self.start_response)
- logger.info("AVA: %s" % self.response.ava)
+ logger.info("AVA: %s", self.response.ava)
user = User(self.response.name_id, self.response.ava)
cookie = self.cache.set_cookie(user)
@@ -385,7 +388,7 @@ class ACS(Service):
return resp(self.environ, self.start_response)
def verify_attributes(self, ava):
- logger.info("SP: %s" % self.sp.config.entityid)
+ logger.info("SP: %s", self.sp.config.entityid)
rest = POLICY.get_entity_categories(
self.sp.config.entityid, self.sp.metadata)
@@ -447,7 +450,7 @@ class SSO(object):
def _wayf_redirect(self, came_from):
sid_ = sid()
self.cache.outstanding_queries[sid_] = came_from
- logger.debug("Redirect to WAYF function: %s" % self.wayf)
+ logger.debug("Redirect to WAYF function: %s", self.wayf)
return -1, SeeOther(headers=[('Location', "%s?%s" % (self.wayf, sid_))])
def _pick_idp(self, came_from):
@@ -458,7 +461,7 @@ class SSO(object):
_cli = self.sp
- logger.debug("[_pick_idp] %s" % self.environ)
+ logger.debug("[_pick_idp] %s", self.environ)
if "HTTP_PAOS" in self.environ:
if self.environ["HTTP_PAOS"] == PAOS_HEADER_INFO:
if 'application/vnd.paos+xml' in self.environ["HTTP_ACCEPT"]:
@@ -475,7 +478,7 @@ class SSO(object):
if not _entityid:
return -1, ServiceError("No IdP to talk to")
- logger.debug("IdP to talk to: %s" % _entityid)
+ logger.debug("IdP to talk to: %s", _entityid)
return ecp.ecp_auth_request(_cli, _entityid, _rstate)
else:
return -1, ServiceError('Faulty Accept header')
@@ -505,7 +508,7 @@ class SSO(object):
if _idp_entity_id in idps:
idp_entity_id = _idp_entity_id
except KeyError:
- logger.debug("No IdP entity ID in query: %s" % query)
+ logger.debug("No IdP entity ID in query: %s", query)
pass
if not idp_entity_id:
@@ -543,7 +546,7 @@ class SSO(object):
else:
return -1, NotImplemented("No WAYF or DS present!")
- logger.info("Chosen IdP: '%s'" % idp_entity_id)
+ logger.info("Chosen IdP: '%s'", idp_entity_id)
return 0, idp_entity_id
def redirect_to_auth(self, _cli, entity_id, came_from, sigalg=""):
@@ -552,8 +555,8 @@ class SSO(object):
_binding, destination = _cli.pick_binding(
"single_sign_on_service", self.bindings, "idpsso",
entity_id=entity_id)
- logger.debug("binding: %s, destination: %s" % (_binding,
- destination))
+ logger.debug("binding: %s, destination: %s", _binding,
+ destination)
# Binding here is the response binding that is which binding the
# IDP should use to return the response.
acs = _cli.config.getattr("endpoints", "sp")[
@@ -602,14 +605,14 @@ class SSO(object):
# Which page was accessed to get here
came_from = geturl(self.environ)
- logger.debug("[sp.challenge] RelayState >> '%s'" % came_from)
+ logger.debug("[sp.challenge] RelayState >> '%s'", came_from)
# If more than one idp and if none is selected, I have to do wayf
(done, response) = self._pick_idp(came_from)
# Three cases: -1 something went wrong or Discovery service used
# 0 I've got an IdP to send a request to
# >0 ECP in progress
- logger.debug("_idp_pick returned: %s" % done)
+ logger.debug("_idp_pick returned: %s", done)
if done == -1:
return response(self.environ, self.start_response)
elif done > 0:
@@ -687,11 +690,11 @@ def logout(environ, start_response, sp):
sso = SSO(sp, environ, start_response, cache=CACHE, **ARGS)
return sso.do()
- logger.info("[logout] subject_id: '%s'" % (user.name_id,))
+ logger.info("[logout] subject_id: '%s'", user.name_id)
# What if more than one
data = sp.global_logout(user.name_id)
- logger.info("[logout] global_logout > %s" % data)
+ logger.info("[logout] global_logout > %s", data)
for entity_id, logout_info in data.items():
if isinstance(logout_info, tuple):
@@ -719,8 +722,8 @@ def logout(environ, start_response, sp):
def finish_logout(environ, start_response):
- logger.info("[logout done] environ: %s" % environ)
- logger.info("[logout done] remaining subjects: %s" % CACHE.uid2user.values())
+ logger.info("[logout done] environ: %s", environ)
+ logger.info("[logout done] remaining subjects: %s", CACHE.uid2user.values())
# remove cookie and stored info
cookie = CACHE.delete_cookie(environ)
@@ -772,7 +775,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
def application(environ, start_response):
@@ -781,14 +784,14 @@ def application(environ, start_response):
the functions from above.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
path = environ.get('PATH_INFO', '').lstrip('/')
- logger.debug("<application> PATH: '%s'" % path)
+ logger.debug("<application> PATH: '%s'", path)
if path == "metadata":
return metadata(environ, start_response)