summaryrefslogtreecommitdiff
path: root/tests/fakeIDP.py
blob: c2bd5612aaf7c9e1d26ecb56a789e76eade9066a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from six.moves.urllib.parse import parse_qs
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.samlp import attribute_query_from_string, logout_request_from_string
from saml2 import BINDING_HTTP_REDIRECT, pack
from saml2 import BINDING_HTTP_POST
from saml2 import BINDING_SOAP
from saml2.server import Server
from saml2.soap import parse_soap_enveloped_saml_attribute_query
from saml2.soap import parse_soap_enveloped_saml_logout_request
from saml2.soap import make_soap_enveloped_saml_thingy

__author__ = 'rolandh'

TYP = {
    "GET": [BINDING_HTTP_REDIRECT],
    "POST": [BINDING_HTTP_POST, BINDING_SOAP]
}


AUTHN = {
    "class_ref": INTERNETPROTOCOLPASSWORD,
    "authn_auth": "http://www.example.com/login"
}


def unpack_form(_str, ver="SAMLRequest"):
    SR_STR = "name=\"%s\" value=\"" % ver
    RS_STR = 'name="RelayState" value="'

    i = _str.find(SR_STR)
    i += len(SR_STR)
    j = _str.find('"', i)

    sr = _str[i:j]

    k = _str.find(RS_STR, j)
    k += len(RS_STR)
    l = _str.find('"', k)

    rs = _str[k:l]

    return {ver: sr, "RelayState": rs}


class DummyResponse(object):
    def __init__(self, status, data, headers=None):
        self.status_code = status
        self.text = data
        self.headers = headers or []
        self.content = data


class FakeIDP(Server):
    def __init__(self, config_file=""):
        Server.__init__(self, config_file)
        #self.sign = False

    def receive(self, url, method="GET", **kwargs):
        """
        Interface to receive HTTP calls on

        :param url:
        :param method:
        :param kwargs:
        :return:
        """

        if method == "GET":
            path, query = url.split("?")
            qs_dict = parse_qs(kwargs["data"])
            req = qs_dict["SAMLRequest"][0]
            rstate = qs_dict["RelayState"][0]
        else:
            # Could be either POST or SOAP
            path = url
            try:
                qs_dict = parse_qs(kwargs["data"])
                req = qs_dict["SAMLRequest"][0]
                rstate = qs_dict["RelayState"][0]
            except KeyError:
                req = kwargs["data"]
                rstate = ""

        response = ""

        # Get service from path

        for key, vals in self.config.getattr("endpoints", "idp").items():
            for endp, binding in vals:
                if path == endp:
                    assert binding in TYP[method]
                    if key == "single_sign_on_service":
                        return self.authn_request_endpoint(req, binding,
                                                           rstate)
                    elif key == "single_logout_service":
                        return self.logout_endpoint(req, binding)

        for key, vals in self.config.getattr("endpoints", "aa").items():
            for endp, binding in vals:
                if path == endp:
                    assert binding in TYP[method]
                    if key == "attribute_service":
                        return self.attribute_query_endpoint(req, binding)

        return response

    def authn_request_endpoint(self, req, binding, relay_state):
        req = self.parse_authn_request(req, binding)
        if req.message.protocol_binding == BINDING_HTTP_REDIRECT:
            _binding = BINDING_HTTP_POST
        else:
            _binding = req.message.protocol_binding

        try:
            resp_args = self.response_args(req.message, [_binding])
        except Exception:
            raise

        identity = {"surName": "Hedberg", "givenName": "Roland",
                    "title": "supertramp", "mail": "roland@example.com"}
        userid = "Pavill"

        authn_resp = self.create_authn_response(identity,
                                                userid=userid,
                                                authn=AUTHN,
                                                **resp_args)

        response = "%s" % authn_resp

        _dict = pack.factory(_binding, response,
                             resp_args["destination"], relay_state,
                             "SAMLResponse")
        return DummyResponse(**_dict)

    def attribute_query_endpoint(self, xml_str, binding):
        if binding == BINDING_SOAP:
            _str = parse_soap_enveloped_saml_attribute_query(xml_str)
        else:
            _str = xml_str

        aquery = attribute_query_from_string(_str)
        extra = {"eduPersonAffiliation": "faculty"}
        #userid = "Pavill"

        name_id = aquery.subject.name_id
        attr_resp = self.create_attribute_response(
            extra, aquery.id, None, sp_entity_id=aquery.issuer.text,
            name_id=name_id, attributes=aquery.attribute)

        if binding == BINDING_SOAP:
            # SOAP packing
            #headers = {"content-type": "application/soap+xml"}
            soap_message = make_soap_enveloped_saml_thingy(attr_resp)
            #            if self.sign and self.sec:
            #                _signed = self.sec.sign_statement_using_xmlsec(soap_message,
            #                                                               class_name(attr_resp),
            #                                                               nodeid=attr_resp.id)
            #                soap_message = _signed
            response = "%s" % soap_message
        else:  # Just POST
            response = "%s" % attr_resp

        return DummyResponse(status=200, data=response)

    def logout_endpoint(self, xml_str, binding):
        if binding == BINDING_SOAP:
            _str = parse_soap_enveloped_saml_logout_request(xml_str)
        else:
            _str = xml_str

        req = logout_request_from_string(_str)

        _resp = self.create_logout_response(req, [binding])

        if binding == BINDING_SOAP:
            # SOAP packing
            #headers = {"content-type": "application/soap+xml"}
            soap_message = make_soap_enveloped_saml_thingy(_resp)
            #            if self.sign and self.sec:
            #                _signed = self.sec.sign_statement_using_xmlsec(soap_message,
            #                                                               class_name(attr_resp),
            #                                                               nodeid=attr_resp.id)
            #                soap_message = _signed
            response = "%s" % soap_message
        else: # Just POST
            response = "%s" % _resp

        return DummyResponse(status=200, data=response)