summaryrefslogtreecommitdiff
path: root/tests/drivers/test_impl_zmq.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/drivers/test_impl_zmq.py')
-rw-r--r--tests/drivers/test_impl_zmq.py458
1 files changed, 131 insertions, 327 deletions
diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py
index ddc6753..a6eef2f 100644
--- a/tests/drivers/test_impl_zmq.py
+++ b/tests/drivers/test_impl_zmq.py
@@ -1,5 +1,4 @@
-# Copyright 2014 Canonical, Ltd.
-# All Rights Reserved.
+# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -15,27 +14,51 @@
import logging
import socket
+import threading
import fixtures
import testtools
-from six.moves import mock
+import oslo_messaging
+from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._i18n import _
+from oslo_messaging.tests import utils as test_utils
-try:
- import zmq
-except ImportError:
- zmq = None
+LOG = logging.getLogger(__name__)
-from oslo import messaging
-from oslo.utils import importutils
-from oslo_messaging.tests import utils as test_utils
+zmq = zmq_async.import_zmq()
-# eventlet is not yet py3 compatible, so skip if not installed
-eventlet = importutils.try_import('eventlet')
-impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq')
+class TestRPCServerListener(object):
-LOG = logging.getLogger(__name__)
+ def __init__(self, driver):
+ self.driver = driver
+ self.target = None
+ self.listener = None
+ self.executor = zmq_async.get_executor(self._run)
+ self._stop = threading.Event()
+ self._received = threading.Event()
+ self.message = None
+
+ def listen(self, target):
+ self.target = target
+ self.listener = self.driver.listen(self.target)
+ self.executor.execute()
+
+ def _run(self):
+ try:
+ message = self.listener.poll()
+ if message is not None:
+ self._received.set()
+ self.message = message
+ message.reply(reply=True)
+ except Exception:
+ LOG.exception(_("Unexpected exception occurred."))
+
+ def stop(self):
+ self.executor.stop()
def get_unused_port():
@@ -56,7 +79,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
- transport = messaging.get_transport(self.conf)
+ transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
@@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
# Start RPC
LOG.info("Running internal zmq receiver.")
- self.reactor = impl_zmq.ZmqProxy(self.conf)
- self.reactor.consume_in_thread()
+ self.broker = ZmqBroker(self.conf)
+ self.broker.start()
+
+ self.listener = TestRPCServerListener(self.driver)
- self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
@@ -85,7 +109,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase):
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
- transport = messaging.get_transport(self.conf)
+ transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
@@ -94,347 +118,127 @@ class stopRpc(object):
self.attrs = attrs
def __call__(self):
- if self.attrs['reactor']:
- self.attrs['reactor'].close()
+ if self.attrs['broker']:
+ self.attrs['broker'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
+ if self.attrs['listener']:
+ self.attrs['listener'].stop()
class TestZmqBasics(ZmqBaseTestCase):
- def test_start_stop_listener(self):
- target = messaging.Target(topic='testtopic')
- listener = self.driver.listen(target)
- result = listener.poll(0.01)
- self.assertEqual(result, None)
-
def test_send_receive_raises(self):
"""Call() without method."""
- target = messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ target = oslo_messaging.Target(topic='testtopic')
+ self.listener.listen(target)
self.assertRaises(
KeyError,
self.driver.send,
target, {}, {'tx_id': 1}, wait_for_reply=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage')
- def test_send_receive_topic(self, mock_msg):
- """Call() with method."""
- mock_msg.return_value = msg = mock.MagicMock()
- msg.received = received = mock.MagicMock()
- received.failure = False
- received.reply = True
- msg.condition = condition = mock.MagicMock()
- condition.wait.return_value = True
-
- target = messaging.Target(topic='testtopic')
- self.driver.listen(target)
+ def test_send_receive_topic(self):
+ """Call() with topic."""
+
+ target = oslo_messaging.Target(topic='testtopic')
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
wait_for_reply=True)
- self.assertEqual(result, True)
+ self.assertIsNotNone(result)
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_fanout(self, mock_call):
- target = messaging.Target(topic='testtopic', fanout=True)
- self.driver.listen(target)
-
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ def test_send_noreply(self):
+ """Cast() with topic."""
+ target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1")
+ self.listener.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'],
- {}, 'fanout~testtopic.127.0.0.1',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True)
- def test_send_receive_direct(self, mock_call):
- # Also verifies fix for bug http://pad.lv/1301723
- target = messaging.Target(topic='testtopic', server='localhost')
- self.driver.listen(target)
+ self.listener._received.wait()
+
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ method = self.listener.message.message[u'method']
+ self.assertEqual(u'hello-world', method)
- mock_call.__name__ = '_call'
- mock_call.return_value = [True]
+ @testtools.skip("Not implemented feature")
+ def test_send_fanout(self):
+ target = oslo_messaging.Target(topic='testtopic', fanout=True)
+ self.driver.listen(target)
result = self.driver.send(
target, {},
{'method': 'hello-world', 'tx_id': 1},
- wait_for_reply=True)
+ wait_for_reply=False)
- self.assertEqual(result, True)
- mock_call.assert_called_once_with(
- self.driver,
- 'tcp://localhost:%s' % self.conf['rpc_zmq_port'],
- {}, 'testtopic.localhost',
- {'tx_id': 1, 'method': 'hello-world'},
- None, False, [], True)
+ self.assertIsNone(result)
+ self.assertEqual(True, self.listener._received.isSet())
+ msg_pattern = "{'method': 'hello-world', 'tx_id': 1}"
+ self.assertEqual(msg_pattern, self.listener.message)
+ def test_send_receive_direct(self):
+ """Call() without topic."""
-class TestZmqSocket(test_utils.BaseTestCase):
+ target = oslo_messaging.Target(server='127.0.0.1')
+ self.listener.listen(target)
+ message = {'method': 'hello-world', 'tx_id': 1}
+ context = {}
+ result = self.driver.send(target, context, message,
+ wait_for_reply=True)
+ self.assertTrue(result)
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestZmqSocket, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False,
- subscribe=None)
- self.assertTrue(sock.can_recv)
- self.assertFalse(sock.can_send)
- self.assertTrue(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe')
- @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context')
- def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe):
- mock_ctxt = mock.Mock()
- mock_context.return_value = mock_ctxt
- mock_sock = mock.Mock()
- mock_ctxt.socket = mock.Mock(return_value=mock_sock)
- mock_sock.connect = mock.Mock()
- mock_sock.bind = mock.Mock()
- addr = '127.0.0.1'
-
- sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False,
- subscribe=None)
- self.assertFalse(sock.can_recv)
- self.assertTrue(sock.can_send)
- self.assertFalse(sock.can_sub)
- self.assertTrue(mock_sock.connect.called)
- self.assertFalse(mock_sock.bind.called)
-
-
-class TestZmqIncomingMessage(test_utils.BaseTestCase):
+class TestPoller(test_utils.BaseTestCase):
- @testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
- super(TestZmqIncomingMessage, self).setUp()
- self.messaging_conf.transport_driver = 'zmq'
- # Get driver
- transport = messaging.get_transport(self.conf)
- self.driver = transport._driver
+ super(TestPoller, self).setUp()
+ self.poller = zmq_async.get_poller()
+ self.ctx = zmq.Context()
+ self.ADDR_REQ = "ipc://request1"
+
+ def test_poll_blocking(self):
+
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
+
+ def listener():
+ incoming, socket = reply_poller.poll()
+ self.assertEqual(b'Hello', incoming[0])
+ socket.send_string('Reply')
+ reply_poller.resume_polling(socket)
+
+ executor = zmq_async.get_executor(listener)
+ executor.execute()
+
+ req1 = self.ctx.socket(zmq.REQ)
+ req1.connect(self.ADDR_REQ)
+
+ req2 = self.ctx.socket(zmq.REQ)
+ req2.connect(self.ADDR_REQ)
+
+ req1.send_string('Hello')
+ req2.send_string('Hello')
+
+ reply = req1.recv_string()
+ self.assertEqual('Reply', reply)
+
+ reply = req2.recv_string()
+ self.assertEqual('Reply', reply)
+
+ def test_poll_timeout(self):
+ rep = self.ctx.socket(zmq.REP)
+ rep.bind(self.ADDR_REQ)
+
+ reply_poller = zmq_async.get_reply_poller()
+ reply_poller.register(rep)
- def test_zmqincomingmessage(self):
- msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo')
- msg.reply("abc")
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertIsInstance(
- msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply)
- self.assertEqual(msg.received.reply, "abc")
- msg.requeue()
-
-
-class TestZmqConnection(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer(self, mock_reactor):
-
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
- # No Fanout
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.PULL,
- subscribe=None, in_bind=False)
-
- # Reset for next bunch of checks
- conn.reactor.register.reset_mock()
-
- # Fanout
- inaddr = ('ipc://%s/zmq_topic_fanout~topic' %
- (self.internal_ipc_dir))
- conn.create_consumer(topic, context, fanout='subscriber.foo')
- conn.reactor.register.assert_called_with(context, inaddr,
- impl_zmq.zmq.SUB,
- subscribe='subscriber.foo',
- in_bind=False)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor):
- mock_reactor.register = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- topic = 'topic.foo'
- context = mock.Mock()
- inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' %
- (self.internal_ipc_dir))
-
- conn.create_consumer(topic, context)
- conn.reactor.register.assert_called_with(
- context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False)
- conn.reactor.register.reset_mock()
- # Call again with same topic
- conn.create_consumer(topic, context)
- self.assertFalse(conn.reactor.register.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker):
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.close = mock.Mock()
- mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock()
- conn.close()
- self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called)
- self.assertTrue(conn.reactor.close.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_wait(self, mock_reactor):
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.wait = mock.Mock()
- conn.wait()
- self.assertTrue(conn.reactor.wait.called)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker',
- autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
- def test_zmqconnection_consume_in_thread(self, mock_reactor,
- mock_getmatchmaker):
- mock_getmatchmaker.return_value.start_heartbeat = mock.Mock()
- conn = impl_zmq.Connection(self.driver.conf, self.driver)
- conn.reactor.consume_in_thread = mock.Mock()
- conn.consume_in_thread()
- self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called)
- self.assertTrue(conn.reactor.consume_in_thread.called)
-
-
-class TestZmqListener(ZmqBaseTestCase):
-
- def test_zmqlistener_no_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- # Timeout = 0 should return straight away since the queue is empty
- listener.poll(timeout=0)
-
- def test_zmqlistener_w_msg(self):
- listener = impl_zmq.ZmqListener(self.driver)
- kwargs = {'a': 1, 'b': 2}
- m = mock.Mock()
- ctxt = mock.Mock(autospec=impl_zmq.RpcContext)
- message = {'namespace': 'name.space', 'method': m.fake_method,
- 'args': kwargs}
- eventlet.spawn_n(listener.dispatch, ctxt, message)
- resp = listener.poll(timeout=10)
- msg = {'method': m.fake_method, 'namespace': 'name.space',
- 'args': kwargs}
- self.assertEqual(resp.message, msg)
-
-
-class TestZmqDriver(ZmqBaseTestCase):
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic'
- msg = 'jeronimo'
- self.driver.send(messaging.Target(topic=topic), context, msg,
- False, 0, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)
- def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast):
- context = mock.Mock(autospec=impl_zmq.RpcContext)
- topic = 'testtopic.foo'
- topic_reformat = 'testtopic-foo'
- msg = 'jeronimo'
- self.driver.send_notification(messaging.Target(topic=topic), context,
- msg, False, False)
- mock_multi_send.assert_called_with(self.driver, mock_cast, context,
- topic_reformat, msg,
- allowed_remote_exmods=[],
- envelope=False, pooled=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen(self, mock_connection, mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- self.driver.listen(messaging.Target(topic=topic))
- conn.create_consumer.assert_called_with(topic, listener, fanout=True)
-
- @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True)
- @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True)
- def test_zmqdriver_listen_for_notification(self, mock_connection,
- mock_listener):
- mock_listener.return_value = listener = mock.Mock()
- mock_connection.return_value = conn = mock.Mock()
- conn.create_consumer = mock.Mock()
- conn.consume_in_thread = mock.Mock()
- topic = 'testtopic.foo'
- data = [(messaging.Target(topic=topic), 0)]
- # NOTE(jamespage): Pooling not supported, just pass None for now.
- self.driver.listen_for_notifications(data, None)
- conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener)
+ incoming, socket = reply_poller.poll(1)
+ self.assertIsNone(incoming)
+ self.assertIsNone(socket)