-rw-r--r--  etc/object-server.conf-sample  |  10
-rw-r--r--  swift/obj/updater.py           | 182
-rw-r--r--  test/unit/obj/test_updater.py  | 269
3 files changed, 398 insertions, 63 deletions
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index e9557b6c2..5329bca8f 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -473,6 +473,16 @@ use = egg:swift#recon
# Send at most this many object updates per second
# objects_per_second = 50
#
+# Send at most this many object updates per bucket per second. The value must
+# be a float greater than or equal to 0. Set to 0 for unlimited.
+# max_objects_per_container_per_second = 0
+#
+# The per_container ratelimit implementation uses a hashring to constrain
+# memory requirements. Orders of magnitude more buckets will use (nominally)
+# more memory, but will ratelimit smaller groups of containers. The value must
+# be an integer greater than 0.
+# per_container_ratelimit_buckets = 1000
+#
# slowdown will sleep that amount between objects. Deprecated; use
# objects_per_second instead.
# slowdown = 0.01
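
A minimal stand-alone sketch of the hashring folding described above (not
part of the patch; bucket_for and the md5 call are illustrative stand-ins
for swift's hash_path): container names are salted, hashed, and folded into
a fixed number of buckets, so the updater's ratelimit state is bounded by
per_container_ratelimit_buckets rather than by the number of containers.

    import hashlib
    import uuid

    num_buckets = 1000            # per_container_ratelimit_buckets
    salt = str(uuid.uuid4())      # fresh for each updater run

    def bucket_for(account, container):
        # hypothetical stand-in for hash_path(account, container, salt)
        key = ('%s/%s/%s' % (salt, account, container)).encode('utf8')
        return int(hashlib.md5(key).hexdigest(), 16) % num_buckets

    # state is one "next allowed time" float per bucket, ~8 bytes each,
    # no matter how many distinct containers show up
    buckets = set(bucket_for('AUTH_test', 'c%d' % i) for i in range(100000))
    print(len(buckets) <= num_buckets)  # True
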
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index d49f12f76..a0db3ffe7 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -19,6 +19,7 @@ import os
import signal
import sys
import time
+import uuid
from random import random, shuffle
from eventlet import spawn, Timeout
@@ -29,7 +30,8 @@ from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
- eventlet_monkey_patch, get_redirect_data, ContextPool
+ eventlet_monkey_patch, get_redirect_data, ContextPool, hash_path, \
+ non_negative_float, config_positive_int_value
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import split_policy_string, PolicyError
@@ -39,18 +41,68 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
+class BucketizedUpdateSkippingLimiter(object):
+ """
+ Wrap an iterator to filter elements that show up too often.
+
+ :param update_iterable: an async_pending update iterable
+    :param num_buckets: number of buckets to divide container hashes into;
+                        the more buckets in total, the fewer containers per
+                        bucket (once a busy container exhausts a bucket's
+                        rate, the whole bucket starts skipping)
+    :param max_elements_per_group_per_second: maximum rate at which elements
+        in any one bucket are yielded; above this rate, skipping kicks in
+    :param skip_f: function to call with update_ctx when skipping it
+ """
+
+ def __init__(self, update_iterable, num_buckets,
+ max_elements_per_group_per_second,
+ skip_f=lambda update_ctx: None):
+ self.iterator = iter(update_iterable)
+ # if we want a smaller "blast radius" we could make this number bigger
+ self.num_buckets = max(num_buckets, 1)
+        # an array might be more efficient, but this is pretty cheap
+ self.next_update = [0.0 for _ in range(self.num_buckets)]
+ try:
+ self.bucket_update_delta = 1.0 / max_elements_per_group_per_second
+ except ZeroDivisionError:
+ self.bucket_update_delta = -1
+ self.skip_f = skip_f
+ self.salt = str(uuid.uuid4())
+
+ def __iter__(self):
+ return self
+
+ def _bucket_key(self, update):
+ acct, cont = split_update_path(update)
+ return int(hash_path(acct, cont, self.salt), 16) % self.num_buckets
+
+ def next(self):
+ for update_ctx in self.iterator:
+ bucket_key = self._bucket_key(update_ctx['update'])
+ now = time.time()
+ if self.next_update[bucket_key] > now:
+ self.skip_f(update_ctx)
+ continue
+ self.next_update[bucket_key] = now + self.bucket_update_delta
+ return update_ctx
+ raise StopIteration()
+
+ __next__ = next
+
+
class SweepStats(object):
"""
Stats bucket for an update sweep
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
- unlinks=0, redirects=0):
+ unlinks=0, redirects=0, skips=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
self.successes = successes
self.unlinks = unlinks
self.redirects = redirects
+ self.skips = skips
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
@@ -62,7 +114,8 @@ class SweepStats(object):
self.quarantines - other.quarantines,
self.successes - other.successes,
self.unlinks - other.unlinks,
- self.redirects - other.redirects)
+ self.redirects - other.redirects,
+ self.skips - other.skips)
def reset(self):
self.errors = 0
@@ -71,6 +124,7 @@ class SweepStats(object):
self.successes = 0
self.unlinks = 0
self.redirects = 0
+ self.skips = 0
def __str__(self):
keys = (
@@ -80,10 +134,26 @@ class SweepStats(object):
(self.unlinks, 'unlinks'),
(self.errors, 'errors'),
(self.redirects, 'redirects'),
+ (self.skips, 'skips'),
)
return ', '.join('%d %s' % pair for pair in keys)
+def split_update_path(update):
+ """
+ Split the account and container parts out of the async update data.
+
+    N.B. updates to shards set the container_path key, while the account and
+    container keys always name the root container.
+ """
+ container_path = update.get('container_path')
+ if container_path:
+ acct, cont = split_path('/' + container_path, minsegs=2)
+ else:
+ acct, cont = update['account'], update['container']
+ return acct, cont
+
+
class ObjectUpdater(Daemon):
"""Update object information in container listings."""
@@ -110,6 +180,10 @@ class ObjectUpdater(Daemon):
self.max_objects_per_second = \
float(conf.get('objects_per_second',
objects_per_second))
+ self.max_objects_per_container_per_second = non_negative_float(
+ conf.get('max_objects_per_container_per_second', 0))
+ self.per_container_ratelimit_buckets = config_positive_int_value(
+ conf.get('per_container_ratelimit_buckets', 1000))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.report_interval = float(conf.get('report_interval', 300))
@@ -205,13 +279,40 @@ class ObjectUpdater(Daemon):
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
+ def _load_update(self, device, update_path):
+ try:
+ return pickle.load(open(update_path, 'rb'))
+ except Exception as e:
+ if getattr(e, 'errno', None) == errno.ENOENT:
+ return
+ self.logger.exception(
+ 'ERROR Pickle problem, quarantining %s', update_path)
+ self.stats.quarantines += 1
+ self.logger.increment('quarantines')
+ target_path = os.path.join(device, 'quarantined', 'objects',
+ os.path.basename(update_path))
+ renamer(update_path, target_path, fsync=False)
+ try:
+ # If this was the last async_pending in the directory,
+ # then this will succeed. Otherwise, it'll fail, and
+ # that's okay.
+ os.rmdir(os.path.dirname(update_path))
+ except OSError:
+ pass
+ return
+
def _iter_async_pendings(self, device):
"""
- Locate and yield all the async pendings on the device. Multiple updates
- for the same object will come out in reverse-chronological order
- (i.e. newest first) so that callers can skip stale async_pendings.
+ Locate and yield an update context for all the async pending files on
+ the device. Each update context contains details of the async pending
+ file location, its timestamp and the un-pickled update data.
+
+ Async pending files that fail to load will be quarantined.
+
+ Only the most recent update for the same object is yielded; older
+ (stale) async pending files are unlinked as they are located.
- Tries to clean up empty directories as it goes.
+ The iterator tries to clean up empty directories as it goes.
"""
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
@@ -238,12 +339,13 @@ class ObjectUpdater(Daemon):
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
- for update in sorted(self._listdir(prefix_path), reverse=True):
- update_path = os.path.join(prefix_path, update)
+ for update_file in sorted(self._listdir(prefix_path),
+ reverse=True):
+ update_path = os.path.join(prefix_path, update_file)
if not os.path.isfile(update_path):
continue
try:
- obj_hash, timestamp = update.split('-')
+ obj_hash, timestamp = update_file.split('-')
except ValueError:
self.stats.errors += 1
self.logger.increment('errors')
@@ -280,9 +382,14 @@ class ObjectUpdater(Daemon):
raise
else:
last_obj_hash = obj_hash
- yield {'device': device, 'policy': policy,
- 'path': update_path,
- 'obj_hash': obj_hash, 'timestamp': timestamp}
+ update = self._load_update(device, update_path)
+ if update is not None:
+ yield {'device': device,
+ 'policy': policy,
+ 'update_path': update_path,
+ 'obj_hash': obj_hash,
+ 'timestamp': timestamp,
+ 'update': update}
def object_sweep(self, device):
"""
@@ -297,13 +404,21 @@ class ObjectUpdater(Daemon):
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
+ def skip_counting_f(update_ctx):
+ # in the future we could defer update_ctx
+ self.stats.skips += 1
+ self.logger.increment("skips")
+
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
+ ap_iter = BucketizedUpdateSkippingLimiter(
+ ap_iter, self.per_container_ratelimit_buckets,
+ self.max_objects_per_container_per_second,
+ skip_f=skip_counting_f)
with ContextPool(self.concurrency) as pool:
- for update in ap_iter:
- pool.spawn(self.process_object_update,
- update['path'], update['device'], update['policy'])
+ for update_ctx in ap_iter:
+ pool.spawn(self.process_object_update, **update_ctx)
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
@@ -326,6 +441,7 @@ class ObjectUpdater(Daemon):
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors, '
'%(redirects)d redirects '
+ '%(skips)d skips '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
@@ -335,36 +451,20 @@ class ObjectUpdater(Daemon):
'quarantines': sweep_totals.quarantines,
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors,
- 'redirects': sweep_totals.redirects})
+ 'redirects': sweep_totals.redirects,
+ 'skips': sweep_totals.skips})
- def process_object_update(self, update_path, device, policy):
+ def process_object_update(self, update_path, device, policy, update,
+ **kwargs):
"""
Process the object information to be updated and update.
:param update_path: path to pickled object update file
:param device: path to device
:param policy: storage policy of object update
+ :param update: the un-pickled update data
+ :param kwargs: un-used keys from update_ctx
"""
- try:
- update = pickle.load(open(update_path, 'rb'))
- except Exception as e:
- if getattr(e, 'errno', None) == errno.ENOENT:
- return
- self.logger.exception(
- 'ERROR Pickle problem, quarantining %s', update_path)
- self.stats.quarantines += 1
- self.logger.increment('quarantines')
- target_path = os.path.join(device, 'quarantined', 'objects',
- os.path.basename(update_path))
- renamer(update_path, target_path, fsync=False)
- try:
- # If this was the last async_pending in the directory,
- # then this will succeed. Otherwise, it'll fail, and
- # that's okay.
- os.rmdir(os.path.dirname(update_path))
- except OSError:
- pass
- return
def do_update():
successes = update.get('successes', [])
@@ -374,11 +474,7 @@ class ObjectUpdater(Daemon):
str(int(policy)))
headers_out.setdefault('X-Backend-Accept-Redirect', 'true')
headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true')
- container_path = update.get('container_path')
- if container_path:
- acct, cont = split_path('/' + container_path, minsegs=2)
- else:
- acct, cont = update['account'], update['container']
+ acct, cont = split_update_path(update)
part, nodes = self.get_container_ring().get_nodes(acct, cont)
obj = '/%s/%s/%s' % (acct, cont, update['obj'])
events = [spawn(self.object_update,
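
To make the skipping semantics above concrete, here is a self-contained
restatement of the limiter's gate (a sketch; ToyLimiter is a hypothetical
stand-in for BucketizedUpdateSkippingLimiter, taking precomputed bucket
keys where the real class hashes account/container): each bucket records
the next time it may emit, emitting pushes that to now + 1/rate, and
anything arriving earlier goes to skip_f and is dropped from the sweep,
leaving its async_pending on disk for the next pass.

    import time

    class ToyLimiter(object):
        def __init__(self, updates, num_buckets, max_per_bucket_per_second):
            self.updates = iter(updates)
            self.next_update = [0.0] * max(num_buckets, 1)
            # rate 0 means unlimited: a negative delta keeps every
            # bucket's next_update in the past, so nothing is skipped
            self.delta = (1.0 / max_per_bucket_per_second
                          if max_per_bucket_per_second else -1)
            self.skipped = []

        def __iter__(self):
            for bucket_key, update in self.updates:
                now = time.time()
                if self.next_update[bucket_key] > now:
                    self.skipped.append(update)
                    continue
                self.next_update[bucket_key] = now + self.delta
                yield update

    # ten updates hit one bucket at once; at 1 per second only the first
    # is emitted this sweep and the other nine are skipped
    limiter = ToyLimiter(((0, 'o%02d' % i) for i in range(10)), 1000, 1.0)
    print(list(limiter), len(limiter.skipped))  # prints ['o00'] 9
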
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 1ba4c8d6e..b380286f9 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -12,13 +12,14 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import eventlet
import six.moves.cPickle as pickle
import mock
import os
import unittest
import random
import itertools
+from collections import Counter
from contextlib import closing
from gzip import GzipFile
from tempfile import mkdtemp
@@ -39,8 +40,7 @@ from swift.common.ring import RingData
from swift.common import utils
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.swob import bytes_to_wsgi
-from swift.common.utils import (
- hash_path, normalize_timestamp, mkdirs, write_pickle)
+from swift.common.utils import hash_path, normalize_timestamp, mkdirs
from swift.common.storage_policy import StoragePolicy, POLICIES
@@ -135,6 +135,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.concurrency, 8)
self.assertEqual(daemon.updater_workers, 1)
self.assertEqual(daemon.max_objects_per_second, 50.0)
+ self.assertEqual(daemon.max_objects_per_container_per_second, 0.0)
+ self.assertEqual(daemon.per_container_ratelimit_buckets, 1000)
# non-defaults
conf = {
@@ -145,6 +147,8 @@ class TestObjectUpdater(unittest.TestCase):
'concurrency': '2',
'updater_workers': '3',
'objects_per_second': '10.5',
+ 'max_objects_per_container_per_second': '1.2',
+ 'per_container_ratelimit_buckets': '100',
}
daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
self.assertEqual(daemon.devices, '/some/where/else')
@@ -154,6 +158,8 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(daemon.concurrency, 2)
self.assertEqual(daemon.updater_workers, 3)
self.assertEqual(daemon.max_objects_per_second, 10.5)
+ self.assertEqual(daemon.max_objects_per_container_per_second, 1.2)
+ self.assertEqual(daemon.per_container_ratelimit_buckets, 100)
# check deprecated option
daemon = object_updater.ObjectUpdater({'slowdown': '0.04'},
@@ -169,6 +175,12 @@ class TestObjectUpdater(unittest.TestCase):
check_bad({'concurrency': '1.0'})
check_bad({'slowdown': 'baz'})
check_bad({'objects_per_second': 'quux'})
+ check_bad({'max_objects_per_container_per_second': '-0.1'})
+ check_bad({'max_objects_per_container_per_second': 'auto'})
+ check_bad({'per_container_ratelimit_buckets': '1.2'})
+ check_bad({'per_container_ratelimit_buckets': '0'})
+ check_bad({'per_container_ratelimit_buckets': '-1'})
+ check_bad({'per_container_ratelimit_buckets': 'auto'})
@mock.patch('os.listdir')
def test_listdir_with_exception(self, mock_listdir):
@@ -201,11 +213,12 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual(len(log_lines), 0)
self.assertEqual(path, ['foo', 'bar'])
- def test_object_sweep(self):
- def check_with_idx(index, warn, should_skip):
- if int(index) > 0:
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_object_sweep(self, mock_recon):
+ def check_with_idx(policy_index, warn, should_skip):
+ if int(policy_index) > 0:
asyncdir = os.path.join(self.sda1,
- ASYNCDIR_BASE + "-" + index)
+ ASYNCDIR_BASE + "-" + policy_index)
else:
asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
@@ -220,7 +233,8 @@ class TestObjectUpdater(unittest.TestCase):
os.path.join(self.sda1,
ASYNCDIR_BASE + '-' + 'twentington'),
os.path.join(self.sda1,
- ASYNCDIR_BASE + '-' + str(int(index) + 100)))
+ ASYNCDIR_BASE + '-' + str(
+ int(policy_index) + 100)))
for not_dir in not_dirs:
with open(not_dir, 'w'):
@@ -239,13 +253,13 @@ class TestObjectUpdater(unittest.TestCase):
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
if t == timestamps[0]:
- expected.add((o_path, int(index)))
- write_pickle({}, o_path)
+ expected.add((o_path, int(policy_index)))
+ self._write_dummy_pickle(o_path, 'account', 'container', o)
seen = set()
class MockObjectUpdater(object_updater.ObjectUpdater):
- def process_object_update(self, update_path, device, policy):
+ def process_object_update(self, update_path, policy, **kwargs):
seen.add((update_path, int(policy)))
os.unlink(update_path)
@@ -290,10 +304,10 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('account', 'container', o)
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
- write_pickle({}, o_path)
+ self._write_dummy_pickle(o_path, 'account', 'container', o)
class MockObjectUpdater(object_updater.ObjectUpdater):
- def process_object_update(self, update_path, device, policy):
+ def process_object_update(self, update_path, **kwargs):
os.unlink(update_path)
self.stats.successes += 1
self.stats.unlinks += 1
@@ -312,12 +326,13 @@ class TestObjectUpdater(unittest.TestCase):
def mock_time_function():
rv = now[0]
- now[0] += 5
+ now[0] += 4
return rv
- # With 10s between updates, time() advancing 5s every time we look,
+ # With 10s between updates, time() advancing 4s every time we look,
# and 5 async_pendings on disk, we should get at least two progress
- # lines.
+ # lines. (time is incremented by 4 each time the update app iter yields
+ # and each time the elapsed time is sampled)
with mock.patch('swift.obj.updater.time',
mock.MagicMock(time=mock_time_function)), \
mock.patch.object(object_updater, 'ContextPool', MockPool):
@@ -360,10 +375,10 @@ class TestObjectUpdater(unittest.TestCase):
ohash = hash_path('account', 'container%d' % policy.idx, o)
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
- write_pickle({}, o_path)
+ self._write_dummy_pickle(o_path, 'account', 'container', o)
class MockObjectUpdater(object_updater.ObjectUpdater):
- def process_object_update(self, update_path, device, policy):
+ def process_object_update(self, update_path, **kwargs):
os.unlink(update_path)
self.stats.successes += 1
self.stats.unlinks += 1
@@ -1196,7 +1211,7 @@ class TestObjectUpdater(unittest.TestCase):
def test_obj_update_gone_missing(self):
# if you've got multiple updaters running (say, both a background
- # and foreground process), process_object_update may get a file
+ # and foreground process), _load_update may get a file
# that doesn't exist
policies = list(POLICIES)
random.shuffle(policies)
@@ -1218,13 +1233,227 @@ class TestObjectUpdater(unittest.TestCase):
odir,
'%s-%s' % (ohash, next(self.ts_iter).internal))
+ self.assertEqual(os.listdir(async_dir), [ohash[-3:]])
+ self.assertFalse(os.listdir(odir))
with mocked_http_conn():
with mock.patch('swift.obj.updater.dump_recon_cache'):
- daemon.process_object_update(op_path, self.sda1, policies[0])
+ daemon._load_update(self.sda1, op_path)
self.assertEqual({}, daemon.logger.get_increment_counts())
self.assertEqual(os.listdir(async_dir), [ohash[-3:]])
self.assertFalse(os.listdir(odir))
+ def _write_dummy_pickle(self, path, a, c, o, cp=None):
+ update = {
+ 'op': 'PUT',
+ 'account': a,
+ 'container': c,
+ 'obj': o,
+ 'headers': {'X-Container-Timestamp': normalize_timestamp(0)}
+ }
+ if cp:
+ update['container_path'] = cp
+ with open(path, 'wb') as async_pending:
+ pickle.dump(update, async_pending)
+
+ def _make_async_pending_pickle(self, a, c, o, cp=None):
+ ohash = hash_path(a, c, o)
+ odir = os.path.join(self.async_dir, ohash[-3:])
+ mkdirs(odir)
+ path = os.path.join(
+ odir,
+ '%s-%s' % (ohash, normalize_timestamp(time())))
+ self._write_dummy_pickle(path, a, c, o, cp)
+
+ def _find_async_pending_files(self):
+ found_files = []
+ for root, dirs, files in os.walk(self.async_dir):
+ found_files.extend(files)
+ return found_files
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit(self, mock_recon):
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 1,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ num_c1_files = 10
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
+ # make one more in a different container, with a container_path
+ self._make_async_pending_pickle('a', 'c2', obj_name,
+ cp='.shards_a/c2_shard')
+ c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a',
+ 'c2_shard')
+ expected_total = num_c1_files + 1
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ expected_success = 2
+ fake_status_codes = [200] * 3 * expected_success
+ with mocked_http_conn(*fake_status_codes) as fake_conn:
+ daemon.run_once()
+ self.assertEqual(expected_success, daemon.stats.successes)
+ expected_skipped = expected_total - expected_success
+ self.assertEqual(expected_skipped, daemon.stats.skips)
+ self.assertEqual(expected_skipped,
+ len(self._find_async_pending_files()))
+ self.assertEqual(
+ Counter(
+ '/'.join(req['path'].split('/')[:5])
+ for req in fake_conn.requests),
+ {'/sda1/%s/a/c1' % c1_part: 3,
+ '/sda1/%s/.shards_a/c2_shard' % c2_part: 3})
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit_unlimited(self, mock_recon):
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 0,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ num_c1_files = 10
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ c1_part, _ = daemon.get_container_ring().get_nodes('a', 'c1')
+ # make one more in a different container, with a container_path
+ self._make_async_pending_pickle('a', 'c2', obj_name,
+ cp='.shards_a/c2_shard')
+ c2_part, _ = daemon.get_container_ring().get_nodes('.shards_a',
+ 'c2_shard')
+ expected_total = num_c1_files + 1
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ fake_status_codes = [200] * 3 * expected_total
+ with mocked_http_conn(*fake_status_codes):
+ daemon.run_once()
+ self.assertEqual(expected_total, daemon.stats.successes)
+ self.assertEqual(0, daemon.stats.skips)
+ self.assertEqual([], self._find_async_pending_files())
+
+ @mock.patch('swift.obj.updater.dump_recon_cache')
+ def test_per_container_rate_limit_slow_responses(self, mock_recon):
+ conf = {
+ 'devices': self.devices_dir,
+ 'mount_check': 'false',
+ 'swift_dir': self.testdir,
+ 'max_objects_per_container_per_second': 10,
+ }
+ daemon = object_updater.ObjectUpdater(conf, logger=self.logger)
+ self.async_dir = os.path.join(self.sda1, get_async_dir(POLICIES[0]))
+ os.mkdir(self.async_dir)
+ # all updates for same container
+ num_c1_files = 4
+ for i in range(num_c1_files):
+ obj_name = 'o%02d' % i
+ self._make_async_pending_pickle('a', 'c1', obj_name)
+ expected_total = num_c1_files
+ self.assertEqual(expected_total,
+ len(self._find_async_pending_files()))
+ latencies = [.11, 0, .11, 0]
+ expected_success = 2
+ fake_status_codes = [200] * 3 * expected_success
+
+ def fake_spawn(pool, *args, **kwargs):
+ # make each update delay the iter being called again
+ eventlet.sleep(latencies.pop(0))
+ return args[0](*args[1:], **kwargs)
+
+ with mocked_http_conn(*fake_status_codes):
+ with mock.patch('swift.obj.updater.ContextPool.spawn', fake_spawn):
+ daemon.run_once()
+ self.assertEqual(expected_success, daemon.stats.successes)
+ expected_skipped = expected_total - expected_success
+ self.assertEqual(expected_skipped, daemon.stats.skips)
+ self.assertEqual(expected_skipped,
+ len(self._find_async_pending_files()))
+
+
+class TestObjectUpdaterFunctions(unittest.TestCase):
+ def test_split_update_path(self):
+ update = {
+ 'op': 'PUT',
+ 'account': 'a',
+ 'container': 'c',
+ 'obj': 'o',
+ 'headers': {
+ 'X-Container-Timestamp': normalize_timestamp(0),
+ }
+ }
+ actual = object_updater.split_update_path(update)
+ self.assertEqual(('a', 'c'), actual)
+
+ update['container_path'] = None
+ actual = object_updater.split_update_path(update)
+ self.assertEqual(('a', 'c'), actual)
+
+ update['container_path'] = '.shards_a/c_shard_n'
+ actual = object_updater.split_update_path(update)
+ self.assertEqual(('.shards_a', 'c_shard_n'), actual)
+
+
+class TestBucketizedUpdateSkippingLimiter(unittest.TestCase):
+ def test_init(self):
+ it = object_updater.BucketizedUpdateSkippingLimiter([3, 1], 1000, 10)
+ self.assertEqual(1000, it.num_buckets)
+ self.assertEqual(0.1, it.bucket_update_delta)
+ self.assertEqual([3, 1], [x for x in it.iterator])
+
+ # rate of 0 implies unlimited
+ it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 9, 0)
+ self.assertEqual(9, it.num_buckets)
+ self.assertEqual(-1, it.bucket_update_delta)
+ self.assertEqual([3, 1], [x for x in it.iterator])
+
+ # num_buckets is collared at 1
+ it = object_updater.BucketizedUpdateSkippingLimiter(iter([3, 1]), 0, 1)
+ self.assertEqual(1, it.num_buckets)
+ self.assertEqual(1, it.bucket_update_delta)
+ self.assertEqual([3, 1], [x for x in it.iterator])
+
+ def test_iteration_unlimited(self):
+ # verify iteration at unlimited rate
+ update_ctxs = [
+ {'update': {'account': '%d' % i, 'container': '%s' % i}}
+ for i in range(20)]
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs), 9, 0)
+ self.assertEqual(update_ctxs, [x for x in it])
+
+ def test_iteration_ratelimited(self):
+ # verify iteration at limited rate - single bucket
+ update_ctxs = [
+ {'update': {'account': '%d' % i, 'container': '%s' % i}}
+ for i in range(2)]
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs), 1, 0.1)
+ self.assertEqual(update_ctxs[:1], [x for x in it])
+
+ def test_iteration_ratelimited_with_callback(self):
+ # verify iteration at limited rate - single bucket
+ skipped = []
+
+ def on_skip(update_ctx):
+ skipped.append(update_ctx)
+
+ update_ctxs = [
+ {'update': {'account': '%d' % i, 'container': '%s' % i}}
+ for i in range(2)]
+ it = object_updater.BucketizedUpdateSkippingLimiter(
+ iter(update_ctxs), 1, 0.1, skip_f=on_skip)
+ self.assertEqual(update_ctxs[:1], [x for x in it])
+ self.assertEqual(update_ctxs[1:], skipped)
+
if __name__ == '__main__':
unittest.main()
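
A closing arithmetic check of test_per_container_rate_limit_slow_responses
above (a sketch of the intended timing, not test code): at 10 updates per
second the per-bucket delta is 0.1s, so an admitted spawn that blocks for
0.11s lets the next same-bucket update through, while a 0s spawn does not.

    delta = 1.0 / 10               # max_objects_per_container_per_second=10
    latencies = [.11, 0, .11, 0]   # how long each admitted spawn blocks
    now = gate = 0.0
    admitted = skipped = 0
    for _ in range(4):             # four async_pendings, same container
        if now >= gate:
            admitted += 1
            gate = now + delta
            now += latencies.pop(0)
        else:
            skipped += 1
    print(admitted, skipped)       # 2 2, matching the test's expectations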