diff options
Diffstat (limited to 'redis/sentinel.py')
-rw-r--r-- | redis/sentinel.py | 136 |
1 files changed, 80 insertions, 56 deletions
diff --git a/redis/sentinel.py b/redis/sentinel.py index 06877bd..c9383d3 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -3,9 +3,8 @@ import weakref from redis.client import Redis from redis.commands import SentinelCommands -from redis.connection import ConnectionPool, Connection, SSLConnection -from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError, - TimeoutError) +from redis.connection import Connection, ConnectionPool, SSLConnection +from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError from redis.utils import str_if_bytes @@ -19,14 +18,14 @@ class SlaveNotFoundError(ConnectionError): class SentinelManagedConnection(Connection): def __init__(self, **kwargs): - self.connection_pool = kwargs.pop('connection_pool') + self.connection_pool = kwargs.pop("connection_pool") super().__init__(**kwargs) def __repr__(self): pool = self.connection_pool - s = f'{type(self).__name__}<service={pool.service_name}%s>' + s = f"{type(self).__name__}<service={pool.service_name}%s>" if self.host: - host_info = f',host={self.host},port={self.port}' + host_info = f",host={self.host},port={self.port}" s = s % host_info return s @@ -34,9 +33,9 @@ class SentinelManagedConnection(Connection): self.host, self.port = address super().connect() if self.connection_pool.check_connection: - self.send_command('PING') - if str_if_bytes(self.read_response()) != 'PONG': - raise ConnectionError('PING failed') + self.send_command("PING") + if str_if_bytes(self.read_response()) != "PONG": + raise ConnectionError("PING failed") def connect(self): if self._sock: @@ -62,7 +61,7 @@ class SentinelManagedConnection(Connection): # calling disconnect will force the connection to re-query # sentinel during the next connect() attempt. self.disconnect() - raise ConnectionError('The previous master is now a slave') + raise ConnectionError("The previous master is now a slave") raise @@ -79,19 +78,21 @@ class SentinelConnectionPool(ConnectionPool): """ def __init__(self, service_name, sentinel_manager, **kwargs): - kwargs['connection_class'] = kwargs.get( - 'connection_class', - SentinelManagedSSLConnection if kwargs.pop('ssl', False) - else SentinelManagedConnection) - self.is_master = kwargs.pop('is_master', True) - self.check_connection = kwargs.pop('check_connection', False) + kwargs["connection_class"] = kwargs.get( + "connection_class", + SentinelManagedSSLConnection + if kwargs.pop("ssl", False) + else SentinelManagedConnection, + ) + self.is_master = kwargs.pop("is_master", True) + self.check_connection = kwargs.pop("check_connection", False) super().__init__(**kwargs) - self.connection_kwargs['connection_pool'] = weakref.proxy(self) + self.connection_kwargs["connection_pool"] = weakref.proxy(self) self.service_name = service_name self.sentinel_manager = sentinel_manager def __repr__(self): - role = 'master' if self.is_master else 'slave' + role = "master" if self.is_master else "slave" return f"{type(self).__name__}<service={self.service_name}({role})" def reset(self): @@ -100,15 +101,14 @@ class SentinelConnectionPool(ConnectionPool): self.slave_rr_counter = None def owns_connection(self, connection): - check = not self.is_master or \ - (self.is_master and - self.master_address == (connection.host, connection.port)) + check = not self.is_master or ( + self.is_master and self.master_address == (connection.host, connection.port) + ) parent = super() return check and parent.owns_connection(connection) def get_master_address(self): - master_address = self.sentinel_manager.discover_master( - self.service_name) + master_address = self.sentinel_manager.discover_master(self.service_name) if self.is_master: if self.master_address != master_address: self.master_address = master_address @@ -124,8 +124,7 @@ class SentinelConnectionPool(ConnectionPool): if self.slave_rr_counter is None: self.slave_rr_counter = random.randint(0, len(slaves) - 1) for _ in range(len(slaves)): - self.slave_rr_counter = ( - self.slave_rr_counter + 1) % len(slaves) + self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) slave = slaves[self.slave_rr_counter] yield slave # Fallback to the master connection @@ -133,7 +132,7 @@ class SentinelConnectionPool(ConnectionPool): yield self.get_master_address() except MasterNotFoundError: pass - raise SlaveNotFoundError(f'No slave found for {self.service_name!r}') + raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") class Sentinel(SentinelCommands): @@ -165,20 +164,25 @@ class Sentinel(SentinelCommands): establishing a connection to a Redis server. """ - def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None, - **connection_kwargs): + def __init__( + self, + sentinels, + min_other_sentinels=0, + sentinel_kwargs=None, + **connection_kwargs, + ): # if sentinel_kwargs isn't defined, use the socket_* options from # connection_kwargs if sentinel_kwargs is None: sentinel_kwargs = { - k: v - for k, v in connection_kwargs.items() - if k.startswith('socket_') + k: v for k, v in connection_kwargs.items() if k.startswith("socket_") } self.sentinel_kwargs = sentinel_kwargs - self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs) - for hostname, port in sentinels] + self.sentinels = [ + Redis(hostname, port, **self.sentinel_kwargs) + for hostname, port in sentinels + ] self.min_other_sentinels = min_other_sentinels self.connection_kwargs = connection_kwargs @@ -188,9 +192,9 @@ class Sentinel(SentinelCommands): once - If set to True, then execute the resulting command on a single node at random, rather than across the entire sentinel cluster. """ - once = bool(kwargs.get('once', False)) - if 'once' in kwargs.keys(): - kwargs.pop('once') + once = bool(kwargs.get("once", False)) + if "once" in kwargs.keys(): + kwargs.pop("once") if once: for sentinel in self.sentinels: @@ -202,16 +206,18 @@ class Sentinel(SentinelCommands): def __repr__(self): sentinel_addresses = [] for sentinel in self.sentinels: - sentinel_addresses.append('{host}:{port}'.format_map( - sentinel.connection_pool.connection_kwargs, - )) + sentinel_addresses.append( + "{host}:{port}".format_map( + sentinel.connection_pool.connection_kwargs, + ) + ) return f'{type(self).__name__}<sentinels=[{",".join(sentinel_addresses)}]>' def check_master_state(self, state, service_name): - if not state['is_master'] or state['is_sdown'] or state['is_odown']: + if not state["is_master"] or state["is_sdown"] or state["is_odown"]: return False # Check if our sentinel doesn't see other nodes - if state['num-other-sentinels'] < self.min_other_sentinels: + if state["num-other-sentinels"] < self.min_other_sentinels: return False return True @@ -232,17 +238,19 @@ class Sentinel(SentinelCommands): if state and self.check_master_state(state, service_name): # Put this sentinel at the top of the list self.sentinels[0], self.sentinels[sentinel_no] = ( - sentinel, self.sentinels[0]) - return state['ip'], state['port'] + sentinel, + self.sentinels[0], + ) + return state["ip"], state["port"] raise MasterNotFoundError(f"No master found for {service_name!r}") def filter_slaves(self, slaves): "Remove slaves that are in an ODOWN or SDOWN state" slaves_alive = [] for slave in slaves: - if slave['is_odown'] or slave['is_sdown']: + if slave["is_odown"] or slave["is_sdown"]: continue - slaves_alive.append((slave['ip'], slave['port'])) + slaves_alive.append((slave["ip"], slave["port"])) return slaves_alive def discover_slaves(self, service_name): @@ -257,8 +265,13 @@ class Sentinel(SentinelCommands): return slaves return [] - def master_for(self, service_name, redis_class=Redis, - connection_pool_class=SentinelConnectionPool, **kwargs): + def master_for( + self, + service_name, + redis_class=Redis, + connection_pool_class=SentinelConnectionPool, + **kwargs, + ): """ Returns a redis client instance for the ``service_name`` master. @@ -281,14 +294,22 @@ class Sentinel(SentinelCommands): passed to this class and passed to the connection pool as keyword arguments to be used to initialize Redis connections. """ - kwargs['is_master'] = True + kwargs["is_master"] = True connection_kwargs = dict(self.connection_kwargs) connection_kwargs.update(kwargs) - return redis_class(connection_pool=connection_pool_class( - service_name, self, **connection_kwargs)) - - def slave_for(self, service_name, redis_class=Redis, - connection_pool_class=SentinelConnectionPool, **kwargs): + return redis_class( + connection_pool=connection_pool_class( + service_name, self, **connection_kwargs + ) + ) + + def slave_for( + self, + service_name, + redis_class=Redis, + connection_pool_class=SentinelConnectionPool, + **kwargs, + ): """ Returns redis client instance for the ``service_name`` slave(s). @@ -306,8 +327,11 @@ class Sentinel(SentinelCommands): passed to this class and passed to the connection pool as keyword arguments to be used to initialize Redis connections. """ - kwargs['is_master'] = False + kwargs["is_master"] = False connection_kwargs = dict(self.connection_kwargs) connection_kwargs.update(kwargs) - return redis_class(connection_pool=connection_pool_class( - service_name, self, **connection_kwargs)) + return redis_class( + connection_pool=connection_pool_class( + service_name, self, **connection_kwargs + ) + ) |