summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEamonn O'Toole <eamonn.otoole@hp.com>2014-02-24 11:24:56 +0000
committerEamonn O'Toole <eamonn.otoole@hp.com>2014-03-11 14:17:08 +0000
commit793489b80d212ef53748c8dc1ac0acb8bdfd628e (patch)
tree5a28fe9fbb52aea385ef0f2a0cc8ab4620e28be7
parent86668aa1acb622c6d899aba1726b384a832eab9a (diff)
downloadswift-793489b80d212ef53748c8dc1ac0acb8bdfd628e.tar.gz
Allow specification of object devices for audit
In object audit "once" mode we are allowing the user to specify a sub-set of devices to audit using the "--devices" command-line option. The sub-set is specified as a comma-separated list. This patch is taken from a larger patch to enable parallel processing in the object auditor. We've had to modify recon so that it will work properly with this change to "once" mode. We've modified dump_recon_cache() so that it will store nested dictionaries, in other words it will store a recon cache entry such as {'key1': {'key2': {...}}}. When the object auditor is run in "once" mode with "--devices" set the object_auditor_stats_ALL and ZBF entries look like: {'object_auditor_stats_ALL': {'disk1disk2..diskn': {...}}}. When swift-recon is run, it hunts through the nested dicts to find the appropriate entries. The object auditor recon cache entries are set to {} at the beginning of each audit cycle, and individual disk entries are cleared from cache at the end of each disk's audit cycle. DocImpact Change-Id: Icc53dac0a8136f1b2f61d5e08baf7b4fd87c8123
-rwxr-xr-xbin/swift-object-auditor2
-rw-r--r--doc/source/admin_guide.rst7
-rwxr-xr-xswift/cli/recon.py45
-rw-r--r--swift/common/utils.py24
-rw-r--r--swift/obj/auditor.py147
-rw-r--r--swift/obj/diskfile.py26
-rw-r--r--test/unit/common/middleware/test_recon.py39
-rw-r--r--test/unit/common/test_utils.py24
-rw-r--r--test/unit/obj/test_auditor.py99
9 files changed, 340 insertions, 73 deletions
diff --git a/bin/swift-object-auditor b/bin/swift-object-auditor
index 356fa9af5..55d0054b7 100755
--- a/bin/swift-object-auditor
+++ b/bin/swift-object-auditor
@@ -23,5 +23,7 @@ if __name__ == '__main__':
parser = OptionParser("%prog CONFIG [options]")
parser.add_option('-z', '--zero_byte_fps',
help='Audit only zero byte files at specified files/sec')
+ parser.add_option('-d', '--devices',
+ help='Audit only given devices. Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectAuditor, conf_file, **options)
diff --git a/doc/source/admin_guide.rst b/doc/source/admin_guide.rst
index e5a641f00..5ea49f218 100644
--- a/doc/source/admin_guide.rst
+++ b/doc/source/admin_guide.rst
@@ -1067,6 +1067,13 @@ run this command as follows:
`swift-object-auditor /path/to/object-server/config/file.conf once -z 1000`
"-z" means to only check for zero-byte files at 1000 files per second.
+At times it is useful to be able to run the object auditor on a specific
+device or set of devices. You can run the object-auditor as follows:
+swift-object-auditor /path/to/object-server/config/file.conf once --devices=sda,sdb
+
+This will run the object auditor on only the sda and sdb devices. This
+parameter accepts a comma-separated list of values.
+
-----------------
Object Replicator
-----------------
diff --git a/swift/cli/recon.py b/swift/cli/recon.py
index ba1448908..c91baf8e7 100755
--- a/swift/cli/recon.py
+++ b/swift/cli/recon.py
@@ -488,6 +488,24 @@ class SwiftRecon(object):
(self._ptime(low), self._ptime(high), self._ptime(average))
print "=" * 79
+ def nested_get_value(self, key, recon_entry):
+ """
+ Generator that yields all values for given key in a recon cache entry.
+ This is for use with object auditor recon cache entries. If the
+ object auditor has run in 'once' mode with a subset of devices
+ specified the checksum auditor section will have an entry of the form:
+    {'object_auditor_stats_ALL': {'disk1disk2diskN': {..}}}
+ The same is true of the ZBF auditor cache entry section. We use this
+ generator to find all instances of a particular key in these multi-
+ level dictionaries.
+ """
+ for k, v in recon_entry.items():
+ if isinstance(v, dict):
+ for value in self.nested_get_value(key, v):
+ yield value
+ if k == key:
+ yield v
+
def object_auditor_check(self, hosts):
"""
Obtain and print obj auditor statistics
@@ -513,11 +531,16 @@ class SwiftRecon(object):
zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0:
stats = {}
- stats[atime] = [all_scan[i][atime] for i in all_scan]
- stats[bprocessed] = [all_scan[i][bprocessed] for i in all_scan]
- stats[passes] = [all_scan[i][passes] for i in all_scan]
- stats[errors] = [all_scan[i][errors] for i in all_scan]
- stats[quarantined] = [all_scan[i][quarantined] for i in all_scan]
+ stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
+ for i in all_scan]
+ stats[bprocessed] = [(self.nested_get_value(bprocessed,
+ all_scan[i])) for i in all_scan]
+ stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
+ for i in all_scan]
+ stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
+ for i in all_scan]
+ stats[quarantined] = [(self.nested_get_value(quarantined,
+ all_scan[i])) for i in all_scan]
for k in stats:
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
@@ -534,10 +557,14 @@ class SwiftRecon(object):
print "[ALL_auditor] - No hosts returned valid data."
if len(zbf_scan) > 0:
stats = {}
- stats[atime] = [zbf_scan[i][atime] for i in zbf_scan]
- stats[bprocessed] = [zbf_scan[i][bprocessed] for i in zbf_scan]
- stats[errors] = [zbf_scan[i][errors] for i in zbf_scan]
- stats[quarantined] = [zbf_scan[i][quarantined] for i in zbf_scan]
+ stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
+ for i in zbf_scan]
+ stats[bprocessed] = [(self.nested_get_value(bprocessed,
+ zbf_scan[i])) for i in zbf_scan]
+ stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
+ for i in zbf_scan]
+ stats[quarantined] = [(self.nested_get_value(quarantined,
+ zbf_scan[i])) for i in zbf_scan]
for k in stats:
if None in stats[k]:
stats[k] = [x for x in stats[k] if x is not None]
diff --git a/swift/common/utils.py b/swift/common/utils.py
index abebc6515..be76ddb6c 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -2079,6 +2079,28 @@ def human_readable(value):
return '%d%si' % (round(value), suffixes[index])
+def put_recon_cache_entry(cache_entry, key, item):
+ """
+ Function that will check if item is a dict, and if so put it under
+ cache_entry[key]. We use nested recon cache entries when the object
+ auditor runs in 'once' mode with a specified subset of devices.
+ """
+ if isinstance(item, dict):
+ if key not in cache_entry or key in cache_entry and not \
+ isinstance(cache_entry[key], dict):
+ cache_entry[key] = {}
+ elif key in cache_entry and item == {}:
+ cache_entry.pop(key, None)
+ return
+ for k, v in item.items():
+ if v == {}:
+ cache_entry[key].pop(k, None)
+ else:
+ cache_entry[key][k] = v
+ else:
+ cache_entry[key] = item
+
+
def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
"""Update recon cache values
@@ -2098,7 +2120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
#file doesn't have a valid entry, we'll recreate it
pass
for cache_key, cache_value in cache_dict.items():
- cache_entry[cache_key] = cache_value
+ put_recon_cache_entry(cache_entry, cache_key, cache_value)
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index d22fd5f42..c5b708acc 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -14,14 +14,16 @@
# limitations under the License.
import os
+import sys
import time
+import signal
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
from swift.obj import diskfile
from swift.common.utils import get_logger, ratelimit_sleep, dump_recon_cache, \
- list_from_csv, json
+ list_from_csv, json, listdir
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
from swift.common.daemon import Daemon
@@ -31,10 +33,10 @@ SLEEP_BETWEEN_AUDITS = 30
class AuditorWorker(object):
"""Walk through file system to audit objects"""
- def __init__(self, conf, logger, zero_byte_only_at_fps=0):
+ def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
self.conf = conf
self.logger = logger
- self.devices = conf.get('devices', '/srv/node')
+ self.devices = devices
self.diskfile_mgr = diskfile.DiskFileManager(conf, self.logger)
self.max_files_per_second = float(conf.get('files_per_second', 20))
self.max_bytes_per_second = float(conf.get('bytes_per_second',
@@ -53,24 +55,34 @@ class AuditorWorker(object):
self.passes = 0
self.quarantines = 0
self.errors = 0
- self.recon_cache_path = conf.get('recon_cache_path',
- '/var/cache/swift')
- self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+ self.rcache = rcache
self.stats_sizes = sorted(
[int(s) for s in list_from_csv(conf.get('object_size_stats'))])
self.stats_buckets = dict(
[(s, 0) for s in self.stats_sizes + ['OVER']])
- def audit_all_objects(self, mode='once'):
- self.logger.info(_('Begin object audit "%s" mode (%s)') %
- (mode, self.auditor_type))
+ def create_recon_nested_dict(self, top_level_key, device_list, item):
+ if device_list:
+ device_key = ''.join(sorted(device_list))
+ return {top_level_key: {device_key: item}}
+ else:
+ return {top_level_key: item}
+
+ def audit_all_objects(self, mode='once', device_dirs=None):
+ description = ''
+ if device_dirs:
+ device_dir_str = ','.join(sorted(device_dirs))
+ description = _(' - %s') % device_dir_str
+ self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
+ (mode, self.auditor_type, description))
begin = reported = time.time()
self.total_bytes_processed = 0
self.total_files_processed = 0
total_quarantines = 0
total_errors = 0
time_auditing = 0
- all_locs = self.diskfile_mgr.object_audit_location_generator()
+ all_locs = self.diskfile_mgr.object_audit_location_generator(
+ device_dirs=device_dirs)
for location in all_locs:
loop_time = time.time()
self.failsafe_object_audit(location)
@@ -87,7 +99,7 @@ class AuditorWorker(object):
'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
'Total time: %(total).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % {
- 'type': self.auditor_type,
+ 'type': '%s%s' % (self.auditor_type, description),
'start_time': time.ctime(reported),
'passes': self.passes, 'quars': self.quarantines,
'errors': self.errors,
@@ -95,15 +107,14 @@ class AuditorWorker(object):
'brate': self.bytes_processed / (now - reported),
'total': (now - begin), 'audit': time_auditing,
'audit_rate': time_auditing / (now - begin)})
- dump_recon_cache({'object_auditor_stats_%s' %
- self.auditor_type: {
- 'errors': self.errors,
- 'passes': self.passes,
- 'quarantined': self.quarantines,
- 'bytes_processed': self.bytes_processed,
- 'start_time': reported,
- 'audit_time': time_auditing}},
- self.rcache, self.logger)
+ cache_entry = self.create_recon_nested_dict(
+ 'object_auditor_stats_%s' % (self.auditor_type),
+ device_dirs,
+ {'errors': self.errors, 'passes': self.passes,
+ 'quarantined': self.quarantines,
+ 'bytes_processed': self.bytes_processed,
+ 'start_time': reported, 'audit_time': time_auditing})
+ dump_recon_cache(cache_entry, self.rcache, self.logger)
reported = now
total_quarantines += self.quarantines
total_errors += self.errors
@@ -120,12 +131,19 @@ class AuditorWorker(object):
'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
'Rate: %(audit_rate).2f') % {
- 'type': self.auditor_type, 'mode': mode, 'elapsed': elapsed,
+ 'type': '%s%s' % (self.auditor_type, description),
+ 'mode': mode, 'elapsed': elapsed,
'quars': total_quarantines + self.quarantines,
'errors': total_errors + self.errors,
'frate': self.total_files_processed / elapsed,
'brate': self.total_bytes_processed / elapsed,
'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
+ # Clear recon cache entry if device_dirs is set
+ if device_dirs:
+ cache_entry = self.create_recon_nested_dict(
+ 'object_auditor_stats_%s' % (self.auditor_type),
+ device_dirs, {})
+ dump_recon_cache(cache_entry, self.rcache, self.logger)
if self.stats_sizes:
self.logger.info(
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
@@ -204,35 +222,100 @@ class ObjectAuditor(Daemon):
def __init__(self, conf, **options):
self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor')
+ self.devices = conf.get('devices', '/srv/node')
self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50))
+ self.recon_cache_path = conf.get('recon_cache_path',
+ '/var/cache/swift')
+ self.rcache = os.path.join(self.recon_cache_path, "object.recon")
def _sleep(self):
time.sleep(SLEEP_BETWEEN_AUDITS)
+ def clear_recon_cache(self, auditor_type):
+ """Clear recon cache entries"""
+ dump_recon_cache({'object_auditor_stats_%s' % auditor_type: {}},
+ self.rcache, self.logger)
+
+ def run_audit(self, **kwargs):
+ """Run the object audit"""
+ mode = kwargs.get('mode')
+ zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
+ device_dirs = kwargs.get('device_dirs')
+ worker = AuditorWorker(self.conf, self.logger, self.rcache,
+ self.devices,
+ zero_byte_only_at_fps=zero_byte_only_at_fps)
+ worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
+
+ def fork_child(self, zero_byte_fps=False, **kwargs):
+ """Child execution"""
+ pid = os.fork()
+ if pid:
+ return pid
+ else:
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ if zero_byte_fps:
+ kwargs['zero_byte_fps'] = self.conf_zero_byte_fps
+ self.run_audit(**kwargs)
+ sys.exit()
+
+ def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
+ """Audit loop"""
+ self.clear_recon_cache('ALL')
+ self.clear_recon_cache('ZBF')
+ kwargs['device_dirs'] = override_devices
+ if parent:
+ kwargs['zero_byte_fps'] = zbo_fps
+ self.run_audit(**kwargs)
+ else:
+ pids = []
+ if self.conf_zero_byte_fps:
+ zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+ pids.append(zbf_pid)
+ pids.append(self.fork_child(**kwargs))
+ while pids:
+ pid = os.wait()[0]
+ # ZBF scanner must be restarted as soon as it finishes
+ if self.conf_zero_byte_fps and pid == zbf_pid and \
+ len(pids) > 1:
+ kwargs['device_dirs'] = override_devices
+ zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
+ pids.append(zbf_pid)
+ pids.remove(pid)
+
def run_forever(self, *args, **kwargs):
"""Run the object audit until stopped."""
# zero byte only command line option
zbo_fps = kwargs.get('zero_byte_fps', 0)
+ parent = False
if zbo_fps:
# only start parent
parent = True
- else:
- parent = os.fork() # child gets parent = 0
kwargs = {'mode': 'forever'}
- if parent:
- kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
+
while True:
try:
- self.run_once(**kwargs)
+ self.audit_loop(parent, zbo_fps, **kwargs)
except (Exception, Timeout):
self.logger.exception(_('ERROR auditing'))
self._sleep()
def run_once(self, *args, **kwargs):
- """Run the object audit once."""
- mode = kwargs.get('mode', 'once')
- zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
- worker = AuditorWorker(self.conf, self.logger,
- zero_byte_only_at_fps=zero_byte_only_at_fps)
- worker.audit_all_objects(mode=mode)
+ """Run the object audit once"""
+ # zero byte only command line option
+ zbo_fps = kwargs.get('zero_byte_fps', 0)
+ override_devices = list_from_csv(kwargs.get('devices'))
+ # Remove bogus entries and duplicates from override_devices
+ override_devices = list(
+ set(listdir(self.devices)).intersection(set(override_devices)))
+ parent = False
+ if zbo_fps:
+ # only start parent
+ parent = True
+ kwargs = {'mode': 'once'}
+
+ try:
+ self.audit_loop(parent, zbo_fps, override_devices=override_devices,
+ **kwargs)
+ except (Exception, Timeout):
+ self.logger.exception(_('ERROR auditing'))
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index 7c0a9483a..1e7481557 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -351,22 +351,32 @@ class AuditLocation(object):
return str(self.path)
-def object_audit_location_generator(devices, mount_check=True, logger=None):
+def object_audit_location_generator(devices, mount_check=True, logger=None,
+ device_dirs=None):
"""
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
- objects stored under that directory. The AuditLocation only knows the path
- to the hash directory, not to the .data file therein (if any). This is to
- avoid a double listdir(hash_dir); the DiskFile object will always do one,
- so we don't.
+ objects stored under that directory if device_dirs isn't set. If
+ device_dirs is set, only yield AuditLocation for the objects under the
+ entries in device_dirs. The AuditLocation only knows the path to the hash
+ directory, not to the .data file therein (if any). This is to avoid a
+ double listdir(hash_dir); the DiskFile object will always do one, so
+ we don't.
:param devices: parent directory of the devices to be audited
:param mount_check: flag to check if a mount check should be performed
on devices
:param logger: a logger object
+    :param device_dirs: a list of directories under devices to traverse
"""
- device_dirs = listdir(devices)
+ if not device_dirs:
+ device_dirs = listdir(devices)
+ else:
+ # remove bogus devices and duplicates from device_dirs
+ device_dirs = list(
+ set(listdir(devices)).intersection(set(device_dirs)))
# randomize devices in case of process restart before sweep completed
shuffle(device_dirs)
+
for device in device_dirs:
if mount_check and not \
ismount(os.path.join(devices, device)):
@@ -502,9 +512,9 @@ class DiskFileManager(object):
return DiskFile(self, dev_path, self.threadpools[device],
partition, account, container, obj, **kwargs)
- def object_audit_location_generator(self):
+ def object_audit_location_generator(self, device_dirs=None):
return object_audit_location_generator(self.devices, self.mount_check,
- self.logger)
+ self.logger, device_dirs)
def get_diskfile_from_audit_location(self, audit_location):
dev_path = self.get_dev_path(audit_location.device, mount_check=False)
diff --git a/test/unit/common/middleware/test_recon.py b/test/unit/common/middleware/test_recon.py
index 5cdd2edf4..b94bd825e 100644
--- a/test/unit/common/middleware/test_recon.py
+++ b/test/unit/common/middleware/test_recon.py
@@ -562,6 +562,45 @@ class TestReconSuccess(TestCase):
"files_processed": 2310,
"quarantined": 0}})
+ def test_get_auditor_info_object_once(self):
+ from_cache_response = {
+ "object_auditor_stats_ALL": {'disk1disk2': {
+ "audit_time": 115.14418768882751,
+ "bytes_processed": 234660,
+ "completed": 115.4512460231781,
+ "errors": 0,
+ "files_processed": 2310,
+ "quarantined": 0}},
+ "object_auditor_stats_ZBF": {'disk1disk2': {
+ "audit_time": 45.877294063568115,
+ "bytes_processed": 0,
+ "completed": 46.181446075439453,
+ "errors": 0,
+ "files_processed": 2310,
+ "quarantined": 0}}}
+ self.fakecache.fakeout_calls = []
+ self.fakecache.fakeout = from_cache_response
+ rv = self.app.get_auditor_info('object')
+ self.assertEquals(self.fakecache.fakeout_calls,
+ [((['object_auditor_stats_ALL',
+ 'object_auditor_stats_ZBF'],
+ '/var/cache/swift/object.recon'), {})])
+ self.assertEquals(rv, {
+ "object_auditor_stats_ALL": {'disk1disk2': {
+ "audit_time": 115.14418768882751,
+ "bytes_processed": 234660,
+ "completed": 115.4512460231781,
+ "errors": 0,
+ "files_processed": 2310,
+ "quarantined": 0}},
+ "object_auditor_stats_ZBF": {'disk1disk2': {
+ "audit_time": 45.877294063568115,
+ "bytes_processed": 0,
+ "completed": 46.181446075439453,
+ "errors": 0,
+ "files_processed": 2310,
+ "quarantined": 0}}})
+
def test_get_unmounted(self):
unmounted_resp = [{'device': 'fakeone', 'mounted': False},
{'device': 'faketwo', 'mounted': False}]
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 103ee8773..4455a5f72 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -28,6 +28,7 @@ import random
import re
import socket
import sys
+import json
from textwrap import dedent
@@ -486,6 +487,29 @@ class TestUtils(unittest.TestCase):
utils.sys.stdout = orig_stdout
utils.sys.stderr = orig_stderr
+ def test_dump_recon_cache(self):
+ testdir_base = mkdtemp()
+ testcache_file = os.path.join(testdir_base, 'cache.recon')
+ logger = utils.get_logger(None, 'server', log_route='server')
+ try:
+ submit_dict = {'key1': {'value1': 1, 'value2': 2}}
+ utils.dump_recon_cache(submit_dict, testcache_file, logger)
+ fd = open(testcache_file)
+ file_dict = json.loads(fd.readline())
+ fd.close()
+ self.assertEquals(submit_dict, file_dict)
+ # Use a nested entry
+ submit_dict = {'key1': {'key2': {'value1': 1, 'value2': 2}}}
+ result_dict = {'key1': {'key2': {'value1': 1, 'value2': 2},
+ 'value1': 1, 'value2': 2}}
+ utils.dump_recon_cache(submit_dict, testcache_file, logger)
+ fd = open(testcache_file)
+ file_dict = json.loads(fd.readline())
+ fd.close()
+ self.assertEquals(result_dict, file_dict)
+ finally:
+ rmtree(testdir_base)
+
def test_get_logger(self):
sio = StringIO()
logger = logging.getLogger('server')
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index c6788c163..40e3f3c11 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -18,6 +18,7 @@ import unittest
import mock
import os
import time
+import string
from shutil import rmtree
from hashlib import md5
from tempfile import mkdtemp
@@ -34,6 +35,7 @@ class TestAuditor(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.devices = os.path.join(self.testdir, 'node')
+ self.rcache = os.path.join(self.testdir, 'object.recon')
self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, 'sda'))
@@ -60,7 +62,8 @@ class TestAuditor(unittest.TestCase):
unit.xattr_data = {}
def test_object_audit_extra_data(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
data = '0' * 1024
etag = md5()
with self.disk_file.create() as writer:
@@ -86,7 +89,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_audit_diff_data(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
data = '0' * 1024
etag = md5()
timestamp = str(normalize_timestamp(time.time()))
@@ -129,7 +133,8 @@ class TestAuditor(unittest.TestCase):
fp.write('0' * 1024)
fp.close()
invalidate_hash(os.path.dirname(self.disk_file._datadir))
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
pre_quarantines = auditor_worker.quarantines
auditor_worker.object_audit(
AuditLocation(self.disk_file._datadir, 'sda', '0'))
@@ -141,7 +146,8 @@ class TestAuditor(unittest.TestCase):
mkdirs(self.disk_file._datadir)
with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'})
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
def blowup(*args):
raise NameError('tpyo')
@@ -156,7 +162,8 @@ class TestAuditor(unittest.TestCase):
mkdirs(self.disk_file._datadir)
with open(path, 'w') as f:
write_metadata(f, {'name': '/a/c/o'})
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
def blowup(*args):
raise NameError('tpyo')
@@ -166,7 +173,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.errors, 1)
def test_generic_exception_handling(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time()))
pre_errors = auditor_worker.errors
data = '0' * 1024
@@ -186,7 +194,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.errors, pre_errors + 1)
def test_object_run_once_pass(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
auditor_worker.log_time = 0
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
@@ -208,7 +217,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
def test_object_run_once_no_sda(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
data = '0' * 1024
@@ -228,7 +238,8 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.quarantines, pre_quarantines + 1)
def test_object_run_once_multi_devices(self):
- auditor_worker = auditor.AuditorWorker(self.conf, self.logger)
+ auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+ self.rcache, self.devices)
timestamp = str(normalize_timestamp(time.time()))
pre_quarantines = auditor_worker.quarantines
data = '0' * 10
@@ -284,9 +295,12 @@ class TestAuditor(unittest.TestCase):
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
- self.auditor.run_once(zero_byte_fps=50)
+ kwargs = {'mode': 'once'}
+ kwargs['zero_byte_fps'] = 50
+ self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path))
- self.auditor.run_once()
+ del(kwargs['zero_byte_fps'])
+ self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path))
def setup_bad_zero_byte(self, with_ts=False):
@@ -322,14 +336,17 @@ class TestAuditor(unittest.TestCase):
def test_object_run_fast_track_all(self):
self.setup_bad_zero_byte()
- self.auditor.run_once()
+ kwargs = {'mode': 'once'}
+ self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
def test_object_run_fast_track_zero(self):
self.setup_bad_zero_byte()
- self.auditor.run_once(zero_byte_fps=50)
+ kwargs = {'mode': 'once'}
+ kwargs['zero_byte_fps'] = 50
+ self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
@@ -347,7 +364,9 @@ class TestAuditor(unittest.TestCase):
was_df = auditor.diskfile.DiskFile
try:
auditor.diskfile.DiskFile = FakeFile
- self.auditor.run_once(zero_byte_fps=50)
+ kwargs = {'mode': 'once'}
+ kwargs['zero_byte_fps'] = 50
+ self.auditor.run_audit(**kwargs)
quarantine_path = os.path.join(self.devices,
'sda', 'quarantined', 'objects')
self.assertTrue(os.path.isdir(quarantine_path))
@@ -358,7 +377,8 @@ class TestAuditor(unittest.TestCase):
def test_with_tombstone(self):
ts_file_path = self.setup_bad_zero_byte(with_ts=True)
self.assertTrue(ts_file_path.endswith('ts'))
- self.auditor.run_once()
+ kwargs = {'mode': 'once'}
+ self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.exists(ts_file_path))
def test_sleeper(self):
@@ -370,7 +390,7 @@ class TestAuditor(unittest.TestCase):
self.assert_(delta_t > 0.08)
self.assert_(delta_t < 0.12)
- def test_run_forever(self):
+ def test_run_audit(self):
class StopForever(Exception):
pass
@@ -378,45 +398,78 @@ class TestAuditor(unittest.TestCase):
class ObjectAuditorMock(object):
check_args = ()
check_kwargs = {}
+ check_device_dir = None
fork_called = 0
- fork_res = 0
+ master = 0
+ wait_called = 0
def mock_run(self, *args, **kwargs):
self.check_args = args
self.check_kwargs = kwargs
+ if 'zero_byte_fps' in kwargs:
+ self.check_device_dir = kwargs.get('device_dirs')
def mock_sleep(self):
raise StopForever('stop')
def mock_fork(self):
self.fork_called += 1
- return self.fork_res
+ if self.master:
+ return self.fork_called
+ else:
+ return 0
+
+ def mock_wait(self):
+ self.wait_called += 1
+ return (self.wait_called, 0)
+
+ for i in string.ascii_letters[2:26]:
+ mkdirs(os.path.join(self.devices, 'sd%s' % i))
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89))
mocker = ObjectAuditorMock()
- my_auditor.run_once = mocker.mock_run
+ my_auditor.run_audit = mocker.mock_run
my_auditor._sleep = mocker.mock_sleep
was_fork = os.fork
+ was_wait = os.wait
try:
os.fork = mocker.mock_fork
+ os.wait = mocker.mock_wait
self.assertRaises(StopForever,
my_auditor.run_forever, zero_byte_fps=50)
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50)
self.assertEquals(mocker.fork_called, 0)
- self.assertRaises(StopForever, my_auditor.run_forever)
+ self.assertRaises(SystemExit, my_auditor.run_forever)
self.assertEquals(mocker.fork_called, 1)
+ self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
+ self.assertEquals(mocker.check_device_dir, None)
self.assertEquals(mocker.check_args, ())
- mocker.fork_res = 1
- self.assertRaises(StopForever, my_auditor.run_forever)
- self.assertEquals(mocker.fork_called, 2)
+ device_list = ['sd%s' % i for i in string.ascii_letters[2:10]]
+ device_string = ','.join(device_list)
+ device_string_bogus = device_string + ',bogus'
+
+ mocker.fork_called = 0
+ self.assertRaises(SystemExit, my_auditor.run_once,
+ devices=device_string_bogus)
+ self.assertEquals(mocker.fork_called, 1)
self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
+ self.assertEquals(sorted(mocker.check_device_dir), device_list)
+
+ mocker.master = 1
+
+ mocker.fork_called = 0
+ self.assertRaises(StopForever, my_auditor.run_forever)
+ # Fork is called 3 times since the zbf process is forked twice
+ self.assertEquals(mocker.fork_called, 3)
+ self.assertEquals(mocker.wait_called, 3)
finally:
os.fork = was_fork
+ os.wait = was_wait
if __name__ == '__main__':
unittest.main()