summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlistair Coles <alistairncoles@gmail.com>2022-01-10 11:52:49 +0000
committerAlistair Coles <alistairncoles@gmail.com>2022-02-21 10:56:23 +0000
commit51da2543ca532204b5b141948dde3a6216c41cf8 (patch)
treea929a31c81b4771575756ee6711a5c9ad6d53a9e
parentde888629817a2a326b6e8dc66edb0ce3168818a7 (diff)
downloadswift-51da2543ca532204b5b141948dde3a6216c41cf8.tar.gz
object-updater: defer ratelimited updates
Previously, objects updates that could not be sent immediately due to per-container/bucket ratelimiting [1] would be skipped and re-tried during the next updater cycle. There could potentially be a period of time at the end of a cycle when the updater slept, having completed a sweep of the on-disk async pending files, despite having skipped updates during the cycle. Skipped updates would then be read from disk again during the next cycle. With this change the updater will defer skipped updates to an in-memory queue (up to a configurable maximum number) until the sweep of async pending files has completed, and then trickle out deferred updates until the cycle's interval expires. This increases the useful work done in the current cycle and reduces the amount of repeated disk IO during the next cycle. The deferrals queue is bounded in size and will evict least recently read updates in order to accept more recently read updates. This reduces the probablility that a deferred update has been made obsolete by newer on-disk async pending files while waiting in the deferrals queue. The deferrals queue is implemented as a collection of per-bucket queues so that updates can be drained from the queues in the order that buckets cease to be ratelimited. [1] Related-Change: Idef25cd6026b02c1b5c10a9816c8c6cbe505e7ed Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Matthew Oliver <matt@oliver.net.au> Change-Id: I95e58df9f15c5f9d552b8f4c4989a474f52262f4
-rw-r--r--etc/object-server.conf-sample6
-rw-r--r--swift/obj/updater.py222
-rw-r--r--test/debug_logger.py3
-rw-r--r--test/unit/obj/test_updater.py709
4 files changed, 880 insertions, 60 deletions
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 5329bca8f..9f9f64515 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -483,6 +483,12 @@ use = egg:swift#recon
# be an integer greater than 0.
# per_container_ratelimit_buckets = 1000
#
+# Updates that cannot be sent due to per-container rate-limiting may be
+# deferred and re-tried at the end of the updater cycle. This option constrains
+# the size of the in-memory data structure used to store deferred updates.
+# Must be an integer value greater than or equal to 0.
+# max_deferred_updates = 10000
+#
# slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead.
# slowdown = 0.01
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index ae42e95cd..72c48cc38 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from six.moves import queue
import six.moves.cPickle as pickle
import errno
@@ -21,6 +22,7 @@ import sys
import time
import uuid
from random import random, shuffle
+from collections import deque
from eventlet import spawn, Timeout
@@ -31,7 +33,7 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
- non_negative_float, config_positive_int_value
+ non_negative_float, config_positive_int_value, non_negative_int
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -41,33 +43,98 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
+class RateLimiterBucket(object):
+ def __init__(self, update_delta):
+ self.update_delta = update_delta
+ self.last_time = 0
+ self.deque = deque()
+
+ @property
+ def wait_until(self):
+ return self.last_time + self.update_delta
+
+ def __len__(self):
+ return len(self.deque)
+
+ def __bool__(self):
+ return bool(self.deque)
+
+ __nonzero__ = __bool__ # py2
+
+ def __lt__(self, other):
+ # used to sort buckets by readiness
+ if isinstance(other, RateLimiterBucket):
+ return self.wait_until < other.wait_until
+ return self.wait_until < other
+
+
class BucketizedUpdateSkippingLimiter(object):
"""
- Wrap an iterator to filter elements that show up too often.
+ Wrap an iterator to rate-limit updates on a per-bucket basis, where updates
+ are mapped to buckets by hashing their destination path. If an update is
+ rate-limited then it is placed on a deferral queue and may be sent later if
+ the wrapped iterator is exhausted before the ``drain_until`` time is
+ reached.
+
+ The deferral queue has constrained size and once the queue is full updates
+ are evicted using a first-in-first-out policy. This policy is used because
+ updates on the queue may have been made obsolete by newer updates written
+ to disk, and this is more likely for updates that have been on the queue
+ longest.
+
+ The iterator increments stats as follows:
+
+ * The `deferrals` stat is incremented for each update that is
+ rate-limited. Note that a individual update is rate-limited at most
+ once.
+ * The `skips` stat is incremented for each rate-limited update that is
+ not eventually yielded. This includes updates that are evicted from the
+ deferral queue and all updates that remain in the deferral queue when
+ ``drain_until`` time is reached and the iterator terminates.
+ * The `drains` stat is incremented for each rate-limited update that is
+ eventually yielded.
+
+ Consequently, when this iterator terminates, the sum of `skips` and
+ `drains` is equal to the number of `deferrals`.
:param update_iterable: an async_pending update iterable
+ :param logger: a logger instance
+ :param stats: a SweepStats instance
:param num_buckets: number of buckets to divide container hashes into, the
more buckets total the less containers to a bucket
(once a busy container slows down a bucket the whole
- bucket starts skipping)
- :param max_elements_per_group_per_second: tunable, when skipping kicks in
- :param skip_f: function to call with update_ctx when skipping it
+ bucket starts deferring)
+ :param max_elements_per_group_per_second: tunable, when deferring kicks in
+ :param max_deferred_elements: maximum number of deferred elements before
+ skipping starts. Each bucket may defer updates, but once the total
+ number of deferred updates summed across all buckets reaches this
+ value then all buckets will skip subsequent updates.
+ :param drain_until: time at which any remaining deferred elements must be
+ skipped and the iterator stops. Once the wrapped iterator has been
+ exhausted, this iterator will drain deferred elements from its buckets
+ until either all buckets have drained or this time is reached.
"""
- def __init__(self, update_iterable, num_buckets,
- max_elements_per_group_per_second,
- skip_f=lambda update_ctx: None):
+ def __init__(self, update_iterable, logger, stats, num_buckets=1000,
+ max_elements_per_group_per_second=50,
+ max_deferred_elements=0,
+ drain_until=0):
self.iterator = iter(update_iterable)
+ self.logger = logger
+ self.stats = stats
# if we want a smaller "blast radius" we could make this number bigger
self.num_buckets = max(num_buckets, 1)
- # an array might be more efficient; but this is pretty cheap
- self.next_update = [0.0 for _ in range(self.num_buckets)]
try:
self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
except ZeroDivisionError:
self.bucket_update_delta = -1
- self.skip_f = skip_f
+ self.max_deferred_elements = max_deferred_elements
+ self.deferred_buckets = deque()
+ self.drain_until = drain_until
self.salt = str(uuid.uuid4())
+ self.buckets = [RateLimiterBucket(self.bucket_update_delta)
+ for _ in range(self.num_buckets)]
+ self.buckets_ordered_by_readiness = None
def __iter__(self):
return self
@@ -76,15 +143,77 @@ class BucketizedUpdateSkippingLimiter(object):
acct, cont = split_update_path(update)
return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets
+ def _get_time(self):
+ return time.time()
+
def next(self):
+ # first iterate over the wrapped iterator...
for update_ctx in self.iterator:
- bucket_key = self._bucket_key(update_ctx['update'])
- now = time.time()
- if self.next_update[bucket_key] > now:
- self.skip_f(update_ctx)
- continue
- self.next_update[bucket_key] = now + self.bucket_update_delta
- return update_ctx
+ bucket = self.buckets[self._bucket_key(update_ctx['update'])]
+ now = self._get_time()
+ if now >= bucket.wait_until:
+ # no need to ratelimit, just return next update
+ bucket.last_time = now
+ return update_ctx
+
+ self.stats.deferrals += 1
+ self.logger.increment("deferrals")
+ if self.max_deferred_elements > 0:
+ if len(self.deferred_buckets) >= self.max_deferred_elements:
+ # create space to defer this update by popping the least
+ # recent deferral from the least recently deferred bucket;
+ # updates read from disk recently are preferred over those
+ # read from disk less recently.
+ oldest_deferred_bucket = self.deferred_buckets.popleft()
+ oldest_deferred_bucket.deque.popleft()
+ self.stats.skips += 1
+ self.logger.increment("skips")
+ # append the update to the bucket's queue and append the bucket
+ # to the queue of deferred buckets
+ # note: buckets may have multiple entries in deferred_buckets,
+ # one for each deferred update in that particular bucket
+ bucket.deque.append(update_ctx)
+ self.deferred_buckets.append(bucket)
+ else:
+ self.stats.skips += 1
+ self.logger.increment("skips")
+
+ if self.buckets_ordered_by_readiness is None:
+ # initialise a queue of those buckets with deferred elements;
+ # buckets are queued in the chronological order in which they are
+ # ready to serve an element
+ self.buckets_ordered_by_readiness = queue.PriorityQueue()
+ for bucket in self.buckets:
+ if bucket:
+ self.buckets_ordered_by_readiness.put(bucket)
+
+ # now drain the buckets...
+ undrained_elements = []
+ while not self.buckets_ordered_by_readiness.empty():
+ now = self._get_time()
+ bucket = self.buckets_ordered_by_readiness.get_nowait()
+ if now < self.drain_until:
+ # wait for next element to be ready
+ time.sleep(max(0, bucket.wait_until - now))
+ # drain the most recently deferred element
+ item = bucket.deque.pop()
+ if bucket:
+ # bucket has more deferred elements, re-insert in queue in
+ # correct chronological position
+ bucket.last_time = self._get_time()
+ self.buckets_ordered_by_readiness.put(bucket)
+ self.stats.drains += 1
+ self.logger.increment("drains")
+ return item
+ else:
+ # time to stop iterating: gather all un-drained elements
+ undrained_elements.extend(bucket.deque)
+
+ if undrained_elements:
+ # report final batch of skipped elements
+ self.stats.skips += len(undrained_elements)
+ self.logger.update_stats("skips", len(undrained_elements))
+
raise StopIteration()
__next__ = next
@@ -93,9 +222,18 @@ class BucketizedUpdateSkippingLimiter(object):
class SweepStats(object):
"""
Stats bucket for an update sweep
+
+ A measure of the rate at which updates are being rate-limited is:
+
+ deferrals / (deferrals + successes + failures - drains)
+
+ A measure of the rate at which updates are not being sent during a sweep
+ is:
+
+ skips / (skips + successes + failures)
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
- unlinks=0, redirects=0, skips=0):
+ unlinks=0, redirects=0, skips=0, deferrals=0, drains=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
@@ -103,10 +241,13 @@ class SweepStats(object):
self.unlinks = unlinks
self.redirects = redirects
self.skips = skips
+ self.deferrals = deferrals
+ self.drains = drains
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
- self.successes, self.unlinks)
+ self.successes, self.unlinks, self.redirects,
+ self.skips, self.deferrals, self.drains)
def since(self, other):
return type(self)(self.errors - other.errors,
@@ -115,7 +256,9 @@ class SweepStats(object):
self.successes - other.successes,
self.unlinks - other.unlinks,
self.redirects - other.redirects,
- self.skips - other.skips)
+ self.skips - other.skips,
+ self.deferrals - other.deferrals,
+ self.drains - other.drains)
def reset(self):
self.errors = 0
@@ -125,6 +268,8 @@ class SweepStats(object):
self.unlinks = 0
self.redirects = 0
self.skips = 0
+ self.deferrals = 0
+ self.drains = 0
def __str__(self):
keys = (
@@ -135,6 +280,8 @@ class SweepStats(object):
(self.errors, 'errors'),
(self.redirects, 'redirects'),
(self.skips, 'skips'),
+ (self.deferrals, 'deferrals'),
+ (self.drains, 'drains'),
)
return ', '.join('%d %s' % pair for pair in keys)
@@ -191,6 +338,9 @@ class ObjectUpdater(Daemon):
DEFAULT_RECON_CACHE_PATH)
self.rcache = os.path.join(self.recon_cache_path, RECON_OBJECT_FILE)
self.stats = SweepStats()
+ self.max_deferred_updates = non_negative_int(
+ conf.get('max_deferred_updates', 10000))
+ self.begin = time.time()
def _listdir(self, path):
try:
@@ -214,7 +364,7 @@ class ObjectUpdater(Daemon):
time.sleep(random() * self.interval)
while True:
self.logger.info('Begin object update sweep')
- begin = time.time()
+ self.begin = time.time()
pids = []
# read from container ring to ensure it's fresh
self.get_container_ring().get_nodes('')
@@ -248,7 +398,7 @@ class ObjectUpdater(Daemon):
sys.exit()
while pids:
pids.remove(os.wait()[0])
- elapsed = time.time() - begin
+ elapsed = time.time() - self.begin
self.logger.info('Object update sweep completed: %.02fs',
elapsed)
dump_recon_cache({'object_updater_sweep': elapsed},
@@ -259,7 +409,7 @@ class ObjectUpdater(Daemon):
def run_once(self, *args, **kwargs):
"""Run the updater once."""
self.logger.info('Begin object update single threaded sweep')
- begin = time.time()
+ self.begin = time.time()
self.stats.reset()
for device in self._listdir(self.devices):
try:
@@ -271,7 +421,7 @@ class ObjectUpdater(Daemon):
self.logger.warning('Skipping: %s', err)
continue
self.object_sweep(dev_path)
- elapsed = time.time() - begin
+ elapsed = time.time() - self.begin
self.logger.info(
('Object update single-threaded sweep completed: '
'%(elapsed).02fs, %(stats)s'),
@@ -404,18 +554,15 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
- def skip_counting_f(update_ctx):
- # in the future we could defer update_ctx
- self.stats.skips += 1
- self.logger.increment("skips")
-
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
ap_iter = BucketizedUpdateSkippingLimiter(
- ap_iter, self.per_container_ratelimit_buckets,
+ ap_iter, self.logger, self.stats,
+ self.per_container_ratelimit_buckets,
self.max_objects_per_container_per_second,
- skip_f=skip_counting_f)
+ max_deferred_elements=self.max_deferred_updates,
+ drain_until=self.begin + self.interval)
with ContextPool(self.concurrency) as pool:
for update_ctx in ap_iter:
pool.spawn(self.process_object_update, **update_ctx)
@@ -440,8 +587,10 @@ class ObjectUpdater(Daemon):
'%(successes)d successes, %(failures)d failures, '
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, '
- '%(redirects)d redirects '
- '%(skips)d skips '
+ '%(redirects)d redirects, '
+ '%(skips)d skips, '
+ '%(deferrals)d deferrals, '
+ '%(drains)d drains '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
@@ -452,7 +601,10 @@ class ObjectUpdater(Daemon):
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors,
'redirects': sweep_totals.redirects,
- 'skips': sweep_totals.skips})
+ 'skips': sweep_totals.skips,
+ 'deferrals': sweep_totals.deferrals,
+ 'drains': sweep_totals.drains
+ })
def process_object_update(self, update_path, device, policy, update,
**kwargs):
diff --git a/test/debug_logger.py b/test/debug_logger.py
index 99f7ec3a5..e1fc84e69 100644
--- a/test/debug_logger.py
+++ b/test/debug_logger.py
@@ -141,6 +141,9 @@ class FakeLogger(logging.Logger, CaptureLog):
counts[metric] += 1
return counts
+ def get_update_stats(self):
+ return [call[0] for call in self.log_dict['update_stats']]
+
def setFormatter(self, obj):
self.formatter = obj
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 0bba5beaa..c0fd60cf8 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -12,6 +12,8 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from queue import PriorityQueue
+
import eventlet
import six.moves.cPickle as pickle
import mock
@@ -127,6 +129,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 50.0)
self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
+ self.assertEqual(daemon.max_deferred_updates, 10000)
# non-defaults
conf = {
@@ -139,6 +142,7 @@ class TestObjectUpdater(unittest.TestCase):
'objects_per_second': '10.5',
'max_objects_per_container_per_second': '1.2',
'per_container_ratelimit_buckets': '100',
+ 'max_deferred_updates': '0',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else')
@@ -150,6 +154,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.max_objects_per_second, 10.5)
self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
+ self.assertEqual(daemon.max_deferred_updates, 0)
# check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@@ -171,6 +176,9 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'per_container_ratelimit_buckets': '0'})
check_bad({'per_container_ratelimit_buckets': '-1'})
check_bad({'per_container_ratelimit_buckets': 'auto'})
+ check_bad({'max_deferred_updates': '-1'})
+ check_bad({'max_deferred_updates': '1.1'})
+ check_bad({'max_deferred_updates': 'auto'})
@mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir):
@@ -1267,6 +1275,8 @@ class TestObjectUpdater(unittest.TestCase):
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 1,
+ 'max_deferred_updates': 0, # do not re-iterate
+ 'concurrency': 1
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@@ -1299,6 +1309,14 @@ class TestObjectUpdater(unittest.TestCase):
for req in fake_conn.requests),
{'/sda1/%s/a/c1' % c1_part: 3,
'/sda1/%s/.shards_a/c2_shard' % c2_part: 3})
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
+ '0 errors, 0 redirects, 9 skips, 9 deferrals, 0 drains',
+ info_lines[-1])
+ self.assertEqual({'skips': 9, 'successes': 2, 'unlinks': 2,
+ 'deferrals': 9},
+ self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
def test_per_container_rate_limit_unlimited(self, mock_recon):
@@ -1330,14 +1348,24 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(expected_total, daemon.stats.successes)
self.assertEqual(0, daemon.stats.skips)
self.assertEqual([], self._find_async_pending_files())
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('11 successes, 0 failures, 0 quarantines, 11 unlinks, '
+ '0 errors, 0 redirects, 0 skips, 0 deferrals, 0 drains',
+ info_lines[-1])
+ self.assertEqual({'successes': 11, 'unlinks': 11},
+ self.logger.get_increment_counts())
@mock.patch('swift.obj.updater.dump_recon_cache')
- def test_per_container_rate_limit_slow_responses(self, mock_recon):
+ def test_per_container_rate_limit_some_limited(self, mock_recon):
+ # simulate delays between buckets being fed so that only some updates
+ # are skipped
conf = {
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'max_objects_per_container_per_second': 10,
+ 'max_deferred_updates': 0, # do not re-iterate
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
@@ -1347,27 +1375,424 @@ class TestObjectUpdater(unittest.TestCase):
for i in range(num_c1_files):
obj_name = 'o%02d' % i
self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
expected_total = num_c1_files
self.assertEqual(expected_total,
len(self._find_async_pending_files()))
- latencies = [.11, 0, .11, 0]
+ # first one always succeeds, second is skipped because it is only 0.05s
+ # behind the first, second succeeds because it is 0.11 behind the
+ # first, fourth is skipped
+ latencies = [0, 0.05, .051, 0]
expected_success = 2
fake_status_codes = [200] * 3 * expected_success
- def fake_spawn(pool, *args, **kwargs):
- # make each update delay the iter being called again
+ contexts_fed_in = []
+
+ def ratelimit_if(value):
+ contexts_fed_in.append(value)
+ # make each update delay before the iter being called again
eventlet.sleep(latencies.pop(0))
- return args[0](*args[1:], **kwargs)
+ return False # returning False overrides normal ratelimiting
- with mocked_http_conn(*fake_status_codes):
- with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn):
- daemon.run_once()
+ orig_rate_limited_iterator = utils.RateLimitedIterator
+
+ def fake_rate_limited_iterator(*args, **kwargs):
+ # insert our own rate limiting function
+ kwargs['ratelimit_if'] = ratelimit_if
+ return orig_rate_limited_iterator(*args, **kwargs)
+
+ with mocked_http_conn(*fake_status_codes) as fake_conn, \
+ mock.patch('swift.obj.updater.RateLimitedIterator',
+ fake_rate_limited_iterator):
+ daemon.run_once()
+ self.assertEqual(expected_success, daemon.stats.successes)
+ expected_skipped = expected_total - expected_success
+ self.assertEqual(expected_skipped, daemon.stats.skips)
+ self.assertEqual(expected_skipped,
+ len(self._find_async_pending_files()))
+ paths_fed_in = ['/sda1/%(part)s/%(account)s/%(container)s/%(obj)s'
+ % dict(ctx['update'], part=c1_part)
+ for ctx in contexts_fed_in]
+ expected_update_paths = paths_fed_in[:1] * 3 + paths_fed_in[2:3] * 3
+ actual_update_paths = [req['path'] for req in fake_conn.requests]
+ self.assertEqual(expected_update_paths, actual_update_paths)
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('2 successes, 0 failures, 0 quarantines, 2 unlinks, '
+ '0 errors, 0 redirects, 2 skips, 2 deferrals, 0 drains',
+ info_lines[-1])
+ self.assertEqual({'skips': 2, 'successes': 2, 'unlinks': 2,
+ 'deferrals': 2},
+ self.logger.get_increment_counts())
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit_defer_2_skip_1(self, mock_recon):
+ # limit length of deferral queue so that some defer and some skip
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 10,
+ # only one bucket needed for test
+ 'per_container_ratelimit_buckets': 1,
+ 'max_deferred_updates': 1,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ # all updates for same container
+ num_c1_files = 4
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
+ expected_total = num_c1_files
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ # first succeeds, second is deferred, third succeeds, fourth is
+ # deferred and bumps second out of deferral queue, fourth is re-tried
+ latencies = [0, 0.05, .051, 0, 0, .11]
+ expected_success = 3
+
+ contexts_fed_in = []
+ captured_queues = []
+ captured_skips_stats = []
+
+ def ratelimit_if(value):
+ contexts_fed_in.append(value)
+ return False # returning False overrides normal ratelimiting
+
+ orig_rate_limited_iterator = utils.RateLimitedIterator
+
+ def fake_rate_limited_iterator(*args, **kwargs):
+ # insert our own rate limiting function
+ kwargs['ratelimit_if'] = ratelimit_if
+ return orig_rate_limited_iterator(*args, **kwargs)
+
+ now = [time()]
+
+ def fake_get_time(bucket_iter):
+ captured_skips_stats.append(
+ daemon.logger.get_increment_counts().get('skips', 0))
+ captured_queues.append(list(bucket_iter.buckets[0].deque))
+ # make each update delay before the iter being called again
+ now[0] += latencies.pop(0)
+ return now[0]
+
+ captured_updates = []
+
+ def fake_object_update(node, part, op, obj, *args, **kwargs):
+ captured_updates.append((node, part, op, obj))
+ return True, node['id'], False
+
+ with mock.patch(
+ 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
+ fake_get_time), \
+ mock.patch.object(daemon, 'object_update',
+ fake_object_update), \
+ mock.patch('swift.obj.updater.RateLimitedIterator',
+ fake_rate_limited_iterator):
+ daemon.run_once()
+ self.assertEqual(expected_success, daemon.stats.successes)
+ expected_skipped = expected_total - expected_success
+ self.assertEqual(expected_skipped, daemon.stats.skips)
+ self.assertEqual(expected_skipped,
+ len(self._find_async_pending_files()))
+
+ orig_iteration = contexts_fed_in[:num_c1_files]
+ # we first capture every async fed in one by one
+ objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
+ self.assertEqual(num_c1_files, len(set(objs_fed_in)))
+ # keep track of this order for context
+ aorder = {ctx['update']['obj']: 'a%02d' % i
+ for i, ctx in enumerate(orig_iteration)}
+ expected_drops = (1,)
+ expected_updates_sent = []
+ for i, obj in enumerate(objs_fed_in):
+ if i in expected_drops:
+ continue
+ # triple replica, request to 3 nodes each obj!
+ expected_updates_sent.extend([obj] * 3)
+
+ actual_updates_sent = [
+ utils.split_path(update[3], minsegs=3)[-1]
+ for update in captured_updates
+ ]
+ self.assertEqual([aorder[o] for o in expected_updates_sent],
+ [aorder[o] for o in actual_updates_sent])
+
+ self.assertEqual([0, 0, 0, 0, 1], captured_skips_stats)
+
+ expected_deferrals = [
+ [],
+ [],
+ [objs_fed_in[1]],
+ [objs_fed_in[1]],
+ [objs_fed_in[3]],
+ ]
+ self.assertEqual(
+ expected_deferrals,
+ [[ctx['update']['obj'] for ctx in q] for q in captured_queues])
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('3 successes, 0 failures, 0 quarantines, 3 unlinks, '
+ '0 errors, 0 redirects, 1 skips, 2 deferrals, 1 drains',
+ info_lines[-1])
+ self.assertEqual(
+ {'skips': 1, 'successes': 3, 'unlinks': 3, 'deferrals': 2,
+ 'drains': 1}, self.logger.get_increment_counts())
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit_defer_3_skip_1(self, mock_recon):
+ # limit length of deferral queue so that some defer and some skip
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 10,
+ # only one bucket needed for test
+ 'per_container_ratelimit_buckets': 1,
+ 'max_deferred_updates': 2,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ # all updates for same container
+ num_c1_files = 5
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
+ expected_total = num_c1_files
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ # indexes 0, 2 succeed; 1, 3, 4 deferred but 1 is bumped from deferral
+ # queue by 4; 4, 3 are then drained
+ latencies = [0, 0.05, .051, 0, 0, 0, .11, .01]
+ expected_success = 4
+
+ contexts_fed_in = []
+ captured_queues = []
+ captured_skips_stats = []
+
+ def ratelimit_if(value):
+ contexts_fed_in.append(value)
+ return False # returning False overrides normal ratelimiting
+
+ orig_rate_limited_iterator = utils.RateLimitedIterator
+
+ def fake_rate_limited_iterator(*args, **kwargs):
+ # insert our own rate limiting function
+ kwargs['ratelimit_if'] = ratelimit_if
+ return orig_rate_limited_iterator(*args, **kwargs)
+
+ now = [time()]
+
+ def fake_get_time(bucket_iter):
+ captured_skips_stats.append(
+ daemon.logger.get_increment_counts().get('skips', 0))
+ captured_queues.append(list(bucket_iter.buckets[0].deque))
+ # make each update delay before the iter being called again
+ now[0] += latencies.pop(0)
+ return now[0]
+
+ captured_updates = []
+
+ def fake_object_update(node, part, op, obj, *args, **kwargs):
+ captured_updates.append((node, part, op, obj))
+ return True, node['id'], False
+
+ with mock.patch(
+ 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
+ fake_get_time), \
+ mock.patch.object(daemon, 'object_update',
+ fake_object_update), \
+ mock.patch('swift.obj.updater.RateLimitedIterator',
+ fake_rate_limited_iterator), \
+ mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ daemon.run_once()
self.assertEqual(expected_success, daemon.stats.successes)
expected_skipped = expected_total - expected_success
self.assertEqual(expected_skipped, daemon.stats.skips)
self.assertEqual(expected_skipped,
len(self._find_async_pending_files()))
+ orig_iteration = contexts_fed_in[:num_c1_files]
+ # we first capture every async fed in one by one
+ objs_fed_in = [ctx['update']['obj'] for ctx in orig_iteration]
+ self.assertEqual(num_c1_files, len(set(objs_fed_in)))
+ # keep track of this order for context
+ aorder = {ctx['update']['obj']: 'a%02d' % i
+ for i, ctx in enumerate(orig_iteration)}
+ expected_updates_sent = []
+ for index_sent in (0, 2, 4, 3):
+ expected_updates_sent.extend(
+ [contexts_fed_in[index_sent]['update']['obj']] * 3)
+ actual_updates_sent = [
+ utils.split_path(update[3], minsegs=3)[-1]
+ for update in captured_updates
+ ]
+ self.assertEqual([aorder[o] for o in expected_updates_sent],
+ [aorder[o] for o in actual_updates_sent])
+
+ self.assertEqual([0, 0, 0, 0, 0, 1, 1, 1], captured_skips_stats)
+
+ expected_deferrals = [
+ [],
+ [],
+ [objs_fed_in[1]],
+ [objs_fed_in[1]],
+ [objs_fed_in[1], objs_fed_in[3]],
+ [objs_fed_in[3], objs_fed_in[4]],
+ [objs_fed_in[3]], # note: rightmost element is drained
+ [objs_fed_in[3]],
+ ]
+ self.assertEqual(
+ expected_deferrals,
+ [[ctx['update']['obj'] for ctx in q] for q in captured_queues])
+ actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
+ self.assertEqual(2, len(actual_sleeps))
+ self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
+ self.assertAlmostEqual(0.09, actual_sleeps[1], 3)
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('4 successes, 0 failures, 0 quarantines, 4 unlinks, '
+ '0 errors, 0 redirects, 1 skips, 3 deferrals, 2 drains',
+ info_lines[-1])
+ self.assertEqual(
+ {'skips': 1, 'successes': 4, 'unlinks': 4, 'deferrals': 3,
+ 'drains': 2}, self.logger.get_increment_counts())
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit_unsent_deferrals(self, mock_recon):
+ # make some updates defer until interval is reached and cycle
+ # terminates
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 10,
+ # only one bucket needed for test
+ 'per_container_ratelimit_buckets': 1,
+ 'max_deferred_updates': 5,
+ 'interval': 0.4,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ # all updates for same container
+ num_c1_files = 7
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
+ expected_total = num_c1_files
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ # first pass: 0, 2 and 5 succeed, 1, 3, 4, 6 deferred
+ # last 2 deferred items sent before interval elapses
+ latencies = [0, .05, 0.051, 0, 0, .11, 0, 0,
+ 0.1, 0, 0.1, 0] # total 0.42
+ expected_success = 5
+
+ contexts_fed_in = []
+ captured_queues = []
+ captured_skips_stats = []
+
+ def ratelimit_if(value):
+ contexts_fed_in.append(value)
+ return False # returning False overrides normal ratelimiting
+
+ orig_rate_limited_iterator = utils.RateLimitedIterator
+
+ def fake_rate_limited_iterator(*args, **kwargs):
+ # insert our own rate limiting function
+ kwargs['ratelimit_if'] = ratelimit_if
+ return orig_rate_limited_iterator(*args, **kwargs)
+
+ start = time()
+ now = [start]
+
+ def fake_get_time(bucket_iter):
+ if not captured_skips_stats:
+ daemon.begin = now[0]
+ captured_skips_stats.append(
+ daemon.logger.get_increment_counts().get('skips', 0))
+ captured_queues.append(list(bucket_iter.buckets[0].deque))
+ # insert delay each time iter is called
+ now[0] += latencies.pop(0)
+ return now[0]
+
+ captured_updates = []
+
+ def fake_object_update(node, part, op, obj, *args, **kwargs):
+ captured_updates.append((node, part, op, obj))
+ return True, node['id'], False
+
+ with mock.patch(
+ 'swift.obj.updater.BucketizedUpdateSkippingLimiter._get_time',
+ fake_get_time), \
+ mock.patch.object(daemon, 'object_update',
+ fake_object_update), \
+ mock.patch('swift.obj.updater.RateLimitedIterator',
+ fake_rate_limited_iterator), \
+ mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ daemon.run_once()
+ self.assertEqual(expected_success, daemon.stats.successes)
+ expected_skipped = expected_total - expected_success
+ self.assertEqual(expected_skipped, daemon.stats.skips)
+ self.assertEqual(expected_skipped,
+ len(self._find_async_pending_files()))
+
+ expected_updates_sent = []
+ for index_sent in (0, 2, 5, 6, 4):
+ expected_updates_sent.extend(
+ [contexts_fed_in[index_sent]['update']['obj']] * 3)
+
+ actual_updates_sent = [
+ utils.split_path(update[3], minsegs=3)[-1]
+ for update in captured_updates
+ ]
+ self.assertEqual(expected_updates_sent, actual_updates_sent)
+
+ # skips (un-drained deferrals) not reported until end of cycle
+ self.assertEqual([0] * 12, captured_skips_stats)
+
+ objs_fed_in = [ctx['update']['obj'] for ctx in contexts_fed_in]
+ expected_deferrals = [
+ # queue content before app_iter feeds next update_ctx
+ [],
+ [],
+ [objs_fed_in[1]],
+ [objs_fed_in[1]],
+ [objs_fed_in[1], objs_fed_in[3]],
+ [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
+ [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
+ # queue content before each update_ctx is drained from queue...
+ # note: rightmost element is drained
+ [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4], objs_fed_in[6]],
+ [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
+ [objs_fed_in[1], objs_fed_in[3], objs_fed_in[4]],
+ [objs_fed_in[1], objs_fed_in[3]],
+ [objs_fed_in[1], objs_fed_in[3]],
+ ]
+ self.assertEqual(
+ expected_deferrals,
+ [[ctx['update']['obj'] for ctx in q] for q in captured_queues])
+ actual_sleeps = [call[0][0] for call in mock_sleep.call_args_list]
+ self.assertEqual(2, len(actual_sleeps))
+ self.assertAlmostEqual(0.1, actual_sleeps[0], 3)
+ self.assertAlmostEqual(0.1, actual_sleeps[1], 3)
+ info_lines = self.logger.get_lines_for_level('info')
+ self.assertTrue(info_lines)
+ self.assertIn('5 successes, 0 failures, 0 quarantines, 5 unlinks, '
+ '0 errors, 0 redirects, 2 skips, 4 deferrals, 2 drains',
+ info_lines[-1])
+ self.assertEqual(
+ {'successes': 5, 'unlinks': 5, 'deferrals': 4, 'drains': 2},
+ self.logger.get_increment_counts())
+ self.assertEqual([('skips', 2)], self.logger.get_update_stats())
+
class TestObjectUpdaterFunctions(unittest.TestCase):
def test_split_update_path(self):
@@ -1393,20 +1818,28 @@ class TestObjectUpdaterFunctions(unittest.TestCase):
class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
+
+ def setUp(self):
+ self.logger = debug_logger()
+ self.stats = object_updater.SweepStats()
+
def test_init(self):
- it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10)
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ [3, 1], self.logger, self.stats, 1000, 10)
self.assertEqual(1000, it.num_buckets)
self.assertEqual(0.1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
# rate of 0 implies unlimited
- it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0)
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter([3, 1]), self.logger, self.stats, 9, 0)
self.assertEqual(9, it.num_buckets)
self.assertEqual(-1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
# num_buckets is collared at 1
- it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1)
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter([3, 1]), self.logger, self.stats, 0, 1)
self.assertEqual(1, it.num_buckets)
self.assertEqual(1, it.bucket_update_delta)
self.assertEqual([3, 1], [x for x in it.iterator])
@@ -1417,8 +1850,11 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(20)]
it = object_updater.BucketizedUpdateSkippingLimiter(
- iter(update_ctxs), 9, 0)
+ iter(update_ctxs), self.logger, self.stats, 9, 0)
self.assertEqual(update_ctxs, [x for x in it])
+ self.assertEqual(0, self.stats.skips)
+ self.assertEqual(0, self.stats.drains)
+ self.assertEqual(0, self.stats.deferrals)
def test_iteration_ratelimited(self):
# verify iteration at limited rate - single bucket
@@ -1426,23 +1862,246 @@ class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
{'update': {'account': '%d' % i, 'container': '%s' % i}}
for i in range(2)]
it = object_updater.BucketizedUpdateSkippingLimiter(
- iter(update_ctxs), 1, 0.1)
+ iter(update_ctxs), self.logger, self.stats, 1, 0.1)
+ # second update is skipped
self.assertEqual(update_ctxs[:1], [x for x in it])
+ self.assertEqual(1, self.stats.skips)
+ self.assertEqual(0, self.stats.drains)
+ self.assertEqual(1, self.stats.deferrals)
- def test_iteration_ratelimited_with_callback(self):
- # verify iteration at limited rate - single bucket
- skipped = []
-
- def on_skip(update_ctx):
- skipped.append(update_ctx)
-
+ def test_deferral_single_bucket(self):
+ # verify deferral - single bucket
+ now = time()
update_ctxs = [
{'update': {'account': '%d' % i, 'container': '%s' % i}}
- for i in range(2)]
- it = object_updater.BucketizedUpdateSkippingLimiter(
- iter(update_ctxs), 1, 0.1, skip_f=on_skip)
- self.assertEqual(update_ctxs[:1], [x for x in it])
- self.assertEqual(update_ctxs[1:], skipped)
+ for i in range(4)]
+
+ # enough capacity for all deferrals
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[now, now, now, now, now, now]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
+ max_deferred_elements=2,
+ drain_until=now + 10)
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs[0],
+ update_ctxs[2], # deferrals...
+ update_ctxs[1]],
+ actual)
+ self.assertEqual(2, mock_sleep.call_count)
+ self.assertEqual(0, self.stats.skips)
+ self.assertEqual(2, self.stats.drains)
+ self.assertEqual(2, self.stats.deferrals)
+ self.stats.reset()
+
+ # only space for one deferral
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[now, now, now, now, now]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
+ max_deferred_elements=1,
+ drain_until=now + 10)
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs[0],
+ update_ctxs[2]], # deferrals...
+ actual)
+ self.assertEqual(1, mock_sleep.call_count)
+ self.assertEqual(1, self.stats.skips)
+ self.assertEqual(1, self.stats.drains)
+ self.assertEqual(2, self.stats.deferrals)
+ self.stats.reset()
+
+ # only time for one deferral
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[now, now, now, now, now + 20, now + 20]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs[:3]), self.logger, self.stats, 1, 10,
+ max_deferred_elements=2,
+ drain_until=now + 10)
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs[0],
+ update_ctxs[2]], # deferrals...
+ actual)
+ self.assertEqual(1, mock_sleep.call_count)
+ self.assertEqual(1, self.stats.skips)
+ self.assertEqual(1, self.stats.drains)
+ self.assertEqual(2, self.stats.deferrals)
+ self.stats.reset()
+
+ # only space for two deferrals, only time for one deferral
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[now, now, now, now, now,
+ now + 20, now + 20]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs), self.logger, self.stats, 1, 10,
+ max_deferred_elements=2,
+ drain_until=now + 10)
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs[0],
+ update_ctxs[3]], # deferrals...
+ actual)
+ self.assertEqual(1, mock_sleep.call_count)
+ self.assertEqual(2, self.stats.skips)
+ self.assertEqual(1, self.stats.drains)
+ self.assertEqual(3, self.stats.deferrals)
+ self.stats.reset()
+
+ def test_deferral_multiple_buckets(self):
+ # verify deferral - multiple buckets
+ update_ctxs_1 = [
+ {'update': {'account': 'a', 'container': 'c1', 'obj': '%3d' % i}}
+ for i in range(3)]
+ update_ctxs_2 = [
+ {'update': {'account': 'a', 'container': 'c2', 'obj': '%3d' % i}}
+ for i in range(3)]
+
+ time_iter = itertools.count(time(), 0.001)
+
+ # deferrals stick in both buckets
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[next(time_iter) for _ in range(12)]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs_1 + update_ctxs_2),
+ self.logger, self.stats, 4, 10,
+ max_deferred_elements=4,
+ drain_until=next(time_iter))
+ it.salt = '' # make container->bucket hashing predictable
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs_1[0],
+ update_ctxs_2[0],
+ update_ctxs_1[2], # deferrals...
+ update_ctxs_2[2],
+ update_ctxs_1[1],
+ update_ctxs_2[1],
+ ],
+ actual)
+ self.assertEqual(4, mock_sleep.call_count)
+ self.assertEqual(0, self.stats.skips)
+ self.assertEqual(4, self.stats.drains)
+ self.assertEqual(4, self.stats.deferrals)
+ self.stats.reset()
+
+ # oldest deferral bumped from one bucket due to max_deferrals == 3
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[next(time_iter) for _ in range(10)]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs_1 + update_ctxs_2),
+ self.logger, self.stats, 4, 10,
+ max_deferred_elements=3,
+ drain_until=next(time_iter))
+ it.salt = '' # make container->bucket hashing predictable
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs_1[0],
+ update_ctxs_2[0],
+ update_ctxs_1[2], # deferrals...
+ update_ctxs_2[2],
+ update_ctxs_2[1],
+ ],
+ actual)
+ self.assertEqual(3, mock_sleep.call_count)
+ self.assertEqual(1, self.stats.skips)
+ self.assertEqual(3, self.stats.drains)
+ self.assertEqual(4, self.stats.deferrals)
+ self.stats.reset()
+
+ # older deferrals bumped from one bucket due to max_deferrals == 2
+ with mock.patch('swift.obj.updater.time.time',
+ side_effect=[next(time_iter) for _ in range(10)]):
+ with mock.patch('swift.obj.updater.time.sleep') as mock_sleep:
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs_1 + update_ctxs_2),
+ self.logger, self.stats, 4, 10,
+ max_deferred_elements=2,
+ drain_until=next(time_iter))
+ it.salt = '' # make container->bucket hashing predictable
+ actual = [x for x in it]
+ self.assertEqual([update_ctxs_1[0],
+ update_ctxs_2[0],
+ update_ctxs_2[2], # deferrals...
+ update_ctxs_2[1],
+ ],
+ actual)
+ self.assertEqual(2, mock_sleep.call_count)
+ self.assertEqual(2, self.stats.skips)
+ self.assertEqual(2, self.stats.drains)
+ self.assertEqual(4, self.stats.deferrals)
+ self.stats.reset()
+
+
+class TestRateLimiterBucket(unittest.TestCase):
+ def test_wait_until(self):
+ b1 = object_updater.RateLimiterBucket(10)
+ self.assertEqual(10, b1.wait_until)
+ b1.last_time = b1.wait_until
+ self.assertEqual(20, b1.wait_until)
+ b1.last_time = 12345.678
+ self.assertEqual(12355.678, b1.wait_until)
+
+ def test_len(self):
+ b1 = object_updater.RateLimiterBucket(10)
+ b1.deque.append(1)
+ b1.deque.append(2)
+ self.assertEqual(2, len(b1))
+ b1.deque.pop()
+ self.assertEqual(1, len(b1))
+
+ def test_bool(self):
+ b1 = object_updater.RateLimiterBucket(10)
+ self.assertFalse(b1)
+ b1.deque.append(1)
+ self.assertTrue(b1)
+ b1.deque.pop()
+ self.assertFalse(b1)
+
+ def test_bucket_ordering(self):
+ time_iter = itertools.count(time(), step=0.001)
+ b1 = object_updater.RateLimiterBucket(10)
+ b2 = object_updater.RateLimiterBucket(10)
+
+ b2.last_time = next(time_iter)
+ buckets = PriorityQueue()
+ buckets.put(b1)
+ buckets.put(b2)
+ self.assertEqual([b1, b2], [buckets.get_nowait() for _ in range(2)])
+
+ b1.last_time = next(time_iter)
+ buckets.put(b1)
+ buckets.put(b2)
+ self.assertEqual([b2, b1], [buckets.get_nowait() for _ in range(2)])
+
+
+class TestSweepStats(unittest.TestCase):
+ def test_copy(self):
+ num_props = len(vars(object_updater.SweepStats()))
+ stats = object_updater.SweepStats(*range(1, num_props + 1))
+ stats2 = stats.copy()
+ self.assertEqual(vars(stats), vars(stats2))
+
+ def test_since(self):
+ stats = object_updater.SweepStats(1, 2, 3, 4, 5, 6, 7, 8, 9)
+ stats2 = object_updater.SweepStats(4, 6, 8, 10, 12, 14, 16, 18, 20)
+ expected = object_updater.SweepStats(3, 4, 5, 6, 7, 8, 9, 10, 11)
+ self.assertEqual(vars(expected), vars(stats2.since(stats)))
+
+ def test_reset(self):
+ num_props = len(vars(object_updater.SweepStats()))
+ stats = object_updater.SweepStats(*range(1, num_props + 1))
+ stats.reset()
+ expected = object_updater.SweepStats()
+ self.assertEqual(vars(expected), vars(stats))
+
+ def test_str(self):
+ num_props = len(vars(object_updater.SweepStats()))
+ stats = object_updater.SweepStats(*range(1, num_props + 1))
+ self.assertEqual(
+ '4 successes, 2 failures, 3 quarantines, 5 unlinks, 1 errors, '
+ '6 redirects, 7 skips, 8 deferrals, 9 drains', str(stats))
if __name__ == '__main__':