summaryrefslogtreecommitdiff
path: root/example/idp2/idp.py
diff options
context:
space:
mode:
Diffstat (limited to 'example/idp2/idp.py')
-rwxr-xr-xexample/idp2/idp.py93
1 files changed, 46 insertions, 47 deletions
diff --git a/example/idp2/idp.py b/example/idp2/idp.py
index 4d67a8d9..b86e2990 100755
--- a/example/idp2/idp.py
+++ b/example/idp2/idp.py
@@ -5,9 +5,7 @@ import importlib
import logging
import os
import re
-import socket
import time
-import ssl
from Cookie import SimpleCookie
from hashlib import sha1
@@ -92,7 +90,7 @@ def dict2list_of_tuples(d):
class Service(object):
def __init__(self, environ, start_response, user=None):
self.environ = environ
- logger.debug("ENVIRON: %s" % environ)
+ logger.debug("ENVIRON: %s", environ)
self.start_response = start_response
self.user = user
@@ -105,7 +103,7 @@ class Service(object):
def unpack_post(self):
_dict = parse_qs(get_post(self.environ))
- logger.debug("unpack_post:: %s" % _dict)
+ logger.debug("unpack_post:: %s", _dict)
try:
return dict([(k, v[0]) for k, v in _dict.items()])
except Exception:
@@ -125,11 +123,11 @@ class Service(object):
_dict = self.unpack_post()
else:
_dict = None
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return _dict
def operation(self, saml_msg, binding):
- logger.debug("_operation: %s" % saml_msg)
+ logger.debug("_operation: %s", saml_msg)
if not (saml_msg and 'SAMLRequest' in saml_msg):
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -211,7 +209,7 @@ class Service(object):
"""
logger.debug("- SOAP -")
_dict = self.unpack_soap()
- logger.debug("_dict: %s" % _dict)
+ logger.debug("_dict: %s", _dict)
return self.operation(_dict, BINDING_SOAP)
def uri(self):
@@ -274,7 +272,7 @@ class SSO(Service):
logger.info("parsed OK")
_authn_req = self.req_info.message
- logger.debug("%s" % _authn_req)
+ logger.debug("%s", _authn_req)
try:
self.binding_out, self.destination = IDP.pick_binding(
@@ -282,11 +280,11 @@ class SSO(Service):
bindings=self.response_bindings,
entity_id=_authn_req.issuer.text, request=_authn_req)
except Exception as err:
- logger.error("Couldn't find receiver endpoint: %s" % err)
+ logger.error("Couldn't find receiver endpoint: %s", err)
raise
- logger.debug("Binding: %s, destination: %s" % (self.binding_out,
- self.destination))
+ logger.debug("Binding: %s, destination: %s", self.binding_out,
+ self.destination)
resp_args = {}
try:
@@ -314,18 +312,18 @@ class SSO(Service):
try:
resp_args, _resp = self.verify_request(query, binding_in)
except UnknownPrincipal as excp:
- logger.error("UnknownPrincipal: %s" % (excp,))
+ logger.error("UnknownPrincipal: %s", excp)
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
- logger.error("UnsupportedBinding: %s" % (excp,))
+ logger.error("UnsupportedBinding: %s", excp)
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
if not _resp:
identity = USERS[self.user].copy()
# identity["eduPersonTargetedID"] = get_eptid(IDP, query, session)
- logger.info("Identity: %s" % (identity,))
+ logger.info("Identity: %s", identity)
if REPOZE_ID_EQUIVALENT:
identity[REPOZE_ID_EQUIVALENT] = self.user
@@ -346,7 +344,7 @@ class SSO(Service):
resp = ServiceError("Exception: %s" % (excp,))
return resp(self.environ, self.start_response)
- logger.info("AuthNResponse: %s" % _resp)
+ logger.info("AuthNResponse: %s", _resp)
if self.op_type == "ecp":
kwargs = {"soap_headers": [
ecp.Response(
@@ -358,12 +356,12 @@ class SSO(Service):
"%s" % _resp, self.destination,
relay_state, response=True, **kwargs)
- logger.debug("HTTPargs: %s" % http_args)
+ logger.debug("HTTPargs: %s", http_args)
return self.response(self.binding_out, http_args)
@staticmethod
def _store_request(saml_msg):
- logger.debug("_store_request: %s" % saml_msg)
+ logger.debug("_store_request: %s", saml_msg)
key = sha1(saml_msg["SAMLRequest"]).hexdigest()
# store the AuthnRequest
IDP.ticket[key] = saml_msg
@@ -509,7 +507,7 @@ def do_authentication(environ, start_response, authn_context, key,
if len(auth_info):
method, reference = auth_info[0]
- logger.debug("Authn chosen: %s (ref=%s)" % (method, reference))
+ logger.debug("Authn chosen: %s (ref=%s)", method, reference)
return method(environ, start_response, reference, key, redirect_uri, headers)
else:
resp = Unauthorized("No usable authentication method")
@@ -547,7 +545,7 @@ def username_password_authn(environ, start_response, reference, key,
"authn_reference": reference,
"redirect_uri": redirect_uri
}
- logger.info("do_authentication argv: %s" % argv)
+ logger.info("do_authentication argv: %s", argv)
return resp(environ, start_response, **argv)
@@ -563,7 +561,7 @@ def verify_username_and_password(dic):
def do_verify(environ, start_response, _):
query = parse_qs(get_post(environ))
- logger.debug("do_verify: %s" % query)
+ logger.debug("do_verify: %s", query)
try:
_ok, user = verify_username_and_password(query)
@@ -577,13 +575,13 @@ def do_verify(environ, start_response, _):
uid = rndstr(24)
IDP.cache.uid2user[uid] = user
IDP.cache.user2uid[user] = uid
- logger.debug("Register %s under '%s'" % (user, uid))
+ logger.debug("Register %s under '%s'", user, uid)
kaka = set_cookie("idpauthn", "/", uid, query["authn_reference"][0])
lox = "%s?id=%s&key=%s" % (query["redirect_uri"][0], uid,
query["key"][0])
- logger.debug("Redirect => %s" % lox)
+ logger.debug("Redirect => %s", lox)
resp = Redirect(lox, headers=[kaka], content="text/html")
return resp(environ, start_response)
@@ -611,17 +609,17 @@ class SLO(Service):
logger.info("--- Single Log Out Service ---")
try:
- logger.debug("req: '%s'" % request)
+ logger.debug("req: '%s'", request)
req_info = IDP.parse_logout_request(request, binding)
except Exception as exc:
- logger.error("Bad request: %s" % exc)
+ logger.error("Bad request: %s", exc)
resp = BadRequest("%s" % exc)
return resp(self.environ, self.start_response)
msg = req_info.message
if msg.name_id:
lid = IDP.ident.find_local_id(msg.name_id)
- logger.info("local identifier: %s" % lid)
+ logger.info("local identifier: %s", lid)
if lid in IDP.cache.user2uid:
uid = IDP.cache.user2uid[lid]
if uid in IDP.cache.uid2user:
@@ -631,8 +629,8 @@ class SLO(Service):
try:
IDP.session_db.remove_authn_statements(msg.name_id)
except KeyError as exc:
- logger.error("Unknown session: %s" % exc)
- resp = ServiceError("Unknown session: %s" % exc)
+ logger.error("Unknown session: %s", exc)
+ resp = ServiceError("Unknown session: %s", exc)
return resp(self.environ, self.start_response)
resp = IDP.create_logout_response(msg, [binding])
@@ -650,7 +648,7 @@ class SLO(Service):
hinfo = IDP.apply_binding(binding, "%s" % resp, destination,
relay_state, response=response)
except Exception as exc:
- logger.error("ServiceError: %s" % exc)
+ logger.error("ServiceError: %s", exc)
resp = ServiceError("%s" % exc)
return resp(self.environ, self.start_response)
@@ -658,7 +656,7 @@ class SLO(Service):
delco = delete_cookie(self.environ, "idpauthn")
if delco:
hinfo["headers"].append(delco)
- logger.info("Header: %s" % (hinfo["headers"],))
+ logger.info("Header: %s", (hinfo["headers"],))
if binding == BINDING_HTTP_REDIRECT:
for key, value in hinfo['headers']:
@@ -689,7 +687,7 @@ class NMI(Service):
request.name_id, request.new_id, request.new_encrypted_id,
request.terminate)
- logger.debug("New NameID: %s" % name_id)
+ logger.debug("New NameID: %s", name_id)
_resp = IDP.create_manage_name_id_response(request)
@@ -719,12 +717,12 @@ class AIDR(Service):
hinfo = IDP.apply_binding(BINDING_URI, "%s" % assertion, response=True)
- logger.debug("HINFO: %s" % hinfo)
+ logger.debug("HINFO: %s", hinfo)
resp = Response(hinfo["data"], headers=hinfo["headers"])
return resp(self.environ, self.start_response)
def operation(self, _dict, binding, **kwargs):
- logger.debug("_operation: %s" % _dict)
+ logger.debug("_operation: %s", _dict)
if not _dict or "ID" not in _dict:
resp = BadRequest('Error parsing request or no request')
return resp(self.environ, self.start_response)
@@ -765,7 +763,7 @@ class AQS(Service):
_query.requested_authn_context,
_query.session_index)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -788,7 +786,7 @@ class ATTR(Service):
name_id = _query.subject.name_id
uid = name_id.text
- logger.debug("Local uid: %s" % uid)
+ logger.debug("Local uid: %s", uid)
identity = EXTRA[uid]
# Comes in over SOAP so only need to construct the response
@@ -796,7 +794,7 @@ class ATTR(Service):
msg = IDP.create_attribute_response(identity,
name_id=name_id, **args)
- logger.debug("response: %s" % msg)
+ logger.debug("response: %s", msg)
hinfo = IDP.apply_binding(BINDING_SOAP, "%s" % msg, "", "",
response=True)
@@ -843,7 +841,7 @@ class NIM(Service):
# Cookie handling
# ----------------------------------------------------------------------------
def info_from_cookie(kaka):
- logger.debug("KAKA: %s" % kaka)
+ logger.debug("KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get("idpauthn", None)
@@ -860,14 +858,14 @@ def info_from_cookie(kaka):
def delete_cookie(environ, name):
kaka = environ.get("HTTP_COOKIE", '')
- logger.debug("delete KAKA: %s" % kaka)
+ logger.debug("delete KAKA: %s", kaka)
if kaka:
cookie_obj = SimpleCookie(kaka)
morsel = cookie_obj.get(name, None)
cookie = SimpleCookie()
cookie[name] = ""
cookie[name]['path'] = "/"
- logger.debug("Expire: %s" % morsel)
+ logger.debug("Expire: %s", morsel)
cookie[name]["expires"] = _expiration("dawn")
return tuple(cookie.output().split(": ", 1))
return None
@@ -878,7 +876,7 @@ def set_cookie(name, _, *args):
cookie[name] = base64.b64encode(":".join(args))
cookie[name]['path'] = "/"
cookie[name]["expires"] = _expiration(5) # 5 minutes from now
- logger.debug("Cookie expires: %s" % cookie[name]["expires"])
+ logger.debug("Cookie expires: %s", cookie[name]["expires"])
return tuple(cookie.output().split(": ", 1))
# ----------------------------------------------------------------------------
@@ -941,7 +939,7 @@ def metadata(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return metadata
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -960,7 +958,7 @@ def staticfile(environ, start_response):
start_response('200 OK', [('Content-Type', "text/xml")])
return open(path, 'r').read()
except Exception as ex:
- logger.error("An error occured while creating metadata:" + ex.message)
+ logger.error("An error occured while creating metadata: %s", ex.message)
return not_found(environ, start_response)
@@ -972,9 +970,9 @@ def application(environ, start_response):
the functions from above can access the url placeholders.
If nothing matches, call the `not_found` function.
-
+
:param environ: The HTTP application environment
- :param start_response: The application to run when the handling of the
+ :param start_response: The application to run when the handling of the
request is done
:return: The response as a list of lines
"""
@@ -985,7 +983,7 @@ def application(environ, start_response):
return metadata(environ, start_response)
kaka = environ.get("HTTP_COOKIE", None)
- logger.info("<application> PATH: %s" % path)
+ logger.info("<application> PATH: %s", path)
if kaka:
logger.info("= KAKA =")
@@ -995,7 +993,7 @@ def application(environ, start_response):
else:
try:
query = parse_qs(environ["QUERY_STRING"])
- logger.debug("QUERY: %s" % query)
+ logger.debug("QUERY: %s", query)
user = IDP.cache.uid2user[query["id"][0]]
except KeyError:
user = None
@@ -1014,7 +1012,7 @@ def application(environ, start_response):
except IndexError:
environ['myapp.url_args'] = path
- logger.debug("Callback: %s" % (callback,))
+ logger.debug("Callback: %s", callback)
if isinstance(callback, tuple):
cls = callback[0](environ, start_response, user)
func = getattr(cls, callback[1])
@@ -1085,7 +1083,8 @@ if __name__ == '__main__':
_https = ""
if CONFIG.HTTPS:
SRV.ssl_adapter = ssl_pyopenssl.pyOpenSSLAdapter(CONFIG.SERVER_CERT,
- CONFIG.SERVER_KEY, CONFIG.CERT_CHAIN)
+ CONFIG.SERVER_KEY,
+ CONFIG.CERT_CHAIN)
_https = " using SSL/TLS"
logger.info("Server starting")
print("IDP listening on %s:%s%s" % (HOST, PORT, _https))