1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
Contains a class that can do SAML ECP Authentication for other python
programs.
"""
from six.moves import http_cookiejar as cookielib
import logging
from saml2 import soap
from saml2 import saml
from saml2 import samlp
from saml2 import SAMLError
from saml2 import BINDING_SOAP
from saml2.client_base import MIME_PAOS
from saml2.config import Config
from saml2.entity import Entity
from saml2.httpbase import set_list2dict, dict2set_list
from saml2.profile import paos
from saml2.profile import ecp
from saml2.mdstore import MetadataStore
from saml2.s_utils import BadRequest
# URN identifying the SAML 2.0 ECP single sign-on profile; advertised to the
# SP as the supported service in the PAOS header below.
SERVICE = "urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"
# Value of the "PAOS" HTTP request header: the PAOS namespace as the version
# followed by the quoted service URN.
PAOS_HEADER_INFO = 'ver="%s";"%s"' % (paos.NAMESPACE, SERVICE)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Client(Entity):
    """ECP-aware client that works on the client (application) side.

    You can use this class when you want to login user through
    ECP-aware SP and IdP.

    The HTTP/SOAP plumbing (``send``, ``pick_binding``, ``apply_binding``,
    ``use_soap`` and ``parse_soap_message``) is not defined here; it is
    expected to be inherited from the base class.
    """

    def __init__(self, user, passwd, sp="", idp=None, metadata_file=None,
                 xmlsec_binary=None, verbose=0, ca_certs="",
                 disable_ssl_certificate_validation=True, key_file=None,
                 cert_file=None, config=None):
        """
        :param user: user name
        :param passwd: user password
        :param sp: The SP URL
        :param idp: The IdP PAOS endpoint
        :param metadata_file: Where the metadata file is if used
        :param xmlsec_binary: Where the xmlsec1 binary can be found (*)
        :param verbose: Chatty or not
        :param ca_certs: is the path of a file containing root CA certificates
            for SSL server certificate validation (*)
        :param disable_ssl_certificate_validation: If
            disable_ssl_certificate_validation is true, SSL cert validation
            will not be performed (*)
        :param key_file: Private key filename (*)
        :param cert_file: Certificate filename (*)
        :param config: Config() instance, overrides all the parameters marked
            with an asterisk (*) above
        """
        if not config:
            # No ready-made configuration: build one from the (*) arguments.
            # NOTE: disable_ssl_certificate_validation defaults to True, so
            # unless the caller says otherwise certificates are NOT checked.
            config = Config()
            config.disable_ssl_certificate_validation = \
                disable_ssl_certificate_validation
            config.key_file = key_file
            config.cert_file = cert_file
            config.ca_certs = ca_certs
            config.xmlsec_binary = xmlsec_binary

        Entity.__init__(self, "sp", config)
        self._idp = idp
        self._sp = sp
        self.user = user
        self.passwd = passwd
        self._verbose = verbose

        if metadata_file:
            self._metadata = MetadataStore([saml, samlp], None, config)
            self._metadata.load("local", metadata_file)
            logger.debug("Loaded metadata from '%s'", metadata_file)
        else:
            self._metadata = None
        # Assigned after Entity.__init__, so this is the value that sticks.
        self.metadata = self._metadata

        self.cookie_handler = None

        # Set to True once a complete ECP conversation has succeeded.
        self.done_ecp = False
        self.cookie_jar = cookielib.LWPCookieJar()

    def phase2(self, authn_request, rc_url, idp_entity_id, headers=None,
               sign=False, **kwargs):
        """
        Doing the second phase of the ECP conversation, the conversation
        with the IdP happens.

        :param authn_request: The AuthenticationRequest
        :param rc_url: The assertion consumer service url of the SP
        :param idp_entity_id: The EntityID of the IdP
        :param headers: Possible extra headers
        :param sign: If the message should be signed
        :param kwargs: Ignored; lets callers pass the whole dictionary
            returned by parse_sp_ecp_response (which also has "relay_state")
        :return: The response from the IdP
        :raises SAMLError: If the IdP does not answer with HTTP 200, the
            reply cannot be interpreted, the ecp:Response header is missing
            or its assertion consumer service URL differs from rc_url
        :raises AssertionError: If the SOAP body is not a Response
        """
        _, destination = self.pick_binding("single_sign_on_service",
                                           [BINDING_SOAP], "idpsso",
                                           entity_id=idp_entity_id)

        ht_args = self.apply_binding(BINDING_SOAP, authn_request, destination,
                                     sign=sign)

        if headers:
            ht_args["headers"].extend(headers)

        logger.debug("[P2] Sending request: %s", ht_args["data"])

        # POST the request to the IdP
        response = self.send(**ht_args)

        logger.debug("[P2] Got IdP response: %s", response)

        if response.status_code != 200:
            raise SAMLError(
                "Request to IdP failed (%s): %s" % (response.status_code,
                                                    response.text))

        # SAMLP response in a SOAP envelope body, ecp response in headers
        respdict = self.parse_soap_message(response.text)

        if respdict is None:
            raise SAMLError("Unexpected reply from the IdP")

        logger.debug("[P2] IdP response dict: %s", respdict)

        idp_response = respdict["body"]
        # AssertionError is kept as the failure type because it is one of
        # the exceptions operation() documents as propagating.
        assert idp_response.c_tag == "Response", \
            "Expected a Response in the SOAP body, got '%s'" % (
                idp_response.c_tag,)

        logger.debug("[P2] IdP AUTHN response: %s", idp_response)

        # If several ecp:Response header blocks are present the last one wins.
        _ecp_response = None
        for item in respdict["header"]:
            if item.c_tag == "Response" and item.c_namespace == ecp.NAMESPACE:
                _ecp_response = item

        if _ecp_response is None:
            # Without this check the attribute access below would fail with
            # an unhelpful AttributeError on None.
            raise SAMLError("Missing ecp:Response header in the IdP reply")

        _acs_url = _ecp_response.assertion_consumer_service_url
        if rc_url != _acs_url:
            # One plain string: it is used both as the SOAP fault text and
            # as the exception message.
            error = ("response_consumer_url '%s' does not match "
                     "assertion_consumer_service_url '%s'" % (rc_url,
                                                              _acs_url))
            # Send an error message to the SP
            _ = self.send(rc_url, "POST", data=soap.soap_fault(error))
            # Raise an exception so the user knows something went wrong
            raise SAMLError(error)

        return idp_response

    @staticmethod
    def parse_sp_ecp_response(respdict):
        """
        Pick out what is needed from the SP's parsed PAOS reply.

        :param respdict: Parsed SOAP message; a dictionary with the keys
            "body" and "header", or None
        :return: Dictionary with the keys "authn_request", "rc_url" and
            "relay_state"; "relay_state" is None if the SP sent none
        :raises SAMLError: If respdict is None
        :raises AssertionError: If the SOAP body is not an AuthnRequest
        :raises BadRequest: If there is no paos:Request header block
        """
        if respdict is None:
            raise SAMLError("Unexpected reply from the SP")

        logger.debug("[P1] SP response dict: %s", respdict)

        # AuthnRequest in the body or not
        authn_request = respdict["body"]
        assert authn_request.c_tag == "AuthnRequest", \
            "Expected an AuthnRequest in the SOAP body, got '%s'" % (
                authn_request.c_tag,)

        # ecp.RelayState among headers
        _relay_state = None
        _paos_request = None
        for item in respdict["header"]:
            if item.c_tag == "RelayState" and item.c_namespace == ecp.NAMESPACE:
                _relay_state = item
            if item.c_tag == "Request" and item.c_namespace == paos.NAMESPACE:
                _paos_request = item

        if _paos_request is None:
            raise BadRequest("Missing request")

        _rc_url = _paos_request.response_consumer_url

        return {"authn_request": authn_request, "rc_url": _rc_url,
                "relay_state": _relay_state}

    def ecp_conversation(self, respdict, idp_entity_id=None):
        """
        Run phase 2 (talk to the IdP) and phase 3 (deliver the IdP's answer
        to the SP) of the ECP exchange.

        :param respdict: The SP's parsed PAOS reply, as accepted by
            parse_sp_ecp_response
        :param idp_entity_id: The entity ID of the IdP to authenticate at
        :return: None; self.done_ecp is set to True on success
        :raises SAMLError: If the SP answers the final POST with anything
            but a 302 redirect, or if phase 2 fails
        """
        args = self.parse_sp_ecp_response(respdict)

        # **********************
        # Phase 2 - talk to the IdP
        # **********************

        idp_response = self.phase2(idp_entity_id=idp_entity_id, **args)

        # **********************************
        # Phase 3 - back to the SP
        # **********************************

        # The relay state is optional; only echo it back when the SP
        # supplied one, never hand None over as a SOAP header block.
        relay_state = args["relay_state"]
        soap_headers = [relay_state] if relay_state is not None else []

        ht_args = self.use_soap(idp_response, args["rc_url"], soap_headers)
        # Replace the first header produced by use_soap with the PAOS
        # content type.
        ht_args["headers"][0] = ('Content-Type', MIME_PAOS)
        logger.debug("[P3] Post to SP: %s", ht_args["data"])

        # POST the package from the IdP to the SP
        response = self.send(**ht_args)

        # A 302 is the only accepted outcome. Where the SP redirects to is
        # ignored; the caller goes for the url it started off with.
        if response.status_code != 302:
            raise SAMLError(
                "Error POSTing package to SP: %s" % response.text)

        logger.debug("[P3] SP response: %s", response.text)

        self.done_ecp = True
        logger.debug("Done ECP")

        return None

    @staticmethod
    def add_paos_headers(headers=None):
        """
        Add the HTTP headers that tell the SP that this client does PAOS.

        :param headers: Existing headers as a list of (name, value) tuples,
            or None
        :return: A list of (name, value) tuples that contains a PAOS header
            and an Accept header listing the PAOS MIME type
        """
        if headers:
            headers = set_list2dict(headers)
            headers["PAOS"] = PAOS_HEADER_INFO
            if "Accept" in headers:
                headers["Accept"] += ";%s" % MIME_PAOS
            elif "accept" in headers:
                # Normalise the lower-case spelling to "Accept".
                headers["Accept"] = headers["accept"]
                headers["Accept"] += ";%s" % MIME_PAOS
                del headers["accept"]
            else:
                # The caller gave headers but no Accept header; without one
                # the request would not advertise the PAOS MIME type. Use
                # the same value as when no headers are given at all.
                headers["Accept"] = 'text/html; %s' % MIME_PAOS
            headers = dict2set_list(headers)
        else:
            headers = [
                ('Accept', 'text/html; %s' % MIME_PAOS),
                ('PAOS', PAOS_HEADER_INFO)
            ]

        return headers

    def operation(self, url, idp_entity_id, op, **opargs):
        """
        This is the method that should be used by someone that wants
        to authenticate using SAML ECP

        :param url: The page that access is sought for
        :param idp_entity_id: The entity ID of the IdP that should be
            used for authentication
        :param op: Which HTTP operation (GET/POST/PUT/DELETE)
        :param opargs: Arguments to the HTTP call
        :return: The page
        :raises SAMLError: If the SP does not answer the first request with
            HTTP 200, if the ECP conversation fails, or if the final request
            returns a status of 400 or above
        :raises soap.XmlParseError, AssertionError, KeyError: Propagated
            unchanged when the SP's reply is not the expected PAOS message
        """
        sp_url = self._sp

        # ********************************************
        # Phase 1 - First conversation with the SP
        # ********************************************
        # headers needed to indicate to the SP that I'm ECP enabled.
        # "headers" is optional among the HTTP arguments, hence .get().
        opargs["headers"] = self.add_paos_headers(opargs.get("headers"))
        response = self.send(sp_url, op, **opargs)
        logger.debug("[Op] SP response: %s", response)

        if response.status_code != 200:
            raise SAMLError(
                "Request to SP failed: %s" % response.text)

        # The response might be a AuthnRequest instance in a SOAP envelope
        # body. If so it's the start of the ECP conversation
        # Two SOAP header blocks; paos:Request and ecp:Request
        # may also contain a ecp:RelayState SOAP header block
        # If channel-binding was part of the PAOS header any number of
        # <cb:ChannelBindings> header blocks may also be present
        # if 'holder-of-key' option then one or more <ecp:SubjectConfirmation>
        # header blocks may also be present
        respdict = self.parse_soap_message(response.text)
        self.ecp_conversation(respdict, idp_entity_id)

        # should by now be authenticated so this should go smoothly
        response = self.send(url, op, **opargs)

        if response.status_code >= 400:
            raise SAMLError("Error performing operation: %s" % (
                response.text,))

        return response

    # different HTTP operations
    def delete(self, url=None, idp_entity_id=None, headers=None):
        """Do an ECP-authenticated HTTP DELETE; see operation()."""
        return self.operation(url, idp_entity_id, "DELETE", headers=headers)

    def get(self, url=None, idp_entity_id=None, headers=None):
        """Do an ECP-authenticated HTTP GET; see operation()."""
        return self.operation(url, idp_entity_id, "GET", headers=headers)

    def post(self, url=None, data="", idp_entity_id=None, headers=None):
        """Do an ECP-authenticated HTTP POST of data; see operation()."""
        return self.operation(url, idp_entity_id, "POST", data=data,
                              headers=headers)

    def put(self, url=None, data="", idp_entity_id=None, headers=None):
        """Do an ECP-authenticated HTTP PUT of data; see operation()."""
        return self.operation(url, idp_entity_id, "PUT", data=data,
                              headers=headers)
|