path: root/cinder/scheduler/filter_scheduler.py
# Copyright (c) 2011 Intel Corporation
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""The FilterScheduler is for creating volumes.

You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""

from typing import List, Optional  # noqa: H301

from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils

from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.scheduler import driver
from cinder.scheduler.host_manager import BackendState
from cinder.scheduler import scheduler_options
from cinder.scheduler.weights import WeighedHost
from cinder.volume import volume_utils

CONF = cfg.CONF
LOG = logging.getLogger(__name__)


class FilterScheduler(driver.Scheduler):
    """Scheduler that can be used for filtering and weighing."""
    def __init__(self, *args, **kwargs):
        super(FilterScheduler, self).__init__(*args, **kwargs)
        self.options = scheduler_options.SchedulerOptions()
        self.max_attempts = self._max_attempts()

    def _get_configuration_options(self) -> dict:
        """Fetch options dictionary. Broken out for testing."""
        return self.options.get_configuration()

    def populate_filter_properties(self,
                                   request_spec: dict,
                                   filter_properties: dict) -> None:
        """Stuff things into filter_properties.

        Can be overridden in a subclass to add more data.
        """
        vol = request_spec['volume_properties']
        filter_properties['size'] = vol['size']
        filter_properties['availability_zone'] = vol.get('availability_zone')
        filter_properties['user_id'] = vol.get('user_id')
        filter_properties['metadata'] = vol.get('metadata')
        filter_properties['qos_specs'] = vol.get('qos_specs')
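
        # Illustrative only (hypothetical values): after this call the
        # filter_properties dict carries entries such as
        #     {'size': 10,
        #      'availability_zone': 'nova',
        #      'user_id': '<user uuid>',
        #      'metadata': {},
        #      'qos_specs': None}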

    def schedule_create_group(self,
                              context: context.RequestContext,
                              group,
                              group_spec,
                              request_spec_list,
                              group_filter_properties,
                              filter_properties_list) -> None:
        weighed_backend = self._schedule_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)

        if not weighed_backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = weighed_backend.obj

        updated_group = driver.generic_group_update_db(context, group,
                                                       backend.host,
                                                       backend.cluster_name)

        self.volume_rpcapi.create_group(context, updated_group)

    def schedule_create_volume(self,
                               context: context.RequestContext,
                               request_spec: dict,
                               filter_properties: dict) -> None:
        backend = self._schedule(context, request_spec, filter_properties)

        if not backend:
            raise exception.NoValidBackend(reason=_("No weighed backends "
                                                    "available"))

        backend = backend.obj
        volume_id = request_spec['volume_id']

        updated_volume = driver.volume_update_db(
            context, volume_id,
            backend.host,
            backend.cluster_name,
            availability_zone=backend.service['availability_zone'])
        self._post_select_populate_filter_properties(filter_properties,
                                                     backend)

        # context is not serializable
        filter_properties.pop('context', None)

        self.volume_rpcapi.create_volume(context, updated_volume, request_spec,
                                         filter_properties,
                                         allow_reschedule=True)

    def backend_passes_filters(self,
                               context: context.RequestContext,
                               backend: str,
                               request_spec: dict,
                               filter_properties: dict):
        """Check if the specified backend passes the filters."""
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # If the backend has no pool defined, ignore pools in the comparison
        ignore_pool = not bool(volume_utils.extract_host(backend, 'pool'))
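        # Sketch of the comparison, assuming volume_utils.extract_host()
        # strips the pool component by default ('host1@lvm#pool1' ->
        # 'host1@lvm') and returns None for the 'pool' level when no pool is
        # present (hypothetical ids):
        #     backend = 'host1@lvm'        -> ignore_pool is True, candidates
        #                                     are compared at the backend level
        #     backend = 'host1@lvm#pool1'  -> ignore_pool is False, candidates
        #                                     must match the full id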
        for weighed_backend in weighed_backends:
            backend_id = weighed_backend.obj.backend_id
            if ignore_pool:
                backend_id = volume_utils.extract_host(backend_id)
            if backend_id == backend:
                return weighed_backend.obj

        reason_param = {'resource': 'volume',
                        'id': '??id missing??',
                        'backend': backend}
        for resource in ['volume', 'group', 'snapshot']:
            resource_id = request_spec.get('%s_id' % resource, None)
            if resource_id:
                reason_param.update({'resource': resource, 'id': resource_id})
                break
        raise exception.NoValidBackend(_('Cannot place %(resource)s %(id)s '
                                         'on %(backend)s.') % reason_param)

    def find_retype_backend(self,
                            context: context.RequestContext,
                            request_spec: dict,
                            filter_properties: Optional[dict] = None,
                            migration_policy: str = 'never') -> BackendState:
        """Find a backend that can accept the volume with its new type."""
        filter_properties = filter_properties or {}
        backend = (request_spec['volume_properties'].get('cluster_name')
                   or request_spec['volume_properties']['host'])

        # The volume already exists on this backend, and so we shouldn't check
        # if it can accept the volume again in the CapacityFilter.
        filter_properties['vol_exists_on'] = backend

        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        if not weighed_backends:
            raise exception.NoValidBackend(
                reason=_('No valid backends for volume %(id)s with type '
                         '%(type)s') % {'id': request_spec['volume_id'],
                                        'type': request_spec['volume_type']})

        for weighed_backend in weighed_backends:
            backend_state = weighed_backend.obj
            if backend_state.backend_id == backend:
                return backend_state

        if volume_utils.extract_host(backend, 'pool') is None:
            # Legacy volumes created before pools were introduced have no
            # pool info in their host field, but host_state.host always
            # includes pool-level info. If the exact match above did not work
            # out, look for a host_state on the same host as the volume being
            # retyped. In other words, for legacy volumes a retype may move
            # the volume between pools on the same host; we consider that
            # different from migration between hosts, so it is allowed even
            # when the migration policy is 'never'.
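            # For example (hypothetical ids): a legacy volume whose host is
            # 'host1@lvm' matches a backend_id of 'host1@lvm#pool1' once the
            # backend_id is reduced to the 'backend' level.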
            for weighed_backend in weighed_backends:
                backend_state = weighed_backend.obj
                new_backend = volume_utils.extract_host(
                    backend_state.backend_id,
                    'backend')
                if new_backend == backend:
                    return backend_state

        if migration_policy == 'never':
            raise exception.NoValidBackend(
                reason=_('Current backend not valid for volume %(id)s with '
                         'type %(type)s, migration not allowed') %
                {'id': request_spec['volume_id'],
                 'type': request_spec['volume_type']})

        top_backend = self._choose_top_backend(weighed_backends, request_spec)
        return top_backend.obj

    def get_pools(self, context: context.RequestContext, filters: dict):
        return self.host_manager.get_pools(context, filters)

    def _post_select_populate_filter_properties(
            self,
            filter_properties: dict,
            backend_state: BackendState) -> None:
        """Populate filter properties with additional information.

        Add additional information to the filter properties after a backend has
        been selected by the scheduling process.
        """
        # Add a retry entry for the selected volume backend:
        self._add_retry_backend(filter_properties, backend_state.backend_id)

    def _add_retry_backend(self, filter_properties: dict, backend) -> None:
        """Add a retry entry for the selected volume backend.

        In the event that the request gets re-scheduled, this entry will signal
        that the given backend has already been tried.
        """
        retry = filter_properties.get('retry', None)
        if not retry:
            return
        # TODO(geguileo): In P - change to only use backends
        for key in ('hosts', 'backends'):
            backends = retry.get(key)
            if backends is not None:
                backends.append(backend)

    def _max_attempts(self) -> int:
        max_attempts = CONF.scheduler_max_attempts
        if max_attempts < 1:
            raise exception.InvalidParameterValue(
                err=_("Invalid value for 'scheduler_max_attempts', "
                      "must be >=1"))
        return max_attempts
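
    # Example (cinder.conf), assuming the standard scheduler_max_attempts
    # option referenced above keeps its usual default:
    #     [DEFAULT]
    #     scheduler_max_attempts = 3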

    def _log_volume_error(self, volume_id: str, retry: dict) -> None:
        """Log requests with exceptions from previous volume operations."""
        exc = retry.pop('exc', None)  # string-ified exception from volume
        if not exc:
            return  # no exception info from a previous attempt, skip

        # TODO(geguileo): In P - change to hosts = retry.get('backends')
        backends = retry.get('backends', retry.get('hosts'))
        if not backends:
            return  # no previously attempted hosts, skip

        last_backend = backends[-1]
        LOG.error("Error scheduling %(volume_id)s from last vol-service: "
                  "%(last_backend)s : %(exc)s",
                  {'volume_id': volume_id,
                   'last_backend': last_backend,
                   'exc': exc})

    def _populate_retry(self,
                        filter_properties: dict,
                        request_spec: dict) -> None:
        """Populate filter properties with history of retries for request.

        If maximum retries is exceeded, raise NoValidBackend.
        """
        max_attempts = self.max_attempts
        retry = filter_properties.pop('retry', {})

        if max_attempts == 1:
            # re-scheduling is disabled.
            return

        # retry is enabled, update attempt count:
        if retry:
            retry['num_attempts'] += 1
        else:
            retry = {
                'num_attempts': 1,
                'backends': [],  # list of volume service backends tried
                'hosts': []  # TODO(geguileo): Remove in P and leave backends
            }
        filter_properties['retry'] = retry
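        # Illustrative shape of the retry dict on a rescheduled request
        # (hypothetical values); 'exc' is the string-ified exception that the
        # volume service attaches when it asks for a reschedule:
        #     {'num_attempts': 2,
        #      'backends': ['host1@lvm#pool1'],
        #      'hosts': ['host1@lvm#pool1'],
        #      'exc': ['Traceback (most recent call last): ...']}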

        resource_id = request_spec.get('volume_id') or request_spec.get(
            "group_id")
        self._log_volume_error(resource_id, retry)

        if retry['num_attempts'] > max_attempts:
            raise exception.NoValidBackend(
                reason=_("Exceeded max scheduling attempts %(max_attempts)d "
                         "for resource %(resource_id)s") %
                {'max_attempts': max_attempts,
                 'resource_id': resource_id})

    def _get_weighted_candidates(
            self,
            context: context.RequestContext,
            request_spec: dict,
            filter_properties: Optional[dict] = None) -> list:
        """Return a list of backends that meet required specs.

        Returned list is ordered by their fitness.
        """
        elevated = context.elevated()

        # Cinder mixes filters from Oslo with its own: the former take
        # 'resource_XX' keys as input while the latter take 'volume_XX', so
        # copying 'volume_XX' to 'resource_XX' keeps both kinds of filter
        # happy.
        volume_type = request_spec.get("volume_type")
        # When creating snapshots, volume_type is None at this point, which
        # causes issues in filters (e.g. bug #1856126). To prevent that, fall
        # back to an empty dictionary.
        if volume_type is None:
            volume_type = {}
        resource_type = volume_type

        config_options = self._get_configuration_options()

        if filter_properties is None:
            filter_properties = {}
        self._populate_retry(filter_properties,
                             request_spec)

        request_spec_dict = jsonutils.to_primitive(request_spec)

        filter_properties.update({'context': context,
                                  'request_spec': request_spec_dict,
                                  'config_options': config_options,
                                  'volume_type': volume_type,
                                  'resource_type': resource_type})

        self.populate_filter_properties(request_spec,
                                        filter_properties)

        # Revert volume consumed capacity if it's a rescheduled request
        retry = filter_properties.get('retry', {})
        if retry.get('backends', []):
            self.host_manager.revert_volume_consumed_capacity(
                retry['backends'][-1],
                request_spec['volume_properties']['size'])
        # Find our local list of acceptable backends by filtering and
        # weighing our options. We virtually consume resources on the
        # selected backend so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        backends = self.host_manager.get_all_backend_states(elevated)

        # Filter local hosts based on requirements ...
        backends = self.host_manager.get_filtered_backends(backends,
                                                           filter_properties)
        if not backends:
            return []

        LOG.debug("Filtered %s", backends)
        # Weigh the filtered backends; the best candidate for the job comes
        # first in the returned list.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends, filter_properties)
        return weighed_backends

    def _get_weighted_candidates_generic_group(
            self, context: context.RequestContext,
            group_spec: dict, request_spec_list: List[dict],
            group_filter_properties: Optional[dict] = None,
            filter_properties_list: Optional[List[dict]] = None) -> list:
        """Finds backends that supports the group.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        backends_by_group_type = self._get_weighted_candidates_by_group_type(
            context, group_spec, group_filter_properties)

        weighed_backends = []
        backends_by_vol_type = []
        index = 0
        for request_spec in request_spec_list:
            volume_properties = request_spec['volume_properties']
            # Cinder mixes filters from Oslo with its own: the former take
            # 'resource_XX' keys as input while the latter take 'volume_XX',
            # so copying 'volume_XX' to 'resource_XX' keeps both kinds of
            # filter happy.
            resource_properties = volume_properties.copy()
            volume_type = request_spec.get("volume_type", None)
            resource_type = request_spec.get("volume_type", None)
            request_spec.update({'resource_properties': resource_properties})

            config_options = self._get_configuration_options()

            filter_properties = {}
            if filter_properties_list:
                filter_properties = filter_properties_list[index]
                if filter_properties is None:
                    filter_properties = {}
            self._populate_retry(filter_properties, request_spec)

            # Add group_support in extra_specs if it is not there.
            # Make sure it is populated in filter_properties
            # if 'group_support' not in resource_type.get(
            #         'extra_specs', {}):
            #     resource_type['extra_specs'].update(
            #         group_support='<is> True')

            filter_properties.update({'context': context,
                                      'request_spec': request_spec,
                                      'config_options': config_options,
                                      'volume_type': volume_type,
                                      'resource_type': resource_type})

            self.populate_filter_properties(request_spec,
                                            filter_properties)

            # Find our local list of acceptable backends by filtering and
            # weighing our options. We virtually consume resources on the
            # selected backend so subsequent selections can adjust
            # accordingly.

            # Note: remember, we are using an iterator here. So only
            # traverse this list once.
            all_backends = self.host_manager.get_all_backend_states(elevated)
            if not all_backends:
                return []

            # Filter local backends based on requirements ...
            backends = self.host_manager.get_filtered_backends(
                all_backends, filter_properties)

            if not backends:
                return []

            LOG.debug("Filtered %s", backends)

            # Weigh the filtered backends; the best candidate for the job
            # comes first in the returned list.
            temp_weighed_backends = self.host_manager.get_weighed_backends(
                backends,
                filter_properties)
            if not temp_weighed_backends:
                return []
            if index == 0:
                backends_by_vol_type = temp_weighed_backends
            else:
                backends_by_vol_type = self._find_valid_backends(
                    backends_by_vol_type, temp_weighed_backends)
                if not backends_by_vol_type:
                    return []

            index += 1

        # Find backends selected by both the group type and volume types.
        weighed_backends = self._find_valid_backends(backends_by_vol_type,
                                                     backends_by_group_type)

        return weighed_backends

    def _find_valid_backends(self,
                             backend_list1: list, backend_list2: list) -> list:
        new_backends = []
        for backend1 in backend_list1:
            for backend2 in backend_list2:
                # Should schedule creation of group on backend level,
                # not pool level.
                if (volume_utils.extract_host(backend1.obj.backend_id) ==
                        volume_utils.extract_host(backend2.obj.backend_id)):
                    new_backends.append(backend1)
        return new_backends
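
    # In _find_valid_backends() above, two entries count as the same backend
    # when they agree at the backend level (hypothetical ids):
    # 'host1@lvm#poolA' and 'host1@lvm#poolB' both reduce to 'host1@lvm' and
    # therefore match, while 'host2@lvm#poolA' does not.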

    def _get_weighted_candidates_by_group_type(
            self, context: context.RequestContext, group_spec: dict,
            group_filter_properties: Optional[dict] = None) \
            -> List[WeighedHost]:
        """Finds backends that supports the group type.

        Returns a list of backends that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()

        weighed_backends = []
        volume_properties = group_spec['volume_properties']
        # Cinder mixes filters from Oslo with its own: the former take
        # 'resource_XX' keys as input while the latter take 'volume_XX', so
        # copying 'volume_XX' to 'resource_XX' keeps both kinds of filter
        # happy.
        resource_properties = volume_properties.copy()
        group_type = group_spec.get("group_type", None)
        resource_type = group_spec.get("group_type", None)
        group_spec.update({'resource_properties': resource_properties})

        config_options = self._get_configuration_options()

        if group_filter_properties is None:
            group_filter_properties = {}
        self._populate_retry(group_filter_properties, resource_properties)

        group_filter_properties.update({'context': context,
                                        'request_spec': group_spec,
                                        'config_options': config_options,
                                        'group_type': group_type,
                                        'resource_type': resource_type})

        self.populate_filter_properties(group_spec,
                                        group_filter_properties)

        # Find our local list of acceptable backends by filtering and
        # weighing our options. We virtually consume resources on the
        # selected backend so subsequent selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once.
        all_backends = self.host_manager.get_all_backend_states(elevated)
        if not all_backends:
            return []

        # Filter local backends based on requirements ...
        backends = self.host_manager.get_filtered_backends(
            all_backends, group_filter_properties)

        if not backends:
            return []

        LOG.debug("Filtered %s", backends)

        # Weigh the filtered backends; the best candidate for the job comes
        # first in the returned list.
        weighed_backends = self.host_manager.get_weighed_backends(
            backends,
            group_filter_properties)
        if not weighed_backends:
            return []

        return weighed_backends

    def _schedule(self,
                  context: context.RequestContext,
                  request_spec: dict,
                  filter_properties: Optional[dict] = None):
        weighed_backends = self._get_weighted_candidates(context, request_spec,
                                                         filter_properties)
        # Once we have the weighed backends, drop any that do not match the
        # resource's requested backend (which may come from a group, snapshot
        # or source volume).
        resource_backend = request_spec.get('resource_backend')
        if weighed_backends and resource_backend:
            resource_backend_has_pool = bool(volume_utils.extract_host(
                resource_backend, 'pool'))
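            # Sketch of the matching rule (hypothetical ids): with
            # resource_backend = 'host1@lvm' the pool part of each candidate
            # is ignored, so 'host1@lvm#pool1' is kept; with
            # resource_backend = 'host1@lvm#pool1' only candidates with that
            # exact host@backend#pool id survive.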
            # Get host name including host@backend#pool info from
            # weighed_backends.
            for backend in weighed_backends[::-1]:
                backend_id = (
                    backend.obj.backend_id if resource_backend_has_pool
                    else volume_utils.extract_host(backend.obj.backend_id)
                )
                if backend_id != resource_backend:
                    weighed_backends.remove(backend)
        if not weighed_backends:
            assert filter_properties is not None
            LOG.warning('No weighed backend found for volume '
                        'with properties: %s',
                        filter_properties['request_spec'].get('volume_type'))
            return None
        return self._choose_top_backend(weighed_backends, request_spec)

    def _schedule_generic_group(
            self,
            context: context.RequestContext,
            group_spec: dict,
            request_spec_list: list,
            group_filter_properties: Optional[dict] = None,
            filter_properties_list: Optional[list] = None) \
            -> Optional[WeighedHost]:

        weighed_backends = self._get_weighted_candidates_generic_group(
            context,
            group_spec,
            request_spec_list,
            group_filter_properties,
            filter_properties_list)
        if not weighed_backends:
            return None
        return self._choose_top_backend_generic_group(weighed_backends)

    def _choose_top_backend(self,
                            weighed_backends: List[WeighedHost],
                            request_spec: dict):
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        volume_properties = request_spec['volume_properties']
        backend_state.consume_from_volume(volume_properties)
        return top_backend

    def _choose_top_backend_generic_group(
            self,
            weighed_backends: List[WeighedHost]) -> WeighedHost:
        top_backend = weighed_backends[0]
        backend_state = top_backend.obj
        LOG.debug("Choosing %s", backend_state.backend_id)
        return top_backend

    def get_backup_host(self, volume: objects.Volume, driver=None):
        return self.host_manager.get_backup_host(volume, driver)