diff options
author | Andy McCurdy <andy@andymccurdy.com> | 2013-09-13 11:40:39 -0700 |
---|---|---|
committer | Andy McCurdy <andy@andymccurdy.com> | 2013-09-13 11:40:39 -0700 |
commit | 89d88d6eb58a8b845ed063012c652a6555d54b1b (patch) | |
tree | 34fdcded4e30455f03ad39fa925300f0d6f1db2b | |
parent | eab8e29238b950f9d53f50c641ff11762f48b973 (diff) | |
parent | b2fc50b6e315e527064ed0e74a39174979a27361 (diff) | |
download | redis-py-89d88d6eb58a8b845ed063012c652a6555d54b1b.tar.gz |
Merge branch 'vitek-sentinel'
Conflicts:
CHANGES
-rw-r--r-- | CHANGES | 1 | ||||
-rw-r--r-- | README.rst | 46 | ||||
-rw-r--r-- | redis/client.py | 84 | ||||
-rw-r--r-- | redis/sentinel.py | 215 | ||||
-rw-r--r-- | tests/test_sentinel.py | 173 |
5 files changed, 509 insertions, 10 deletions
@@ -5,6 +5,7 @@ the server. Thanks EliFinkelshteyn. * Errors when authenticating (incorrect password) and selecting a database now close the socket. + * Full Sentinel support thanks to Vitja Makarov. Thanks! * 2.8.0 * redis-py should play better with gevent when a gevent Timeout is raised. Thanks leifkb. @@ -394,6 +394,52 @@ execution. >>> pipe.execute() [True, 25] +Sentinel support +^^^^^^^^^^^^^^^^ + +redis-py can be used together with `Redis Sentinel <http://redis.io/topics/sentinel>`_ +to discover Redis nodes. You need to have at least one Sentinel daemon running +in order to use redis-py's Sentinel support. + +Connecting redis-py to the Sentinel instance(s) is easy. You can use a +Sentinel connection to discover the master and slaves network addresses: + +.. code-block:: pycon + + >>> from redis.sentinel import Sentinel + >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) + >>> sentinel.discover_master('mymaster') + ('127.0.0.1', 6379) + >>> sentinel.discover_slaves('mymaster') + [('127.0.0.1', 6380)] + +You can also create Redis client connections from a Sentinel instnace. You can +connect to either the master (for write operations) or a slave (for read-only +operations). + +.. code-block:: pycon + + >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) + >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) + >>> master.set('foo', 'bar') + >>> slave.get('foo') + 'bar' + +The master and slave objects are normal StrictRedis instances with their +connection pool bound to the Sentinel instance. When a Sentinel backed client +attempts to establish a connection, it first queries the Sentinel servers to +determine an appropriate host to connect to. If no server is found, +a MasterNotFoundError or SlaveNotFoundError is raised. Both exceptions are +subclasses of ConnectionError. + +When trying to connect to a slave client, the Sentinel connection pool will +iterate over the list of slaves until it finds one that can be connected to. +If no slaves can be connected to, a connection will be established with the +master. + +See `Guidelines for Redis clients with support for Redis Sentinel +<http://redis.io/topics/sentinel-clients>`_ to learn more about Redis Sentinel. + Author ^^^^^^ diff --git a/redis/client.py b/redis/client.py index 4c3b76a..bd46e3a 100644 --- a/redis/client.py +++ b/redis/client.py @@ -109,19 +109,53 @@ def parse_info(response): return info +SENTINEL_STATE_TYPES = { + 'can-failover-its-master': int, + 'info-refresh': int, + 'last-hello-message': int, + 'last-ok-ping-reply': int, + 'last-ping-reply': int, + 'master-link-down-time': int, + 'master-port': int, + 'num-other-sentinels': int, + 'num-slaves': int, + 'o-down-time': int, + 'pending-commands': int, + 'port': int, + 'quorum': int, + 's-down-time': int, + 'slave-priority': int, +} + + +def parse_sentinel_state(item): + result = pairs_to_dict_typed(item, SENTINEL_STATE_TYPES) + flags = set(result['flags'].split(',')) + for name, flag in (('is_master', 'master'), ('is_slave', 'slave'), + ('is_sdown', 's_down'), ('is_odown', 'o_down'), + ('is_sentinel', 'sentinel'), + ('is_disconnected', 'disconnected'), + ('is_master_down', 'master_down')): + result[name] = flag in flags + return result + + def parse_sentinel(response, **options): "Parse the result of Redis's SENTINEL command" - output = [] - parse = options['parse'] - + parse = options.get('parse') if parse == 'SENTINEL_INFO': - for sub_list in response: - it = iter(sub_list) - output.append(dict(izip(it, it))) - else: - output = response - - return output + return [parse_sentinel_state(item) for item in response] + elif parse == 'SENTINEL_INFO_MASTERS': + result = {} + for item in response: + state = parse_sentinel_state(item) + result[state['name']] = state + return result + elif parse == 'SENTINEL_ADDR_PORT': + if response is None: + return + return response[0], int(response[1]) + return response def pairs_to_dict(response): @@ -130,6 +164,16 @@ def pairs_to_dict(response): return dict(izip(it, it)) +def pairs_to_dict_typed(response, type_info): + it = iter(response) + result = {} + for key, value in izip(it, it): + if key in type_info: + value = type_info[key](value) + result[key] = value + return result + + def zset_score_pairs(response, **options): """ If ``withscores`` is specified in the options, return the response as @@ -511,6 +555,26 @@ class StrictRedis(object): parse = 'SENTINEL' return self.execute_command('SENTINEL', *args, **{'parse': parse}) + def sentinel_masters(self): + "Returns a dictionary containing the master's state." + return self.execute_command('SENTINEL', 'masters', + parse='SENTINEL_INFO_MASTERS') + + def sentinel_slaves(self, service_name): + "Returns a list of slaves for ``service_name``" + return self.execute_command('SENTINEL', 'slaves', service_name, + parse='SENTINEL_INFO') + + def sentinel_sentinels(self, service_name): + "Returns a list of sentinels for ``service_name``" + return self.execute_command('SENTINEL', 'sentinels', service_name, + parse='SENTINEL_INFO') + + def sentinel_get_master_addr_by_name(self, service_name): + "Returns a (host, port) pair for the given ``service_name``" + return self.execute_command('SENTINEL', 'get-master-addr-by-name', + service_name, parse='SENTINEL_ADDR_PORT') + def shutdown(self): "Shutdown the server" try: diff --git a/redis/sentinel.py b/redis/sentinel.py new file mode 100644 index 0000000..bdb955f --- /dev/null +++ b/redis/sentinel.py @@ -0,0 +1,215 @@ +import random + +from redis.client import StrictRedis +from redis.connection import ConnectionPool, Connection +from redis.exceptions import ConnectionError, ResponseError +from redis._compat import xrange, nativestr + + +class MasterNotFoundError(ConnectionError): + pass + + +class SlaveNotFoundError(ConnectionError): + pass + + +class SentinelManagedConnection(Connection): + def __init__(self, **kwargs): + self.connection_pool = kwargs.pop('connection_pool') + super(SentinelManagedConnection, self).__init__(**kwargs) + + def connect_to(self, address): + self.host, self.port = address + super(SentinelManagedConnection, self).connect() + if self.connection_pool.check_connection: + self.send_command('PING') + if nativestr(self.read_response()) != 'PONG': + raise ConnectionError('PING failed') + + def connect(self): + if self._sock: + return # already connected + if self.connection_pool.is_master: + self.connect_to(self.connection_pool.get_master_address()) + else: + for slave in self.connection_pool.rotate_slaves(): + try: + return self.connect_to(slave) + except ConnectionError: + continue + raise SlaveNotFoundError # Never be here + + +class SentinelConnectionPool(ConnectionPool): + """ + Sentinel backed connection pool. + + If ``check_connection`` flag is set to True, SentinelManagedConnection + sends a PING command right after establishing the connection. + """ + + def __init__(self, service_name, sentinel_manager, **kwargs): + kwargs['connection_class'] = kwargs.get( + 'connection_class', SentinelManagedConnection) + self.is_master = kwargs.pop('is_master', True) + self.check_connection = kwargs.pop('check_connection', False) + super(SentinelConnectionPool, self).__init__(**kwargs) + self.connection_kwargs['connection_pool'] = self + self.service_name = service_name + self.sentinel_manager = sentinel_manager + self.master_address = None + self.slave_rr_counter = None + + def get_master_address(self): + master_address = self.sentinel_manager.discover_master( + self.service_name) + if not self.is_master: + pass + elif self.master_address is None: + self.master_address = master_address + elif master_address != self.master_address: + self.disconnect() # Master address changed + return master_address + + def rotate_slaves(self): + "Round-robin slave balancer" + slaves = self.sentinel_manager.discover_slaves(self.service_name) + if slaves: + if self.slave_rr_counter is None: + self.slave_rr_counter = random.randint(0, len(slaves) - 1) + for _ in xrange(len(slaves)): + self.slave_rr_counter = ( + self.slave_rr_counter + 1) % len(slaves) + slave = slaves[self.slave_rr_counter] + yield slave + # Fallback to the master connection + try: + yield self.get_master_address() + except MasterNotFoundError: + pass + raise SlaveNotFoundError('No slave found for %r' % (self.service_name)) + + +class Sentinel(object): + """ + Redis Sentinel cluster client + + >>> from redis.sentinel import Sentinel + >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1) + >>> master = sentinel.master_for('mymaster', socket_timeout=0.1) + >>> master.set('foo', 'bar') + >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1) + >>> slave.get('foo') + 'bar' + + ``sentinels`` is a list of sentinel nodes. Each node is represented by + a pair (hostname, port). + + Use ``socket_timeout`` to specify a timeout for sentinel clients. + It's recommended to use short timeouts. + + Use ``min_other_sentinels`` to filter out sentinels with not enough peers. + """ + + def __init__(self, sentinels, password=None, socket_timeout=None, + min_other_sentinels=0): + self.sentinels = [StrictRedis(hostname, port, password=password, + socket_timeout=socket_timeout) + for hostname, port in sentinels] + self.min_other_sentinels = min_other_sentinels + + def check_master_state(self, state, service_name): + if not state['is_master'] or state['is_sdown'] or state['is_odown']: + return False + # Check if our sentinel doesn't see other nodes + if state['num-other-sentinels'] < self.min_other_sentinels: + return False + return True + + def discover_master(self, service_name): + """ + Asks sentinel servers for the Redis master's address corresponding + to the service labeled ``service_name``. + + Returns a pair (address, port) or raises MasterNotFoundError if no + master is found. + """ + for sentinel_no, sentinel in enumerate(self.sentinels): + try: + masters = sentinel.sentinel_masters() + except ConnectionError: + continue + state = masters.get(service_name) + if state and self.check_master_state(state, service_name): + # Put this sentinel at the top of the list + self.sentinels[0], self.sentinels[sentinel_no] = ( + sentinel, self.sentinels[0]) + return state['ip'], state['port'] + raise MasterNotFoundError("No master found for %r" % (service_name,)) + + def filter_slaves(self, slaves): + "Remove slaves that are in an ODOWN or SDOWN state" + slaves_alive = [] + for slave in slaves: + if slave['is_odown'] or slave['is_sdown']: + continue + slaves_alive.append((slave['ip'], slave['port'])) + return slaves_alive + + def discover_slaves(self, service_name): + "Returns a list of alive slaves for service ``service_name``" + for sentinel in self.sentinels: + try: + slaves = sentinel.sentinel_slaves(service_name) + except (ConnectionError, ResponseError): + continue + slaves = self.filter_slaves(slaves) + if slaves: + return slaves + return [] + + def master_for(self, service_name, redis_class=StrictRedis, + connection_pool_class=SentinelConnectionPool, **kwargs): + """ + Returns a redis client instance for the ``service_name`` master. + + A SentinelConnectionPool class is used to retrive the master's + address before establishing a new connection. + + NOTE: If the master's address has changed, any cached connections to + the old master are closed. + + By default clients will be a redis.StrictRedis instance. Specify a + different class to the ``redis_class`` argument if you desire + something different. + + The ``connection_pool_class`` specifies the connection pool to use. + The SentinelConnectionPool will be used by default. + + All other arguments are passed directly to the SentinelConnectionPool. + """ + kwargs['is_master'] = True + return redis_class(connection_pool=connection_pool_class( + service_name, self, **kwargs)) + + def slave_for(self, service_name, redis_class=StrictRedis, + connection_pool_class=SentinelConnectionPool, **kwargs): + """ + Returns redis client instance for the ``service_name`` slave(s). + + A SentinelConnectionPool class is used to retrive the slave's + address before establishing a new connection. + + By default clients will be a redis.StrictRedis instance. Specify a + different class to the ``redis_class`` argument if you desire + something different. + + The ``connection_pool_class`` specifies the connection pool to use. + The SentinelConnectionPool will be used by default. + + All other arguments are passed directly to the SentinelConnectionPool. + """ + kwargs['is_master'] = False + return redis_class(connection_pool=connection_pool_class( + service_name, self, **kwargs)) diff --git a/tests/test_sentinel.py b/tests/test_sentinel.py new file mode 100644 index 0000000..0a6e98b --- /dev/null +++ b/tests/test_sentinel.py @@ -0,0 +1,173 @@ +from __future__ import with_statement +import pytest + +from redis import exceptions +from redis.sentinel import (Sentinel, SentinelConnectionPool, + MasterNotFoundError, SlaveNotFoundError) +from redis._compat import next +import redis.sentinel + + +class SentinelTestClient(object): + def __init__(self, cluster, id): + self.cluster = cluster + self.id = id + + def sentinel_masters(self): + self.cluster.connection_error_if_down(self) + return {self.cluster.service_name: self.cluster.master} + + def sentinel_slaves(self, master_name): + self.cluster.connection_error_if_down(self) + if master_name != self.cluster.service_name: + return [] + return self.cluster.slaves + + +class SentinelTestCluster(object): + def __init__(self, service_name='mymaster', ip='127.0.0.1', port=6379): + self.clients = {} + self.master = { + 'ip': ip, + 'port': port, + 'is_master': True, + 'is_sdown': False, + 'is_odown': False, + 'num-other-sentinels': 0, + } + self.service_name = service_name + self.slaves = [] + self.nodes_down = set() + + def connection_error_if_down(self, node): + if node.id in self.nodes_down: + raise exceptions.ConnectionError + + def client(self, host, port, **kwargs): + return SentinelTestClient(self, (host, port)) + + +@pytest.fixture() +def cluster(request): + def teardown(): + redis.sentinel.StrictRedis = saved_StrictRedis + cluster = SentinelTestCluster() + saved_StrictRedis = redis.sentinel.StrictRedis + redis.sentinel.StrictRedis = cluster.client + request.addfinalizer(teardown) + return cluster + + +@pytest.fixture() +def sentinel(request, cluster): + return Sentinel([('foo', 26379), ('bar', 26379)]) + + +def test_discover_master(sentinel): + address = sentinel.discover_master('mymaster') + assert address == ('127.0.0.1', 6379) + + +def test_discover_master_error(sentinel): + with pytest.raises(MasterNotFoundError): + sentinel.discover_master('xxx') + + +def test_discover_master_sentinel_down(cluster, sentinel): + # Put first sentinel 'foo' down + cluster.nodes_down.add(('foo', 26379)) + address = sentinel.discover_master('mymaster') + assert address == ('127.0.0.1', 6379) + # 'bar' is now first sentinel + assert sentinel.sentinels[0].id == ('bar', 26379) + + +def test_master_min_other_sentinels(cluster): + sentinel = Sentinel([('foo', 26379)], min_other_sentinels=1) + # min_other_sentinels + with pytest.raises(MasterNotFoundError): + sentinel.discover_master('mymaster') + cluster.master['num-other-sentinels'] = 2 + address = sentinel.discover_master('mymaster') + assert address == ('127.0.0.1', 6379) + + +def test_master_odown(cluster, sentinel): + cluster.master['is_odown'] = True + with pytest.raises(MasterNotFoundError): + sentinel.discover_master('mymaster') + + +def test_master_sdown(cluster, sentinel): + cluster.master['is_sdown'] = True + with pytest.raises(MasterNotFoundError): + sentinel.discover_master('mymaster') + + +def test_discover_slaves(cluster, sentinel): + assert sentinel.discover_slaves('mymaster') == [] + + cluster.slaves = [ + {'ip': 'slave0', 'port': 1234, 'is_odown': False, 'is_sdown': False}, + {'ip': 'slave1', 'port': 1234, 'is_odown': False, 'is_sdown': False}, + ] + assert sentinel.discover_slaves('mymaster') == [ + ('slave0', 1234), ('slave1', 1234)] + + # slave0 -> ODOWN + cluster.slaves[0]['is_odown'] = True + assert sentinel.discover_slaves('mymaster') == [ + ('slave1', 1234)] + + # slave1 -> SDOWN + cluster.slaves[1]['is_sdown'] = True + assert sentinel.discover_slaves('mymaster') == [] + + cluster.slaves[0]['is_odown'] = False + cluster.slaves[1]['is_sdown'] = False + + # node0 -> DOWN + cluster.nodes_down.add(('foo', 26379)) + assert sentinel.discover_slaves('mymaster') == [ + ('slave0', 1234), ('slave1', 1234)] + + +def test_master_for(cluster, sentinel): + master = sentinel.master_for('mymaster', db=9) + assert master.ping() + assert master.connection_pool.master_address == ('127.0.0.1', 6379) + + # Use internal connection check + master = sentinel.master_for('mymaster', db=9, check_connection=True) + assert master.ping() + + +def test_slave_for(cluster, sentinel): + cluster.slaves = [ + {'ip': '127.0.0.1', 'port': 6379, + 'is_odown': False, 'is_sdown': False}, + ] + slave = sentinel.slave_for('mymaster', db=9) + assert slave.ping() + + +def test_slave_for_slave_not_found_error(cluster, sentinel): + cluster.master['is_odown'] = True + slave = sentinel.slave_for('mymaster', db=9) + with pytest.raises(SlaveNotFoundError): + slave.ping() + + +def test_slave_round_robin(cluster, sentinel): + cluster.slaves = [ + {'ip': 'slave0', 'port': 6379, 'is_odown': False, 'is_sdown': False}, + {'ip': 'slave1', 'port': 6379, 'is_odown': False, 'is_sdown': False}, + ] + pool = SentinelConnectionPool('mymaster', sentinel) + rotator = pool.rotate_slaves() + assert next(rotator) in (('slave0', 6379), ('slave1', 6379)) + assert next(rotator) in (('slave0', 6379), ('slave1', 6379)) + # Fallback to master + assert next(rotator) == ('127.0.0.1', 6379) + with pytest.raises(SlaveNotFoundError): + next(rotator) |