From f9af49b250ea4b753ea0b31477d7c321433a1c4f Mon Sep 17 00:00:00 2001 From: Sean Reifschneider Date: Tue, 29 Nov 2011 12:34:32 -0700 Subject: Bug #887765: Interrupted connection to memcache server can cause inconsistencies. Added "flush_on_reconnect" (defaults to off) to Client() which will cause a client that has lost connection to a server and then reconnects to flush the cache on the reconnect so that it doesn't get old values from that server. Patch by Daniel Benamy. --- ChangeLog | 7 +++++++ memcache.py | 33 ++++++++++++++++++++++++++------- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/ChangeLog b/ChangeLog index 5546440..cc0d908 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ + * Bug #887765: Interrupted connection to memcache server can cause + inconsistencies. + Added "flush_on_reconnect" (defaults to off) to Client() which will + cause a client that has lost connection to a server and then reconnects + to flush the cache on the reconnect so that it doesn't get old values + from that server. Patch by Daniel Benamy. + Sun, 27 Nov 2011 18:15:32 -0700 Sean Reifschneider * Bug #745633: Values of maximum size are not stored diff --git a/memcache.py b/memcache.py index 25fd426..9d8fc9e 100644 --- a/memcache.py +++ b/memcache.py @@ -161,7 +161,7 @@ class Client(local): server_max_key_length=SERVER_MAX_KEY_LENGTH, server_max_value_length=SERVER_MAX_VALUE_LENGTH, dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT, - cache_cas = False): + cache_cas = False, flush_on_reconnect=0): """ Create a new Client object with the given list of servers. @@ -187,11 +187,19 @@ class Client(local): Data that is larger than this will not be sent to the server. @param server_max_value_length: (default SERVER_MAX_VALUE_LENGTH) Data that is larger than this will not be sent to the server. + @param flush_on_reconnect: optional flag which prevents a scenario that + can cause stale data to be read: If there's more than one memcached + server and the connection to one is interrupted, keys that mapped to + that server will get reassigned to another. If the first server comes + back, those keys will map to it again. If it still has its data, get()s + can read stale data that was overwritten on another server. This flag + is off by default for backwards compatibility. """ local.__init__(self) self.debug = debug self.dead_retry = dead_retry self.socket_timeout = socket_timeout + self.flush_on_reconnect = flush_on_reconnect self.set_servers(servers) self.stats = {} self.cache_cas = cache_cas @@ -235,8 +243,9 @@ class Client(local): an integer weight value. """ self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry, - socket_timeout=self.socket_timeout) - for s in servers] + socket_timeout=self.socket_timeout, + flush_on_reconnect=self.flush_on_reconnect) + for s in servers] self._init_buckets() def get_stats(self, stat_args = None): @@ -297,11 +306,10 @@ class Client(local): return data def flush_all(self): - 'Expire all data currently in the memcache servers.' + """Expire all data in memcache servers that are reachable.""" for s in self.servers: if not s.connect(): continue - s.send_cmd('flush_all') - s.expect("OK") + s.flush() def debuglog(self, str): if self.debug: @@ -1030,10 +1038,11 @@ class Client(local): class _Host(object): def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY, - socket_timeout=_SOCKET_TIMEOUT): + socket_timeout=_SOCKET_TIMEOUT, flush_on_reconnect=0): self.dead_retry = dead_retry self.socket_timeout = socket_timeout self.debug = debug + self.flush_on_reconnect = flush_on_reconnect if isinstance(host, tuple): host, self.weight = host else: @@ -1060,6 +1069,7 @@ class _Host(object): self.deaduntil = 0 self.socket = None + self.flush_on_next_connect = 0 self.buffer = '' @@ -1081,6 +1091,8 @@ class _Host(object): def mark_dead(self, reason): self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason)) self.deaduntil = time.time() + self.dead_retry + if self.flush_on_reconnect: + self.flush_on_next_connect = 1 self.close_socket() def _get_socket(self): @@ -1101,6 +1113,9 @@ class _Host(object): return None self.socket = s self.buffer = '' + if self.flush_on_next_connect: + self.flush() + self.flush_on_next_connect = 0 return s def close_socket(self): @@ -1151,6 +1166,10 @@ class _Host(object): self.buffer = buf[rlen:] return buf[:rlen] + def flush(self): + self.send_cmd('flush_all') + self.expect('OK') + def __str__(self): d = '' if self.deaduntil: -- cgit v1.2.1