summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--etc/container-server.conf-sample11
-rw-r--r--swift/container/sharder.py26
-rw-r--r--test/probe/test_sharder.py62
-rw-r--r--test/unit/container/test_sharder.py157
-rw-r--r--tools/playbooks/common/install_dependencies.yaml1
5 files changed, 230 insertions, 27 deletions
diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample
index a337e7149..09f3bc7e9 100644
--- a/etc/container-server.conf-sample
+++ b/etc/container-server.conf-sample
@@ -440,6 +440,17 @@ use = egg:swift#xprofile
# integer value. A negative value implies no limit.
# recon_candidates_limit = 5
#
+# As the sharder visits each container that's currently sharding it dumps to
+# recon their current progress. To be able to mark their progress as completed
+# this in-progress check will need to monitor containers that have just
+# completed sharding. The recon_sharded_timeout parameter says for how long a
+# container whose just finished sharding should be checked by the in-progress
+# check. This is to allow anything monitoring the sharding recon dump to have
+# enough time to collate and see things complete. The time is capped at
+# reclaim_age, so this parameter should be less than or equal to reclaim_age.
+# The default is 12 hours (12 x 60 x 60)
+# recon_sharded_timeout = 43200
+#
# Large databases tend to take a while to work with, but we want to make sure
# we write down our progress. Use a larger-than-normal broker timeout to make
# us less likely to bomb out on a LockTimeout.
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 0bfa42a23..d577f0316 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -518,6 +518,8 @@ class ContainerSharder(ContainerReplicator):
self.shrinking_candidates = []
self.recon_candidates_limit = int(
conf.get('recon_candidates_limit', 5))
+ self.recon_sharded_timeout = int(
+ conf.get('recon_sharded_timeout', 43200))
self.broker_timeout = config_positive_int_value(
conf.get('broker_timeout', 60))
replica_count = self.ring.replica_count
@@ -645,9 +647,19 @@ class ContainerSharder(ContainerReplicator):
def _record_sharding_progress(self, broker, node, error):
own_shard_range = broker.get_own_shard_range()
- if (broker.get_db_state() in (UNSHARDED, SHARDING) and
- own_shard_range.state in (ShardRange.SHARDING,
- ShardRange.SHARDED)):
+ db_state = broker.get_db_state()
+ if (db_state in (UNSHARDED, SHARDING, SHARDED)
+ and own_shard_range.state in (ShardRange.SHARDING,
+ ShardRange.SHARDED)):
+ if db_state == SHARDED:
+ context_ts = max([float(ts) for c, ts in
+ CleavingContext.load_all(broker)]) or None
+ if not context_ts or (context_ts + self.recon_sharded_timeout
+ < Timestamp.now().timestamp):
+ # not contexts or last context timestamp too old for the
+ # broker to be recorded
+ return
+
info = self._make_stats_info(broker, node, own_shard_range)
info['state'] = own_shard_range.state_text
info['db_state'] = broker.get_db_state()
@@ -995,8 +1007,11 @@ class ContainerSharder(ContainerReplicator):
def _audit_cleave_contexts(self, broker):
now = Timestamp.now()
for context, last_mod in CleavingContext.load_all(broker):
- if Timestamp(last_mod).timestamp + self.reclaim_age < \
- now.timestamp:
+ last_mod = Timestamp(last_mod)
+ is_done = context.done() and last_mod.timestamp + \
+ self.recon_sharded_timeout < now.timestamp
+ is_stale = last_mod.timestamp + self.reclaim_age < now.timestamp
+ if is_done or is_stale:
context.delete(broker)
def _audit_container(self, broker):
@@ -1646,7 +1661,6 @@ class ContainerSharder(ContainerReplicator):
modified_shard_ranges.append(own_shard_range)
broker.merge_shard_ranges(modified_shard_ranges)
if broker.set_sharded_state():
- cleaving_context.delete(broker)
return True
else:
self.logger.warning(
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index b8154fdf8..3cf785a50 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -41,6 +41,7 @@ from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest, get_server_number, \
wait_for_server_to_hangup
from test.debug_logger import debug_logger
+import mock
MIN_SHARD_CONTAINER_THRESHOLD = 4
@@ -888,8 +889,10 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
[sr['state'] for sr in orig_root_shard_ranges])
- contexts = list(CleavingContext.load_all(broker))
- self.assertEqual([], contexts) # length check
+ # Contexts should still be there, and should be complete
+ contexts = set([ctx.done()
+ for ctx, _ in CleavingContext.load_all(broker)])
+ self.assertEqual({True}, contexts)
self.direct_delete_container(expect_failure=True)
self.assertLengthEqual(found['normal_dbs'], 2)
@@ -939,9 +942,10 @@ class TestContainerSharding(BaseTestContainerSharding):
orig['state_timestamp'])
self.assertGreaterEqual(updated.meta_timestamp,
orig['meta_timestamp'])
-
- contexts = list(CleavingContext.load_all(broker))
- self.assertEqual([], contexts) # length check
+ # Contexts should still be there, and should be complete
+ contexts = set([ctx.done()
+ for ctx, _ in CleavingContext.load_all(broker)])
+ self.assertEqual({True}, contexts)
# Check that entire listing is available
headers, actual_listing = self.assert_container_listing(obj_names)
@@ -1197,9 +1201,11 @@ class TestContainerSharding(BaseTestContainerSharding):
[ShardRange.ACTIVE] * 3,
[sr.state for sr in broker.get_shard_ranges()])
- # Make sure our cleaving contexts got cleaned up
- contexts = list(CleavingContext.load_all(broker))
- self.assertEqual([], contexts)
+ # Contexts should still be there, and should be complete
+ contexts = set([ctx.done()
+ for ctx, _
+ in CleavingContext.load_all(broker)])
+ self.assertEqual({True}, contexts)
# check root shard ranges
root_shard_ranges = self.direct_get_container_shard_ranges()
@@ -2021,6 +2027,26 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
[sr.state for sr in shard_ranges])
+ # Check the current progress. It shouldn't be complete.
+ recon = direct_client.direct_get_recon(leader_node, "sharding")
+ expected_in_progress = {'all': [{'account': 'AUTH_test',
+ 'active': 0,
+ 'cleaved': 2,
+ 'created': 2,
+ 'found': 0,
+ 'db_state': 'sharding',
+ 'state': 'sharding',
+ 'error': None,
+ 'file_size': mock.ANY,
+ 'meta_timestamp': mock.ANY,
+ 'node_index': 0,
+ 'object_count': len(obj_names),
+ 'container': mock.ANY,
+ 'path': mock.ANY,
+ 'root': mock.ANY}]}
+ actual = recon['sharding_stats']['sharding']['sharding_in_progress']
+ self.assertEqual(expected_in_progress, actual)
+
# stop *all* container servers for third shard range
sr_part, sr_node_nums = self.get_part_and_node_numbers(shard_ranges[2])
for node_num in sr_node_nums:
@@ -2078,6 +2104,26 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual([ShardRange.ACTIVE] * 4,
[sr.state for sr in shard_ranges])
+ # Check the leader's progress again, this time is should be complete
+ recon = direct_client.direct_get_recon(leader_node, "sharding")
+ expected_in_progress = {'all': [{'account': 'AUTH_test',
+ 'active': 4,
+ 'cleaved': 0,
+ 'created': 0,
+ 'found': 0,
+ 'db_state': 'sharded',
+ 'state': 'sharded',
+ 'error': None,
+ 'file_size': mock.ANY,
+ 'meta_timestamp': mock.ANY,
+ 'node_index': 0,
+ 'object_count': len(obj_names),
+ 'container': mock.ANY,
+ 'path': mock.ANY,
+ 'root': mock.ANY}]}
+ actual = recon['sharding_stats']['sharding']['sharding_in_progress']
+ self.assertEqual(expected_in_progress, actual)
+
def test_sharded_delete(self):
all_obj_names = self._make_object_names(self.max_shard_size)
self.put_objects(all_obj_names)
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index b7cd4e2a1..c89f0eedf 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -170,6 +170,7 @@ class TestSharder(BaseTestSharder):
'shards_account_prefix': '.shards_',
'auto_shard': False,
'recon_candidates_limit': 5,
+ 'recon_sharded_timeout': 43200,
'shard_replication_quorum': 2,
'existing_shard_replication_quorum': 2
}
@@ -201,6 +202,7 @@ class TestSharder(BaseTestSharder):
'auto_create_account_prefix': '...',
'auto_shard': 'yes',
'recon_candidates_limit': 10,
+ 'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
}
@@ -223,6 +225,7 @@ class TestSharder(BaseTestSharder):
'shards_account_prefix': '...shards_',
'auto_shard': True,
'recon_candidates_limit': 10,
+ 'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
}
@@ -694,6 +697,114 @@ class TestSharder(BaseTestSharder):
expected_candidate_stats, sharder, 'sharding_candidates')
self._assert_recon_stats(None, sharder, 'sharding_progress')
+ # let's progress broker 1 (broker[0])
+ brokers[0].enable_sharding(next(self.ts_iter))
+ brokers[0].set_sharding_state()
+ shard_ranges = brokers[0].get_shard_ranges()
+ for sr in shard_ranges[:-1]:
+ sr.update_state(ShardRange.CLEAVED)
+ brokers[0].merge_shard_ranges(shard_ranges)
+
+ with mock.patch('eventlet.sleep'), mock.patch.object(
+ sharder, '_process_broker'
+ ) as mock_process_broker:
+ sharder._local_device_ids = {999}
+ sharder._one_shard_cycle(Everything(), Everything())
+
+ expected_in_progress_stats = {
+ 'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
+ 'meta_timestamp': mock.ANY,
+ 'file_size': os.stat(brokers[0].db_file).st_size,
+ 'path': brokers[0].db_file, 'root': 'a/c0',
+ 'node_index': 0,
+ 'found': 1, 'created': 0, 'cleaved': 3, 'active': 1,
+ 'state': 'sharding', 'db_state': 'sharding',
+ 'error': None},
+ {'object_count': 0, 'account': 'a', 'container': 'c1',
+ 'meta_timestamp': mock.ANY,
+ 'file_size': os.stat(brokers[1].db_file).st_size,
+ 'path': brokers[1].db_file, 'root': 'a/c1',
+ 'node_index': 1,
+ 'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+ 'state': 'sharding', 'db_state': 'unsharded',
+ 'error': None}]}
+ self._assert_stats(
+ expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+ # Now complete sharding broker 1.
+ shard_ranges[-1].update_state(ShardRange.CLEAVED)
+ own_sr = brokers[0].get_own_shard_range()
+ own_sr.update_state(ShardRange.SHARDED)
+ brokers[0].merge_shard_ranges(shard_ranges + [own_sr])
+ # make and complete a cleave context, this is used for the
+ # recon_sharded_timeout timer.
+ cxt = CleavingContext.load(brokers[0])
+ cxt.misplaced_done = cxt.cleaving_done = True
+ ts_now = next(self.ts_iter)
+ with mock_timestamp_now(ts_now):
+ cxt.store(brokers[0])
+ self.assertTrue(brokers[0].set_sharded_state())
+
+ with mock.patch('eventlet.sleep'), \
+ mock.patch.object(sharder, '_process_broker') \
+ as mock_process_broker, mock_timestamp_now(ts_now):
+ sharder._local_device_ids = {999}
+ sharder._one_shard_cycle(Everything(), Everything())
+
+ expected_in_progress_stats = {
+ 'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
+ 'meta_timestamp': mock.ANY,
+ 'file_size': os.stat(brokers[0].db_file).st_size,
+ 'path': brokers[0].db_file, 'root': 'a/c0',
+ 'node_index': 0,
+ 'found': 0, 'created': 0, 'cleaved': 4, 'active': 1,
+ 'state': 'sharded', 'db_state': 'sharded',
+ 'error': None},
+ {'object_count': 0, 'account': 'a', 'container': 'c1',
+ 'meta_timestamp': mock.ANY,
+ 'file_size': os.stat(brokers[1].db_file).st_size,
+ 'path': brokers[1].db_file, 'root': 'a/c1',
+ 'node_index': 1,
+ 'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+ 'state': 'sharding', 'db_state': 'unsharded',
+ 'error': None}]}
+ self._assert_stats(
+ expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+ # one more cycle at recon_sharded_timeout seconds into the
+ # future to check that the completed broker is still reported
+ ts_now = Timestamp(ts_now.timestamp +
+ sharder.recon_sharded_timeout)
+ with mock.patch('eventlet.sleep'), \
+ mock.patch.object(sharder, '_process_broker') \
+ as mock_process_broker, mock_timestamp_now(ts_now):
+ sharder._local_device_ids = {999}
+ sharder._one_shard_cycle(Everything(), Everything())
+ self._assert_stats(
+ expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+ # when we move recon_sharded_timeout + 1 seconds into the future,
+ # broker 1 will be removed from the progress report
+ ts_now = Timestamp(ts_now.timestamp +
+ sharder.recon_sharded_timeout + 1)
+ with mock.patch('eventlet.sleep'), \
+ mock.patch.object(sharder, '_process_broker') \
+ as mock_process_broker, mock_timestamp_now(ts_now):
+ sharder._local_device_ids = {999}
+ sharder._one_shard_cycle(Everything(), Everything())
+
+ expected_in_progress_stats = {
+ 'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
+ 'meta_timestamp': mock.ANY,
+ 'file_size': os.stat(brokers[1].db_file).st_size,
+ 'path': brokers[1].db_file, 'root': 'a/c1',
+ 'node_index': 1,
+ 'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+ 'state': 'sharding', 'db_state': 'unsharded',
+ 'error': None}]}
+ self._assert_stats(
+ expected_in_progress_stats, sharder, 'sharding_in_progress')
+
def test_ratelimited_roundrobin(self):
n_databases = 100
@@ -5426,21 +5537,21 @@ class TestSharder(BaseTestSharder):
def test_audit_cleave_contexts(self):
- def add_cleave_context(id, last_modified):
+ def add_cleave_context(id, last_modified, cleaving_done):
params = {'ref': id,
'cursor': 'curs',
'max_row': 2,
'cleave_to_row': 2,
'last_cleave_to_row': 1,
- 'cleaving_done': False,
+ 'cleaving_done': cleaving_done,
'misplaced_done': True,
'ranges_done': 2,
'ranges_todo': 4}
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
- with mock_timestamp_now(Timestamp(last_modified)):
+ with mock_timestamp_now(last_modified):
broker.update_metadata(
{key: (json.dumps(params),
- Timestamp(last_modified).internal)})
+ last_modified.internal)})
def get_context(id, broker):
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
@@ -5449,6 +5560,7 @@ class TestSharder(BaseTestSharder):
return data
reclaim_age = 100
+ recon_sharded_timeout = 50
broker = self._make_broker()
# sanity check
@@ -5456,25 +5568,43 @@ class TestSharder(BaseTestSharder):
self.assertEqual(UNSHARDED, broker.get_db_state())
# Setup some cleaving contexts
- id_old, id_newish = [str(uuid4()) for _ in range(2)]
- contexts = ((id_old, 1),
- (id_newish, reclaim_age // 2))
- for id, last_modified in contexts:
- add_cleave_context(id, last_modified)
+ id_old, id_newish, id_complete = [str(uuid4()) for _ in range(3)]
+ ts_old, ts_newish, ts_complete = (
+ Timestamp(1),
+ Timestamp(reclaim_age // 2),
+ Timestamp(reclaim_age - recon_sharded_timeout))
+ contexts = ((id_old, ts_old, False),
+ (id_newish, ts_newish, False),
+ (id_complete, ts_complete, True))
+ for id, last_modified, cleaving_done in contexts:
+ add_cleave_context(id, last_modified, cleaving_done)
+
+ sharder_conf = {'reclaim_age': str(reclaim_age),
+ 'recon_sharded_timeout': str(recon_sharded_timeout)}
- with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+ with self._mock_sharder(sharder_conf) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
sharder._audit_cleave_contexts(broker)
+ # old context is stale, ie last modified reached reclaim_age and was
+ # never completed (done).
old_ctx = get_context(id_old, broker)
self.assertEqual(old_ctx, "")
+ # Newish context is almost stale, as in it's been 1/2 reclaim age since
+ # it was last modified yet it's not completed. So it haven't been
+ # cleaned up.
newish_ctx = get_context(id_newish, broker)
self.assertEqual(newish_ctx.ref, id_newish)
- # If we push time another reclaim age later, and they all be removed
- # minus id_missing_lm as it has a later last_modified.
- with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+ # Complete context is complete (done) and it's been
+ # recon_sharded_timeout time since it was marked completed so it's
+ # been removed
+ complete_ctx = get_context(id_complete, broker)
+ self.assertEqual(complete_ctx, "")
+
+ # If we push time another reclaim age later, they are all removed
+ with self._mock_sharder(sharder_conf) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
sharder._audit_cleave_contexts(broker)
@@ -5853,6 +5983,7 @@ class TestCleavingContext(BaseTestSharder):
# Now let's delete it. When deleted the metadata key will exist, but
# the value will be "" as this means it'll be reaped later.
ctx.delete(broker)
+
sysmeta = broker.get_sharding_sysmeta()
for key, val in sysmeta.items():
if key == "Context-%s" % db_id:
diff --git a/tools/playbooks/common/install_dependencies.yaml b/tools/playbooks/common/install_dependencies.yaml
index 893b7d03b..687607fad 100644
--- a/tools/playbooks/common/install_dependencies.yaml
+++ b/tools/playbooks/common/install_dependencies.yaml
@@ -40,3 +40,4 @@
- nose
- pyeclib
- python-swiftclient
+ - mock