diff options
author | Roland Hedberg <roland.hedberg@adm.umu.se> | 2015-11-01 15:28:34 -0800 |
---|---|---|
committer | Roland Hedberg <roland.hedberg@adm.umu.se> | 2015-11-01 15:28:34 -0800 |
commit | bc9c6d0f16eb2a97225369225f5604485b327cf6 (patch) | |
tree | e4a81aa724812e586f83bbc85a2f78a486eb3e8d /src/saml2/entity.py | |
parent | 343b4eed40f56e5c8e47457bcd9858554c73855c (diff) | |
download | pysaml2-bc9c6d0f16eb2a97225369225f5604485b327cf6.tar.gz |
Added a new exception (UnknownBinding) and used it.
Diffstat (limited to 'src/saml2/entity.py')
-rw-r--r-- | src/saml2/entity.py | 139 |
1 files changed, 93 insertions, 46 deletions
diff --git a/src/saml2/entity.py b/src/saml2/entity.py index 4d0c548a..31c9d6ca 100644 --- a/src/saml2/entity.py +++ b/src/saml2/entity.py @@ -1,5 +1,5 @@ import base64 -#from binascii import hexlify +# from binascii import hexlify import copy import logging from hashlib import sha1 @@ -25,7 +25,8 @@ from saml2 import soap from saml2 import element_to_extension_element from saml2 import extension_elements_to_elements -from saml2.saml import NameID, EncryptedAssertion +from saml2.saml import NameID +from saml2.saml import EncryptedAssertion from saml2.saml import Issuer from saml2.saml import NAMEID_FORMAT_ENTITY from saml2.response import LogoutResponse @@ -88,6 +89,10 @@ SERVICE2MESSAGE = { } +class UnknownBinding(SAMLError): + pass + + def create_artifact(entity_id, message_handle, endpoint_index=0): """ SAML_artifact := B64(TypeCode EndpointIndex RemainingArtifact) @@ -285,8 +290,8 @@ class Entity(HTTPBase): logger.error("Failed to find consumer URL: %s, %s, %s", entity_id, bindings, descr_type) - #logger.error("Bindings: %s", bindings) - #logger.error("Entities: %s", self.metadata) + # logger.error("Bindings: %s", bindings) + # logger.error("Entities: %s", self.metadata) raise SAMLError("Unknown entity or unsupported bindings") @@ -362,11 +367,11 @@ class Entity(HTTPBase): :param msgtype: :return: """ - #logger.debug("unravel '%s'", txt) + # logger.debug("unravel '%s'", txt) if binding not in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST, BINDING_SOAP, BINDING_URI, BINDING_HTTP_ARTIFACT, None]: - raise ValueError("Don't know how to handle '%s'" % binding) + raise UnknownBinding("Don't know how to handle '%s'" % binding) else: try: if binding == BINDING_HTTP_REDIRECT: @@ -406,7 +411,7 @@ class Entity(HTTPBase): """ return open_soap_envelope(text) -# -------------------------------------------------------------------------- + # -------------------------------------------------------------------------- def sign(self, msg, mid=None, to_sign=None, sign_prepare=False): if msg.signature is None: @@ -520,7 +525,8 @@ class Entity(HTTPBase): return True return False - def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=None): + def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, + node_xpath=None): """ Encryption of assertions. :param encrypt_cert: Certificate to be used for encryption. @@ -547,7 +553,8 @@ class Entity(HTTPBase): _cert = "%s%s" % (_cert, end_cert) _, cert_file = make_temp(_cert.encode('ascii'), decode=False) response = cbxs.encrypt_assertion(response, cert_file, - pre_encryption_part(), node_xpath=node_xpath) + pre_encryption_part(), + node_xpath=node_xpath) return response except Exception as ex: exception = ex @@ -558,14 +565,21 @@ class Entity(HTTPBase): def _response(self, in_response_to, consumer_url=None, status=None, issuer=None, sign=False, to_sign=None, sp_entity_id=None, - encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False, - encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, pefim=False, **kwargs): + encrypt_assertion=False, + encrypt_assertion_self_contained=False, + encrypted_advice_attributes=False, + encrypt_cert_advice=None, encrypt_cert_assertion=None, + sign_assertion=None, pefim=False, **kwargs): """ Create a Response. Encryption: - encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is - true, then will the function try to encrypt the assertion in the the advice element of the main - assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists - in the advice element the main assertion will be encrypted instead, since it's no point to encrypt + encrypt_assertion must be true for encryption to be + performed. If encrypted_advice_attributes also is + true, then will the function try to encrypt the assertion in + the the advice element of the main + assertion. Only one assertion element is allowed in the + advice element, if multiple assertions exists + in the advice element the main assertion will be encrypted + instead, since it's no point to encrypt If encrypted_advice_attributes is false the main assertion will be encrypted. Since the same key @@ -577,13 +591,17 @@ class Entity(HTTPBase): :param to_sign: If there are other parts to sign :param sp_entity_id: Entity ID for the calling service provider. :param encrypt_assertion: True if assertions should be encrypted. - :param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces - selfcontained. - :param encrypted_advice_attributes: True if assertions in the advice element should be encrypted. - :param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element. - :param encrypt_cert_assertion: Certificate to be used for encryption of assertions. + :param encrypt_assertion_self_contained: True if all encrypted + assertions should have alla namespaces selfcontained. + :param encrypted_advice_attributes: True if assertions in the advice + element should be encrypted. + :param encrypt_cert_advice: Certificate to be used for encryption of + assertions in the advice element. + :param encrypt_cert_assertion: Certificate to be used for encryption + of assertions. :param sign_assertion: True if assertions should be signed. - :param pefim: True if a response according to the PEFIM profile should be created. + :param pefim: True if a response according to the PEFIM profile + should be created. :param kwargs: Extra key word arguments :return: A Response instance """ @@ -611,46 +629,63 @@ class Entity(HTTPBase): if not has_encrypt_cert and encrypt_cert_assertion is None: encrypt_assertion = False - if encrypt_assertion or (encrypted_advice_attributes and response.assertion.advice is not None and - len(response.assertion.advice.assertion) == 1): + if encrypt_assertion or ( + encrypted_advice_attributes and response.assertion.advice is + not None and + len(response.assertion.advice.assertion) == 1): if sign: response.signature = pre_signature_part(response.id, self.sec.my_cert, 1) sign_class = [(class_name(response), response.id)] cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary) encrypt_advice = False - if encrypted_advice_attributes and response.assertion.advice is not None \ + if encrypted_advice_attributes and response.assertion.advice is \ + not None \ and len(response.assertion.advice.assertion) > 0: _assertions = response.assertion if not isinstance(_assertions, list): _assertions = [_assertions] for _assertion in _assertions: _assertion.advice.encrypted_assertion = [] - _assertion.advice.encrypted_assertion.append(EncryptedAssertion()) - _advice_assertions = copy.deepcopy(_assertion.advice.assertion) + _assertion.advice.encrypted_assertion.append( + EncryptedAssertion()) + _advice_assertions = copy.deepcopy( + _assertion.advice.assertion) _assertion.advice.assertion = [] if not isinstance(_advice_assertions, list): _advice_assertions = [_advice_assertions] for tmp_assertion in _advice_assertions: to_sign_advice = [] if sign_assertion and not pefim: - tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1) - to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id)) - #tmp_assertion = response.assertion.advice.assertion[0] - _assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion) + tmp_assertion.signature = pre_signature_part( + tmp_assertion.id, self.sec.my_cert, 1) + to_sign_advice.append( + (class_name(tmp_assertion), tmp_assertion.id)) + + # tmp_assertion = response.assertion.advice.assertion[0] + _assertion.advice.encrypted_assertion[ + 0].add_extension_element(tmp_assertion) if encrypt_assertion_self_contained: - advice_tag = response.assertion.advice._to_element_tree().tag + advice_tag = \ + response.assertion.advice._to_element_tree().tag assertion_tag = tmp_assertion._to_element_tree().tag response = \ response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion( assertion_tag, advice_tag) - node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in - ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]]) + node_xpath = ''.join( + ["/*[local-name()=\"%s\"]" % v for v in + ["Response", "Assertion", "Advice", + "EncryptedAssertion", "Assertion"]]) if to_sign_advice: - response = signed_instance_factory(response, self.sec, to_sign_advice) - response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath) + response = signed_instance_factory(response, + self.sec, + to_sign_advice) + response = self._encrypt_assertion(encrypt_cert_advice, + sp_entity_id, + response, + node_xpath=node_xpath) response = response_from_string(response) if encrypt_assertion: @@ -660,24 +695,34 @@ class Entity(HTTPBase): if not isinstance(_assertions, list): _assertions = [_assertions] for _assertion in _assertions: - _assertion.signature = pre_signature_part(_assertion.id, self.sec.my_cert, 1) - to_sign_assertion.append((class_name(_assertion), _assertion.id)) + _assertion.signature = pre_signature_part(_assertion.id, + self.sec.my_cert, + 1) + to_sign_assertion.append( + (class_name(_assertion), _assertion.id)) if encrypt_assertion_self_contained: try: - assertion_tag = response.assertion._to_element_tree().tag + assertion_tag = response.assertion._to_element_tree( + + ).tag except: - assertion_tag = response.assertion[0]._to_element_tree().tag + assertion_tag = response.assertion[ + 0]._to_element_tree().tag response = pre_encrypt_assertion(response) - response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( + response = \ + response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion( assertion_tag) else: response = pre_encrypt_assertion(response) if to_sign_assertion: - response = signed_instance_factory(response, self.sec, to_sign_assertion) - response = self._encrypt_assertion(encrypt_cert_assertion, sp_entity_id, response) + response = signed_instance_factory(response, self.sec, + to_sign_assertion) + response = self._encrypt_assertion(encrypt_cert_assertion, + sp_entity_id, response) else: if to_sign: - response = signed_instance_factory(response, self.sec, to_sign) + response = signed_instance_factory(response, self.sec, + to_sign) if sign: return signed_instance_factory(response, self.sec, sign_class) else: @@ -965,7 +1010,8 @@ class Entity(HTTPBase): kwargs["terminate"] = terminate else: raise AttributeError( - "One of NewID, NewEncryptedNameID or Terminate has to be provided") + "One of NewID, NewEncryptedNameID or Terminate has to be " + "provided") return self._message(ManageNameIDRequest, destination, consent=consent, extensions=extensions, sign=sign, **kwargs) @@ -1081,14 +1127,15 @@ class Entity(HTTPBase): keys.append(_cert["key"]) only_identity_in_encrypted_assertion = False if "only_identity_in_encrypted_assertion" in kwargs: - only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"] + only_identity_in_encrypted_assertion = kwargs[ + "only_identity_in_encrypted_assertion"] response = response.verify(keys) if not response: return None - #logger.debug(response) + # logger.debug(response) return response |