From b921f68a8a0040ed5b764aa1ce14f1174806119e Mon Sep 17 00:00:00 2001 From: elij Date: Sun, 24 Aug 2014 12:37:25 -0700 Subject: add support for noreply the memcache protocol defines a 'noreply' optional parameter, which instructs the server to not send a reply. In heavy usage environments this can lead to significant performance improvements. --- memcache.py | 149 +++++++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 108 insertions(+), 41 deletions(-) diff --git a/memcache.py b/memcache.py index 9bc6168..662b98f 100644 --- a/memcache.py +++ b/memcache.py @@ -399,7 +399,7 @@ class Client(threading.local): for s in self.servers: s.close_socket() - def delete_multi(self, keys, time=0, key_prefix=''): + def delete_multi(self, keys, time=0, key_prefix='', noreply=False): """Delete multiple keys in the memcache doing just one query. >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'}) @@ -420,6 +420,8 @@ class Client(threading.local): @param key_prefix: Optional string to prepend to each key when sending to memcache. See docs for L{get_multi} and L{set_multi}. + @param noreply: optional parameter instructs the server to not send the + reply. @return: 1 if no failure in communication with any memcacheds. @rtype: int """ @@ -436,12 +438,13 @@ class Client(threading.local): for server in six.iterkeys(server_keys): bigcmd = [] write = bigcmd.append + extra = ' noreply' if noreply else '' if time is not None: for key in server_keys[server]: # These are mangled keys - write("delete %s %d\r\n" % (key, time)) + write("delete %s %d%s\r\n" % (key, time, extra)) else: for key in server_keys[server]: # These are mangled keys - write("delete %s\r\n" % key) + write("delete %s%s\r\n" % (key, extra)) try: server.send_cmds(''.join(bigcmd)) except socket.error as msg: @@ -451,6 +454,10 @@ class Client(threading.local): server.mark_dead(msg) dead_servers.append(server) + # if noreply, just return + if noreply: + return rc + # if any servers died on the way, don't expect them to respond. for server in dead_servers: del server_keys[server] @@ -466,17 +473,20 @@ class Client(threading.local): rc = 0 return rc - def delete(self, key, time=0): + def delete(self, key, time=0, noreply=False): '''Deletes a key from the memcache. @return: Nonzero on success. @param time: number of seconds any subsequent set / update commands should fail. Defaults to None for no delay. + @param noreply: optional parameter instructs the server to not send the + reply. @rtype: int ''' - return self._deletetouch(['DELETED', 'NOT_FOUND'], "delete", key, time) + return self._deletetouch(['DELETED', 'NOT_FOUND'], "delete", key, + time, noreply) - def touch(self, key, time=0): + def touch(self, key, time=0, noreply=False): '''Updates the expiration time of a key in memcache. @return: Nonzero on success. @@ -485,24 +495,29 @@ class Client(threading.local): unix time-since-the-epoch value. See the memcached protocol docs section "Storage Commands" for more info on . We default to 0 == cache forever. + @param noreply: optional parameter instructs the server to not send the + reply. @rtype: int ''' - return self._deletetouch(['TOUCHED'], "touch", key, time) + return self._deletetouch(['TOUCHED'], "touch", key, time, noreply) - def _deletetouch(self, expected, cmd, key, time=0): + def _deletetouch(self, expected, cmd, key, time=0, noreply=False): if self.do_check_key: self.check_key(key) server, key = self._get_server(key) if not server: return 0 self._statlog(cmd) + extra = ' noreply' if noreply else '' if time is not None and time != 0: - cmd = "%s %s %d" % (cmd, key, time) + cmd = "%s %s %d%s" % (cmd, key, time, extra) else: - cmd = "%s %s" % (cmd, key) + cmd = "%s %s%s" % (cmd, key, extra) try: server.send_cmd(cmd) + if noreply: + return 1 line = server.readline() if line and line.strip() in expected: return 1 @@ -514,7 +529,7 @@ class Client(threading.local): server.mark_dead(msg) return 0 - def incr(self, key, delta=1): + def incr(self, key, delta=1, noreply=False): """Increment value for C{key} by C{delta} Sends a command to the server to atomically increment the @@ -539,12 +554,15 @@ class Client(threading.local): @param delta: Integer amount to increment by (should be zero or greater). - @return: New value after incrementing. + @param noreply: optional parameter instructs the server to not send the + reply. + + @return: New value after incrementing, no None for noreply or error. @rtype: int """ - return self._incrdecr("incr", key, delta) + return self._incrdecr("incr", key, delta, noreply) - def decr(self, key, delta=1): + def decr(self, key, delta=1, noreply=False): """Decrement value for C{key} by C{delta} Like L{incr}, but decrements. Unlike L{incr}, underflow is @@ -554,21 +572,27 @@ class Client(threading.local): @param delta: Integer amount to decrement by (should be zero or greater). - @return: New value after decrementing or None on error. + @param noreply: optional parameter instructs the server to not send the + reply. + + @return: New value after decrementing, or None for noreply or error. @rtype: int """ - return self._incrdecr("decr", key, delta) + return self._incrdecr("decr", key, delta, noreply) - def _incrdecr(self, cmd, key, delta): + def _incrdecr(self, cmd, key, delta, noreply=False): if self.do_check_key: self.check_key(key) server, key = self._get_server(key) if not server: return None self._statlog(cmd) - cmd = "%s %s %d" % (cmd, key, delta) + extra = ' noreply' if noreply else '' + cmd = "%s %s %d%s" % (cmd, key, delta, extra) try: server.send_cmd(cmd) + if noreply: + return line = server.readline() if line is None or line.strip() == 'NOT_FOUND': return None @@ -579,7 +603,7 @@ class Client(threading.local): server.mark_dead(msg) return None - def add(self, key, val, time=0, min_compress_len=0): + def add(self, key, val, time=0, min_compress_len=0, noreply=False): '''Add new key with value. Like L{set}, but only stores in memcache if the key doesn't @@ -588,9 +612,9 @@ class Client(threading.local): @return: Nonzero on success. @rtype: int ''' - return self._set("add", key, val, time, min_compress_len) + return self._set("add", key, val, time, min_compress_len, noreply) - def append(self, key, val, time=0, min_compress_len=0): + def append(self, key, val, time=0, min_compress_len=0, noreply=False): '''Append the value to the end of the existing key's value. Only stores in memcache if key already exists. @@ -599,9 +623,9 @@ class Client(threading.local): @return: Nonzero on success. @rtype: int ''' - return self._set("append", key, val, time, min_compress_len) + return self._set("append", key, val, time, min_compress_len, noreply) - def prepend(self, key, val, time=0, min_compress_len=0): + def prepend(self, key, val, time=0, min_compress_len=0, noreply=False): '''Prepend the value to the beginning of the existing key's value. Only stores in memcache if key already exists. @@ -610,9 +634,9 @@ class Client(threading.local): @return: Nonzero on success. @rtype: int ''' - return self._set("prepend", key, val, time, min_compress_len) + return self._set("prepend", key, val, time, min_compress_len, noreply) - def replace(self, key, val, time=0, min_compress_len=0): + def replace(self, key, val, time=0, min_compress_len=0, noreply=False): '''Replace existing key with value. Like L{set}, but only stores in memcache if the key already exists. @@ -621,9 +645,9 @@ class Client(threading.local): @return: Nonzero on success. @rtype: int ''' - return self._set("replace", key, val, time, min_compress_len) + return self._set("replace", key, val, time, min_compress_len, noreply) - def set(self, key, val, time=0, min_compress_len=0): + def set(self, key, val, time=0, min_compress_len=0, noreply=False): '''Unconditionally sets a key to a given value in the memcache. The C{key} can optionally be an tuple, with the first element @@ -652,10 +676,12 @@ class Client(threading.local): compatability, this parameter defaults to 0, indicating don't ever try to compress. + @param noreply: optional parameter instructs the server to not + send the reply. ''' - return self._set("set", key, val, time, min_compress_len) + return self._set("set", key, val, time, min_compress_len, noreply) - def cas(self, key, val, time=0, min_compress_len=0): + def cas(self, key, val, time=0, min_compress_len=0, noreply=False): '''Check and set (CAS) Sets a key to a given value in the memcache if it hasn't been @@ -686,8 +712,11 @@ class Client(threading.local): than the input, then it is discarded. For backwards compatability, this parameter defaults to 0, indicating don't ever try to compress. + + @param noreply: optional parameter instructs the server to not + send the reply. ''' - return self._set("cas", key, val, time, min_compress_len) + return self._set("cas", key, val, time, min_compress_len, noreply) def _map_and_prefix_keys(self, key_iterable, key_prefix): """Compute the mapping of server (_Host instance) -> list of keys to @@ -734,7 +763,8 @@ class Client(threading.local): return (server_keys, prefixed_to_orig_key) - def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0): + def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0, + noreply=False): '''Sets multiple keys in the memcache doing just one query. >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'}) @@ -786,6 +816,9 @@ class Client(threading.local): backwards compatability, this parameter defaults to 0, indicating don't ever try to compress. + @param noreply: optional parameter instructs the server to not + send the reply. + @return: List of keys which failed to be stored [ memcache out of memory, etc. ]. @@ -800,6 +833,7 @@ class Client(threading.local): dead_servers = [] notstored = [] # original keys. + extra = ' noreply' if noreply else '' for server in six.iterkeys(server_keys): bigcmd = [] write = bigcmd.append @@ -809,11 +843,12 @@ class Client(threading.local): mapping[prefixed_to_orig_key[key]], min_compress_len) if store_info: - msg = "set %s %d %d %d\r\n%s\r\n" + msg = "set %s %d %d %d%s\r\n%s\r\n" write(msg % (key, store_info[0], time, store_info[1], + extra, store_info[2])) else: notstored.append(prefixed_to_orig_key[key]) @@ -824,6 +859,10 @@ class Client(threading.local): server.mark_dead(msg) dead_servers.append(server) + # if noreply, just return early + if noreply: + return notstored + # if any servers died on the way, don't expect them to respond. for server in dead_servers: del server_keys[server] @@ -896,7 +935,7 @@ class Client(threading.local): return (flags, len(val), val) - def _set(self, cmd, key, val, time, min_compress_len=0): + def _set(self, cmd, key, val, time, min_compress_len=0, noreply=False): if self.do_check_key: self.check_key(key) server, key = self._get_server(key) @@ -910,20 +949,24 @@ class Client(threading.local): if not store_info: return(0) + extra = ' noreply' if noreply else '' if cmd == 'cas': if key not in self.cas_ids: - return self._set('set', key, val, time, min_compress_len) + return self._set('set', key, val, time, min_compress_len, + noreply) fullcmd = "%s %s %d %d %d %d\r\n%s" % ( cmd, key, store_info[0], time, store_info[1], self.cas_ids[key], store_info[2]) else: - fullcmd = "%s %s %d %d %d\r\n%s" % ( + fullcmd = "%s %s %d %d %d%s\r\n%s" % ( cmd, key, store_info[0], - time, store_info[1], store_info[2] + time, store_info[1], extra, store_info[2] ) try: server.send_cmd(fullcmd) + if noreply: + return True return(server.expect("STORED", raise_exception=True) == "STORED") except socket.error as msg: @@ -1393,11 +1436,11 @@ if __name__ == "__main__": return "%s (%s)" % (val, type(val)) return "%s" % val - def test_setget(key, val): + def test_setget(key, val, noreply=False): global failures - print("Testing set/get {'%s': %s} ..." - % (to_s(key), to_s(val)), end=" ") - mc.set(key, val) + print("Testing set/get (noreply=%s) {'%s': %s} ..." + % (noreply, to_s(key), to_s(val)), end=" ") + mc.set(key, val, noreply=noreply) newval = mc.get(key) if newval == val: print("OK") @@ -1421,7 +1464,9 @@ if __name__ == "__main__": return 0 test_setget("a_string", "some random string") + test_setget("a_string_2", "some random string", noreply=True) test_setget("an_integer", 42) + test_setget("an_integer_2", 42, noreply=True) if test_setget("long", long(1 << 30)): print("Testing delete ...", end=" ") if mc.delete("long"): @@ -1436,7 +1481,8 @@ if __name__ == "__main__": print("FAIL") failures += 1 print("Testing get_multi ...",) - print(mc.get_multi(["a_string", "an_integer"])) + print(mc.get_multi(["a_string", "an_integer", "a_string_2", + "an_integer_2"])) # removed from the protocol # if test_setget("timed_delete", 'foo'): @@ -1458,6 +1504,7 @@ if __name__ == "__main__": f = FooStruct() test_setget("foostruct", f) + test_setget("foostruct_2", f, noreply=True) print("Testing incr ...", end=" ") x = mc.incr("an_integer", 1) @@ -1467,6 +1514,15 @@ if __name__ == "__main__": print("FAIL") failures += 1 + print("Testing incr (noreply=True) ...", end=" ") + mc.incr("an_integer_2", 1, noreply=True) + x = mc.get("an_integer_2") + if x == 43: + print("OK") + else: + print("FAIL") + failures += 1 + print("Testing decr ...", end=" ") x = mc.decr("an_integer", 1) if x == 42: @@ -1476,6 +1532,16 @@ if __name__ == "__main__": failures += 1 sys.stdout.flush() + print("Testing decr (noreply=True) ...", end=" ") + mc.decr("an_integer_2", 1, noreply=True) + x = mc.get("an_integer_2") + if x == 42: + print("OK") + else: + print("FAIL") + failures += 1 + sys.stdout.flush() + # sanity tests print("Testing sending spaces...", end=" ") sys.stdout.flush() @@ -1499,6 +1565,7 @@ if __name__ == "__main__": print("Testing using insanely long key...", end=" ") try: x = mc.set('a'*SERVER_MAX_KEY_LENGTH, 1) + x = mc.set('a'*SERVER_MAX_KEY_LENGTH, 1, noreply=True) except Client.MemcachedKeyLengthError as msg: print("FAIL") failures += 1 -- cgit v1.2.1