summaryrefslogtreecommitdiff
path: root/tests/test_75_mongodb.py
diff options
context:
space:
mode:
authorRoland Hedberg <roland.hedberg@adm.umu.se>2014-11-13 09:41:44 +0100
committerRoland Hedberg <roland.hedberg@adm.umu.se>2014-11-13 09:41:44 +0100
commit3ec5e13e31b4103898e0b8138984212975cbc193 (patch)
treee7ff222b69f5edea3cb7eb9476df12bcd2a4f21c /tests/test_75_mongodb.py
parent108a38655dc511675a21f8c9e42d640488e7537b (diff)
downloadpysaml2-3ec5e13e31b4103898e0b8138984212975cbc193.tar.gz
Tests will not fail if you don't have a MondoDB running.
Diffstat (limited to 'tests/test_75_mongodb.py')
-rw-r--r--tests/test_75_mongodb.py116
1 files changed, 63 insertions, 53 deletions
diff --git a/tests/test_75_mongodb.py b/tests/test_75_mongodb.py
index d61fef13..1dd02d86 100644
--- a/tests/test_75_mongodb.py
+++ b/tests/test_75_mongodb.py
@@ -1,4 +1,5 @@
from contextlib import closing
+from pymongo.errors import ConnectionFailure
from saml2 import BINDING_HTTP_POST
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
@@ -20,62 +21,71 @@ def _eq(l1, l2):
def test_flow():
sp = Saml2Client(config_file="servera_conf")
- with closing(Server(config_file="idp_conf_mdb")) as idp1:
- with closing(Server(config_file="idp_conf_mdb")) as idp2:
- # clean out database
- idp1.ident.mdb.db.drop()
-
- # -- dummy request ---
- req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
-
- # == Create an AuthnRequest response
-
- rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
-
- #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
- resp = idp1.create_authn_response({"eduPersonEntitlement": "Short stop",
- "surName": "Jeter",
- "givenName": "Derek",
- "mail": "derek.jeter@nyy.mlb.com",
- "title": "The man"},
- userid="jeter",
- authn=AUTHN,
- **rinfo)
-
- # What's stored away is the assertion
- a_info = idp2.session_db.get_assertion(resp.assertion.id)
- # Make sure what I got back from MongoDB is the same as I put in
- assert a_info["assertion"] == resp.assertion
-
- # By subject
- nid = resp.assertion.subject.name_id
- _assertion = idp2.session_db.get_assertions_by_subject(nid)
- assert len(_assertion) == 1
- assert _assertion[0] == resp.assertion
-
- nids = idp2.ident.find_nameid("jeter")
- assert len(nids) == 1
+ try:
+ with closing(Server(config_file="idp_conf_mdb")) as idp1:
+ with closing(Server(config_file="idp_conf_mdb")) as idp2:
+ # clean out database
+ idp1.ident.mdb.db.drop()
+
+ # -- dummy request ---
+ req_id, orig_req = sp.create_authn_request(idp1.config.entityid)
+
+ # == Create an AuthnRequest response
+
+ rinfo = idp1.response_args(orig_req, [BINDING_HTTP_POST])
+
+ #name_id = idp1.ident.transient_nameid("id12", rinfo["sp_entity_id"])
+ resp = idp1.create_authn_response(
+ {
+ "eduPersonEntitlement": "Short stop",
+ "surName": "Jeter",
+ "givenName": "Derek",
+ "mail": "derek.jeter@nyy.mlb.com",
+ "title": "The man"},
+ userid="jeter",
+ authn=AUTHN,
+ **rinfo)
+
+ # What's stored away is the assertion
+ a_info = idp2.session_db.get_assertion(resp.assertion.id)
+ # Make sure what I got back from MongoDB is the same as I put in
+ assert a_info["assertion"] == resp.assertion
+
+ # By subject
+ nid = resp.assertion.subject.name_id
+ _assertion = idp2.session_db.get_assertions_by_subject(nid)
+ assert len(_assertion) == 1
+ assert _assertion[0] == resp.assertion
+
+ nids = idp2.ident.find_nameid("jeter")
+ assert len(nids) == 1
+ except ConnectionFailure:
+ pass
def test_eptid_mongo_db():
- edb = EptidMDB("secret", "idp")
- e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
- "some other data")
- print e1
- assert e1.startswith("idp_entity_id!sp_entity_id!")
- e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
- "some other data")
- assert e1 == e2
-
- e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2",
- "some other data")
- print e3
- assert e1 != e3
-
- e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id",
- "some other data")
- assert e4 != e1
- assert e4 != e3
+ try:
+ edb = EptidMDB("secret", "idp")
+ except ConnectionFailure:
+ pass
+ else:
+ e1 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
+ "some other data")
+ print e1
+ assert e1.startswith("idp_entity_id!sp_entity_id!")
+ e2 = edb.get("idp_entity_id", "sp_entity_id", "user_id",
+ "some other data")
+ assert e1 == e2
+
+ e3 = edb.get("idp_entity_id", "sp_entity_id", "user_2",
+ "some other data")
+ print e3
+ assert e1 != e3
+
+ e4 = edb.get("idp_entity_id", "sp_entity_id2", "user_id",
+ "some other data")
+ assert e4 != e1
+ assert e4 != e3