diff options
author | Vitja Makarov <vitja.makarov@gmail.com> | 2013-08-25 10:13:40 +0400 |
---|---|---|
committer | Vitja Makarov <vitja.makarov@gmail.com> | 2013-09-04 23:34:56 +0400 |
commit | 0967fa0063cff9c016f1c833e890cf061d3f2fab (patch) | |
tree | 9622cd5bf4818e0b6917d08dae4a89be01bb4418 /redis/sentinel.py | |
parent | c04a74e4c1f2978decbe66acbcd5fbaf9da64188 (diff) | |
download | redis-py-0967fa0063cff9c016f1c833e890cf061d3f2fab.tar.gz |
Rework slave selection logic
Diffstat (limited to 'redis/sentinel.py')
-rw-r--r-- | redis/sentinel.py | 97 |
1 files changed, 52 insertions, 45 deletions
diff --git a/redis/sentinel.py b/redis/sentinel.py index e7a73c3..e9e4473 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -3,6 +3,7 @@ import random from redis.client import StrictRedis from redis.connection import ConnectionPool, Connection from redis.exceptions import ConnectionError, ResponseError +from redis._compat import xrange class MasterNotFoundError(ConnectionError): @@ -19,8 +20,19 @@ class SentinelManagedConnection(Connection): super(SentinelManagedConnection, self).__init__(**kwargs) def connect(self): - self.host, self.port = self.connection_pool.discover() - super(SentinelManagedConnection, self).connect() + if self._sock: + return # already connected + if self.connection_pool.is_master: + self.host, self.port = self.connection_pool.get_master_address() + super(SentinelManagedConnection, self).connect() + else: + for slave in self.connection_pool.rotate_slaves(): + self.host, self.port = slave + try: + return super(SentinelManagedConnection, self).connect() + except ConnectionError: + continue + raise SlaveNotFoundError # Never be here class SentinelConnectionPool(ConnectionPool): @@ -31,27 +43,38 @@ class SentinelConnectionPool(ConnectionPool): super(SentinelConnectionPool, self).__init__(**kwargs) self.connection_kwargs['connection_pool'] = self self.service_name = service_name - self.master_address = None self.sentinel_manager = sentinel_manager + self.master_address = None + self.slave_rr_counter = None - def discover_master(self): + def get_master_address(self): master_address = self.sentinel_manager.discover_master( self.service_name) - if self.master_address is None: + if not self.is_master: + pass + elif self.master_address is None: self.master_address = master_address elif master_address != self.master_address: - # Master address change disconnect - self.disconnect() + self.disconnect() # Master address changed return master_address - def discover_slave(self): - return self.sentinel_manager.discover_slave(self.service_name) - - def discover(self): - if self.is_master: - return self.discover_master() - else: - return self.discover_slave() + def rotate_slaves(self): + "Round-robin slave balancer" + slaves = self.sentinel_manager.discover_slaves(self.service_name) + if slaves: + if self.slave_rr_counter is None: + self.slave_rr_counter = random.randint(0, len(slaves) - 1) + for _ in xrange(len(slaves)): + self.slave_rr_counter = ( + self.slave_rr_counter + 1) % len(slaves) + slave = slaves[self.slave_rr_counter] + yield slave + # Fallback to the master connection + try: + yield self.get_master_address() + except MasterNotFoundError: + pass + raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) class Sentinel(object): @@ -111,49 +134,33 @@ class Sentinel(object): return state['ip'], state['port'] raise MasterNotFoundError("No master found for %r" % (service_name,)) - def choose_slave(self, slaves): - "Choose random slave from ``slaves``." + def filter_slaves(self, slaves): + "Remove slaves that are in ODOWN or SDOWN state" slaves_alive = [] for slave in slaves: if slave['is_odown'] or slave['is_sdown']: continue - slaves_alive.append(slave) - if slaves_alive: - return random.choice(slaves_alive) + slaves_alive.append((slave['ip'], slave['port'])) + return slaves_alive - def discover_slave(self, service_name, use_master=True): - """ - Asks sentinels for the list of slaves corresponding to the service - labeled ``service_name``. Slave is selected with choose_slave() method - feel free to override it with your own slave selection algorithm. - - If no slave is found and ``use_master`` is set to True master's address - is returned instead. - - Returns a pair (address, port) or raises SlaveNotFoundError if neither - slave nor master is found. - """ + def discover_slaves(self, service_name): + "Returns list of alive slaves for service ``service_name``" for sentinel in self.sentinels: try: slaves = sentinel.sentinel_slaves(service_name) except (ConnectionError, ResponseError): continue - slave = self.choose_slave(slaves) - if slave is not None: - return slave['ip'], slave['port'] - if use_master: - try: - return self.discover_master(service_name) - except MasterNotFoundError: - pass - raise SlaveNotFoundError("No slave found for %r" % (service_name,)) + slaves = self.filter_slaves(slaves) + if slaves: + return slaves + return [] def master_for(self, service_name, redis_class=StrictRedis, **kwargs): """ Returns redis client instance for master of ``service_name``. - Undercover it calls discover_master() to retrive master's address each - time before establishing new connection. + Undercover it uses SentinelConnectionPool class to retrive master's + address each time before establishing new connection. NOTE: If master address change is detected all other connections from the pool are closed. @@ -170,8 +177,8 @@ class Sentinel(object): """ Returns redis client instance for slave of ``service_name``. - Undercover it calls discover_slave() to retrive slave's address each - time before establishing new connection. + Undercover it uses SentinelConnectionPool class to choose slave's + address each time before establishing new connection. By default redis.StrictRedis class is used you can override this with ``redis_class`` argument. All other arguments are passed directly to |