summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2018-03-23 15:56:26 -0700
committerSamuel Merritt <sam@swiftstack.com>2018-04-25 11:18:35 -0700
commitc4751d0d551ad193a205c71821f6770a31146421 (patch)
tree40dce15641c592fdb55a92ede339eb215c7330dd
parent47efb5b96974a145f1e038f1f558568de89535a6 (diff)
downloadswift-c4751d0d551ad193a205c71821f6770a31146421.tar.gz
Make reconstructor go faster with --override-devices
The object reconstructor will now fork all available worker processes when operating on a subset of local devices. Example: A system has 24 disks, named "d1" through "d24" reconstructor_workers = 8 invoked with --override-devices=d1,d2,d3,d4,d5,d6 In this case, the reconstructor will now use 6 worker processes, one per disk. The old behavior was to use 2 worker processes, one for d1, d3, and d5 and the other for d2, d4, and d6 (because 24 / 8 = 3, so we assigned 3 disks per worker before creating another). I think the new behavior better matches operators' expectations. If I give a concurrent program six tasks to do and tell it to operate on up to eight at a time, I'd expect it to do all six tasks at once, not run two concurrent batches of three tasks apiece. This has no effect when --override-devices is not specified. When operating on all local devices instead of a subset, the new and old code produce the same result. The reconstructor's behavior now matches the object replicator's behavior. Change-Id: Ib308c156c77b9b92541a12dd7e9b1a8ea8307a30
-rw-r--r--swift/common/utils.py10
-rw-r--r--swift/obj/reconstructor.py21
-rw-r--r--swift/obj/replicator.py24
-rw-r--r--test/unit/common/test_utils.py34
-rw-r--r--test/unit/obj/test_reconstructor.py77
5 files changed, 107 insertions, 59 deletions
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 491b6981b..54efdf2b1 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -4740,3 +4740,13 @@ def parse_override_options(**kwargs):
return OverrideOptions(devices=devices, partitions=partitions,
policies=policies)
+
+
+def distribute_evenly(items, num_buckets):
+ """
+ Distribute items as evenly as possible into N buckets.
+ """
+ out = [[] for _ in range(num_buckets)]
+ for index, item in enumerate(items):
+ out[index % num_buckets].append(item)
+ return out
diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 4abe0ea56..3a9cfe99e 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -15,7 +15,6 @@
import json
import errno
-import math
import os
from os.path import join
import random
@@ -34,7 +33,7 @@ from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
dump_recon_cache, mkdirs, config_true_value,
tpool_reraise, GreenAsyncPile, Timestamp, remove_file,
- load_recon_cache, parse_override_options)
+ load_recon_cache, parse_override_options, distribute_evenly)
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@@ -228,16 +227,14 @@ class ObjectReconstructor(Daemon):
yield dict(override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
return
- # for somewhat uniform load per worker use same max_devices_per_worker
- # when handling all devices or just override devices...
- max_devices_per_worker = int(math.ceil(
- 1.0 * len(self.all_local_devices) / self.reconstructor_workers))
- # ...but only use enough workers for the actual devices being handled
- n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker))
- override_devices_per_worker = [devices[i::n] for i in range(n)]
- for override_devices_pw in override_devices_per_worker:
- yield dict(override_devices=override_devices_pw,
- override_partitions=override_opts.partitions)
+ # for somewhat uniform load per worker use same
+ # max_devices_per_worker when handling all devices or just override
+ # devices, but only use enough workers for the actual devices being
+ # handled
+ n_workers = min(self.reconstructor_workers, len(devices))
+ for ods in distribute_evenly(devices, n_workers):
+ yield dict(override_partitions=override_opts.partitions,
+ override_devices=ods)
def is_healthy(self):
"""
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index ba1e3b8a7..ddebdb3a9 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
rsync_module_interpolation, mkdirs, config_true_value, \
tpool_reraise, config_auto_int_value, storage_directory, \
- load_recon_cache, PrefixLoggerAdapter, parse_override_options
+ load_recon_cache, PrefixLoggerAdapter, parse_override_options, \
+ distribute_evenly
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@@ -258,19 +259,14 @@ class ObjectReplicator(Daemon):
# Distribute devices among workers as evenly as possible
self.replicator_workers = min(self.replicator_workers,
len(devices_to_replicate))
- worker_args = [
- {
- 'override_devices': [],
- 'override_partitions': override_opts.partitions,
- 'override_policies': override_opts.policies,
- 'have_overrides': have_overrides,
- 'multiprocess_worker_index': i,
- }
- for i in range(self.replicator_workers)]
- for index, device in enumerate(devices_to_replicate):
- idx = index % self.replicator_workers
- worker_args[idx]['override_devices'].append(device)
- return worker_args
+ return [{'override_devices': devs,
+ 'override_partitions': override_opts.partitions,
+ 'override_policies': override_opts.policies,
+ 'have_overrides': have_overrides,
+ 'multiprocess_worker_index': index}
+ for index, devs in enumerate(
+ distribute_evenly(devices_to_replicate,
+ self.replicator_workers))]
def is_healthy(self):
"""
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index c71b572e3..b9caaabf3 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -6622,5 +6622,39 @@ class TestPipeMutex(unittest.TestCase):
eventlet.debug.hub_prevent_multiple_readers(True)
+class TestDistributeEvenly(unittest.TestCase):
+ def test_evenly_divided(self):
+ out = utils.distribute_evenly(range(12), 3)
+ self.assertEqual(out, [
+ [0, 3, 6, 9],
+ [1, 4, 7, 10],
+ [2, 5, 8, 11],
+ ])
+
+ out = utils.distribute_evenly(range(12), 4)
+ self.assertEqual(out, [
+ [0, 4, 8],
+ [1, 5, 9],
+ [2, 6, 10],
+ [3, 7, 11],
+ ])
+
+ def test_uneven(self):
+ out = utils.distribute_evenly(range(11), 3)
+ self.assertEqual(out, [
+ [0, 3, 6, 9],
+ [1, 4, 7, 10],
+ [2, 5, 8],
+ ])
+
+ def test_just_one(self):
+ out = utils.distribute_evenly(range(5), 1)
+ self.assertEqual(out, [[0, 1, 2, 3, 4]])
+
+ def test_more_buckets_than_items(self):
+ out = utils.distribute_evenly(range(5), 7)
+ self.assertEqual(out, [[0], [1], [2], [3], [4], [], []])
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 4007d1aa1..be05adc08 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -1474,13 +1474,15 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(2, reconstructor.reconstructor_workers)
worker_args = list(reconstructor.get_worker_args(
once=True, devices='sdb,sdd,sdf', partitions='99,333'))
- self.assertEqual(1, len(worker_args))
- # 5 devices in total, 2 workers -> up to 3 devices per worker so a
- # single worker should handle the requested override devices
- self.assertEqual([
- {'override_partitions': [99, 333], 'override_devices': [
- 'sdb', 'sdd', 'sdf']},
- ], worker_args)
+ # 3 devices to operate on, 2 workers -> one worker gets two devices
+ # and the other worker just gets one
+ self.assertEqual([{
+ 'override_partitions': [99, 333],
+ 'override_devices': ['sdb', 'sdf'],
+ }, {
+ 'override_partitions': [99, 333],
+ 'override_devices': ['sdd'],
+ }], worker_args)
# with 4 override devices, expect 2 per worker
worker_args = list(reconstructor.get_worker_args(
@@ -1524,26 +1526,41 @@ class TestWorkerReconstructor(unittest.TestCase):
{}, logger=self.logger)
reconstructor.get_local_devices = lambda: [
'd%s' % (i + 1) for i in range(21)]
- # ... with many devices per worker, worker count is pretty granular
- for i in range(1, 8):
- reconstructor.reconstructor_workers = i
- self.assertEqual(i, len(list(reconstructor.get_worker_args())))
- # ... then it gets sorta stair step
- for i in range(9, 10):
- reconstructor.reconstructor_workers = i
- self.assertEqual(7, len(list(reconstructor.get_worker_args())))
- # 2-3 devices per worker
- for args in reconstructor.get_worker_args():
- self.assertIn(len(args['override_devices']), (2, 3))
- for i in range(11, 20):
- reconstructor.reconstructor_workers = i
- self.assertEqual(11, len(list(reconstructor.get_worker_args())))
- # 1, 2 devices per worker
- for args in reconstructor.get_worker_args():
- self.assertIn(len(args['override_devices']), (1, 2))
- # this is debatable, but maybe I'll argue if you're going to have
- # *some* workers with > 1 device, it's better to have fewer workers
- # with devices spread out evenly than a couple outliers?
+
+ # With more devices than workers, the work is spread out as evenly
+ # as we can manage. When number-of-devices is a multiple of
+ # number-of-workers, every worker has the same number of devices to
+ # operate on.
+ reconstructor.reconstructor_workers = 7
+ worker_args = list(reconstructor.get_worker_args())
+ self.assertEqual([len(a['override_devices']) for a in worker_args],
+ [3] * 7)
+
+ # When number-of-devices is not a multiple of number-of-workers,
+ # device counts differ by at most 1.
+ reconstructor.reconstructor_workers = 5
+ worker_args = list(reconstructor.get_worker_args())
+ self.assertEqual(
+ sorted([len(a['override_devices']) for a in worker_args]),
+ [4, 4, 4, 4, 5])
+
+ # With more workers than devices, we don't create useless workers.
+ # We'll only make one per device.
+ reconstructor.reconstructor_workers = 22
+ worker_args = list(reconstructor.get_worker_args())
+ self.assertEqual(
+ [len(a['override_devices']) for a in worker_args],
+ [1] * 21)
+
+ # This is true even if we have far more workers than devices.
+ reconstructor.reconstructor_workers = 2 ** 16
+ worker_args = list(reconstructor.get_worker_args())
+ self.assertEqual(
+ [len(a['override_devices']) for a in worker_args],
+ [1] * 21)
+
+ # Spot check one full result for sanity's sake
+ reconstructor.reconstructor_workers = 11
self.assertEqual([
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
@@ -1557,12 +1574,6 @@ class TestWorkerReconstructor(unittest.TestCase):
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
{'override_partitions': [], 'override_devices': ['d11']},
], list(reconstructor.get_worker_args()))
- # you can't get < than 1 device per worker
- for i in range(21, 52):
- reconstructor.reconstructor_workers = i
- self.assertEqual(21, len(list(reconstructor.get_worker_args())))
- for args in reconstructor.get_worker_args():
- self.assertEqual(1, len(args['override_devices']))
def test_next_rcache_update_configured_with_stats_interval(self):
now = time.time()