diff options
Diffstat (limited to 'swift/obj')
-rw-r--r-- | swift/obj/auditor.py | 16 | ||||
-rw-r--r-- | swift/obj/updater.py | 38 |
2 files changed, 24 insertions, 30 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py index e0d95bb4d..f9013748a 100644 --- a/swift/obj/auditor.py +++ b/swift/obj/auditor.py @@ -30,7 +30,7 @@ from swift.common.daemon import Daemon from swift.common.storage_policy import POLICIES from swift.common.utils import ( config_auto_int_value, dump_recon_cache, get_logger, list_from_csv, - listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep, + listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter, readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter) from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH @@ -85,8 +85,10 @@ class AuditorWorker(object): self.auditor_type = 'ZBF' self.log_time = int(conf.get('log_time', 3600)) self.last_logged = 0 - self.files_running_time = 0 - self.bytes_running_time = 0 + self.files_rate_limiter = EventletRateLimiter( + self.max_files_per_second) + self.bytes_rate_limiter = EventletRateLimiter( + self.max_bytes_per_second) self.bytes_processed = 0 self.total_bytes_processed = 0 self.total_files_processed = 0 @@ -146,8 +148,7 @@ class AuditorWorker(object): loop_time = time.time() self.failsafe_object_audit(location) self.logger.timing_since('timing', loop_time) - self.files_running_time = ratelimit_sleep( - self.files_running_time, self.max_files_per_second) + self.files_rate_limiter.wait() self.total_files_processed += 1 now = time.time() if now - self.last_logged >= self.log_time: @@ -266,10 +267,7 @@ class AuditorWorker(object): with closing(reader): for chunk in reader: chunk_len = len(chunk) - self.bytes_running_time = ratelimit_sleep( - self.bytes_running_time, - self.max_bytes_per_second, - incr_by=chunk_len) + self.bytes_rate_limiter.wait(incr_by=chunk_len) self.bytes_processed += chunk_len self.total_bytes_processed += chunk_len for watcher in self.watchers: diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 01c609f13..2ee7c35fa 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -33,7 +33,8 @@ from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \ - non_negative_float, config_positive_int_value, non_negative_int + non_negative_float, config_positive_int_value, non_negative_int, \ + EventletRateLimiter from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \ HTTP_MOVED_PERMANENTLY -class RateLimiterBucket(object): - def __init__(self, update_delta): - self.update_delta = update_delta - self.last_time = 0 +class RateLimiterBucket(EventletRateLimiter): + """ + Extends EventletRateLimiter to also maintain a deque of items that have + been deferred due to rate-limiting, and to provide a comparator for sorting + instanced by readiness. + """ + def __init__(self, max_updates_per_second): + super(RateLimiterBucket, self).__init__(max_updates_per_second, + rate_buffer=0) self.deque = deque() - @property - def wait_until(self): - return self.last_time + self.update_delta - def __len__(self): return len(self.deque) @@ -62,10 +64,10 @@ class RateLimiterBucket(object): __nonzero__ = __bool__ # py2 def __lt__(self, other): - # used to sort buckets by readiness + # used to sort RateLimiterBuckets by readiness if isinstance(other, RateLimiterBucket): - return self.wait_until < other.wait_until - return self.wait_until < other + return self.running_time < other.running_time + return self.running_time < other class BucketizedUpdateSkippingLimiter(object): @@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object): self.stats = stats # if we want a smaller "blast radius" we could make this number bigger self.num_buckets = max(num_buckets, 1) - try: - self.bucket_update_delta = 1.0 / max_elements_per_group_per_second - except ZeroDivisionError: - self.bucket_update_delta = -1 self.max_deferred_elements = max_deferred_elements self.deferred_buckets = deque() self.drain_until = drain_until self.salt = str(uuid.uuid4()) - self.buckets = [RateLimiterBucket(self.bucket_update_delta) + self.buckets = [RateLimiterBucket(max_elements_per_group_per_second) for _ in range(self.num_buckets)] self.buckets_ordered_by_readiness = None @@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object): for update_ctx in self.iterator: bucket = self.buckets[self._bucket_key(update_ctx['update'])] now = self._get_time() - if now >= bucket.wait_until: + if bucket.is_allowed(now=now): # no need to ratelimit, just return next update - bucket.last_time = now return update_ctx self.stats.deferrals += 1 @@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object): bucket = self.buckets_ordered_by_readiness.get_nowait() if now < self.drain_until: # wait for next element to be ready - time.sleep(max(0, bucket.wait_until - now)) + bucket.wait(now=now) # drain the most recently deferred element item = bucket.deque.pop() if bucket: # bucket has more deferred elements, re-insert in queue in # correct chronological position - bucket.last_time = self._get_time() self.buckets_ordered_by_readiness.put(bucket) self.stats.drains += 1 self.logger.increment("drains") |