diff options
author | Eamonn O'Toole <eamonn.otoole@hp.com> | 2014-03-26 16:32:07 +0000 |
---|---|---|
committer | Eamonn O'Toole <eamonn.otoole@hp.com> | 2014-06-25 16:57:53 +0100 |
commit | d317888a7eae276ae9dddf26a9030d01f6ba00fe (patch) | |
tree | cefb2e24c173ae7d3c71342774136bd561099e25 /swift/obj/auditor.py | |
parent | 92fb1c15dad91f705384c1c52b4750643ca8ac27 (diff) | |
download | swift-d317888a7eae276ae9dddf26a9030d01f6ba00fe.tar.gz |
Parallel object auditor
We are soon going to put servers with a high ratio of disk to CPU
into production as object servers. One of our concerns with this
configuration is that the object auditor would take too long to
complete its audit cycle. Therefore we decided to parallelise
the auditor.
The auditor already uses fork(), so we decided to use the parallel
model from the replicator. Concurrency is set by the concurrency
parameter in the auditor stanza, which sets the number of parallel
checksum auditors. The actual number of parallel auditing processes
is concurrency + 1 if zero_byte_fps is non-zero.
Only one ZBF process is forked, and a new ZBF process is forked as
soon as the current ZBF process finishes. Thus the last process
running will always be a ZBF process.
Both forever and once modes are parallelised.
Each checksum auditor process submits a nested dictionary with keys
{'object_auditor_stats_ALL': {'diskn': {..}}} to dump_recon_cache
so that the object_auditor_stats_ALL dict in recon cache consists
of individual sub-dicts for each of the object disks on the server.
The recon cache is no different to before when the checksum auditor
is run in serial mode. When swift-recon is run, it sums the stats
for the individual disks.
DocImpact
Change-Id: I0ce3db57a43e482d4be351cc522fc9060af6e2d3
Diffstat (limited to 'swift/obj/auditor.py')
-rw-r--r-- | swift/obj/auditor.py | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index 13d34c18c..a50375bed 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -17,6 +17,7 @@ import os import sys import time import signal +from random import shuffle from swift import gettext_ as _ from contextlib import closing from eventlet import Timeout @@ -72,7 +73,10 @@ class AuditorWorker(object): description = '' if device_dirs: device_dir_str = ','.join(sorted(device_dirs)) - description = _(' - %s') % device_dir_str + if self.auditor_type == 'ALL': + description = _(' - parallel, %s') % device_dir_str + else: + description = _(' - %s') % device_dir_str self.logger.info(_('Begin object audit "%s" mode (%s%s)') % (mode, self.auditor_type, description)) begin = reported = time.time() @@ -223,6 +227,7 @@ class ObjectAuditor(Daemon): self.conf = conf self.logger = get_logger(conf, log_route='object-auditor') self.devices = conf.get('devices', '/srv/node') + self.concurrency = int(conf.get('concurrency', 1)) self.conf_zero_byte_fps = int( conf.get('zero_byte_files_per_second', 50)) self.recon_cache_path = conf.get('recon_cache_path', @@ -260,7 +265,7 @@ class ObjectAuditor(Daemon): sys.exit() def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs): - """Audit loop""" + """Parallel audit loop""" self.clear_recon_cache('ALL') self.clear_recon_cache('ZBF') kwargs['device_dirs'] = override_devices @@ -272,7 +277,34 @@ class ObjectAuditor(Daemon): if self.conf_zero_byte_fps: zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs) pids.append(zbf_pid) - pids.append(self.fork_child(**kwargs)) + if self.concurrency == 1: + # Audit all devices in 1 process + pids.append(self.fork_child(**kwargs)) + else: + # Divide devices amongst parallel processes set by + # self.concurrency. Total number of parallel processes + # is self.concurrency + 1 if zero_byte_fps. + parallel_proc = self.concurrency + 1 if \ + self.conf_zero_byte_fps else self.concurrency + device_list = list(override_devices) if override_devices else \ + listdir(self.devices) + shuffle(device_list) + while device_list: + pid = None + if len(pids) == parallel_proc: + pid = os.wait()[0] + pids.remove(pid) + # ZBF scanner must be restarted as soon as it finishes + if self.conf_zero_byte_fps and pid == zbf_pid: + kwargs['device_dirs'] = override_devices + # sleep between ZBF scanner forks + self._sleep() + zbf_pid = self.fork_child(zero_byte_fps=True, + **kwargs) + pids.append(zbf_pid) + else: + kwargs['device_dirs'] = [device_list.pop()] + pids.append(self.fork_child(**kwargs)) while pids: pid = os.wait()[0] # ZBF scanner must be restarted as soon as it finishes |