diff options
author | Samuel Merritt <sam@swiftstack.com> | 2018-06-04 16:26:50 -0700 |
---|---|---|
committer | Samuel Merritt <sam@swiftstack.com> | 2018-06-13 17:39:34 -0700 |
commit | d5c532a94ed84b445fa7019c87fa80413ae52f4c (patch) | |
tree | cdc296f12b0c41d741197c53a9613a2996baa1af /swift/obj/updater.py | |
parent | 3e9fdfceb449c0f9aabf9de6e5e3002aafe2de3b (diff) | |
download | swift-d5c532a94ed84b445fa7019c87fa80413ae52f4c.tar.gz |
object-updater: add concurrent updates
The object updater now supports two configuration settings:
"concurrency" and "updater_workers". The latter controls how many
worker processes are spawned, while the former controls how many
concurrent container updates are performed by each worker
process. This should speed the processing of async_pendings.
There is a change to the semantics of the configuration
options. Previously, "concurrency" controlled the number of worker
processes spawned, and "updater_workers" did not exist. I switched the
meanings for consistency with other configuration options. In the
object reconstructor, object replicator, object server, object
expirer, container replicator, container server, account replicator,
account server, and account reaper, "concurrency" refers to the number
of concurrent tasks performed within one process (for reference, the
container updater and object auditor use "concurrency" to mean number
of processes).
On upgrade, a node configured with concurrency=N will still handle
async updates N-at-a-time, but will do so using only one process
instead of N.
UpgradeImpact:
If you have a config file like this:
[object-updater]
concurrency = <N>
and you want to take advantage of faster updates, then do this:
[object-updater]
concurrency = 8 # the default; you can omit this line
updater_workers = <N>
If you want updates to be processed exactly as before, do this:
[object-updater]
concurrency = 1
updater_workers = <N>
Change-Id: I17e18088e61f664e1b9942d66423666d0cae1689
Diffstat (limited to 'swift/obj/updater.py')
-rw-r--r-- | swift/obj/updater.py | 88 |
1 files changed, 56 insertions, 32 deletions
diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 6f8419d32..a189b9c76 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -29,7 +29,7 @@ from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ - eventlet_monkey_patch, get_redirect_data + eventlet_monkey_patch, get_redirect_data, ContextPool from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -94,7 +94,8 @@ class ObjectUpdater(Daemon): self.swift_dir = conf.get('swift_dir', '/etc/swift') self.interval = int(conf.get('interval', 300)) self.container_ring = None - self.concurrency = int(conf.get('concurrency', 1)) + self.concurrency = int(conf.get('concurrency', 8)) + self.updater_workers = int(conf.get('updater_workers', 1)) if 'slowdown' in conf: self.logger.warning( 'The slowdown option is deprecated in favor of ' @@ -150,7 +151,7 @@ class ObjectUpdater(Daemon): self.logger.warning( _('Skipping %s as it is not mounted'), device) continue - while len(pids) >= self.concurrency: + while len(pids) >= self.updater_workers: pids.remove(os.wait()[0]) pid = os.fork() if pid: @@ -230,6 +231,7 @@ class ObjectUpdater(Daemon): prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): continue + last_obj_hash = None for update in sorted(self._listdir(prefix_path), reverse=True): update_path = os.path.join(prefix_path, update) if not os.path.isfile(update_path): @@ -244,13 +246,34 @@ class ObjectUpdater(Daemon): 'name %s') % (update_path)) continue - yield {'device': device, 'policy': policy, - 'path': update_path, - 'obj_hash': obj_hash, 'timestamp': timestamp} - try: - os.rmdir(prefix_path) - except OSError: - pass + # Async pendings are stored on disk like this: + # + # <device>/async_pending/<suffix>/<obj_hash>-<timestamp> + # + # If there are multiple updates for a given object, + # they'll look like this: + # + # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp1> + # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp2> + # <device>/async_pending/<obj_suffix>/<obj_hash>-<timestamp3> + # + # Async updates also have the property that newer + # updates contain all the information in older updates. + # Since we sorted the directory listing in reverse + # order, we'll see timestamp3 first, yield it, and then + # unlink timestamp2 and timestamp1 since we know they + # are obsolete. + # + # This way, our caller only gets useful async_pendings. + if obj_hash == last_obj_hash: + self.stats.unlinks += 1 + self.logger.increment('unlinks') + os.unlink(update_path) + else: + last_obj_hash = obj_hash + yield {'device': device, 'policy': policy, + 'path': update_path, + 'obj_hash': obj_hash, 'timestamp': timestamp} def object_sweep(self, device): """ @@ -265,31 +288,25 @@ class ObjectUpdater(Daemon): self.logger.info("Object update sweep starting on %s (pid: %d)", device, my_pid) - last_obj_hash = None ap_iter = RateLimitedIterator( self._iter_async_pendings(device), elements_per_second=self.max_objects_per_second) - for update in ap_iter: - if update['obj_hash'] == last_obj_hash: - self.stats.unlinks += 1 - self.logger.increment('unlinks') - os.unlink(update['path']) - else: - self.process_object_update(update['path'], update['device'], - update['policy']) - last_obj_hash = update['obj_hash'] - - now = time.time() - if now - last_status_update >= self.report_interval: - this_sweep = self.stats.since(start_stats) - self.logger.info( - ('Object update sweep progress on %(device)s: ' - '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), - {'device': device, - 'elapsed': now - start_time, - 'pid': my_pid, - 'stats': this_sweep}) - last_status_update = now + with ContextPool(self.concurrency) as pool: + for update in ap_iter: + pool.spawn(self.process_object_update, + update['path'], update['device'], update['policy']) + now = time.time() + if now - last_status_update >= self.report_interval: + this_sweep = self.stats.since(start_stats) + self.logger.info( + ('Object update sweep progress on %(device)s: ' + '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), + {'device': device, + 'elapsed': now - start_time, + 'pid': my_pid, + 'stats': this_sweep}) + last_status_update = now + pool.waitall() self.logger.timing_since('timing', start_time) sweep_totals = self.stats.since(start_stats) @@ -370,6 +387,13 @@ class ObjectUpdater(Daemon): self.stats.unlinks += 1 self.logger.increment('unlinks') os.unlink(update_path) + try: + # If this was the last async_pending in the directory, + # then this will succeed. Otherwise, it'll fail, and + # that's okay. + os.rmdir(os.path.dirname(update_path)) + except OSError: + pass elif redirects: # erase any previous successes update.pop('successes', None) |