summaryrefslogtreecommitdiff
path: root/swift/obj/updater.py
diff options
context:
space:
mode:
authorSamuel Merritt <sam@swiftstack.com>2018-06-04 16:26:50 -0700
committerSamuel Merritt <sam@swiftstack.com>2018-06-13 17:39:34 -0700
commitd5c532a94ed84b445fa7019c87fa80413ae52f4c (patch)
treecdc296f12b0c41d741197c53a9613a2996baa1af /swift/obj/updater.py
parent3e9fdfceb449c0f9aabf9de6e5e3002aafe2de3b (diff)
downloadswift-d5c532a94ed84b445fa7019c87fa80413ae52f4c.tar.gz
object-updater: add concurrent updates
The object updater now supports two configuration settings: "concurrency" and "updater_workers". The latter controls how many worker processes are spawned, while the former controls how many concurrent container updates are performed by each worker process. This should speed the processing of async_pendings. There is a change to the semantics of the configuration options. Previously, "concurrency" controlled the number of worker processes spawned, and "updater_workers" did not exist. I switched the meanings for consistency with other configuration options. In the object reconstructor, object replicator, object server, object expirer, container replicator, container server, account replicator, account server, and account reaper, "concurrency" refers to the number of concurrent tasks performed within one process (for reference, the container updater and object auditor use "concurrency" to mean number of processes). On upgrade, a node configured with concurrency=N will still handle async updates N-at-a-time, but will do so using only one process instead of N. UpgradeImpact: If you have a config file like this: [object-updater] concurrency = <N> and you want to take advantage of faster updates, then do this: [object-updater] concurrency = 8 # the default; you can omit this line updater_workers = <N> If you want updates to be processed exactly as before, do this: [object-updater] concurrency = 1 updater_workers = <N> Change-Id: I17e18088e61f664e1b9942d66423666d0cae1689
Diffstat (limited to 'swift/obj/updater.py')
-rw-r--r--swift/obj/updater.py88
1 files changed, 56 insertions, 32 deletions
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 6f8419d32..a189b9c76 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -29,7 +29,7 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
- eventlet_monkey_patch, get_redirect_data
+ eventlet_monkey_patch, get_redirect_data, ContextPool
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -94,7 +94,8 @@ class ObjectUpdater(Daemon):
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300))
self.container_ring = None
- self.concurrency = int(conf.get('concurrency', 1))
+ self.concurrency = int(conf.get('concurrency', 8))
+ self.updater_workers = int(conf.get('updater_workers', 1))
if 'slowdown' in conf:
self.logger.warning(
'The slowdown option is deprecated in favor of '
@@ -150,7 +151,7 @@ class ObjectUpdater(Daemon):
self.logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
- while len(pids) >= self.concurrency:
+ while len(pids) >= self.updater_workers:
pids.remove(os.wait()[0])
pid = os.fork()
if pid:
@@ -230,6 +231,7 @@ class ObjectUpdater(Daemon):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
+ last_obj_hash = None
for update in sorted(self._listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
@@ -244,13 +246,34 @@ class ObjectUpdater(Daemon):
'name %s')
% (update_path))
continue
- yield {'device': device, 'policy': policy,
- 'path': update_path,
- 'obj_hash': obj_hash, 'timestamp': timestamp}
- try:
- os.rmdir(prefix_path)
- except OSError:
- pass
+ # Async pendings are stored on disk like this:
+ #
+ # <device>/async_pending/<suffix>/<obj_hash>-<timestamp>
+ #
+ # If there are multiple updates for a given object,
+ # they'll look like this:
+ #
+ # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp1>
+ # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp2>
+ # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp3>
+ #
+ # Async updates also have the property that newer
+ # updates contain all the information in older updates.
+ # Since we sorted the directory listing in reverse
+ # order, we'll see timestamp3 first, yield it, and then
+ # unlink timestamp2 and timestamp1 since we know they
+ # are obsolete.
+ #
+ # This way, our caller only gets useful async_pendings.
+ if obj_hash == last_obj_hash:
+ self.stats.unlinks += 1
+ self.logger.increment('unlinks')
+ os.unlink(update_path)
+ else:
+ last_obj_hash = obj_hash
+ yield {'device': device, 'policy': policy,
+ 'path': update_path,
+ 'obj_hash': obj_hash, 'timestamp': timestamp}
def object_sweep(self, device):
"""
@@ -265,31 +288,25 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
- last_obj_hash = None
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
- for update in ap_iter:
- if update['obj_hash'] == last_obj_hash:
- self.stats.unlinks += 1
- self.logger.increment('unlinks')
- os.unlink(update['path'])
- else:
- self.process_object_update(update['path'], update['device'],
- update['policy'])
- last_obj_hash = update['obj_hash']
-
- now = time.time()
- if now - last_status_update >= self.report_interval:
- this_sweep = self.stats.since(start_stats)
- self.logger.info(
- ('Object update sweep progress on %(device)s: '
- '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
- {'device': device,
- 'elapsed': now - start_time,
- 'pid': my_pid,
- 'stats': this_sweep})
- last_status_update = now
+ with ContextPool(self.concurrency) as pool:
+ for update in ap_iter:
+ pool.spawn(self.process_object_update,
+ update['path'], update['device'], update['policy'])
+ now = time.time()
+ if now - last_status_update >= self.report_interval:
+ this_sweep = self.stats.since(start_stats)
+ self.logger.info(
+ ('Object update sweep progress on %(device)s: '
+ '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
+ {'device': device,
+ 'elapsed': now - start_time,
+ 'pid': my_pid,
+ 'stats': this_sweep})
+ last_status_update = now
+ pool.waitall()
self.logger.timing_since('timing', start_time)
sweep_totals = self.stats.since(start_stats)
@@ -370,6 +387,13 @@ class ObjectUpdater(Daemon):
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
+ try:
+ # If this was the last async_pending in the directory,
+ # then this will succeed. Otherwise, it'll fail, and
+ # that's okay.
+ os.rmdir(os.path.dirname(update_path))
+ except OSError:
+ pass
elif redirects:
# erase any previous successes
update.pop('successes', None)