summaryrefslogtreecommitdiff
path: root/keystone/tests/unit/filtering.py
blob: 8b9a29a3596723e129185880d984fef7330edc07 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import uuid

from keystone.common import provider_api
import keystone.conf
from keystone import exception


PROVIDERS = provider_api.ProviderAPIs
CONF = keystone.conf.CONF


class FilterTests(object):
    """Helpers for creating, deleting and matching entities in tests.

    The create/delete/list helpers resolve manager methods through the
    module-level ``PROVIDERS`` registry. ``_match_with_list`` reports its
    result through ``self.assertTrue``, so this class presumably has to be
    mixed into a unittest-style test case that provides it.

    """

    # Manager attributes of PROVIDERS, in the order they are searched for a
    # create_/delete_/list_ method.
    _MANAGER_SEARCH_ORDER = ('identity_api', 'resource_api', 'assignment_api')

    def _match_with_list(self, this_batch, total_list,
                         batch_size=None,
                         list_start=None, list_end=None):
        """Assert that batch items all appear in a range of a total list.

        Items are compared on their ``'id'`` key only.

        :param this_batch: sequence of dicts expected to be found.
        :param total_list: sequence of dicts to search in.
        :param batch_size: number of leading items of ``this_batch`` to
                           check; defaults to all of them.
        :param list_start: first index of ``total_list`` to search;
                           defaults to 0.
        :param list_end: index one past the last entry of ``total_list`` to
                         search; defaults to ``len(total_list)``.

        """
        if batch_size is None:
            batch_size = len(this_batch)
        if list_start is None:
            list_start = 0
        if list_end is None:
            list_end = len(total_list)
        for batch_index in range(batch_size):
            wanted_id = this_batch[batch_index]['id']
            # any() stops scanning the range at the first matching entry.
            found = any(
                wanted_id == total_list[list_index]['id']
                for list_index in range(list_start, list_end))
            self.assertTrue(
                found,
                'Entity %s not found in list range [%s:%s]' % (
                    wanted_id, list_start, list_end))

    def _find_manager_method(self, method_name):
        """Return the first manager attribute called ``method_name``.

        The managers named in ``_MANAGER_SEARCH_ORDER`` are tried in order,
        and each one is only fetched from ``PROVIDERS`` once the previous
        ones have failed to supply the attribute.

        :param method_name: name of the manager method to look for.
        :returns: the attribute from the first manager that has it.
        :raises AttributeError: if the last manager tried does not have the
                                attribute either.

        """
        for manager_name in self._MANAGER_SEARCH_ORDER[:-1]:
            manager = getattr(PROVIDERS, manager_name)
            method = getattr(manager, method_name, None)
            if method is not None:
                return method
        # Deliberately no default here: a method missing from every manager
        # surfaces as an AttributeError instead of returning None.
        last_manager = getattr(PROVIDERS, self._MANAGER_SEARCH_ORDER[-1])
        return getattr(last_manager, method_name)

    def _create_entity(self, entity_type):
        """Find the create_<entity_type> method.

        Searches through the [identity_api, resource_api, assignment_api]
        managers for a method called create_<entity_type> and returns the first
        one. Nothing is created here; the caller invokes the returned method.

        """
        return self._find_manager_method('create_%s' % entity_type)

    def _delete_entity(self, entity_type):
        """Find the delete_<entity_type> method.

        Searches through the [identity_api, resource_api, assignment_api]
        managers for a method called delete_<entity_type> and returns the first
        one. Nothing is deleted here; the caller invokes the returned method.

        """
        return self._find_manager_method('delete_%s' % entity_type)

    def _list_entities(self, entity_type):
        """Find the list_<entity_type>s method.

        Searches through the [identity_api, resource_api, assignment_api]
        managers for a method called list_<entity_type>s (the entity type
        with a plain 's' appended) and returns the first one.

        """
        return self._find_manager_method('list_%ss' % entity_type)

    def _create_one_entity(self, entity_type, domain_id, name):
        """Create a single entity and return its dict.

        :param entity_type: type of entity to create, e.g. 'user'.
        :param domain_id: value stored under the entity's 'domain_id' key.
        :param name: value stored under the entity's 'name' key.
        :returns: for 'user' and 'group', whatever the create method
                  returns; for any other type, the locally built dict
                  including the generated 'id'.

        """
        new_entity = {'name': name,
                      'domain_id': domain_id}
        if entity_type in ('user', 'group'):
            # The manager layer creates the ID for users and groups
            new_entity = self._create_entity(entity_type)(new_entity)
        else:
            new_entity['id'] = uuid.uuid4().hex
            self._create_entity(entity_type)(new_entity['id'], new_entity)
        return new_entity

    def _create_test_data(self, entity_type, number, domain_id=None,
                          name_dict=None):
        """Create entity test data.

        :param entity_type: type of entity to create, e.g. 'user', 'group'
                            etc.
        :param number: number of entities to create.
        :param domain_id: if not defined, all entities will be created in
                          the default domain.
        :param name_dict: optional dict mapping an entity's index (0-based
                          position in creation order) to the name it should
                          be given; other entities get a random name.
        :returns: list of the created entities, in creation order.

        """
        entity_list = []
        if domain_id is None:
            domain_id = CONF.identity.default_domain_id
        name_dict = name_dict or {}
        for index in range(number):
            # If this index has a name defined in the name_dict, then use it
            name = name_dict.get(index, uuid.uuid4().hex)
            new_entity = self._create_one_entity(entity_type, domain_id, name)
            entity_list.append(new_entity)
        return entity_list

    def _delete_test_data(self, entity_type, entity_list):
        """Delete the given entities, stopping at the first Forbidden.

        :param entity_type: type of the entities, e.g. 'user'.
        :param entity_list: iterable of dicts whose 'id' is passed to the
                            delete method.

        """
        for entity in entity_list:
            try:
                self._delete_entity(entity_type)(entity['id'])
            except exception.Forbidden:
                # Note(knikolla): Some identity backends such as LDAP are
                # read only
                break