summaryrefslogtreecommitdiff
path: root/swift/obj/updater.py
diff options
context:
space:
mode:
authorClay Gerrard <clay.gerrard@gmail.com>2021-12-07 17:19:54 -0600
committerTim Burke <tim.burke@gmail.com>2022-01-06 12:47:09 -0800
commitde888629817a2a326b6e8dc66edb0ce3168818a7 (patch)
treea5e97452ab2ea056d7f09764606b8c85b2f6d7e1 /swift/obj/updater.py
parent1859f2e161f396023d8328489fb4b88c303797a9 (diff)
downloadswift-de888629817a2a326b6e8dc66edb0ce3168818a7.tar.gz
Finer grained ratelimit for update
Throw our stream of async_pendings through a hash ring; if the virtual bucket gets hot just start leaving the updates on the floor and move on. It's off by default; and if you use it you're probably going to leave a bunch of async updates pointed at a small set of containers in the queue for the next sweep every sweep (so maybe turn it off at some point) Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: Idef25cd6026b02c1b5c10a9816c8c6cbe505e7ed
Diffstat (limited to 'swift/obj/updater.py')
-rw-r--r--swift/obj/updater.py182
1 file changed, 139 insertions, 43 deletions
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 6f22a97de..ae42e95cd 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -19,6 +19,7 @@ import os
import signal
import sys
import time
+import uuid
from random import random, shuffle
from eventlet import spawn, Timeout
@@ -29,7 +30,8 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
- eventlet_monkey_patch, get_redirect_data, ContextPool
+ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
+ non_negative_float, config_positive_int_value
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -39,18 +41,68 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class BucketizedUpdateSkippingLimiter(object):
    """
    Wrap an iterator to filter elements that show up too often.

    :param update_iterable: an async_pending update iterable
    :param num_buckets: number of buckets to divide container hashes into,
        the more buckets total the less containers to a bucket (once a busy
        container slows down a bucket the whole bucket starts skipping)
    :param max_elements_per_group_per_second: tunable, when skipping kicks in
    :param skip_f: function to call with update_ctx when skipping it
    """

    def __init__(self, update_iterable, num_buckets,
                 max_elements_per_group_per_second,
                 skip_f=lambda update_ctx: None):
        self.iterator = iter(update_iterable)
        # a larger bucket count gives each container a smaller "blast
        # radius" when a bucket-mate gets hot
        self.num_buckets = max(num_buckets, 1)
        # per-bucket wall-clock time before which elements are skipped;
        # an array might be more efficient, but this is pretty cheap
        self.next_update = [0.0] * self.num_buckets
        if max_elements_per_group_per_second:
            self.bucket_update_delta = \
                1.0 / max_elements_per_group_per_second
        else:
            # a rate of zero disables skipping: next_update times stay in
            # the past so every element is yielded
            self.bucket_update_delta = -1
        self.skip_f = skip_f
        # random salt so each limiter instance buckets containers differently
        self.salt = str(uuid.uuid4())

    def __iter__(self):
        return self

    def _bucket_key(self, update):
        # map the update's target container (salted) onto a bucket index
        acct, cont = split_update_path(update)
        digest = hash_path(acct, cont, self.salt)
        return int(digest, 16) % self.num_buckets

    def next(self):
        while True:
            # exhaustion of the wrapped iterator propagates StopIteration
            update_ctx = next(self.iterator)
            bucket = self._bucket_key(update_ctx['update'])
            now = time.time()
            if now >= self.next_update[bucket]:
                self.next_update[bucket] = now + self.bucket_update_delta
                return update_ctx
            # bucket is still rate-limited; drop this element and move on
            self.skip_f(update_ctx)

    __next__ = next
+
+
class SweepStats(object):
"""
Stats bucket for an update sweep
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
- unlinks=0, redirects=0):
+ unlinks=0, redirects=0, skips=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
self.successes = successes
self.unlinks = unlinks
self.redirects = redirects
+ self.skips = skips
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
@@ -62,7 +114,8 @@ class SweepStats(object):
self.quarantines - other.quarantines,
self.successes - other.successes,
self.unlinks - other.unlinks,
- self.redirects - other.redirects)
+ self.redirects - other.redirects,
+ self.skips - other.skips)
def reset(self):
self.errors = 0
@@ -71,6 +124,7 @@ class SweepStats(object):
self.successes = 0
self.unlinks = 0
self.redirects = 0
+ self.skips = 0
def __str__(self):
keys = (
@@ -80,10 +134,26 @@ class SweepStats(object):
(self.unlinks, 'unlinks'),
(self.errors, 'errors'),
(self.redirects, 'redirects'),
+ (self.skips, 'skips'),
)
return ', '.join('%d %s' % pair for pair in keys)
def split_update_path(update):
    """
    Split the account and container parts out of the async update data.

    N.B. updates to shards set the container_path key while the account and
    container keys are always the root.

    :param update: an un-pickled async update dict
    :returns: a 2-tuple of (account, container)
    """
    container_path = update.get('container_path')
    if not container_path:
        # no shard redirect recorded; target the root container
        return update['account'], update['container']
    acct, cont = split_path('/' + container_path, minsegs=2)
    return acct, cont
+
+
class ObjectUpdater(Daemon):
"""Update object information in container listings."""
@@ -110,6 +180,10 @@ class ObjectUpdater(Daemon):
self.max_objects_per_second = \
float(conf.get('objects_per_second',
objects_per_second))
+ self.max_objects_per_container_per_second = non_negative_float(
+ conf.get('max_objects_per_container_per_second', 0))
+ self.per_container_ratelimit_buckets = config_positive_int_value(
+ conf.get('per_container_ratelimit_buckets', 1000))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.report_interval = float(conf.get('report_interval', 300))
@@ -205,13 +279,40 @@ class ObjectUpdater(Daemon):
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
+ def _load_update(self, device, update_path):
+ try:
+ return pickle.load(open(update_path, 'rb'))
+ except Exception as e:
+ if getattr(e, 'errno', None) == errno.ENOENT:
+ return
+ self.logger.exception(
+ 'ERROR Pickle problem, quarantining %s', update_path)
+ self.stats.quarantines += 1
+ self.logger.increment('quarantines')
+ target_path = os.path.join(device, 'quarantined', 'objects',
+ os.path.basename(update_path))
+ renamer(update_path, target_path, fsync=False)
+ try:
+ # If this was the last async_pending in the directory,
+ # then this will succeed. Otherwise, it'll fail, and
+ # that's okay.
+ os.rmdir(os.path.dirname(update_path))
+ except OSError:
+ pass
+ return
+
def _iter_async_pendings(self, device):
"""
- Locate and yield all the async pendings on the device. Multiple updates
- for the same object will come out in reverse-chronological order
- (i.e. newest first) so that callers can skip stale async_pendings.
+ Locate and yield an update context for all the async pending files on
+ the device. Each update context contains details of the async pending
+ file location, its timestamp and the un-pickled update data.
+
+ Async pending files that fail to load will be quarantined.
+
+ Only the most recent update for the same object is yielded; older
+ (stale) async pending files are unlinked as they are located.
- Tries to clean up empty directories as it goes.
+ The iterator tries to clean up empty directories as it goes.
"""
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
@@ -238,12 +339,13 @@ class ObjectUpdater(Daemon):
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
- for update in sorted(self._listdir(prefix_path), reverse=True):
- update_path = os.path.join(prefix_path, update)
+ for update_file in sorted(self._listdir(prefix_path),
+ reverse=True):
+ update_path = os.path.join(prefix_path, update_file)
if not os.path.isfile(update_path):
continue
try:
- obj_hash, timestamp = update.split('-')
+ obj_hash, timestamp = update_file.split('-')
except ValueError:
self.stats.errors += 1
self.logger.increment('errors')
@@ -280,9 +382,14 @@ class ObjectUpdater(Daemon):
raise
else:
last_obj_hash = obj_hash
- yield {'device': device, 'policy': policy,
- 'path': update_path,
- 'obj_hash': obj_hash, 'timestamp': timestamp}
+ update = self._load_update(device, update_path)
+ if update is not None:
+ yield {'device': device,
+ 'policy': policy,
+ 'update_path': update_path,
+ 'obj_hash': obj_hash,
+ 'timestamp': timestamp,
+ 'update': update}
def object_sweep(self, device):
"""
@@ -297,13 +404,21 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
+ def skip_counting_f(update_ctx):
+ # in the future we could defer update_ctx
+ self.stats.skips += 1
+ self.logger.increment("skips")
+
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
+ ap_iter = BucketizedUpdateSkippingLimiter(
+ ap_iter, self.per_container_ratelimit_buckets,
+ self.max_objects_per_container_per_second,
+ skip_f=skip_counting_f)
with ContextPool(self.concurrency) as pool:
- for update in ap_iter:
- pool.spawn(self.process_object_update,
- update['path'], update['device'], update['policy'])
+ for update_ctx in ap_iter:
+ pool.spawn(self.process_object_update, **update_ctx)
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
@@ -326,6 +441,7 @@ class ObjectUpdater(Daemon):
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, '
'%(redirects)d redirects '
+ '%(skips)d skips '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
@@ -335,36 +451,20 @@ class ObjectUpdater(Daemon):
'quarantines': sweep_totals.quarantines,
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors,
- 'redirects': sweep_totals.redirects})
+ 'redirects': sweep_totals.redirects,
+ 'skips': sweep_totals.skips})
- def process_object_update(self, update_path, device, policy):
+ def process_object_update(self, update_path, device, policy, update,
+ **kwargs):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
:param policy: storage policy of object update
+ :param update: the un-pickled update data
+ :param kwargs: un-used keys from update_ctx
"""
- try:
- update = pickle.load(open(update_path, 'rb'))
- except Exception as e:
- if getattr(e, 'errno', None) == errno.ENOENT:
- return
- self.logger.exception(
- 'ERROR Pickle problem, quarantining %s', update_path)
- self.stats.quarantines += 1
- self.logger.increment('quarantines')
- target_path = os.path.join(device, 'quarantined', 'objects',
- os.path.basename(update_path))
- renamer(update_path, target_path, fsync=False)
- try:
- # If this was the last async_pending in the directory,
- # then this will succeed. Otherwise, it'll fail, and
- # that's okay.
- os.rmdir(os.path.dirname(update_path))
- except OSError:
- pass
- return
def do_update():
successes = update.get('successes', [])
@@ -374,11 +474,7 @@ class ObjectUpdater(Daemon):
str(int(policy)))
headers_out.setdefault('X-Backend-Accept-Redirect', 'true')
headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true')
- container_path = update.get('container_path')
- if container_path:
- acct, cont = split_path('/' + container_path, minsegs=2)
- else:
- acct, cont = update['account'], update['container']
+ acct, cont = split_update_path(update)
part, nodes = self.get_container_ring().get_nodes(acct, cont)
obj = '/%s/%s/%s' % (acct, cont, update['obj'])
events = [spawn(self.object_update,