diff options
author | Clay Gerrard <clay.gerrard@gmail.com> | 2021-12-07 17:19:54 -0600 |
---|---|---|
committer | Tim Burke <tim.burke@gmail.com> | 2022-01-06 12:47:09 -0800 |
commit | de888629817a2a326b6e8dc66edb0ce3168818a7 (patch) | |
tree | a5e97452ab2ea056d7f09764606b8c85b2f6d7e1 /swift/obj/updater.py | |
parent | 1859f2e161f396023d8328489fb4b88c303797a9 (diff) | |
download | swift-de888629817a2a326b6e8dc66edb0ce3168818a7.tar.gz |
Finer grained ratelimit for update
Throw our stream of async_pendings through a hash ring; if the virtual
bucket gets hot, just start leaving the updates on the floor and move on.
It's off by default. If you use it, you're probably going to leave a
bunch of async updates pointed at a small set of containers in the queue
for the next sweep, every sweep (so maybe turn it off at some point).
Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Change-Id: Idef25cd6026b02c1b5c10a9816c8c6cbe505e7ed
Diffstat (limited to 'swift/obj/updater.py')
-rw-r--r-- | swift/obj/updater.py | 182 |
1 files changed, 139 insertions, 43 deletions
diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 6f22a97de..ae42e95cd 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -19,6 +19,7 @@ import os import signal import sys import time +import uuid from random import random, shuffle from eventlet import spawn, Timeout @@ -29,7 +30,8 @@ from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ - eventlet_monkey_patch, get_redirect_data, ContextPool + eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ + non_negative_float, config_positive_int_value from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -39,18 +41,68 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY +class BucketizedUpdateSkippingLimiter(object): + """ + Wrap an iterator to filter elements that show up too often. 
+ + :param update_iterable: an async_pending update iterable + :param num_buckets: number of buckets to divide container hashes into, the + more buckets total the less containers to a bucket + (once a busy container slows down a bucket the whole + bucket starts skipping) + :param max_elements_per_group_per_second: tunable, when skipping kicks in + :param skip_f: function to call with update_ctx when skipping it + """ + + def __init__(self, update_iterable, num_buckets, + max_elements_per_group_per_second, + skip_f=lambda update_ctx: None): + self.iterator = iter(update_iterable) + # if we want a smaller "blast radius" we could make this number bigger + self.num_buckets = max(num_buckets, 1) + # an array might be more efficient; but this is pretty cheap + self.next_update = [0.0 for _ in range(self.num_buckets)] + try: + self.bucket_update_delta = 1.0 / max_elements_per_group_per_second + except ZeroDivisionError: + self.bucket_update_delta = -1 + self.skip_f = skip_f + self.salt = str(uuid.uuid4()) + + def __iter__(self): + return self + + def _bucket_key(self, update): + acct, cont = split_update_path(update) + return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets + + def next(self): + for update_ctx in self.iterator: + bucket_key = self._bucket_key(update_ctx['update']) + now = time.time() + if self.next_update[bucket_key] > now: + self.skip_f(update_ctx) + continue + self.next_update[bucket_key] = now + self.bucket_update_delta + return update_ctx + raise StopIteration() + + __next__ = next + + class SweepStats(object): """ Stats bucket for an update sweep """ def __init__(self, errors=0, failures=0, quarantines=0, successes=0, - unlinks=0, redirects=0): + unlinks=0, redirects=0, skips=0): self.errors = errors self.failures = failures self.quarantines = quarantines self.successes = successes self.unlinks = unlinks self.redirects = redirects + self.skips = skips def copy(self): return type(self)(self.errors, self.failures, self.quarantines, @@ -62,7 
+114,8 @@ class SweepStats(object): self.quarantines - other.quarantines, self.successes - other.successes, self.unlinks - other.unlinks, - self.redirects - other.redirects) + self.redirects - other.redirects, + self.skips - other.skips) def reset(self): self.errors = 0 @@ -71,6 +124,7 @@ class SweepStats(object): self.successes = 0 self.unlinks = 0 self.redirects = 0 + self.skips = 0 def __str__(self): keys = ( @@ -80,10 +134,26 @@ class SweepStats(object): (self.unlinks, 'unlinks'), (self.errors, 'errors'), (self.redirects, 'redirects'), + (self.skips, 'skips'), ) return ', '.join('%d %s' % pair for pair in keys) +def split_update_path(update): + """ + Split the account and container parts out of the async update data. + + N.B. updates to shards set the container_path key while the account and + container keys are always the root. + """ + container_path = update.get('container_path') + if container_path: + acct, cont = split_path('/' + container_path, minsegs=2) + else: + acct, cont = update['account'], update['container'] + return acct, cont + + class ObjectUpdater(Daemon): """Update object information in container listings.""" @@ -110,6 +180,10 @@ class ObjectUpdater(Daemon): self.max_objects_per_second = \ float(conf.get('objects_per_second', objects_per_second)) + self.max_objects_per_container_per_second = non_negative_float( + conf.get('max_objects_per_container_per_second', 0)) + self.per_container_ratelimit_buckets = config_positive_int_value( + conf.get('per_container_ratelimit_buckets', 1000)) self.node_timeout = float(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.report_interval = float(conf.get('report_interval', 300)) @@ -205,13 +279,40 @@ class ObjectUpdater(Daemon): dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger) + def _load_update(self, device, update_path): + try: + return pickle.load(open(update_path, 'rb')) + except Exception as e: + if getattr(e, 'errno', None) == 
errno.ENOENT: + return + self.logger.exception( + 'ERROR Pickle problem, quarantining %s', update_path) + self.stats.quarantines += 1 + self.logger.increment('quarantines') + target_path = os.path.join(device, 'quarantined', 'objects', + os.path.basename(update_path)) + renamer(update_path, target_path, fsync=False) + try: + # If this was the last async_pending in the directory, + # then this will succeed. Otherwise, it'll fail, and + # that's okay. + os.rmdir(os.path.dirname(update_path)) + except OSError: + pass + return + def _iter_async_pendings(self, device): """ - Locate and yield all the async pendings on the device. Multiple updates - for the same object will come out in reverse-chronological order - (i.e. newest first) so that callers can skip stale async_pendings. + Locate and yield an update context for all the async pending files on + the device. Each update context contains details of the async pending + file location, its timestamp and the un-pickled update data. + + Async pending files that fail to load will be quarantined. + + Only the most recent update for the same object is yielded; older + (stale) async pending files are unlinked as they are located. - Tries to clean up empty directories as it goes. + The iterator tries to clean up empty directories as it goes. 
""" # loop through async pending dirs for all policies for asyncdir in self._listdir(device): @@ -238,12 +339,13 @@ class ObjectUpdater(Daemon): if not os.path.isdir(prefix_path): continue last_obj_hash = None - for update in sorted(self._listdir(prefix_path), reverse=True): - update_path = os.path.join(prefix_path, update) + for update_file in sorted(self._listdir(prefix_path), + reverse=True): + update_path = os.path.join(prefix_path, update_file) if not os.path.isfile(update_path): continue try: - obj_hash, timestamp = update.split('-') + obj_hash, timestamp = update_file.split('-') except ValueError: self.stats.errors += 1 self.logger.increment('errors') @@ -280,9 +382,14 @@ class ObjectUpdater(Daemon): raise else: last_obj_hash = obj_hash - yield {'device': device, 'policy': policy, - 'path': update_path, - 'obj_hash': obj_hash, 'timestamp': timestamp} + update = self._load_update(device, update_path) + if update is not None: + yield {'device': device, + 'policy': policy, + 'update_path': update_path, + 'obj_hash': obj_hash, + 'timestamp': timestamp, + 'update': update} def object_sweep(self, device): """ @@ -297,13 +404,21 @@ class ObjectUpdater(Daemon): self.logger.info("Object update sweep starting on %s (pid: %d)", device, my_pid) + def skip_counting_f(update_ctx): + # in the future we could defer update_ctx + self.stats.skips += 1 + self.logger.increment("skips") + ap_iter = RateLimitedIterator( self._iter_async_pendings(device), elements_per_second=self.max_objects_per_second) + ap_iter = BucketizedUpdateSkippingLimiter( + ap_iter, self.per_container_ratelimit_buckets, + self.max_objects_per_container_per_second, + skip_f=skip_counting_f) with ContextPool(self.concurrency) as pool: - for update in ap_iter: - pool.spawn(self.process_object_update, - update['path'], update['device'], update['policy']) + for update_ctx in ap_iter: + pool.spawn(self.process_object_update, **update_ctx) now = time.time() if now - last_status_update >= self.report_interval: 
this_sweep = self.stats.since(start_stats) @@ -326,6 +441,7 @@ class ObjectUpdater(Daemon): '%(quarantines)d quarantines, ' '%(unlinks)d unlinks, %(errors)d errors, ' '%(redirects)d redirects ' + '%(skips)d skips ' '(pid: %(pid)d)'), {'device': device, 'elapsed': time.time() - start_time, @@ -335,36 +451,20 @@ class ObjectUpdater(Daemon): 'quarantines': sweep_totals.quarantines, 'unlinks': sweep_totals.unlinks, 'errors': sweep_totals.errors, - 'redirects': sweep_totals.redirects}) + 'redirects': sweep_totals.redirects, + 'skips': sweep_totals.skips}) - def process_object_update(self, update_path, device, policy): + def process_object_update(self, update_path, device, policy, update, + **kwargs): """ Process the object information to be updated and update. :param update_path: path to pickled object update file :param device: path to device :param policy: storage policy of object update + :param update: the un-pickled update data + :param kwargs: un-used keys from update_ctx """ - try: - update = pickle.load(open(update_path, 'rb')) - except Exception as e: - if getattr(e, 'errno', None) == errno.ENOENT: - return - self.logger.exception( - 'ERROR Pickle problem, quarantining %s', update_path) - self.stats.quarantines += 1 - self.logger.increment('quarantines') - target_path = os.path.join(device, 'quarantined', 'objects', - os.path.basename(update_path)) - renamer(update_path, target_path, fsync=False) - try: - # If this was the last async_pending in the directory, - # then this will succeed. Otherwise, it'll fail, and - # that's okay. 
- os.rmdir(os.path.dirname(update_path)) - except OSError: - pass - return def do_update(): successes = update.get('successes', []) @@ -374,11 +474,7 @@ class ObjectUpdater(Daemon): str(int(policy))) headers_out.setdefault('X-Backend-Accept-Redirect', 'true') headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true') - container_path = update.get('container_path') - if container_path: - acct, cont = split_path('/' + container_path, minsegs=2) - else: - acct, cont = update['account'], update['container'] + acct, cont = split_update_path(update) part, nodes = self.get_container_ring().get_nodes(acct, cont) obj = '/%s/%s/%s' % (acct, cont, update['obj']) events = [spawn(self.object_update, |