summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2014-04-14 14:19:19 +0200
committerRoland Hedberg <roland.hedberg@adm.umu.se>2014-04-14 14:19:19 +0200
commita6ef115ff77d81bec4e8beb2cbd720d903bb806f (patch)
tree059d990a090ac242845321c42bbce610e025160f
parentfca906e9b0b2df455885119c9a9603171874ef4a (diff)
downloadpysaml2-a6ef115ff77d81bec4e8beb2cbd720d903bb806f.tar.gz
Fixed handling of signed and then encrypted response assertions. At the same time added support for dealing with any combination of encrypted/non-encrypted assertions.
-rw-r--r--src/saml2/assertion.py198
-rw-r--r--src/saml2/entity.py26
-rw-r--r--src/saml2/response.py89
-rw-r--r--src/saml2/sigver.py50
-rw-r--r--tests/test_12_s_utils.py2
-rw-r--r--tests/test_40_sigver.py262
-rw-r--r--tests/test_41_response.py53
-rw-r--r--tests/test_50_server.py59
-rw-r--r--tests/test_51_client.py142
-rw-r--r--tests/test_60_sp.py1
-rw-r--r--tests/test_81_certificates.py129
11 files changed, 632 insertions, 379 deletions
diff --git a/src/saml2/assertion.py b/src/saml2/assertion.py
index 59cd6216..ad083450 100644
--- a/src/saml2/assertion.py
+++ b/src/saml2/assertion.py
@@ -543,107 +543,109 @@ class EntityCategories(object):
pass
-class Assertion(dict):
- """ Handles assertions about subjects """
-
- def __init__(self, dic=None):
- dict.__init__(self, dic)
- self.acs = []
-
- @staticmethod
- def _authn_context_decl(decl, authn_auth=None):
- """
- Construct the authn context with a authn context declaration
- :param decl: The authn context declaration
- :param authn_auth: Authenticating Authority
- :return: An AuthnContext instance
- """
+def _authn_context_class_ref(authn_class, authn_auth=None):
+ """
+ Construct the authn context with a authn context class reference
+ :param authn_class: The authn context class reference
+ :param authn_auth: Authenticating Authority
+ :return: An AuthnContext instance
+ """
+ cntx_class = factory(saml.AuthnContextClassRef, text=authn_class)
+ if authn_auth:
return factory(saml.AuthnContext,
- authn_context_decl=decl,
+ authn_context_class_ref=cntx_class,
authenticating_authority=factory(
saml.AuthenticatingAuthority, text=authn_auth))
-
- def _authn_context_decl_ref(self, decl_ref, authn_auth=None):
- """
- Construct the authn context with a authn context declaration reference
- :param decl_ref: The authn context declaration reference
- :param authn_auth: Authenticating Authority
- :return: An AuthnContext instance
- """
+ else:
return factory(saml.AuthnContext,
- authn_context_decl_ref=decl_ref,
- authenticating_authority=factory(
- saml.AuthenticatingAuthority, text=authn_auth))
+ authn_context_class_ref=cntx_class)
- @staticmethod
- def _authn_context_class_ref(authn_class, authn_auth=None):
- """
- Construct the authn context with a authn context class reference
- :param authn_class: The authn context class reference
- :param authn_auth: Authenticating Authority
- :return: An AuthnContext instance
- """
- cntx_class = factory(saml.AuthnContextClassRef, text=authn_class)
- if authn_auth:
- return factory(saml.AuthnContext,
- authn_context_class_ref=cntx_class,
- authenticating_authority=factory(
- saml.AuthenticatingAuthority, text=authn_auth))
- else:
- return factory(saml.AuthnContext,
- authn_context_class_ref=cntx_class)
-
- def _authn_statement(self, authn_class=None, authn_auth=None,
- authn_decl=None, authn_decl_ref=None, authn_instant="",
- subject_locality=""):
- """
- Construct the AuthnStatement
- :param authn_class: Authentication Context Class reference
- :param authn_auth: Authenticating Authority
- :param authn_decl: Authentication Context Declaration
- :param authn_decl_ref: Authentication Context Declaration reference
- :param authn_instant: When the Authentication was performed.
- Assumed to be seconds since the Epoch.
- :param subject_locality: Specifies the DNS domain name and IP address
- for the system from which the assertion subject was apparently
- authenticated.
- :return: An AuthnContext instance
- """
- if authn_instant:
- _instant = instant(time_stamp=authn_instant)
- else:
- _instant = instant()
-
- if authn_class:
- res = factory(
- saml.AuthnStatement,
- authn_instant=_instant,
- session_index=sid(),
- authn_context=self._authn_context_class_ref(
- authn_class, authn_auth))
- elif authn_decl:
- res = factory(
- saml.AuthnStatement,
- authn_instant=_instant,
- session_index=sid(),
- authn_context=self._authn_context_decl(authn_decl, authn_auth))
- elif authn_decl_ref:
- res = factory(
- saml.AuthnStatement,
- authn_instant=_instant,
- session_index=sid(),
- authn_context=self._authn_context_decl_ref(authn_decl_ref,
- authn_auth))
- else:
- res = factory(
- saml.AuthnStatement,
- authn_instant=_instant,
- session_index=sid())
- if subject_locality:
- res.subject_locality = saml.SubjectLocality(text=subject_locality)
+def _authn_context_decl(decl, authn_auth=None):
+ """
+ Construct the authn context with a authn context declaration
+ :param decl: The authn context declaration
+ :param authn_auth: Authenticating Authority
+ :return: An AuthnContext instance
+ """
+ return factory(saml.AuthnContext,
+ authn_context_decl=decl,
+ authenticating_authority=factory(
+ saml.AuthenticatingAuthority, text=authn_auth))
- return res
+
+def _authn_context_decl_ref(decl_ref, authn_auth=None):
+ """
+ Construct the authn context with a authn context declaration reference
+ :param decl_ref: The authn context declaration reference
+ :param authn_auth: Authenticating Authority
+ :return: An AuthnContext instance
+ """
+ return factory(saml.AuthnContext,
+ authn_context_decl_ref=decl_ref,
+ authenticating_authority=factory(
+ saml.AuthenticatingAuthority, text=authn_auth))
+
+
+def authn_statement(authn_class=None, authn_auth=None,
+ authn_decl=None, authn_decl_ref=None, authn_instant="",
+ subject_locality=""):
+ """
+ Construct the AuthnStatement
+ :param authn_class: Authentication Context Class reference
+ :param authn_auth: Authenticating Authority
+ :param authn_decl: Authentication Context Declaration
+ :param authn_decl_ref: Authentication Context Declaration reference
+ :param authn_instant: When the Authentication was performed.
+ Assumed to be seconds since the Epoch.
+ :param subject_locality: Specifies the DNS domain name and IP address
+ for the system from which the assertion subject was apparently
+ authenticated.
+ :return: An AuthnContext instance
+ """
+ if authn_instant:
+ _instant = instant(time_stamp=authn_instant)
+ else:
+ _instant = instant()
+
+ if authn_class:
+ res = factory(
+ saml.AuthnStatement,
+ authn_instant=_instant,
+ session_index=sid(),
+ authn_context=_authn_context_class_ref(
+ authn_class, authn_auth))
+ elif authn_decl:
+ res = factory(
+ saml.AuthnStatement,
+ authn_instant=_instant,
+ session_index=sid(),
+ authn_context=_authn_context_decl(authn_decl, authn_auth))
+ elif authn_decl_ref:
+ res = factory(
+ saml.AuthnStatement,
+ authn_instant=_instant,
+ session_index=sid(),
+ authn_context=_authn_context_decl_ref(authn_decl_ref,
+ authn_auth))
+ else:
+ res = factory(
+ saml.AuthnStatement,
+ authn_instant=_instant,
+ session_index=sid())
+
+ if subject_locality:
+ res.subject_locality = saml.SubjectLocality(text=subject_locality)
+
+ return res
+
+
+class Assertion(dict):
+ """ Handles assertions about subjects """
+
+ def __init__(self, dic=None):
+ dict.__init__(self, dic)
+ self.acs = []
def construct(self, sp_entity_id, in_response_to, consumer_url,
name_id, attrconvs, policy, issuer, authn_class=None,
@@ -695,10 +697,10 @@ class Assertion(dict):
conds = policy.conditions(sp_entity_id)
if authn_auth or authn_class or authn_decl or authn_decl_ref:
- _authn_statement = self._authn_statement(authn_class, authn_auth,
- authn_decl, authn_decl_ref,
- authn_instant,
- subject_locality)
+ _authn_statement = authn_statement(authn_class, authn_auth,
+ authn_decl, authn_decl_ref,
+ authn_instant,
+ subject_locality)
else:
_authn_statement = None
diff --git a/src/saml2/entity.py b/src/saml2/entity.py
index 96248730..db1a34f7 100644
--- a/src/saml2/entity.py
+++ b/src/saml2/entity.py
@@ -852,14 +852,6 @@ class Entity(HTTPBase):
xmlstr = self.unravel(xmlstr, binding, response_cls.msgtype)
origxml = xmlstr
- if outstanding_certs is not None:
- _response = samlp.any_response_from_string(xmlstr)
- if len(_response.encrypted_assertion) > 0:
- _, cert_file = make_temp(
- "%s" % outstanding_certs[
- _response.in_response_to]["key"], decode=False)
- cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
- xmlstr = cbxs.decrypt(xmlstr, cert_file)
if not xmlstr: # Not a valid reponse
return None
@@ -878,18 +870,14 @@ class Entity(HTTPBase):
logger.debug("XMLSTR: %s" % xmlstr)
- if hasattr(response.response, 'encrypted_assertion'):
- for encrypted_assertion in response.response.encrypted_assertion:
- if encrypted_assertion.extension_elements is not None:
- assertion_list = extension_elements_to_elements(
- encrypted_assertion.extension_elements, [saml])
- for assertion in assertion_list:
- _assertion = saml.assertion_from_string(
- str(assertion))
- response.response.assertion.append(_assertion)
-
if response:
- response = response.verify()
+ if outstanding_certs is not None:
+ _, key_file = make_temp(
+ "%s" % outstanding_certs[
+ response.in_response_to]["key"], decode=False)
+ else:
+ key_file = ""
+ response = response.verify(key_file)
if not response:
return None
diff --git a/src/saml2/response.py b/src/saml2/response.py
index 8212584d..c197380b 100644
--- a/src/saml2/response.py
+++ b/src/saml2/response.py
@@ -42,8 +42,8 @@ import xmldsig as ds
import xmlenc as xenc
from saml2 import samlp
+from saml2 import class_name
from saml2 import saml
-from saml2 import extension_element_to_element
from saml2 import extension_elements_to_elements
from saml2 import SAMLError
from saml2 import time_util
@@ -387,7 +387,7 @@ class StatusResponse(object):
if self.asynchop:
if self.response.destination and \
- self.response.destination not in self.return_addrs:
+ self.response.destination not in self.return_addrs:
logger.error("%s not in %s" % (self.response.destination,
self.return_addrs))
return None
@@ -399,7 +399,7 @@ class StatusResponse(object):
def loads(self, xmldata, decode=True, origxml=None):
return self._loads(xmldata, decode, origxml)
- def verify(self):
+ def verify(self, key_file=""):
try:
return self._verify()
except AssertionError:
@@ -473,6 +473,7 @@ class AuthnResponse(StatusResponse):
self.came_from = ""
self.ava = None
self.assertion = None
+ self.assertions = []
self.session_not_on_or_after = 0
self.allow_unsolicited = allow_unsolicited
self.require_signature = want_assertions_signed
@@ -739,8 +740,13 @@ class AuthnResponse(StatusResponse):
return self.name_id
def _assertion(self, assertion):
- self.assertion = assertion
+ """
+ Check the assertion
+ :param assertion:
+ :return: True/False depending on if the assertion is sane or not
+ """
+ self.assertion = assertion
logger.debug("assertion context: %s" % (self.context,))
logger.debug("assertion keys: %s" % (assertion.keyswv()))
logger.debug("outstanding_queries: %s" % (self.outstanding_queries,))
@@ -773,54 +779,61 @@ class AuthnResponse(StatusResponse):
logger.exception("get subject")
raise
- def _encrypted_assertion(self, xmlstr):
- if xmlstr.encrypted_data:
- assertion_str = self.sec.decrypt(xmlstr.encrypted_data.to_string())
- if not assertion_str:
- raise DecryptionFailed()
- assertion = saml.assertion_from_string(assertion_str)
- else:
- decrypt_xml = self.sec.decrypt(xmlstr)
-
- logger.debug("Decryption successfull")
-
- self.response = samlp.response_from_string(decrypt_xml)
- logger.debug("Parsed decrypted assertion successfull")
-
- enc = self.response.encrypted_assertion[0].extension_elements[0]
- assertion = extension_element_to_element(
- enc, saml.ELEMENT_FROM_STRING, namespace=saml.NAMESPACE)
-
- logger.debug("Decrypted Assertion: %s" % assertion)
- return self._assertion(assertion)
+ def decrypt_assertions(self, encrypted_assertions, key_file=""):
+ res = []
+ for encrypted_assertion in encrypted_assertions:
+ if encrypted_assertion.extension_elements:
+ assertions = extension_elements_to_elements(
+ encrypted_assertion.extension_elements, [saml, samlp])
+ for assertion in assertions:
+ if assertion.signature:
+ if not self.sec.verify_signature(
+ "%s" % assertion, key_file,
+ node_name=class_name(assertion)):
+ logger.error(
+ "Failed to verify signature on '%s'" % assertion)
+ raise SignatureError()
+ res.append(assertion)
+ return res
- def parse_assertion(self):
+ def parse_assertion(self, key_file=""):
if self.context == "AuthnQuery":
# can contain one or more assertions
pass
else: # This is a saml2int limitation
try:
assert len(self.response.assertion) == 1 or \
- len(self.response.encrypted_assertion) == 1
+ len(self.response.encrypted_assertion) == 1
except AssertionError:
raise Exception("No assertion part")
+ if self.response.encrypted_assertion:
+ logger.debug("***Encrypted assertion/-s***")
+ decr_text = self.sec.decrypt(self.xmlstr)
+ resp = samlp.response_from_string(decr_text)
+ res = self.decrypt_assertions(resp.encrypted_assertion, key_file)
+ if self.response.assertion:
+ self.response.assertion.extend(res)
+ else:
+ self.response.assertion = res
+ self.response.encrypted_assertion = []
+
if self.response.assertion:
- logger.debug("***Unencrypted response***")
+ logger.debug("***Unencrypted assertion***")
for assertion in self.response.assertion:
if not self._assertion(assertion):
return False
- return True
- else:
- logger.debug("***Encrypted response***")
- for assertion in self.response.encrypted_assertion:
- if not self._encrypted_assertion(assertion):
- return False
- return True
+ else:
+ self.assertions.append(assertion)
+ self.assertion = self.assertions[0]
- def verify(self):
+ return True
+
+ def verify(self, key_file):
""" Verify that the assertion is syntactically correct and
- the signature is correct if present."""
+ the signature is correct if present.
+ :param key_file: If not the default key file should be used this is it.
+ """
try:
self._verify()
@@ -830,7 +843,7 @@ class AuthnResponse(StatusResponse):
if not isinstance(self.response, samlp.Response):
return self
- if self.parse_assertion():
+ if self.parse_assertion(key_file):
return self
else:
logger.error("Could not parse the assertion")
@@ -1056,7 +1069,7 @@ class AssertionIDResponse(object):
return self._postamble()
- def verify(self):
+ def verify(self, key_file=""):
try:
valid_instance(self.response)
except NotValid, exc:
diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
index 57fa5914..9db02bb4 100644
--- a/src/saml2/sigver.py
+++ b/src/saml2/sigver.py
@@ -35,12 +35,13 @@ from Crypto.PublicKey import RSA
from saml2.cert import OpenSSLWrapper
from saml2.extension import pefim
from saml2.saml import EncryptedAssertion
-from saml2.samlp import Response, response_from_string
+from saml2.samlp import Response
import xmldsig as ds
-import xmlenc as enc
-from saml2 import samlp, SAMLError, extension_elements_to_elements
+from saml2 import samlp
+from saml2 import SAMLError
+from saml2 import extension_elements_to_elements
from saml2 import class_name
from saml2 import saml
from saml2 import ExtensionElement
@@ -74,7 +75,8 @@ RSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
RSA_1_5 = "http://www.w3.org/2001/04/xmlenc#rsa-1_5"
TRIPLE_DES_CBC = "http://www.w3.org/2001/04/xmlenc#tripledes-cbc"
XMLTAG = "<?xml version='1.0'?>"
-PREFIX = "<?xml version='1.0' encoding='UTF-8'?>"
+PREFIX1 = "<?xml version='1.0' encoding='UTF-8'?>"
+PREFIX2 = '<?xml version="1.0" encoding="UTF-8"?>'
class SigverError(SAMLError):
@@ -125,8 +127,12 @@ def rm_xmltag(statement):
statement = statement[len(XMLTAG):]
if statement[0] == '\n':
statement = statement[1:]
- elif statement.startswith(PREFIX):
- statement = statement[len(PREFIX):]
+ elif statement.startswith(PREFIX1):
+ statement = statement[len(PREFIX1):]
+ if statement[0] == '\n':
+ statement = statement[1:]
+ elif statement.startswith(PREFIX2):
+ statement = statement[len(PREFIX2):]
if statement[0] == '\n':
statement = statement[1:]
@@ -693,7 +699,7 @@ class CryptoBackend():
def encrypt(self, text, recv_key, template, key_type):
raise NotImplementedError()
- def encrypt_assertion(self, statement, recv_key, key_type):
+ def encrypt_assertion(self, statement, recv_key, key_type, xpath=""):
raise NotImplementedError()
def decrypt(self, enctext, key_file):
@@ -733,12 +739,25 @@ class CryptoBackendXmlSec1(CryptoBackend):
except IndexError:
return ""
- def encrypt(self, text, recv_key, template, key_type):
+ def encrypt(self, text, recv_key, template, session_key_type, xpath=""):
+ """
+
+ :param text: The text to be compiled
+ :param recv_key: Filename of a file where the key resides
+ :param template: Filename of a file with the pre-encryption part
+ :param session_key_type: Type and size of a new session key
+ "des-192" generates a new 192 bits DES key for DES3 encryption
+ :param xpath: What should be encrypted
+ :return:
+ """
logger.debug("Encryption input len: %d" % len(text))
_, fil = make_temp("%s" % text, decode=False)
com_list = [self.xmlsec, "--encrypt", "--pubkey-cert-pem", recv_key,
- "--session-key", key_type, "--xml-data", fil]
+ "--session-key", session_key_type, "--xml-data", fil]
+
+ if xpath:
+ com_list.extend(['--node-xpath', xpath])
(_stdout, _stderr, output) = self._run_xmlsec(com_list, [template],
exception=DecryptError,
@@ -767,7 +786,8 @@ class CryptoBackendXmlSec1(CryptoBackend):
"--session-key", key_type, "--xml-data", fil,
"--node-xpath", ASSERT_XPATH]
- (_stdout, _stderr, output) = self._run_xmlsec(com_list, [tmpl], exception=EncryptError, validate_output=False)
+ (_stdout, _stderr, output) = self._run_xmlsec(
+ com_list, [tmpl], exception=EncryptError, validate_output=False)
os.unlink(fil)
if not output:
@@ -1206,7 +1226,7 @@ class SecurityContext(object):
:param text: Text to encrypt
:param recv_key: A file containing the receivers public key
- :param template: A file containing the XML document template
+ :param template: A file containing the XMLSEC template
:param key_type: The type of session key to use
:result: An encrypted XML text
"""
@@ -1262,8 +1282,7 @@ class SecurityContext(object):
return self.crypto.validate_signature(signedtext, cert_file=cert_file,
cert_type=cert_type,
node_name=node_name,
- node_id=node_id, id_attr=id_attr,
- )
+ node_id=node_id, id_attr=id_attr)
def _check_signature(self, decoded_xml, item, node_name=NODE_NAME,
origdoc=None, id_attr="", must=False,
@@ -1735,7 +1754,10 @@ def pre_encrypt_assertion(response):
assertion = response.assertion
response.assertion = None
response.encrypted_assertion = EncryptedAssertion()
- response.encrypted_assertion.add_extension_element(assertion)
+ if isinstance(assertion, list):
+ response.encrypted_assertion.add_extension_elements(assertion)
+ else:
+ response.encrypted_assertion.add_extension_element(assertion)
# txt = "%s" % response
# _ass = "%s" % assertion
# _ass = rm_xmltag(_ass)
diff --git a/tests/test_12_s_utils.py b/tests/test_12_s_utils.py
index 9d41d691..f4cfdda8 100644
--- a/tests/test_12_s_utils.py
+++ b/tests/test_12_s_utils.py
@@ -151,7 +151,7 @@ def test_audience():
assert aud_restr.audience.text == "urn:foo:bar"
def test_conditions():
- conditions = utils.factory( saml.Conditions,
+ conditions = utils.factory(saml.Conditions,
not_before="2009-10-30T07:58:10.852Z",
not_on_or_after="2009-10-30T08:03:10.852Z",
audience_restriction=[utils.factory(saml.AudienceRestriction,
diff --git a/tests/test_40_sigver.py b/tests/test_40_sigver.py
index 89a85408..3ce200e2 100644
--- a/tests/test_40_sigver.py
+++ b/tests/test_40_sigver.py
@@ -1,11 +1,12 @@
#!/usr/bin/env python
import base64
+from saml2.sigver import pre_encryption_part, make_temp
from saml2.mdstore import MetadataStore
-from saml2.saml import assertion_from_string
+from saml2.saml import assertion_from_string, EncryptedAssertion
from saml2.samlp import response_from_string
-from saml2 import sigver
+from saml2 import sigver, extension_elements_to_elements
from saml2 import class_name
from saml2 import time_util
from saml2 import saml, samlp
@@ -25,9 +26,10 @@ PUB_KEY = full_path("test.pem")
PRIV_KEY = full_path("test.key")
-def _eq(l1,l2):
+def _eq(l1, l2):
return set(l1) == set(l2)
+
CERT1 = """MIICsDCCAhmgAwIBAgIJAJrzqSSwmDY9MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMDkxMDA2MTk0OTQxWhcNMDkxMTA1MTk0OTQxWjBF
@@ -60,7 +62,7 @@ wKe3ACFXBvqGQN0IbcH49hu0FKhYFM/GPDJcIHFBsiyMBXChpye9vBaTNEBCtU3K
jjyG0hRT2mAQ9h+bkPmOvlEo/aH0xR68Z9hw4PF13w=="""
from pyasn1.codec.der import decoder
-
+
def test_cert_from_instance_1():
xml_response = open(SIGNED).read()
@@ -104,7 +106,6 @@ class FakeConfig():
class TestSecurity():
-
def setup_class(self):
# This would be one way to initialize the security context :
#
@@ -126,8 +127,8 @@ class TestSecurity():
issue_instant="2009-10-30T13:20:28Z",
signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
attribute_statement=do_attribute_statement({
- ("", "", "surName"): ("Foo", ""),
- ("", "", "givenName"): ("Bar", ""),
+ ("", "", "surName"): ("Foo", ""),
+ ("", "", "givenName"): ("Bar", ""),
})
)
@@ -144,7 +145,7 @@ class TestSecurity():
def test_non_verify_2(self):
xml_response = open(FALSE_SIGNED).read()
- raises(sigver.SignatureError,self.sec.correctly_signed_response,
+ raises(sigver.SignatureError, self.sec.correctly_signed_response,
xml_response)
def test_sign_assertion(self):
@@ -154,8 +155,8 @@ class TestSecurity():
#print sign_ass
sass = saml.assertion_from_string(sign_ass)
#print sass
- assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
- 'version', 'signature', 'id'])
+ assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
+ 'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11111"
assert time_util.str_to_time(sass.issue_instant)
@@ -227,19 +228,21 @@ class TestSecurity():
def test_sign_response(self):
response = factory(samlp.Response,
- assertion=self._assertion,
- id="22222",
- signature=sigver.pre_signature_part("22222", self.sec.my_cert))
+ assertion=self._assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
- (class_name(response), response.id)]
+ (class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
-
+
assert s_response is not None
print s_response
response = response_from_string(s_response)
sass = response.assertion[0]
-
+
print sass
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
'version', 'signature', 'id'])
@@ -252,23 +255,27 @@ class TestSecurity():
assert item.id == "22222"
def test_sign_response_2(self):
- assertion2 = factory( saml.Assertion,
- version= "2.0",
- id= "11122",
- issue_instant= "2009-10-30T13:20:28Z",
- signature= sigver.pre_signature_part("11122", self.sec.my_cert),
- attribute_statement=do_attribute_statement({
- ("","","surName"): ("Fox",""),
- ("","","givenName") :("Bear",""),
- })
- )
+ assertion2 = factory(saml.Assertion,
+ version="2.0",
+ id="11122",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11122",
+ self.sec
+ .my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Fox", ""),
+ ("", "", "givenName"): ("Bear", ""),
+ })
+ )
response = factory(samlp.Response,
- assertion=assertion2,
- id="22233",
- signature=sigver.pre_signature_part("22233", self.sec.my_cert))
+ assertion=assertion2,
+ id="22233",
+ signature=sigver.pre_signature_part("22233",
+ self.sec
+ .my_cert))
to_sign = [(class_name(assertion2), assertion2.id),
- (class_name(response), response.id)]
+ (class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
@@ -277,7 +284,7 @@ class TestSecurity():
sass = response2.assertion[0]
assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
- 'version', 'signature', 'id'])
+ 'version', 'signature', 'id'])
assert sass.version == "2.0"
assert sass.id == "11122"
@@ -285,105 +292,115 @@ class TestSecurity():
s_response)
assert isinstance(item, samlp.Response)
-
- def test_sign_verify(self):
+
+ def test_sign_verify(self):
response = factory(samlp.Response,
- assertion=self._assertion,
- id="22233",
- signature=sigver.pre_signature_part("22233", self.sec.my_cert))
+ assertion=self._assertion,
+ id="22233",
+ signature=sigver.pre_signature_part("22233",
+ self.sec
+ .my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
- (class_name(response), response.id)]
+ (class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
print s_response
- res = self.sec.verify_signature("%s" % s_response,
- node_name=class_name(samlp.Response()))
+ res = self.sec.verify_signature("%s" % s_response,
+ node_name=class_name(samlp.Response()))
- print res
+ print res
assert res
def test_sign_verify_with_cert_from_instance(self):
response = factory(samlp.Response,
- assertion=self._assertion,
- id="22222",
- signature=sigver.pre_signature_part("22222", self.sec.my_cert))
+ assertion=self._assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
to_sign = [(class_name(self._assertion), self._assertion.id),
- (class_name(response), response.id)]
+ (class_name(response), response.id)]
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
response2 = response_from_string(s_response)
ci = "".join(sigver.cert_from_instance(response2)[0].split())
-
assert ci == self.sec.my_cert
-
- res = self.sec.verify_signature("%s" % s_response,
- node_name=class_name(samlp.Response()))
+
+ res = self.sec.verify_signature("%s" % s_response,
+ node_name=class_name(samlp.Response()))
assert res
res = self.sec._check_signature(s_response, response2,
class_name(response2), s_response)
assert res == response2
-
+
def test_sign_verify_assertion_with_cert_from_instance(self):
- assertion = factory( saml.Assertion,
- version= "2.0",
- id= "11100",
- issue_instant= "2009-10-30T13:20:28Z",
- signature= sigver.pre_signature_part("11100", self.sec.my_cert),
- attribute_statement=do_attribute_statement({
- ("","","surName"): ("Fox",""),
- ("","","givenName") :("Bear",""),
- })
- )
+ assertion = factory(saml.Assertion,
+ version="2.0",
+ id="11100",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11100",
+ self.sec
+ .my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Fox", ""),
+ ("", "", "givenName"): ("Bear", ""),
+ })
+ )
to_sign = [(class_name(assertion), assertion.id)]
- s_assertion = sigver.signed_instance_factory(assertion, self.sec, to_sign)
+ s_assertion = sigver.signed_instance_factory(assertion, self.sec,
+ to_sign)
print s_assertion
ass = assertion_from_string(s_assertion)
ci = "".join(sigver.cert_from_instance(ass)[0].split())
assert ci == self.sec.my_cert
-
+
res = self.sec.verify_signature("%s" % s_assertion,
node_name=class_name(ass))
- assert res
-
+ assert res
+
res = self.sec._check_signature(s_assertion, ass, class_name(ass))
-
+
assert res
def test_exception_sign_verify_with_cert_from_instance(self):
- assertion = factory( saml.Assertion,
- version= "2.0",
- id= "11100",
- issue_instant= "2009-10-30T13:20:28Z",
- #signature= sigver.pre_signature_part("11100", self.sec.my_cert),
- attribute_statement=do_attribute_statement({
- ("","","surName"): ("Foo",""),
- ("","","givenName") :("Bar",""),
- })
- )
+ assertion = factory(saml.Assertion,
+ version="2.0",
+ id="11100",
+ issue_instant="2009-10-30T13:20:28Z",
+ #signature= sigver.pre_signature_part("11100",
+ # self.sec.my_cert),
+ attribute_statement=do_attribute_statement({
+ ("", "", "surName"): ("Foo", ""),
+ ("", "", "givenName"): ("Bar", ""),
+ })
+ )
response = factory(samlp.Response,
- assertion=assertion,
- id="22222",
- signature=sigver.pre_signature_part("22222", self.sec.my_cert))
+ assertion=assertion,
+ id="22222",
+ signature=sigver.pre_signature_part("22222",
+ self.sec
+ .my_cert))
to_sign = [(class_name(response), response.id)]
-
+
s_response = sigver.signed_instance_factory(response, self.sec, to_sign)
response2 = response_from_string(s_response)
# Change something that should make everything fail
response2.id = "23456"
raises(sigver.SignatureError, self.sec._check_signature,
- s_response, response2, class_name(response2))
+ s_response, response2, class_name(response2))
+
class TestSecurityMetadata():
@@ -397,36 +414,71 @@ class TestSecurityMetadata():
conf.only_use_keys_in_metadata = False
self.sec = sigver.security_context(conf)
- self._assertion = factory( saml.Assertion,
- version="2.0",
- id="11111",
- issue_instant="2009-10-30T13:20:28Z",
- signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
- attribute_statement=do_attribute_statement({
- ("","","surName"): ("Foo",""),
- ("","","givenName") :("Bar",""),
- })
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", self.sec.my_cert, 1),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Foo", ""),
+ ("", "", "givenName"): ("Bar", ""), })
)
- def test_sign_assertion(self):
- ass = self._assertion
- print ass
- sign_ass = self.sec.sign_assertion("%s" % ass, node_id=ass.id)
- #print sign_ass
- sass = saml.assertion_from_string(sign_ass)
- #print sass
- assert _eq(sass.keyswv(), ['attribute_statement', 'issue_instant',
- 'version', 'signature', 'id'])
- assert sass.version == "2.0"
- assert sass.id == "11111"
- assert time_util.str_to_time(sass.issue_instant)
- print "Crypto version : %s" % (self.sec.crypto.version())
+def test_xbox():
+ conf = config.SPConfig()
+ conf.load_file("server_conf")
+ md = MetadataStore([saml, samlp], None, conf)
+ md.load("local", full_path("idp_example.xml"))
- item = self.sec.check_signature(sass, class_name(sass), sign_ass)
+ conf.metadata = md
+ conf.only_use_keys_in_metadata = False
+ sec = sigver.security_context(conf)
+
+ assertion = factory(
+ saml.Assertion, version="2.0", id="11111",
+ issue_instant="2009-10-30T13:20:28Z",
+ signature=sigver.pre_signature_part("11111", sec.my_cert, 1),
+ attribute_statement=do_attribute_statement(
+ {("", "", "surName"): ("Foo", ""),
+ ("", "", "givenName"): ("Bar", ""), })
+ )
+
+ sigass = sec.sign_statement(assertion, class_name(assertion),
+ key_file="pki/mykey.pem", node_id=assertion.id)
+
+ _ass0 = saml.assertion_from_string(sigass)
+
+ encrypted_assertion = EncryptedAssertion()
+ encrypted_assertion.add_extension_element(_ass0)
+
+ _, pre = make_temp("%s" % pre_encryption_part(), decode=False)
+ enctext = sec.crypto.encrypt(
+ "%s" % encrypted_assertion, conf.cert_file, pre, "des-192",
+ '/*[local-name()="EncryptedAssertion"]/*[local-name()="Assertion"]')
+
+
+ decr_text = sec.decrypt(enctext)
+ _seass = saml.encrypted_assertion_from_string(decr_text)
+ assertions = []
+ assers = extension_elements_to_elements(_seass.extension_elements,
+ [saml, samlp])
+
+ sign_cert_file = "pki/mycert.pem"
+
+ for ass in assers:
+ _ass = "%s" % ass
+ #_ass = _ass.replace('xsi:nil="true" ', '')
+ #assert sigass == _ass
+ _txt = sec.verify_signature(_ass, sign_cert_file,
+ node_name=class_name(assertion))
+ if _txt:
+ assertions.append(ass)
+
+ print assertions
- assert isinstance(item, saml.Assertion)
if __name__ == "__main__":
- t = TestSecurity()
- t.setup_class()
+ #t = TestSecurity()
+ #t.setup_class()
+ #t.test_sign_then_encrypt_assertion()
+ test_xbox() \ No newline at end of file
diff --git a/tests/test_41_response.py b/tests/test_41_response.py
index ef8d4d32..9ac3bae1 100644
--- a/tests/test_41_response.py
+++ b/tests/test_41_response.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-from saml2 import saml
from saml2 import config
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
@@ -9,7 +8,8 @@ from saml2.server import Server
from saml2.response import response_factory
from saml2.response import StatusResponse
from saml2.response import AuthnResponse
-from saml2.sigver import security_context, MissingKey
+from saml2.sigver import security_context
+from saml2.sigver import MissingKey
from pytest import raises
@@ -26,7 +26,6 @@ IDENTITY = {"eduPersonAffiliation": ["staff", "member"],
"mail": ["foo@gmail.com"],
"title": ["shortstop"]}
-
AUTHN = {
"class_ref": INTERNETPROTOCOLPASSWORD,
"authn_auth": "http://www.example.com/login"
@@ -39,29 +38,28 @@ class TestResponse:
name_id = server.ident.transient_nameid(
"urn:mace:example.com:saml:roland:sp", "id12")
- self._resp_ = server.create_authn_response(IDENTITY,
- "id12", # in_response_to
- "http://lingon.catalogix.se:8087/",
-
- # consumer_url
- "urn:mace:example"
- ".com:saml:roland:sp",
- # sp_entity_id
- name_id=name_id)
+ self._resp_ = server.create_authn_response(
+ IDENTITY,
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/",
+ # consumer_url
+ "urn:mace:example.com:saml:roland:sp",
+ # sp_entity_id
+ name_id=name_id)
self._sign_resp_ = server.create_authn_response(
IDENTITY,
"id12", # in_response_to
- "http://lingon.catalogix.se:8087/", # consumer_url
- "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_assertion=True)
self._resp_authn = server.create_authn_response(
IDENTITY,
- "id12", # in_response_to
- "http://lingon.catalogix.se:8087/", # consumer_url
- "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=AUTHN)
@@ -72,7 +70,8 @@ class TestResponse:
def test_1(self):
xml_response = ("%s" % (self._resp_,))
resp = response_factory(xml_response, self.conf,
- return_addrs=["http://lingon.catalogix.se:8087/"],
+ return_addrs=[
+ "http://lingon.catalogix.se:8087/"],
outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False)
@@ -83,7 +82,8 @@ class TestResponse:
def test_2(self):
xml_response = self._sign_resp_
resp = response_factory(xml_response, self.conf,
- return_addrs=["http://lingon.catalogix.se:8087/"],
+ return_addrs=[
+ "http://lingon.catalogix.se:8087/"],
outstanding_queries={
"id12": "http://localhost:8088/sso"},
timeslack=10000, decode=False)
@@ -92,14 +92,15 @@ class TestResponse:
assert isinstance(resp, AuthnResponse)
- def test_only_use_keys_in_metadata(self):
- conf = config.SPConfig()
- conf.load_file("sp_2_conf")
+def test_only_use_keys_in_metadata(self):
+ conf = config.SPConfig()
+ conf.load_file("sp_2_conf")
+
+ sc = security_context(conf)
+ # should fail
+ raises(MissingKey,
+ 'sc.correctly_signed_response("%s" % self._sign_resp_)')
- sc = security_context(conf)
- # should fail
- raises(MissingKey,
- 'sc.correctly_signed_response("%s" % self._sign_resp_)')
if __name__ == "__main__":
t = TestResponse()
diff --git a/tests/test_50_server.py b/tests/test_50_server.py
index 05544676..f8678c2a 100644
--- a/tests/test_50_server.py
+++ b/tests/test_50_server.py
@@ -2,18 +2,25 @@
# -*- coding: utf-8 -*-
import base64
from urlparse import parse_qs
+from saml2.sigver import pre_encryption_part
from saml2.assertion import Policy
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.saml import NameID, NAMEID_FORMAT_TRANSIENT
from saml2.samlp import response_from_string
from saml2.server import Server
-from saml2 import samlp, saml, client, config
+from saml2 import samlp
+from saml2 import saml
+from saml2 import client
+from saml2 import config
+from saml2 import class_name
+from saml2 import extension_elements_to_elements
from saml2 import s_utils
from saml2 import sigver
from saml2 import time_util
from saml2.s_utils import OtherError
-from saml2.s_utils import do_attribute_statement, factory
+from saml2.s_utils import do_attribute_statement
+from saml2.s_utils import factory
from saml2.soap import make_soap_enveloped_saml_thingy
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_HTTP_REDIRECT
@@ -182,7 +189,8 @@ class TestServer1():
name_id_policy = resp_args["name_id_policy"]
assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
- assert resp_args["sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
+ assert resp_args[
+ "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def test_sso_response_with_identity(self):
name_id = self.server.ident.transient_nameid(
@@ -195,8 +203,8 @@ class TestServer1():
"mail": "derek.jeter@nyy.mlb.com",
"title": "The man"
},
- "id12", # in_response_to
- "http://localhost:8087/", # destination
+ "id12", # in_response_to
+ "http://localhost:8087/", # destination
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
authn=AUTHN
@@ -227,7 +235,8 @@ class TestServer1():
break
assert len(attr.attribute_value) == 1
assert attr.name == "urn:oid:1.3.6.1.4.1.5923.1.1.1.7"
- assert attr.name_format == "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"
+ assert attr.name_format == "urn:oasis:names:tc:SAML:2" \
+ ".0:attrname-format:uri"
value = attr.attribute_value[0]
assert value.text.strip() == "Short stop"
assert value.get_type() == "xs:string"
@@ -242,13 +251,13 @@ class TestServer1():
def test_sso_response_without_identity(self):
resp = self.server.create_authn_response(
{},
- "id12", # in_response_to
- "http://localhost:8087/", # consumer_url
- "urn:mace:example.com:saml:roland:sp", # sp_entity_id
- userid="USER1",
- authn=AUTHN,
- release_policy=Policy(),
- best_effort=True
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=AUTHN,
+ release_policy=Policy(),
+ best_effort=True
)
print resp.keyswv()
@@ -268,12 +277,12 @@ class TestServer1():
resp = self.server.create_authn_response(
{},
- "id12", # in_response_to
- "http://localhost:8087/", # consumer_url
- "urn:mace:example.com:saml:roland:sp", # sp_entity_id
- userid="USER1",
- authn=_authn,
- best_effort=True
+ "id12", # in_response_to
+ "http://localhost:8087/", # consumer_url
+ "urn:mace:example.com:saml:roland:sp", # sp_entity_id
+ userid="USER1",
+ authn=_authn,
+ best_effort=True
)
print resp.keyswv()
@@ -297,11 +306,11 @@ class TestServer1():
print resp.status
assert resp.status.status_code.value == samlp.STATUS_RESPONDER
assert resp.status.status_code.status_code.value == \
- samlp.STATUS_REQUEST_UNSUPPORTED
+ samlp.STATUS_REQUEST_UNSUPPORTED
assert resp.status.status_message.text == \
- "eduPersonAffiliation missing"
+ "eduPersonAffiliation missing"
assert resp.issuer.text == "urn:mace:example.com:saml:roland:idp"
- assert not resp.assertion
+ assert not resp.assertion
def test_authn_response_0(self):
self.server = Server("idp_conf")
@@ -346,8 +355,8 @@ class TestServer1():
signed_resp = self.server.create_authn_response(
ava,
- "id12", # in_response_to
- "http://lingon.catalogix.se:8087/", # consumer_url
+ "id12", # in_response_to
+ "http://lingon.catalogix.se:8087/", # consumer_url
"urn:mace:example.com:saml:roland:sp", # sp_entity_id
name_id=name_id,
sign_assertion=True
@@ -480,7 +489,6 @@ def _logout_request(conf_file):
class TestServerLogout():
-
def test_1(self):
server = Server("idp_slo_redirect_conf")
req_id, request = _logout_request("sp_slo_redirect_conf")
@@ -502,4 +510,3 @@ class TestServerLogout():
if __name__ == "__main__":
ts = TestServer1()
ts.setup_class()
- ts.test_sso_response_specific_instant()
diff --git a/tests/test_51_client.py b/tests/test_51_client.py
index a808fca6..3eedcf9d 100644
--- a/tests/test_51_client.py
+++ b/tests/test_51_client.py
@@ -4,21 +4,32 @@
import base64
import urllib
import urlparse
-from saml2.authn_context import INTERNETPROTOCOLPASSWORD
-from saml2.response import LogoutResponse
+from saml2 import BINDING_HTTP_POST
+from saml2 import BINDING_HTTP_REDIRECT
+from saml2 import config
+from saml2 import class_name
+from saml2 import extension_elements_to_elements
+from saml2 import saml
+from saml2 import samlp
+from saml2 import sigver
+from saml2 import s_utils
+from saml2.assertion import Assertion
+from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
-from saml2 import samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
-from saml2 import saml, config, class_name
from saml2.config import SPConfig
+from saml2.response import LogoutResponse
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
from saml2.server import Server
+from saml2.sigver import pre_encryption_part
+from saml2.s_utils import do_attribute_statement
+from saml2.s_utils import factory
from saml2.time_util import in_a_while
-from py.test import raises
-from fakeIDP import FakeIDP, unpack_form
+from fakeIDP import FakeIDP
+from fakeIDP import unpack_form
AUTHN = {
@@ -341,8 +352,123 @@ class TestClient:
print my_name
assert my_name == "urn:mace:example.com:saml:roland:sp"
-# Below can only be done with dummy Server
+ def test_sign_then_encrypt_assertion(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ assertion = s_utils.assertion_factory(
+ subject=factory(saml.Subject, text="_aaa",
+ name_id=factory(
+ saml.NameID,
+ format=saml.NAMEID_FORMAT_TRANSIENT)),
+ attribute_statement=do_attribute_statement(
+ {
+ ("", "", "surName"): ("Jeter", ""),
+ ("", "", "givenName"): ("Derek", ""),
+ }
+ ),
+ issuer=self.server._issuer(),
+ )
+
+ assertion.signature = sigver.pre_signature_part(
+ assertion.id, _sec.my_cert, 1)
+
+ sigass = _sec.sign_statement(assertion, class_name(assertion),
+ key_file="pki/mykey.pem",
+ node_id=assertion.id)
+ # Create an Assertion instance from the signed assertion
+ _ass = saml.assertion_from_string(sigass)
+
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="https:#www.example.com",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer(),
+ assertion=_ass
+ )
+
+ enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
+ pre_encryption_part())
+
+ seresp = samlp.response_from_string(enctext)
+
+ # Now over to the client side
+ _csec = self.client.sec
+ if seresp.encrypted_assertion:
+ decr_text = _csec.decrypt(enctext)
+ seresp = samlp.response_from_string(decr_text)
+ resp_ass = []
+
+ sign_cert_file = "pki/mycert.pem"
+ for enc_ass in seresp.encrypted_assertion:
+ assers = extension_elements_to_elements(
+ enc_ass.extension_elements, [saml, samlp])
+ for ass in assers:
+ if ass.signature:
+ if not _csec.verify_signature("%s" % ass,
+ sign_cert_file,
+ node_name=class_name(ass)):
+ continue
+ resp_ass.append(ass)
+
+ seresp.assertion = resp_ass
+ seresp.encrypted_assertion = None
+ #print _sresp
+
+ assert seresp.assertion
+
+ def test_sign_then_encrypt_assertion2(self):
+ # Begin with the IdPs side
+ _sec = self.server.sec
+
+ nameid_policy = samlp.NameIDPolicy(allow_create="false",
+ format=saml.NAMEID_FORMAT_PERSISTENT)
+
+ asser = Assertion({"givenName": "Derek", "surName": "Jeter"})
+ assertion = asser.construct(
+ self.client.config.entityid, "_012345",
+ "http://lingon.catalogix.se:8087/",
+ factory(saml.NameID, format=saml.NAMEID_FORMAT_TRANSIENT),
+ policy=self.server.config.getattr("policy", "idp"),
+ issuer=self.server._issuer(),
+ attrconvs=self.server.config.attribute_converters,
+ authn_class=INTERNETPROTOCOLPASSWORD,
+ authn_auth="http://www.example.com/login")
+
+ assertion.signature = sigver.pre_signature_part(
+ assertion.id, _sec.my_cert, 1)
+
+ sigass = _sec.sign_statement(assertion, class_name(assertion),
+ #key_file="pki/mykey.pem",
+ key_file="test.key",
+ node_id=assertion.id)
+ # Create an Assertion instance from the signed assertion
+ _ass = saml.assertion_from_string(sigass)
+
+ response = sigver.response_factory(
+ in_response_to="_012345",
+ destination="https://www.example.com",
+ status=s_utils.success_status_factory(),
+ issuer=self.server._issuer(),
+ assertion=_ass
+ )
+
+ enctext = _sec.crypto.encrypt_assertion(response, _sec.cert_file,
+ pre_encryption_part())
+
+ #seresp = samlp.response_from_string(enctext)
+
+ resp_str = base64.encodestring(enctext)
+ # Now over to the client side
+ resp = self.client.parse_authn_request_response(
+ resp_str, BINDING_HTTP_POST,
+ {"_012345": "http://foo.example.com/service"})
+
+ #assert resp.encrypted_assertion == []
+ assert resp.assertion
+ assert resp.ava == {'givenName': ['Derek'], 'sn': ['Jeter']}
+# Below can only be done with dummy Server
IDP = "urn:mace:example.com:saml:roland:idp"
@@ -448,4 +574,4 @@ class TestClientWithDummy():
if __name__ == "__main__":
tc = TestClient()
tc.setup_class()
- tc.test_sign_auth_request_0()
+ tc.test_sign_then_encrypt_assertion2()
diff --git a/tests/test_60_sp.py b/tests/test_60_sp.py
index 76c2b551..cb07c65e 100644
--- a/tests/test_60_sp.py
+++ b/tests/test_60_sp.py
@@ -73,6 +73,7 @@ class TestSP():
'sn': ['Jeter'],
'title': ['The man']}
+
if __name__ == "__main__":
_sp = TestSP()
_sp.setup_class()
diff --git a/tests/test_81_certificates.py b/tests/test_81_certificates.py
index 88a43120..714a5c48 100644
--- a/tests/test_81_certificates.py
+++ b/tests/test_81_certificates.py
@@ -8,7 +8,6 @@ from saml2.cert import OpenSSLWrapper
class TestGenerateCertificates(unittest.TestCase):
-
def test_validate_with_root_cert(self):
cert_info_ca = {
@@ -33,23 +32,35 @@ class TestGenerateCertificates(unittest.TestCase):
ca_cert, ca_key = osw.create_certificate(cert_info_ca, request=False,
write_to_file=True,
- cert_dir=os.path.dirname(os.path.abspath(__file__)) + "/pki")
+ cert_dir=os.path.dirname(
+ os.path.abspath(
+ __file__)) + "/pki")
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
ca_cert_str = osw.read_str_from_file(ca_cert)
ca_key_str = osw.read_str_from_file(ca_key)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertTrue(valid)
- false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca, request=False, write_to_file=False)
- false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(cert_info_ca, request=True)
- false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_1)
- false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(cert_info, request=True)
- false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert, false_ca_key, false_req_cert_str_2)
+ false_ca_cert, false_ca_key = osw.create_certificate(cert_info_ca,
+ request=False,
+ write_to_file=False)
+ false_req_cert_str_1, false_req_key_str_1 = osw.create_certificate(
+ cert_info_ca, request=True)
+ false_cert_str_1 = osw.create_cert_signed_certificate(false_ca_cert,
+ false_ca_key,
+ false_req_cert_str_1)
+ false_req_cert_str_2, false_req_key_str_2 = osw.create_certificate(
+ cert_info, request=True)
+ false_cert_str_2 = osw.create_cert_signed_certificate(false_ca_cert,
+ false_ca_key,
+ false_req_cert_str_2)
valid, mess = osw.verify(false_ca_cert, cert_str)
self.assertFalse(valid)
@@ -106,20 +117,28 @@ class TestGenerateCertificates(unittest.TestCase):
osw = OpenSSLWrapper()
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False)
- req_cert_str, intermediate_1_key_str = osw.create_certificate(cert_intermediate_1_info, request=True)
- intermediate_cert_1_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ req_cert_str, intermediate_1_key_str = osw.create_certificate(
+ cert_intermediate_1_info, request=True)
+ intermediate_cert_1_str = osw.create_cert_signed_certificate(
+ ca_cert_str, ca_key_str, req_cert_str)
- req_cert_str, intermediate_2_key_str = osw.create_certificate(cert_intermediate_2_info, request=True)
- intermediate_cert_2_str = osw.create_cert_signed_certificate(intermediate_cert_1_str, intermediate_1_key_str,
- req_cert_str)
+ req_cert_str, intermediate_2_key_str = osw.create_certificate(
+ cert_intermediate_2_info, request=True)
+ intermediate_cert_2_str = osw.create_cert_signed_certificate(
+ intermediate_cert_1_str, intermediate_1_key_str,
+ req_cert_str)
- req_cert_str, client_key_str = osw.create_certificate(cert_client_cert_info, request=True)
- client_cert_str = osw.create_cert_signed_certificate(intermediate_cert_2_str, intermediate_2_key_str,
- req_cert_str)
+ req_cert_str, client_key_str = osw.create_certificate(
+ cert_client_cert_info, request=True)
+ client_cert_str = osw.create_cert_signed_certificate(
+ intermediate_cert_2_str, intermediate_2_key_str,
+ req_cert_str)
- cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str, ca_cert_str]
+ cert_chain = [intermediate_cert_2_str, intermediate_cert_1_str,
+ ca_cert_str]
valid, mess = osw.verify_chain(cert_chain, client_cert_str)
self.assertTrue(valid)
@@ -145,21 +164,23 @@ class TestGenerateCertificates(unittest.TestCase):
"organization_unit": "asdfg"
}
-
osw = OpenSSLWrapper()
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False,
- cipher_passphrase=
- {"cipher": "blowfish", "passphrase": "qwerty"})
+ ca_cert_str, ca_key_str = osw.create_certificate(
+ cert_info_ca, request=False,
+ cipher_passphrase={"cipher": "blowfish", "passphrase": "qwerty"})
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
- passphrase="qwerty")
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str,
+ passphrase="qwerty")
valid = False
try:
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str,
- passphrase="qwertyqwerty")
+ cert_str = osw.create_cert_signed_certificate(
+ ca_cert_str, ca_key_str, req_cert_str,
+ passphrase="qwertyqwerty")
except Exception:
valid = True
@@ -185,39 +206,59 @@ class TestGenerateCertificates(unittest.TestCase):
"organization_unit": "asdfg"
}
-
osw = OpenSSLWrapper()
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False)
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=1000, valid_to=100000)
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False,
+ valid_from=1000,
+ valid_to=100000)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=1000,
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str,
+ valid_from=1000,
valid_to=100000)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False, valid_from=0, valid_to=1)
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str)
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False,
+ valid_from=0,
+ valid_to=1)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)
- ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca, request=False)
- req_cert_str, req_key_str = osw.create_certificate(cert_info, request=True)
- cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str, req_cert_str, valid_from=0, valid_to=1)
+ ca_cert_str, ca_key_str = osw.create_certificate(cert_info_ca,
+ request=False)
+ req_cert_str, req_key_str = osw.create_certificate(cert_info,
+ request=True)
+ cert_str = osw.create_cert_signed_certificate(ca_cert_str, ca_key_str,
+ req_cert_str,
+ valid_from=0, valid_to=1)
time.sleep(2)
valid, mess = osw.verify(ca_cert_str, cert_str)
self.assertFalse(valid)