1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
from six.moves.urllib import parse
from saml2.entity import Entity
from saml2.response import VerificationError
__author__ = 'rolandh'
IDPDISC_POLICY = "urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol:single"
class DiscoveryServer(Entity):
def __init__(self, config=None, config_file=""):
if config or config_file:
Entity.__init__(self, "disco", config, config_file)
def parse_discovery_service_request(self, url="", query=""):
if url:
part = parse.urlparse(url)
dsr = parse.parse_qs(part[4])
elif query:
dsr = parse.parse_qs(query)
else:
dsr = {}
# verify
for key in ["isPassive", "return", "returnIDParam", "policy", 'entityID']:
try:
if len(dsr[key]) != 1:
raise Exception("Invalid DS request keys: {k}".format(k=key))
dsr[key] = dsr[key][0]
except KeyError:
pass
if "return" in dsr:
part = parse.urlparse(dsr["return"])
if part.query:
qp = parse.parse_qs(part.query)
if "returnIDParam" in dsr:
if dsr["returnIDParam"] in qp.keys():
raise Exception(
"returnIDParam value should not be in the query params"
)
else:
if "entityID" in qp.keys():
raise Exception("entityID should not be in the query params")
else:
# If metadata not used this is mandatory
raise VerificationError("Missing mandatory parameter 'return'")
if "policy" not in dsr:
dsr["policy"] = IDPDISC_POLICY
is_passive = dsr.get("isPassive")
if is_passive not in ["true", "false"]:
raise ValueError(
"Invalid value '{v}' for attribute '{attr}'".format(
v=is_passive, attr="isPassive"
)
)
if "isPassive" in dsr and dsr["isPassive"] == "true":
dsr["isPassive"] = True
else:
dsr["isPassive"] = False
if not "returnIDParam" in dsr:
dsr["returnIDParam"] = "entityID"
return dsr
# -------------------------------------------------------------------------
@staticmethod
def create_discovery_service_response(return_url=None,
returnIDParam="entityID",
entity_id=None, **kwargs):
if return_url is None:
return_url = kwargs["return"]
if entity_id:
qp = parse.urlencode({returnIDParam: entity_id})
part = parse.urlparse(return_url)
if part.query:
# Iff there is a query part add the new info at the end
return_url = "%s&%s" % (return_url, qp)
else:
return_url = "%s?%s" % (return_url, qp)
return return_url
def verify_sp_in_metadata(self, entity_id):
if self.metadata:
endp = self.metadata.discovery_response(entity_id)
if endp:
return True
return False
def verify_return(self, entity_id, return_url):
for endp in self.metadata.discovery_response(entity_id):
if not return_url.startswith(endp["location"]):
return True
return False
|