From c4e00eb89f34de79d5fb123dd044621ef4df679c Mon Sep 17 00:00:00 2001
From: Matthew Oliver
Date: Thu, 21 Jul 2022 12:32:27 +1000
Subject: Sharder: Fall back to local device in get_shard_broker

If the sharder is processing a node whose devices all have zero
weight, `find_local_handoff_for_part` can fail: it uses the ring's
replica2part2dev table to find a device, and a zero-weighted device
won't appear in that table, so no local handoff device is found.

This patch extends `find_local_handoff_for_part`: if it fails to find
a node from the ring, it falls back to a local device identified via
`_local_device_ids`, which is built up while the replicator or
sharder is identifying local devices. That mapping is built from
ring.devs, so it does include zero-weighted devices.

This allows the sharder to find a handoff location in which to write
the shard broker while sharding.

Co-Authored-By: Tim Burke
Change-Id: Ic38698e9ca0397770c7362229baef1101a72788f
---
 swift/common/db_replicator.py          |  6 +--
 swift/container/replicator.py          | 43 ++++++++++-----
 swift/container/sharder.py             | 38 ++++++--------
 test/unit/__init__.py                  |  1 +
 test/unit/common/test_db_replicator.py |  4 +-
 test/unit/container/test_replicator.py | 69 ++++++++++++++++++++++++
 test/unit/container/test_sharder.py    | 96 ++++++++++++++++++++++++--------
 7 files changed, 193 insertions(+), 64 deletions(-)

diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
index 0db65b2a3..3e82e2dd8 100644
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -196,7 +196,7 @@ class Replicator(Daemon):
         self.cpool = GreenPool(size=concurrency)
         swift_dir = conf.get('swift_dir', '/etc/swift')
         self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
-        self._local_device_ids = set()
+        self._local_device_ids = {}
         self.per_diff = int(conf.get('per_diff', 1000))
         self.max_diffs = int(conf.get('max_diffs') or 100)
         self.interval = float(conf.get('interval') or
@@ -795,7 +795,7 @@ class Replicator(Daemon):
                              'These modes are not intended for normal '
                              'operation; use these options with care.')
 
-        self._local_device_ids = set()
+        self._local_device_ids = {}
         found_local = False
         for node in self.ring.devs:
             if node and is_local_device(ips, self.port,
@@ -822,7 +822,7 @@ class Replicator(Daemon):
                 time.time() - self.reclaim_age)
             datadir = os.path.join(self.root, node['device'], self.datadir)
             if os.path.isdir(datadir):
-                self._local_device_ids.add(node['id'])
+                self._local_device_ids[node['id']] = node
                 part_filt = self._partition_dir_filter(
                     node['id'], partitions_to_replicate)
                 dirs.append((datadir, node['id'], part_filt))
diff --git a/swift/container/replicator.py b/swift/container/replicator.py
index d45a5e8df..ab1350865 100644
--- a/swift/container/replicator.py
+++ b/swift/container/replicator.py
@@ -14,10 +14,10 @@
 # limitations under the License.
import os -import itertools import json from collections import defaultdict from eventlet import Timeout +from random import choice from swift.container.sync_store import ContainerSyncStore from swift.container.backend import ContainerBroker, DATADIR, SHARDED @@ -27,7 +27,6 @@ from swift.container.reconciler import ( from swift.common import db_replicator from swift.common.storage_policy import POLICIES from swift.common.swob import HTTPOk, HTTPAccepted -from swift.common.exceptions import DeviceUnavailable from swift.common.http import is_success from swift.common.utils import Timestamp, majority_size, get_db_files @@ -144,18 +143,37 @@ class ContainerReplicator(db_replicator.Replicator): def find_local_handoff_for_part(self, part): """ - Look through devices in the ring for the first handoff device that was - identified during job creation as available on this node. + Find a device in the ring that is on this node on which to place a + partition. Preference is given to a device that is a primary location + for the partition. If no such device is found then a local device with + weight is chosen, and failing that any local device. + :param part: a partition :returns: a node entry from the ring """ - nodes = self.ring.get_part_nodes(part) - more_nodes = self.ring.get_more_nodes(part) + if not self._local_device_ids: + raise RuntimeError('Cannot find local handoff; no local devices') - for node in itertools.chain(nodes, more_nodes): + for node in self.ring.get_part_nodes(part): if node['id'] in self._local_device_ids: return node - return None + + # don't attempt to minimize handoff depth: just choose any local + # device, but start by only picking a device with a weight, just in + # case some devices are being drained... + local_devs_with_weight = [ + dev for dev in self._local_device_ids.values() + if dev.get('weight', 0)] + if local_devs_with_weight: + return choice(local_devs_with_weight) + + # we have to return something, so choose any local device.. 
+        node = choice(list(self._local_device_ids.values()))
+        self.logger.warning(
+            "Could not find a non-zero weight device for handoff partition "
+            "%d, falling back to device %s" %
+            (part, node['device']))
+        return node
 
     def get_reconciler_broker(self, timestamp):
         """
@@ -173,10 +191,6 @@ class ContainerReplicator(db_replicator.Replicator):
         account = MISPLACED_OBJECTS_ACCOUNT
         part = self.ring.get_part(account, container)
         node = self.find_local_handoff_for_part(part)
-        if not node:
-            raise DeviceUnavailable(
-                'No mounted devices found suitable to Handoff reconciler '
-                'container %s in partition %s' % (container, part))
         broker = ContainerBroker.create_broker(
             os.path.join(self.root, node['device']), part, account, container,
             logger=self.logger, put_timestamp=timestamp,
@@ -198,8 +212,9 @@
 
         try:
             reconciler = self.get_reconciler_broker(container)
-        except DeviceUnavailable as e:
-            self.logger.warning('DeviceUnavailable: %s', e)
+        except Exception:
+            self.logger.exception('Failed to get reconciler broker for '
+                                  'container %s', container)
             return False
         self.logger.debug('Adding %d objects to the reconciler at %s',
                           len(item_list), reconciler.db_file)
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 7c00716a2..3f2537513 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -31,7 +31,6 @@ from swift.common import internal_client
 from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX
 from swift.common.direct_client import (direct_put_container,
                                         DirectClientException)
-from swift.common.exceptions import DeviceUnavailable
 from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
 from swift.common.ring.utils import is_local_device
 from swift.common.swob import str_to_wsgi
@@ -1089,19 +1088,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
         :param shard_range: a :class:`~swift.common.utils.ShardRange`
         :param root_path: the path of the shard's root container
         :param policy_index: the storage policy index
-        :returns: a tuple of ``(part, broker, node_id)`` where ``part`` is the
-            shard container's partition, ``broker`` is an instance of
+        :returns: a tuple of ``(part, broker, node_id, put_timestamp)`` where
+            ``part`` is the shard container's partition,
+            ``broker`` is an instance of
             :class:`~swift.container.backend.ContainerBroker`,
-            ``node_id`` is the id of the selected node.
+            ``node_id`` is the id of the selected node,
+            ``put_timestamp`` is the put_timestamp if the broker needed to
+            be initialized.
""" part = self.ring.get_part(shard_range.account, shard_range.container) node = self.find_local_handoff_for_part(part) - put_timestamp = Timestamp.now().internal - if not node: - raise DeviceUnavailable( - 'No mounted devices found suitable for creating shard broker ' - 'for %s in partition %s' % (quote(shard_range.name), part)) + put_timestamp = Timestamp.now().internal shard_broker = ContainerBroker.create_broker( os.path.join(self.root, node['device']), part, shard_range.account, shard_range.container, epoch=shard_range.epoch, @@ -1830,18 +1828,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): quote(shard_range.name), shard_range) self._increment_stat('cleaved', 'attempted') policy_index = broker.storage_policy_index - try: - shard_part, shard_broker, node_id, put_timestamp = \ - self._get_shard_broker(shard_range, broker.root_path, - policy_index) - except DeviceUnavailable as duex: - self.logger.warning(str(duex)) - self._increment_stat('cleaved', 'failure', statsd=True) - return CLEAVE_FAILED - else: - return self._cleave_shard_broker( - broker, cleaving_context, shard_range, own_shard_range, - shard_broker, put_timestamp, shard_part, node_id) + shard_part, shard_broker, node_id, put_timestamp = \ + self._get_shard_broker(shard_range, broker.root_path, + policy_index) + return self._cleave_shard_broker( + broker, cleaving_context, shard_range, own_shard_range, + shard_broker, put_timestamp, shard_part, node_id) def _cleave(self, broker): # Returns True if misplaced objects have been moved and the entire @@ -2184,7 +2176,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): self.logger.info('(Override partitions: %s)', ', '.join(str(p) for p in partitions_to_shard)) self._zero_stats() - self._local_device_ids = set() + self._local_device_ids = {} dirs = [] self.ips = whataremyips(self.bind_ip) for node in self.ring.devs: @@ -2195,7 +2187,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if os.path.isdir(datadir): # Populate self._local_device_ids so we can find devices for # shard containers later - self._local_device_ids.add(node['id']) + self._local_device_ids[node['id']] = node if node['device'] not in devices_to_shard: continue part_filt = self._partition_dir_filter( diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 266763e0a..e57b57453 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -279,6 +279,7 @@ class FakeRing(Ring): 'zone': x % 3, 'region': x % 2, 'id': x, + 'weight': 1, } self.add_node(dev) diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index deffd4151..485773a0b 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -2213,10 +2213,10 @@ class TestReplicatorSync(unittest.TestCase): for node in self._ring.devs: daemon = self._run_once(node) if node['device'] == 'sdc': - self.assertEqual(daemon._local_device_ids, set()) + self.assertEqual(daemon._local_device_ids, {}) else: self.assertEqual(daemon._local_device_ids, - set([node['id']])) + {node['id']: node}) def test_clean_up_after_deleted_brokers(self): broker = self._get_broker('a', 'c', node_index=0) diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 31fccd17d..ba2a3c0d4 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -2635,6 +2635,75 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): os.path.basename(rsync_calls[0][1])) 
self.assertFalse(rsync_calls[1:]) + @mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[]) + def test_find_local_handoff_for_part(self, mock_part_nodes): + + with mock.patch( + 'swift.common.db_replicator.ring.Ring', + return_value=self._ring): + daemon = replicator.ContainerReplicator({}, logger=self.logger) + + # First let's assume we find a primary node + ring_node1, ring_node2, ring_node3 = daemon.ring.devs[-3:] + mock_part_nodes.return_value = [ring_node1, ring_node2] + daemon._local_device_ids = {ring_node1['id']: ring_node1, + ring_node3['id']: ring_node3} + node = daemon.find_local_handoff_for_part(0) + self.assertEqual(node['id'], ring_node1['id']) + + # And if we can't find one from the primaries get *some* local device + mock_part_nodes.return_value = [] + daemon._local_device_ids = {ring_node3['id']: ring_node3} + node = daemon.find_local_handoff_for_part(0) + self.assertEqual(node['id'], ring_node3['id']) + + # if there are more then 1 local_dev_id it'll randomly pick one, but + # not a zero-weight device + ring_node3['weight'] = 0 + selected_node_ids = set() + local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-3:]} + daemon._local_device_ids = local_dev_ids + for _ in range(15): + node = daemon.find_local_handoff_for_part(0) + self.assertIn(node['id'], local_dev_ids) + selected_node_ids.add(node['id']) + if len(selected_node_ids) == 3: + break # unexpected + self.assertEqual(len(selected_node_ids), 2) + self.assertEqual([1, 1], [local_dev_ids[dev_id]['weight'] + for dev_id in selected_node_ids]) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertFalse(warning_lines) + + # ...unless all devices have zero-weight + ring_node3['weight'] = 0 + ring_node2['weight'] = 0 + selected_node_ids = set() + local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-2:]} + daemon._local_device_ids = local_dev_ids + for _ in range(15): + self.logger.clear() + node = daemon.find_local_handoff_for_part(0) + self.assertIn(node['id'], local_dev_ids) + selected_node_ids.add(node['id']) + if len(selected_node_ids) == 2: + break # expected + self.assertEqual(len(selected_node_ids), 2) + self.assertEqual([0, 0], [local_dev_ids[dev_id]['weight'] + for dev_id in selected_node_ids]) + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_lines), warning_lines) + self.assertIn( + 'Could not find a non-zero weight device for handoff partition', + warning_lines[0]) + + # If there are also no local_dev_ids, then we'll get the RuntimeError + daemon._local_device_ids = {} + with self.assertRaises(RuntimeError) as dev_err: + daemon.find_local_handoff_for_part(0) + expected_error_string = 'Cannot find local handoff; no local devices' + self.assertEqual(str(dev_err.exception), expected_error_string) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index aab8e5721..93670cc51 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -682,14 +682,14 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), mock.patch.object( sharder, '_process_broker' ) as mock_process_broker: - sharder._local_device_ids = {'stale_node_id'} + sharder._local_device_ids = {'stale_node_id': {}} sharder._one_shard_cycle(Everything(), Everything()) lines = sharder.logger.get_lines_for_level('warning') expected = 'Skipping %s as it is not mounted' % \ unmounted_dev['device'] self.assertIn(expected, lines[0]) - 
self.assertEqual(device_ids, sharder._local_device_ids) + self.assertEqual(device_ids, set(sharder._local_device_ids.keys())) self.assertEqual(2, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] @@ -741,14 +741,14 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), mock.patch.object( sharder, '_process_broker', side_effect=mock_processing ) as mock_process_broker: - sharder._local_device_ids = {'stale_node_id'} + sharder._local_device_ids = {'stale_node_id': {}} sharder._one_shard_cycle(Everything(), Everything()) lines = sharder.logger.get_lines_for_level('warning') expected = 'Skipping %s as it is not mounted' % \ unmounted_dev['device'] self.assertIn(expected, lines[0]) - self.assertEqual(device_ids, sharder._local_device_ids) + self.assertEqual(device_ids, set(sharder._local_device_ids.keys())) self.assertEqual(3, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] @@ -799,10 +799,10 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), mock.patch.object( sharder, '_process_broker' ) as mock_process_broker: - sharder._local_device_ids = {999} + sharder._local_device_ids = {999: {}} sharder._one_shard_cycle(Everything(), Everything()) - self.assertEqual(device_ids, sharder._local_device_ids) + self.assertEqual(device_ids, set(sharder._local_device_ids.keys())) self.assertEqual(3, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] @@ -826,7 +826,7 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), mock.patch.object( sharder, '_process_broker' ) as mock_process_broker: - sharder._local_device_ids = {999} + sharder._local_device_ids = {999: {}} sharder._one_shard_cycle(Everything(), Everything()) expected_in_progress_stats = { @@ -866,7 +866,7 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), \ mock.patch.object(sharder, '_process_broker') \ as mock_process_broker, mock_timestamp_now(ts_now): - sharder._local_device_ids = {999} + sharder._local_device_ids = {999: {}} sharder._one_shard_cycle(Everything(), Everything()) expected_in_progress_stats = { @@ -896,7 +896,7 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), \ mock.patch.object(sharder, '_process_broker') \ as mock_process_broker, mock_timestamp_now(ts_now): - sharder._local_device_ids = {999} + sharder._local_device_ids = {999: {}} sharder._one_shard_cycle(Everything(), Everything()) self._assert_stats( expected_in_progress_stats, sharder, 'sharding_in_progress') @@ -908,7 +908,7 @@ class TestSharder(BaseTestSharder): with mock.patch('eventlet.sleep'), \ mock.patch.object(sharder, '_process_broker') \ as mock_process_broker, mock_timestamp_now(ts_now): - sharder._local_device_ids = {999} + sharder._local_device_ids = {999: {}} sharder._one_shard_cycle(Everything(), Everything()) expected_in_progress_stats = { @@ -1005,7 +1005,8 @@ class TestSharder(BaseTestSharder): 'swift.common.db_replicator.ring.Ring', return_value=fake_ring): sharder = ContainerSharder(conf, logger=self.logger) - sharder._local_device_ids = {0, 1, 2} + sharder._local_device_ids = {dev['id']: dev + for dev in fake_ring.devs} sharder._replicate_object = mock.MagicMock( return_value=(True, [True] * sharder.ring.replica_count)) yield sharder @@ -5558,9 +5559,10 @@ class TestSharder(BaseTestSharder): self.assertEqual([], self.logger.get_lines_for_level('warning')) # 
advance time - with mock.patch('swift.container.sharder.time.time') as fake_time, \ - self._mock_sharder() as sharder: - fake_time.return_value = 6048000 + float(delete_ts) + future_time = 6048000 + float(delete_ts) + with mock.patch( + 'swift.container.sharder.time.time', + return_value=future_time), self._mock_sharder() as sharder: sharder._audit_container(broker) message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % ( broker.db_file, broker.path) @@ -5574,9 +5576,9 @@ class TestSharder(BaseTestSharder): broker.merge_shard_ranges(shard_ranges) # no more warning - with mock.patch('swift.container.sharder.time.time') as fake_time, \ - self._mock_sharder() as sharder: - fake_time.return_value = 6048000 + float(delete_ts) + with mock.patch( + 'swift.container.sharder.time.time', + return_value=future_time), self._mock_sharder() as sharder: sharder._audit_container(broker) self.assertEqual([], self.logger.get_lines_for_level('warning')) @@ -6049,7 +6051,7 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker') as mock_process_broker: sharder.run_once() - self.assertEqual(dev_ids, set(sharder._local_device_ids)) + self.assertEqual(dev_ids, set(sharder._local_device_ids.keys())) self.assertEqual(set(container_data), set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) @@ -6061,7 +6063,7 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker') as mock_process_broker: sharder.run_once(partitions='0') - self.assertEqual(dev_ids, set(sharder._local_device_ids)) + self.assertEqual(dev_ids, set(sharder._local_device_ids.keys())) self.assertEqual(set([container_data[0]]), set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) @@ -6073,7 +6075,7 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker') as mock_process_broker: sharder.run_once(partitions='2,0') - self.assertEqual(dev_ids, set(sharder._local_device_ids)) + self.assertEqual(dev_ids, set(sharder._local_device_ids.keys())) self.assertEqual(set([container_data[0], container_data[2]]), set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) @@ -6085,7 +6087,7 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker') as mock_process_broker: sharder.run_once(partitions='2,0', devices='sdc') - self.assertEqual(dev_ids, set(sharder._local_device_ids)) + self.assertEqual(dev_ids, set(sharder._local_device_ids.keys())) self.assertEqual(set([container_data[2]]), set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) @@ -6097,7 +6099,7 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker') as mock_process_broker: sharder.run_once(devices='sdb,sdc') - self.assertEqual(dev_ids, set(sharder._local_device_ids)) + self.assertEqual(dev_ids, set(sharder._local_device_ids.keys())) self.assertEqual(set(container_data[1:]), set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) @@ -6403,6 +6405,56 @@ class TestSharder(BaseTestSharder): self._assert_recon_stats(expected_shrinking_candidates_data, sharder, 'shrinking_candidates') + @mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[]) + @mock.patch('swift.common.ring.ring.Ring.get_more_nodes', return_value=[]) + def test_get_shard_broker_no_local_handoff_for_part( + self, 
mock_more_nodes, mock_part_nodes):
+        broker = self._make_broker()
+        broker.enable_sharding(Timestamp.now())
+
+        shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
+        shard_ranges = self._make_shard_ranges(
+            shard_bounds, state=ShardRange.CREATED)
+
+        broker.merge_shard_ranges(shard_ranges)
+        self.assertTrue(broker.set_sharding_state())
+
+        # First, let's assume find_local_handoff_for_part fails because the
+        # node we're on has zero weight for all its disks, so it won't appear
+        # in the replica2part2dev table and we won't get a node back.
+        # In this case we fall back to one of our own devices, which we
+        # determine from ring.devs, not the replica2part2dev table.
+        with self._mock_sharder() as sharder:
+            local_dev_ids = {dev['id']: dev for dev in sharder.ring.devs[-1:]}
+            sharder._local_device_ids = local_dev_ids
+            part, shard_broker, node_id, _ = sharder._get_shard_broker(
+                shard_ranges[0], broker.root_path, 0)
+            self.assertIn(node_id, local_dev_ids)
+
+        # if there is more than one local_dev_id it'll randomly pick one
+        selected_node_ids = set()
+        for _ in range(10):
+            with self._mock_sharder() as sharder:
+                local_dev_ids = {dev['id']: dev
+                                 for dev in sharder.ring.devs[-2:]}
+                sharder._local_device_ids = local_dev_ids
+                part, shard_broker, node_id, _ = sharder._get_shard_broker(
+                    shard_ranges[0], broker.root_path, 0)
+                self.assertIn(node_id, local_dev_ids)
+                selected_node_ids.add(node_id)
+            if len(selected_node_ids) == 2:
+                break
+        self.assertEqual(len(selected_node_ids), 2)
+
+        # If there are also no local_dev_ids, then we'll get the RuntimeError
+        with self._mock_sharder() as sharder:
+            sharder._local_device_ids = {}
+            with self.assertRaises(RuntimeError) as dev_err:
+                sharder._get_shard_broker(shard_ranges[0], broker.root_path, 0)
+
+        expected_error_string = 'Cannot find local handoff; no local devices'
+        self.assertEqual(str(dev_err.exception), expected_error_string)
+
 
 class TestCleavingContext(BaseTestSharder):
     def test_init(self):
-- 
cgit v1.2.1
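
For a quick read of the behaviour this patch introduces, here is a minimal standalone sketch of the selection order implemented by the new `find_local_handoff_for_part`: a local primary first, then any weighted local device, then any local device at all, otherwise an error. The helper name and the plain-dict devices below are illustrative stand-ins, not Swift's real ring classes.

# A rough, standalone illustration (not Swift code): the selection order the
# patched find_local_handoff_for_part follows when placing a partition.
import random


def pick_local_handoff(primary_nodes, local_devices_by_id):
    """Pick a device on this node for a partition.

    :param primary_nodes: the ring's primary nodes for the partition
        (a list of device dicts)
    :param local_devices_by_id: {dev_id: device dict} for devices on this
        node; built from ring.devs, so zero-weight devices are included
    """
    if not local_devices_by_id:
        raise RuntimeError('Cannot find local handoff; no local devices')

    # 1. prefer a primary assignment that happens to be on this node
    for node in primary_nodes:
        if node['id'] in local_devices_by_id:
            return node

    # 2. otherwise any local device that still has weight, so devices that
    #    are being drained are avoided when possible
    weighted = [dev for dev in local_devices_by_id.values()
                if dev.get('weight', 0)]
    if weighted:
        return random.choice(weighted)

    # 3. last resort: any local device, even one with zero weight
    return random.choice(list(local_devices_by_id.values()))


if __name__ == '__main__':
    local = {0: {'id': 0, 'device': 'sda', 'weight': 0},
             1: {'id': 1, 'device': 'sdb', 'weight': 0}}
    # no primaries are local and all local devices are zero-weight, so the
    # last-resort branch is taken instead of raising an error
    print(pick_local_handoff([], local))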
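The switch of `_local_device_ids` from a set of ids to an id-to-device mapping is what makes the fallback possible: the full device dict has to be kept so that `weight` and `device` can be read later. Below is a rough sketch of how such a mapping gets populated during the local-device scan, with the `is_local_device` check reduced to a simple ip/port comparison for illustration.

# Sketch (simplified, not the real replicator/sharder scan): building the
# id -> device mapping that _local_device_ids now holds.
import os


def collect_local_devices(ring_devs, my_ips, my_port, datadir_root):
    local_device_ids = {}
    for dev in ring_devs:
        if not dev:
            # ring.devs can contain None holes for removed devices
            continue
        if dev['ip'] not in my_ips or dev['port'] != my_port:
            # stand-in for swift.common.ring.utils.is_local_device()
            continue
        datadir = os.path.join(datadir_root, dev['device'])
        if os.path.isdir(datadir):
            # keep the whole device dict, not just the id, so that
            # find_local_handoff_for_part can later read 'weight' and
            # 'device' from it
            local_device_ids[dev['id']] = dev
    return local_device_ids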