summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGevorg Davoian <gdavoian@mirantis.com>2016-10-04 12:36:13 +0300
committerGevorg Davoian <gdavoian@mirantis.com>2016-10-10 15:13:15 +0300
commitcb3af2167fdb1ba8977163217169f0beedcae5e7 (patch)
tree3c74561aac82b31e8c695cd1c65d3bc02b70c190
parentcb13e65bed7f12d8e03abd94e3e343afffd0c475 (diff)
downloadoslo-messaging-cb3af2167fdb1ba8977163217169f0beedcae5e7.tar.gz
[zmq] Maintain several redis hosts
This patch makes it possible to maintain several redis hosts at once in order to increase driver's reliability and fault tolerance. Change-Id: Id6f63a4bb67a39340a74d16144c79028c7af245d
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py11
-rw-r--r--oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py316
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py18
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_options.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py27
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py51
-rw-r--r--setup.cfg1
7 files changed, 292 insertions, 134 deletions
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 90c2c20..7ab43fb 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -102,7 +102,7 @@ class ZmqDriver(base.BaseDriver):
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
- self.get_matchmaker_backend(url),
+ self.get_matchmaker_backend(self.conf, url),
).driver(self.conf, url=url)
client_cls = zmq_client.ZmqClientProxy
@@ -124,12 +124,13 @@ class ZmqDriver(base.BaseDriver):
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
- def get_matchmaker_backend(self, url):
- zmq_transport, p, matchmaker_backend = url.transport.partition('+')
+ @staticmethod
+ def get_matchmaker_backend(conf, url):
+ zmq_transport, _, matchmaker_backend = url.transport.partition('+')
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
if not matchmaker_backend:
- return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
- elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
+ return conf.oslo_messaging_zmq.rpc_zmq_matchmaker
+ if matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
raise rpc_common.RPCException(
_LE("Incorrect matchmaker backend name %(backend_name)s!"
"Available names are: %(available_names)s") %
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
index a1aedf2..196f87f 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py
@@ -1,3 +1,4 @@
+# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -11,15 +12,21 @@
# License for the specific language governing permissions and limitations
# under the License.
+import abc
+import functools
import logging
-from retrying import retry
+import random
+import time
from oslo_config import cfg
from oslo_utils import importutils
+from retrying import retry
+import six
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._i18n import _LW, _LE
+from oslo_messaging._drivers.zmq_driver import zmq_updater
+from oslo_messaging._i18n import _LE, _LI, _LW
redis = importutils.try_import('redis')
redis_sentinel = importutils.try_import('redis.sentinel')
@@ -67,20 +74,46 @@ _PUBLISHERS_KEY = "PUBLISHERS"
_ROUTERS_KEY = "ROUTERS"
-def redis_connection_warn(func):
- def func_wrapper(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except redis.ConnectionError:
- LOG.warning(_LW("Redis is currently not available. "
- "Messages are being sent to known targets using "
- "existing connections. But new nodes "
- "can not be discovered until Redis is up "
- "and running."))
+def write_to_redis_connection_warn(func):
+ @functools.wraps(func)
+ def func_wrapper(self, *args, **kwargs):
+ # try to perform a write operation to all available hosts
+ success = False
+ for redis_instance in self._redis_instances:
+ if not redis_instance._is_available:
+ continue
+ try:
+ func(self, redis_instance, *args, **kwargs)
+ success = True
+ except redis.ConnectionError:
+ LOG.warning(_LW("Redis host %s is not available now."),
+ redis_instance._address)
+ redis_instance._is_available = False
+ redis_instance._ready_from = float("inf")
+ if not success:
raise zmq_matchmaker_base.MatchmakerUnavailable()
return func_wrapper
+def read_from_redis_connection_warn(func):
+ @functools.wraps(func)
+ def func_wrapper(self, *args, **kwargs):
+ # try to perform a read operation from any available and ready host
+ for redis_instance in self._redis_instances:
+ if not redis_instance._is_available \
+ or redis_instance._ready_from > time.time():
+ continue
+ try:
+ return func(self, redis_instance, *args, **kwargs)
+ except redis.ConnectionError:
+ LOG.warning(_LW("Redis host %s is not available now."),
+ redis_instance._address)
+ redis_instance._is_available = False
+ redis_instance._ready_from = float("inf")
+ raise zmq_matchmaker_base.MatchmakerUnavailable()
+ return func_wrapper
+
+
def no_reraise(func):
def func_wrapper(*args, **kwargs):
try:
@@ -107,131 +140,84 @@ def retry_if_empty(hosts):
return not hosts
-class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
+@six.add_metaclass(abc.ABCMeta)
+class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
def __init__(self, conf, *args, **kwargs):
- super(MatchmakerRedis, self).__init__(conf, *args, **kwargs)
- self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
if redis is None:
raise ImportError(_LE("Redis package is not available!"))
- self.sentinel_hosts = self._extract_sentinel_options()
- if not self.sentinel_hosts:
- self.standalone_redis = self._extract_standalone_redis_options()
- self._redis = redis.StrictRedis(
- host=self.standalone_redis["host"],
- port=self.standalone_redis["port"],
- password=self.standalone_redis["password"]
- )
- else:
- socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
- sentinel = redis.sentinel.Sentinel(
- sentinels=self.sentinel_hosts,
- socket_timeout=socket_timeout
- )
-
- self._redis = sentinel.master_for(
- self.conf.matchmaker_redis.sentinel_group_name,
- socket_timeout=socket_timeout
- )
-
- def _extract_sentinel_options(self):
- if self.url and self.url.hosts:
- if len(self.url.hosts) > 1:
- return [(host.hostname, host.port) for host in self.url.hosts]
- elif self.conf.matchmaker_redis.sentinel_hosts:
- s = self.conf.matchmaker_redis.sentinel_hosts
- return [tuple(i.split(":")) for i in s]
+ super(MatchmakerRedisBase, self).__init__(conf, *args, **kwargs)
- def _extract_standalone_redis_options(self):
- if self.url and self.url.hosts:
- redis_host = self.url.hosts[0]
- return {"host": redis_host.hostname,
- "port": redis_host.port,
- "password": redis_host.password}
- else:
- return {"host": self.conf.matchmaker_redis.host,
- "port": self.conf.matchmaker_redis.port,
- "password": self.conf.matchmaker_redis.password}
+ self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
- def _add_key_with_expire(self, key, value, expire):
- self._redis.sadd(key, value)
- if expire > 0:
- self._redis.expire(key, expire)
+ @abc.abstractmethod
+ def _sadd(self, key, value, expire):
+ pass
+
+ @abc.abstractmethod
+ def _srem(self, key, value):
+ pass
+
+ @abc.abstractmethod
+ def _smembers(self, key):
+ pass
@no_reraise
- @redis_connection_warn
def register_publisher(self, hostname, expire=-1):
- host_str = ",".join(hostname)
- self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
+ self._sadd(_PUBLISHERS_KEY, ','.join(hostname), expire)
@no_reraise
- @redis_connection_warn
def unregister_publisher(self, hostname):
- host_str = ",".join(hostname)
- self._redis.srem(_PUBLISHERS_KEY, host_str)
+ self._srem(_PUBLISHERS_KEY, ','.join(hostname))
@empty_list_on_error
- @redis_connection_warn
def get_publishers(self):
- hosts = []
- hosts.extend([tuple(host_str.split(","))
- for host_str in
- self._get_hosts_by_key(_PUBLISHERS_KEY)])
- return hosts
+ return [tuple(hostname.split(',')) for hostname
+ in self._smembers(_PUBLISHERS_KEY)]
@no_reraise
- @redis_connection_warn
def register_router(self, hostname, expire=-1):
- self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
+ self._sadd(_ROUTERS_KEY, hostname, expire)
@no_reraise
- @redis_connection_warn
def unregister_router(self, hostname):
- self._redis.srem(_ROUTERS_KEY, hostname)
+ self._srem(_ROUTERS_KEY, hostname)
@empty_list_on_error
- @redis_connection_warn
def get_routers(self):
- return self._get_hosts_by_key(_ROUTERS_KEY)
+ return self._smembers(_ROUTERS_KEY)
- @redis_connection_warn
def get_hosts_by_key(self, key):
- return self._get_hosts_by_key(key)
+ return self._smembers(key)
- def _get_hosts_by_key(self, key):
- return self._redis.smembers(key)
-
- @redis_connection_warn
def register(self, target, hostname, listener_type, expire=-1):
if target.server:
key = zmq_address.target_to_key(target, listener_type)
- self._add_key_with_expire(key, hostname, expire)
+ self._sadd(key, hostname, expire)
key = zmq_address.prefix_str(target.topic, listener_type)
- self._add_key_with_expire(key, hostname, expire)
+ self._sadd(key, hostname, expire)
@no_reraise
- @redis_connection_warn
def unregister(self, target, hostname, listener_type):
if target.server:
key = zmq_address.target_to_key(target, listener_type)
- self._redis.srem(key, hostname)
+ self._srem(key, hostname)
key = zmq_address.prefix_str(target.topic, listener_type)
- self._redis.srem(key, hostname)
+ self._srem(key, hostname)
- @redis_connection_warn
def get_hosts(self, target, listener_type):
hosts = []
if target.server:
key = zmq_address.target_to_key(target, listener_type)
- hosts.extend(self._get_hosts_by_key(key))
+ hosts.extend(self._smembers(key))
if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type)
- hosts.extend(self._get_hosts_by_key(key))
+ hosts.extend(self._smembers(key))
LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
@@ -241,10 +227,9 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
def get_hosts_retry(self, target, listener_type):
return self._retry_method(target, listener_type, self.get_hosts)
- @redis_connection_warn
def get_hosts_fanout(self, target, listener_type):
key = zmq_address.prefix_str(target.topic, listener_type)
- hosts = list(self._get_hosts_by_key(key))
+ hosts = list(self._smembers(key))
LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts})
@@ -262,3 +247,154 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase):
def _get_hosts_retry(target, listener_type):
return method(target, listener_type)
return _get_hosts_retry(target, listener_type)
+
+
+class MatchmakerRedis(MatchmakerRedisBase):
+
+ def __init__(self, conf, *args, **kwargs):
+ super(MatchmakerRedis, self).__init__(conf, *args, **kwargs)
+
+ self._redis_hosts = self._extract_redis_hosts()
+
+ self._redis_instances = [
+ redis.StrictRedis(host=redis_host["host"],
+ port=redis_host["port"],
+ password=redis_host["password"])
+ for redis_host in self._redis_hosts
+ ]
+
+ for redis_host, redis_instance \
+ in six.moves.zip(self._redis_hosts, self._redis_instances):
+ address = "{host}:{port}".format(host=redis_host["host"],
+ port=redis_host["port"])
+ redis_instance._address = address
+ is_available = self._check_availability(redis_instance)
+ if is_available:
+ redis_instance._is_available = True
+ redis_instance._ready_from = time.time()
+ else:
+ LOG.warning(_LW("Redis host %s is not available now."),
+ address)
+ redis_instance._is_available = False
+ redis_instance._ready_from = float("inf")
+
+ # NOTE(gdavoian): store instances in a random order
+ # (for the sake of load balancing)
+ random.shuffle(self._redis_instances)
+
+ self._availability_updater = \
+ MatchmakerRedisAvailabilityUpdater(self.conf, self)
+
+ def _extract_redis_hosts(self):
+ if self.url and self.url.hosts:
+ return [{"host": redis_host.hostname,
+ "port": redis_host.port,
+ "password": redis_host.password}
+ for redis_host in self.url.hosts]
+ else:
+ # FIXME(gdavoian): remove the code below along with the
+ # corresponding deprecated options in the next release
+ return [{"host": self.conf.matchmaker_redis.host,
+ "port": self.conf.matchmaker_redis.port,
+ "password": self.conf.matchmaker_redis.password}]
+
+ @staticmethod
+ def _check_availability(redis_instance):
+ try:
+ redis_instance.ping()
+ return True
+ except redis.ConnectionError:
+ return False
+
+ @write_to_redis_connection_warn
+ def _sadd(self, redis_instance, key, value, expire):
+ redis_instance.sadd(key, value)
+ if expire > 0:
+ redis_instance.expire(key, expire)
+
+ @write_to_redis_connection_warn
+ def _srem(self, redis_instance, key, value):
+ redis_instance.srem(key, value)
+
+ @read_from_redis_connection_warn
+ def _smembers(self, redis_instance, key):
+ return redis_instance.smembers(key)
+
+
+class MatchmakerRedisAvailabilityUpdater(zmq_updater.UpdaterBase):
+
+ _MIN_SLEEP_FOR = 10
+
+ def __init__(self, conf, matchmaker):
+ super(MatchmakerRedisAvailabilityUpdater, self).__init__(
+ conf, matchmaker, self._update_availability,
+ sleep_for=conf.oslo_messaging_zmq.zmq_target_update
+ )
+
+ def _update_availability(self):
+ fraction_of_available_instances = 0
+ for redis_instance in self.matchmaker._redis_instances:
+ if not redis_instance._is_available:
+ is_available = \
+ self.matchmaker._check_availability(redis_instance)
+ if is_available:
+ LOG.info(_LI("Redis host %s is available again."),
+ redis_instance._address)
+ fraction_of_available_instances += 1
+ # NOTE(gdavoian): mark an instance as available for
+ # writing to, but wait until all services register
+ # themselves in it for making the instance ready for
+ # reading from
+ redis_instance._is_available = True
+ redis_instance._ready_from = time.time() + \
+ self.conf.oslo_messaging_zmq.zmq_target_update
+ else:
+ fraction_of_available_instances += 1
+ fraction_of_available_instances /= \
+ float(len(self.matchmaker._redis_instances))
+ # NOTE(gdavoian): make the sleep time proportional to the number of
+ # currently available instances
+ self._sleep_for = max(self.conf.oslo_messaging_zmq.zmq_target_update *
+ fraction_of_available_instances,
+ self._MIN_SLEEP_FOR)
+
+
+class MatchmakerSentinel(MatchmakerRedisBase):
+
+ def __init__(self, conf, *args, **kwargs):
+ super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs)
+
+ self._sentinel_hosts = self._extract_sentinel_hosts()
+
+ socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
+
+ sentinel = redis_sentinel.Sentinel(
+ sentinels=self._sentinel_hosts,
+ socket_timeout=socket_timeout
+ )
+
+ self._redis_instance = sentinel.master_for(
+ self.conf.matchmaker_redis.sentinel_group_name,
+ socket_timeout=socket_timeout
+ )
+
+ def _extract_sentinel_hosts(self):
+ if self.url and self.url.hosts:
+ return [(sentinel_host.hostname, sentinel_host.port)
+ for sentinel_host in self.url.hosts]
+ elif self.conf.matchmaker_redis.sentinel_hosts:
+ return [tuple(sentinel_host.split(':')) for sentinel_host
+ in self.conf.matchmaker_redis.sentinel_hosts]
+ else:
+ return []
+
+ def _sadd(self, key, value, expire):
+ self._redis_instance.sadd(key, value)
+ if expire > 0:
+ self._redis_instance.expire(key, expire)
+
+ def _srem(self, key, value):
+ self._redis_instance.srem(key, value)
+
+ def _smembers(self, key):
+ return self._redis_instance.smembers(key)
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index bfd6f98..76ade41 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -19,9 +19,11 @@ import socket
from oslo_config import cfg
from stevedore import driver
+from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
+from oslo_messaging import transport
LOG = logging.getLogger(__name__)
@@ -51,7 +53,10 @@ zmq_proxy_opts = [
cfg.BoolOpt('ack_pub_sub', default=False,
help='Use acknowledgements for notifying senders about '
'receiving their fanout messages. '
- 'The option is ignored if PUB/SUB is disabled.')
+ 'The option is ignored if PUB/SUB is disabled.'),
+
+ cfg.StrOpt('url', default='zmq://127.0.0.1:6379/',
+ help='ZMQ-driver transport URL with additional configurations')
]
@@ -79,6 +84,8 @@ def parse_command_line_args(conf):
parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
action='store_true',
help='Acknowledge PUB/SUB messages')
+ parser.add_argument('-u', '--url', dest='url', type=str,
+ help='Transport URL with configurations')
parser.add_argument('-d', '--debug', dest='debug', action='store_true',
help='Turn on DEBUG logging level instead of INFO')
@@ -108,6 +115,8 @@ def parse_command_line_args(conf):
if args.ack_pub_sub:
conf.set_override('ack_pub_sub', args.ack_pub_sub,
group='zmq_proxy_opts')
+ if args.url:
+ conf.set_override('url', args.url, group='zmq_proxy_opts')
class ZmqProxy(object):
@@ -152,10 +161,13 @@ class ZmqProxy(object):
def __init__(self, conf):
super(ZmqProxy, self).__init__()
self.conf = conf
+ url = transport.TransportURL.parse(
+ self.conf, url=self.conf.zmq_proxy_opts.url
+ )
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
- self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
- ).driver(self.conf)
+ impl_zmq.ZmqDriver.get_matchmaker_backend(self.conf, url)
+ ).driver(self.conf, url=url)
self.context = zmq.Context()
self.proxy = self._choose_proxy_implementation()
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
index 5bbe02e..eec2004 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -20,7 +20,7 @@ from oslo_messaging._drivers import base
from oslo_messaging import server
-MATCHMAKER_BACKENDS = ('redis', 'dummy')
+MATCHMAKER_BACKENDS = ('redis', 'sentinel', 'dummy')
MATCHMAKER_DEFAULT = 'redis'
diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
index 5eda0cc..7c117ff 100644
--- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
+++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
@@ -12,12 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-from fixtures._fixtures import timeout
import inspect
-import retrying
from stevedore import driver
import testscenarios
-import testtools
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
@@ -31,8 +28,7 @@ def redis_available():
if not redis:
return False
try:
- c = redis.StrictRedis(socket_timeout=1)
- c.ping()
+ redis.StrictRedis(socket_timeout=1).ping()
return True
except redis.exceptions.ConnectionError:
return False
@@ -41,7 +37,6 @@ def redis_available():
load_tests = testscenarios.load_tests_apply_scenarios
-@testtools.skipIf(not redis_available(), "redis unavailable")
class TestImplMatchmaker(test_utils.BaseTestCase):
scenarios = [
@@ -52,13 +47,18 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
def setUp(self):
super(TestImplMatchmaker, self).setUp()
+ if self.rpc_zmq_matchmaker == "redis":
+ if not redis_available():
+ self.skipTest("redis unavailable")
+
self.test_matcher = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.rpc_zmq_matchmaker,
).driver(self.conf)
if self.rpc_zmq_matchmaker == "redis":
- self.addCleanup(self.test_matcher._redis.flushdb)
+ for redis_instance in self.test_matcher._redis_instances:
+ self.addCleanup(redis_instance.flushdb)
self.target = oslo_messaging.Target(topic="test_topic")
self.host1 = b"test_host1"
@@ -77,7 +77,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
- def test_register_unsibscribe(self):
+ def test_register_unregister(self):
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host2, "test")
@@ -95,12 +95,7 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
- hosts = []
- try:
- hosts = self.test_matcher.get_hosts(target, "test")
- except (timeout.TimeoutException, retrying.RetryError):
- pass
- self.assertEqual([], hosts)
+ self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
def test_handle_redis_package_error(self):
if self.rpc_zmq_matchmaker == "redis":
@@ -108,10 +103,10 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
module = inspect.getmodule(self.test_matcher)
redis_package = module.redis
- # 'redis' variable is set None, when importing package is failed
+ # 'redis' variable is set to None, when package importing is failed
module.redis = None
self.assertRaises(ImportError, self.test_matcher.__init__,
self.conf)
- # retrieve 'redis' variable wihch is set originally
+ # retrieve 'redis' variable which is set originally
module.redis = redis_package
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
index 2456ddf..765c5fa 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py
@@ -22,7 +22,6 @@ from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
-
zmq = zmq_async.import_zmq()
@@ -44,7 +43,7 @@ class TestZmqTransportUrl(test_utils.BaseTestCase):
driver.matchmaker.__class__)
self.assertEqual('zmq', driver.matchmaker.url.transport)
- def test_error_name(self):
+ def test_error_url(self):
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
def test_dummy_url(self):
@@ -59,30 +58,44 @@ class TestZmqTransportUrl(test_utils.BaseTestCase):
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
- def test_redis_url_no_creds(self):
- driver, url = self.setup_url("zmq+redis://host:65123/")
+ def test_sentinel_url(self):
+ driver, url = self.setup_url("zmq+sentinel:///")
+ self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
+ driver.matchmaker.__class__)
+ self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
+
+ def test_host_with_credentials_url(self):
+ driver, url = self.setup_url("zmq://:password@host:60000/")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
- self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
- self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
- self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
+ self.assertEqual('zmq', driver.matchmaker.url.transport)
+ self.assertEqual(
+ [{"host": "host", "port": 60000, "password": "password"}],
+ driver.matchmaker._redis_hosts
+ )
- def test_redis_url_no_port(self):
- driver, url = self.setup_url("zmq+redis://:p12@host:65123/")
+ def test_redis_multiple_hosts_url(self):
+ driver, url = self.setup_url(
+ "zmq+redis://host1:60001,host2:60002,host3:60003/"
+ )
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
- self.assertEqual("host", driver.matchmaker.standalone_redis["host"])
- self.assertEqual(65123, driver.matchmaker.standalone_redis["port"])
- self.assertEqual("p12", driver.matchmaker.standalone_redis["password"])
+ self.assertEqual(
+ [{"host": "host1", "port": 60001, "password": None},
+ {"host": "host2", "port": 60002, "password": None},
+ {"host": "host3", "port": 60003, "password": None}],
+ driver.matchmaker._redis_hosts
+ )
def test_sentinel_multiple_hosts_url(self):
driver, url = self.setup_url(
- "zmq+redis://sentinel1:20001,sentinel2:20001,sentinel3:20001/")
- self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
+ "zmq+sentinel://host1:20001,host2:20002,host3:20003/"
+ )
+ self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
driver.matchmaker.__class__)
- self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
- self.assertEqual(3, len(driver.matchmaker.sentinel_hosts))
- expected = [("sentinel1", 20001), ("sentinel2", 20001),
- ("sentinel3", 20001)]
- self.assertEqual(expected, driver.matchmaker.sentinel_hosts)
+ self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
+ self.assertEqual(
+ [("host1", 20001), ("host2", 20002), ("host3", 20003)],
+ driver.matchmaker._sentinel_hosts
+ )
diff --git a/setup.cfg b/setup.cfg
index 014f158..90abec9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -76,6 +76,7 @@ oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis
+ sentinel = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerSentinel
oslo.config.opts =
oslo.messaging = oslo_messaging.opts:list_opts