summaryrefslogtreecommitdiff
path: root/swift/obj/auditor.py
diff options
context:
space:
mode:
authorEamonn O'Toole <eamonn.otoole@hp.com>2014-03-26 16:32:07 +0000
committerEamonn O'Toole <eamonn.otoole@hp.com>2014-06-25 16:57:53 +0100
commitd317888a7eae276ae9dddf26a9030d01f6ba00fe (patch)
treecefb2e24c173ae7d3c71342774136bd561099e25 /swift/obj/auditor.py
parent92fb1c15dad91f705384c1c52b4750643ca8ac27 (diff)
downloadswift-d317888a7eae276ae9dddf26a9030d01f6ba00fe.tar.gz
Parallel object auditor
We are soon going to put servers with a high ratio of disk to CPU into production as object servers. One of our concerns with this configuration is that the object auditor would take too long to complete its audit cycle. Therefore we decided to parallelise the auditor. The auditor already uses fork(), so we decided to use the parallel model from the replicator. Concurrency is set by the concurrency parameter in the auditor stanza, which sets the number of parallel checksum auditors. The actual number of parallel auditing processes is concurrency + 1 if zero_byte_fps is non-zero. Only one ZBF process is forked, and a new ZBF process is forked as soon as the current ZBF process finishes. Thus the last process running will always be a ZBF process. Both forever and once modes are parallelised. Each checksum auditor process submits a nested dictionary with keys {'object_auditor_stats_ALL': {'diskn': {..}}} to dump_recon_cache so that the object_auditor_stats_ALL dict in recon cache consists of individual sub-dicts for each of the object disks on the server. The recon cache is no different to before when the checksum auditor is run in serial mode. When swift-recon is run, it sums the stats for the individual disks. DocImpact Change-Id: I0ce3db57a43e482d4be351cc522fc9060af6e2d3
Diffstat (limited to 'swift/obj/auditor.py')
-rw-r--r--swift/obj/auditor.py38
1 files changed, 35 insertions, 3 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index 13d34c18c..a50375bed 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -17,6 +17,7 @@ import os
import sys
import time
import signal
+from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
@@ -72,7 +73,10 @@ class AuditorWorker(object):
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
- description = _(' - %s') % device_dir_str
+ if self.auditor_type == 'ALL':
+ description = _(' - parallel, %s') % device_dir_str
+ else:
+ description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
(mode, self.auditor_type, description))
begin = reported = time.time()
@@ -223,6 +227,7 @@ class ObjectAuditor(Daemon):
self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
+ self.concurrency = int(conf.get('concurrency', 1))
self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50))
self.recon_cache_path = conf.get('recon_cache_path',
@@ -260,7 +265,7 @@ class ObjectAuditor(Daemon):
sys.exit()
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
- """Audit loop"""
+ """Parallel audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
kwargs['device_dirs'] = override_devices
@@ -272,7 +277,34 @@ class ObjectAuditor(Daemon):
if self.conf_zero_byte_fps:
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
- pids.append(self.fork_child(**kwargs))
+ if self.concurrency == 1:
+ # Audit all devices in 1 process
+ pids.append(self.fork_child(**kwargs))
+ else:
+ # Divide devices amongst parallel processes set by
+ # self.concurrency. Total number of parallel processes
+ # is self.concurrency + 1 if zero_byte_fps.
+ parallel_proc = self.concurrency + 1 if \
+ self.conf_zero_byte_fps else self.concurrency
+ device_list = list(override_devices) if override_devices else \
+ listdir(self.devices)
+ shuffle(device_list)
+ while device_list:
+ pid = None
+ if len(pids) == parallel_proc:
+ pid = os.wait()[0]
+ pids.remove(pid)
+ # ZBF scanner must be restarted as soon as it finishes
+ if self.conf_zero_byte_fps and pid == zbf_pid:
+ kwargs['device_dirs'] = override_devices
+ # sleep between ZBF scanner forks
+ self._sleep()
+ zbf_pid = self.fork_child(zero_byte_fps=True,
+ **kwargs)
+ pids.append(zbf_pid)
+ else:
+ kwargs['device_dirs'] = [device_list.pop()]
+ pids.append(self.fork_child(**kwargs))
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes