summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandre Lécuyer <alexandre.lecuyer@corp.ovh.com>2017-07-13 15:44:56 +0200
committerTim Burke <tim@swiftstack.com>2018-03-22 03:02:35 +0000
commit8d7e245eb599b54d2bcc3c7a652b85987fdb36e1 (patch)
treed691b5225656ca4d8058b47815ab179551683c28
parent9f4910f6b938aa1679571d4787bbdcfa5b5f3a19 (diff)
downloadswift-8d7e245eb599b54d2bcc3c7a652b85987fdb36e1.tar.gz
Change object_audit_location_generator() to yield for a single policy.
auditor.py currently relies on POLICY[0] object_audit_location_generator() to yield an AuditLocation for all policies on the object-server. The changes in this patch are : - object_audit_location_generator() yields AuditLocation only for the requested policy - audit_all_objects() calls object_audit_location_generator() for each policy Closes-Bug: 1534212 Change-Id: Ida92ba0a5e1e486a4f7132c6539460b38c34ec87
-rw-r--r--swift/obj/auditor.py28
-rw-r--r--swift/obj/diskfile.py108
-rw-r--r--test/unit/obj/test_auditor.py2
-rw-r--r--test/unit/obj/test_diskfile.py110
4 files changed, 114 insertions, 134 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index c3245482b..490582d64 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -27,7 +27,7 @@ from eventlet import Timeout
from swift.obj import diskfile, replicator
from swift.common.utils import (
get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
- unlink_paths_older_than, readconf, config_auto_int_value)
+ unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
DiskFileDeleted, DiskFileExpired
from swift.common.daemon import Daemon
@@ -120,18 +120,17 @@ class AuditorWorker(object):
total_quarantines = 0
total_errors = 0
time_auditing = 0
- # TODO: we should move audit-location generation to the storage policy,
- # as we may (conceivably) have a different filesystem layout for each.
- # We'd still need to generate the policies to audit from the actual
- # directories found on-disk, and have appropriate error reporting if we
- # find a directory that doesn't correspond to any known policy. This
- # will require a sizable refactor, but currently all diskfile managers
- # can find all diskfile locations regardless of policy -- so for now
- # just use Policy-0's manager.
- all_locs = (self.diskfile_router[POLICIES[0]]
+
+ # get AuditLocations for each policy
+ loc_generators = []
+ for policy in POLICIES:
+ loc_generators.append(
+ self.diskfile_router[policy]
.object_audit_location_generator(
- device_dirs=device_dirs,
+ policy, device_dirs=device_dirs,
auditor_type=self.auditor_type))
+
+ all_locs = round_robin_iter(loc_generators)
for location in all_locs:
loop_time = time.time()
self.failsafe_object_audit(location)
@@ -192,8 +191,11 @@ class AuditorWorker(object):
self.logger.info(
_('Object audit stats: %s') % json.dumps(self.stats_buckets))
- # Unset remaining partitions to not skip them in the next run
- diskfile.clear_auditor_status(self.devices, self.auditor_type)
+ for policy in POLICIES:
+ # Unset remaining partitions to not skip them in the next run
+ self.diskfile_router[policy].clear_auditor_status(
+ policy,
+ self.auditor_type)
def record_stats(self, obj_size):
"""
diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py
index f7e22161b..323ee5a99 100644
--- a/swift/obj/diskfile.py
+++ b/swift/obj/diskfile.py
@@ -453,18 +453,20 @@ class AuditLocation(object):
return str(self.path)
-def object_audit_location_generator(devices, mount_check=True, logger=None,
- device_dirs=None, auditor_type="ALL"):
+def object_audit_location_generator(devices, datadir, mount_check=True,
+ logger=None, device_dirs=None,
+ auditor_type="ALL"):
"""
Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all
- objects stored under that directory if device_dirs isn't set. If
- device_dirs is set, only yield AuditLocation for the objects under the
- entries in device_dirs. The AuditLocation only knows the path to the hash
- directory, not to the .data file therein (if any). This is to avoid a
- double listdir(hash_dir); the DiskFile object will always do one, so
- we don't.
+ objects stored under that directory for the given datadir (policy),
+ if device_dirs isn't set. If device_dirs is set, only yield AuditLocation
+ for the objects under the entries in device_dirs. The AuditLocation only
+ knows the path to the hash directory, not to the .data file therein
+ (if any). This is to avoid a double listdir(hash_dir); the DiskFile object
+ will always do one, so we don't.
:param devices: parent directory of the devices to be audited
+ :param datadir: objects directory
:param mount_check: flag to check if a mount check should be performed
on devices
:param logger: a logger object
@@ -480,6 +482,7 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
# randomize devices in case of process restart before sweep completed
shuffle(device_dirs)
+ base, policy = split_policy_string(datadir)
for device in device_dirs:
if not check_drive(devices, device, mount_check):
if logger:
@@ -487,55 +490,37 @@ def object_audit_location_generator(devices, mount_check=True, logger=None,
'Skipping %s as it is not %s', device,
'mounted' if mount_check else 'a dir')
continue
- # loop through object dirs for all policies
- device_dir = os.path.join(devices, device)
- try:
- dirs = os.listdir(device_dir)
- except OSError as e:
- if logger:
- logger.debug(
- _('Skipping %(dir)s: %(err)s') % {'dir': device_dir,
- 'err': e.strerror})
+
+ datadir_path = os.path.join(devices, device, datadir)
+ if not os.path.exists(datadir_path):
continue
- for dir_ in dirs:
- if not dir_.startswith(DATADIR_BASE):
- continue
- try:
- base, policy = split_policy_string(dir_)
- except PolicyError as e:
- if logger:
- logger.warning(_('Directory %(directory)r does not map '
- 'to a valid policy (%(error)s)') % {
- 'directory': dir_, 'error': e})
- continue
- datadir_path = os.path.join(devices, device, dir_)
- partitions = get_auditor_status(datadir_path, logger, auditor_type)
+ partitions = get_auditor_status(datadir_path, logger, auditor_type)
- for pos, partition in enumerate(partitions):
- update_auditor_status(datadir_path, logger,
- partitions[pos:], auditor_type)
- part_path = os.path.join(datadir_path, partition)
+ for pos, partition in enumerate(partitions):
+ update_auditor_status(datadir_path, logger,
+ partitions[pos:], auditor_type)
+ part_path = os.path.join(datadir_path, partition)
+ try:
+ suffixes = listdir(part_path)
+ except OSError as e:
+ if e.errno != errno.ENOTDIR:
+ raise
+ continue
+ for asuffix in suffixes:
+ suff_path = os.path.join(part_path, asuffix)
try:
- suffixes = listdir(part_path)
+ hashes = listdir(suff_path)
except OSError as e:
if e.errno != errno.ENOTDIR:
raise
continue
- for asuffix in suffixes:
- suff_path = os.path.join(part_path, asuffix)
- try:
- hashes = listdir(suff_path)
- except OSError as e:
- if e.errno != errno.ENOTDIR:
- raise
- continue
- for hsh in hashes:
- hsh_path = os.path.join(suff_path, hsh)
- yield AuditLocation(hsh_path, device, partition,
- policy)
+ for hsh in hashes:
+ hsh_path = os.path.join(suff_path, hsh)
+ yield AuditLocation(hsh_path, device, partition,
+ policy)
- update_auditor_status(datadir_path, logger, [], auditor_type)
+ update_auditor_status(datadir_path, logger, [], auditor_type)
def get_auditor_status(datadir_path, logger, auditor_type):
@@ -589,15 +574,13 @@ def update_auditor_status(datadir_path, logger, partitions, auditor_type):
{'auditor_status': auditor_status, 'err': e})
-def clear_auditor_status(devices, auditor_type="ALL"):
- for device in os.listdir(devices):
- for dir_ in os.listdir(os.path.join(devices, device)):
- if not dir_.startswith("objects"):
- continue
- datadir_path = os.path.join(devices, device, dir_)
- auditor_status = os.path.join(
- datadir_path, "auditor_status_%s.json" % auditor_type)
- remove_file(auditor_status)
+def clear_auditor_status(devices, datadir, auditor_type="ALL"):
+ device_dirs = listdir(devices)
+ for device in device_dirs:
+ datadir_path = os.path.join(devices, device, datadir)
+ auditor_status = os.path.join(
+ datadir_path, "auditor_status_%s.json" % auditor_type)
+ remove_file(auditor_status)
def strip_self(f):
@@ -1340,15 +1323,22 @@ class BaseDiskFileManager(object):
pipe_size=self.pipe_size,
use_linkat=self.use_linkat, **kwargs)
- def object_audit_location_generator(self, device_dirs=None,
+ def clear_auditor_status(self, policy, auditor_type="ALL"):
+ datadir = get_data_dir(policy)
+ clear_auditor_status(self.devices, datadir, auditor_type)
+
+ def object_audit_location_generator(self, policy, device_dirs=None,
auditor_type="ALL"):
"""
Yield an AuditLocation for all objects stored under device_dirs.
+ :param policy: the StoragePolicy instance
:param device_dirs: directory of target device
:param auditor_type: either ALL or ZBF
"""
- return object_audit_location_generator(self.devices, self.mount_check,
+ datadir = get_data_dir(policy)
+ return object_audit_location_generator(self.devices, datadir,
+ self.mount_check,
self.logger, device_dirs,
auditor_type)
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index 5615e9a2a..06c919bff 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -852,7 +852,7 @@ class TestAuditor(unittest.TestCase):
self.auditor.run_audit(**kwargs)
self.assertFalse(os.path.isdir(quarantine_path))
del(kwargs['zero_byte_fps'])
- clear_auditor_status(self.devices)
+ clear_auditor_status(self.devices, 'objects')
self.auditor.run_audit(**kwargs)
self.assertTrue(os.path.isdir(quarantine_path))
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index 76bd38522..96c2a622e 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -375,17 +375,6 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
"fcd938702024c25fef6c32fef05298eb"))
os.makedirs(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
"4a943bc72c2e647c4675923d58cf4ca5"))
- os.makedirs(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb",
- "fcd938702024c25fef6c32fef05298eb"))
- os.makedirs(os.path.join(tmpdir, "sdq", "objects-99", "9972",
- "8eb",
- "fcd938702024c25fef6c32fef05298eb"))
- # the bad
- os.makedirs(os.path.join(tmpdir, "sdq", "objects-", "1135",
- "6c3",
- "fcd938702024c25fef6c32fef05298eb"))
- os.makedirs(os.path.join(tmpdir, "sdq", "objects-fud", "foo"))
- os.makedirs(os.path.join(tmpdir, "sdq", "objects-+1", "foo"))
self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519",
"fed"))
@@ -404,27 +393,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
"4f9eee668b66c6f0250bfa3c7ab9e51e"))
logger = debug_logger()
- locations = [(loc.path, loc.device, loc.partition, loc.policy)
- for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=False,
- logger=logger)]
+ loc_generators = []
+ datadirs = ["objects", "objects-1"]
+ for datadir in datadirs:
+ loc_generators.append(
+ diskfile.object_audit_location_generator(
+ devices=tmpdir, datadir=datadir, mount_check=False,
+ logger=logger))
+
+ all_locs = itertools.chain(*loc_generators)
+ locations = [(loc.path, loc.device, loc.partition, loc.policy) for
+ loc in all_locs]
locations.sort()
- # expect some warnings about those bad dirs
- warnings = logger.get_lines_for_level('warning')
- self.assertEqual(set(warnings), set([
- ("Directory 'objects-' does not map to a valid policy "
- "(Unknown policy, for index '')"),
- ("Directory 'objects-2' does not map to a valid policy "
- "(Unknown policy, for index '2')"),
- ("Directory 'objects-99' does not map to a valid policy "
- "(Unknown policy, for index '99')"),
- ("Directory 'objects-fud' does not map to a valid policy "
- "(Unknown policy, for index 'fud')"),
- ("Directory 'objects-+1' does not map to a valid policy "
- "(Unknown policy, for index '+1')"),
- ]))
-
expected = \
[(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5",
"4a943bc72c2e647c4675923d58cf4ca5"),
@@ -448,12 +429,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
self.assertEqual(locations, expected)
# Reset status file for next run
- diskfile.clear_auditor_status(tmpdir)
+ for datadir in datadirs:
+ diskfile.clear_auditor_status(tmpdir, datadir)
# now without a logger
- locations = [(loc.path, loc.device, loc.partition, loc.policy)
- for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=False)]
+ for datadir in datadirs:
+ loc_generators.append(
+ diskfile.object_audit_location_generator(
+ devices=tmpdir, datadir=datadir, mount_check=False,
+ logger=logger))
+
+ all_locs = itertools.chain(*loc_generators)
+ locations = [(loc.path, loc.device, loc.partition, loc.policy) for
+ loc in all_locs]
locations.sort()
self.assertEqual(locations, expected)
@@ -470,7 +458,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=True)]
+ devices=tmpdir, datadir="objects", mount_check=True)]
locations.sort()
self.assertEqual(
@@ -485,7 +473,8 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=True, logger=logger)]
+ devices=tmpdir, datadir="objects", mount_check=True,
+ logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping sdq as it is not mounted',
@@ -502,7 +491,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=False)]
+ devices=tmpdir, datadir="objects", mount_check=False)]
self.assertEqual(
locations,
@@ -516,30 +505,22 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=False, logger=logger)]
+ devices=tmpdir, datadir="objects", mount_check=False,
+ logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping garbage as it is not a dir',
], debug_lines)
logger.clear()
- with mock_check_drive(isdir=True):
- locations = [
- (loc.path, loc.device, loc.partition, loc.policy)
- for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=False, logger=logger)]
- debug_lines = logger.get_lines_for_level('debug')
- self.assertEqual([
- 'Skipping %s: Not a directory' % os.path.join(
- tmpdir, "garbage"),
- ], debug_lines)
- logger.clear()
+
with mock_check_drive() as mocks:
mocks['ismount'].side_effect = lambda path: (
False if path.endswith('garbage') else True)
locations = [
(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=tmpdir, mount_check=True, logger=logger)]
+ devices=tmpdir, datadir="objects", mount_check=True,
+ logger=logger)]
debug_lines = logger.get_lines_for_level('debug')
self.assertEqual([
'Skipping garbage as it is not mounted',
@@ -550,10 +531,10 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
# so that errors get logged and a human can see what's going wrong;
# only normal FS corruption should be skipped over silently.
- def list_locations(dirname):
+ def list_locations(dirname, datadir):
return [(loc.path, loc.device, loc.partition, loc.policy)
for loc in diskfile.object_audit_location_generator(
- devices=dirname, mount_check=False)]
+ devices=dirname, datadir=datadir, mount_check=False)]
real_listdir = os.listdir
@@ -570,30 +551,34 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
"2607", "b54",
"fe450ec990a88cc4b252b181bab04b54"))
with mock.patch('os.listdir', splode_if_endswith("sdf/objects")):
- self.assertRaises(OSError, list_locations, tmpdir)
+ self.assertRaises(OSError, list_locations, tmpdir, "objects")
with mock.patch('os.listdir', splode_if_endswith("2607")):
- self.assertRaises(OSError, list_locations, tmpdir)
+ self.assertRaises(OSError, list_locations, tmpdir, "objects")
with mock.patch('os.listdir', splode_if_endswith("b54")):
- self.assertRaises(OSError, list_locations, tmpdir)
+ self.assertRaises(OSError, list_locations, tmpdir, "objects")
def test_auditor_status(self):
with temptree([]) as tmpdir:
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b"))
os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b"))
+ datadir = "objects"
# Pretend that some time passed between each partition
with mock.patch('os.stat') as mock_stat, \
mock_check_drive(isdir=True):
mock_stat.return_value.st_mtime = time() - 60
# Auditor starts, there are two partitions to check
- gen = diskfile.object_audit_location_generator(tmpdir, False)
+ gen = diskfile.object_audit_location_generator(tmpdir,
+ datadir,
+ False)
gen.next()
gen.next()
# Auditor stopped for some reason without raising StopIteration in
# the generator and restarts. There is now only one remaining
# partition to check
- gen = diskfile.object_audit_location_generator(tmpdir, False)
+ gen = diskfile.object_audit_location_generator(tmpdir, datadir,
+ False)
with mock_check_drive(isdir=True):
gen.next()
@@ -602,17 +587,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase):
# There are no partitions to check if the auditor restarts another
# time and the status files have not been cleared
- gen = diskfile.object_audit_location_generator(tmpdir, False)
+ gen = diskfile.object_audit_location_generator(tmpdir, datadir,
+ False)
with mock_check_drive(isdir=True):
self.assertRaises(StopIteration, gen.next)
# Reset status file
- diskfile.clear_auditor_status(tmpdir)
+ diskfile.clear_auditor_status(tmpdir, datadir)
# If the auditor restarts another time, we expect to
# check two partitions again, because the remaining
# partitions were empty and a new listdir was executed
- gen = diskfile.object_audit_location_generator(tmpdir, False)
+ gen = diskfile.object_audit_location_generator(tmpdir, datadir,
+ False)
with mock_check_drive(isdir=True):
gen.next()
gen.next()
@@ -985,7 +972,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
self.df_mgr.logger.increment.assert_called_with('async_pendings')
def test_object_audit_location_generator(self):
- locations = list(self.df_mgr.object_audit_location_generator())
+ locations = list(
+ self.df_mgr.object_audit_location_generator(POLICIES[0]))
self.assertEqual(locations, [])
def test_replication_one_per_device_deprecation(self):