summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Oliver <matt@oliver.net.au>2022-07-21 12:32:27 +1000
committerAlistair Coles <alistairncoles@gmail.com>2022-07-29 15:02:26 +0100
commitc4e00eb89f34de79d5fb123dd044621ef4df679c (patch)
tree6c9cb018bc19c4b4592b376087d09928cb59084d
parentd7a931191b805f9f14f57d91532d88bba88cf362 (diff)
downloadswift-c4e00eb89f34de79d5fb123dd044621ef4df679c.tar.gz
Sharder: Fall back to local device in get_shard_broker
If the sharder is processing a node that has 0 weight, especially for all the devices on the node, the `find_local_handoff_for_part` can fail because there will be no local hand off devices available as it uses the replica2part2dev_id to find a device. However, a 0 weighted device won't appear in the replica2part2dev table. This patch extends `find_local_handoff_for_part`, if it fails to find a node from the ring it'll fall back to a local device identified by the `_local_device_ids` that is built up when the replicator or sharder was identifing local devices. This uses the ring.devs, so does include 0 weighted devices. This allows the sharder to find a location to write the shard_broker in a handoff location while sharding. Co-Authored-By: Tim Burke <tim.burke@gmail.com> Change-Id: Ic38698e9ca0397770c7362229baef1101a72788f
-rw-r--r--swift/common/db_replicator.py6
-rw-r--r--swift/container/replicator.py43
-rw-r--r--swift/container/sharder.py38
-rw-r--r--test/unit/__init__.py1
-rw-r--r--test/unit/common/test_db_replicator.py4
-rw-r--r--test/unit/container/test_replicator.py69
-rw-r--r--test/unit/container/test_sharder.py96
7 files changed, 193 insertions, 64 deletions
diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py
index 0db65b2a3..3e82e2dd8 100644
--- a/swift/common/db_replicator.py
+++ b/swift/common/db_replicator.py
@@ -196,7 +196,7 @@ class Replicator(Daemon):
self.cpool = GreenPool(size=concurrency)
swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring = ring.Ring(swift_dir, ring_name=self.server_type)
- self._local_device_ids = set()
+ self._local_device_ids = {}
self.per_diff = int(conf.get('per_diff', 1000))
self.max_diffs = int(conf.get('max_diffs') or 100)
self.interval = float(conf.get('interval') or
@@ -795,7 +795,7 @@ class Replicator(Daemon):
'These modes are not intended for normal '
'operation; use these options with care.')
- self._local_device_ids = set()
+ self._local_device_ids = {}
found_local = False
for node in self.ring.devs:
if node and is_local_device(ips, self.port,
@@ -822,7 +822,7 @@ class Replicator(Daemon):
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
- self._local_device_ids.add(node['id'])
+ self._local_device_ids[node['id']] = node
part_filt = self._partition_dir_filter(
node['id'], partitions_to_replicate)
dirs.append((datadir, node['id'], part_filt))
diff --git a/swift/container/replicator.py b/swift/container/replicator.py
index d45a5e8df..ab1350865 100644
--- a/swift/container/replicator.py
+++ b/swift/container/replicator.py
@@ -14,10 +14,10 @@
# limitations under the License.
import os
-import itertools
import json
from collections import defaultdict
from eventlet import Timeout
+from random import choice
from swift.container.sync_store import ContainerSyncStore
from swift.container.backend import ContainerBroker, DATADIR, SHARDED
@@ -27,7 +27,6 @@ from swift.container.reconciler import (
from swift.common import db_replicator
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPOk, HTTPAccepted
-from swift.common.exceptions import DeviceUnavailable
from swift.common.http import is_success
from swift.common.utils import Timestamp, majority_size, get_db_files
@@ -144,18 +143,37 @@ class ContainerReplicator(db_replicator.Replicator):
def find_local_handoff_for_part(self, part):
"""
- Look through devices in the ring for the first handoff device that was
- identified during job creation as available on this node.
+ Find a device in the ring that is on this node on which to place a
+ partition. Preference is given to a device that is a primary location
+ for the partition. If no such device is found then a local device with
+ weight is chosen, and failing that any local device.
+ :param part: a partition
:returns: a node entry from the ring
"""
- nodes = self.ring.get_part_nodes(part)
- more_nodes = self.ring.get_more_nodes(part)
+ if not self._local_device_ids:
+ raise RuntimeError('Cannot find local handoff; no local devices')
- for node in itertools.chain(nodes, more_nodes):
+ for node in self.ring.get_part_nodes(part):
if node['id'] in self._local_device_ids:
return node
- return None
+
+ # don't attempt to minimize handoff depth: just choose any local
+ # device, but start by only picking a device with a weight, just in
+ # case some devices are being drained...
+ local_devs_with_weight = [
+ dev for dev in self._local_device_ids.values()
+ if dev.get('weight', 0)]
+ if local_devs_with_weight:
+ return choice(local_devs_with_weight)
+
+ # we have to return something, so choose any local device..
+ node = choice(list(self._local_device_ids.values()))
+ self.logger.warning(
+ "Could not find a non-zero weight device for handoff partition "
+ "%d, falling back device %s" %
+ (part, node['device']))
+ return node
def get_reconciler_broker(self, timestamp):
"""
@@ -173,10 +191,6 @@ class ContainerReplicator(db_replicator.Replicator):
account = MISPLACED_OBJECTS_ACCOUNT
part = self.ring.get_part(account, container)
node = self.find_local_handoff_for_part(part)
- if not node:
- raise DeviceUnavailable(
- 'No mounted devices found suitable to Handoff reconciler '
- 'container %s in partition %s' % (container, part))
broker = ContainerBroker.create_broker(
os.path.join(self.root, node['device']), part, account, container,
logger=self.logger, put_timestamp=timestamp,
@@ -198,8 +212,9 @@ class ContainerReplicator(db_replicator.Replicator):
try:
reconciler = self.get_reconciler_broker(container)
- except DeviceUnavailable as e:
- self.logger.warning('DeviceUnavailable: %s', e)
+ except Exception:
+ self.logger.exception('Failed to get reconciler broker for '
+ 'container %s', container)
return False
self.logger.debug('Adding %d objects to the reconciler at %s',
len(item_list), reconciler.db_file)
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 7c00716a2..3f2537513 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -31,7 +31,6 @@ from swift.common import internal_client
from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.direct_client import (direct_put_container,
DirectClientException)
-from swift.common.exceptions import DeviceUnavailable
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
from swift.common.ring.utils import is_local_device
from swift.common.swob import str_to_wsgi
@@ -1089,19 +1088,18 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
:param shard_range: a :class:`~swift.common.utils.ShardRange`
:param root_path: the path of the shard's root container
:param policy_index: the storage policy index
- :returns: a tuple of ``(part, broker, node_id)`` where ``part`` is the
- shard container's partition, ``broker`` is an instance of
+ :returns: a tuple of ``(part, broker, node_id, put_timestamp)`` where
+ ``part`` is the shard container's partition,
+ ``broker`` is an instance of
:class:`~swift.container.backend.ContainerBroker`,
- ``node_id`` is the id of the selected node.
+ ``node_id`` is the id of the selected node,
+ ``put_timestamp`` is the put_timestamp if the broker needed to
+ be initialized.
"""
part = self.ring.get_part(shard_range.account, shard_range.container)
node = self.find_local_handoff_for_part(part)
- put_timestamp = Timestamp.now().internal
- if not node:
- raise DeviceUnavailable(
- 'No mounted devices found suitable for creating shard broker '
- 'for %s in partition %s' % (quote(shard_range.name), part))
+ put_timestamp = Timestamp.now().internal
shard_broker = ContainerBroker.create_broker(
os.path.join(self.root, node['device']), part, shard_range.account,
shard_range.container, epoch=shard_range.epoch,
@@ -1830,18 +1828,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
quote(shard_range.name), shard_range)
self._increment_stat('cleaved', 'attempted')
policy_index = broker.storage_policy_index
- try:
- shard_part, shard_broker, node_id, put_timestamp = \
- self._get_shard_broker(shard_range, broker.root_path,
- policy_index)
- except DeviceUnavailable as duex:
- self.logger.warning(str(duex))
- self._increment_stat('cleaved', 'failure', statsd=True)
- return CLEAVE_FAILED
- else:
- return self._cleave_shard_broker(
- broker, cleaving_context, shard_range, own_shard_range,
- shard_broker, put_timestamp, shard_part, node_id)
+ shard_part, shard_broker, node_id, put_timestamp = \
+ self._get_shard_broker(shard_range, broker.root_path,
+ policy_index)
+ return self._cleave_shard_broker(
+ broker, cleaving_context, shard_range, own_shard_range,
+ shard_broker, put_timestamp, shard_part, node_id)
def _cleave(self, broker):
# Returns True if misplaced objects have been moved and the entire
@@ -2184,7 +2176,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
self.logger.info('(Override partitions: %s)',
', '.join(str(p) for p in partitions_to_shard))
self._zero_stats()
- self._local_device_ids = set()
+ self._local_device_ids = {}
dirs = []
self.ips = whataremyips(self.bind_ip)
for node in self.ring.devs:
@@ -2195,7 +2187,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if os.path.isdir(datadir):
# Populate self._local_device_ids so we can find devices for
# shard containers later
- self._local_device_ids.add(node['id'])
+ self._local_device_ids[node['id']] = node
if node['device'] not in devices_to_shard:
continue
part_filt = self._partition_dir_filter(
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 266763e0a..e57b57453 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -279,6 +279,7 @@ class FakeRing(Ring):
'zone': x % 3,
'region': x % 2,
'id': x,
+ 'weight': 1,
}
self.add_node(dev)
diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py
index deffd4151..485773a0b 100644
--- a/test/unit/common/test_db_replicator.py
+++ b/test/unit/common/test_db_replicator.py
@@ -2213,10 +2213,10 @@ class TestReplicatorSync(unittest.TestCase):
for node in self._ring.devs:
daemon = self._run_once(node)
if node['device'] == 'sdc':
- self.assertEqual(daemon._local_device_ids, set())
+ self.assertEqual(daemon._local_device_ids, {})
else:
self.assertEqual(daemon._local_device_ids,
- set([node['id']]))
+ {node['id']: node})
def test_clean_up_after_deleted_brokers(self):
broker = self._get_broker('a', 'c', node_index=0)
diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py
index 31fccd17d..ba2a3c0d4 100644
--- a/test/unit/container/test_replicator.py
+++ b/test/unit/container/test_replicator.py
@@ -2635,6 +2635,75 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
os.path.basename(rsync_calls[0][1]))
self.assertFalse(rsync_calls[1:])
+ @mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[])
+ def test_find_local_handoff_for_part(self, mock_part_nodes):
+
+ with mock.patch(
+ 'swift.common.db_replicator.ring.Ring',
+ return_value=self._ring):
+ daemon = replicator.ContainerReplicator({}, logger=self.logger)
+
+ # First let's assume we find a primary node
+ ring_node1, ring_node2, ring_node3 = daemon.ring.devs[-3:]
+ mock_part_nodes.return_value = [ring_node1, ring_node2]
+ daemon._local_device_ids = {ring_node1['id']: ring_node1,
+ ring_node3['id']: ring_node3}
+ node = daemon.find_local_handoff_for_part(0)
+ self.assertEqual(node['id'], ring_node1['id'])
+
+ # And if we can't find one from the primaries get *some* local device
+ mock_part_nodes.return_value = []
+ daemon._local_device_ids = {ring_node3['id']: ring_node3}
+ node = daemon.find_local_handoff_for_part(0)
+ self.assertEqual(node['id'], ring_node3['id'])
+
+ # if there are more then 1 local_dev_id it'll randomly pick one, but
+ # not a zero-weight device
+ ring_node3['weight'] = 0
+ selected_node_ids = set()
+ local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-3:]}
+ daemon._local_device_ids = local_dev_ids
+ for _ in range(15):
+ node = daemon.find_local_handoff_for_part(0)
+ self.assertIn(node['id'], local_dev_ids)
+ selected_node_ids.add(node['id'])
+ if len(selected_node_ids) == 3:
+ break # unexpected
+ self.assertEqual(len(selected_node_ids), 2)
+ self.assertEqual([1, 1], [local_dev_ids[dev_id]['weight']
+ for dev_id in selected_node_ids])
+ warning_lines = self.logger.get_lines_for_level('warning')
+ self.assertFalse(warning_lines)
+
+ # ...unless all devices have zero-weight
+ ring_node3['weight'] = 0
+ ring_node2['weight'] = 0
+ selected_node_ids = set()
+ local_dev_ids = {dev['id']: dev for dev in daemon.ring.devs[-2:]}
+ daemon._local_device_ids = local_dev_ids
+ for _ in range(15):
+ self.logger.clear()
+ node = daemon.find_local_handoff_for_part(0)
+ self.assertIn(node['id'], local_dev_ids)
+ selected_node_ids.add(node['id'])
+ if len(selected_node_ids) == 2:
+ break # expected
+ self.assertEqual(len(selected_node_ids), 2)
+ self.assertEqual([0, 0], [local_dev_ids[dev_id]['weight']
+ for dev_id in selected_node_ids])
+ warning_lines = self.logger.get_lines_for_level('warning')
+ self.assertEqual(1, len(warning_lines), warning_lines)
+ self.assertIn(
+ 'Could not find a non-zero weight device for handoff partition',
+ warning_lines[0])
+
+ # If there are also no local_dev_ids, then we'll get the RuntimeError
+ daemon._local_device_ids = {}
+ with self.assertRaises(RuntimeError) as dev_err:
+ daemon.find_local_handoff_for_part(0)
+ expected_error_string = 'Cannot find local handoff; no local devices'
+ self.assertEqual(str(dev_err.exception), expected_error_string)
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index aab8e5721..93670cc51 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -682,14 +682,14 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
- sharder._local_device_ids = {'stale_node_id'}
+ sharder._local_device_ids = {'stale_node_id': {}}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
- self.assertEqual(device_ids, sharder._local_device_ids)
+ self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(2, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@@ -741,14 +741,14 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker', side_effect=mock_processing
) as mock_process_broker:
- sharder._local_device_ids = {'stale_node_id'}
+ sharder._local_device_ids = {'stale_node_id': {}}
sharder._one_shard_cycle(Everything(), Everything())
lines = sharder.logger.get_lines_for_level('warning')
expected = 'Skipping %s as it is not mounted' % \
unmounted_dev['device']
self.assertIn(expected, lines[0])
- self.assertEqual(device_ids, sharder._local_device_ids)
+ self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@@ -799,10 +799,10 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
- sharder._local_device_ids = {999}
+ sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
- self.assertEqual(device_ids, sharder._local_device_ids)
+ self.assertEqual(device_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(3, mock_process_broker.call_count)
processed_paths = [call[0][0].path
for call in mock_process_broker.call_args_list]
@@ -826,7 +826,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
- sharder._local_device_ids = {999}
+ sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@@ -866,7 +866,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
- sharder._local_device_ids = {999}
+ sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@@ -896,7 +896,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
- sharder._local_device_ids = {999}
+ sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
@@ -908,7 +908,7 @@ class TestSharder(BaseTestSharder):
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
- sharder._local_device_ids = {999}
+ sharder._local_device_ids = {999: {}}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
@@ -1005,7 +1005,8 @@ class TestSharder(BaseTestSharder):
'swift.common.db_replicator.ring.Ring',
return_value=fake_ring):
sharder = ContainerSharder(conf, logger=self.logger)
- sharder._local_device_ids = {0, 1, 2}
+ sharder._local_device_ids = {dev['id']: dev
+ for dev in fake_ring.devs}
sharder._replicate_object = mock.MagicMock(
return_value=(True, [True] * sharder.ring.replica_count))
yield sharder
@@ -5558,9 +5559,10 @@ class TestSharder(BaseTestSharder):
self.assertEqual([], self.logger.get_lines_for_level('warning'))
# advance time
- with mock.patch('swift.container.sharder.time.time') as fake_time, \
- self._mock_sharder() as sharder:
- fake_time.return_value = 6048000 + float(delete_ts)
+ future_time = 6048000 + float(delete_ts)
+ with mock.patch(
+ 'swift.container.sharder.time.time',
+ return_value=future_time), self._mock_sharder() as sharder:
sharder._audit_container(broker)
message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % (
broker.db_file, broker.path)
@@ -5574,9 +5576,9 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges(shard_ranges)
# no more warning
- with mock.patch('swift.container.sharder.time.time') as fake_time, \
- self._mock_sharder() as sharder:
- fake_time.return_value = 6048000 + float(delete_ts)
+ with mock.patch(
+ 'swift.container.sharder.time.time',
+ return_value=future_time), self._mock_sharder() as sharder:
sharder._audit_container(broker)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
@@ -6049,7 +6051,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once()
- self.assertEqual(dev_ids, set(sharder._local_device_ids))
+ self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set(container_data),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@@ -6061,7 +6063,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='0')
- self.assertEqual(dev_ids, set(sharder._local_device_ids))
+ self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[0]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@@ -6073,7 +6075,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0')
- self.assertEqual(dev_ids, set(sharder._local_device_ids))
+ self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[0], container_data[2]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@@ -6085,7 +6087,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(partitions='2,0', devices='sdc')
- self.assertEqual(dev_ids, set(sharder._local_device_ids))
+ self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set([container_data[2]]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@@ -6097,7 +6099,7 @@ class TestSharder(BaseTestSharder):
with mock.patch.object(
sharder, '_process_broker') as mock_process_broker:
sharder.run_once(devices='sdb,sdc')
- self.assertEqual(dev_ids, set(sharder._local_device_ids))
+ self.assertEqual(dev_ids, set(sharder._local_device_ids.keys()))
self.assertEqual(set(container_data[1:]),
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
@@ -6403,6 +6405,56 @@ class TestSharder(BaseTestSharder):
self._assert_recon_stats(expected_shrinking_candidates_data,
sharder, 'shrinking_candidates')
+ @mock.patch('swift.common.ring.ring.Ring.get_part_nodes', return_value=[])
+ @mock.patch('swift.common.ring.ring.Ring.get_more_nodes', return_value=[])
+ def test_get_shard_broker_no_local_handoff_for_part(
+ self, mock_part_nodes, mock_more_nodes):
+ broker = self._make_broker()
+ broker.enable_sharding(Timestamp.now())
+
+ shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
+ shard_ranges = self._make_shard_ranges(
+ shard_bounds, state=ShardRange.CREATED)
+
+ broker.merge_shard_ranges(shard_ranges)
+ self.assertTrue(broker.set_sharding_state())
+
+ # first, let's assume there local_handoff_for_part fails because the
+ # node we're on is at zero weight for all disks. So it wont appear in
+ # the replica2part2dev table, meaning we wont get a node back.
+ # in this case, we'll fall back to one of our own devices which we
+ # determine from the ring.devs not the replica2part2dev table.
+ with self._mock_sharder() as sharder:
+ local_dev_ids = {dev['id']: dev for dev in sharder.ring.devs[-1:]}
+ sharder._local_device_ids = local_dev_ids
+ part, shard_broker, node_id, _ = sharder._get_shard_broker(
+ shard_ranges[0], broker.root_path, 0)
+ self.assertIn(node_id, local_dev_ids)
+
+ # if there are more then 1 local_dev_id it'll randomly pick one
+ selected_node_ids = set()
+ for _ in range(10):
+ with self._mock_sharder() as sharder:
+ local_dev_ids = {dev['id']: dev
+ for dev in sharder.ring.devs[-2:]}
+ sharder._local_device_ids = local_dev_ids
+ part, shard_broker, node_id, _ = sharder._get_shard_broker(
+ shard_ranges[0], broker.root_path, 0)
+ self.assertIn(node_id, local_dev_ids)
+ selected_node_ids.add(node_id)
+ if len(selected_node_ids) == 2:
+ break
+ self.assertEqual(len(selected_node_ids), 2)
+
+ # If there are also no local_dev_ids, then we'll get the RuntimeError
+ with self._mock_sharder() as sharder:
+ sharder._local_device_ids = {}
+ with self.assertRaises(RuntimeError) as dev_err:
+ sharder._get_shard_broker(shard_ranges[0], broker.root_path, 0)
+
+ expected_error_string = 'Cannot find local handoff; no local devices'
+ self.assertEqual(str(dev_err.exception), expected_error_string)
+
class TestCleavingContext(BaseTestSharder):
def test_init(self):