From 8d7e245eb599b54d2bcc3c7a652b85987fdb36e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20L=C3=A9cuyer?= Date: Thu, 13 Jul 2017 15:44:56 +0200 Subject: Change object_audit_location_generator() to yield for a single policy. auditor.py currently relies on POLICY[0] object_audit_location_generator() to yield an AuditLocation for all policies on the object-server. The changes in this patch are : - object_audit_location_generator() yields AuditLocation only for the requested policy - audit_all_objects() calls object_audit_location_generator() for each policy Closes-Bug: 1534212 Change-Id: Ida92ba0a5e1e486a4f7132c6539460b38c34ec87 --- swift/obj/auditor.py | 28 ++++++----- swift/obj/diskfile.py | 108 ++++++++++++++++++---------------------- test/unit/obj/test_auditor.py | 2 +- test/unit/obj/test_diskfile.py | 110 ++++++++++++++++++----------------------- 4 files changed, 114 insertions(+), 134 deletions(-) diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index c3245482b..490582d64 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -27,7 +27,7 @@ from eventlet import Timeout from swift.obj import diskfile, replicator from swift.common.utils import ( get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir, - unlink_paths_older_than, readconf, config_auto_int_value) + unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter) from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\ DiskFileDeleted, DiskFileExpired from swift.common.daemon import Daemon @@ -120,18 +120,17 @@ class AuditorWorker(object): total_quarantines = 0 total_errors = 0 time_auditing = 0 - # TODO: we should move audit-location generation to the storage policy, - # as we may (conceivably) have a different filesystem layout for each. - # We'd still need to generate the policies to audit from the actual - # directories found on-disk, and have appropriate error reporting if we - # find a directory that doesn't correspond to any known policy. This - # will require a sizable refactor, but currently all diskfile managers - # can find all diskfile locations regardless of policy -- so for now - # just use Policy-0's manager. - all_locs = (self.diskfile_router[POLICIES[0]] + + # get AuditLocations for each policy + loc_generators = [] + for policy in POLICIES: + loc_generators.append( + self.diskfile_router[policy] .object_audit_location_generator( - device_dirs=device_dirs, + policy, device_dirs=device_dirs, auditor_type=self.auditor_type)) + + all_locs = round_robin_iter(loc_generators) for location in all_locs: loop_time = time.time() self.failsafe_object_audit(location) @@ -192,8 +191,11 @@ class AuditorWorker(object): self.logger.info( _('Object audit stats: %s') % json.dumps(self.stats_buckets)) - # Unset remaining partitions to not skip them in the next run - diskfile.clear_auditor_status(self.devices, self.auditor_type) + for policy in POLICIES: + # Unset remaining partitions to not skip them in the next run + self.diskfile_router[policy].clear_auditor_status( + policy, + self.auditor_type) def record_stats(self, obj_size): """ diff --git a/swift/obj/diskfile.py b/swift/obj/diskfile.py index f7e22161b..323ee5a99 100644 --- a/swift/obj/diskfile.py +++ b/swift/obj/diskfile.py @@ -453,18 +453,20 @@ class AuditLocation(object): return str(self.path) -def object_audit_location_generator(devices, mount_check=True, logger=None, - device_dirs=None, auditor_type="ALL"): +def object_audit_location_generator(devices, datadir, mount_check=True, + logger=None, device_dirs=None, + auditor_type="ALL"): """ Given a devices path (e.g. "/srv/node"), yield an AuditLocation for all - objects stored under that directory if device_dirs isn't set. If - device_dirs is set, only yield AuditLocation for the objects under the - entries in device_dirs. The AuditLocation only knows the path to the hash - directory, not to the .data file therein (if any). This is to avoid a - double listdir(hash_dir); the DiskFile object will always do one, so - we don't. + objects stored under that directory for the given datadir (policy), + if device_dirs isn't set. If device_dirs is set, only yield AuditLocation + for the objects under the entries in device_dirs. The AuditLocation only + knows the path to the hash directory, not to the .data file therein + (if any). This is to avoid a double listdir(hash_dir); the DiskFile object + will always do one, so we don't. :param devices: parent directory of the devices to be audited + :param datadir: objects directory :param mount_check: flag to check if a mount check should be performed on devices :param logger: a logger object @@ -480,6 +482,7 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, # randomize devices in case of process restart before sweep completed shuffle(device_dirs) + base, policy = split_policy_string(datadir) for device in device_dirs: if not check_drive(devices, device, mount_check): if logger: @@ -487,55 +490,37 @@ def object_audit_location_generator(devices, mount_check=True, logger=None, 'Skipping %s as it is not %s', device, 'mounted' if mount_check else 'a dir') continue - # loop through object dirs for all policies - device_dir = os.path.join(devices, device) - try: - dirs = os.listdir(device_dir) - except OSError as e: - if logger: - logger.debug( - _('Skipping %(dir)s: %(err)s') % {'dir': device_dir, - 'err': e.strerror}) + + datadir_path = os.path.join(devices, device, datadir) + if not os.path.exists(datadir_path): continue - for dir_ in dirs: - if not dir_.startswith(DATADIR_BASE): - continue - try: - base, policy = split_policy_string(dir_) - except PolicyError as e: - if logger: - logger.warning(_('Directory %(directory)r does not map ' - 'to a valid policy (%(error)s)') % { - 'directory': dir_, 'error': e}) - continue - datadir_path = os.path.join(devices, device, dir_) - partitions = get_auditor_status(datadir_path, logger, auditor_type) + partitions = get_auditor_status(datadir_path, logger, auditor_type) - for pos, partition in enumerate(partitions): - update_auditor_status(datadir_path, logger, - partitions[pos:], auditor_type) - part_path = os.path.join(datadir_path, partition) + for pos, partition in enumerate(partitions): + update_auditor_status(datadir_path, logger, + partitions[pos:], auditor_type) + part_path = os.path.join(datadir_path, partition) + try: + suffixes = listdir(part_path) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + continue + for asuffix in suffixes: + suff_path = os.path.join(part_path, asuffix) try: - suffixes = listdir(part_path) + hashes = listdir(suff_path) except OSError as e: if e.errno != errno.ENOTDIR: raise continue - for asuffix in suffixes: - suff_path = os.path.join(part_path, asuffix) - try: - hashes = listdir(suff_path) - except OSError as e: - if e.errno != errno.ENOTDIR: - raise - continue - for hsh in hashes: - hsh_path = os.path.join(suff_path, hsh) - yield AuditLocation(hsh_path, device, partition, - policy) + for hsh in hashes: + hsh_path = os.path.join(suff_path, hsh) + yield AuditLocation(hsh_path, device, partition, + policy) - update_auditor_status(datadir_path, logger, [], auditor_type) + update_auditor_status(datadir_path, logger, [], auditor_type) def get_auditor_status(datadir_path, logger, auditor_type): @@ -589,15 +574,13 @@ def update_auditor_status(datadir_path, logger, partitions, auditor_type): {'auditor_status': auditor_status, 'err': e}) -def clear_auditor_status(devices, auditor_type="ALL"): - for device in os.listdir(devices): - for dir_ in os.listdir(os.path.join(devices, device)): - if not dir_.startswith("objects"): - continue - datadir_path = os.path.join(devices, device, dir_) - auditor_status = os.path.join( - datadir_path, "auditor_status_%s.json" % auditor_type) - remove_file(auditor_status) +def clear_auditor_status(devices, datadir, auditor_type="ALL"): + device_dirs = listdir(devices) + for device in device_dirs: + datadir_path = os.path.join(devices, device, datadir) + auditor_status = os.path.join( + datadir_path, "auditor_status_%s.json" % auditor_type) + remove_file(auditor_status) def strip_self(f): @@ -1340,15 +1323,22 @@ class BaseDiskFileManager(object): pipe_size=self.pipe_size, use_linkat=self.use_linkat, **kwargs) - def object_audit_location_generator(self, device_dirs=None, + def clear_auditor_status(self, policy, auditor_type="ALL"): + datadir = get_data_dir(policy) + clear_auditor_status(self.devices, datadir, auditor_type) + + def object_audit_location_generator(self, policy, device_dirs=None, auditor_type="ALL"): """ Yield an AuditLocation for all objects stored under device_dirs. + :param policy: the StoragePolicy instance :param device_dirs: directory of target device :param auditor_type: either ALL or ZBF """ - return object_audit_location_generator(self.devices, self.mount_check, + datadir = get_data_dir(policy) + return object_audit_location_generator(self.devices, datadir, + self.mount_check, self.logger, device_dirs, auditor_type) diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index 5615e9a2a..06c919bff 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -852,7 +852,7 @@ class TestAuditor(unittest.TestCase): self.auditor.run_audit(**kwargs) self.assertFalse(os.path.isdir(quarantine_path)) del(kwargs['zero_byte_fps']) - clear_auditor_status(self.devices) + clear_auditor_status(self.devices, 'objects') self.auditor.run_audit(**kwargs) self.assertTrue(os.path.isdir(quarantine_path)) diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py index 76bd38522..96c2a622e 100644 --- a/test/unit/obj/test_diskfile.py +++ b/test/unit/obj/test_diskfile.py @@ -375,17 +375,6 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): "fcd938702024c25fef6c32fef05298eb")) os.makedirs(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5", "4a943bc72c2e647c4675923d58cf4ca5")) - os.makedirs(os.path.join(tmpdir, "sdq", "objects-2", "9971", "8eb", - "fcd938702024c25fef6c32fef05298eb")) - os.makedirs(os.path.join(tmpdir, "sdq", "objects-99", "9972", - "8eb", - "fcd938702024c25fef6c32fef05298eb")) - # the bad - os.makedirs(os.path.join(tmpdir, "sdq", "objects-", "1135", - "6c3", - "fcd938702024c25fef6c32fef05298eb")) - os.makedirs(os.path.join(tmpdir, "sdq", "objects-fud", "foo")) - os.makedirs(os.path.join(tmpdir, "sdq", "objects-+1", "foo")) self._make_file(os.path.join(tmpdir, "sdp", "objects", "1519", "fed")) @@ -404,27 +393,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): "4f9eee668b66c6f0250bfa3c7ab9e51e")) logger = debug_logger() - locations = [(loc.path, loc.device, loc.partition, loc.policy) - for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=False, - logger=logger)] + loc_generators = [] + datadirs = ["objects", "objects-1"] + for datadir in datadirs: + loc_generators.append( + diskfile.object_audit_location_generator( + devices=tmpdir, datadir=datadir, mount_check=False, + logger=logger)) + + all_locs = itertools.chain(*loc_generators) + locations = [(loc.path, loc.device, loc.partition, loc.policy) for + loc in all_locs] locations.sort() - # expect some warnings about those bad dirs - warnings = logger.get_lines_for_level('warning') - self.assertEqual(set(warnings), set([ - ("Directory 'objects-' does not map to a valid policy " - "(Unknown policy, for index '')"), - ("Directory 'objects-2' does not map to a valid policy " - "(Unknown policy, for index '2')"), - ("Directory 'objects-99' does not map to a valid policy " - "(Unknown policy, for index '99')"), - ("Directory 'objects-fud' does not map to a valid policy " - "(Unknown policy, for index 'fud')"), - ("Directory 'objects-+1' does not map to a valid policy " - "(Unknown policy, for index '+1')"), - ])) - expected = \ [(os.path.join(tmpdir, "sdp", "objects-1", "9970", "ca5", "4a943bc72c2e647c4675923d58cf4ca5"), @@ -448,12 +429,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): self.assertEqual(locations, expected) # Reset status file for next run - diskfile.clear_auditor_status(tmpdir) + for datadir in datadirs: + diskfile.clear_auditor_status(tmpdir, datadir) # now without a logger - locations = [(loc.path, loc.device, loc.partition, loc.policy) - for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=False)] + for datadir in datadirs: + loc_generators.append( + diskfile.object_audit_location_generator( + devices=tmpdir, datadir=datadir, mount_check=False, + logger=logger)) + + all_locs = itertools.chain(*loc_generators) + locations = [(loc.path, loc.device, loc.partition, loc.policy) for + loc in all_locs] locations.sort() self.assertEqual(locations, expected) @@ -470,7 +458,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): locations = [ (loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=True)] + devices=tmpdir, datadir="objects", mount_check=True)] locations.sort() self.assertEqual( @@ -485,7 +473,8 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): locations = [ (loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=True, logger=logger)] + devices=tmpdir, datadir="objects", mount_check=True, + logger=logger)] debug_lines = logger.get_lines_for_level('debug') self.assertEqual([ 'Skipping sdq as it is not mounted', @@ -502,7 +491,7 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): locations = [ (loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=False)] + devices=tmpdir, datadir="objects", mount_check=False)] self.assertEqual( locations, @@ -516,30 +505,22 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): locations = [ (loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=False, logger=logger)] + devices=tmpdir, datadir="objects", mount_check=False, + logger=logger)] debug_lines = logger.get_lines_for_level('debug') self.assertEqual([ 'Skipping garbage as it is not a dir', ], debug_lines) logger.clear() - with mock_check_drive(isdir=True): - locations = [ - (loc.path, loc.device, loc.partition, loc.policy) - for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=False, logger=logger)] - debug_lines = logger.get_lines_for_level('debug') - self.assertEqual([ - 'Skipping %s: Not a directory' % os.path.join( - tmpdir, "garbage"), - ], debug_lines) - logger.clear() + with mock_check_drive() as mocks: mocks['ismount'].side_effect = lambda path: ( False if path.endswith('garbage') else True) locations = [ (loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=tmpdir, mount_check=True, logger=logger)] + devices=tmpdir, datadir="objects", mount_check=True, + logger=logger)] debug_lines = logger.get_lines_for_level('debug') self.assertEqual([ 'Skipping garbage as it is not mounted', @@ -550,10 +531,10 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): # so that errors get logged and a human can see what's going wrong; # only normal FS corruption should be skipped over silently. - def list_locations(dirname): + def list_locations(dirname, datadir): return [(loc.path, loc.device, loc.partition, loc.policy) for loc in diskfile.object_audit_location_generator( - devices=dirname, mount_check=False)] + devices=dirname, datadir=datadir, mount_check=False)] real_listdir = os.listdir @@ -570,30 +551,34 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): "2607", "b54", "fe450ec990a88cc4b252b181bab04b54")) with mock.patch('os.listdir', splode_if_endswith("sdf/objects")): - self.assertRaises(OSError, list_locations, tmpdir) + self.assertRaises(OSError, list_locations, tmpdir, "objects") with mock.patch('os.listdir', splode_if_endswith("2607")): - self.assertRaises(OSError, list_locations, tmpdir) + self.assertRaises(OSError, list_locations, tmpdir, "objects") with mock.patch('os.listdir', splode_if_endswith("b54")): - self.assertRaises(OSError, list_locations, tmpdir) + self.assertRaises(OSError, list_locations, tmpdir, "objects") def test_auditor_status(self): with temptree([]) as tmpdir: os.makedirs(os.path.join(tmpdir, "sdf", "objects", "1", "a", "b")) os.makedirs(os.path.join(tmpdir, "sdf", "objects", "2", "a", "b")) + datadir = "objects" # Pretend that some time passed between each partition with mock.patch('os.stat') as mock_stat, \ mock_check_drive(isdir=True): mock_stat.return_value.st_mtime = time() - 60 # Auditor starts, there are two partitions to check - gen = diskfile.object_audit_location_generator(tmpdir, False) + gen = diskfile.object_audit_location_generator(tmpdir, + datadir, + False) gen.next() gen.next() # Auditor stopped for some reason without raising StopIterator in # the generator and restarts There is now only one remaining # partition to check - gen = diskfile.object_audit_location_generator(tmpdir, False) + gen = diskfile.object_audit_location_generator(tmpdir, datadir, + False) with mock_check_drive(isdir=True): gen.next() @@ -602,17 +587,19 @@ class TestObjectAuditLocationGenerator(unittest.TestCase): # There are no partitions to check if the auditor restarts another # time and the status files have not been cleared - gen = diskfile.object_audit_location_generator(tmpdir, False) + gen = diskfile.object_audit_location_generator(tmpdir, datadir, + False) with mock_check_drive(isdir=True): self.assertRaises(StopIteration, gen.next) # Reset status file - diskfile.clear_auditor_status(tmpdir) + diskfile.clear_auditor_status(tmpdir, datadir) # If the auditor restarts another time, we expect to # check two partitions again, because the remaining # partitions were empty and a new listdir was executed - gen = diskfile.object_audit_location_generator(tmpdir, False) + gen = diskfile.object_audit_location_generator(tmpdir, datadir, + False) with mock_check_drive(isdir=True): gen.next() gen.next() @@ -985,7 +972,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin): self.df_mgr.logger.increment.assert_called_with('async_pendings') def test_object_audit_location_generator(self): - locations = list(self.df_mgr.object_audit_location_generator()) + locations = list( + self.df_mgr.object_audit_location_generator(POLICIES[0])) self.assertEqual(locations, []) def test_replication_one_per_device_deprecation(self): -- cgit v1.2.1