#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright (C) 2010-2011 UmeƄ University # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Contains classes and functions that are necessary to implement different bindings. Bindings normally consists of three parts: - rules about what to send - how to package the information - which protocol to use """ import saml2 import base64 import urllib from saml2.s_utils import deflate_and_base64_encode from saml2.soap import SOAPClient, HTTPClient try: from xml.etree import cElementTree as ElementTree except ImportError: try: import cElementTree as ElementTree except ImportError: from elementtree import ElementTree NAMESPACE = "http://schemas.xmlsoap.org/soap/envelope/" FORM_SPEC = """
""" def http_post_message(message, location, relay_state="", typ="SAMLRequest"): """The HTTP POST binding defines a mechanism by which SAML protocol messages may be transmitted within the base64-encoded content of a HTML form control. :param message: The message :param location: Where the form should be posted to :param relay_state: for preserving and conveying state information :return: A tuple containing header information and a HTML message. """ response = ["", """SAML 2.0 POST""", ""] if not isinstance(message, basestring): message = "%s" % (message,) response.append(FORM_SPEC % (location, typ, base64.b64encode(message), relay_state)) response.append("""""") response.append("") return [("Content-type", "text/html")], response def http_redirect_message(message, location, relay_state="", typ="SAMLRequest"): """The HTTP Redirect binding defines a mechanism by which SAML protocol messages can be transmitted within URL parameters. Messages are encoded for use with this binding using a URL encoding technique, and transmitted using the HTTP GET method. The DEFLATE Encoding is used in this function. :param message: The message :param location: Where the message should be posted to :param relay_state: for preserving and conveying state information :return: A tuple containing header information and a HTML message. """ if not isinstance(message, basestring): message = "%s" % (message,) args = {typ: deflate_and_base64_encode(message)} if relay_state: args["RelayState"] = relay_state login_url = "?".join([location, urllib.urlencode(args)]) headers = [('Location', login_url)] body = [""] return headers, body def make_soap_enveloped_saml_thingy(thingy, header_parts=None): """ Returns a soap envelope containing a SAML request as a text string. :param thingy: The SAML thingy :return: The SOAP envelope as a string """ envelope = ElementTree.Element('') envelope.tag = '{%s}Envelope' % NAMESPACE if header_parts: header = ElementTree.Element('') header.tag = '{%s}Header' % NAMESPACE envelope.append(header) for part in header_parts: part.become_child_element_of(header) body = ElementTree.Element('') body.tag = '{%s}Body' % NAMESPACE envelope.append(body) thingy.become_child_element_of(body) return ElementTree.tostring(envelope, encoding="UTF-8") def http_soap_message(message): return ([("Content-type", "application/soap+xml")], make_soap_enveloped_saml_thingy(message)) def http_paos(message, extra=None): return ([("Content-type", "application/soap+xml")], make_soap_enveloped_saml_thingy(message, extra)) def parse_soap_enveloped_saml(text, body_class, header_class=None): """Parses a SOAP enveloped SAML thing and returns header parts and body :param text: The SOAP object as XML :return: header parts and body as saml.samlbase instances """ envelope = ElementTree.fromstring(text) assert envelope.tag == '{%s}Envelope' % NAMESPACE #print len(envelope) body = None header = {} for part in envelope: #print ">",part.tag if part.tag == '{%s}Body' % NAMESPACE: for sub in part: try: body = saml2.create_class_from_element_tree(body_class, sub) except Exception: raise Exception( "Wrong body type (%s) in SOAP envelope" % sub.tag) elif part.tag == '{%s}Header' % NAMESPACE: if not header_class: raise Exception("Header where I didn't expect one") #print "--- HEADER ---" for sub in part: #print ">>",sub.tag for klass in header_class: #print "?{%s}%s" % (klass.c_namespace,klass.c_tag) if sub.tag == "{%s}%s" % (klass.c_namespace, klass.c_tag): header[sub.tag] = \ saml2.create_class_from_element_tree(klass, sub) break return body, header # ----------------------------------------------------------------------------- # def send_using_http_get(request, destination, key_file=None, cert_file=None, # log=None): # # # http = HTTPClient(destination, key_file, cert_file, log) # if log: log.info("HTTP client initiated") # # try: # response = http.get() # except Exception, exc: # if log: log.info("HTTPClient exception: %s" % (exc,)) # return None # # if log: log.info("HTTP request sent and got response: %s" % response) # # return response def send_using_http_post(request, destination, relay_state, key_file=None, cert_file=None, log=None, ca_certs=""): http = HTTPClient(destination, key_file, cert_file, log, ca_certs) if log: log.info("HTTP client initiated") if not isinstance(request, basestring): request = "%s" % (request,) (headers, message) = http_post_message(request, destination, relay_state) try: response = http.post(message, headers) except Exception, exc: if log: log.info("HTTPClient exception: %s" % (exc,)) return None if log: log.info("HTTP request sent and got response: %s" % response) return response def send_using_soap(message, destination, key_file=None, cert_file=None, log=None, ca_certs=""): """ Actual construction of the SOAP message is done by the SOAPClient :param message: The SAML message to send :param destination: Where to send the message :param key_file: If HTTPS this is the client certificate :param cert_file: If HTTPS this a certificates file :param log: A log function to use for logging :param ca_certs: CA certificates to use when verifying server certificates :return: The response gotten from the other side interpreted by the SOAPClient """ soapclient = SOAPClient(destination, key_file, cert_file, log, ca_certs) if log: log.info("SOAP client initiated") try: response = soapclient.send(message) except Exception, exc: if log: log.info("SoapClient exception: %s" % (exc,)) return None if log: log.info("SOAP request sent and got response: %s" % response) return response # ----------------------------------------------------------------------------- PACKING = { saml2.BINDING_HTTP_REDIRECT: http_redirect_message, saml2.BINDING_HTTP_POST: http_post_message, } def packager( identifier ): try: return PACKING[identifier] except KeyError: raise Exception("Unkown binding type: %s" % identifier)