summaryrefslogtreecommitdiff
path: root/redis/sentinel.py
diff options
context:
space:
mode:
Diffstat (limited to 'redis/sentinel.py')
-rw-r--r--redis/sentinel.py136
1 files changed, 80 insertions, 56 deletions
diff --git a/redis/sentinel.py b/redis/sentinel.py
index 06877bd..c9383d3 100644
--- a/redis/sentinel.py
+++ b/redis/sentinel.py
@@ -3,9 +3,8 @@ import weakref
from redis.client import Redis
from redis.commands import SentinelCommands
-from redis.connection import ConnectionPool, Connection, SSLConnection
-from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
- TimeoutError)
+from redis.connection import Connection, ConnectionPool, SSLConnection
+from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError
from redis.utils import str_if_bytes
@@ -19,14 +18,14 @@ class SlaveNotFoundError(ConnectionError):
class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
- self.connection_pool = kwargs.pop('connection_pool')
+ self.connection_pool = kwargs.pop("connection_pool")
super().__init__(**kwargs)
def __repr__(self):
pool = self.connection_pool
- s = f'{type(self).__name__}<service={pool.service_name}%s>'
+ s = f"{type(self).__name__}<service={pool.service_name}%s>"
if self.host:
- host_info = f',host={self.host},port={self.port}'
+ host_info = f",host={self.host},port={self.port}"
s = s % host_info
return s
@@ -34,9 +33,9 @@ class SentinelManagedConnection(Connection):
self.host, self.port = address
super().connect()
if self.connection_pool.check_connection:
- self.send_command('PING')
- if str_if_bytes(self.read_response()) != 'PONG':
- raise ConnectionError('PING failed')
+ self.send_command("PING")
+ if str_if_bytes(self.read_response()) != "PONG":
+ raise ConnectionError("PING failed")
def connect(self):
if self._sock:
@@ -62,7 +61,7 @@ class SentinelManagedConnection(Connection):
# calling disconnect will force the connection to re-query
# sentinel during the next connect() attempt.
self.disconnect()
- raise ConnectionError('The previous master is now a slave')
+ raise ConnectionError("The previous master is now a slave")
raise
@@ -79,19 +78,21 @@ class SentinelConnectionPool(ConnectionPool):
"""
def __init__(self, service_name, sentinel_manager, **kwargs):
- kwargs['connection_class'] = kwargs.get(
- 'connection_class',
- SentinelManagedSSLConnection if kwargs.pop('ssl', False)
- else SentinelManagedConnection)
- self.is_master = kwargs.pop('is_master', True)
- self.check_connection = kwargs.pop('check_connection', False)
+ kwargs["connection_class"] = kwargs.get(
+ "connection_class",
+ SentinelManagedSSLConnection
+ if kwargs.pop("ssl", False)
+ else SentinelManagedConnection,
+ )
+ self.is_master = kwargs.pop("is_master", True)
+ self.check_connection = kwargs.pop("check_connection", False)
super().__init__(**kwargs)
- self.connection_kwargs['connection_pool'] = weakref.proxy(self)
+ self.connection_kwargs["connection_pool"] = weakref.proxy(self)
self.service_name = service_name
self.sentinel_manager = sentinel_manager
def __repr__(self):
- role = 'master' if self.is_master else 'slave'
+ role = "master" if self.is_master else "slave"
return f"{type(self).__name__}<service={self.service_name}({role})"
def reset(self):
@@ -100,15 +101,14 @@ class SentinelConnectionPool(ConnectionPool):
self.slave_rr_counter = None
def owns_connection(self, connection):
- check = not self.is_master or \
- (self.is_master and
- self.master_address == (connection.host, connection.port))
+ check = not self.is_master or (
+ self.is_master and self.master_address == (connection.host, connection.port)
+ )
parent = super()
return check and parent.owns_connection(connection)
def get_master_address(self):
- master_address = self.sentinel_manager.discover_master(
- self.service_name)
+ master_address = self.sentinel_manager.discover_master(self.service_name)
if self.is_master:
if self.master_address != master_address:
self.master_address = master_address
@@ -124,8 +124,7 @@ class SentinelConnectionPool(ConnectionPool):
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in range(len(slaves)):
- self.slave_rr_counter = (
- self.slave_rr_counter + 1) % len(slaves)
+ self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
@@ -133,7 +132,7 @@ class SentinelConnectionPool(ConnectionPool):
yield self.get_master_address()
except MasterNotFoundError:
pass
- raise SlaveNotFoundError(f'No slave found for {self.service_name!r}')
+ raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
class Sentinel(SentinelCommands):
@@ -165,20 +164,25 @@ class Sentinel(SentinelCommands):
establishing a connection to a Redis server.
"""
- def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
- **connection_kwargs):
+ def __init__(
+ self,
+ sentinels,
+ min_other_sentinels=0,
+ sentinel_kwargs=None,
+ **connection_kwargs,
+ ):
# if sentinel_kwargs isn't defined, use the socket_* options from
# connection_kwargs
if sentinel_kwargs is None:
sentinel_kwargs = {
- k: v
- for k, v in connection_kwargs.items()
- if k.startswith('socket_')
+ k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
}
self.sentinel_kwargs = sentinel_kwargs
- self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
- for hostname, port in sentinels]
+ self.sentinels = [
+ Redis(hostname, port, **self.sentinel_kwargs)
+ for hostname, port in sentinels
+ ]
self.min_other_sentinels = min_other_sentinels
self.connection_kwargs = connection_kwargs
@@ -188,9 +192,9 @@ class Sentinel(SentinelCommands):
once - If set to True, then execute the resulting command on a single
node at random, rather than across the entire sentinel cluster.
"""
- once = bool(kwargs.get('once', False))
- if 'once' in kwargs.keys():
- kwargs.pop('once')
+ once = bool(kwargs.get("once", False))
+ if "once" in kwargs.keys():
+ kwargs.pop("once")
if once:
for sentinel in self.sentinels:
@@ -202,16 +206,18 @@ class Sentinel(SentinelCommands):
def __repr__(self):
sentinel_addresses = []
for sentinel in self.sentinels:
- sentinel_addresses.append('{host}:{port}'.format_map(
- sentinel.connection_pool.connection_kwargs,
- ))
+ sentinel_addresses.append(
+ "{host}:{port}".format_map(
+ sentinel.connection_pool.connection_kwargs,
+ )
+ )
return f'{type(self).__name__}<sentinels=[{",".join(sentinel_addresses)}]>'
def check_master_state(self, state, service_name):
- if not state['is_master'] or state['is_sdown'] or state['is_odown']:
+ if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
return False
# Check if our sentinel doesn't see other nodes
- if state['num-other-sentinels'] < self.min_other_sentinels:
+ if state["num-other-sentinels"] < self.min_other_sentinels:
return False
return True
@@ -232,17 +238,19 @@ class Sentinel(SentinelCommands):
if state and self.check_master_state(state, service_name):
# Put this sentinel at the top of the list
self.sentinels[0], self.sentinels[sentinel_no] = (
- sentinel, self.sentinels[0])
- return state['ip'], state['port']
+ sentinel,
+ self.sentinels[0],
+ )
+ return state["ip"], state["port"]
raise MasterNotFoundError(f"No master found for {service_name!r}")
def filter_slaves(self, slaves):
"Remove slaves that are in an ODOWN or SDOWN state"
slaves_alive = []
for slave in slaves:
- if slave['is_odown'] or slave['is_sdown']:
+ if slave["is_odown"] or slave["is_sdown"]:
continue
- slaves_alive.append((slave['ip'], slave['port']))
+ slaves_alive.append((slave["ip"], slave["port"]))
return slaves_alive
def discover_slaves(self, service_name):
@@ -257,8 +265,13 @@ class Sentinel(SentinelCommands):
return slaves
return []
- def master_for(self, service_name, redis_class=Redis,
- connection_pool_class=SentinelConnectionPool, **kwargs):
+ def master_for(
+ self,
+ service_name,
+ redis_class=Redis,
+ connection_pool_class=SentinelConnectionPool,
+ **kwargs,
+ ):
"""
Returns a redis client instance for the ``service_name`` master.
@@ -281,14 +294,22 @@ class Sentinel(SentinelCommands):
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
- kwargs['is_master'] = True
+ kwargs["is_master"] = True
connection_kwargs = dict(self.connection_kwargs)
connection_kwargs.update(kwargs)
- return redis_class(connection_pool=connection_pool_class(
- service_name, self, **connection_kwargs))
-
- def slave_for(self, service_name, redis_class=Redis,
- connection_pool_class=SentinelConnectionPool, **kwargs):
+ return redis_class(
+ connection_pool=connection_pool_class(
+ service_name, self, **connection_kwargs
+ )
+ )
+
+ def slave_for(
+ self,
+ service_name,
+ redis_class=Redis,
+ connection_pool_class=SentinelConnectionPool,
+ **kwargs,
+ ):
"""
Returns redis client instance for the ``service_name`` slave(s).
@@ -306,8 +327,11 @@ class Sentinel(SentinelCommands):
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
- kwargs['is_master'] = False
+ kwargs["is_master"] = False
connection_kwargs = dict(self.connection_kwargs)
connection_kwargs.update(kwargs)
- return redis_class(connection_pool=connection_pool_class(
- service_name, self, **connection_kwargs))
+ return redis_class(
+ connection_pool=connection_pool_class(
+ service_name, self, **connection_kwargs
+ )
+ )