summaryrefslogtreecommitdiff
path: root/src/saml2/ecp_client.py
blob: f0183f45c7482a1915d3320d031bed715c006c33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#

"""
Contains a class that can do SAML ECP Authentication for other python
programs.
"""

from six.moves import http_cookiejar as cookielib
import logging

from saml2 import soap
from saml2 import saml
from saml2 import samlp
from saml2 import SAMLError
from saml2 import BINDING_SOAP
from saml2.client_base import MIME_PAOS
from saml2.config import Config
from saml2.entity import Entity
from saml2.httpbase import set_list2dict, dict2set_list

from saml2.profile import paos
from saml2.profile import ecp

from saml2.mdstore import MetadataStore
from saml2.s_utils import BadRequest

SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
PAOS_HEADER_INFO = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)

logger = logging.getLogger(__name__)


class Client(Entity):
    """ECP-aware client that works on the client (application) side.

    You can use this class when you want to login user through
    ECP-aware SP and IdP.
    """

    def __init__(self, user, passwd, sp="", idp=None, metadata_file=None,
                 xmlsec_binary=None, verbose=0, ca_certs="",
                 disable_ssl_certificate_validation=True, key_file=None,
                 cert_file=None, config=None):
        """
        :param user: user name
        :param passwd: user password
        :param sp: The SP URL
        :param idp: The IdP PAOS endpoint
        :param metadata_file: Where the metadata file is if used
        :param xmlsec_binary: Where the xmlsec1 binary can be found (*)
        :param verbose: Chatty or not
        :param ca_certs: is the path of a file containing root CA certificates
            for SSL server certificate validation (*)
        :param disable_ssl_certificate_validation: If
            disable_ssl_certificate_validation is true, SSL cert validation
            will not be performed (*)
        :param key_file: Private key filename (*)
        :param cert_file: Certificate filename (*)
        :param config: Config() instance, overrides all the parameters marked
            with an asterisk (*) above
        """
        if not config:
            config = Config()
            config.disable_ssl_certificate_validation = \
                disable_ssl_certificate_validation
            config.key_file = key_file
            config.cert_file = cert_file
            config.ca_certs = ca_certs
            config.xmlsec_binary = xmlsec_binary

        Entity.__init__(self, "sp", config)
        self._idp = idp
        self._sp = sp
        self.user = user
        self.passwd = passwd
        self._verbose = verbose

        if metadata_file:
            self._metadata = MetadataStore([saml, samlp], None, config)
            self._metadata.load("local", metadata_file)
            logger.debug("Loaded metadata from '%s'", metadata_file)
        else:
            self._metadata = None

        self.metadata = self._metadata

        self.cookie_handler = None

        self.done_ecp = False
        self.cookie_jar = cookielib.LWPCookieJar()

    def phase2(self, authn_request, rc_url, idp_entity_id, headers=None,
               sign=False, **kwargs):
        """
        Doing the second phase of the ECP conversation, the conversation
        with the IdP happens.

        :param authn_request: The AuthenticationRequest
        :param rc_url: The assertion consumer service url of the SP
        :param idp_entity_id: The EntityID of the IdP
        :param headers: Possible extra headers
        :param sign: If the message should be signed
        :return: The response from the IdP
        """

        _, destination = self.pick_binding("single_sign_on_service",
                                           [BINDING_SOAP], "idpsso",
                                           entity_id=idp_entity_id)

        ht_args = self.apply_binding(BINDING_SOAP, authn_request, destination,
                                     sign=sign)

        if headers:
            ht_args["headers"].extend(headers)

        logger.debug("[P2] Sending request: %s", ht_args["data"])

        # POST the request to the IdP
        response = self.send(**ht_args)

        logger.debug("[P2] Got IdP response: %s", response)

        if response.status_code != 200:
            raise SAMLError(
                "Request to IdP failed (%s): %s" % (response.status_code,
                                                    response.text))

        # SAMLP response in a SOAP envelope body, ecp response in headers
        respdict = self.parse_soap_message(response.text)

        if respdict is None:
            raise SAMLError("Unexpected reply from the IdP")

        logger.debug("[P2] IdP response dict: %s", respdict)

        idp_response = respdict["body"]
        assert idp_response.c_tag == "Response"

        logger.debug("[P2] IdP AUTHN response: %s", idp_response)

        _ecp_response = None
        for item in respdict["header"]:
            if item.c_tag == "Response" and item.c_namespace == ecp.NAMESPACE:
                _ecp_response = item

        _acs_url = _ecp_response.assertion_consumer_service_url
        if rc_url != _acs_url:
            error = ("response_consumer_url '%s' does not match" % rc_url,
                     "assertion_consumer_service_url '%s" % _acs_url)
            # Send an error message to the SP
            _ = self.send(rc_url, "POST", data=soap.soap_fault(error))
            # Raise an exception so the user knows something went wrong
            raise SAMLError(error)

        return idp_response

    @staticmethod
    def parse_sp_ecp_response(respdict):
        if respdict is None:
            raise SAMLError("Unexpected reply from the SP")

        logger.debug("[P1] SP response dict: %s", respdict)

        # AuthnRequest in the body or not
        authn_request = respdict["body"]
        assert authn_request.c_tag == "AuthnRequest"

        # ecp.RelayState among headers
        _relay_state = None
        _paos_request = None
        for item in respdict["header"]:
            if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE:
                _relay_state = item
            if item.c_tag == "Request" and item.c_namespace == paos.NAMESPACE:
                _paos_request = item

        if _paos_request is None:
            raise BadRequest("Missing request")

        _rc_url = _paos_request.response_consumer_url

        return {"authn_request": authn_request, "rc_url": _rc_url,
                "relay_state": _relay_state}

    def ecp_conversation(self, respdict, idp_entity_id=None):
        """

        :param respdict:
        :param idp_entity_id:
        :return:
        """

        args = self.parse_sp_ecp_response(respdict)

        # **********************
        # Phase 2 - talk to the IdP
        # **********************

        idp_response = self.phase2(idp_entity_id=idp_entity_id, **args)

        # **********************************
        # Phase 3 - back to the SP
        # **********************************

        ht_args = self.use_soap(idp_response, args["rc_url"],
                                [args["relay_state"]])
        ht_args["headers"][0] = ('Content-Type', MIME_PAOS)
        logger.debug("[P3] Post to SP: %s", ht_args["data"])

        # POST the package from the IdP to the SP
        response = self.send(**ht_args)

        if response.status_code == 302:
            # ignore where the SP is redirecting us to and go for the
            # url I started off with.
            pass
        else:
            raise SAMLError(
                "Error POSTing package to SP: %s" % response.text)

        logger.debug("[P3] SP response: %s", response.text)

        self.done_ecp = True
        logger.debug("Done ECP")

        return None

    @staticmethod
    def add_paos_headers(headers=None):
        if headers:
            headers = set_list2dict(headers)
            headers["PAOS"] = PAOS_HEADER_INFO
            if "Accept" in headers:
                headers["Accept"] += ";%s" % MIME_PAOS
            elif "accept" in headers:
                headers["Accept"] = headers["accept"]
                headers["Accept"] += ";%s" % MIME_PAOS
                del headers["accept"]
            headers = dict2set_list(headers)
        else:
            headers = [
                ('Accept', 'text/html; %s' % MIME_PAOS),
                ('PAOS', PAOS_HEADER_INFO)
            ]

        return headers

    def operation(self, url, idp_entity_id, op, **opargs):
        """
        This is the method that should be used by someone that wants
        to authenticate using SAML ECP

        :param url: The page that access is sought for
        :param idp_entity_id: The entity ID of the IdP that should be
            used for authentication
        :param op: Which HTTP operation (GET/POST/PUT/DELETE)
        :param opargs: Arguments to the HTTP call
        :return: The page
        """
        sp_url = self._sp

        # ********************************************
        # Phase 1 - First conversation with the SP
        # ********************************************
        # headers needed to indicate to the SP that I'm ECP enabled

        opargs["headers"] = self.add_paos_headers(opargs["headers"])
        response = self.send(sp_url, op, **opargs)
        logger.debug("[Op] SP response: %s" % response)
        print(response.text)

        if response.status_code != 200:
            raise SAMLError(
                "Request to SP failed: %s" % response.text)

        # The response might be a AuthnRequest instance in a SOAP envelope
        # body. If so it's the start of the ECP conversation
        # Two SOAP header blocks; paos:Request and ecp:Request
        # may also contain a ecp:RelayState SOAP header block
        # If channel-binding was part of the PAOS header any number of
        # <cb:ChannelBindings> header blocks may also be present
        # if 'holder-of-key' option then one or more <ecp:SubjectConfirmation>
        # header blocks may also be present
        try:
            respdict = self.parse_soap_message(response.text)
            self.ecp_conversation(respdict, idp_entity_id)

            # should by now be authenticated so this should go smoothly
            response = self.send(url, op, **opargs)
        except (soap.XmlParseError, AssertionError, KeyError):
            raise

        if response.status_code >= 400:
            raise SAMLError("Error performing operation: %s" % (
                response.text,))

        return response

    # different HTTP operations
    def delete(self, url=None, idp_entity_id=None):
        return self.operation(url, idp_entity_id, "DELETE")

    def get(self, url=None, idp_entity_id=None, headers=None):
        return self.operation(url, idp_entity_id, "GET", headers=headers)

    def post(self, url=None, data="", idp_entity_id=None, headers=None):
        return self.operation(url, idp_entity_id, "POST", data=data,
                              headers=headers)

    def put(self, url=None, data="", idp_entity_id=None, headers=None):
        return self.operation(url, idp_entity_id, "PUT", data=data,
                              headers=headers)