summaryrefslogtreecommitdiff
path: root/swift/obj
diff options
context:
space:
mode:
authorAlistair Coles <alistairncoles@gmail.com>2022-03-23 18:26:50 +0000
committerAlistair Coles <alistairncoles@gmail.com>2022-05-04 11:22:50 +0100
commit5227cb702b744bbe9aaecfff002604be45f64a8c (patch)
tree602bae1a65d9b331849a91fa8563ffc79398aad5 /swift/obj
parent954032d5d245411e676a5722b7e581c97a0aaaf7 (diff)
downloadswift-5227cb702b744bbe9aaecfff002604be45f64a8c.tar.gz
Refactor rate-limiting helper into a class
Replaces the ratelimit_sleep helper function with an EventletRateLimiter class that encapsulates the rate-limiting state that previously needed to be maintained by the caller of the function. The ratelimit_sleep function is retained but deprecated, and now forwards to the EventletRateLimiter class. The object updater's BucketizedUpdateSkippingLimiter is refactored to take advantage of the new EventletRateLimiter class. The rate limiting algorithm is corrected to make the allowed request rate more uniform: previously pairs of requests would be allowed in rapid succession before the rate limiter would the sleep for the time allowance consumed by those two requests; now the rate limiter will sleep as required after each allowed request. For example, before a max_rate of 1 per second might result in 2 requests being allowed followed by a 2 second sleep. That is corrected to be a sleep of 1 second after each request. Change-Id: Ibcf4dbeb4332dee7e9e233473d4ceaf75a5a85c7
Diffstat (limited to 'swift/obj')
-rw-r--r--swift/obj/auditor.py16
-rw-r--r--swift/obj/updater.py38
2 files changed, 24 insertions, 30 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index e0d95bb4d..f9013748a 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -30,7 +30,7 @@ from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
- listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
+ listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter,
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
@@ -85,8 +85,10 @@ class AuditorWorker(object):
self.auditor_type = 'ZBF'
self.log_time = int(conf.get('log_time', 3600))
self.last_logged = 0
- self.files_running_time = 0
- self.bytes_running_time = 0
+ self.files_rate_limiter = EventletRateLimiter(
+ self.max_files_per_second)
+ self.bytes_rate_limiter = EventletRateLimiter(
+ self.max_bytes_per_second)
self.bytes_processed = 0
self.total_bytes_processed = 0
self.total_files_processed = 0
@@ -146,8 +148,7 @@ class AuditorWorker(object):
loop_time = time.time()
self.failsafe_object_audit(location)
self.logger.timing_since('timing', loop_time)
- self.files_running_time = ratelimit_sleep(
- self.files_running_time, self.max_files_per_second)
+ self.files_rate_limiter.wait()
self.total_files_processed += 1
now = time.time()
if now - self.last_logged >= self.log_time:
@@ -266,10 +267,7 @@ class AuditorWorker(object):
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
- self.bytes_running_time = ratelimit_sleep(
- self.bytes_running_time,
- self.max_bytes_per_second,
- incr_by=chunk_len)
+ self.bytes_rate_limiter.wait(incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
for watcher in self.watchers:
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 01c609f13..2ee7c35fa 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -33,7 +33,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
- non_negative_float, config_positive_int_value, non_negative_int
+ non_negative_float, config_positive_int_value, non_negative_int, \
+ EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
-class RateLimiterBucket(object):
- def __init__(self, update_delta):
- self.update_delta = update_delta
- self.last_time = 0
+class RateLimiterBucket(EventletRateLimiter):
+ """
+ Extends EventletRateLimiter to also maintain a deque of items that have
+ been deferred due to rate-limiting, and to provide a comparator for sorting
+ instanced by readiness.
+ """
+ def __init__(self, max_updates_per_second):
+ super(RateLimiterBucket, self).__init__(max_updates_per_second,
+ rate_buffer=0)
self.deque = deque()
- @property
- def wait_until(self):
- return self.last_time + self.update_delta
-
def __len__(self):
return len(self.deque)
@@ -62,10 +64,10 @@ class RateLimiterBucket(object):
__nonzero__ = __bool__ # py2
def __lt__(self, other):
- # used to sort buckets by readiness
+ # used to sort RateLimiterBuckets by readiness
if isinstance(other, RateLimiterBucket):
- return self.wait_until < other.wait_until
- return self.wait_until < other
+ return self.running_time < other.running_time
+ return self.running_time < other
class BucketizedUpdateSkippingLimiter(object):
@@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object):
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
- try:
- self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
- except ZeroDivisionError:
- self.bucket_update_delta = -1
self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4())
- self.buckets = [RateLimiterBucket(self.bucket_update_delta)
+ self.buckets = [RateLimiterBucket(max_elements_per_group_per_second)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
@@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object):
for update_ctx in self.iterator:
bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = self._get_time()
- if now >= bucket.wait_until:
+ if bucket.is_allowed(now=now):
# no need to ratelimit, just return next update
- bucket.last_time = now
return update_ctx
self.stats.deferrals += 1
@@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object):
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
- time.sleep(max(0, bucket.wait_until - now))
+ bucket.wait(now=now)
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
- bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")