summaryrefslogtreecommitdiff
path: root/src/saml2/sdb.py
blob: 01c69aea20db9fe0bbfd81a8670ba4062d937667 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging

from hashlib import sha1

from saml2.ident import code_binary


logger = logging.getLogger(__name__)


def context_match(cfilter, cntx):
    """Tell whether authentication context *cntx* satisfies *cfilter*.

    Placeholder implementation: no comparison is performed yet, so every
    context is reported as matching regardless of the arguments.

    :param cfilter: The requested authentication context (currently unused).
    :param cntx: The authentication context to test (currently unused).
    :return: Always ``True``.
    """
    # TODO: implement real authentication-context matching.
    return True

# The key to the stored authn statement is placed encrypted in the cookie


class SessionStorage(object):
    """In-memory storage of session information.

    ``self.db`` holds two tables, also reachable as attributes:

    * ``self.assertion`` maps an assertion id to the tuple
      ``(assertion, to_sign)`` passed to :meth:`store_assertion`.
    * ``self.authn`` maps a subject key (SHA-1 hex digest of
      ``code_binary(name_id)``) to a list with one entry per stored
      assertion; each entry is that assertion's ``authn_statement`` value,
      kept exactly as it was given.
    """

    def __init__(self):
        self.db = {"assertion": {}, "authn": {}}
        self.assertion = self.db["assertion"]
        self.authn = self.db["authn"]

    @staticmethod
    def _subject_key(name_id):
        """Return the ``self.authn`` key used for *name_id*."""
        return sha1(code_binary(name_id)).hexdigest()

    @staticmethod
    def _as_group(entry):
        """Return a stored authn entry as a list of statements.

        An entry may be a single statement object or a list/tuple of them;
        normalising here lets the filters treat both shapes uniformly.
        """
        if isinstance(entry, (list, tuple)):
            return list(entry)
        return [entry]

    def store_assertion(self, assertion, to_sign):
        """Remember *assertion* and the authn statement(s) it carries.

        :param assertion: Object exposing ``id``, ``subject.name_id`` and
            ``authn_statement``.
        :param to_sign: Stored next to the assertion and handed back
            unchanged by :meth:`get_assertion`.
        """
        self.assertion[assertion.id] = (assertion, to_sign)
        key = self._subject_key(assertion.subject.name_id)
        self.authn.setdefault(key, []).append(assertion.authn_statement)

    def get_assertion(self, cid):
        """Return the ``(assertion, to_sign)`` tuple stored under id *cid*.

        :raises KeyError: If no assertion with that id has been stored.
        """
        return self.assertion[cid]

    def get_authn_statements(self, name_id, session_index=None,
                             requested_context=None):
        """Return the authn entries stored for a subject, optionally filtered.

        :param name_id: Subject identifier; it is hashed the same way as in
            :meth:`store_assertion` to find the subject's entries.
        :param session_index: If truthy, keep only entries in which at least
            one statement has this ``session_index``.
        :param requested_context: If truthy, keep only entries in which at
            least one statement's ``authn_context`` passes
            ``context_match``.
        :return: List of matching entries, each in the shape it was stored
            in; an empty list when the subject is unknown.
        """
        try:
            stored = self.authn[self._subject_key(name_id)]
        except KeyError:
            logger.info("Unknown subject %s", name_id)
            return []

        result = []
        for entry in stored:
            # An entry can be one statement or a sequence of statements;
            # accessing it both as an object and by index (as was done
            # before) fails for either shape, so normalise first.
            group = self._as_group(entry)
            if session_index and not any(
                    stmt.session_index == session_index for stmt in group):
                continue
            if requested_context and not any(
                    context_match(requested_context, stmt.authn_context)
                    for stmt in group):
                continue
            result.append(entry)

        return result

    def remove_authn_statements(self, name_id):
        """Drop every authn entry stored for *name_id*.

        :raises KeyError: If nothing is stored for that subject.
        """
        logger.debug("remove authn about: %s", name_id)
        del self.authn[self._subject_key(name_id)]