summaryrefslogtreecommitdiff
path: root/redis/sentinel.py
diff options
context:
space:
mode:
authorVitja Makarov <vitja.makarov@gmail.com>2013-08-25 10:13:40 +0400
committerVitja Makarov <vitja.makarov@gmail.com>2013-09-04 23:34:56 +0400
commit0967fa0063cff9c016f1c833e890cf061d3f2fab (patch)
tree9622cd5bf4818e0b6917d08dae4a89be01bb4418 /redis/sentinel.py
parentc04a74e4c1f2978decbe66acbcd5fbaf9da64188 (diff)
downloadredis-py-0967fa0063cff9c016f1c833e890cf061d3f2fab.tar.gz
Rework slave selection logic
Diffstat (limited to 'redis/sentinel.py')
-rw-r--r--redis/sentinel.py97
1 files changed, 52 insertions, 45 deletions
diff --git a/redis/sentinel.py b/redis/sentinel.py
index e7a73c3..e9e4473 100644
--- a/redis/sentinel.py
+++ b/redis/sentinel.py
@@ -3,6 +3,7 @@ import random
from redis.client import StrictRedis
from redis.connection import ConnectionPool, Connection
from redis.exceptions import ConnectionError, ResponseError
+from redis._compat import xrange
class MasterNotFoundError(ConnectionError):
@@ -19,8 +20,19 @@ class SentinelManagedConnection(Connection):
super(SentinelManagedConnection, self).__init__(**kwargs)
def connect(self):
- self.host, self.port = self.connection_pool.discover()
- super(SentinelManagedConnection, self).connect()
+ if self._sock:
+ return # already connected
+ if self.connection_pool.is_master:
+ self.host, self.port = self.connection_pool.get_master_address()
+ super(SentinelManagedConnection, self).connect()
+ else:
+ for slave in self.connection_pool.rotate_slaves():
+ self.host, self.port = slave
+ try:
+ return super(SentinelManagedConnection, self).connect()
+ except ConnectionError:
+ continue
+ raise SlaveNotFoundError # Never be here
class SentinelConnectionPool(ConnectionPool):
@@ -31,27 +43,38 @@ class SentinelConnectionPool(ConnectionPool):
super(SentinelConnectionPool, self).__init__(**kwargs)
self.connection_kwargs['connection_pool'] = self
self.service_name = service_name
- self.master_address = None
self.sentinel_manager = sentinel_manager
+ self.master_address = None
+ self.slave_rr_counter = None
- def discover_master(self):
+ def get_master_address(self):
master_address = self.sentinel_manager.discover_master(
self.service_name)
- if self.master_address is None:
+ if not self.is_master:
+ pass
+ elif self.master_address is None:
self.master_address = master_address
elif master_address != self.master_address:
- # Master address change disconnect
- self.disconnect()
+ self.disconnect() # Master address changed
return master_address
- def discover_slave(self):
- return self.sentinel_manager.discover_slave(self.service_name)
-
- def discover(self):
- if self.is_master:
- return self.discover_master()
- else:
- return self.discover_slave()
+ def rotate_slaves(self):
+ "Round-robin slave balancer"
+ slaves = self.sentinel_manager.discover_slaves(self.service_name)
+ if slaves:
+ if self.slave_rr_counter is None:
+ self.slave_rr_counter = random.randint(0, len(slaves) - 1)
+ for _ in xrange(len(slaves)):
+ self.slave_rr_counter = (
+ self.slave_rr_counter + 1) % len(slaves)
+ slave = slaves[self.slave_rr_counter]
+ yield slave
+ # Fallback to the master connection
+ try:
+ yield self.get_master_address()
+ except MasterNotFoundError:
+ pass
+ raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
class Sentinel(object):
@@ -111,49 +134,33 @@ class Sentinel(object):
return state['ip'], state['port']
raise MasterNotFoundError("No master found for %r" % (service_name,))
- def choose_slave(self, slaves):
- "Choose random slave from ``slaves``."
+ def filter_slaves(self, slaves):
+ "Remove slaves that are in ODOWN or SDOWN state"
slaves_alive = []
for slave in slaves:
if slave['is_odown'] or slave['is_sdown']:
continue
- slaves_alive.append(slave)
- if slaves_alive:
- return random.choice(slaves_alive)
+ slaves_alive.append((slave['ip'], slave['port']))
+ return slaves_alive
- def discover_slave(self, service_name, use_master=True):
- """
- Asks sentinels for the list of slaves corresponding to the service
- labeled ``service_name``. Slave is selected with choose_slave() method
- feel free to override it with your own slave selection algorithm.
-
- If no slave is found and ``use_master`` is set to True master's address
- is returned instead.
-
- Returns a pair (address, port) or raises SlaveNotFoundError if neither
- slave nor master is found.
- """
+ def discover_slaves(self, service_name):
+ "Returns list of alive slaves for service ``service_name``"
for sentinel in self.sentinels:
try:
slaves = sentinel.sentinel_slaves(service_name)
except (ConnectionError, ResponseError):
continue
- slave = self.choose_slave(slaves)
- if slave is not None:
- return slave['ip'], slave['port']
- if use_master:
- try:
- return self.discover_master(service_name)
- except MasterNotFoundError:
- pass
- raise SlaveNotFoundError("No slave found for %r" % (service_name,))
+ slaves = self.filter_slaves(slaves)
+ if slaves:
+ return slaves
+ return []
def master_for(self, service_name, redis_class=StrictRedis, **kwargs):
"""
Returns redis client instance for master of ``service_name``.
- Undercover it calls discover_master() to retrive master's address each
- time before establishing new connection.
+ Undercover it uses SentinelConnectionPool class to retrive master's
+ address each time before establishing new connection.
NOTE: If master address change is detected all other connections from
the pool are closed.
@@ -170,8 +177,8 @@ class Sentinel(object):
"""
Returns redis client instance for slave of ``service_name``.
- Undercover it calls discover_slave() to retrive slave's address each
- time before establishing new connection.
+ Undercover it uses SentinelConnectionPool class to choose slave's
+ address each time before establishing new connection.
By default redis.StrictRedis class is used you can override this with
``redis_class`` argument. All other arguments are passed directly to