author     Zuul <zuul@review.opendev.org>             2022-07-18 23:57:55 +0000
committer  Gerrit Code Review <review@openstack.org>  2022-07-18 23:57:55 +0000
commit     c3aa1ce66c20c0c9148a4d3e22d3ff5bcd12bfdb (patch)
tree       a191078d3b5a4353ff2077bcbfd7afafec352ec9
parent     45e13ff4c56748345c9d31bbee3bc6e7abdb5b43 (diff)
parent     57f7145f7379de1f736ff2d904e85918c8166536 (diff)
download   swift-c3aa1ce66c20c0c9148a4d3e22d3ff5bcd12bfdb.tar.gz
Merge "sharder: always set state to CLEAVED after cleaving"
-rw-r--r--  swift/container/sharder.py           |  83
-rw-r--r--  test/probe/test_sharder.py           |  75
-rw-r--r--  test/unit/container/test_sharder.py  |   7
3 files changed, 124 insertions, 41 deletions
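
This merge restructures _cleave_shard_broker from several early returns into a single-exit flow: the empty-shard path now records CLEAVE_EMPTY and falls through to the same bookkeeping tail as a successful cleave, so the shard range is always merged back into the root broker with its state advanced to CLEAVED. A minimal sketch of the pattern follows; cleave_one, replicate_ok and record_progress are hypothetical stand-ins, not the actual sharder API, and the constant values are illustrative.

    # Sketch only: cleave_one, replicate_ok and record_progress are
    # hypothetical stand-ins for the real sharder methods; the CLEAVE_*
    # constants mirror those in swift/container/sharder.py.
    CLEAVE_SUCCESS, CLEAVE_FAILED, CLEAVE_EMPTY = 0, 1, 2

    def cleave_one(shard_is_empty, replicate_ok, record_progress):
        result = CLEAVE_SUCCESS
        if shard_is_empty:
            # Previously an early return here skipped the common tail.
            result = CLEAVE_EMPTY
        elif not replicate_ok():
            # Insufficient replication: do not advance the cleave cursor.
            return CLEAVE_FAILED
        # Common tail, now reached by both SUCCESS and EMPTY outcomes:
        # merge the shard range (state CLEAVED) and record progress.
        record_progress()
        return result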
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 958418274..1812de74b 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -1660,6 +1660,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
own_shard_range, shard_broker, put_timestamp,
shard_part, node_id):
+ result = CLEAVE_SUCCESS
start = time.time()
# only cleave from the retiring db - misplaced objects handler will
# deal with any objects in the fresh db
@@ -1685,15 +1686,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
# This was just created; don't need to replicate this
# SR because there was nothing there. So cleanup and
# remove the shard_broker from its hand off location.
- self.delete_db(shard_broker)
- cleaving_context.range_done(shard_range.upper_str)
- if shard_range.upper >= own_shard_range.upper:
- # cleaving complete
- cleaving_context.cleaving_done = True
- cleaving_context.store(broker)
# Because nothing was here we wont count it in the shard
# batch count.
- return CLEAVE_EMPTY
+ result = CLEAVE_EMPTY
# Else, it wasn't newly created by us, and
# we don't know what's in it or why. Let it get
# replicated and counted in the batch count.
@@ -1742,40 +1737,46 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
shard_broker.merge_shard_ranges(shard_range)
replication_quorum = self.shard_replication_quorum
- self.logger.info(
- 'Replicating new shard container %s for %s',
- quote(shard_broker.path), own_shard_range)
-
- success, responses = self._replicate_object(
- shard_part, shard_broker.db_file, node_id)
-
- replication_successes = responses.count(True)
- if (not success and (not responses or
- replication_successes < replication_quorum)):
- # insufficient replication or replication not even attempted;
- # break because we don't want to progress the cleave cursor
- # until each shard range has been successfully cleaved
- self.logger.warning(
- 'Failed to sufficiently replicate cleaved shard %s for %s: '
- '%s successes, %s required.', shard_range, quote(broker.path),
- replication_successes, replication_quorum)
- self._increment_stat('cleaved', 'failure', statsd=True)
- return CLEAVE_FAILED
-
- elapsed = round(time.time() - start, 3)
- self._min_stat('cleaved', 'min_time', elapsed)
- self._max_stat('cleaved', 'max_time', elapsed)
- broker.merge_shard_ranges(shard_range)
- cleaving_context.range_done(shard_range.upper_str)
- if shard_range.upper >= own_shard_range.upper:
- # cleaving complete
- cleaving_context.cleaving_done = True
- cleaving_context.store(broker)
- self.logger.info(
- 'Cleaved %s for shard range %s in %gs.',
- quote(broker.path), shard_range, elapsed)
- self._increment_stat('cleaved', 'success', statsd=True)
- return CLEAVE_SUCCESS
+ if result == CLEAVE_EMPTY:
+ self.delete_db(shard_broker)
+ else: # result == CLEAVE_SUCCESS:
+ self.logger.info(
+ 'Replicating new shard container %s for %s',
+ quote(shard_broker.path), own_shard_range)
+
+ success, responses = self._replicate_object(
+ shard_part, shard_broker.db_file, node_id)
+
+ replication_successes = responses.count(True)
+ if (not success and (not responses or
+ replication_successes < replication_quorum)):
+ # insufficient replication or replication not even attempted;
+ # break because we don't want to progress the cleave cursor
+ # until each shard range has been successfully cleaved
+ self.logger.warning(
+ 'Failed to sufficiently replicate cleaved shard %s for %s:'
+ ' %s successes, %s required.', shard_range,
+ quote(broker.path),
+ replication_successes, replication_quorum)
+ self._increment_stat('cleaved', 'failure', statsd=True)
+ result = CLEAVE_FAILED
+ else:
+ elapsed = round(time.time() - start, 3)
+ self._min_stat('cleaved', 'min_time', elapsed)
+ self._max_stat('cleaved', 'max_time', elapsed)
+ self.logger.info(
+ 'Cleaved %s for shard range %s in %gs.',
+ quote(broker.path), shard_range, elapsed)
+ self._increment_stat('cleaved', 'success', statsd=True)
+
+ if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY):
+ broker.merge_shard_ranges(shard_range)
+ cleaving_context.range_done(shard_range.upper_str)
+ if shard_range.upper >= own_shard_range.upper:
+ # cleaving complete
+ cleaving_context.cleaving_done = True
+ cleaving_context.store(broker)
+ return result
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
own_shard_range):
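
The net effect of the sharder.py change: before this merge, the CLEAVE_EMPTY early return skipped broker.merge_shard_ranges(shard_range), so a shard range cleaved from an empty region never had its CLEAVED state persisted in that replica's own database. The common tail now runs for both CLEAVE_SUCCESS and CLEAVE_EMPTY, while CLEAVE_FAILED still returns early so the cleave cursor cannot advance past an under-replicated shard.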
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 082f5163e..22a6d4ed2 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -1332,6 +1332,81 @@ class TestContainerSharding(BaseAutoContainerSharding):
def test_sharded_listing_with_replicators(self):
self._test_sharded_listing(run_replicators=True)
+ def test_listing_under_populated_replica(self):
+ # the leader node and one other primary have all the objects and will
+ # cleave to 4 shard ranges, but the third primary only has 1 object in
+ # the final shard range
+ obj_names = self._make_object_names(2 * self.max_shard_size)
+ self.brain.servers.stop(number=self.brain.node_numbers[2])
+ self.put_objects(obj_names)
+ self.brain.servers.start(number=self.brain.node_numbers[2])
+ subset_obj_names = [obj_names[-1]]
+ self.put_objects(subset_obj_names)
+ self.brain.servers.stop(number=self.brain.node_numbers[2])
+
+ # sanity check: the first 2 primaries will list all objects
+ self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
+
+ # Run sharder on the fully populated nodes, starting with the leader
+ client.post_container(self.url, self.admin_token, self.container_name,
+ headers={'X-Container-Sharding': 'on'})
+ self.sharders.once(number=self.brain.node_numbers[0],
+ additional_args='--partitions=%s' % self.brain.part)
+ self.sharders.once(number=self.brain.node_numbers[1],
+ additional_args='--partitions=%s' % self.brain.part)
+
+ # Verify that the first 2 primary nodes have cleaved the first batch of
+ # 2 shard ranges
+ broker = self.get_broker(self.brain.part, self.brain.nodes[0])
+ self.assertEqual('sharding', broker.get_db_state())
+ shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
+ self.assertLengthEqual(shard_ranges, 4)
+ self.assertEqual([ShardRange.CLEAVED, ShardRange.CLEAVED,
+ ShardRange.CREATED, ShardRange.CREATED],
+ [sr['state'] for sr in shard_ranges])
+ self.assertEqual(
+ {False},
+ set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
+
+ # listing is complete (from the fully populated primaries at least);
+ # the root serves the listing parts for the last 2 shard ranges which
+ # are not yet cleaved
+ self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
+
+ # Run the sharder on the under-populated node to get it fully
+ # cleaved.
+ self.brain.servers.start(number=self.brain.node_numbers[2])
+ Manager(['container-replicator']).once(
+ number=self.brain.node_numbers[2])
+ self.sharders.once(number=self.brain.node_numbers[2],
+ additional_args='--partitions=%s' % self.brain.part)
+
+ broker = self.get_broker(self.brain.part, self.brain.nodes[2])
+ self.assertEqual('sharded', broker.get_db_state())
+ shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
+ self.assertLengthEqual(shard_ranges, 4)
+ self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE,
+ ShardRange.ACTIVE, ShardRange.ACTIVE],
+ [sr['state'] for sr in shard_ranges])
+ self.assertEqual(
+ {True, False},
+ set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
+
+ # Get a consistent view of shard range states then check listing
+ Manager(['container-replicator']).once(
+ number=self.brain.node_numbers[2])
+ # oops, the listing is incomplete because the last 2 listing parts are
+ # now served by the under-populated shard ranges.
+ self.assert_container_listing(
+ obj_names[:self.max_shard_size] + subset_obj_names,
+ req_hdrs={'x-newest': 'true'})
+
+ # but once another replica has completed cleaving the listing is
+ # complete again
+ self.sharders.once(number=self.brain.node_numbers[1],
+ additional_args='--partitions=%s' % self.brain.part)
+ self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
+
def test_async_pendings(self):
obj_names = self._make_object_names(self.max_shard_size * 2)
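
The new probe test exercises exactly the fixed scenario: a third primary that holds only one object cleaves mostly-empty shard ranges, and with the fix it still reaches the 'sharded' state with all four shard ranges ACTIVE. Probe tests require a running SAIO-style cluster that the test harness may reset; assuming such an environment and pytest, the test can be run on its own with an invocation along these lines:

    # Hypothetical invocation; assumes a local SAIO cluster
    pytest test/probe/test_sharder.py -k test_listing_under_populated_replica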
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 1ffb0e16a..e0beaceb2 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -1663,6 +1663,9 @@ class TestSharder(BaseTestSharder):
self.assertEqual(cleaving_context.ranges_todo, 0)
self.assertTrue(cleaving_context.cleaving_done)
+ self.assertEqual([ShardRange.CLEAVED] * 3,
+ [sr.state for sr in broker.get_shard_ranges()])
+
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
broker = self._make_broker()
broker.enable_sharding(Timestamp.now())
@@ -1695,6 +1698,10 @@ class TestSharder(BaseTestSharder):
self.assertEqual(cleaving_context.ranges_todo, 2)
self.assertFalse(cleaving_context.cleaving_done)
+ self.assertEqual(
+ [ShardRange.CLEAVED, ShardRange.CREATED, ShardRange.CREATED],
+ [sr.state for sr in broker.get_shard_ranges()])
+
def test_cleave_shard_range_no_own_shard_range(self):
# create an unsharded broker that has shard ranges but no
# own_shard_range, verify that it does not cleave...
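
The unit test additions pin the fixed behaviour down at the broker level: after cleaving, shard ranges read back from the root database are in state CLEAVED even when the cleaved shard database was empty or was a pre-existing handoff, which is precisely the state transition the old early return could skip.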