summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--swift/common/memcached.py18
-rw-r--r--swift/proxy/controllers/base.py11
-rw-r--r--swift/proxy/controllers/container.py12
-rw-r--r--test/unit/__init__.py14
-rw-r--r--test/unit/common/test_memcached.py76
-rw-r--r--test/unit/proxy/controllers/test_container.py43
-rw-r--r--test/unit/proxy/test_server.py17
7 files changed, 165 insertions, 26 deletions
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index 1a371e58b..37719bf79 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -258,6 +258,7 @@ class MemcacheRing(object):
"""
pos = bisect(self._sorted, key)
served = []
+ any_yielded = False
while len(served) < self._tries:
pos = (pos + 1) % len(self._sorted)
server = self._ring[self._sorted[pos]]
@@ -270,6 +271,7 @@ class MemcacheRing(object):
try:
with MemcachePoolTimeout(self._pool_timeout):
fp, sock = self._client_cache[server].get()
+ any_yielded = True
yield server, fp, sock
except MemcachePoolTimeout as e:
self._exception_occurred(
@@ -281,13 +283,15 @@ class MemcacheRing(object):
# object.
self._exception_occurred(
server, e, action='connecting', sock=sock)
+ if not any_yielded:
+ self.logger.error('All memcached servers error-limited')
def _return_conn(self, server, fp, sock):
"""Returns a server connection to the pool."""
self._client_cache[server].put((fp, sock))
def set(self, key, value, serialize=True, time=0,
- min_compress_len=0):
+ min_compress_len=0, raise_on_error=False):
"""
Set a key/value pair in memcache
@@ -301,6 +305,8 @@ class MemcacheRing(object):
added to keep the signature compatible with
python-memcached interface. This
implementation ignores it.
+ :param raise_on_error: if True, propagate Timeouts and other errors.
+ By default, errors are ignored.
"""
key = md5hash(key)
timeout = sanitize_timeout(time)
@@ -340,14 +346,19 @@ class MemcacheRing(object):
return
except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp)
+ if raise_on_error:
+ raise MemcacheConnectionError(
+ "No memcached connections succeeded.")
- def get(self, key):
+ def get(self, key, raise_on_error=False):
"""
Gets the object specified by key. It will also unserialize the object
before returning if it is serialized in memcache with JSON, or if it
is pickled and unpickling is allowed.
:param key: key
+ :param raise_on_error: if True, propagate Timeouts and other errors.
+ By default, errors are treated as cache misses.
:returns: value of the key in memcache
"""
key = md5hash(key)
@@ -378,6 +389,9 @@ class MemcacheRing(object):
return value
except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp)
+ if raise_on_error:
+ raise MemcacheConnectionError(
+ "No memcached connections succeeded.")
def incr(self, key, delta=1, time=0):
"""
diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 825fbb7c2..e0b95107e 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -40,6 +40,7 @@ from eventlet import sleep
from eventlet.timeout import Timeout
import six
+from swift.common.memcached import MemcacheConnectionError
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
@@ -2400,9 +2401,13 @@ class Controller(object):
if skip_chance and random.random() < skip_chance:
self.logger.increment('shard_updating.cache.skip')
else:
- cached_ranges = memcache.get(cache_key)
- self.logger.increment('shard_updating.cache.%s' % (
- 'hit' if cached_ranges else 'miss'))
+ try:
+ cached_ranges = memcache.get(
+ cache_key, raise_on_error=True)
+ cache_state = 'hit' if cached_ranges else 'miss'
+ except MemcacheConnectionError:
+ cache_state = 'error'
+ self.logger.increment('shard_updating.cache.%s' % cache_state)
if cached_ranges:
shard_ranges = [
diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py
index 4ac00d010..a21a55d22 100644
--- a/swift/proxy/controllers/container.py
+++ b/swift/proxy/controllers/container.py
@@ -20,6 +20,7 @@ import random
import six
from six.moves.urllib.parse import unquote
+from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
config_true_value, ShardRange, cache_from_env, filter_shard_ranges
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
@@ -152,9 +153,14 @@ class ContainerController(Controller):
if skip_chance and random.random() < skip_chance:
self.logger.increment('shard_listing.cache.skip')
else:
- cached_ranges = memcache.get(cache_key)
- self.logger.increment('shard_listing.cache.%s' % (
- 'hit' if cached_ranges else 'miss'))
+ try:
+ cached_ranges = memcache.get(
+ cache_key, raise_on_error=True)
+ cache_state = 'hit' if cached_ranges else 'miss'
+ except MemcacheConnectionError:
+ cache_state = 'error'
+ self.logger.increment(
+ 'shard_listing.cache.%s' % cache_state)
if cached_ranges is not None:
infocache[cache_key] = tuple(cached_ranges)
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 40d606933..2bea8ef46 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -410,17 +410,22 @@ def track(f):
class FakeMemcache(object):
- def __init__(self):
+ def __init__(self, error_on_set=None, error_on_get=None):
self.store = {}
self.calls = []
self.error_on_incr = False
+ self.error_on_get = error_on_get or []
+ self.error_on_set = error_on_set or []
self.init_incr_return_neg = False
def clear_calls(self):
del self.calls[:]
@track
- def get(self, key):
+ def get(self, key, raise_on_error=False):
+ if self.error_on_get and self.error_on_get.pop(0):
+ if raise_on_error:
+ raise MemcacheConnectionError()
return self.store.get(key)
@property
@@ -428,7 +433,10 @@ class FakeMemcache(object):
return self.store.keys
@track
- def set(self, key, value, serialize=True, time=0):
+ def set(self, key, value, serialize=True, time=0, raise_on_error=False):
+ if self.error_on_set and self.error_on_set.pop(0):
+ if raise_on_error:
+ raise MemcacheConnectionError()
if serialize:
value = json.loads(json.dumps(value))
else:
diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py
index 06e567d86..f53381760 100644
--- a/test/unit/common/test_memcached.py
+++ b/test/unit/common/test_memcached.py
@@ -32,6 +32,7 @@ from eventlet import GreenPool, sleep, Queue
from eventlet.pools import Pool
from swift.common import memcached
+from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import md5, human_readable
from mock import patch, MagicMock
from test.debug_logger import debug_logger
@@ -581,19 +582,27 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe',
- ] * 11 + [
- 'Error limiting server 1.2.3.4:11211'
+ ] * 10 + [
+ 'Error talking to memcached: 1.2.3.4:11211: '
+ '[Errno 32] Broken pipe',
+ 'Error limiting server 1.2.3.4:11211',
+ 'All memcached servers error-limited',
])
self.logger.clear()
# continued requests just keep bypassing memcache
for _ in range(12):
memcache_client.set('some_key', [1, 2, 3])
- self.assertEqual(self.logger.get_lines_for_level('error'), [])
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'All memcached servers error-limited',
+ ] * 12)
+ self.logger.clear()
# and get()s are all a "cache miss"
self.assertIsNone(memcache_client.get('some_key'))
- self.assertEqual(self.logger.get_lines_for_level('error'), [])
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'All memcached servers error-limited',
+ ])
def test_error_disabled(self):
memcache_client = memcached.MemcacheRing(
@@ -611,6 +620,44 @@ class TestMemcached(unittest.TestCase):
'[Errno 32] Broken pipe',
] * 20)
+ def test_error_raising(self):
+ memcache_client = memcached.MemcacheRing(
+ ['1.2.3.4:11211'], logger=self.logger, error_limit_time=0)
+ mock1 = ExplodingMockMemcached()
+ memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
+ [(mock1, mock1)] * 20)
+
+ # expect exception when requested...
+ with self.assertRaises(MemcacheConnectionError):
+ memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'Error talking to memcached: 1.2.3.4:11211: '
+ '[Errno 32] Broken pipe',
+ ])
+ self.logger.clear()
+
+ with self.assertRaises(MemcacheConnectionError):
+ memcache_client.get('some_key', raise_on_error=True)
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'Error talking to memcached: 1.2.3.4:11211: '
+ '[Errno 32] Broken pipe',
+ ])
+ self.logger.clear()
+
+ # ...but default is no exception
+ memcache_client.set('some_key', [1, 2, 3])
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'Error talking to memcached: 1.2.3.4:11211: '
+ '[Errno 32] Broken pipe',
+ ])
+ self.logger.clear()
+
+ memcache_client.get('some_key')
+ self.assertEqual(self.logger.get_lines_for_level('error'), [
+ 'Error talking to memcached: 1.2.3.4:11211: '
+ '[Errno 32] Broken pipe',
+ ])
+
def test_error_limiting_custom_config(self):
def do_calls(time_step, num_calls, **memcache_kwargs):
self.logger.clear()
@@ -632,8 +679,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
- ] * 11 + [
- 'Error limiting server 1.2.3.5:11211'
+ ] * 10 + [
+ 'Error talking to memcached: 1.2.3.5:11211: '
+ '[Errno 32] Broken pipe',
+ 'Error limiting server 1.2.3.5:11211',
+ 'All memcached servers error-limited',
])
# with default error_limit_time of 60, one call per 6 secs, error limit
@@ -650,8 +700,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
- ] * 11 + [
- 'Error limiting server 1.2.3.5:11211'
+ ] * 10 + [
+ 'Error talking to memcached: 1.2.3.5:11211: '
+ '[Errno 32] Broken pipe',
+ 'Error limiting server 1.2.3.5:11211',
+ 'All memcached servers error-limited',
])
# with error_limit_time of 70, one call per 6 secs, error_limit_count
@@ -660,8 +713,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
- ] * 12 + [
- 'Error limiting server 1.2.3.5:11211'
+ ] * 11 + [
+ 'Error talking to memcached: 1.2.3.5:11211: '
+ '[Errno 32] Broken pipe',
+ 'Error limiting server 1.2.3.5:11211',
+ 'All memcached servers error-limited',
])
def test_delete(self):
diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py
index 27e888e1f..ac73465da 100644
--- a/test/unit/proxy/controllers/test_container.py
+++ b/test/unit/proxy/controllers/test_container.py
@@ -2148,7 +2148,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c'),
+ mock.call.get('shard-listing/a/c', raise_on_error=True),
mock.call.set('shard-listing/a/c', self.sr_dicts,
time=exp_recheck_listing),
# Since there was a backend request, we go ahead and cache
@@ -2177,7 +2177,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c')],
+ mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
@@ -2237,7 +2237,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c')],
+ mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
@@ -2390,7 +2390,7 @@ class TestContainerController(TestRingBase):
# deleted from cache
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c'),
+ mock.call.get('shard-listing/a/c', raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=6.0)],
self.memcache.calls)
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
@@ -2400,6 +2400,39 @@ class TestContainerController(TestRingBase):
'container.shard_listing.backend.404': 1},
self.logger.get_increment_counts())
+ def test_GET_shard_ranges_read_from_cache_error(self):
+ self._setup_shard_range_stubs()
+ self.memcache = FakeMemcache()
+ self.memcache.delete_all()
+ self.logger.clear()
+ info = headers_to_container_info(self.root_resp_hdrs)
+ info['status'] = 200
+ info['sharding_state'] = 'sharded'
+ self.memcache.set('container/a/c', info)
+ self.memcache.clear_calls()
+ self.memcache.error_on_get = [False, True]
+
+ req = self._build_request({'X-Backend-Record-Type': 'shard'},
+ {'states': 'listing'}, {})
+ backend_req, resp = self._capture_backend_request(
+ req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS)
+ self._check_backend_req(
+ req, backend_req,
+ extra_hdrs={'X-Backend-Record-Type': 'shard',
+ 'X-Backend-Override-Shard-Name-Filter': 'sharded'})
+ self.assertNotIn('X-Backend-Cached-Results', resp.headers)
+ self.assertEqual(
+ [mock.call.get('container/a/c'),
+ mock.call.get('shard-listing/a/c', raise_on_error=True),
+ mock.call.set('container/a/c', mock.ANY, time=6.0)],
+ self.memcache.calls)
+ self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
+ self.assertEqual(b'', resp.body)
+ self.assertEqual(404, resp.status_int)
+ self.assertEqual({'container.shard_listing.cache.error': 1,
+ 'container.shard_listing.backend.404': 1},
+ self.logger.get_increment_counts())
+
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
# pre-warm cache with container metadata and shard ranges and verify
# that shard range listing are read from cache when appropriate
@@ -2417,7 +2450,7 @@ class TestContainerController(TestRingBase):
resp = req.get_response(self.app)
self.assertEqual(
[mock.call.get('container/a/c'),
- mock.call.get('shard-listing/a/c')],
+ mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertEqual({'container.shard_listing.cache.hit': 1},
self.logger.get_increment_counts())
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 5158a09cc..a03e8b43e 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -4413,6 +4413,23 @@ class TestReplicatedObjectController(
expected[device] = '10.0.0.%d:100%d' % (i, i)
self.assertEqual(container_headers, expected)
+ # shard lookup in memcache may error...
+ req = Request.blank(
+ '/v1/a/c/o', {'swift.cache': cache},
+ method=method, body='', headers={'Content-Type': 'text/plain'})
+ cache.error_on_get = [False, True]
+ with mock.patch('random.random', return_value=1.0), \
+ mocked_http_conn(*status_codes, headers=resp_headers,
+ body=body):
+ resp = req.get_response(self.app)
+
+ self.assertEqual(resp.status_int, 202)
+ stats = self.app.logger.get_increment_counts()
+ self.assertEqual({'object.shard_updating.cache.skip': 1,
+ 'object.shard_updating.cache.hit': 1,
+ 'object.shard_updating.cache.error': 1,
+ 'object.shard_updating.backend.200': 2}, stats)
+
do_test('POST', 'sharding')
do_test('POST', 'sharded')
do_test('DELETE', 'sharding')