summaryrefslogtreecommitdiff
path: root/swift/obj
diff options
context:
space:
mode:
Diffstat (limited to 'swift/obj')
-rw-r--r--swift/obj/auditor.py16
-rw-r--r--swift/obj/updater.py38
2 files changed, 24 insertions, 30 deletions
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index e0d95bb4d..f9013748a 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -30,7 +30,7 @@ from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES
from swift.common.utils import (
config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
- listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
+ listdir, load_pkg_resource, parse_prefixed_conf, EventletRateLimiter,
readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
@@ -85,8 +85,10 @@ class AuditorWorker(object):
self.auditor_type = 'ZBF'
self.log_time = int(conf.get('log_time', 3600))
self.last_logged = 0
- self.files_running_time = 0
- self.bytes_running_time = 0
+ self.files_rate_limiter = EventletRateLimiter(
+ self.max_files_per_second)
+ self.bytes_rate_limiter = EventletRateLimiter(
+ self.max_bytes_per_second)
self.bytes_processed = 0
self.total_bytes_processed = 0
self.total_files_processed = 0
@@ -146,8 +148,7 @@ class AuditorWorker(object):
loop_time = time.time()
self.failsafe_object_audit(location)
self.logger.timing_since('timing', loop_time)
- self.files_running_time = ratelimit_sleep(
- self.files_running_time, self.max_files_per_second)
+ self.files_rate_limiter.wait()
self.total_files_processed += 1
now = time.time()
if now - self.last_logged >= self.log_time:
@@ -266,10 +267,7 @@ class AuditorWorker(object):
with closing(reader):
for chunk in reader:
chunk_len = len(chunk)
- self.bytes_running_time = ratelimit_sleep(
- self.bytes_running_time,
- self.max_bytes_per_second,
- incr_by=chunk_len)
+ self.bytes_rate_limiter.wait(incr_by=chunk_len)
self.bytes_processed += chunk_len
self.total_bytes_processed += chunk_len
for watcher in self.watchers:
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 01c609f13..2ee7c35fa 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -33,7 +33,8 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
- non_negative_float, config_positive_int_value, non_negative_int
+ non_negative_float, config_positive_int_value, non_negative_int, \
+ EventletRateLimiter
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -43,16 +44,17 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
-class RateLimiterBucket(object):
- def __init__(self, update_delta):
- self.update_delta = update_delta
- self.last_time = 0
+class RateLimiterBucket(EventletRateLimiter):
+ """
+ Extends EventletRateLimiter to also maintain a deque of items that have
+ been deferred due to rate-limiting, and to provide a comparator for sorting
+ instanced by readiness.
+ """
+ def __init__(self, max_updates_per_second):
+ super(RateLimiterBucket, self).__init__(max_updates_per_second,
+ rate_buffer=0)
self.deque = deque()
- @property
- def wait_until(self):
- return self.last_time + self.update_delta
-
def __len__(self):
return len(self.deque)
@@ -62,10 +64,10 @@ class RateLimiterBucket(object):
__nonzero__ = __bool__ # py2
def __lt__(self, other):
- # used to sort buckets by readiness
+ # used to sort RateLimiterBuckets by readiness
if isinstance(other, RateLimiterBucket):
- return self.wait_until < other.wait_until
- return self.wait_until < other
+ return self.running_time < other.running_time
+ return self.running_time < other
class BucketizedUpdateSkippingLimiter(object):
@@ -124,15 +126,11 @@ class BucketizedUpdateSkippingLimiter(object):
self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
- try:
- self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
- except ZeroDivisionError:
- self.bucket_update_delta = -1
self.max_deferred_elements = max_deferred_elements
self.deferred_buckets = deque()
self.drain_until = drain_until
self.salt = str(uuid.uuid4())
- self.buckets = [RateLimiterBucket(self.bucket_update_delta)
+ self.buckets = [RateLimiterBucket(max_elements_per_group_per_second)
for _ in range(self.num_buckets)]
self.buckets_ordered_by_readiness = None
@@ -151,9 +149,8 @@ class BucketizedUpdateSkippingLimiter(object):
for update_ctx in self.iterator:
bucket = self.buckets[self._bucket_key(update_ctx['update'])]
now = self._get_time()
- if now >= bucket.wait_until:
+ if bucket.is_allowed(now=now):
# no need to ratelimit, just return next update
- bucket.last_time = now
return update_ctx
self.stats.deferrals += 1
@@ -194,13 +191,12 @@ class BucketizedUpdateSkippingLimiter(object):
bucket = self.buckets_ordered_by_readiness.get_nowait()
if now < self.drain_until:
# wait for next element to be ready
- time.sleep(max(0, bucket.wait_until - now))
+ bucket.wait(now=now)
# drain the most recently deferred element
item = bucket.deque.pop()
if bucket:
# bucket has more deferred elements, re-insert in queue in
# correct chronological position
- bucket.last_time = self._get_time()
self.buckets_ordered_by_readiness.put(bucket)
self.stats.drains += 1
self.logger.increment("drains")