summaryrefslogtreecommitdiff
path: root/swift/obj/expirer.py
blob: 6f506b1d0f45282775827a517ba17c3c2587c5a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import six

from random import random
from time import time
from os.path import join
from swift import gettext_ as _
from collections import defaultdict, deque
import hashlib

from eventlet import sleep, Timeout
from eventlet.greenpool import GreenPool

from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.daemon import Daemon
from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
    Timestamp, config_true_value, normalize_delete_at_timestamp, \
    RateLimitedIterator
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
    HTTP_PRECONDITION_FAILED
from swift.common.swob import wsgi_quote, str_to_wsgi

from swift.container.reconciler import direct_delete_container_entry

MAX_OBJECTS_TO_CACHE = 100000
ASYNC_DELETE_TYPE = 'application/async-deleted'


def build_task_obj(timestamp, target_account, target_container,
                   target_obj, high_precision=False):
    """
    Compose the name of an expirer queue task object.

    :param timestamp: the delete-at time of the target object; any value
                      accepted by ``Timestamp``.
    :param target_account: account holding the object to be expired.
    :param target_container: container holding the object to be expired.
    :param target_obj: name of the object to be expired.
    :param high_precision: forwarded to ``normalize_delete_at_timestamp``.
    :return: a task object name in format of
             "<timestamp>-<target_account>/<target_container>/<target_obj>"
    """
    normalized_delete_at = normalize_delete_at_timestamp(
        Timestamp(timestamp), high_precision)
    name_parts = (normalized_delete_at, target_account, target_container,
                  target_obj)
    return '%s-%s/%s/%s' % name_parts


def parse_task_obj(task_obj):
    """
    Split an expirer queue task object name back into its components.

    :param task_obj: a task object name in format of
                     "<timestamp>-<target_account>/<target_container>" +
                     "/<target_obj>"
    :return: 4-tuples of (delete_at_time, target_account, target_container,
             target_obj)
    :raises ValueError: if the name has no '-' separator, the timestamp
                        part cannot be parsed, or the target path does not
                        have exactly three segments.
    """
    # Only the first '-' separates the timestamp; the target path may
    # itself contain dashes.
    raw_delete_at, target_path = task_obj.split('-', 1)
    delete_at = Timestamp(raw_delete_at)
    account, container, obj = split_path('/' + target_path, 3, 3, True)
    return delete_at, account, container, obj


class ObjectExpirer(Daemon):
    """
    Daemon that queries the internal hidden task accounts to discover objects
    that need to be deleted.

    :param conf: The daemon configuration.
    :param logger: Optional logger; when omitted one is created from ``conf``
                   with the ``object-expirer`` log route.
    :param swift: Optional client used for queue and object requests; when
                  omitted an ``InternalClient`` is built from the internal
                  client config path.
    """

    def __init__(self, conf, logger=None, swift=None):
        self.conf = conf
        self.logger = logger or get_logger(conf, log_route='object-expirer')
        self.interval = int(conf.get('interval') or 300)
        self.tasks_per_second = float(conf.get('tasks_per_second', 50.0))

        self.conf_path = \
            self.conf.get('__file__') or '/etc/swift/object-expirer.conf'
        # True, if the conf file is 'object-expirer.conf'.  This is a plain
        # substring test, so any config path containing 'expirer' is treated
        # as the legacy standalone expirer config.
        is_legacy_conf = 'expirer' in self.conf_path
        # object-expirer.conf supports only legacy queue
        self.dequeue_from_legacy = \
            True if is_legacy_conf else \
            config_true_value(conf.get('dequeue_from_legacy', 'false'))

        # A legacy conf file doubles as the internal client config; other
        # configurations point at a separate internal client config file.
        if is_legacy_conf:
            self.ic_conf_path = self.conf_path
        else:
            self.ic_conf_path = \
                self.conf.get('internal_client_conf_path') or \
                '/etc/swift/internal-client.conf'

        self.read_conf_for_queue_access(swift)

        self.report_interval = int(conf.get('report_interval') or 300)
        self.report_first_time = self.report_last_time = time()
        self.report_objects = 0
        self.recon_cache_path = conf.get('recon_cache_path',
                                         '/var/cache/swift')
        self.rcache = join(self.recon_cache_path, 'object.recon')
        self.concurrency = int(conf.get('concurrency', 1))
        if self.concurrency < 1:
            raise ValueError("concurrency must be set to at least 1")
        # This option defines how long an un-processable expired object
        # marker will be retried before it is abandoned.  It is not coupled
        # with the tombstone reclaim age in the consistency engine.
        # Measured in seconds (compared against time() in delete_object);
        # the default is one week.
        self.reclaim_age = int(conf.get('reclaim_age', 604800))

    def read_conf_for_queue_access(self, swift):
        """
        Configure access to the expiring-objects task queue.

        Sets ``expiring_objects_account``, ``task_container_prefix``,
        ``swift``, ``processes`` and ``process`` from the configuration.

        :param swift: an existing client to use, or a falsy value to build
                      an ``InternalClient`` from ``self.ic_conf_path``.
        """
        if self.conf.get('auto_create_account_prefix'):
            self.logger.warning('Option auto_create_account_prefix is '
                                'deprecated. Configure '
                                'auto_create_account_prefix under the '
                                'swift-constraints section of '
                                'swift.conf. This option will '
                                'be ignored in a future release.')
            auto_create_account_prefix = \
                self.conf['auto_create_account_prefix']
        else:
            auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX

        self.expiring_objects_account = auto_create_account_prefix + \
            (self.conf.get('expiring_objects_account_name') or
             'expiring_objects')

        # This is for common parameter with general task queue in future
        self.task_container_prefix = ''

        request_tries = int(self.conf.get('request_tries') or 3)
        self.swift = swift or InternalClient(
            self.ic_conf_path, 'Swift Object Expirer', request_tries,
            use_replication_network=True)

        self.processes = int(self.conf.get('processes', 0))
        self.process = int(self.conf.get('process', 0))

    def report(self, final=False):
        """
        Emits a log line report of the progress so far, or the final progress
        if final=True.

        An intermediate report is only logged once ``report_interval``
        seconds have passed since the previous one.  The final report also
        writes the pass duration and the expired-object count to the recon
        cache.

        :param final: Set to True for the last report once the expiration pass
                      has completed.
        """
        if final:
            elapsed = time() - self.report_first_time
            self.logger.info(_('Pass completed in %(time)ds; '
                               '%(objects)d objects expired') % {
                             'time': elapsed, 'objects': self.report_objects})
            dump_recon_cache({'object_expiration_pass': elapsed,
                              'expired_last_pass': self.report_objects},
                             self.rcache, self.logger)
        elif time() - self.report_last_time >= self.report_interval:
            elapsed = time() - self.report_first_time
            self.logger.info(_('Pass so far %(time)ds; '
                               '%(objects)d objects expired') % {
                             'time': elapsed, 'objects': self.report_objects})
            self.report_last_time = time()

    def parse_task_obj(self, task_obj):
        """
        Delegate to the module-level ``parse_task_obj`` function.

        :param task_obj: a task object name
        :return: 4-tuple of (delete_at_time, target_account,
                 target_container, target_obj)
        """
        return parse_task_obj(task_obj)

    def round_robin_order(self, task_iter):
        """
        Change order of expiration tasks to avoid deleting objects in a
        certain container continuously.

        Tasks are buffered per ``<account>/<container>`` of their target.
        The buffer is then drained by yielding one task per group, visiting
        groups in sorted key order and cycling until every group is empty.
        The buffer is drained early whenever more than
        ``MAX_OBJECTS_TO_CACHE`` tasks have been buffered since the last
        drain.  Tasks whose ``target_path`` cannot be split into three
        segments are logged and dropped.

        :param task_iter: An iterator of delete-task dicts, which should each
            have a ``target_path`` key.
        """
        obj_cache = defaultdict(deque)
        cnt = 0

        def dump_obj_cache_in_round_robin():
            # Iterate over a sorted snapshot of the keys so that emptied
            # groups can be deleted from obj_cache during the sweep.
            while obj_cache:
                for key in sorted(obj_cache):
                    if obj_cache[key]:
                        yield obj_cache[key].popleft()
                    else:
                        del obj_cache[key]

        for delete_task in task_iter:
            try:
                target_account, target_container, _junk = \
                    split_path('/' + delete_task['target_path'], 3, 3, True)
                cache_key = '%s/%s' % (target_account, target_container)
            # sanity
            except ValueError:
                self.logger.error('Unexpected error handling task %r' %
                                  delete_task)
                continue

            obj_cache[cache_key].append(delete_task)
            cnt += 1

            if cnt > MAX_OBJECTS_TO_CACHE:
                for task in dump_obj_cache_in_round_robin():
                    yield task
                cnt = 0

        for task in dump_obj_cache_in_round_robin():
            yield task

    def hash_mod(self, name, divisor):
        """
        :param name: a task object name
        :param divisor: a divisor number
        :return: an integer to decide which expirer is assigned to the task
        """
        if not isinstance(name, bytes):
            name = name.encode('utf8')
        # md5 is only used for shuffling mod
        return int(hashlib.md5(name).hexdigest(), 16) % divisor

    def iter_task_accounts_to_expire(self):
        """
        Yields (task_account, my_index, divisor).
        my_index and divisor are used to assign each task obj to only one
        expirer. In expirer method, expirer calculates assigned index for each
        expiration task. The assigned index is in [0, 1, ..., divisor - 1].
        Expirers have their own "my_index" for each task_account. Expirer whose
        "my_index" is equal to the assigned index executes the task. Because
        each expirer has a different "my_index", task objects are executed by
        only one expirer.

        When ``processes`` is not positive, a single (index 0, divisor 1)
        assignment is yielded so this expirer handles every task.
        """
        if self.processes > 0:
            yield self.expiring_objects_account, self.process, self.processes
        else:
            yield self.expiring_objects_account, 0, 1

    def delete_at_time_of_task_container(self, task_container):
        """
        get delete_at timestamp from task_container name

        :param task_container: a task container name
        :return: the container name parsed as a ``Timestamp``
        """
        # task_container name is timestamp
        return Timestamp(task_container)

    def iter_task_containers_to_expire(self, task_account):
        """
        Yields task_container names under the task_account if the delete at
        timestamp of task_container is past.

        Iteration stops at the first container whose timestamp is in the
        future.  NOTE(review): this relies on the container listing order
        matching timestamp order, so later containers are also in the
        future.
        """
        for c in self.swift.iter_containers(task_account,
                                            prefix=self.task_container_prefix):
            task_container = str(c['name'])
            timestamp = self.delete_at_time_of_task_container(task_container)
            if timestamp > Timestamp.now():
                break
            yield task_container

    def iter_task_to_expire(self, task_account_container_list,
                            my_index, divisor):
        """
        Yields task expire info dict which consists of task_account,
        task_container, task_object, timestamp_to_delete, target_path and
        is_async_delete.

        Task names that cannot be parsed are logged and skipped.  Within a
        container, iteration stops at the first task whose delete timestamp
        is in the future.  Tasks whose hash does not map to ``my_index`` are
        left for another expirer.

        :param task_account_container_list: iterable of
            (task_account, task_container) pairs to scan.
        :param my_index: this expirer's assigned index.
        :param divisor: total number of expirer indexes.
        """
        for task_account, task_container in task_account_container_list:
            for o in self.swift.iter_objects(task_account, task_container):
                if six.PY2:
                    task_object = o['name'].encode('utf8')
                else:
                    task_object = o['name']
                try:
                    delete_timestamp, target_account, target_container, \
                        target_object = parse_task_obj(task_object)
                except ValueError:
                    self.logger.exception('Unexpected error handling task %r' %
                                          task_object)
                    continue
                if delete_timestamp > Timestamp.now():
                    # we shouldn't yield the object that doesn't reach
                    # the expiration date yet.
                    break

                # Only one expirer daemon assigned for one task
                if self.hash_mod('%s/%s' % (task_container, task_object),
                                 divisor) != my_index:
                    continue

                is_async = o.get('content_type') == ASYNC_DELETE_TYPE
                yield {'task_account': task_account,
                       'task_container': task_container,
                       'task_object': task_object,
                       'target_path': '/'.join([
                           target_account, target_container, target_object]),
                       'delete_timestamp': delete_timestamp,
                       'is_async_delete': is_async}

    def run_once(self, *args, **kwargs):
        """
        Executes a single pass, looking for objects to expire.

        Deletes are spawned on a green pool of ``concurrency`` workers at no
        more than ``tasks_per_second``.  Once every spawned delete has
        finished, each task container visited during the pass is deleted;
        404 and 409 responses from that delete are accepted.

        :param args: Extra args to fulfill the Daemon interface; this daemon
                     has no additional args.
        :param kwargs: Extra keyword args to fulfill the Daemon interface; this
                       daemon accepts processes and process keyword args.
                       These will override the values from the config file if
                       provided.
        """
        # This if-clause will be removed when general task queue feature is
        # implemented.
        if not self.dequeue_from_legacy:
            self.logger.info('This node is not configured to dequeue tasks '
                             'from the legacy queue.  This node will '
                             'not process any expiration tasks.  At least '
                             'one node in your cluster must be configured '
                             'with dequeue_from_legacy == true.')
            return

        self.get_process_values(kwargs)
        pool = GreenPool(self.concurrency)
        self.report_first_time = self.report_last_time = time()
        self.report_objects = 0
        try:
            self.logger.debug('Run begin')
            task_account_container_list_to_delete = list()
            for task_account, my_index, divisor in \
                    self.iter_task_accounts_to_expire():
                container_count, obj_count = \
                    self.swift.get_account_info(task_account)

                # the task account is skipped if there are no task container
                if not container_count:
                    continue

                self.logger.info(_(
                    'Pass beginning for task account %(account)s; '
                    '%(container_count)s possible containers; '
                    '%(obj_count)s possible objects') % {
                    'account': task_account,
                    'container_count': container_count,
                    'obj_count': obj_count})

                task_account_container_list = \
                    [(task_account, task_container) for task_container in
                     self.iter_task_containers_to_expire(task_account)]

                task_account_container_list_to_delete.extend(
                    task_account_container_list)

                # delete_task_iter is a generator to yield a dict of
                # task_account, task_container, task_object, delete_timestamp,
                # target_path to handle delete actual object and pop the task
                # from the queue.
                delete_task_iter = \
                    self.round_robin_order(self.iter_task_to_expire(
                        task_account_container_list, my_index, divisor))
                rate_limited_iter = RateLimitedIterator(
                    delete_task_iter,
                    elements_per_second=self.tasks_per_second)
                for delete_task in rate_limited_iter:
                    pool.spawn_n(self.delete_object, **delete_task)

            # Wait for every spawned delete before removing task containers.
            pool.waitall()
            for task_account, task_container in \
                    task_account_container_list_to_delete:
                try:
                    self.swift.delete_container(
                        task_account, task_container,
                        acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
                except (Exception, Timeout) as err:
                    self.logger.exception(
                        _('Exception while deleting container %(account)s '
                          '%(container)s %(err)s') % {
                              'account': task_account,
                              'container': task_container, 'err': str(err)})
            self.logger.debug('Run end')
            self.report(final=True)
        except (Exception, Timeout):
            self.logger.exception(_('Unhandled exception'))

    def run_forever(self, *args, **kwargs):
        """
        Executes passes forever, looking for objects to expire.

        Sleeps a random fraction of ``interval`` before the first pass, and
        after each pass that took less than ``interval`` sleeps a random
        fraction of the remaining time.

        :param args: Extra args to fulfill the Daemon interface; this daemon
                     has no additional args.
        :param kwargs: Extra keyword args to fulfill the Daemon interface; this
                       daemon has no additional keyword args.
        """
        sleep(random() * self.interval)
        while True:
            begin = time()
            try:
                self.run_once(*args, **kwargs)
            except (Exception, Timeout):
                self.logger.exception(_('Unhandled exception'))
            elapsed = time() - begin
            if elapsed < self.interval:
                sleep(random() * (self.interval - elapsed))

    def get_process_values(self, kwargs):
        """
        Sets self.processes and self.process from the kwargs if those
        values exist, otherwise, leaves those values as they were set in
        the config file.

        :param kwargs: Keyword args passed into the run_forever(), run_once()
                       methods.  They have values specified on the command
                       line when the daemon is run.
        :raises ValueError: if either value is negative, or if processes is
                            non-zero and process is not less than it.
        """
        if kwargs.get('processes') is not None:
            self.processes = int(kwargs['processes'])

        if kwargs.get('process') is not None:
            self.process = int(kwargs['process'])

        if self.process < 0:
            raise ValueError(
                'process must be an integer greater than or equal to 0')

        if self.processes < 0:
            raise ValueError(
                'processes must be an integer greater than or equal to 0')

        if self.processes and self.process >= self.processes:
            raise ValueError(
                'process must be less than processes')

    def delete_object(self, target_path, delete_timestamp,
                      task_account, task_container, task_object,
                      is_async_delete):
        """
        Delete one target object and, on success, remove its queue entry.

        A 404 or 412 from the target delete is treated as success (the queue
        entry is popped) only once the task is older than ``reclaim_age``
        seconds.  Otherwise the error is logged and counted, and the queue
        entry is left in place for a later pass.  All errors are logged
        rather than raised.

        :param target_path: '<account>/<container>/<object>' to delete.
        :param delete_timestamp: ``Timestamp`` of the task.
        :param task_account: queue account holding the task entry.
        :param task_container: queue container holding the task entry.
        :param task_object: name of the task entry.
        :param is_async_delete: True for async-delete tasks.
        """
        start_time = time()
        try:
            try:
                self.delete_actual_object(target_path, delete_timestamp,
                                          is_async_delete)
            except UnexpectedResponse as err:
                if err.resp.status_int not in {HTTP_NOT_FOUND,
                                               HTTP_PRECONDITION_FAILED}:
                    raise
                if float(delete_timestamp) > time() - self.reclaim_age:
                    # we'll have to retry the DELETE later
                    raise
            self.pop_queue(task_account, task_container, task_object)
            self.report_objects += 1
            self.logger.increment('objects')
        except UnexpectedResponse as err:
            self.logger.increment('errors')
            self.logger.error(
                'Unexpected response while deleting object '
                '%(account)s %(container)s %(obj)s: %(err)s' % {
                    'account': task_account, 'container': task_container,
                    'obj': task_object, 'err': str(err.resp.status_int)})
            self.logger.debug(err.resp.body)
        except (Exception, Timeout) as err:
            self.logger.increment('errors')
            self.logger.exception(
                'Exception while deleting object %(account)s %(container)s '
                '%(obj)s %(err)s' % {
                    'account': task_account, 'container': task_container,
                    'obj': task_object, 'err': str(err)})
        self.logger.timing_since('timing', start_time)
        self.report()

    def pop_queue(self, task_account, task_container, task_object):
        """
        Issue a delete object request to the task_container for the expiring
        object queue entry.

        Uses ``direct_delete_container_entry`` against the client's
        container ring.
        """
        direct_delete_container_entry(self.swift.container_ring, task_account,
                                      task_container, task_object)

    def delete_actual_object(self, actual_obj, timestamp, is_async_delete):
        """
        Deletes the end-user object indicated by the actual object name given
        '<account>/<container>/<object>' if and only if the X-Delete-At value
        of the object is exactly the timestamp given.

        For async deletes no X-If-Delete-At condition is sent and 404 is also
        accepted.

        :param actual_obj: The name of the end-user object to delete:
                           '<account>/<container>/<object>'
        :param timestamp: The swift.common.utils.Timestamp instance the
                          X-Delete-At value must match to perform the actual
                          delete.
        :param is_async_delete: False if the object should be deleted because
                                of "normal" expiration, or True if it should
                                be async-deleted.
        :raises UnexpectedResponse: if the delete was unsuccessful and
                                    should be retried later
        """
        path = '/v1/' + wsgi_quote(str_to_wsgi(actual_obj.lstrip('/')))
        if is_async_delete:
            headers = {'X-Timestamp': timestamp.normal}
            acceptable_statuses = (2, HTTP_CONFLICT, HTTP_NOT_FOUND)
        else:
            headers = {'X-Timestamp': timestamp.normal,
                       'X-If-Delete-At': timestamp.normal,
                       'X-Backend-Clean-Expiring-Object-Queue': 'no'}
            acceptable_statuses = (2, HTTP_CONFLICT)
        self.swift.make_request('DELETE', path, headers, acceptable_statuses)