summaryrefslogtreecommitdiff
path: root/tests/test_68_assertion_id.py
blob: 34792efc4a3d21ed04419d4c971190237c1f142e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from contextlib import closing
from urllib.parse import parse_qs
from urllib.parse import urlparse

from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
from saml2 import BINDING_URI
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import Assertion
from saml2.samlp import AuthnRequest
from saml2.samlp import NameIDPolicy
from saml2.server import Server


__author__ = "rolandh"

TAG1 = 'name="SAMLRequest" value='


AUTHN = {"class_ref": INTERNETPROTOCOLPASSWORD, "authn_auth": "http://www.example.com/login"}


def get_msg(hinfo, binding, response=False):
    if binding == BINDING_SOAP:
        msg = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        _inp = hinfo["data"]
        i = _inp.find(TAG1)
        i += len(TAG1) + 1
        j = _inp.find('"', i)
        msg = _inp[i:j]
    elif binding == BINDING_URI:
        if response:
            msg = hinfo["data"]
        else:
            msg = ""
            return parse_qs(hinfo["url"].split("?")[1])["ID"][0]
    else:  # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        msg = parse_qs(parts.query)["SAMLRequest"][0]

    return msg


def test_basic_flow():
    sp = Saml2Client(config_file="servera_conf")
    with closing(Server(config_file="idp_all_conf")) as idp:
        # -------- @IDP -------------

        relay_state = "FOO"
        # -- dummy request ---
        orig_req = AuthnRequest(
            issuer=sp._issuer(), name_id_policy=NameIDPolicy(allow_create="true", format=NAMEID_FORMAT_TRANSIENT)
        )

        # == Create an AuthnRequest response

        name_id = idp.ident.transient_nameid("id12", sp.config.entityid)

        binding, destination = idp.pick_binding("assertion_consumer_service", entity_id=sp.config.entityid)
        resp = idp.create_authn_response(
            {
                "eduPersonEntitlement": "Short stop",
                "surName": "Jeter",
                "givenName": "Derek",
                "mail": "derek.jeter@nyy.mlb.com",
                "title": "The man",
            },
            "id-123456789",
            destination,
            sp.config.entityid,
            name_id=name_id,
            authn=AUTHN,
        )

        hinfo = idp.apply_binding(binding, f"{resp}", destination, relay_state)

        # --------- @SP -------------

        xmlstr = get_msg(hinfo, binding)
        # Explicitly allow unsigned responses for this test
        sp.want_response_signed = False
        aresp = sp.parse_authn_request_response(xmlstr, binding, {resp.in_response_to: "/"})

        # == Look for assertion X

        asid = aresp.assertion.id

        binding, destination = sp.pick_binding("assertion_id_request_service", entity_id=idp.config.entityid)

        hinfo = sp.apply_binding(binding, asid, destination)

        # ---------- @IDP ------------

        aid = get_msg(hinfo, binding, response=False)

        # == construct response

        resp = idp.create_assertion_id_request_response(aid)

        hinfo = idp.apply_binding(binding, f"{resp}", None, "", response=True)

        # ----------- @SP -------------

        xmlstr = get_msg(hinfo, binding, response=True)

        final = sp.parse_assertion_id_request_response(xmlstr, binding)

        print(final.response)
        assert isinstance(final.response, Assertion)