summaryrefslogtreecommitdiff
path: root/swift/obj/auditor.py
blob: e0d95bb4d934fbe232359b18ceab7286b5239978 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import sys
import time
import signal
from os.path import basename, dirname, join
from random import shuffle
from contextlib import closing
from eventlet import Timeout

from swift.obj import diskfile, replicator
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
    DiskFileDeleted, DiskFileExpired, QuarantineRequest
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
    config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
    listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
    readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH


class AuditorWorker(object):
    """Walk through file system to audit objects"""

    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0,
                 watcher_defs=None):
        """
        Prepare rate limits, counters, size buckets and watchers for a worker.

        :param conf: auditor configuration dict; ``reclaim_age`` may be
                     filled in from the object-replicator section
        :param logger: logger used by the worker and handed to each watcher
        :param rcache: recon cache target given to ``dump_recon_cache``
        :param devices: devices setting, stored on the instance
        :param zero_byte_only_at_fps: when truthy the worker runs as a 'ZBF'
                                      auditor and this value replaces the
                                      files-per-second limit
        :param watcher_defs: mapping of watcher name to a dict holding
                             'klass' and 'conf'; ``None`` means no watchers
        """
        watcher_defs = {} if watcher_defs is None else watcher_defs
        self.conf = conf
        self.logger = logger
        self.devices = devices
        self.rcache = rcache
        self.max_files_per_second = float(conf.get('files_per_second', 20))
        self.max_bytes_per_second = float(
            conf.get('bytes_per_second', 10000000))

        try:
            # Unless ops override rsync_tempfile_timeout in the auditor
            # section, derive our behavior from whatever is configured for
            # the replicator.
            repl_conf = readconf(self.conf['__file__'], 'object-replicator')
        except (KeyError, ValueError, IOError):
            # The real config could not be parsed (typically KeyError on
            # __file__, ValueError on a missing object-replicator section,
            # or IOError while reading the file): fall back to a very
            # conservative timeout.
            fallback_timeout = 86400
        else:
            repl_rsync_timeout = int(repl_conf.get(
                'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
            # The replicator's rsync_timeout plus 15 minutes, so local
            # tempfiles are not deleted before the remote replicator kills
            # its rsync.
            fallback_timeout = repl_rsync_timeout + 900
            # There is no strong reason to prefer the replicator section's
            # reclaim_age over the reconstructor's, but the config is
            # already parsed, so use it as our default when we have none.
            if 'reclaim_age' in repl_conf:
                conf.setdefault('reclaim_age', repl_conf['reclaim_age'])
        self.rsync_tempfile_timeout = config_auto_int_value(
            self.conf.get('rsync_tempfile_timeout'), fallback_timeout)
        self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)

        self.zero_byte_only_at_fps = zero_byte_only_at_fps
        if zero_byte_only_at_fps:
            self.max_files_per_second = float(zero_byte_only_at_fps)
            self.auditor_type = 'ZBF'
        else:
            self.auditor_type = 'ALL'
        self.log_time = int(conf.get('log_time', 3600))

        # Running counters; the per-report ones are reset by
        # audit_all_objects after each periodic report.
        self.last_logged = 0
        self.files_running_time = 0
        self.bytes_running_time = 0
        self.bytes_processed = 0
        self.total_bytes_processed = 0
        self.total_files_processed = 0
        self.passes = 0
        self.quarantines = 0
        self.errors = 0

        size_limits = [
            int(s) for s in list_from_csv(conf.get('object_size_stats'))]
        size_limits.sort()
        self.stats_sizes = size_limits
        self.stats_buckets = dict.fromkeys(self.stats_sizes + ['OVER'], 0)

        loaded_watchers = []
        for name, wdef in watcher_defs.items():
            loaded_watchers.append(
                WatcherWrapper(wdef['klass'], name, wdef['conf'], logger))
        self.watchers = loaded_watchers
        logger.debug("%d audit watcher(s) loaded", len(self.watchers))

    def create_recon_nested_dict(self, top_level_key, device_list, item):
        """
        Build the dict that gets dumped into the recon cache.

        :param top_level_key: outermost key of the returned dict
        :param device_list: devices being audited; when empty or None the
                            item is stored directly under the top-level key
        :param item: payload to store
        :returns: ``{top_level_key: item}`` without devices, otherwise
                  ``{top_level_key: {device_key: item}}`` where device_key
                  is the sorted device names concatenated together
        """
        if not device_list:
            return {top_level_key: item}
        device_key = ''.join(sorted(device_list))
        return {top_level_key: {device_key: item}}

    def audit_all_objects(self, mode='once', device_dirs=None):
        """
        Run one audit pass over every location of every storage policy.

        Watchers are started before the pass and ended after it. While
        iterating, a progress report is logged and dumped to the recon cache
        whenever at least ``log_time`` seconds have passed since the previous
        report; the per-report counters are then reset. A summary is logged
        at the end and the auditor status of each policy is cleared.

        :param mode: label used in log messages (e.g. 'once' or 'forever')
        :param device_dirs: optional list of device names restricting the
                            audit; also used in log and recon cache keys
        """
        description = ''
        if device_dirs:
            device_dir_str = ','.join(sorted(device_dirs))
            if self.auditor_type == 'ALL':
                description = ' - parallel, %s' % device_dir_str
            else:
                description = ' - %s' % device_dir_str
        self.logger.info('Begin object audit "%(mode)s" mode (%(audi_type)s'
                         '%(description)s)',
                         {'mode': mode, 'audi_type': self.auditor_type,
                          'description': description})
        for watcher in self.watchers:
            watcher.start(self.auditor_type)
        begin = reported = time.time()
        self.total_bytes_processed = 0
        self.total_files_processed = 0
        total_quarantines = 0
        total_errors = 0
        time_auditing = 0

        # get AuditLocations for each policy
        loc_generators = []
        for policy in POLICIES:
            loc_generators.append(
                self.diskfile_router[policy]
                    .object_audit_location_generator(
                        policy, device_dirs=device_dirs,
                        auditor_type=self.auditor_type))

        all_locs = round_robin_iter(loc_generators)
        for location in all_locs:
            loop_time = time.time()
            self.failsafe_object_audit(location)
            self.logger.timing_since('timing', loop_time)
            self.files_running_time = ratelimit_sleep(
                self.files_running_time, self.max_files_per_second)
            self.total_files_processed += 1
            now = time.time()
            if now - self.last_logged >= self.log_time:
                # Same guard as the final summary below: a zero interval
                # (coarse clock, very fast first iteration) must not raise
                # ZeroDivisionError and abort the whole audit pass.
                since_report = (now - reported) or 0.000001
                since_begin = (now - begin) or 0.000001
                self.logger.info(
                    'Object audit (%(type)s). '
                    'Since %(start_time)s: Locally: %(passes)d passed, '
                    '%(quars)d quarantined, %(errors)d errors, '
                    'files/sec: %(frate).2f, bytes/sec: %(brate).2f, '
                    'Total time: %(total).2f, Auditing time: %(audit).2f, '
                    'Rate: %(audit_rate).2f', {
                        'type': '%s%s' % (self.auditor_type, description),
                        'start_time': time.ctime(reported),
                        'passes': self.passes, 'quars': self.quarantines,
                        'errors': self.errors,
                        'frate': self.passes / since_report,
                        'brate': self.bytes_processed / since_report,
                        'total': (now - begin), 'audit': time_auditing,
                        'audit_rate': time_auditing / since_begin})
                cache_entry = self.create_recon_nested_dict(
                    'object_auditor_stats_%s' % (self.auditor_type),
                    device_dirs,
                    {'errors': self.errors, 'passes': self.passes,
                     'quarantined': self.quarantines,
                     'bytes_processed': self.bytes_processed,
                     'start_time': reported, 'audit_time': time_auditing})
                dump_recon_cache(cache_entry, self.rcache, self.logger)
                reported = now
                # Fold the per-report counters into the pass totals before
                # resetting them for the next reporting window.
                total_quarantines += self.quarantines
                total_errors += self.errors
                self.passes = 0
                self.quarantines = 0
                self.errors = 0
                self.bytes_processed = 0
                self.last_logged = now
            time_auditing += (now - loop_time)
        # Avoid divide by zero during very short runs
        elapsed = (time.time() - begin) or 0.000001
        self.logger.info(
            'Object audit (%(type)s) "%(mode)s" mode '
            'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
            'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
            'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
            'Rate: %(audit_rate).2f', {
                'type': '%s%s' % (self.auditor_type, description),
                'mode': mode, 'elapsed': elapsed,
                'quars': total_quarantines + self.quarantines,
                'errors': total_errors + self.errors,
                'frate': self.total_files_processed / elapsed,
                'brate': self.total_bytes_processed / elapsed,
                'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
        for watcher in self.watchers:
            watcher.end()
        if self.stats_sizes:
            self.logger.info(
                'Object audit stats: %s', json.dumps(self.stats_buckets))

        for policy in POLICIES:
            # Unset remaining partitions to not skip them in the next run
            self.diskfile_router[policy].clear_auditor_status(
                policy,
                self.auditor_type)

    def record_stats(self, obj_size):
        """
        Count the object in the size bucket it belongs to.

        The buckets come from the config's object_size_stats option; an
        object lands in the first (smallest) bucket whose limit is greater
        than or equal to its size, or in "OVER" when it exceeds them all.
        For example with:

        object_size_stats = 10, 100, 1024

        and objects of sizes 5, 20 and 10000 bytes the log will look like:
        {"10": 1, "100": 1, "1024": 0, "OVER": 1}

        :param obj_size: size of the object in bytes
        """
        # stats_sizes is kept sorted, so the first matching limit is the
        # tightest bucket.
        bucket = next(
            (limit for limit in self.stats_sizes if obj_size <= limit),
            "OVER")
        self.stats_buckets[bucket] += 1

    def failsafe_object_audit(self, location):
        """
        Entrypoint to object_audit, with a failsafe generic exception handler.

        Any Exception or Timeout raised while auditing is counted (both in
        the logger's 'errors' metric and in ``self.errors``) and logged with
        a traceback instead of being propagated, so one bad location does
        not stop the caller's loop.

        :param location: the audit location passed through to object_audit
        """
        try:
            self.object_audit(location)
        except (Exception, Timeout):
            # Timeout is listed explicitly next to Exception.
            # NOTE(review): presumably because eventlet's Timeout is not an
            # Exception subclass - confirm.
            self.logger.increment('errors')
            self.errors += 1
            self.logger.exception('ERROR Trying to audit %s', location)

    def object_audit(self, location):
        """
        Audits the given object location.

        Opens the diskfile, records its size in the stats buckets (when
        configured), reads the whole object through a rate-limited reader
        (skipped for empty objects and in zero-byte-only mode), then shows
        the object to every watcher. Quarantines are counted and logged;
        expired and missing objects are ignored; old tombstones trigger a
        hash invalidation. Afterwards stale rsync tempfiles reported by the
        diskfile are unlinked.

        :param location: an audit location
                         (from diskfile.object_audit_location_generator)
        """
        def raise_dfq(msg):
            # Quarantine hook handed to the reader: turn the reader's
            # quarantine notification into an exception handled below.
            raise DiskFileQuarantined(msg)

        diskfile_mgr = self.diskfile_router[location.policy]
        # this method doesn't normally raise errors, even if the audit
        # location does not exist; if this raises an unexpected error it
        # will get logged in failsafe
        df = diskfile_mgr.get_diskfile_from_audit_location(location)
        reader = None
        try:
            with df.open(modernize=True):
                metadata = df.get_metadata()
                obj_size = int(metadata['Content-Length'])
                if self.stats_sizes:
                    self.record_stats(obj_size)
                # Only read object contents for non-empty objects, and never
                # in zero-byte-only mode.
                if obj_size and not self.zero_byte_only_at_fps:
                    reader = df.reader(_quarantine_hook=raise_dfq)
            # The reader is consumed after the df.open() block has exited;
            # closing() guarantees reader.close() runs even on error.
            if reader:
                with closing(reader):
                    for chunk in reader:
                        chunk_len = len(chunk)
                        # Throttle to max_bytes_per_second.
                        self.bytes_running_time = ratelimit_sleep(
                            self.bytes_running_time,
                            self.max_bytes_per_second,
                            incr_by=chunk_len)
                        self.bytes_processed += chunk_len
                        self.total_bytes_processed += chunk_len
            for watcher in self.watchers:
                try:
                    watcher.see_object(
                        metadata,
                        df._ondisk_info['data_file'])
                except QuarantineRequest:
                    # A watcher asked for quarantine: raise whatever
                    # df._quarantine() returns.
                    # NOTE(review): presumably a DiskFileQuarantined, which
                    # the handler below then counts - confirm.
                    raise df._quarantine(
                        df._data_file,
                        "Requested by %s" % watcher.watcher_name)
        # NOTE(review): the order of these handlers presumably matters if
        # the DiskFile* exceptions subclass one another - confirm against
        # swift.common.exceptions before reordering.
        except DiskFileQuarantined as err:
            self.quarantines += 1
            self.logger.error('ERROR Object %(obj)s failed audit and was'
                              ' quarantined: %(err)s',
                              {'obj': location, 'err': err})
        except DiskFileExpired:
            pass  # ignore expired objects
        except DiskFileDeleted:
            # If there is a reclaimable tombstone, we'll invalidate the hash
            # to trigger the replicator to rehash/cleanup this suffix
            ts = df._ondisk_info['ts_info']['timestamp']
            if (not self.zero_byte_only_at_fps and
                    (time.time() - float(ts)) > df.manager.reclaim_age):
                df.manager.invalidate_hash(dirname(df._datadir))
        except DiskFileNotExist:
            pass

        # Reached for clean audits and for every exception handled above, so
        # "passes" counts audited locations, including quarantined ones.
        self.passes += 1
        # _ondisk_info attr is initialized to None and filled in by open
        ondisk_info_dict = df._ondisk_info or {}
        if 'unexpected' in ondisk_info_dict:
            # Among the unexpected files, pick those whose basename matches
            # the rsync tempfile pattern and unlink the ones older than
            # rsync_tempfile_timeout seconds.
            is_rsync_tempfile = lambda fpath: (
                diskfile.RE_RSYNC_TEMPFILE.match(basename(fpath)))
            rsync_tempfile_paths = filter(is_rsync_tempfile,
                                          ondisk_info_dict['unexpected'])
            mtime = time.time() - self.rsync_tempfile_timeout
            unlink_paths_older_than(rsync_tempfile_paths, mtime)


class ObjectAuditor(Daemon):
    """Audit objects."""

    def __init__(self, conf, logger=None, **options):
        """
        Read the auditor settings and load the configured audit watchers.

        :param conf: object-auditor configuration dict
        :param logger: optional logger; one is created with the
                       'object-auditor' log route when omitted
        :param options: accepted for signature compatibility, unused here
        """
        self.conf = conf
        if not logger:
            logger = get_logger(conf, log_route='object-auditor')
        self.logger = logger
        self.devices = conf.get('devices', '/srv/node')
        self.concurrency = int(conf.get('concurrency', 1))
        self.conf_zero_byte_fps = int(
            conf.get('zero_byte_files_per_second', 50))
        self.recon_cache_path = conf.get('recon_cache_path',
                                         DEFAULT_RECON_CACHE_PATH)
        self.rcache = join(self.recon_cache_path, RECON_OBJECT_FILE)
        self.interval = float(conf.get('interval', 30))

        watcher_names = set(list_from_csv(conf.get('watchers', '')))
        # Normally '__file__' is always in config, but tests neglect it often.
        if '__file__' in conf:
            per_watcher_conf = parse_prefixed_conf(
                conf['__file__'], 'object-auditor:watcher:')
        else:
            per_watcher_conf = {}

        self.watcher_defs = {}
        for name in watcher_names:
            self.logger.debug("Loading entry point '%s'", name)
            # Each watcher sees the auditor conf overlaid with its own
            # prefixed section, if any.
            merged_conf = dict(conf)
            merged_conf.update(per_watcher_conf.get(name, {}))
            watcher_klass = load_pkg_resource(
                "swift.object_audit_watcher", name)
            self.watcher_defs[name] = {
                'conf': merged_conf,
                'klass': watcher_klass}

    def _sleep(self):
        """Block for ``self.interval`` seconds (the 'interval' setting)."""
        time.sleep(self.interval)

    def clear_recon_cache(self, auditor_type):
        """
        Clear recon cache entries

        :param auditor_type: auditor type suffix of the stats key to reset
                             (the audit loop uses 'ALL' and 'ZBF')
        """
        stats_key = 'object_auditor_stats_%s' % auditor_type
        dump_recon_cache({stats_key: {}}, self.rcache, self.logger)

    def run_audit(self, **kwargs):
        """
        Run the object audit

        Recognised keyword arguments: ``mode`` (passed to the worker as its
        log label), ``zero_byte_fps`` (defaults to 0) and ``device_dirs``.
        """
        worker = AuditorWorker(
            self.conf, self.logger, self.rcache, self.devices,
            zero_byte_only_at_fps=kwargs.get('zero_byte_fps', 0),
            watcher_defs=self.watcher_defs)
        worker.audit_all_objects(mode=kwargs.get('mode'),
                                 device_dirs=kwargs.get('device_dirs'))

    def fork_child(self, zero_byte_fps=False, sleep_between_zbf_scanner=False,
                   **kwargs):
        """
        Child execution

        Forks; the parent gets the child's pid back immediately, while the
        child runs one audit and then exits.

        :param zero_byte_fps: when true the child audits with the configured
                              zero_byte_files_per_second rate
        :param sleep_between_zbf_scanner: when true (and zero_byte_fps is
                                          set) the child sleeps for the
                                          configured interval before
                                          auditing
        :param kwargs: forwarded to run_audit
        :returns: pid of the forked child (in the parent only)
        """
        child_pid = os.fork()
        if child_pid:
            # Parent process: just report the new child.
            return child_pid

        # Child process from here on.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        if zero_byte_fps:
            kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
            if sleep_between_zbf_scanner:
                self._sleep()
        try:
            self.run_audit(**kwargs)
        except Exception as err:
            self.logger.exception(
                "ERROR: Unable to run auditing: %s", err)
        finally:
            # The child must never return into the parent's control flow.
            sys.exit()

    def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
        """
        Parallel audit loop

        Clears the 'ALL' and 'ZBF' recon cache entries, then either audits
        in this process (``parent`` true) or forks child processes and waits
        for all of them.

        :param parent: when true, run a single audit in the current process
                       using ``zbo_fps`` as the zero-byte rate
        :param zbo_fps: zero-byte files-per-second value used when ``parent``
                        is true
        :param override_devices: optional list of devices to restrict the
                                 audit to
        :param kwargs: forwarded to run_audit / fork_child; ``mode`` equal
                       to 'once' disables ZBF scanner restarts
        """
        self.clear_recon_cache('ALL')
        self.clear_recon_cache('ZBF')
        once = kwargs.get('mode') == 'once'
        kwargs['device_dirs'] = override_devices
        if parent:
            kwargs['zero_byte_fps'] = zbo_fps
            self.run_audit(**kwargs)
        else:
            pids = set()
            # zbf_pid is only bound when the ZBF scanner is enabled; every
            # later use is guarded by ``self.conf_zero_byte_fps and ...``.
            if self.conf_zero_byte_fps:
                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                pids.add(zbf_pid)
            if self.concurrency == 1:
                # Audit all devices in 1 process
                pids.add(self.fork_child(**kwargs))
            else:
                # Divide devices amongst parallel processes set by
                # self.concurrency.  Total number of parallel processes
                # is self.concurrency + 1 if zero_byte_fps.
                parallel_proc = self.concurrency + 1 if \
                    self.conf_zero_byte_fps else self.concurrency
                device_list = list(override_devices) if override_devices else \
                    listdir(self.devices)
                shuffle(device_list)
                while device_list:
                    pid = None
                    # All slots busy: block until some child exits.
                    if len(pids) == parallel_proc:
                        pid = os.wait()[0]
                        pids.discard(pid)

                    if self.conf_zero_byte_fps and pid == zbf_pid and once:
                        # If we're only running one pass and the ZBF scanner
                        # finished, don't bother restarting it.
                        # -100 is a placeholder so later ``pid == zbf_pid``
                        # checks no longer match (presumably never a real
                        # pid). No device is started in this iteration; the
                        # freed slot is used on the next one.
                        zbf_pid = -100
                    elif self.conf_zero_byte_fps and pid == zbf_pid:
                        # When we're running forever, the ZBF scanner must
                        # be restarted as soon as it finishes.
                        kwargs['device_dirs'] = override_devices
                        # sleep between ZBF scanner forks
                        self._sleep()
                        zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                        pids.add(zbf_pid)
                    else:
                        # A regular slot is free: audit one more device in
                        # its own child.
                        kwargs['device_dirs'] = [device_list.pop()]
                        pids.add(self.fork_child(**kwargs))
            # Every device has been handed out; reap the remaining children.
            while pids:
                pid = os.wait()[0]
                # ZBF scanner must be restarted as soon as it finishes
                # unless we're in run-once mode
                # (pid is still in pids here, so len(pids) > 1 means at
                # least one other child is still tracked.)
                if self.conf_zero_byte_fps and pid == zbf_pid and \
                   len(pids) > 1 and not once:
                    kwargs['device_dirs'] = override_devices
                    # sleep between ZBF scanner forks
                    # (done inside the child via sleep_between_zbf_scanner,
                    # so this loop keeps reaping in the meantime)
                    zbf_pid = self.fork_child(zero_byte_fps=True,
                                              sleep_between_zbf_scanner=True,
                                              **kwargs)
                    pids.add(zbf_pid)
                pids.discard(pid)

    def run_forever(self, *args, **kwargs):
        """
        Run the object audit until stopped.

        Repeats audit_loop in 'forever' mode, sleeping for the configured
        interval between rounds; errors from a round are logged and the
        loop carries on.
        """
        # zero byte only command line option; when given, the audit runs
        # in this (parent) process only
        zbo_fps = kwargs.get('zero_byte_fps', 0)
        parent = bool(zbo_fps)
        loop_kwargs = {'mode': 'forever'}

        while True:
            try:
                self.audit_loop(parent, zbo_fps, **loop_kwargs)
            except (Exception, Timeout) as err:
                self.logger.exception('ERROR auditing: %s', err)
            self._sleep()

    def run_once(self, *args, **kwargs):
        """
        Run the object audit once

        Honours the ``zero_byte_fps`` and ``devices`` keyword arguments;
        ``devices`` is a comma separated list that is reduced to the names
        actually present under the devices directory.
        """
        # zero byte only command line option; when given, the audit runs
        # in this (parent) process only
        zbo_fps = kwargs.get('zero_byte_fps', 0)
        parent = bool(zbo_fps)

        requested = list_from_csv(kwargs.get('devices'))
        # Remove bogus entries and duplicates from the requested devices
        override_devices = list(
            set(listdir(self.devices)).intersection(set(requested)))
        loop_kwargs = {'mode': 'once'}

        try:
            self.audit_loop(parent, zbo_fps, override_devices=override_devices,
                            **loop_kwargs)
        except (Exception, Timeout) as err:
            self.logger.exception('ERROR auditing: %s', err)


class WatcherWrapper(object):
    """
    Run the user-supplied watcher.

    Simple and gets the job done. Note that we aren't doing anything
    to isolate ourselves from hangs or file descriptor leaks
    in the plugins.
    """

    def __init__(self, watcher_class, watcher_name, conf, logger):
        """
        Instantiate the watcher plugin behind a prefixed logger.

        A failure in the plugin's constructor is logged and flags the
        wrapper as being in error, which turns start/see_object/end into
        no-ops.

        :param watcher_class: callable invoked as ``watcher_class(conf,
                              logger)`` to build the watcher
        :param watcher_name: name used in the log prefix and in quarantine
                             messages
        :param conf: configuration dict handed to the watcher
        :param logger: base logger, wrapped so every message carries an
                       ``[audit-watcher <name>]`` prefix
        """
        self.watcher_name = watcher_name
        self.watcher_in_error = False
        # Always bind the attribute so a failed construction leaves a
        # well-defined (None) watcher instead of a missing attribute.
        self.watcher = None
        self.logger = PrefixLoggerAdapter(logger, {})
        self.logger.set_prefix('[audit-watcher %s] ' % watcher_name)

        try:
            self.watcher = watcher_class(conf, self.logger)
        except (Exception, Timeout):
            self.logger.exception('Error initializing watcher')
            self.watcher_in_error = True

    def start(self, audit_type):
        """
        Tell the watcher that an audit pass is beginning.

        Does nothing when the watcher is already flagged as being in error;
        a failure here is logged and flags the watcher.

        :param audit_type: forwarded to the watcher as ``audit_type``
        """
        if not self.watcher_in_error:
            try:
                self.watcher.start(audit_type=audit_type)
            except (Exception, Timeout):
                self.logger.exception('Error starting watcher')
                self.watcher_in_error = True

    def see_object(self, meta, data_file_path):
        if self.watcher_in_error:
            return  # can't trust the state of the thing; bail
        kwargs = {'object_metadata': meta,
                  'data_file_path': data_file_path}
        try:
            self.watcher.see_object(**kwargs)
        except QuarantineRequest:
            # Avoid extra logging.
            raise
        except (Exception, Timeout):
            self.logger.exception(
                'Error in see_object(meta=%r, data_file_path=%r)',
                meta, data_file_path)
            # Do *not* flag watcher as being in an error state; a failure
            # to process one object shouldn't impact the ability to process
            # others.

    def end(self):
        """
        Tell the watcher that the audit pass is over.

        Does nothing when the watcher is flagged as being in error; a
        failure here is logged and flags the watcher.
        """
        if not self.watcher_in_error:
            try:
                self.watcher.end()
            except (Exception, Timeout):
                self.logger.exception('Error ending watcher')
                self.watcher_in_error = True