author    Alistair Coles <alistairncoles@gmail.com>  2018-04-30 14:45:31 +0100
committer Alistair Coles <alistairncoles@gmail.com>  2018-04-30 14:45:31 +0100
commit    73f9f851053ffd37eb0d0fa0278ea198a9ab96c3
tree      6f1c73db43b66b4526b6403c117d503757cd89fe
parent    a09a0358dd8d9c1cf466f6a73260da1c395c7083
download  swift-73f9f851053ffd37eb0d0fa0278ea198a9ab96c3.tar.gz
Reset local device ids on each cycle
Change-Id: Iebfede8423748319924e0729f1149c1348703e2b
-rw-r--r--  swift/container/sharder.py           |  3
-rw-r--r--  test/unit/container/test_sharder.py  | 50
2 files changed, 31 insertions(+), 22 deletions(-)
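The change itself is small: _one_shard_cycle already zeroed its stats at the top of every pass, and this patch moves that call after the override logging and additionally resets self._local_device_ids to an empty set, so device ids discovered on a previous cycle cannot linger once the ring or the device overrides change. The updated tests seed _local_device_ids with a stale value before each cycle and assert it is rebuilt from the current ring. A self-contained toy sketch of the reset pattern follows; apart from the _local_device_ids attribute, all names are invented for illustration and do not come from the Swift code:

# Toy illustration of resetting per-cycle state at the start of each pass.
class ToyCycler(object):
    def __init__(self, ring_devs):
        self.ring_devs = ring_devs          # stand-in for ring.devs
        self._local_device_ids = set()

    def one_cycle(self):
        # Reset per-cycle state so ids from a previous pass cannot linger
        # after the ring (or the device overrides) have changed.
        self._local_device_ids = set()
        for node in self.ring_devs:
            if node and node.get('local'):  # stand-in for the locality check
                self._local_device_ids.add(node['id'])


cycler = ToyCycler([{'id': 0, 'local': True}, {'id': 1, 'local': True}])
cycler.one_cycle()
assert cycler._local_device_ids == {0, 1}

# A device leaves the ring between cycles; without the reset, id 1 would
# still be reported as local on the next pass.
cycler.ring_devs = [{'id': 0, 'local': True}]
cycler.one_cycle()
assert cycler._local_device_ids == {0}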
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index a13e270d6..b588e4b0b 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -1429,13 +1429,14 @@ class ContainerSharder(ContainerReplicator):
"""
self.logger.info('Container sharder cycle starting, auto-sharding %s',
self.auto_shard)
- self._zero_stats()
if isinstance(devices_to_shard, (list, tuple)):
self.logger.info('(Override devices: %s)',
', '.join(str(d) for d in devices_to_shard))
if isinstance(partitions_to_shard, (list, tuple)):
self.logger.info('(Override partitions: %s)',
', '.join(str(p) for p in partitions_to_shard))
+ self._zero_stats()
+ self._local_device_ids = set()
dirs = []
self.ips = whataremyips(bind_ip=self.bind_ip)
for node in self.ring.devs:
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 8f9502559..8009a06fb 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -446,10 +446,11 @@ class TestSharder(BaseTestSharder):
sharder.reported = time.time()
sharder.logger = debug_logger()
brokers = []
- for container in ('c1', 'c2', 'c3'):
+ device_ids = set(range(3))
+ for device_id in device_ids:
brokers.append(self._make_broker(
- container=container, hash_=container + 'hash',
- device=sharder.ring.devs[0]['device'], part=0))
+ container='c%s' % device_id, hash_='c%shash' % device_id,
+ device=sharder.ring.devs[device_id]['device'], part=0))
# enable a/c2 and a/c3 for sharding
for broker in brokers[1:]:
broker.update_metadata({'X-Container-Sysmeta-Sharding':
@@ -463,23 +464,25 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
+ sharder._local_device_ids = {'stale_node_id'}
sharder._one_shard_cycle(Everything(), Everything())
+ self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(2, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
- self.assertEqual({'a/c2', 'a/c3'}, set(processed_paths))
+ self.assertEqual({'a/c1', 'a/c2'}, set(processed_paths))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
expected_stats = {'attempted': 2, 'success': 2, 'failure': 0,
'skipped': 1, 'completed': 0}
self._assert_recon_stats(expected_stats, sharder, 'visited')
expected_candidate_stats = {
'found': 1,
- 'top': [{'object_count': 10, 'account': 'a', 'container': 'c2',
+ 'top': [{'object_count': 10, 'account': 'a', 'container': 'c1',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
- 'path': brokers[1].db_file, 'root': 'a/c2',
- 'node_index': 0}]}
+ 'path': brokers[1].db_file, 'root': 'a/c1',
+ 'node_index': 1}]}
self._assert_recon_stats(
expected_candidate_stats, sharder, 'sharding_candidates')
self._assert_recon_stats(None, sharder, 'sharding_progress')
@@ -487,14 +490,14 @@ class TestSharder(BaseTestSharder):
# enable and progress container a/c1 by giving it shard ranges
now = next(self.ts_iter)
brokers[0].merge_shard_ranges(
- [ShardRange('a/c1', now, '', '', state=ShardRange.SHARDING),
+ [ShardRange('a/c0', now, '', '', state=ShardRange.SHARDING),
ShardRange('.s_a/1', now, '', 'b', state=ShardRange.ACTIVE),
ShardRange('.s_a/2', now, 'b', 'c', state=ShardRange.CLEAVED),
ShardRange('.s_a/3', now, 'c', 'd', state=ShardRange.CREATED),
ShardRange('.s_a/4', now, 'd', 'e', state=ShardRange.CREATED),
ShardRange('.s_a/5', now, 'e', '', state=ShardRange.FOUND)])
brokers[1].merge_shard_ranges(
- [ShardRange('a/c2', now, '', '', state=ShardRange.SHARDING),
+ [ShardRange('a/c1', now, '', '', state=ShardRange.SHARDING),
ShardRange('.s_a/6', now, '', 'b', state=ShardRange.ACTIVE),
ShardRange('.s_a/7', now, 'b', 'c', state=ShardRange.ACTIVE),
ShardRange('.s_a/8', now, 'c', 'd', state=ShardRange.CLEAVED),
@@ -505,9 +508,9 @@ class TestSharder(BaseTestSharder):
0, 'text/plain', 'etag', 0)
def mock_processing(broker, node, part):
- if broker.path == 'a/c2':
+ if broker.path == 'a/c1':
raise Exception('kapow!')
- elif broker.path not in ('a/c1', 'a/c3'):
+ elif broker.path not in ('a/c0', 'a/c2'):
raise BaseException("I don't know how to handle a broker "
"for %s" % broker.path)
@@ -515,12 +518,14 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker', side_effect=mock_processing
) as mock_process_broker:
+ sharder._local_device_ids = {'stale_node_id'}
sharder._one_shard_cycle(Everything(), Everything())
+ self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
- self.assertEqual({'a/c1', 'a/c2', 'a/c3'}, set(processed_paths))
+ self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths))
lines = sharder.logger.get_lines_for_level('error')
self.assertIn('Unhandled exception while processing', lines[0])
self.assertFalse(lines[1:])
@@ -530,27 +535,27 @@ class TestSharder(BaseTestSharder):
self._assert_recon_stats(expected_stats, sharder, 'visited')
expected_candidate_stats = {
'found': 1,
- 'top': [{'object_count': 11, 'account': 'a', 'container': 'c3',
+ 'top': [{'object_count': 11, 'account': 'a', 'container': 'c2',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
- 'path': brokers[2].db_file, 'root': 'a/c3',
- 'node_index': 0}]}
+ 'path': brokers[2].db_file, 'root': 'a/c2',
+ 'node_index': 2}]}
self._assert_recon_stats(
expected_candidate_stats, sharder, 'sharding_candidates')
expected_in_progress_stats = {
- 'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
+ 'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[0].db_file).st_size,
- 'path': brokers[0].db_file, 'root': 'a/c1',
+ 'path': brokers[0].db_file, 'root': 'a/c0',
'node_index': 0,
'found': 1, 'created': 2, 'cleaved': 1, 'active': 1,
'state': 'sharding', 'db_state': 'unsharded',
'error': None},
- {'object_count': 10, 'account': 'a', 'container': 'c2',
+ {'object_count': 10, 'account': 'a', 'container': 'c1',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
- 'path': brokers[1].db_file, 'root': 'a/c2',
- 'node_index': 0,
+ 'path': brokers[1].db_file, 'root': 'a/c1',
+ 'node_index': 1,
'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
'state': 'sharding', 'db_state': 'unsharded',
'error': 'kapow!'}]}
@@ -567,11 +572,14 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
+ sharder._local_device_ids = {999}
sharder._one_shard_cycle(Everything(), Everything())
+
+ self.assertEqual(device_ids, sharder._local_device_ids)
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
- self.assertEqual({'a/c1', 'a/c2', 'a/c3'}, set(processed_paths))
+ self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths))
self.assertFalse(sharder.logger.get_lines_for_level('error'))
expected_stats = {'attempted': 3, 'success': 3, 'failure': 0,
'skipped': 0, 'completed': 0}