summaryrefslogtreecommitdiff
path: root/tests/test_64_artifact.py
blob: 8b3bb262aa1bb2c393fab4ee0fc41b8d57812c22 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import base64
from contextlib import closing
from hashlib import sha1
from six.moves.urllib.parse import urlparse
from six.moves.urllib.parse import parse_qs
from saml2 import BINDING_HTTP_ARTIFACT
from saml2 import BINDING_SOAP
from saml2 import BINDING_HTTP_POST
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.client import Saml2Client

from saml2.entity import create_artifact
from saml2.entity import ARTIFACT_TYPECODE
from saml2.s_utils import sid
from saml2.server import Server

__author__ = 'rolandh'

TAG1 = "name=\"SAMLRequest\" value="


AUTHN = {
    "class_ref": INTERNETPROTOCOLPASSWORD,
    "authn_auth": "http://www.example.com/login"
}


def get_msg(hinfo, binding, response=False):
    if binding == BINDING_SOAP:
        msg = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        _inp = hinfo["data"][3]
        i = _inp.find(TAG1)
        i += len(TAG1) + 1
        j = _inp.find('"', i)
        msg = _inp[i:j]
    elif binding == BINDING_HTTP_ARTIFACT:
        # either by POST or by redirect
        if hinfo["data"]:
            _inp = hinfo["data"][3]
            i = _inp.find(TAG1)
            i += len(TAG1) + 1
            j = _inp.find('"', i)
            msg = _inp[i:j]
        else:
            parts = urlparse(hinfo["url"])
            msg = parse_qs(parts.query)["SAMLart"][0]
    else: # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        msg = parse_qs(parts.query)["SAMLRequest"][0]

    return msg


def test_create_artifact():
    b64art = create_artifact("http://sp.example.com/saml.xml",
                             b"aabbccddeeffgghhiijj")

    art = base64.b64decode(b64art.encode('ascii'))

    assert art[:2] == ARTIFACT_TYPECODE
    assert int(art[2:4]) == 0

    s = sha1(b"http://sp.example.com/saml.xml")
    assert art[4:24] == s.digest()

SP = 'urn:mace:example.com:saml:roland:sp'


def test_create_artifact_resolve():
    b64art = create_artifact(SP, "aabbccddeeffgghhiijj", 1)
    artifact = base64.b64decode(b64art)

    #assert artifact[:2] == '\x00\x04'
    #assert int(artifact[2:4]) == 0
    #
    s = sha1(SP.encode('ascii'))
    assert artifact[4:24] == s.digest()

    with closing(Server(config_file="idp_all_conf")) as idp:
        typecode = artifact[:2]
        assert typecode == ARTIFACT_TYPECODE

        destination = idp.artifact2destination(b64art, "spsso")

        msg_id, msg = idp.create_artifact_resolve(b64art, destination, sid())

        print(msg)

        args = idp.use_soap(msg, destination, None, False)

        sp = Saml2Client(config_file="servera_conf")

        ar = sp.parse_artifact_resolve(args["data"])

        print(ar)

        assert ar.artifact.text == b64art


def test_artifact_flow():
    #SP = 'urn:mace:example.com:saml:roland:sp'
    sp = Saml2Client(config_file="servera_conf")

    with closing(Server(config_file="idp_all_conf")) as idp:
        # original request

        binding, destination = sp.pick_binding("single_sign_on_service",
                                               entity_id=idp.config.entityid)
        relay_state = "RS0"
        req_id, req = sp.create_authn_request(destination, id="id1")

        artifact = sp.use_artifact(req, 1)

        binding, destination = sp.pick_binding("single_sign_on_service",
                                               [BINDING_HTTP_ARTIFACT],
                                               entity_id=idp.config.entityid)

        hinfo = sp.apply_binding(binding, artifact, destination, relay_state)

        # ========== @IDP ============

        artifact2 = get_msg(hinfo, binding)

        assert artifact == artifact2

        # The IDP now wants to replace the artifact with the real request

        destination = idp.artifact2destination(artifact2, "spsso")

        msg_id, msg = idp.create_artifact_resolve(artifact2, destination, sid())

        hinfo = idp.use_soap(msg, destination, None, False)

        # ======== @SP ==========

        msg = get_msg(hinfo, BINDING_SOAP)

        ar = sp.parse_artifact_resolve(msg)

        assert ar.artifact.text == artifact

        # The SP picks the request out of the repository with the artifact as the key
        oreq = sp.artifact[ar.artifact.text]
        # Should be the same as req above

        # Returns the information over the existing SOAP connection so
        # no transport information needed

        msg = sp.create_artifact_response(ar, ar.artifact.text)
        hinfo = sp.use_soap(msg, destination)

        # ========== @IDP ============

        msg = get_msg(hinfo, BINDING_SOAP)

        # The IDP untangles the request from the artifact resolve response
        spreq = idp.parse_artifact_resolve_response(msg)

        # should be the same as req above

        assert spreq.id == req.id

        # That was one way, the Request from the SP
        # ---------------------------------------------#
        # Now for the other, the response from the IDP

        name_id = idp.ident.transient_nameid(sp.config.entityid, "derek")

        resp_args = idp.response_args(spreq, [BINDING_HTTP_POST])

        response = idp.create_authn_response({"eduPersonEntitlement": "Short stop",
                                              "surName": "Jeter", "givenName": "Derek",
                                              "mail": "derek.jeter@nyy.mlb.com",
                                              "title": "The man"},
                                             name_id=name_id,
                                             authn=AUTHN,
                                             **resp_args)

        print(response)

        # with the response in hand create an artifact

        artifact = idp.use_artifact(response, 1)

        binding, destination = sp.pick_binding("single_sign_on_service",
                                               [BINDING_HTTP_ARTIFACT],
                                               entity_id=idp.config.entityid)

        hinfo = sp.apply_binding(binding, "%s" % artifact, destination, relay_state,
                                 response=True)

        # ========== SP =========

        artifact3 = get_msg(hinfo, binding)

        assert artifact == artifact3

        destination = sp.artifact2destination(artifact3, "idpsso")

        # Got an artifact want to replace it with the real message
        msg_id, msg = sp.create_artifact_resolve(artifact3, destination, sid())

        print(msg)

        hinfo = sp.use_soap(msg, destination, None, False)

        # ======== IDP ==========

        msg = get_msg(hinfo, BINDING_SOAP)

        ar = idp.parse_artifact_resolve(msg)

        print(ar)

        assert ar.artifact.text == artifact3

        # The IDP retrieves the response from the database using the artifact as the key
        #oreq = idp.artifact[ar.artifact.text]

        binding, destination = idp.pick_binding("artifact_resolution_service",
                                                entity_id=sp.config.entityid)

        resp = idp.create_artifact_response(ar, ar.artifact.text)
        hinfo = idp.use_soap(resp, destination)

        # ========== SP ============

        msg = get_msg(hinfo, BINDING_SOAP)
        sp_resp = sp.parse_artifact_resolve_response(msg)

        assert sp_resp.id == response.id