author     Alistair Coles <alistairncoles@gmail.com>    2018-04-25 17:08:16 +0100
committer  Alistair Coles <alistairncoles@gmail.com>    2018-04-25 17:36:19 +0100
commit     fc58926e5eb2f08299a56968e4914b832bba6c7d (patch)
tree       28b62fb7ac89706400e6653b5b2a7a957e90d8d3
parent     74356fc3cc9e46597b8ae619ac0d9b8e8a32378b (diff)
Refactor finding sharding and shrinking candidates
Refactor to provide module-level functions for finding sharding and
shrinking candidates, so that these can be used by other callers. Add
unit tests.

Change-Id: Iada00e63f14238b67aaa818314fa6601eeec624e
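For orientation, a hedged sketch of how an external caller might drive the new
module-level helpers once this patch lands. The broker setup (db path, account
and container names) and the literal thresholds are illustrative assumptions,
not part of the commit; the helper signatures follow the code added below.

    # Illustrative only: assumes an existing container DB at a hypothetical
    # path and a root container that has already sharded.
    from swift.container.backend import ContainerBroker
    from swift.container.sharder import (
        find_sharding_candidates, find_shrinking_candidates)

    broker = ContainerBroker('/path/to/container.db',
                             account='a', container='c')

    # Returns shard ranges whose object_count has reached the threshold,
    # already moved to the SHARDING state with a fresh epoch; persisting them
    # is left to the caller, as _find_and_enable_sharding_candidates does below.
    candidates = find_sharding_candidates(broker, 10000000)
    broker.merge_shard_ranges(candidates)

    # Returns a dict mapping acceptor -> donor; donors are moved to SHRINKING
    # and acceptors are expanded to cover the donor's namespace. The thresholds
    # here mirror the defaults (25% and 75% of a 10M-object shard).
    merge_pairs = find_shrinking_candidates(broker, 2500000, 7500000)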
-rw-r--r--  swift/container/sharder.py            233
-rw-r--r--  test/unit/container/test_sharder.py   212
2 files changed, 323 insertions(+), 122 deletions(-)
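As quick context for the config changes in the diff: the sharder's absolute
thresholds are derived from shard_container_size and the two percentage
options, which this patch names as module-level defaults. A small illustrative
calculation using those defaults (mirroring the arithmetic in
ContainerSharder.__init__; standalone sketch only):

    # Effective thresholds under the defaults added by this patch
    # (values and arithmetic taken from the diff below).
    DEFAULT_SHARD_CONTAINER_SIZE = 10000000
    DEFAULT_SHARD_SHRINK_POINT = 25   # percent
    DEFAULT_SHARD_MERGE_POINT = 75    # percent

    shrink_size = DEFAULT_SHARD_CONTAINER_SIZE * DEFAULT_SHARD_SHRINK_POINT / 100.0
    merge_size = DEFAULT_SHARD_CONTAINER_SIZE * DEFAULT_SHARD_MERGE_POINT / 100.0
    split_size = DEFAULT_SHARD_CONTAINER_SIZE // 2

    print(shrink_size, merge_size, split_size)  # 2500000.0 7500000.0 5000000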
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index cf51a1842..400794bee 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -109,6 +109,89 @@ def find_overlapping_ranges(shard_ranges):
    return result
+def is_sharding_candidate(shard_range, threshold):
+    return (shard_range.state == ShardRange.ACTIVE and
+            shard_range.object_count >= threshold)
+
+
+def find_sharding_candidates(broker, threshold, shard_ranges=None):
+    # this should only execute on root containers; the goal is to find
+    # large shard containers that should be sharded.
+    # First cut is simple: assume root container shard usage stats are good
+    # enough to make decision.
+    # TODO: object counts may well not be the appropriate metric for
+    # deciding to shrink because a shard with low object_count may have a
+    # large number of deleted object rows that will need to be merged with
+    # a neighbour. We may need to expose row count as well as object count.
+    if shard_ranges is None:
+        shard_ranges = broker.get_shard_ranges(states=[ShardRange.ACTIVE])
+    candidates = []
+    for shard_range in shard_ranges:
+        if not is_sharding_candidate(shard_range, threshold):
+            continue
+        shard_range.update_state(ShardRange.SHARDING,
+                                 state_timestamp=Timestamp.now())
+        shard_range.epoch = shard_range.state_timestamp
+        candidates.append(shard_range)
+    return candidates
+
+
+def find_shrinking_candidates(broker, shrink_threshold, merge_size):
+    # this should only execute on root containers that have sharded; the
+    # goal is to find small shard containers that could be retired by
+    # merging with a neighbour.
+    # First cut is simple: assume root container shard usage stats are good
+    # enough to make decision; only merge with upper neighbour so that
+    # upper bounds never change (shard names include upper bound).
+    # TODO: object counts may well not be the appropriate metric for
+    # deciding to shrink because a shard with low object_count may have a
+    # large number of deleted object rows that will need to be merged with
+    # a neighbour. We may need to expose row count as well as object count.
+    shard_ranges = broker.get_shard_ranges()
+    own_shard_range = broker.get_own_shard_range()
+    if len(shard_ranges) == 1:
+        # special case to enable final shard to shrink into root
+        shard_ranges.append(own_shard_range)
+
+    merge_pairs = {}
+    for donor, acceptor in zip(shard_ranges, shard_ranges[1:]):
+        if donor in merge_pairs:
+            # this range may already have been made an acceptor; if so then
+            # move on. In principle it might be that even after expansion
+            # this range and its donor(s) could all be merged with the next
+            # range. In practice it is much easier to reason about a single
+            # donor merging into a single acceptor. Don't fret - eventually
+            # all the small ranges will be retired.
+            continue
+        if (acceptor.name != own_shard_range.name and
+                acceptor.state != ShardRange.ACTIVE):
+            # don't shrink into a range that is not yet ACTIVE
+            continue
+        if donor.state not in (ShardRange.ACTIVE, ShardRange.SHRINKING):
+            # found? created? sharded? don't touch it
+            continue
+
+        proposed_object_count = donor.object_count + acceptor.object_count
+        if (donor.state == ShardRange.SHRINKING or
+                (donor.object_count < shrink_threshold and
+                 proposed_object_count < merge_size)):
+            # include previously identified merge pairs on presumption that
+            # following shrink procedure is idempotent
+            merge_pairs[acceptor] = donor
+            if donor.update_state(ShardRange.SHRINKING):
+                # Set donor state to shrinking so that next cycle won't use
+                # it as an acceptor; state_timestamp defines new epoch for
+                # donor and new timestamp for the expanded acceptor below.
+                donor.epoch = donor.state_timestamp = Timestamp.now()
+            if acceptor.lower != donor.lower:
+                # Update the acceptor container with its expanding state to
+                # prevent it treating objects cleaved from the donor
+                # as misplaced.
+                acceptor.lower = donor.lower
+                acceptor.timestamp = donor.state_timestamp
+    return merge_pairs
+
+
class CleavingContext(object):
    def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
                 last_cleave_to_row=None, cleaving_done=False,
@@ -177,6 +260,11 @@ class CleavingContext(object):
                    self.max_row == self.cleave_to_row))
+DEFAULT_SHARD_CONTAINER_SIZE = 10000000
+DEFAULT_SHARD_SHRINK_POINT = 25
+DEFAULT_SHARD_MERGE_POINT = 75
+
+
class ContainerSharder(ContainerReplicator):
"""Shards containers."""
@@ -186,18 +274,19 @@ class ContainerSharder(ContainerReplicator):
        self.shards_account_prefix = (
            (conf.get('auto_create_account_prefix') or '.') + 'shards_')
-        try:
-            self.shard_shrink_point = config_float_value(
-                conf.get('shard_shrink_point', 25), 0, 100) / 100.0
-        except ValueError as err:
-            raise ValueError(err.message + ": shard_shrink_point")
-        try:
-            self.shrink_merge_point = config_float_value(
-                conf.get('shard_shrink_merge_point', 75), 0, 100) / 100.0
-        except ValueError as err:
-            raise ValueError(err.message + ": shard_shrink_merge_point")
+        def percent_value(key, default):
+            try:
+                value = conf.get(key, default)
+                return config_float_value(value, 0, 100) / 100.0
+            except ValueError as err:
+                raise ValueError("%s: %s" % (str(err), key))
+
+        self.shard_shrink_point = percent_value('shard_shrink_point',
+                                                DEFAULT_SHARD_SHRINK_POINT)
+        self.shrink_merge_point = percent_value('shard_shrink_merge_point',
+                                                DEFAULT_SHARD_MERGE_POINT)
        self.shard_container_size = config_positive_int_value(
-            conf.get('shard_container_size', 10000000))
+            conf.get('shard_container_size', DEFAULT_SHARD_CONTAINER_SIZE))
        self.shrink_size = self.shard_container_size * self.shard_shrink_point
        self.merge_size = self.shard_container_size * self.shrink_merge_point
        self.split_size = self.shard_container_size // 2
@@ -282,7 +371,7 @@ class ContainerSharder(ContainerReplicator):
    def _identify_sharding_candidate(self, broker, node):
        own_shard_range = broker.get_own_shard_range()
-        if self._is_sharding_candidate(own_shard_range):
+        if is_sharding_candidate(own_shard_range, self.shard_container_size):
            self.sharding_candidates.append(
                self._make_stats_info(broker, node, own_shard_range))
@@ -1135,108 +1224,29 @@ class ContainerSharder(ContainerReplicator):
        return False
-    def _is_sharding_candidate(self, shard_range):
-        return (shard_range.state == ShardRange.ACTIVE and
-                shard_range.object_count >= self.shard_container_size)
-
-    def _find_sharding_candidates(self, broker, shard_ranges=None):
-        # this should only execute on root containers; the goal is to find
-        # large shard containers that should be sharded.
-        # First cut is simple: assume root container shard usage stats are good
-        # enough to make decision.
-        # TODO: object counts may well not be the appropriate metric for
-        # deciding to shrink because a shard with low object_count may have a
-        # large number of deleted object rows that will need to be merged with
-        # a neighbour. We may need to expose row count as well as object count.
-        if shard_ranges is None:
-            shard_ranges = broker.get_shard_ranges(states=[ShardRange.ACTIVE])
-        candidates = []
-        for shard_range in shard_ranges:
-            if not self._is_sharding_candidate(shard_range):
-                continue
-            shard_range.update_state(ShardRange.SHARDING,
-                                     state_timestamp=Timestamp.now())
-            shard_range.epoch = shard_range.state_timestamp
-            candidates.append(shard_range)
-        broker.merge_shard_ranges(candidates)
-        return len(candidates)
-
-    def _find_shrinks(self, broker):
-        # this should only execute on root containers that have sharded; the
-        # goal is to find small shard containers that could be retired by
-        # merging with a neighbour.
-        # First cut is simple: assume root container shard usage stats are good
-        # enough to make decision; only merge with upper neighbour so that
-        # upper bounds never change (shard names include upper bound).
-        # TODO: object counts may well not be the appropriate metric for
-        # deciding to shrink because a shard with low object_count may have a
-        # large number of deleted object rows that will need to be merged with
-        # a neighbour. We may need to expose row count as well as object count.
+    def _find_and_enable_sharding_candidates(self, broker, shard_ranges=None):
+        candidates = find_sharding_candidates(
+            broker, self.shard_container_size, shard_ranges)
+        if candidates:
+            self.logger.debug('Identified %s sharding candidates'
+                              % len(candidates))
+            broker.merge_shard_ranges(candidates)
+
+    def _find_and_enable_shrinking_candidates(self, broker):
        if not broker.is_sharded():
-            self.logger.warning(
-                'Cannot shrink a not yet sharded container %s/%s',
-                broker.account, broker.container)
+            self.logger.warning('Cannot shrink a not yet sharded container %s',
+                                broker.path)
+            return
-        shard_ranges = broker.get_shard_ranges()
+        merge_pairs = find_shrinking_candidates(
+            broker, self.shrink_size, self.merge_size)
+        self.logger.debug('Found %s shrinking candidates' % len(merge_pairs))
        own_shard_range = broker.get_own_shard_range()
-        if len(shard_ranges) == 1:
-            # special case to enable final shard to shrink into root
-            shard_ranges.append(own_shard_range)
-
-        merge_pairs = {}
-        for donor, acceptor in zip(shard_ranges, shard_ranges[1:]):
-            self.logger.debug('considering shrink donor %r %s' %
-                              (donor, donor.name))
-            self.logger.debug('considering shrink acceptor %r %s' %
-                              (acceptor, acceptor.name))
-            if donor in merge_pairs:
-                # this range may already have been made an acceptor; if so then
-                # move on. In principle it might be that even after expansion
-                # this range and its donor(s) could all be merged with the next
-                # range. In practice it is much easier to reason about a single
-                # donor merging into a single acceptor. Don't fret - eventually
-                # all the small ranges will be retired.
-                continue
-            if (acceptor is not own_shard_range and
-                    acceptor.state != ShardRange.ACTIVE):
-                # don't shrink into a range that is not yet ACTIVE
-                continue
-            if donor.state not in (ShardRange.ACTIVE, ShardRange.SHRINKING):
-                # found? created? sharded? don't touch it
-                continue
-
-            proposed_object_count = donor.object_count + acceptor.object_count
-            if (donor.state == ShardRange.SHRINKING or
-                    (donor.object_count < self.shrink_size and
-                     proposed_object_count < self.merge_size)):
-                # include previously identified merge pairs on presumption that
-                # following shrink procedure is idempotent
-                merge_pairs[acceptor] = donor
-                self.logger.debug('selecting shrink pair %r %r' %
-                                  (donor, acceptor))
-
-        # TODO: think long and hard about the correct order for these remaining
-        # operations and what happens when one fails...
        for acceptor, donor in merge_pairs.items():
-            # TODO: unit test to verify idempotent nature of this procedure
            self.logger.debug('shrinking shard range %s into %s in %s' %
                              (donor, acceptor, broker.db_file))
-            modified_shard_ranges = []
-            if donor.update_state(ShardRange.SHRINKING):
-                # Set donor state to shrinking so that next cycle won't use it
-                # as an acceptor; state_timestamp defines new epoch for donor
-                # and new timestamp for the expanded acceptor below.
-                donor.epoch = donor.state_timestamp = Timestamp.now()
-                modified_shard_ranges.append(donor)
-            if acceptor.lower != donor.lower:
-                # Update the acceptor container with its expanding state to
-                # prevent it treating objects cleaved from the donor
-                # as misplaced.
-                acceptor.lower = donor.lower
-                acceptor.timestamp = donor.state_timestamp
-                modified_shard_ranges.append(acceptor)
-            broker.merge_shard_ranges(modified_shard_ranges)
-            if acceptor is not own_shard_range:
+            broker.merge_shard_ranges([acceptor, donor])
+            if acceptor.name != own_shard_range.name:
                self._send_shard_ranges(
                    acceptor.account, acceptor.container, [acceptor])
                acceptor.increment_meta(donor.object_count, donor.bytes_used)
@@ -1292,7 +1302,7 @@ class ContainerSharder(ContainerReplicator):
        if state in (UNSHARDED, COLLAPSED):
            if is_leader and broker.is_root_container():
                # bootstrap sharding of root container
-                self._find_sharding_candidates(
+                self._find_and_enable_sharding_candidates(
                    broker, shard_ranges=[broker.get_own_shard_range()])
            own_shard_range = broker.get_own_shard_range()
@@ -1340,9 +1350,8 @@ class ContainerSharder(ContainerReplicator):
        if state == SHARDED and broker.is_root_container():
            if is_leader:
-                self._find_shrinks(broker)
-                self._find_sharding_candidates(broker)
-
+                self._find_and_enable_shrinking_candidates(broker)
+                self._find_and_enable_sharding_candidates(broker)
            for shard_range in broker.get_shard_ranges(
                    states=[ShardRange.SHARDING]):
                self._send_shard_ranges(
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index b2f665d14..0fdcfd20c 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -35,7 +35,7 @@ from swift.common import internal_client
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
    SHARDED
from swift.container.sharder import ContainerSharder, sharding_enabled, \
-    CleavingContext
+    CleavingContext, DEFAULT_SHARD_SHRINK_POINT, DEFAULT_SHARD_CONTAINER_SIZE
from swift.common.utils import ShardRange, Timestamp, hash_path, \
    encode_timestamps, parse_db_filename, quorum_size, Everything
from test import annotate_failure
@@ -88,9 +88,10 @@ class BaseTestSharder(unittest.TestCase):
        self.assertNotEqual(old_db_id, broker.get_info()['id'])  # sanity check
        return broker
-    def _make_shard_ranges(self, bounds, state=None):
+    def _make_shard_ranges(self, bounds, state=None, object_count=0):
        return [ShardRange('.shards_a/c_%s' % upper, Timestamp.now(),
-                           lower, upper, state=state)
+                           lower, upper, state=state,
+                           object_count=object_count)
                for lower, upper in bounds]
    def ts_encoded(self):
@@ -310,13 +311,16 @@ class TestSharder(BaseTestSharder):
            with mock.patch(
                    'swift.container.sharder.time.sleep') as mock_sleep:
                with mock.patch(
-                        'swift.container.sharder.dump_recon_cache',
-                        mock_dump_recon_cache):
-                    fake_time.return_value = next(fake_periods_iter)
-                    sharder._is_sharding_candidate = lambda x: True
-                    sharder._process_broker = fake_process_broker
-                    with self.assertRaises(Exception) as cm:
-                        sharder.run_forever()
+                        'swift.container.sharder.is_sharding_candidate',
+                        return_value=True):
+                    with mock.patch(
+                            'swift.container.sharder.dump_recon_cache',
+                            mock_dump_recon_cache):
+                        fake_time.return_value = next(fake_periods_iter)
+                        sharder._is_sharding_candidate = lambda x: True
+                        sharder._process_broker = fake_process_broker
+                        with self.assertRaises(Exception) as cm:
+                            sharder.run_forever()
        self.assertEqual('Test over', cm.exception.message)
        # four cycles are started, two brokers visited per cycle, but
@@ -3566,6 +3570,194 @@ class TestSharder(BaseTestSharder):
        assert_ok()
        self.assertTrue(broker.is_deleted())
+    def test_find_and_enable_sharding_candidates(self):
+        broker = self._make_broker()
+        own_sr = broker.get_own_shard_range()
+        own_sr.update_state(ShardRange.SHARDING)
+        own_sr.epoch = next(self.ts_iter)
+        shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
+        shard_ranges = self._make_shard_ranges(
+            shard_bounds, state=ShardRange.CLEAVED)
+        shard_ranges[0].state = ShardRange.ACTIVE
+        broker.merge_shard_ranges(shard_ranges + [own_sr])
+        self.assertTrue(broker.set_sharding_state())
+        self.assertTrue(broker.set_sharded_state())
+        with self._mock_sharder() as sharder:
+            sharder._find_and_enable_sharding_candidates(broker)
+
+        # one range just below threshold
+        shard_ranges[0].update_meta(sharder.shard_container_size - 1, 0)
+        broker.merge_shard_ranges(shard_ranges[0])
+        with self._mock_sharder() as sharder:
+            sharder._find_and_enable_sharding_candidates(broker)
+        self._assert_shard_ranges_equal(shard_ranges,
+                                        broker.get_shard_ranges())
+
+        # two ranges above threshold, only one ACTIVE
+        shard_ranges[0].update_meta(sharder.shard_container_size, 0)
+        shard_ranges[2].update_meta(sharder.shard_container_size + 1, 0)
+        broker.merge_shard_ranges([shard_ranges[0], shard_ranges[2]])
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._find_and_enable_sharding_candidates(broker)
+        expected = shard_ranges[0].copy(state=ShardRange.SHARDING,
+                                        state_timestamp=now, epoch=now)
+        self._assert_shard_ranges_equal([expected] + shard_ranges[1:],
+                                        broker.get_shard_ranges())
+
+        # check idempotency
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._find_and_enable_sharding_candidates(broker)
+        self._assert_shard_ranges_equal([expected] + shard_ranges[1:],
+                                        broker.get_shard_ranges())
+
+        # two ranges above threshold, both ACTIVE
+        shard_ranges[2].update_state(ShardRange.ACTIVE)
+        broker.merge_shard_ranges(shard_ranges[2])
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._find_and_enable_sharding_candidates(broker)
+        expected_2 = shard_ranges[2].copy(state=ShardRange.SHARDING,
+                                          state_timestamp=now, epoch=now)
+        self._assert_shard_ranges_equal(
+            [expected, shard_ranges[1], expected_2], broker.get_shard_ranges())
+
+        # check idempotency
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._find_and_enable_sharding_candidates(broker)
+        self._assert_shard_ranges_equal(
+            [expected, shard_ranges[1], expected_2], broker.get_shard_ranges())
+
+    def test_find_and_enable_sharding_candidates_bootstrap(self):
+        broker = self._make_broker()
+        with self._mock_sharder(conf={'shard_container_size': 1}) as sharder:
+            sharder._find_and_enable_sharding_candidates(broker)
+        self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
+        broker.put_object('obj', next(self.ts_iter).internal, 1, '', '')
+        self.assertEqual(1, broker.get_info()['object_count'])
+        with self._mock_sharder(conf={'shard_container_size': 1}) as sharder:
+            with mock_timestamp_now() as now:
+                sharder._find_and_enable_sharding_candidates(
+                    broker, [broker.get_own_shard_range()])
+        own_sr = broker.get_own_shard_range()
+        self.assertEqual(ShardRange.SHARDING, own_sr.state)
+        self.assertEqual(now, own_sr.state_timestamp)
+        self.assertEqual(now, own_sr.epoch)
+
+        # check idempotency
+        with self._mock_sharder(conf={'shard_container_size': 1}) as sharder:
+            with mock_timestamp_now():
+                sharder._find_and_enable_sharding_candidates(
+                    broker, [broker.get_own_shard_range()])
+        own_sr = broker.get_own_shard_range()
+        self.assertEqual(ShardRange.SHARDING, own_sr.state)
+        self.assertEqual(now, own_sr.state_timestamp)
+        self.assertEqual(now, own_sr.epoch)
+
+    def test_find_and_enable_shrinking_candidates(self):
+        broker = self._make_broker()
+        own_sr = broker.get_own_shard_range()
+        own_sr.update_state(ShardRange.SHARDING)
+        own_sr.epoch = next(self.ts_iter)
+        shard_bounds = (('', 'here'), ('here', 'there'), ('there', ''))
+        size = DEFAULT_SHARD_SHRINK_POINT * DEFAULT_SHARD_CONTAINER_SIZE / 100
+        shard_ranges = self._make_shard_ranges(
+            shard_bounds, state=ShardRange.ACTIVE, object_count=size)
+        broker.merge_shard_ranges(shard_ranges + [own_sr])
+        self.assertTrue(broker.set_sharding_state())
+        self.assertTrue(broker.set_sharded_state())
+        with self._mock_sharder() as sharder:
+            sharder._find_and_enable_shrinking_candidates(broker)
+        self._assert_shard_ranges_equal(shard_ranges,
+                                        broker.get_shard_ranges())
+
+        # one range just below threshold
+        shard_ranges[0].update_meta(size - 1, 0)
+        broker.merge_shard_ranges(shard_ranges[0])
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._send_shard_ranges = mock.MagicMock()
+                sharder._find_and_enable_shrinking_candidates(broker)
+        acceptor = shard_ranges[1].copy(lower=shard_ranges[0].lower)
+        acceptor.timestamp = now
+        donor = shard_ranges[0].copy(state=ShardRange.SHRINKING,
+                                     state_timestamp=now, epoch=now)
+        self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
+                                        broker.get_shard_ranges())
+        sharder._send_shard_ranges.assert_has_calls(
+            [mock.call(acceptor.account, acceptor.container, [acceptor]),
+             mock.call(donor.account, donor.container, [donor, acceptor])]
+        )
+
+        # check idempotency
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                sharder._send_shard_ranges = mock.MagicMock()
+                sharder._find_and_enable_shrinking_candidates(broker)
+        self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
+                                        broker.get_shard_ranges())
+        sharder._send_shard_ranges.assert_has_calls(
+            [mock.call(acceptor.account, acceptor.container, [acceptor]),
+             mock.call(donor.account, donor.container, [donor, acceptor])]
+        )
+
+        # acceptor falls below threshold - not a candidate
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                acceptor.update_meta(0, 0, meta_timestamp=now)
+                broker.merge_shard_ranges(acceptor)
+                sharder._send_shard_ranges = mock.MagicMock()
+                sharder._find_and_enable_shrinking_candidates(broker)
+        self._assert_shard_ranges_equal([donor, acceptor, shard_ranges[2]],
+                                        broker.get_shard_ranges())
+        sharder._send_shard_ranges.assert_has_calls(
+            [mock.call(acceptor.account, acceptor.container, [acceptor]),
+             mock.call(donor.account, donor.container, [donor, acceptor])]
+        )
+
+        # ...until donor has shrunk
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                donor.update_state(ShardRange.SHARDED, state_timestamp=now)
+                donor.set_deleted(timestamp=now)
+                broker.merge_shard_ranges(donor)
+                sharder._send_shard_ranges = mock.MagicMock()
+                sharder._find_and_enable_shrinking_candidates(broker)
+        new_acceptor = shard_ranges[2].copy(lower=acceptor.lower)
+        new_acceptor.timestamp = now
+        new_donor = acceptor.copy(state=ShardRange.SHRINKING,
+                                  state_timestamp=now, epoch=now)
+        self._assert_shard_ranges_equal(
+            [donor, new_donor, new_acceptor],
+            broker.get_shard_ranges(include_deleted=True))
+        sharder._send_shard_ranges.assert_has_calls(
+            [mock.call(new_acceptor.account, new_acceptor.container,
+                       [new_acceptor]),
+             mock.call(new_donor.account, new_donor.container,
+                       [new_donor, new_acceptor])]
+        )
+
+        # ..finally last shard shrinks to root
+        with self._mock_sharder() as sharder:
+            with mock_timestamp_now() as now:
+                new_donor.update_state(ShardRange.SHARDED, state_timestamp=now)
+                new_donor.set_deleted(timestamp=now)
+                new_acceptor.update_meta(0, 0, meta_timestamp=now)
+                broker.merge_shard_ranges([new_donor, new_acceptor])
+                sharder._send_shard_ranges = mock.MagicMock()
+                sharder._find_and_enable_shrinking_candidates(broker)
+        final_donor = new_acceptor.copy(state=ShardRange.SHRINKING,
+                                        state_timestamp=now, epoch=now)
+        self._assert_shard_ranges_equal(
+            [donor, new_donor, final_donor],
+            broker.get_shard_ranges(include_deleted=True))
+        sharder._send_shard_ranges.assert_has_calls(
+            [mock.call(final_donor.account, final_donor.container,
+                       [final_donor, broker.get_own_shard_range()])]
+        )
+
class TestCleavingContext(BaseTestSharder):
    def test_init(self):