diff options
author | Alistair Coles <alistairncoles@gmail.com> | 2022-03-23 18:26:50 +0000 |
---|---|---|
committer | Alistair Coles <alistairncoles@gmail.com> | 2022-05-04 11:22:50 +0100 |
commit | 5227cb702b744bbe9aaecfff002604be45f64a8c (patch) | |
tree | 602bae1a65d9b331849a91fa8563ffc79398aad5 /swift/obj | |
parent | 954032d5d245411e676a5722b7e581c97a0aaaf7 (diff) | |
download | swift-5227cb702b744bbe9aaecfff002604be45f64a8c.tar.gz |
Refactor rate-limiting helper into a class
Replaces the ratelimit_sleep helper function with an
EventletRateLimiter class that encapsulates the rate-limiting state
that previously needed to be maintained by the caller of the function.
The ratelimit_sleep function is retained but deprecated, and now
forwards to the EventletRateLimiter class.
The object updater's BucketizedUpdateSkippingLimiter is refactored to
take advantage of the new EventletRateLimiter class.
The rate limiting algorithm is corrected to make the allowed request
rate more uniform: previously pairs of requests would be allowed in
rapid succession before the rate limiter would then sleep for the time
allowance consumed by those two requests; now the rate limiter will
sleep as required after each allowed request. For example, before a
max_rate of 1 per second might result in 2 requests being allowed
followed by a 2 second sleep. That is corrected to be a sleep of 1
second after each request.
Change-Id: Ibcf4dbeb4332dee7e9e233473d4ceaf75a5a85c7
Diffstat (limited to 'swift/obj')
-rw-r--r-- | swift/obj/auditor.py | 16 | ||||
-rw-r--r-- | swift/obj/updater.py | 38 |
2 files changed, 24 insertions, 30 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e0d95bb4d..f9013748a 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -30,7 +30,7 @@ from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES from swift.common.utils import ( config_auto_int_value, dump_recon_cache, get_logger, list_from_csv, - listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep, + listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter, readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter) from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH @@ -85,8 +85,10 @@ class AuditorWorker(object): self.auditor_type = 'ZBF' self.log_time = int(conf.get('log_time', 3600)) self.last_logged = 0 - self.files_running_time = 0 - self.bytes_running_time = 0 + self.files_rate_limiter = EventletRateLimiter( + self.max_files_per_second) + self.bytes_rate_limiter = EventletRateLimiter( + self.max_bytes_per_second) self.bytes_processed = 0 self.total_bytes_processed = 0 self.total_files_processed = 0 @@ -146,8 +148,7 @@ class AuditorWorker(object): loop_time = time.time() self.failsafe_object_audit(location) self.logger.timing_since('timing', loop_time) - self.files_running_time = ratelimit_sleep( - self.files_running_time, self.max_files_per_second) + self.files_rate_limiter.wait() self.total_files_processed += 1 now = time.time() if now - self.last_logged >= self.log_time: @@ -266,10 +267,7 @@ class AuditorWorker(object): with closing(reader): for chunk in reader: chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) + self.bytes_rate_limiter.wait(incr_by=chunk_len) self.bytes_processed += chunk_len self.total_bytes_processed += chunk_len for watcher in self.watchers: diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 01c609f13..2ee7c35fa 100644 --- a/swift/obj/updater.py +++ 
b/swift/obj/updater.py @@ -33,7 +33,8 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ - non_negative_float, config_positive_int_value, non_negative_int + non_negative_float, config_positive_int_value, non_negative_int, \ + EventletRateLimiter from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY -class RateLimiterBucket(object): - def __init__(self, update_delta): - self.update_delta = update_delta - self.last_time = 0 +class RateLimiterBucket(EventletRateLimiter): + """ + Extends EventletRateLimiter to also maintain a deque of items that have + been deferred due to rate-limiting, and to provide a comparator for sorting + instanced by readiness. 
+ """ + def __init__(self, max_updates_per_second): + super(RateLimiterBucket, self).__init__(max_updates_per_second, + rate_buffer=0) self.deque = deque() - @property - def wait_until(self): - return self.last_time + self.update_delta - def __len__(self): return len(self.deque) @@ -62,10 +64,10 @@ class RateLimiterBucket(object): __nonzero__ = __bool__ # py2 def __lt__(self, other): - # used to sort buckets by readiness + # used to sort RateLimiterBuckets by readiness if isinstance(other, RateLimiterBucket): - return self.wait_until < other.wait_until - return self.wait_until < other + return self.running_time < other.running_time + return self.running_time < other class BucketizedUpdateSkippingLimiter(object): @@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object): self.stats = stats # if we want a smaller "blast radius" we could make this number bigger self.num_buckets = max(num_buckets, 1) - try: - self.bucket_update_delta = 1.0 / max_elements_per_group_per_second - except ZeroDivisionError: - self.bucket_update_delta = -1 self.max_deferred_elements = max_deferred_elements self.deferred_buckets = deque() self.drain_until = drain_until self.salt = str(uuid.uuid4()) - self.buckets = [RateLimiterBucket(self.bucket_update_delta) + self.buckets = [RateLimiterBucket(max_elements_per_group_per_second) for _ in range(self.num_buckets)] self.buckets_ordered_by_readiness = None @@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object): for update_ctx in self.iterator: bucket = self.buckets[self._bucket_key(update_ctx['update'])] now = self._get_time() - if now >= bucket.wait_until: + if bucket.is_allowed(now=now): # no need to ratelimit, just return next update - bucket.last_time = now return update_ctx self.stats.deferrals += 1 @@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object): bucket = self.buckets_ordered_by_readiness.get_nowait() if now < self.drain_until: # wait for next element to be ready - time.sleep(max(0, bucket.wait_until 
- now)) + bucket.wait(now=now) # drain the most recently deferred element item = bucket.deque.pop() if bucket: # bucket has more deferred elements, re-insert in queue in # correct chronological position - bucket.last_time = self._get_time() self.buckets_ordered_by_readiness.put(bucket) self.stats.drains += 1 self.logger.increment("drains") |