summaryrefslogtreecommitdiff
path: root/src/saml2/entity.py
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2015-11-01 15:28:34 -0800
committerRoland Hedberg <roland.hedberg@adm.umu.se>2015-11-01 15:28:34 -0800
commitbc9c6d0f16eb2a97225369225f5604485b327cf6 (patch)
treee4a81aa724812e586f83bbc85a2f78a486eb3e8d /src/saml2/entity.py
parent343b4eed40f56e5c8e47457bcd9858554c73855c (diff)
downloadpysaml2-bc9c6d0f16eb2a97225369225f5604485b327cf6.tar.gz
Added a new exception (UnknownBinding) and used it.
Diffstat (limited to 'src/saml2/entity.py')
-rw-r--r--src/saml2/entity.py139
1 files changed, 93 insertions, 46 deletions
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 4d0c548a..31c9d6ca 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -1,5 +1,5 @@
import base64
-#from binascii import hexlify
+# from binascii import hexlify
import copy
import logging
from hashlib import sha1
@@ -25,7 +25,8 @@ from saml2 import soap
from saml2 import element_to_extension_element
from saml2 import extension_elements_to_elements
-from saml2.saml import NameID, EncryptedAssertion
+from saml2.saml import NameID
+from saml2.saml import EncryptedAssertion
from saml2.saml import Issuer
from saml2.saml import NAMEID_FORMAT_ENTITY
from saml2.response import LogoutResponse
@@ -88,6 +89,10 @@ SERVICE2MESSAGE = {
}
+class UnknownBinding(SAMLError):
+ pass
+
+
def create_artifact(entity_id, message_handle, endpoint_index=0):
"""
SAML_artifact := B64(TypeCode EndpointIndex RemainingArtifact)
@@ -285,8 +290,8 @@ class Entity(HTTPBase):
logger.error("Failed to find consumer URL: %s, %s, %s",
entity_id, bindings, descr_type)
- #logger.error("Bindings: %s", bindings)
- #logger.error("Entities: %s", self.metadata)
+ # logger.error("Bindings: %s", bindings)
+ # logger.error("Entities: %s", self.metadata)
raise SAMLError("Unknown entity or unsupported bindings")
@@ -362,11 +367,11 @@ class Entity(HTTPBase):
:param msgtype:
:return:
"""
- #logger.debug("unravel '%s'", txt)
+ # logger.debug("unravel '%s'", txt)
if binding not in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST,
BINDING_SOAP, BINDING_URI, BINDING_HTTP_ARTIFACT,
None]:
- raise ValueError("Don't know how to handle '%s'" % binding)
+ raise UnknownBinding("Don't know how to handle '%s'" % binding)
else:
try:
if binding == BINDING_HTTP_REDIRECT:
@@ -406,7 +411,7 @@ class Entity(HTTPBase):
"""
return open_soap_envelope(text)
-# --------------------------------------------------------------------------
+ # --------------------------------------------------------------------------
def sign(self, msg, mid=None, to_sign=None, sign_prepare=False):
if msg.signature is None:
@@ -520,7 +525,8 @@ class Entity(HTTPBase):
return True
return False
- def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=None):
+ def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response,
+ node_xpath=None):
""" Encryption of assertions.
:param encrypt_cert: Certificate to be used for encryption.
@@ -547,7 +553,8 @@ class Entity(HTTPBase):
_cert = "%s%s" % (_cert, end_cert)
_, cert_file = make_temp(_cert.encode('ascii'), decode=False)
response = cbxs.encrypt_assertion(response, cert_file,
- pre_encryption_part(), node_xpath=node_xpath)
+ pre_encryption_part(),
+ node_xpath=node_xpath)
return response
except Exception as ex:
exception = ex
@@ -558,14 +565,21 @@ class Entity(HTTPBase):
def _response(self, in_response_to, consumer_url=None, status=None,
issuer=None, sign=False, to_sign=None, sp_entity_id=None,
- encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
- encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, pefim=False, **kwargs):
+ encrypt_assertion=False,
+ encrypt_assertion_self_contained=False,
+ encrypted_advice_attributes=False,
+ encrypt_cert_advice=None, encrypt_cert_assertion=None,
+ sign_assertion=None, pefim=False, **kwargs):
""" Create a Response.
Encryption:
- encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
- true, then will the function try to encrypt the assertion in the the advice element of the main
- assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
- in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
+ encrypt_assertion must be true for encryption to be
+ performed. If encrypted_advice_attributes also is
+ true, then will the function try to encrypt the assertion in
+ the the advice element of the main
+ assertion. Only one assertion element is allowed in the
+ advice element, if multiple assertions exists
+ in the advice element the main assertion will be encrypted
+ instead, since it's no point to encrypt
If encrypted_advice_attributes is
false the main assertion will be encrypted. Since the same key
@@ -577,13 +591,17 @@ class Entity(HTTPBase):
:param to_sign: If there are other parts to sign
:param sp_entity_id: Entity ID for the calling service provider.
:param encrypt_assertion: True if assertions should be encrypted.
- :param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces
- selfcontained.
- :param encrypted_advice_attributes: True if assertions in the advice element should be encrypted.
- :param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element.
- :param encrypt_cert_assertion: Certificate to be used for encryption of assertions.
+ :param encrypt_assertion_self_contained: True if all encrypted
+ assertions should have alla namespaces selfcontained.
+ :param encrypted_advice_attributes: True if assertions in the advice
+ element should be encrypted.
+ :param encrypt_cert_advice: Certificate to be used for encryption of
+ assertions in the advice element.
+ :param encrypt_cert_assertion: Certificate to be used for encryption
+ of assertions.
:param sign_assertion: True if assertions should be signed.
- :param pefim: True if a response according to the PEFIM profile should be created.
+ :param pefim: True if a response according to the PEFIM profile
+ should be created.
:param kwargs: Extra key word arguments
:return: A Response instance
"""
@@ -611,46 +629,63 @@ class Entity(HTTPBase):
if not has_encrypt_cert and encrypt_cert_assertion is None:
encrypt_assertion = False
- if encrypt_assertion or (encrypted_advice_attributes and response.assertion.advice is not None and
- len(response.assertion.advice.assertion) == 1):
+ if encrypt_assertion or (
+ encrypted_advice_attributes and response.assertion.advice is
+ not None and
+ len(response.assertion.advice.assertion) == 1):
if sign:
response.signature = pre_signature_part(response.id,
self.sec.my_cert, 1)
sign_class = [(class_name(response), response.id)]
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
encrypt_advice = False
- if encrypted_advice_attributes and response.assertion.advice is not None \
+ if encrypted_advice_attributes and response.assertion.advice is \
+ not None \
and len(response.assertion.advice.assertion) > 0:
_assertions = response.assertion
if not isinstance(_assertions, list):
_assertions = [_assertions]
for _assertion in _assertions:
_assertion.advice.encrypted_assertion = []
- _assertion.advice.encrypted_assertion.append(EncryptedAssertion())
- _advice_assertions = copy.deepcopy(_assertion.advice.assertion)
+ _assertion.advice.encrypted_assertion.append(
+ EncryptedAssertion())
+ _advice_assertions = copy.deepcopy(
+ _assertion.advice.assertion)
_assertion.advice.assertion = []
if not isinstance(_advice_assertions, list):
_advice_assertions = [_advice_assertions]
for tmp_assertion in _advice_assertions:
to_sign_advice = []
if sign_assertion and not pefim:
- tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
- to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id))
- #tmp_assertion = response.assertion.advice.assertion[0]
- _assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
+ tmp_assertion.signature = pre_signature_part(
+ tmp_assertion.id, self.sec.my_cert, 1)
+ to_sign_advice.append(
+ (class_name(tmp_assertion), tmp_assertion.id))
+
+ # tmp_assertion = response.assertion.advice.assertion[0]
+ _assertion.advice.encrypted_assertion[
+ 0].add_extension_element(tmp_assertion)
if encrypt_assertion_self_contained:
- advice_tag = response.assertion.advice._to_element_tree().tag
+ advice_tag = \
+ response.assertion.advice._to_element_tree().tag
assertion_tag = tmp_assertion._to_element_tree().tag
response = \
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
assertion_tag, advice_tag)
- node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
- ["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
+ node_xpath = ''.join(
+ ["/*[local-name()=\"%s\"]" % v for v in
+ ["Response", "Assertion", "Advice",
+ "EncryptedAssertion", "Assertion"]])
if to_sign_advice:
- response = signed_instance_factory(response, self.sec, to_sign_advice)
- response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath)
+ response = signed_instance_factory(response,
+ self.sec,
+ to_sign_advice)
+ response = self._encrypt_assertion(encrypt_cert_advice,
+ sp_entity_id,
+ response,
+ node_xpath=node_xpath)
response = response_from_string(response)
if encrypt_assertion:
@@ -660,24 +695,34 @@ class Entity(HTTPBase):
if not isinstance(_assertions, list):
_assertions = [_assertions]
for _assertion in _assertions:
- _assertion.signature = pre_signature_part(_assertion.id, self.sec.my_cert, 1)
- to_sign_assertion.append((class_name(_assertion), _assertion.id))
+ _assertion.signature = pre_signature_part(_assertion.id,
+ self.sec.my_cert,
+ 1)
+ to_sign_assertion.append(
+ (class_name(_assertion), _assertion.id))
if encrypt_assertion_self_contained:
try:
- assertion_tag = response.assertion._to_element_tree().tag
+ assertion_tag = response.assertion._to_element_tree(
+
+ ).tag
except:
- assertion_tag = response.assertion[0]._to_element_tree().tag
+ assertion_tag = response.assertion[
+ 0]._to_element_tree().tag
response = pre_encrypt_assertion(response)
- response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
+ response = \
+ response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
assertion_tag)
else:
response = pre_encrypt_assertion(response)
if to_sign_assertion:
- response = signed_instance_factory(response, self.sec, to_sign_assertion)
- response = self._encrypt_assertion(encrypt_cert_assertion, sp_entity_id, response)
+ response = signed_instance_factory(response, self.sec,
+ to_sign_assertion)
+ response = self._encrypt_assertion(encrypt_cert_assertion,
+ sp_entity_id, response)
else:
if to_sign:
- response = signed_instance_factory(response, self.sec, to_sign)
+ response = signed_instance_factory(response, self.sec,
+ to_sign)
if sign:
return signed_instance_factory(response, self.sec, sign_class)
else:
@@ -965,7 +1010,8 @@ class Entity(HTTPBase):
kwargs["terminate"] = terminate
else:
raise AttributeError(
- "One of NewID, NewEncryptedNameID or Terminate has to be provided")
+ "One of NewID, NewEncryptedNameID or Terminate has to be "
+ "provided")
return self._message(ManageNameIDRequest, destination, consent=consent,
extensions=extensions, sign=sign, **kwargs)
@@ -1081,14 +1127,15 @@ class Entity(HTTPBase):
keys.append(_cert["key"])
only_identity_in_encrypted_assertion = False
if "only_identity_in_encrypted_assertion" in kwargs:
- only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"]
+ only_identity_in_encrypted_assertion = kwargs[
+ "only_identity_in_encrypted_assertion"]
response = response.verify(keys)
if not response:
return None
- #logger.debug(response)
+ # logger.debug(response)
return response