summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorelij <elij@wundrd.net>2014-08-24 12:37:25 -0700
committerelij <elij@wundrd.net>2014-08-24 13:02:17 -0700
commitb921f68a8a0040ed5b764aa1ce14f1174806119e (patch)
tree4be0dd913cf29bcb95f70d46976c7afd51c8657c
parent2251916a6a1e8f35609d8d963f68ccab8c1b9402 (diff)
downloadpython-memcached-b921f68a8a0040ed5b764aa1ce14f1174806119e.tar.gz
add support for noreply
the memcache protocol defines a 'noreply' optional parameter, which instructs the server to not send a reply. In heavy usage environments this can lead to significant performance improvements.
-rw-r--r--memcache.py149
1 files changed, 108 insertions, 41 deletions
diff --git a/memcache.py b/memcache.py
index 9bc6168..662b98f 100644
--- a/memcache.py
+++ b/memcache.py
@@ -399,7 +399,7 @@ class Client(threading.local):
for s in self.servers:
s.close_socket()
- def delete_multi(self, keys, time=0, key_prefix=''):
+ def delete_multi(self, keys, time=0, key_prefix='', noreply=False):
"""Delete multiple keys in the memcache doing just one query.
>>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
@@ -420,6 +420,8 @@ class Client(threading.local):
@param key_prefix: Optional string to prepend to each key when
sending to memcache. See docs for L{get_multi} and
L{set_multi}.
+ @param noreply: optional parameter instructs the server to not send the
+ reply.
@return: 1 if no failure in communication with any memcacheds.
@rtype: int
"""
@@ -436,12 +438,13 @@ class Client(threading.local):
for server in six.iterkeys(server_keys):
bigcmd = []
write = bigcmd.append
+ extra = ' noreply' if noreply else ''
if time is not None:
for key in server_keys[server]: # These are mangled keys
- write("delete %s %d\r\n" % (key, time))
+ write("delete %s %d%s\r\n" % (key, time, extra))
else:
for key in server_keys[server]: # These are mangled keys
- write("delete %s\r\n" % key)
+ write("delete %s%s\r\n" % (key, extra))
try:
server.send_cmds(''.join(bigcmd))
except socket.error as msg:
@@ -451,6 +454,10 @@ class Client(threading.local):
server.mark_dead(msg)
dead_servers.append(server)
+ # if noreply, just return
+ if noreply:
+ return rc
+
# if any servers died on the way, don't expect them to respond.
for server in dead_servers:
del server_keys[server]
@@ -466,17 +473,20 @@ class Client(threading.local):
rc = 0
return rc
- def delete(self, key, time=0):
+ def delete(self, key, time=0, noreply=False):
'''Deletes a key from the memcache.
@return: Nonzero on success.
@param time: number of seconds any subsequent set / update commands
should fail. Defaults to None for no delay.
+ @param noreply: optional parameter instructs the server to not send the
+ reply.
@rtype: int
'''
- return self._deletetouch(['DELETED', 'NOT_FOUND'], "delete", key, time)
+ return self._deletetouch(['DELETED', 'NOT_FOUND'], "delete", key,
+ time, noreply)
- def touch(self, key, time=0):
+ def touch(self, key, time=0, noreply=False):
'''Updates the expiration time of a key in memcache.
@return: Nonzero on success.
@@ -485,24 +495,29 @@ class Client(threading.local):
unix time-since-the-epoch value. See the memcached protocol
docs section "Storage Commands" for more info on <exptime>. We
default to 0 == cache forever.
+ @param noreply: optional parameter instructs the server to not send the
+ reply.
@rtype: int
'''
- return self._deletetouch(['TOUCHED'], "touch", key, time)
+ return self._deletetouch(['TOUCHED'], "touch", key, time, noreply)
- def _deletetouch(self, expected, cmd, key, time=0):
+ def _deletetouch(self, expected, cmd, key, time=0, noreply=False):
if self.do_check_key:
self.check_key(key)
server, key = self._get_server(key)
if not server:
return 0
self._statlog(cmd)
+ extra = ' noreply' if noreply else ''
if time is not None and time != 0:
- cmd = "%s %s %d" % (cmd, key, time)
+ cmd = "%s %s %d%s" % (cmd, key, time, extra)
else:
- cmd = "%s %s" % (cmd, key)
+ cmd = "%s %s%s" % (cmd, key, extra)
try:
server.send_cmd(cmd)
+ if noreply:
+ return 1
line = server.readline()
if line and line.strip() in expected:
return 1
@@ -514,7 +529,7 @@ class Client(threading.local):
server.mark_dead(msg)
return 0
- def incr(self, key, delta=1):
+ def incr(self, key, delta=1, noreply=False):
"""Increment value for C{key} by C{delta}
Sends a command to the server to atomically increment the
@@ -539,12 +554,15 @@ class Client(threading.local):
@param delta: Integer amount to increment by (should be zero
or greater).
- @return: New value after incrementing.
+ @param noreply: optional parameter instructs the server to not send the
+ reply.
+
+ @return: New value after incrementing, no None for noreply or error.
@rtype: int
"""
- return self._incrdecr("incr", key, delta)
+ return self._incrdecr("incr", key, delta, noreply)
- def decr(self, key, delta=1):
+ def decr(self, key, delta=1, noreply=False):
"""Decrement value for C{key} by C{delta}
Like L{incr}, but decrements. Unlike L{incr}, underflow is
@@ -554,21 +572,27 @@ class Client(threading.local):
@param delta: Integer amount to decrement by (should be zero
or greater).
- @return: New value after decrementing or None on error.
+ @param noreply: optional parameter instructs the server to not send the
+ reply.
+
+ @return: New value after decrementing, or None for noreply or error.
@rtype: int
"""
- return self._incrdecr("decr", key, delta)
+ return self._incrdecr("decr", key, delta, noreply)
- def _incrdecr(self, cmd, key, delta):
+ def _incrdecr(self, cmd, key, delta, noreply=False):
if self.do_check_key:
self.check_key(key)
server, key = self._get_server(key)
if not server:
return None
self._statlog(cmd)
- cmd = "%s %s %d" % (cmd, key, delta)
+ extra = ' noreply' if noreply else ''
+ cmd = "%s %s %d%s" % (cmd, key, delta, extra)
try:
server.send_cmd(cmd)
+ if noreply:
+ return
line = server.readline()
if line is None or line.strip() == 'NOT_FOUND':
return None
@@ -579,7 +603,7 @@ class Client(threading.local):
server.mark_dead(msg)
return None
- def add(self, key, val, time=0, min_compress_len=0):
+ def add(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Add new key with value.
Like L{set}, but only stores in memcache if the key doesn't
@@ -588,9 +612,9 @@ class Client(threading.local):
@return: Nonzero on success.
@rtype: int
'''
- return self._set("add", key, val, time, min_compress_len)
+ return self._set("add", key, val, time, min_compress_len, noreply)
- def append(self, key, val, time=0, min_compress_len=0):
+ def append(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Append the value to the end of the existing key's value.
Only stores in memcache if key already exists.
@@ -599,9 +623,9 @@ class Client(threading.local):
@return: Nonzero on success.
@rtype: int
'''
- return self._set("append", key, val, time, min_compress_len)
+ return self._set("append", key, val, time, min_compress_len, noreply)
- def prepend(self, key, val, time=0, min_compress_len=0):
+ def prepend(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Prepend the value to the beginning of the existing key's value.
Only stores in memcache if key already exists.
@@ -610,9 +634,9 @@ class Client(threading.local):
@return: Nonzero on success.
@rtype: int
'''
- return self._set("prepend", key, val, time, min_compress_len)
+ return self._set("prepend", key, val, time, min_compress_len, noreply)
- def replace(self, key, val, time=0, min_compress_len=0):
+ def replace(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Replace existing key with value.
Like L{set}, but only stores in memcache if the key already exists.
@@ -621,9 +645,9 @@ class Client(threading.local):
@return: Nonzero on success.
@rtype: int
'''
- return self._set("replace", key, val, time, min_compress_len)
+ return self._set("replace", key, val, time, min_compress_len, noreply)
- def set(self, key, val, time=0, min_compress_len=0):
+ def set(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Unconditionally sets a key to a given value in the memcache.
The C{key} can optionally be an tuple, with the first element
@@ -652,10 +676,12 @@ class Client(threading.local):
compatability, this parameter defaults to 0, indicating don't
ever try to compress.
+ @param noreply: optional parameter instructs the server to not
+ send the reply.
'''
- return self._set("set", key, val, time, min_compress_len)
+ return self._set("set", key, val, time, min_compress_len, noreply)
- def cas(self, key, val, time=0, min_compress_len=0):
+ def cas(self, key, val, time=0, min_compress_len=0, noreply=False):
'''Check and set (CAS)
Sets a key to a given value in the memcache if it hasn't been
@@ -686,8 +712,11 @@ class Client(threading.local):
than the input, then it is discarded. For backwards
compatability, this parameter defaults to 0, indicating don't
ever try to compress.
+
+ @param noreply: optional parameter instructs the server to not
+ send the reply.
'''
- return self._set("cas", key, val, time, min_compress_len)
+ return self._set("cas", key, val, time, min_compress_len, noreply)
def _map_and_prefix_keys(self, key_iterable, key_prefix):
"""Compute the mapping of server (_Host instance) -> list of keys to
@@ -734,7 +763,8 @@ class Client(threading.local):
return (server_keys, prefixed_to_orig_key)
- def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
+ def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0,
+ noreply=False):
'''Sets multiple keys in the memcache doing just one query.
>>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
@@ -786,6 +816,9 @@ class Client(threading.local):
backwards compatability, this parameter defaults to 0,
indicating don't ever try to compress.
+ @param noreply: optional parameter instructs the server to not
+ send the reply.
+
@return: List of keys which failed to be stored [ memcache out
of memory, etc. ].
@@ -800,6 +833,7 @@ class Client(threading.local):
dead_servers = []
notstored = [] # original keys.
+ extra = ' noreply' if noreply else ''
for server in six.iterkeys(server_keys):
bigcmd = []
write = bigcmd.append
@@ -809,11 +843,12 @@ class Client(threading.local):
mapping[prefixed_to_orig_key[key]],
min_compress_len)
if store_info:
- msg = "set %s %d %d %d\r\n%s\r\n"
+ msg = "set %s %d %d %d%s\r\n%s\r\n"
write(msg % (key,
store_info[0],
time,
store_info[1],
+ extra,
store_info[2]))
else:
notstored.append(prefixed_to_orig_key[key])
@@ -824,6 +859,10 @@ class Client(threading.local):
server.mark_dead(msg)
dead_servers.append(server)
+ # if noreply, just return early
+ if noreply:
+ return notstored
+
# if any servers died on the way, don't expect them to respond.
for server in dead_servers:
del server_keys[server]
@@ -896,7 +935,7 @@ class Client(threading.local):
return (flags, len(val), val)
- def _set(self, cmd, key, val, time, min_compress_len=0):
+ def _set(self, cmd, key, val, time, min_compress_len=0, noreply=False):
if self.do_check_key:
self.check_key(key)
server, key = self._get_server(key)
@@ -910,20 +949,24 @@ class Client(threading.local):
if not store_info:
return(0)
+ extra = ' noreply' if noreply else ''
if cmd == 'cas':
if key not in self.cas_ids:
- return self._set('set', key, val, time, min_compress_len)
+ return self._set('set', key, val, time, min_compress_len,
+ noreply)
fullcmd = "%s %s %d %d %d %d\r\n%s" % (
cmd, key, store_info[0], time, store_info[1],
self.cas_ids[key], store_info[2])
else:
- fullcmd = "%s %s %d %d %d\r\n%s" % (
+ fullcmd = "%s %s %d %d %d%s\r\n%s" % (
cmd, key, store_info[0],
- time, store_info[1], store_info[2]
+ time, store_info[1], extra, store_info[2]
)
try:
server.send_cmd(fullcmd)
+ if noreply:
+ return True
return(server.expect("STORED", raise_exception=True)
== "STORED")
except socket.error as msg:
@@ -1393,11 +1436,11 @@ if __name__ == "__main__":
return "%s (%s)" % (val, type(val))
return "%s" % val
- def test_setget(key, val):
+ def test_setget(key, val, noreply=False):
global failures
- print("Testing set/get {'%s': %s} ..."
- % (to_s(key), to_s(val)), end=" ")
- mc.set(key, val)
+ print("Testing set/get (noreply=%s) {'%s': %s} ..."
+ % (noreply, to_s(key), to_s(val)), end=" ")
+ mc.set(key, val, noreply=noreply)
newval = mc.get(key)
if newval == val:
print("OK")
@@ -1421,7 +1464,9 @@ if __name__ == "__main__":
return 0
test_setget("a_string", "some random string")
+ test_setget("a_string_2", "some random string", noreply=True)
test_setget("an_integer", 42)
+ test_setget("an_integer_2", 42, noreply=True)
if test_setget("long", long(1 << 30)):
print("Testing delete ...", end=" ")
if mc.delete("long"):
@@ -1436,7 +1481,8 @@ if __name__ == "__main__":
print("FAIL")
failures += 1
print("Testing get_multi ...",)
- print(mc.get_multi(["a_string", "an_integer"]))
+ print(mc.get_multi(["a_string", "an_integer", "a_string_2",
+ "an_integer_2"]))
# removed from the protocol
# if test_setget("timed_delete", 'foo'):
@@ -1458,6 +1504,7 @@ if __name__ == "__main__":
f = FooStruct()
test_setget("foostruct", f)
+ test_setget("foostruct_2", f, noreply=True)
print("Testing incr ...", end=" ")
x = mc.incr("an_integer", 1)
@@ -1467,6 +1514,15 @@ if __name__ == "__main__":
print("FAIL")
failures += 1
+ print("Testing incr (noreply=True) ...", end=" ")
+ mc.incr("an_integer_2", 1, noreply=True)
+ x = mc.get("an_integer_2")
+ if x == 43:
+ print("OK")
+ else:
+ print("FAIL")
+ failures += 1
+
print("Testing decr ...", end=" ")
x = mc.decr("an_integer", 1)
if x == 42:
@@ -1476,6 +1532,16 @@ if __name__ == "__main__":
failures += 1
sys.stdout.flush()
+ print("Testing decr (noreply=True) ...", end=" ")
+ mc.decr("an_integer_2", 1, noreply=True)
+ x = mc.get("an_integer_2")
+ if x == 42:
+ print("OK")
+ else:
+ print("FAIL")
+ failures += 1
+ sys.stdout.flush()
+
# sanity tests
print("Testing sending spaces...", end=" ")
sys.stdout.flush()
@@ -1499,6 +1565,7 @@ if __name__ == "__main__":
print("Testing using insanely long key...", end=" ")
try:
x = mc.set('a'*SERVER_MAX_KEY_LENGTH, 1)
+ x = mc.set('a'*SERVER_MAX_KEY_LENGTH, 1, noreply=True)
except Client.MemcachedKeyLengthError as msg:
print("FAIL")
failures += 1