summaryrefslogtreecommitdiff
path: root/tempest/lib/common/cred_client.py
blob: 69798a4b8648972321fb687fefd3e5a563bf14ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc

from oslo_log import log as logging

from tempest.lib import auth
from tempest.lib import exceptions as lib_exc
from tempest.lib.services.identity.v2 import identity_client as v2_identity

LOG = logging.getLogger(__name__)


class CredsClient(object, metaclass=abc.ABCMeta):
    """Version-agnostic wrapper around the identity service clients.

    Provides a single interface for managing credentials in both the v2 and
    v3 cases. It is not bound to the credentials it creates, only to the
    specific set of admin clients used for generating credentials.

    Concrete subclasses must define the ``project_id_param`` class attribute
    (the ``create_user`` keyword that carries the default project ID) and
    implement the abstract methods declared here.
    """

    def __init__(self, identity_client, projects_client, users_client,
                 roles_client):
        """Store the service clients used to manage credentials.

        :param identity_client: the identity client
        :param projects_client: client used to manage projects (tenants)
        :param users_client: client used to manage users
        :param roles_client: client used to list, create and assign roles
        """
        # The client implies version and credentials
        self.identity_client = identity_client
        self.users_client = users_client
        self.projects_client = projects_client
        self.roles_client = roles_client

    def create_user(self, username, password, project=None, email=None):
        """Create a user through the users client

        :param username: name of the new user
        :param password: password of the new user
        :param project: (optional) a project dict; its 'id' is sent under
                        the keyword named by ``self.project_id_param``
        :param email: (optional) email address of the new user
        :return: the user dict, unwrapped from the 'user' key when the
                 response carries one
        """
        params = {'name': username,
                  'password': password}
        # with keystone v3, a default project is not required
        if project:
            params[self.project_id_param] = project['id']
        # email is not a first-class attribute of a user
        if email:
            params['email'] = email
        user = self.users_client.create_user(**params)
        if 'user' in user:
            user = user['user']
        return user

    def delete_user(self, user_id):
        """Delete the user with the given ID"""
        self.users_client.delete_user(user_id)

    @abc.abstractmethod
    def create_project(self, name, description):
        """Create a project (tenant) and return it as a dict"""
        pass

    @abc.abstractmethod
    def show_project(self, project_id):
        """Return the project (tenant) dict for the given ID"""
        pass

    def _check_role_exists(self, role_name):
        """Look up a role by name, ignoring case

        :param role_name: name of the role to search for
        :return: the first matching role dict, or None if there is no match
        """
        roles = self._list_roles()
        lc_role_name = role_name.lower()
        # A default for next() limits the "not found" handling to the search
        # itself; errors raised while listing the roles are not swallowed.
        return next(
            (r for r in roles if r['name'].lower() == lc_role_name), None)

    def create_user_role(self, role_name):
        """Create the named role unless a role with that name exists"""
        if not self._check_role_exists(role_name):
            self.roles_client.create_role(name=role_name)

    def assign_user_role(self, user, project, role_name):
        """Assign the specified role to a user on a project

        :param user: a user dict
        :param project: a project dict
        :param role_name: name of the role to be assigned
        :raises lib_exc.NotFound: if no role with that name exists
        """
        role = self._check_role_exists(role_name)
        if not role:
            msg = 'No "%s" role found' % role_name
            raise lib_exc.NotFound(msg)
        try:
            self.roles_client.create_user_role_on_project(project['id'],
                                                          user['id'],
                                                          role['id'])
        except lib_exc.Conflict:
            # The assignment is already in place, nothing left to do
            LOG.debug("Role %s already assigned on project %s for user %s",
                      role['id'], project['id'], user['id'])

    @abc.abstractmethod
    def get_credentials(
            self, user, project, password, domain=None, system=None):
        """Produces a Credentials object from the details provided

        :param user: a user dict
        :param project: a project dict or None if using domain or system scope
        :param password: the password as a string
        :param domain: a domain dict
        :param system: a system dict
        :return: a Credentials object with all the available credential details
        """
        pass

    def _list_roles(self):
        """Return the list of role dicts known to the roles client"""
        roles = self.roles_client.list_roles()['roles']
        return roles


class V2CredsClient(CredsClient):
    """Credentials client for the identity v2 API

    Projects are managed through the tenant calls of the projects client.
    """
    project_id_param = 'tenantId'

    def __init__(self, identity_client, projects_client, users_client,
                 roles_client):
        """Store the v2 service clients; see the parent constructor"""
        super().__init__(identity_client, projects_client, users_client,
                         roles_client)

    def create_project(self, name, description):
        """Create a tenant and return its dict"""
        response = self.projects_client.create_tenant(
            name=name, description=description)
        return response['tenant']

    def show_project(self, project_id):
        """Return the tenant dict for the given ID"""
        tenant = self.projects_client.show_tenant(project_id)['tenant']
        return tenant

    def delete_project(self, project_id):
        """Delete the tenant with the given ID"""
        self.projects_client.delete_tenant(project_id)

    def get_credentials(
            self, user, project, password, domain=None, system=None):
        """Produces a v2 Credentials object from the details provided

        :param user: a user dict
        :param project: a project dict
        :param password: the password as a string
        :param domain: accepted for interface compatibility, not used
        :param system: accepted for interface compatibility, not used
        :return: the object returned by auth.get_credentials
        """
        # Both ID and name of user and project are known at this point,
        # which is why fill_in mode is switched off
        creds_kwargs = dict(
            auth_url=None,
            fill_in=False,
            identity_version='v2',
            username=user['name'], user_id=user['id'],
            tenant_name=project['name'], tenant_id=project['id'],
            password=password)
        return auth.get_credentials(**creds_kwargs)


class V3CredsClient(CredsClient):
    """Credentials client for the identity v3 API

    In addition to the common interface it manages domains, and it keeps the
    domain looked up at construction time (``creds_domain``) as the default
    domain for the users and projects it creates.
    """
    project_id_param = 'project_id'

    def __init__(self, identity_client, projects_client, users_client,
                 roles_client, domains_client, domain_name):
        """Store the v3 service clients and resolve the default domain

        :param identity_client: the identity client
        :param projects_client: client used to manage projects
        :param users_client: client used to manage users
        :param roles_client: client used to list, create and assign roles
        :param domains_client: client used to manage domains
        :param domain_name: name of the domain stored as ``creds_domain``
        :raises lib_exc.InvalidCredentials: if no domain with that name is
                                            found
        """
        super(V3CredsClient, self).__init__(identity_client, projects_client,
                                            users_client, roles_client)
        self.domains_client = domains_client

        try:
            domains = self.domains_client.list_domains(
                name=domain_name)['domains']
        except lib_exc.NotFound:
            domains = []
        # A name filter that matches nothing yields an empty list rather
        # than a NotFound error, so the empty case is checked explicitly
        # instead of letting the [0] lookup fail with an IndexError.
        if not domains:
            # TODO(andrea) we could probably create the domain on the fly
            msg = "Requested domain %s could not be found" % domain_name
            raise lib_exc.InvalidCredentials(msg)
        # Domain names must be unique, in any case a list is returned,
        # selecting the first (and only) element
        self.creds_domain = domains[0]

    def create_project(self, name, description):
        """Create a project in the default domain and return its dict"""
        project = self.projects_client.create_project(
            name=name, description=description,
            domain_id=self.creds_domain['id'])['project']
        return project

    def show_project(self, project_id):
        """Return the project dict for the given ID"""
        return self.projects_client.show_project(project_id)['project']

    def delete_project(self, project_id):
        """Delete the project with the given ID"""
        self.projects_client.delete_project(project_id)

    def create_domain(self, name, description):
        """Create a domain and return its dict"""
        domain = self.domains_client.create_domain(
            name=name, description=description)['domain']
        return domain

    def delete_domain(self, domain_id):
        """Disable and then delete the domain with the given ID"""
        # The domain is disabled first; Keystone does not allow deleting a
        # domain that is still enabled
        self.domains_client.update_domain(domain_id, enabled=False)
        self.domains_client.delete_domain(domain_id)

    def create_user(self, username, password, project=None, email=None,
                    domain_id=None):
        """Create a user through the users client

        :param username: name of the new user
        :param password: password of the new user
        :param project: (optional) a project dict used as default project
        :param email: (optional) email address of the new user
        :param domain_id: (optional) ID of the domain owning the user. If
                          not specified the default domain of cred_client
        :return: the user dict, unwrapped from the 'user' key when the
                 response carries one
        """
        params = {'name': username,
                  'password': password,
                  'domain_id': domain_id or self.creds_domain['id']}
        # with keystone v3, a default project is not required
        if project:
            params[self.project_id_param] = project['id']
        # email is not a first-class attribute of a user
        if email:
            params['email'] = email
        user = self.users_client.create_user(**params)
        if 'user' in user:
            user = user['user']
        return user

    def get_credentials(
            self, user, project, password, domain=None, system=None):
        """Produces a v3 Credentials object from the details provided

        :param user: a user dict
        :param project: a project dict or None if using domain or system scope
        :param password: the password as a string
        :param domain: (optional) a domain dict. If not specified the
                       default domain of cred_client
        :param system: (optional) system scope, passed through unchanged
        :return: the object returned by auth.get_credentials
        """
        # User, project and domain already include both ID and name here,
        # so there's no need to use the fill_in mode.
        # NOTE(andreaf) We need to set all fields in the returned credentials.
        # Scope is then used to pick only those relevant for the type of
        # token needed by each service client.
        if project:
            project_name = project['name']
            project_id = project['id']
        else:
            project_name = None
            project_id = None
        if domain:
            domain_name = domain['name']
            domain_id = domain['id']
        else:
            domain_name = self.creds_domain['name']
            domain_id = self.creds_domain['id']
        return auth.get_credentials(
            auth_url=None,
            fill_in=False,
            identity_version='v3',
            username=user['name'], user_id=user['id'],
            project_name=project_name, project_id=project_id,
            password=password,
            project_domain_id=self.creds_domain['id'],
            project_domain_name=self.creds_domain['name'],
            domain_id=domain_id,
            domain_name=domain_name,
            system=system)

    def assign_user_role_on_domain(self, user, role_name, domain=None):
        """Assign the specified role on a domain

        :param user: a user dict
        :param role_name: name of the role to be assigned
        :param domain: (optional) The domain to assign the role on. If not
                                  specified the default domain of cred_client
        :raises lib_exc.NotFound: if no role with that name exists
        """
        # NOTE(andreaf) This method is very specific to the v3 case, and
        # because of that it's not defined in the parent class.
        if domain is None:
            domain = self.creds_domain
        role = self._check_role_exists(role_name)
        if not role:
            msg = 'No "%s" role found' % role_name
            raise lib_exc.NotFound(msg)
        try:
            self.roles_client.create_user_role_on_domain(
                domain['id'], user['id'], role['id'])
        except lib_exc.Conflict:
            # The assignment is already in place, nothing left to do
            LOG.debug("Role %s already assigned on domain %s for user %s",
                      role['id'], domain['id'], user['id'])

    def assign_user_role_on_system(self, user, role_name):
        """Assign the specified role on the system

        :param user: a user dict
        :param role_name: name of the role to be assigned
        :raises lib_exc.NotFound: if no role with that name exists
        """
        role = self._check_role_exists(role_name)
        if not role:
            msg = 'No "%s" role found' % role_name
            raise lib_exc.NotFound(msg)
        try:
            self.roles_client.create_user_role_on_system(
                user['id'], role['id'])
        except lib_exc.Conflict:
            # The assignment is already in place, nothing left to do
            LOG.debug("Role %s already assigned on the system for user %s",
                      role['id'], user['id'])


def get_creds_client(identity_client,
                     projects_client,
                     users_client,
                     roles_client,
                     domains_client=None,
                     project_domain_name=None):
    """Build the credentials client matching the identity client version

    :param identity_client: the identity client; its type selects the
                            implementation
    :param projects_client: client used to manage projects
    :param users_client: client used to manage users
    :param roles_client: client used to manage roles
    :param domains_client: (optional) domains client, only passed to the
                           v3 implementation
    :param project_domain_name: (optional) domain name, only passed to the
                                v3 implementation
    :return: a V2CredsClient when identity_client is a v2 IdentityClient,
             a V3CredsClient otherwise
    """
    common_clients = (identity_client, projects_client, users_client,
                      roles_client)
    if isinstance(identity_client, v2_identity.IdentityClient):
        return V2CredsClient(*common_clients)
    return V3CredsClient(*common_clients, domains_client,
                         project_domain_name)