summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Reifschneider <jafo@tummy.com>2011-11-29 12:34:32 -0700
committerSean Reifschneider <jafo@tummy.com>2011-11-29 12:34:32 -0700
commitf9af49b250ea4b753ea0b31477d7c321433a1c4f (patch)
treeb9bb50f177786acee6b2059f1df48a77411d7395
parentc25c0d6ed1fc4024cd672a0660010c6d87360960 (diff)
downloadpython-memcached-f9af49b250ea4b753ea0b31477d7c321433a1c4f.tar.gz
Bug #887765: Interrupted connection to memcache server can cause
inconsistencies. Added "flush_on_reconnect" (defaults to off) to Client() which will cause a client that has lost connection to a server and then reconnects to flush the cache on the reconnect so that it doesn't get old values from that server. Patch by Daniel Benamy.
-rw-r--r--ChangeLog7
-rw-r--r--memcache.py33
2 files changed, 33 insertions, 7 deletions
diff --git a/ChangeLog b/ChangeLog
index 5546440..cc0d908 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+ * Bug #887765: Interrupted connection to memcache server can cause
+ inconsistencies.
+ Added "flush_on_reconnect" (defaults to off) to Client() which will
+ cause a client that has lost connection to a server and then reconnects
+ to flush the cache on the reconnect so that it doesn't get old values
+ from that server. Patch by Daniel Benamy.
+
Sun, 27 Nov 2011 18:15:32 -0700 Sean Reifschneider <jafo@tummy.com>
* Bug #745633: Values of maximum size are not stored
diff --git a/memcache.py b/memcache.py
index 25fd426..9d8fc9e 100644
--- a/memcache.py
+++ b/memcache.py
@@ -161,7 +161,7 @@ class Client(local):
server_max_key_length=SERVER_MAX_KEY_LENGTH,
server_max_value_length=SERVER_MAX_VALUE_LENGTH,
dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT,
- cache_cas = False):
+ cache_cas = False, flush_on_reconnect=0):
"""
Create a new Client object with the given list of servers.
@@ -187,11 +187,19 @@ class Client(local):
Data that is larger than this will not be sent to the server.
@param server_max_value_length: (default SERVER_MAX_VALUE_LENGTH)
Data that is larger than this will not be sent to the server.
+ @param flush_on_reconnect: optional flag which prevents a scenario that
+ can cause stale data to be read: If there's more than one memcached
+ server and the connection to one is interrupted, keys that mapped to
+ that server will get reassigned to another. If the first server comes
+ back, those keys will map to it again. If it still has its data, get()s
+ can read stale data that was overwritten on another server. This flag
+ is off by default for backwards compatibility.
"""
local.__init__(self)
self.debug = debug
self.dead_retry = dead_retry
self.socket_timeout = socket_timeout
+ self.flush_on_reconnect = flush_on_reconnect
self.set_servers(servers)
self.stats = {}
self.cache_cas = cache_cas
@@ -235,8 +243,9 @@ class Client(local):
an integer weight value.
"""
self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry,
- socket_timeout=self.socket_timeout)
- for s in servers]
+ socket_timeout=self.socket_timeout,
+ flush_on_reconnect=self.flush_on_reconnect)
+ for s in servers]
self._init_buckets()
def get_stats(self, stat_args = None):
@@ -297,11 +306,10 @@ class Client(local):
return data
def flush_all(self):
- 'Expire all data currently in the memcache servers.'
+ """Expire all data in memcache servers that are reachable."""
for s in self.servers:
if not s.connect(): continue
- s.send_cmd('flush_all')
- s.expect("OK")
+ s.flush()
def debuglog(self, str):
if self.debug:
@@ -1030,10 +1038,11 @@ class Client(local):
class _Host(object):
def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
- socket_timeout=_SOCKET_TIMEOUT):
+ socket_timeout=_SOCKET_TIMEOUT, flush_on_reconnect=0):
self.dead_retry = dead_retry
self.socket_timeout = socket_timeout
self.debug = debug
+ self.flush_on_reconnect = flush_on_reconnect
if isinstance(host, tuple):
host, self.weight = host
else:
@@ -1060,6 +1069,7 @@ class _Host(object):
self.deaduntil = 0
self.socket = None
+ self.flush_on_next_connect = 0
self.buffer = ''
@@ -1081,6 +1091,8 @@ class _Host(object):
def mark_dead(self, reason):
self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason))
self.deaduntil = time.time() + self.dead_retry
+ if self.flush_on_reconnect:
+ self.flush_on_next_connect = 1
self.close_socket()
def _get_socket(self):
@@ -1101,6 +1113,9 @@ class _Host(object):
return None
self.socket = s
self.buffer = ''
+ if self.flush_on_next_connect:
+ self.flush()
+ self.flush_on_next_connect = 0
return s
def close_socket(self):
@@ -1151,6 +1166,10 @@ class _Host(object):
self.buffer = buf[rlen:]
return buf[:rlen]
+ def flush(self):
+ self.send_cmd('flush_all')
+ self.expect('OK')
+
def __str__(self):
d = ''
if self.deaduntil: