diff options
author | Samuel Merritt <sam@swiftstack.com> | 2018-03-23 15:56:26 -0700 |
---|---|---|
committer | Samuel Merritt <sam@swiftstack.com> | 2018-04-25 11:18:35 -0700 |
commit | c4751d0d551ad193a205c71821f6770a31146421 (patch) | |
tree | 40dce15641c592fdb55a92ede339eb215c7330dd | |
parent | 47efb5b96974a145f1e038f1f558568de89535a6 (diff) | |
download | swift-c4751d0d551ad193a205c71821f6770a31146421.tar.gz |
Make reconstructor go faster with --override-devices
The object reconstructor will now fork all available worker processes
when operating on a subset of local devices.
Example:
A system has 24 disks, named "d1" through "d24"
reconstructor_workers = 8
invoked with --override-devices=d1,d2,d3,d4,d5,d6
In this case, the reconstructor will now use 6 worker processes, one
per disk. The old behavior was to use 2 worker processes, one for d1,
d3, and d5 and the other for d2, d4, and d6 (because 24 / 8 = 3, so we
assigned 3 disks per worker before creating another).
I think the new behavior better matches operators' expectations. If I
give a concurrent program six tasks to do and tell it to operate on up
to eight at a time, I'd expect it to do all six tasks at once, not run
two concurrent batches of three tasks apiece.
This has no effect when --override-devices is not specified. When
operating on all local devices instead of a subset, the new and old
code produce the same result.
The reconstructor's behavior now matches the object replicator's
behavior.
Change-Id: Ib308c156c77b9b92541a12dd7e9b1a8ea8307a30
-rw-r--r-- | swift/common/utils.py | 10 | ||||
-rw-r--r-- | swift/obj/reconstructor.py | 21 | ||||
-rw-r--r-- | swift/obj/replicator.py | 24 | ||||
-rw-r--r-- | test/unit/common/test_utils.py | 34 | ||||
-rw-r--r-- | test/unit/obj/test_reconstructor.py | 77 |
5 files changed, 107 insertions, 59 deletions
diff --git a/swift/common/utils.py b/swift/common/utils.py index 491b6981b..54efdf2b1 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -4740,3 +4740,13 @@ def parse_override_options(**kwargs): return OverrideOptions(devices=devices, partitions=partitions, policies=policies) + + +def distribute_evenly(items, num_buckets): + """ + Distribute items as evenly as possible into N buckets. + """ + out = [[] for _ in range(num_buckets)] + for index, item in enumerate(items): + out[index % num_buckets].append(item) + return out diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 4abe0ea56..3a9cfe99e 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -15,7 +15,6 @@ import json import errno -import math import os from os.path import join import random @@ -34,7 +33,7 @@ from swift.common.utils import ( whataremyips, unlink_older_than, compute_eta, get_logger, dump_recon_cache, mkdirs, config_true_value, tpool_reraise, GreenAsyncPile, Timestamp, remove_file, - load_recon_cache, parse_override_options) + load_recon_cache, parse_override_options, distribute_evenly) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -228,16 +227,14 @@ class ObjectReconstructor(Daemon): yield dict(override_devices=override_opts.devices, override_partitions=override_opts.partitions) return - # for somewhat uniform load per worker use same max_devices_per_worker - # when handling all devices or just override devices... - max_devices_per_worker = int(math.ceil( - 1.0 * len(self.all_local_devices) / self.reconstructor_workers)) - # ...but only use enough workers for the actual devices being handled - n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker)) - override_devices_per_worker = [devices[i::n] for i in range(n)] - for override_devices_pw in override_devices_per_worker: - yield dict(override_devices=override_devices_pw, - override_partitions=override_opts.partitions) + # for somewhat uniform load per worker use same + # max_devices_per_worker when handling all devices or just override + # devices, but only use enough workers for the actual devices being + # handled + n_workers = min(self.reconstructor_workers, len(devices)) + for ods in distribute_evenly(devices, n_workers): + yield dict(override_partitions=override_opts.partitions, + override_devices=ods) def is_healthy(self): """ diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index ba1e3b8a7..ddebdb3a9 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ rsync_module_interpolation, mkdirs, config_true_value, \ tpool_reraise, config_auto_int_value, storage_directory, \ - load_recon_cache, PrefixLoggerAdapter, parse_override_options + load_recon_cache, PrefixLoggerAdapter, parse_override_options, \ + distribute_evenly from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE @@ -258,19 +259,14 @@ class ObjectReplicator(Daemon): # Distribute devices among workers as evenly as possible self.replicator_workers = min(self.replicator_workers, len(devices_to_replicate)) - worker_args = [ - { - 'override_devices': [], - 'override_partitions': override_opts.partitions, - 'override_policies': override_opts.policies, - 'have_overrides': have_overrides, - 'multiprocess_worker_index': i, - } - for i in range(self.replicator_workers)] - for index, device in enumerate(devices_to_replicate): - idx = index % self.replicator_workers - worker_args[idx]['override_devices'].append(device) - return worker_args + return [{'override_devices': devs, + 'override_partitions': override_opts.partitions, + 'override_policies': override_opts.policies, + 'have_overrides': have_overrides, + 'multiprocess_worker_index': index} + for index, devs in enumerate( + distribute_evenly(devices_to_replicate, + self.replicator_workers))] def is_healthy(self): """ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index c71b572e3..b9caaabf3 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -6622,5 +6622,39 @@ class TestPipeMutex(unittest.TestCase): eventlet.debug.hub_prevent_multiple_readers(True) +class TestDistributeEvenly(unittest.TestCase): + def test_evenly_divided(self): + out = utils.distribute_evenly(range(12), 3) + self.assertEqual(out, [ + [0, 3, 6, 9], + [1, 4, 7, 10], + [2, 5, 8, 11], + ]) + + out = utils.distribute_evenly(range(12), 4) + self.assertEqual(out, [ + [0, 4, 8], + [1, 5, 9], + [2, 6, 10], + [3, 7, 11], + ]) + + def test_uneven(self): + out = utils.distribute_evenly(range(11), 3) + self.assertEqual(out, [ + [0, 3, 6, 9], + [1, 4, 7, 10], + [2, 5, 8], + ]) + + def test_just_one(self): + out = utils.distribute_evenly(range(5), 1) + self.assertEqual(out, [[0, 1, 2, 3, 4]]) + + def test_more_buckets_than_items(self): + out = utils.distribute_evenly(range(5), 7) + self.assertEqual(out, [[0], [1], [2], [3], [4], [], []]) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 4007d1aa1..be05adc08 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -1474,13 +1474,15 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(2, reconstructor.reconstructor_workers) worker_args = list(reconstructor.get_worker_args( once=True, devices='sdb,sdd,sdf', partitions='99,333')) - self.assertEqual(1, len(worker_args)) - # 5 devices in total, 2 workers -> up to 3 devices per worker so a - # single worker should handle the requested override devices - self.assertEqual([ - {'override_partitions': [99, 333], 'override_devices': [ - 'sdb', 'sdd', 'sdf']}, - ], worker_args) + # 3 devices to operate on, 2 workers -> one worker gets two devices + # and the other worker just gets one + self.assertEqual([{ + 'override_partitions': [99, 333], + 'override_devices': ['sdb', 'sdf'], + }, { + 'override_partitions': [99, 333], + 'override_devices': ['sdd'], + }], worker_args) # with 4 override devices, expect 2 per worker worker_args = list(reconstructor.get_worker_args( @@ -1524,26 +1526,41 @@ class TestWorkerReconstructor(unittest.TestCase): {}, logger=self.logger) reconstructor.get_local_devices = lambda: [ 'd%s' % (i + 1) for i in range(21)] - # ... with many devices per worker, worker count is pretty granular - for i in range(1, 8): - reconstructor.reconstructor_workers = i - self.assertEqual(i, len(list(reconstructor.get_worker_args()))) - # ... then it gets sorta stair step - for i in range(9, 10): - reconstructor.reconstructor_workers = i - self.assertEqual(7, len(list(reconstructor.get_worker_args()))) - # 2-3 devices per worker - for args in reconstructor.get_worker_args(): - self.assertIn(len(args['override_devices']), (2, 3)) - for i in range(11, 20): - reconstructor.reconstructor_workers = i - self.assertEqual(11, len(list(reconstructor.get_worker_args()))) - # 1, 2 devices per worker - for args in reconstructor.get_worker_args(): - self.assertIn(len(args['override_devices']), (1, 2)) - # this is debatable, but maybe I'll argue if you're going to have - # *some* workers with > 1 device, it's better to have fewer workers - # with devices spread out evenly than a couple outliers? + + # With more devices than workers, the work is spread out as evenly + # as we can manage. When number-of-devices is a multiple of + # number-of-workers, every worker has the same number of devices to + # operate on. + reconstructor.reconstructor_workers = 7 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual([len(a['override_devices']) for a in worker_args], + [3] * 7) + + # When number-of-devices is not a multiple of number-of-workers, + # device counts differ by at most 1. + reconstructor.reconstructor_workers = 5 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + sorted([len(a['override_devices']) for a in worker_args]), + [4, 4, 4, 4, 5]) + + # With more workers than devices, we don't create useless workers. + # We'll only make one per device. + reconstructor.reconstructor_workers = 22 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + [len(a['override_devices']) for a in worker_args], + [1] * 21) + + # This is true even if we have far more workers than devices. + reconstructor.reconstructor_workers = 2 ** 16 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + [len(a['override_devices']) for a in worker_args], + [1] * 21) + + # Spot check one full result for sanity's sake + reconstructor.reconstructor_workers = 11 self.assertEqual([ {'override_partitions': [], 'override_devices': ['d1', 'd12']}, {'override_partitions': [], 'override_devices': ['d2', 'd13']}, @@ -1557,12 +1574,6 @@ class TestWorkerReconstructor(unittest.TestCase): {'override_partitions': [], 'override_devices': ['d10', 'd21']}, {'override_partitions': [], 'override_devices': ['d11']}, ], list(reconstructor.get_worker_args())) - # you can't get < than 1 device per worker - for i in range(21, 52): - reconstructor.reconstructor_workers = i - self.assertEqual(21, len(list(reconstructor.get_worker_args()))) - for args in reconstructor.get_worker_args(): - self.assertEqual(1, len(args['override_devices'])) def test_next_rcache_update_configured_with_stats_interval(self): now = time.time() |