diff options
author | Jianjian Huo <jhuo@nvidia.com> | 2023-04-19 15:54:50 -0700 |
---|---|---|
committer | Jianjian Huo <jhuo@nvidia.com> | 2023-05-01 11:03:34 -0700 |
commit | 9fb860880d755291377c28702679c4d5deb55a6f (patch) | |
tree | 93b062ff19ad7eb4da39043b4b68b4367a7b3ad7 | |
parent | f99a6e5762896c7789d168bc49d8cdcb47903264 (diff) | |
download | swift-9fb860880d755291377c28702679c4d5deb55a6f.tar.gz |
memcached: log user provided keys in exception error logging.
User-provided keys are needed to debug the tracebacks and timeouts that occur
when clients are talking to memcached, in order to associate those failures
with specific memcache usages within swift services.
Change-Id: I07491bb4ebc3baa13cf09f64a04a61011d561409
-rw-r--r-- | swift/common/memcached.py | 114 | ||||
-rw-r--r-- | test/unit/common/test_memcached.py | 87 |
2 files changed, 133 insertions, 68 deletions
diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 74ec8efc7..22ec81c71 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -117,6 +117,13 @@ def set_msg(key, flags, timeout, value): ]) + (b'\r\n' + value + b'\r\n') +# get the prefix of a user provided memcache key by removing the content after +# the last '/', all current usages within swift are using prefix, such as +# "shard-updating-v2", "nvratelimit" and etc. +def get_key_prefix(key): + return key.rsplit('/', 1)[0] + + class MemcacheConnectionError(Exception): pass @@ -216,18 +223,24 @@ class MemcacheRing(object): def memcache_servers(self): return list(self._client_cache.keys()) - def _exception_occurred(self, server, e, action='talking', + def _exception_occurred(self, server, e, key_prefix, action='talking', sock=None, fp=None, got_connection=True): if isinstance(e, Timeout): - self.logger.error("Timeout %(action)s to memcached: %(server)s", - {'action': action, 'server': server}) + self.logger.error( + "Timeout %(action)s to memcached: %(server)s" + ": with key_prefix %(key_prefix)s", + {'action': action, 'server': server, 'key_prefix': key_prefix}) elif isinstance(e, (socket.error, MemcacheConnectionError)): self.logger.error( - "Error %(action)s to memcached: %(server)s: %(err)s", - {'action': action, 'server': server, 'err': e}) + "Error %(action)s to memcached: %(server)s: " + "with key_prefix %(key_prefix)s: %(err)s", + {'action': action, 'server': server, 'err': e, + 'key_prefix': key_prefix}) else: - self.logger.exception("Error %(action)s to memcached: %(server)s", - {'action': action, 'server': server}) + self.logger.exception("Error %(action)s to memcached: %(server)s" + ": with key_prefix %(key_prefix)s", + {'action': action, 'server': server, + 'key_prefix': key_prefix}) try: if fp: fp.close() @@ -257,14 +270,17 @@ class MemcacheRing(object): self._error_limited[server] = now + self._error_limit_duration self.logger.error('Error limiting server %s', 
server) - def _get_conns(self, key): + def _get_conns(self, key_prefix, hash_key): """ Retrieves a server conn from the pool, or connects a new one. Chooses the server based on a consistent hash of "key". + :param key_prefix: the prefix of user provided key. + :param hash_key: the consistent hash of user key, or server key for + set_multi and get_multi. :return: generator to serve memcached connection """ - pos = bisect(self._sorted, key) + pos = bisect(self._sorted, hash_key) served = [] any_yielded = False while len(served) < self._tries: @@ -283,14 +299,14 @@ class MemcacheRing(object): yield server, fp, sock except MemcachePoolTimeout as e: self._exception_occurred( - server, e, action='getting a connection', + server, e, key_prefix, action='getting a connection', got_connection=False) except (Exception, Timeout) as e: # Typically a Timeout exception caught here is the one raised # by the create() method of this server's MemcacheConnPool # object. self._exception_occurred( - server, e, action='connecting', sock=sock) + server, e, key_prefix, action='connecting', sock=sock) if not any_yielded: self.logger.error('All memcached servers error-limited') @@ -318,7 +334,8 @@ class MemcacheRing(object): :param raise_on_error: if True, propagate Timeouts and other errors. By default, errors are ignored. 
""" - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) timeout = sanitize_timeout(time) flags = 0 if serialize: @@ -329,10 +346,10 @@ class MemcacheRing(object): elif not isinstance(value, bytes): value = str(value).encode('utf-8') - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): - sock.sendall(set_msg(key, flags, timeout, value)) + sock.sendall(set_msg(hash_key, flags, timeout, value)) # Wait for the set to complete msg = fp.readline().strip() if msg != b'STORED': @@ -352,7 +369,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -368,19 +386,20 @@ class MemcacheRing(object): By default, errors are treated as cache misses. 
:returns: value of the key in memcache """ - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) value = None - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): - sock.sendall(b'get ' + key + b'\r\n') + sock.sendall(b'get ' + hash_key + b'\r\n') line = fp.readline().strip().split() while True: if not line: raise MemcacheConnectionError('incomplete read') if line[0].upper() == b'END': break - if line[0].upper() == b'VALUE' and line[1] == key: + if line[0].upper() == b'VALUE' and line[1] == hash_key: size = int(line[3]) value = fp.read(size) if int(line[2]) & PICKLE_FLAG: @@ -392,7 +411,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return value except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) if raise_on_error: raise MemcacheConnectionError( "No memcached connections succeeded.") @@ -415,17 +435,18 @@ class MemcacheRing(object): :returns: result of incrementing :raises MemcacheConnectionError: """ - key = md5hash(key) + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) command = b'incr' if delta < 0: command = b'decr' delta = str(abs(int(delta))).encode('ascii') timeout = sanitize_timeout(time) - for (server, fp, sock) in self._get_conns(key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): sock.sendall(b' '.join([ - command, key, delta]) + b'\r\n') + command, hash_key, delta]) + b'\r\n') line = fp.readline().strip().split() if not line: raise MemcacheConnectionError('incomplete read') @@ -433,14 +454,16 @@ class MemcacheRing(object): add_val = delta if command == b'decr': add_val = b'0' - sock.sendall(b' '.join([ - b'add', key, b'0', str(timeout).encode('ascii'), - str(len(add_val)).encode('ascii') - ]) + b'\r\n' + add_val + b'\r\n') + 
sock.sendall( + b' '.join( + [b'add', hash_key, b'0', str(timeout).encode( + 'ascii'), + str(len(add_val)).encode('ascii') + ]) + b'\r\n' + add_val + b'\r\n') line = fp.readline().strip().split() if line[0].upper() == b'NOT_STORED': sock.sendall(b' '.join([ - command, key, delta]) + b'\r\n') + command, hash_key, delta]) + b'\r\n') line = fp.readline().strip().split() ret = int(line[0].strip()) else: @@ -450,7 +473,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return ret except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) raise MemcacheConnectionError("No Memcached connections succeeded.") @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW) @@ -478,18 +502,20 @@ class MemcacheRing(object): :param server_key: key to use in determining which server in the ring is used """ - key = md5hash(key) - server_key = md5hash(server_key) if server_key else key - for (server, fp, sock) in self._get_conns(server_key): + key_prefix = get_key_prefix(key) + hash_key = md5hash(key) + server_key = md5hash(server_key) if server_key else hash_key + for (server, fp, sock) in self._get_conns(key_prefix, server_key): try: with Timeout(self._io_timeout): - sock.sendall(b'delete ' + key + b'\r\n') + sock.sendall(b'delete ' + hash_key + b'\r\n') # Wait for the delete to complete fp.readline() self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def set_multi(self, mapping, server_key, serialize=True, time=0, @@ -508,7 +534,8 @@ class MemcacheRing(object): python-memcached interface. 
This implementation ignores it """ - server_key = md5hash(server_key) + key_prefix = get_key_prefix(server_key) + hash_key = md5hash(server_key) timeout = sanitize_timeout(time) msg = [] for key, value in mapping.items(): @@ -520,7 +547,7 @@ class MemcacheRing(object): value = json.dumps(value).encode('ascii') flags |= JSON_FLAG msg.append(set_msg(key, flags, timeout, value)) - for (server, fp, sock) in self._get_conns(server_key): + for (server, fp, sock) in self._get_conns(key_prefix, hash_key): try: with Timeout(self._io_timeout): sock.sendall(b''.join(msg)) @@ -530,7 +557,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) def get_multi(self, keys, server_key): @@ -542,12 +570,13 @@ class MemcacheRing(object): is used :returns: list of values """ + key_prefix = get_key_prefix(server_key) server_key = md5hash(server_key) - keys = [md5hash(key) for key in keys] - for (server, fp, sock) in self._get_conns(server_key): + hash_keys = [md5hash(key) for key in keys] + for (server, fp, sock) in self._get_conns(key_prefix, server_key): try: with Timeout(self._io_timeout): - sock.sendall(b'get ' + b' '.join(keys) + b'\r\n') + sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n') line = fp.readline().strip().split() responses = {} while True: @@ -566,7 +595,7 @@ class MemcacheRing(object): fp.readline() line = fp.readline().strip().split() values = [] - for key in keys: + for key in hash_keys: if key in responses: values.append(responses[key]) else: @@ -574,7 +603,8 @@ class MemcacheRing(object): self._return_conn(server, fp, sock) return values except (Exception, Timeout) as e: - self._exception_occurred(server, e, sock=sock, fp=fp) + self._exception_occurred( + server, e, key_prefix, sock=sock, fp=fp) def load_memcache(conf, 
logger): diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index ace7f3008..7de13ff10 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -34,7 +34,8 @@ from eventlet.pools import Pool from eventlet.green import ssl from swift.common import memcached -from swift.common.memcached import MemcacheConnectionError +from swift.common.memcached import MemcacheConnectionError, md5hash, \ + get_key_prefix from swift.common.utils import md5, human_readable from mock import patch, MagicMock from test.debug_logger import debug_logger @@ -200,6 +201,21 @@ class TestMemcached(unittest.TestCase): def setUp(self): self.logger = debug_logger() + def test_get_key_prefix(self): + self.assertEqual( + get_key_prefix("shard-updating-v2/a/c"), + "shard-updating-v2/a") + self.assertEqual( + get_key_prefix("shard-listing-v2/accout/container3"), + "shard-listing-v2/accout") + self.assertEqual( + get_key_prefix("auth_reseller_name/token/X58E34EL2SDFLEY3"), + "auth_reseller_name/token") + self.assertEqual( + get_key_prefix("nvratelimit/v2/wf/2345392374"), + "nvratelimit/v2/wf") + self.assertEqual(get_key_prefix("some_key"), "some_key") + def test_logger_kwarg(self): server_socket = '%s:%s' % ('[::1]', 11211) client = memcached.MemcacheRing([server_socket]) @@ -219,7 +235,7 @@ class TestMemcached(unittest.TestCase): self.assertIs(client._client_cache[server]._tls_context, context) key = uuid4().hex.encode('ascii') - list(client._get_conns(key)) + list(client._get_conns('test', key)) context.wrap_socket.assert_called_once() def test_get_conns(self): @@ -241,7 +257,7 @@ class TestMemcached(unittest.TestCase): one = two = True while one or two: # Run until we match hosts one and two key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns(key): + for conn in memcache_client._get_conns('test', key): if 'b' not in getattr(conn[1], 'mode', ''): self.assertIsInstance(conn[1], ( io.RawIOBase, 
io.BufferedIOBase)) @@ -268,7 +284,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns(key): + for conn in memcache_client._get_conns('test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) self.assertEqual(peer_socket, server_socket) @@ -290,7 +306,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_host], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns(key): + for conn in memcache_client._get_conns('test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) self.assertEqual(peer_socket, server_socket) @@ -319,7 +335,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns(key): + for conn in memcache_client._get_conns('test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '%s:%s' % (peer_sockaddr[0], peer_sockaddr[1]) @@ -345,7 +361,7 @@ class TestMemcached(unittest.TestCase): memcache_client = memcached.MemcacheRing([server_socket], logger=self.logger) key = uuid4().hex.encode('ascii') - for conn in memcache_client._get_conns(key): + for conn in memcache_client._get_conns('test', key): peer_sockaddr = conn[2].getpeername() peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) @@ -539,7 +555,7 @@ class TestMemcached(unittest.TestCase): self.assertEqual(mock1.exploded, True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ]) self.logger.clear() @@ -548,7 +564,7 @@ class TestMemcached(unittest.TestCase): 
self.assertEqual(mock1.exploded, True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ]) # Check that we really did call create() twice self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks, @@ -571,7 +587,7 @@ class TestMemcached(unittest.TestCase): # to .4 self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 11 + [ 'Error limiting server 1.2.3.5:11211' ]) @@ -583,10 +599,10 @@ class TestMemcached(unittest.TestCase): # as we keep going, eventually .4 gets error limited, too self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', 'Error limiting server 1.2.3.4:11211', 'All memcached servers error-limited', ]) @@ -619,7 +635,7 @@ class TestMemcached(unittest.TestCase): # to .4 self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 20) def test_error_raising(self): @@ -634,7 +650,7 @@ class TestMemcached(unittest.TestCase): memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ]) self.logger.clear() @@ -642,7 +658,17 @@ class TestMemcached(unittest.TestCase): memcache_client.get('some_key', raise_on_error=True) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error 
talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', + ]) + self.logger.clear() + + with self.assertRaises(MemcacheConnectionError): + memcache_client.set( + 'shard-updating-v2/acc/container', [1, 2, 3], + raise_on_error=True) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe', ]) self.logger.clear() @@ -650,14 +676,21 @@ class TestMemcached(unittest.TestCase): memcache_client.set('some_key', [1, 2, 3]) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ]) self.logger.clear() memcache_client.get('some_key') self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', + ]) + self.logger.clear() + + memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3]) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + 'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe', ]) def test_error_limiting_custom_config(self): @@ -680,10 +713,10 @@ class TestMemcached(unittest.TestCase): do_calls(5, 12) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) @@ -693,7 +726,7 @@ class TestMemcached(unittest.TestCase): do_calls(6, 20) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to 
memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 20) # with error_limit_time of 66, one call per 6 secs, twelfth one @@ -701,10 +734,10 @@ class TestMemcached(unittest.TestCase): do_calls(6, 12, error_limit_time=66) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 10 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) @@ -714,10 +747,10 @@ class TestMemcached(unittest.TestCase): do_calls(6, 13, error_limit_time=70, error_limit_count=11) self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', ] * 11 + [ 'Error talking to memcached: 1.2.3.5:11211: ' - '[Errno 32] Broken pipe', + 'with key_prefix some_key: [Errno 32] Broken pipe', 'Error limiting server 1.2.3.5:11211', 'All memcached servers error-limited', ]) @@ -953,7 +986,8 @@ class TestMemcached(unittest.TestCase): self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8) self.assertEqual( self.logger.get_lines_for_level('error'), - ['Timeout getting a connection to memcached: 1.2.3.5:11211'] * 8) + ['Timeout getting a connection to memcached: 1.2.3.5:11211' + ': with key_prefix key'] * 8) self.assertEqual(served['1.2.3.5'], 2) self.assertEqual(pending['1.2.3.4'], 0) self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0) @@ -992,7 +1026,8 @@ class TestMemcached(unittest.TestCase): # try to get connect and no connection found # so it will result in StopIteration - conn_generator = memcache_client._get_conns(b'key') + conn_generator = memcache_client._get_conns( + 'key', md5hash(b'key')) with 
self.assertRaises(StopIteration): next(conn_generator) |