summaryrefslogtreecommitdiff
path: root/swift
diff options
context:
space:
mode:
authorJianjian Huo <jhuo@nvidia.com>2023-04-19 15:54:50 -0700
committerJianjian Huo <jhuo@nvidia.com>2023-05-01 11:03:34 -0700
commit9fb860880d755291377c28702679c4d5deb55a6f (patch)
tree93b062ff19ad7eb4da39043b4b68b4367a7b3ad7 /swift
parentf99a6e5762896c7789d168bc49d8cdcb47903264 (diff)
downloadswift-9fb860880d755291377c28702679c4d5deb55a6f.tar.gz
memcached: log user provided keys in exception error logging.
User provided keys are need to debug those tracebacks/timeouts when clients talking to memcached, in order to associate those failures with specific memcache usages within swift services. Change-Id: I07491bb4ebc3baa13cf09f64a04a61011d561409
Diffstat (limited to 'swift')
-rw-r--r--swift/common/memcached.py114
1 files changed, 72 insertions, 42 deletions
diff --git a/swift/common/memcached.py b/swift/common/memcached.py
index 74ec8efc7..22ec81c71 100644
--- a/swift/common/memcached.py
+++ b/swift/common/memcached.py
@@ -117,6 +117,13 @@ def set_msg(key, flags, timeout, value):
]) + (b'\r\n' + value + b'\r\n')
+# get the prefix of a user provided memcache key by removing the content after
+# the last '/', all current usages within swift are using prefix, such as
+# "shard-updating-v2", "nvratelimit" and etc.
+def get_key_prefix(key):
+ return key.rsplit('/', 1)[0]
+
+
class MemcacheConnectionError(Exception):
pass
@@ -216,18 +223,24 @@ class MemcacheRing(object):
def memcache_servers(self):
return list(self._client_cache.keys())
- def _exception_occurred(self, server, e, action='talking',
+ def _exception_occurred(self, server, e, key_prefix, action='talking',
sock=None, fp=None, got_connection=True):
if isinstance(e, Timeout):
- self.logger.error("Timeout %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.error(
+ "Timeout %(action)s to memcached: %(server)s"
+ ": with key_prefix %(key_prefix)s",
+ {'action': action, 'server': server, 'key_prefix': key_prefix})
elif isinstance(e, (socket.error, MemcacheConnectionError)):
self.logger.error(
- "Error %(action)s to memcached: %(server)s: %(err)s",
- {'action': action, 'server': server, 'err': e})
+ "Error %(action)s to memcached: %(server)s: "
+ "with key_prefix %(key_prefix)s: %(err)s",
+ {'action': action, 'server': server, 'err': e,
+ 'key_prefix': key_prefix})
else:
- self.logger.exception("Error %(action)s to memcached: %(server)s",
- {'action': action, 'server': server})
+ self.logger.exception("Error %(action)s to memcached: %(server)s"
+ ": with key_prefix %(key_prefix)s",
+ {'action': action, 'server': server,
+ 'key_prefix': key_prefix})
try:
if fp:
fp.close()
@@ -257,14 +270,17 @@ class MemcacheRing(object):
self._error_limited[server] = now + self._error_limit_duration
self.logger.error('Error limiting server %s', server)
- def _get_conns(self, key):
+ def _get_conns(self, key_prefix, hash_key):
"""
Retrieves a server conn from the pool, or connects a new one.
Chooses the server based on a consistent hash of "key".
+ :param key_prefix: the prefix of user provided key.
+ :param hash_key: the consistent hash of user key, or server key for
+ set_multi and get_multi.
:return: generator to serve memcached connection
"""
- pos = bisect(self._sorted, key)
+ pos = bisect(self._sorted, hash_key)
served = []
any_yielded = False
while len(served) < self._tries:
@@ -283,14 +299,14 @@ class MemcacheRing(object):
yield server, fp, sock
except MemcachePoolTimeout as e:
self._exception_occurred(
- server, e, action='getting a connection',
+ server, e, key_prefix, action='getting a connection',
got_connection=False)
except (Exception, Timeout) as e:
# Typically a Timeout exception caught here is the one raised
# by the create() method of this server's MemcacheConnPool
# object.
self._exception_occurred(
- server, e, action='connecting', sock=sock)
+ server, e, key_prefix, action='connecting', sock=sock)
if not any_yielded:
self.logger.error('All memcached servers error-limited')
@@ -318,7 +334,8 @@ class MemcacheRing(object):
:param raise_on_error: if True, propagate Timeouts and other errors.
By default, errors are ignored.
"""
- key = md5hash(key)
+ key_prefix = get_key_prefix(key)
+ hash_key = md5hash(key)
timeout = sanitize_timeout(time)
flags = 0
if serialize:
@@ -329,10 +346,10 @@ class MemcacheRing(object):
elif not isinstance(value, bytes):
value = str(value).encode('utf-8')
- for (server, fp, sock) in self._get_conns(key):
+ for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try:
with Timeout(self._io_timeout):
- sock.sendall(set_msg(key, flags, timeout, value))
+ sock.sendall(set_msg(hash_key, flags, timeout, value))
# Wait for the set to complete
msg = fp.readline().strip()
if msg != b'STORED':
@@ -352,7 +369,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
if raise_on_error:
raise MemcacheConnectionError(
"No memcached connections succeeded.")
@@ -368,19 +386,20 @@ class MemcacheRing(object):
By default, errors are treated as cache misses.
:returns: value of the key in memcache
"""
- key = md5hash(key)
+ key_prefix = get_key_prefix(key)
+ hash_key = md5hash(key)
value = None
- for (server, fp, sock) in self._get_conns(key):
+ for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try:
with Timeout(self._io_timeout):
- sock.sendall(b'get ' + key + b'\r\n')
+ sock.sendall(b'get ' + hash_key + b'\r\n')
line = fp.readline().strip().split()
while True:
if not line:
raise MemcacheConnectionError('incomplete read')
if line[0].upper() == b'END':
break
- if line[0].upper() == b'VALUE' and line[1] == key:
+ if line[0].upper() == b'VALUE' and line[1] == hash_key:
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
@@ -392,7 +411,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return value
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
if raise_on_error:
raise MemcacheConnectionError(
"No memcached connections succeeded.")
@@ -415,17 +435,18 @@ class MemcacheRing(object):
:returns: result of incrementing
:raises MemcacheConnectionError:
"""
- key = md5hash(key)
+ key_prefix = get_key_prefix(key)
+ hash_key = md5hash(key)
command = b'incr'
if delta < 0:
command = b'decr'
delta = str(abs(int(delta))).encode('ascii')
timeout = sanitize_timeout(time)
- for (server, fp, sock) in self._get_conns(key):
+ for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try:
with Timeout(self._io_timeout):
sock.sendall(b' '.join([
- command, key, delta]) + b'\r\n')
+ command, hash_key, delta]) + b'\r\n')
line = fp.readline().strip().split()
if not line:
raise MemcacheConnectionError('incomplete read')
@@ -433,14 +454,16 @@ class MemcacheRing(object):
add_val = delta
if command == b'decr':
add_val = b'0'
- sock.sendall(b' '.join([
- b'add', key, b'0', str(timeout).encode('ascii'),
- str(len(add_val)).encode('ascii')
- ]) + b'\r\n' + add_val + b'\r\n')
+ sock.sendall(
+ b' '.join(
+ [b'add', hash_key, b'0', str(timeout).encode(
+ 'ascii'),
+ str(len(add_val)).encode('ascii')
+ ]) + b'\r\n' + add_val + b'\r\n')
line = fp.readline().strip().split()
if line[0].upper() == b'NOT_STORED':
sock.sendall(b' '.join([
- command, key, delta]) + b'\r\n')
+ command, hash_key, delta]) + b'\r\n')
line = fp.readline().strip().split()
ret = int(line[0].strip())
else:
@@ -450,7 +473,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return ret
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
raise MemcacheConnectionError("No Memcached connections succeeded.")
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
@@ -478,18 +502,20 @@ class MemcacheRing(object):
:param server_key: key to use in determining which server in the ring
is used
"""
- key = md5hash(key)
- server_key = md5hash(server_key) if server_key else key
- for (server, fp, sock) in self._get_conns(server_key):
+ key_prefix = get_key_prefix(key)
+ hash_key = md5hash(key)
+ server_key = md5hash(server_key) if server_key else hash_key
+ for (server, fp, sock) in self._get_conns(key_prefix, server_key):
try:
with Timeout(self._io_timeout):
- sock.sendall(b'delete ' + key + b'\r\n')
+ sock.sendall(b'delete ' + hash_key + b'\r\n')
# Wait for the delete to complete
fp.readline()
self._return_conn(server, fp, sock)
return
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
def set_multi(self, mapping, server_key, serialize=True, time=0,
@@ -508,7 +534,8 @@ class MemcacheRing(object):
python-memcached interface. This implementation
ignores it
"""
- server_key = md5hash(server_key)
+ key_prefix = get_key_prefix(server_key)
+ hash_key = md5hash(server_key)
timeout = sanitize_timeout(time)
msg = []
for key, value in mapping.items():
@@ -520,7 +547,7 @@ class MemcacheRing(object):
value = json.dumps(value).encode('ascii')
flags |= JSON_FLAG
msg.append(set_msg(key, flags, timeout, value))
- for (server, fp, sock) in self._get_conns(server_key):
+ for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try:
with Timeout(self._io_timeout):
sock.sendall(b''.join(msg))
@@ -530,7 +557,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
def get_multi(self, keys, server_key):
@@ -542,12 +570,13 @@ class MemcacheRing(object):
is used
:returns: list of values
"""
+ key_prefix = get_key_prefix(server_key)
server_key = md5hash(server_key)
- keys = [md5hash(key) for key in keys]
- for (server, fp, sock) in self._get_conns(server_key):
+ hash_keys = [md5hash(key) for key in keys]
+ for (server, fp, sock) in self._get_conns(key_prefix, server_key):
try:
with Timeout(self._io_timeout):
- sock.sendall(b'get ' + b' '.join(keys) + b'\r\n')
+ sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n')
line = fp.readline().strip().split()
responses = {}
while True:
@@ -566,7 +595,7 @@ class MemcacheRing(object):
fp.readline()
line = fp.readline().strip().split()
values = []
- for key in keys:
+ for key in hash_keys:
if key in responses:
values.append(responses[key])
else:
@@ -574,7 +603,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock)
return values
except (Exception, Timeout) as e:
- self._exception_occurred(server, e, sock=sock, fp=fp)
+ self._exception_occurred(
+ server, e, key_prefix, sock=sock, fp=fp)
def load_memcache(conf, logger):