From 73f9f851053ffd37eb0d0fa0278ea198a9ab96c3 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Mon, 30 Apr 2018 14:45:31 +0100 Subject: Reset local device ids on each cycle Change-Id: Iebfede8423748319924e0729f1149c1348703e2b --- swift/container/sharder.py | 3 ++- test/unit/container/test_sharder.py | 50 +++++++++++++++++++++---------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/swift/container/sharder.py b/swift/container/sharder.py index a13e270d6..b588e4b0b 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -1429,13 +1429,14 @@ class ContainerSharder(ContainerReplicator): """ self.logger.info('Container sharder cycle starting, auto-sharding %s', self.auto_shard) - self._zero_stats() if isinstance(devices_to_shard, (list, tuple)): self.logger.info('(Override devices: %s)', ', '.join(str(d) for d in devices_to_shard)) if isinstance(partitions_to_shard, (list, tuple)): self.logger.info('(Override partitions: %s)', ', '.join(str(p) for p in partitions_to_shard)) + self._zero_stats() + self._local_device_ids = set() dirs = [] self.ips = whataremyips(bind_ip=self.bind_ip) for node in self.ring.devs: diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 8f9502559..8009a06fb 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -446,10 +446,11 @@ class TestSharder(BaseTestSharder): sharder.reported = time.time() sharder.logger = debug_logger() brokers = [] - for container in ('c1', 'c2', 'c3'): + device_ids = set(range(3)) + for device_id in device_ids: brokers.append(self._make_broker( - container=container, hash_=container + 'hash', - device=sharder.ring.devs[0]['device'], part=0)) + container='c%s' % device_id, hash_='c%shash' % device_id, + device=sharder.ring.devs[device_id]['device'], part=0)) # enable a/c2 and a/c3 for sharding for broker in brokers[1:]: broker.update_metadata({'X-Container-Sysmeta-Sharding': @@ -463,23 +464,25 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker' ) as mock_process_broker: + sharder._local_device_ids = {'stale_node_id'} sharder._one_shard_cycle(Everything(), Everything()) + self.assertEqual(device_ids, sharder._local_device_ids) self.assertEqual(2, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] - self.assertEqual({'a/c2', 'a/c3'}, set(processed_paths)) + self.assertEqual({'a/c1', 'a/c2'}, set(processed_paths)) self.assertFalse(sharder.logger.get_lines_for_level('error')) expected_stats = {'attempted': 2, 'success': 2, 'failure': 0, 'skipped': 1, 'completed': 0} self._assert_recon_stats(expected_stats, sharder, 'visited') expected_candidate_stats = { 'found': 1, - 'top': [{'object_count': 10, 'account': 'a', 'container': 'c2', + 'top': [{'object_count': 10, 'account': 'a', 'container': 'c1', 'meta_timestamp': mock.ANY, 'file_size': os.stat(brokers[1].db_file).st_size, - 'path': brokers[1].db_file, 'root': 'a/c2', - 'node_index': 0}]} + 'path': brokers[1].db_file, 'root': 'a/c1', + 'node_index': 1}]} self._assert_recon_stats( expected_candidate_stats, sharder, 'sharding_candidates') self._assert_recon_stats(None, sharder, 'sharding_progress') @@ -487,14 +490,14 @@ class TestSharder(BaseTestSharder): # enable and progress container a/c1 by giving it shard ranges now = next(self.ts_iter) brokers[0].merge_shard_ranges( - [ShardRange('a/c1', now, '', '', state=ShardRange.SHARDING), + [ShardRange('a/c0', now, '', '', state=ShardRange.SHARDING), ShardRange('.s_a/1', now, '', 'b', state=ShardRange.ACTIVE), ShardRange('.s_a/2', now, 'b', 'c', state=ShardRange.CLEAVED), ShardRange('.s_a/3', now, 'c', 'd', state=ShardRange.CREATED), ShardRange('.s_a/4', now, 'd', 'e', state=ShardRange.CREATED), ShardRange('.s_a/5', now, 'e', '', state=ShardRange.FOUND)]) brokers[1].merge_shard_ranges( - [ShardRange('a/c2', now, '', '', state=ShardRange.SHARDING), + [ShardRange('a/c1', now, '', '', state=ShardRange.SHARDING), ShardRange('.s_a/6', now, '', 'b', state=ShardRange.ACTIVE), ShardRange('.s_a/7', now, 'b', 'c', state=ShardRange.ACTIVE), ShardRange('.s_a/8', now, 'c', 'd', state=ShardRange.CLEAVED), @@ -505,9 +508,9 @@ class TestSharder(BaseTestSharder): 0, 'text/plain', 'etag', 0) def mock_processing(broker, node, part): - if broker.path == 'a/c2': + if broker.path == 'a/c1': raise Exception('kapow!') - elif broker.path not in ('a/c1', 'a/c3'): + elif broker.path not in ('a/c0', 'a/c2'): raise BaseException("I don't know how to handle a broker " "for %s" % broker.path) @@ -515,12 +518,14 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker', side_effect=mock_processing ) as mock_process_broker: + sharder._local_device_ids = {'stale_node_id'} sharder._one_shard_cycle(Everything(), Everything()) + self.assertEqual(device_ids, sharder._local_device_ids) self.assertEqual(3, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] - self.assertEqual({'a/c1', 'a/c2', 'a/c3'}, set(processed_paths)) + self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths)) lines = sharder.logger.get_lines_for_level('error') self.assertIn('Unhandled exception while processing', lines[0]) self.assertFalse(lines[1:]) @@ -530,27 +535,27 @@ class TestSharder(BaseTestSharder): self._assert_recon_stats(expected_stats, sharder, 'visited') expected_candidate_stats = { 'found': 1, - 'top': [{'object_count': 11, 'account': 'a', 'container': 'c3', + 'top': [{'object_count': 11, 'account': 'a', 'container': 'c2', 'meta_timestamp': mock.ANY, 'file_size': os.stat(brokers[1].db_file).st_size, - 'path': brokers[2].db_file, 'root': 'a/c3', - 'node_index': 0}]} + 'path': brokers[2].db_file, 'root': 'a/c2', + 'node_index': 2}]} self._assert_recon_stats( expected_candidate_stats, sharder, 'sharding_candidates') expected_in_progress_stats = { - 'all': [{'object_count': 0, 'account': 'a', 'container': 'c1', + 'all': [{'object_count': 0, 'account': 'a', 'container': 'c0', 'meta_timestamp': mock.ANY, 'file_size': os.stat(brokers[0].db_file).st_size, - 'path': brokers[0].db_file, 'root': 'a/c1', + 'path': brokers[0].db_file, 'root': 'a/c0', 'node_index': 0, 'found': 1, 'created': 2, 'cleaved': 1, 'active': 1, 'state': 'sharding', 'db_state': 'unsharded', 'error': None}, - {'object_count': 10, 'account': 'a', 'container': 'c2', + {'object_count': 10, 'account': 'a', 'container': 'c1', 'meta_timestamp': mock.ANY, 'file_size': os.stat(brokers[1].db_file).st_size, - 'path': brokers[1].db_file, 'root': 'a/c2', - 'node_index': 0, + 'path': brokers[1].db_file, 'root': 'a/c1', + 'node_index': 1, 'found': 0, 'created': 2, 'cleaved': 1, 'active': 2, 'state': 'sharding', 'db_state': 'unsharded', 'error': 'kapow!'}]} @@ -567,11 +572,14 @@ class TestSharder(BaseTestSharder): with mock.patch.object( sharder, '_process_broker' ) as mock_process_broker: + sharder._local_device_ids = {999} sharder._one_shard_cycle(Everything(), Everything()) + + self.assertEqual(device_ids, sharder._local_device_ids) self.assertEqual(3, mock_process_broker.call_count) processed_paths = [call[0][0].path for call in mock_process_broker.call_args_list] - self.assertEqual({'a/c1', 'a/c2', 'a/c3'}, set(processed_paths)) + self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths)) self.assertFalse(sharder.logger.get_lines_for_level('error')) expected_stats = {'attempted': 3, 'success': 3, 'failure': 0, 'skipped': 0, 'completed': 0} -- cgit v1.2.1