summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy McCurdy <andy@andymccurdy.com>2013-09-13 11:40:39 -0700
committerAndy McCurdy <andy@andymccurdy.com>2013-09-13 11:40:39 -0700
commit89d88d6eb58a8b845ed063012c652a6555d54b1b (patch)
tree34fdcded4e30455f03ad39fa925300f0d6f1db2b
parenteab8e29238b950f9d53f50c641ff11762f48b973 (diff)
parentb2fc50b6e315e527064ed0e74a39174979a27361 (diff)
downloadredis-py-89d88d6eb58a8b845ed063012c652a6555d54b1b.tar.gz
Merge branch 'vitek-sentinel'
Conflicts: CHANGES
-rw-r--r--CHANGES1
-rw-r--r--README.rst46
-rw-r--r--redis/client.py84
-rw-r--r--redis/sentinel.py215
-rw-r--r--tests/test_sentinel.py173
5 files changed, 509 insertions, 10 deletions
diff --git a/CHANGES b/CHANGES
index b352ed0..ee81c5e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -5,6 +5,7 @@
the server. Thanks EliFinkelshteyn.
* Errors when authenticating (incorrect password) and selecting a database
now close the socket.
+ * Full Sentinel support thanks to Vitja Makarov. Thanks!
* 2.8.0
* redis-py should play better with gevent when a gevent Timeout is raised.
Thanks leifkb.
diff --git a/README.rst b/README.rst
index 35d53f2..8eed776 100644
--- a/README.rst
+++ b/README.rst
@@ -394,6 +394,52 @@ execution.
>>> pipe.execute()
[True, 25]
+Sentinel support
+^^^^^^^^^^^^^^^^
+
+redis-py can be used together with `Redis Sentinel <http://redis.io/topics/sentinel>`_
+to discover Redis nodes. You need to have at least one Sentinel daemon running
+in order to use redis-py's Sentinel support.
+
+Connecting redis-py to the Sentinel instance(s) is easy. You can use a
+Sentinel connection to discover the master and slaves network addresses:
+
+.. code-block:: pycon
+
+ >>> from redis.sentinel import Sentinel
+ >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
+ >>> sentinel.discover_master('mymaster')
+ ('127.0.0.1', 6379)
+ >>> sentinel.discover_slaves('mymaster')
+ [('127.0.0.1', 6380)]
+
+You can also create Redis client connections from a Sentinel instnace. You can
+connect to either the master (for write operations) or a slave (for read-only
+operations).
+
+.. code-block:: pycon
+
+ >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
+ >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
+ >>> master.set('foo', 'bar')
+ >>> slave.get('foo')
+ 'bar'
+
+The master and slave objects are normal StrictRedis instances with their
+connection pool bound to the Sentinel instance. When a Sentinel backed client
+attempts to establish a connection, it first queries the Sentinel servers to
+determine an appropriate host to connect to. If no server is found,
+a MasterNotFoundError or SlaveNotFoundError is raised. Both exceptions are
+subclasses of ConnectionError.
+
+When trying to connect to a slave client, the Sentinel connection pool will
+iterate over the list of slaves until it finds one that can be connected to.
+If no slaves can be connected to, a connection will be established with the
+master.
+
+See `Guidelines for Redis clients with support for Redis Sentinel
+<http://redis.io/topics/sentinel-clients>`_ to learn more about Redis Sentinel.
+
Author
^^^^^^
diff --git a/redis/client.py b/redis/client.py
index 4c3b76a..bd46e3a 100644
--- a/redis/client.py
+++ b/redis/client.py
@@ -109,19 +109,53 @@ def parse_info(response):
return info
+SENTINEL_STATE_TYPES = {
+ 'can-failover-its-master': int,
+ 'info-refresh': int,
+ 'last-hello-message': int,
+ 'last-ok-ping-reply': int,
+ 'last-ping-reply': int,
+ 'master-link-down-time': int,
+ 'master-port': int,
+ 'num-other-sentinels': int,
+ 'num-slaves': int,
+ 'o-down-time': int,
+ 'pending-commands': int,
+ 'port': int,
+ 'quorum': int,
+ 's-down-time': int,
+ 'slave-priority': int,
+}
+
+
+def parse_sentinel_state(item):
+ result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES)
+ flags = set(result['flags'].split(','))
+ for name, flag in (('is_master', 'master'), ('is_slave', 'slave'),
+ ('is_sdown', 's_down'), ('is_odown', 'o_down'),
+ ('is_sentinel', 'sentinel'),
+ ('is_disconnected', 'disconnected'),
+ ('is_master_down', 'master_down')):
+ result[name] = flag in flags
+ return result
+
+
def parse_sentinel(response, **options):
"Parse the result of Redis's SENTINEL command"
- output = []
- parse = options['parse']
-
+ parse = options.get('parse')
if parse == 'SENTINEL_INFO':
- for sub_list in response:
- it = iter(sub_list)
- output.append(dict(izip(it, it)))
- else:
- output = response
-
- return output
+ return [parse_sentinel_state(item) for item in response]
+ elif parse == 'SENTINEL_INFO_MASTERS':
+ result = {}
+ for item in response:
+ state = parse_sentinel_state(item)
+ result[state['name']] = state
+ return result
+ elif parse == 'SENTINEL_ADDR_PORT':
+ if response is None:
+ return
+ return response[0], int(response[1])
+ return response
def pairs_to_dict(response):
@@ -130,6 +164,16 @@ def pairs_to_dict(response):
return dict(izip(it, it))
+def pairs_to_dict_typed(response, type_info):
+ it = iter(response)
+ result = {}
+ for key, value in izip(it, it):
+ if key in type_info:
+ value = type_info[key](value)
+ result[key] = value
+ return result
+
+
def zset_score_pairs(response, **options):
"""
If ``withscores`` is specified in the options, return the response as
@@ -511,6 +555,26 @@ class StrictRedis(object):
parse = 'SENTINEL'
return self.execute_command('SENTINEL', *args, **{'parse': parse})
+ def sentinel_masters(self):
+ "Returns a dictionary containing the master's state."
+ return self.execute_command('SENTINEL', 'masters',
+ parse='SENTINEL_INFO_MASTERS')
+
+ def sentinel_slaves(self, service_name):
+ "Returns a list of slaves for ``service_name``"
+ return self.execute_command('SENTINEL', 'slaves', service_name,
+ parse='SENTINEL_INFO')
+
+ def sentinel_sentinels(self, service_name):
+ "Returns a list of sentinels for ``service_name``"
+ return self.execute_command('SENTINEL', 'sentinels', service_name,
+ parse='SENTINEL_INFO')
+
+ def sentinel_get_master_addr_by_name(self, service_name):
+ "Returns a (host, port) pair for the given ``service_name``"
+ return self.execute_command('SENTINEL', 'get-master-addr-by-name',
+ service_name, parse='SENTINEL_ADDR_PORT')
+
def shutdown(self):
"Shutdown the server"
try:
diff --git a/redis/sentinel.py b/redis/sentinel.py
new file mode 100644
index 0000000..bdb955f
--- /dev/null
+++ b/redis/sentinel.py
@@ -0,0 +1,215 @@
+import random
+
+from redis.client import StrictRedis
+from redis.connection import ConnectionPool, Connection
+from redis.exceptions import ConnectionError, ResponseError
+from redis._compat import xrange, nativestr
+
+
+class MasterNotFoundError(ConnectionError):
+ pass
+
+
+class SlaveNotFoundError(ConnectionError):
+ pass
+
+
+class SentinelManagedConnection(Connection):
+ def __init__(self, **kwargs):
+ self.connection_pool = kwargs.pop('connection_pool')
+ super(SentinelManagedConnection, self).__init__(**kwargs)
+
+ def connect_to(self, address):
+ self.host, self.port = address
+ super(SentinelManagedConnection, self).connect()
+ if self.connection_pool.check_connection:
+ self.send_command('PING')
+ if nativestr(self.read_response()) != 'PONG':
+ raise ConnectionError('PING failed')
+
+ def connect(self):
+ if self._sock:
+ return # already connected
+ if self.connection_pool.is_master:
+ self.connect_to(self.connection_pool.get_master_address())
+ else:
+ for slave in self.connection_pool.rotate_slaves():
+ try:
+ return self.connect_to(slave)
+ except ConnectionError:
+ continue
+ raise SlaveNotFoundError # Never be here
+
+
+class SentinelConnectionPool(ConnectionPool):
+ """
+ Sentinel backed connection pool.
+
+ If ``check_connection`` flag is set to True, SentinelManagedConnection
+ sends a PING command right after establishing the connection.
+ """
+
+ def __init__(self, service_name, sentinel_manager, **kwargs):
+ kwargs['connection_class'] = kwargs.get(
+ 'connection_class', SentinelManagedConnection)
+ self.is_master = kwargs.pop('is_master', True)
+ self.check_connection = kwargs.pop('check_connection', False)
+ super(SentinelConnectionPool, self).__init__(**kwargs)
+ self.connection_kwargs['connection_pool'] = self
+ self.service_name = service_name
+ self.sentinel_manager = sentinel_manager
+ self.master_address = None
+ self.slave_rr_counter = None
+
+ def get_master_address(self):
+ master_address = self.sentinel_manager.discover_master(
+ self.service_name)
+ if not self.is_master:
+ pass
+ elif self.master_address is None:
+ self.master_address = master_address
+ elif master_address != self.master_address:
+ self.disconnect() # Master address changed
+ return master_address
+
+ def rotate_slaves(self):
+ "Round-robin slave balancer"
+ slaves = self.sentinel_manager.discover_slaves(self.service_name)
+ if slaves:
+ if self.slave_rr_counter is None:
+ self.slave_rr_counter = random.randint(0, len(slaves) - 1)
+ for _ in xrange(len(slaves)):
+ self.slave_rr_counter = (
+ self.slave_rr_counter + 1) % len(slaves)
+ slave = slaves[self.slave_rr_counter]
+ yield slave
+ # Fallback to the master connection
+ try:
+ yield self.get_master_address()
+ except MasterNotFoundError:
+ pass
+ raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
+
+
+class Sentinel(object):
+ """
+ Redis Sentinel cluster client
+
+ >>> from redis.sentinel import Sentinel
+ >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
+ >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
+ >>> master.set('foo', 'bar')
+ >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
+ >>> slave.get('foo')
+ 'bar'
+
+ ``sentinels`` is a list of sentinel nodes. Each node is represented by
+ a pair (hostname, port).
+
+ Use ``socket_timeout`` to specify a timeout for sentinel clients.
+ It's recommended to use short timeouts.
+
+ Use ``min_other_sentinels`` to filter out sentinels with not enough peers.
+ """
+
+ def __init__(self, sentinels, password=None, socket_timeout=None,
+ min_other_sentinels=0):
+ self.sentinels = [StrictRedis(hostname, port, password=password,
+ socket_timeout=socket_timeout)
+ for hostname, port in sentinels]
+ self.min_other_sentinels = min_other_sentinels
+
+ def check_master_state(self, state, service_name):
+ if not state['is_master'] or state['is_sdown'] or state['is_odown']:
+ return False
+ # Check if our sentinel doesn't see other nodes
+ if state['num-other-sentinels'] < self.min_other_sentinels:
+ return False
+ return True
+
+ def discover_master(self, service_name):
+ """
+ Asks sentinel servers for the Redis master's address corresponding
+ to the service labeled ``service_name``.
+
+ Returns a pair (address, port) or raises MasterNotFoundError if no
+ master is found.
+ """
+ for sentinel_no, sentinel in enumerate(self.sentinels):
+ try:
+ masters = sentinel.sentinel_masters()
+ except ConnectionError:
+ continue
+ state = masters.get(service_name)
+ if state and self.check_master_state(state, service_name):
+ # Put this sentinel at the top of the list
+ self.sentinels[0], self.sentinels[sentinel_no] = (
+ sentinel, self.sentinels[0])
+ return state['ip'], state['port']
+ raise MasterNotFoundError("No master found for %r" % (service_name,))
+
+ def filter_slaves(self, slaves):
+ "Remove slaves that are in an ODOWN or SDOWN state"
+ slaves_alive = []
+ for slave in slaves:
+ if slave['is_odown'] or slave['is_sdown']:
+ continue
+ slaves_alive.append((slave['ip'], slave['port']))
+ return slaves_alive
+
+ def discover_slaves(self, service_name):
+ "Returns a list of alive slaves for service ``service_name``"
+ for sentinel in self.sentinels:
+ try:
+ slaves = sentinel.sentinel_slaves(service_name)
+ except (ConnectionError, ResponseError):
+ continue
+ slaves = self.filter_slaves(slaves)
+ if slaves:
+ return slaves
+ return []
+
+ def master_for(self, service_name, redis_class=StrictRedis,
+ connection_pool_class=SentinelConnectionPool, **kwargs):
+ """
+ Returns a redis client instance for the ``service_name`` master.
+
+ A SentinelConnectionPool class is used to retrive the master's
+ address before establishing a new connection.
+
+ NOTE: If the master's address has changed, any cached connections to
+ the old master are closed.
+
+ By default clients will be a redis.StrictRedis instance. Specify a
+ different class to the ``redis_class`` argument if you desire
+ something different.
+
+ The ``connection_pool_class`` specifies the connection pool to use.
+ The SentinelConnectionPool will be used by default.
+
+ All other arguments are passed directly to the SentinelConnectionPool.
+ """
+ kwargs['is_master'] = True
+ return redis_class(connection_pool=connection_pool_class(
+ service_name, self, **kwargs))
+
+ def slave_for(self, service_name, redis_class=StrictRedis,
+ connection_pool_class=SentinelConnectionPool, **kwargs):
+ """
+ Returns redis client instance for the ``service_name`` slave(s).
+
+ A SentinelConnectionPool class is used to retrive the slave's
+ address before establishing a new connection.
+
+ By default clients will be a redis.StrictRedis instance. Specify a
+ different class to the ``redis_class`` argument if you desire
+ something different.
+
+ The ``connection_pool_class`` specifies the connection pool to use.
+ The SentinelConnectionPool will be used by default.
+
+ All other arguments are passed directly to the SentinelConnectionPool.
+ """
+ kwargs['is_master'] = False
+ return redis_class(connection_pool=connection_pool_class(
+ service_name, self, **kwargs))
diff --git a/tests/test_sentinel.py b/tests/test_sentinel.py
new file mode 100644
index 0000000..0a6e98b
--- /dev/null
+++ b/tests/test_sentinel.py
@@ -0,0 +1,173 @@
+from __future__ import with_statement
+import pytest
+
+from redis import exceptions
+from redis.sentinel import (Sentinel, SentinelConnectionPool,
+ MasterNotFoundError, SlaveNotFoundError)
+from redis._compat import next
+import redis.sentinel
+
+
+class SentinelTestClient(object):
+ def __init__(self, cluster, id):
+ self.cluster = cluster
+ self.id = id
+
+ def sentinel_masters(self):
+ self.cluster.connection_error_if_down(self)
+ return {self.cluster.service_name: self.cluster.master}
+
+ def sentinel_slaves(self, master_name):
+ self.cluster.connection_error_if_down(self)
+ if master_name != self.cluster.service_name:
+ return []
+ return self.cluster.slaves
+
+
+class SentinelTestCluster(object):
+ def __init__(self, service_name='mymaster', ip='127.0.0.1', port=6379):
+ self.clients = {}
+ self.master = {
+ 'ip': ip,
+ 'port': port,
+ 'is_master': True,
+ 'is_sdown': False,
+ 'is_odown': False,
+ 'num-other-sentinels': 0,
+ }
+ self.service_name = service_name
+ self.slaves = []
+ self.nodes_down = set()
+
+ def connection_error_if_down(self, node):
+ if node.id in self.nodes_down:
+ raise exceptions.ConnectionError
+
+ def client(self, host, port, **kwargs):
+ return SentinelTestClient(self, (host, port))
+
+
+@pytest.fixture()
+def cluster(request):
+ def teardown():
+ redis.sentinel.StrictRedis = saved_StrictRedis
+ cluster = SentinelTestCluster()
+ saved_StrictRedis = redis.sentinel.StrictRedis
+ redis.sentinel.StrictRedis = cluster.client
+ request.addfinalizer(teardown)
+ return cluster
+
+
+@pytest.fixture()
+def sentinel(request, cluster):
+ return Sentinel([('foo', 26379), ('bar', 26379)])
+
+
+def test_discover_master(sentinel):
+ address = sentinel.discover_master('mymaster')
+ assert address == ('127.0.0.1', 6379)
+
+
+def test_discover_master_error(sentinel):
+ with pytest.raises(MasterNotFoundError):
+ sentinel.discover_master('xxx')
+
+
+def test_discover_master_sentinel_down(cluster, sentinel):
+ # Put first sentinel 'foo' down
+ cluster.nodes_down.add(('foo', 26379))
+ address = sentinel.discover_master('mymaster')
+ assert address == ('127.0.0.1', 6379)
+ # 'bar' is now first sentinel
+ assert sentinel.sentinels[0].id == ('bar', 26379)
+
+
+def test_master_min_other_sentinels(cluster):
+ sentinel = Sentinel([('foo', 26379)], min_other_sentinels=1)
+ # min_other_sentinels
+ with pytest.raises(MasterNotFoundError):
+ sentinel.discover_master('mymaster')
+ cluster.master['num-other-sentinels'] = 2
+ address = sentinel.discover_master('mymaster')
+ assert address == ('127.0.0.1', 6379)
+
+
+def test_master_odown(cluster, sentinel):
+ cluster.master['is_odown'] = True
+ with pytest.raises(MasterNotFoundError):
+ sentinel.discover_master('mymaster')
+
+
+def test_master_sdown(cluster, sentinel):
+ cluster.master['is_sdown'] = True
+ with pytest.raises(MasterNotFoundError):
+ sentinel.discover_master('mymaster')
+
+
+def test_discover_slaves(cluster, sentinel):
+ assert sentinel.discover_slaves('mymaster') == []
+
+ cluster.slaves = [
+ {'ip': 'slave0', 'port': 1234, 'is_odown': False, 'is_sdown': False},
+ {'ip': 'slave1', 'port': 1234, 'is_odown': False, 'is_sdown': False},
+ ]
+ assert sentinel.discover_slaves('mymaster') == [
+ ('slave0', 1234), ('slave1', 1234)]
+
+ # slave0 -> ODOWN
+ cluster.slaves[0]['is_odown'] = True
+ assert sentinel.discover_slaves('mymaster') == [
+ ('slave1', 1234)]
+
+ # slave1 -> SDOWN
+ cluster.slaves[1]['is_sdown'] = True
+ assert sentinel.discover_slaves('mymaster') == []
+
+ cluster.slaves[0]['is_odown'] = False
+ cluster.slaves[1]['is_sdown'] = False
+
+ # node0 -> DOWN
+ cluster.nodes_down.add(('foo', 26379))
+ assert sentinel.discover_slaves('mymaster') == [
+ ('slave0', 1234), ('slave1', 1234)]
+
+
+def test_master_for(cluster, sentinel):
+ master = sentinel.master_for('mymaster', db=9)
+ assert master.ping()
+ assert master.connection_pool.master_address == ('127.0.0.1', 6379)
+
+ # Use internal connection check
+ master = sentinel.master_for('mymaster', db=9, check_connection=True)
+ assert master.ping()
+
+
+def test_slave_for(cluster, sentinel):
+ cluster.slaves = [
+ {'ip': '127.0.0.1', 'port': 6379,
+ 'is_odown': False, 'is_sdown': False},
+ ]
+ slave = sentinel.slave_for('mymaster', db=9)
+ assert slave.ping()
+
+
+def test_slave_for_slave_not_found_error(cluster, sentinel):
+ cluster.master['is_odown'] = True
+ slave = sentinel.slave_for('mymaster', db=9)
+ with pytest.raises(SlaveNotFoundError):
+ slave.ping()
+
+
+def test_slave_round_robin(cluster, sentinel):
+ cluster.slaves = [
+ {'ip': 'slave0', 'port': 6379, 'is_odown': False, 'is_sdown': False},
+ {'ip': 'slave1', 'port': 6379, 'is_odown': False, 'is_sdown': False},
+ ]
+ pool = SentinelConnectionPool('mymaster', sentinel)
+ rotator = pool.rotate_slaves()
+ assert next(rotator) in (('slave0', 6379), ('slave1', 6379))
+ assert next(rotator) in (('slave0', 6379), ('slave1', 6379))
+ # Fallback to master
+ assert next(rotator) == ('127.0.0.1', 6379)
+ with pytest.raises(SlaveNotFoundError):
+ next(rotator)