summaryrefslogtreecommitdiff
path: root/src/saml2/discovery.py
blob: d3e4250085d79020222549c2b2b83ae19c32bb75 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from six.moves.urllib import parse

from saml2.entity import Entity
from saml2.response import VerificationError

__author__ = 'rolandh'

IDPDISC_POLICY = "urn:oasis:names:tc:SAML:profiles:SSO:idp-discovery-protocol:single"


class DiscoveryServer(Entity):
    def __init__(self, config=None, config_file=""):
        if config or config_file:
            Entity.__init__(self, "disco", config, config_file)

    def parse_discovery_service_request(self, url="", query=""):
        if url:
            part = parse.urlparse(url)
            dsr = parse.parse_qs(part[4])
        elif query:
            dsr = parse.parse_qs(query)
        else:
            dsr = {}

        # verify

        for key in ["isPassive", "return", "returnIDParam", "policy",
                    'entityID']:
            try:
                assert len(dsr[key]) == 1
                dsr[key] = dsr[key][0]
            except KeyError:
                pass

        if "return" in dsr:
            part = parse.urlparse(dsr["return"])
            if part.query:
                qp = parse.parse_qs(part.query)
                if "returnIDParam" in dsr:
                    assert dsr["returnIDParam"] not in qp.keys()
                else:
                    assert "entityID" not in qp.keys()
        else:
            # If metadata not used this is mandatory
            raise VerificationError("Missing mandatory parameter 'return'")

        if "policy" not in dsr:
            dsr["policy"] = IDPDISC_POLICY

        try:
            assert dsr["isPassive"] in ["true", "false"]
        except KeyError:
            pass

        if "isPassive" in dsr and dsr["isPassive"] == "true":
            dsr["isPassive"] = True
        else:
            dsr["isPassive"] = False

        if not "returnIDParam" in dsr:
            dsr["returnIDParam"] = "entityID"

        return dsr

    # -------------------------------------------------------------------------

    @staticmethod
    def create_discovery_service_response(return_url=None,
                                          returnIDParam="entityID",
                                          entity_id=None, **kwargs):
        if return_url is None:
            return_url = kwargs["return"]

        if entity_id:
            qp = parse.urlencode({returnIDParam: entity_id})

            part = parse.urlparse(return_url)
            if part.query:
                # Iff there is a query part add the new info at the end
                return_url = "%s&%s" % (return_url, qp)
            else:
                return_url = "%s?%s" % (return_url, qp)

        return return_url

    def verify_sp_in_metadata(self, entity_id):
        if self.metadata:
            endp = self.metadata.discovery_response(entity_id)
            if endp:
                return True

        return False

    def verify_return(self, entity_id, return_url):
        for endp in self.metadata.discovery_response(entity_id):
            try:
                assert return_url.startswith(endp["location"])
            except AssertionError:
                pass
            else:
                return True
        return False