diff options
Diffstat (limited to 'tests/drivers/test_impl_zmq.py')
-rw-r--r-- | tests/drivers/test_impl_zmq.py | 458 |
1 files changed, 131 insertions, 327 deletions
diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index ddc6753..a6eef2f 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -1,5 +1,4 @@ -# Copyright 2014 Canonical, Ltd. -# All Rights Reserved. +# Copyright 2015 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,27 +14,51 @@ import logging import socket +import threading import fixtures import testtools -from six.moves import mock +import oslo_messaging +from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.zmq_driver.broker.zmq_broker import ZmqBroker +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _ +from oslo_messaging.tests import utils as test_utils -try: - import zmq -except ImportError: - zmq = None +LOG = logging.getLogger(__name__) -from oslo import messaging -from oslo.utils import importutils -from oslo_messaging.tests import utils as test_utils +zmq = zmq_async.import_zmq() -# eventlet is not yet py3 compatible, so skip if not installed -eventlet = importutils.try_import('eventlet') -impl_zmq = importutils.try_import('oslo_messaging._drivers.impl_zmq') +class TestRPCServerListener(object): -LOG = logging.getLogger(__name__) + def __init__(self, driver): + self.driver = driver + self.target = None + self.listener = None + self.executor = zmq_async.get_executor(self._run) + self._stop = threading.Event() + self._received = threading.Event() + self.message = None + + def listen(self, target): + self.target = target + self.listener = self.driver.listen(self.target) + self.executor.execute() + + def _run(self): + try: + message = self.listener.poll() + if message is not None: + self._received.set() + self.message = message + message.reply(reply=True) + except Exception: + LOG.exception(_("Unexpected exception occurred.")) + + def stop(self): + self.executor.stop() def get_unused_port(): @@ -56,7 +79,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' # Get driver - transport = messaging.get_transport(self.conf) + transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver # Set config values @@ -70,10 +93,11 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): # Start RPC LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() + self.broker = ZmqBroker(self.conf) + self.broker.start() + + self.listener = TestRPCServerListener(self.driver) - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') self.addCleanup(stopRpc(self.__dict__)) @@ -85,7 +109,7 @@ class TestConfZmqDriverLoad(test_utils.BaseTestCase): self.messaging_conf.transport_driver = 'zmq' def test_driver_load(self): - transport = messaging.get_transport(self.conf) + transport = oslo_messaging.get_transport(self.conf) self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) @@ -94,347 +118,127 @@ class stopRpc(object): self.attrs = attrs def __call__(self): - if self.attrs['reactor']: - self.attrs['reactor'].close() + if self.attrs['broker']: + self.attrs['broker'].close() if self.attrs['driver']: self.attrs['driver'].cleanup() + if self.attrs['listener']: + self.attrs['listener'].stop() class TestZmqBasics(ZmqBaseTestCase): - def test_start_stop_listener(self): - target = messaging.Target(topic='testtopic') - listener = self.driver.listen(target) - result = listener.poll(0.01) - self.assertEqual(result, None) - def test_send_receive_raises(self): """Call() without method.""" - target = messaging.Target(topic='testtopic') - self.driver.listen(target) + target = oslo_messaging.Target(topic='testtopic') + self.listener.listen(target) self.assertRaises( KeyError, self.driver.send, target, {}, {'tx_id': 1}, wait_for_reply=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqIncomingMessage') - def test_send_receive_topic(self, mock_msg): - """Call() with method.""" - mock_msg.return_value = msg = mock.MagicMock() - msg.received = received = mock.MagicMock() - received.failure = False - received.reply = True - msg.condition = condition = mock.MagicMock() - condition.wait.return_value = True - - target = messaging.Target(topic='testtopic') - self.driver.listen(target) + def test_send_receive_topic(self): + """Call() with topic.""" + + target = oslo_messaging.Target(topic='testtopic') + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, wait_for_reply=True) - self.assertEqual(result, True) + self.assertIsNotNone(result) - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_fanout(self, mock_call): - target = messaging.Target(topic='testtopic', fanout=True) - self.driver.listen(target) - - mock_call.__name__ = '_call' - mock_call.return_value = [True] + def test_send_noreply(self): + """Cast() with topic.""" + target = oslo_messaging.Target(topic='testtopic', server="127.0.0.1") + self.listener.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) + wait_for_reply=False) - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://127.0.0.1:%s' % self.conf['rpc_zmq_port'], - {}, 'fanout~testtopic.127.0.0.1', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) - - @mock.patch('oslo_messaging._drivers.impl_zmq._call', autospec=True) - def test_send_receive_direct(self, mock_call): - # Also verifies fix for bug http://pad.lv/1301723 - target = messaging.Target(topic='testtopic', server='localhost') - self.driver.listen(target) + self.listener._received.wait() + + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + method = self.listener.message.message[u'method'] + self.assertEqual(u'hello-world', method) - mock_call.__name__ = '_call' - mock_call.return_value = [True] + @testtools.skip("Not implemented feature") + def test_send_fanout(self): + target = oslo_messaging.Target(topic='testtopic', fanout=True) + self.driver.listen(target) result = self.driver.send( target, {}, {'method': 'hello-world', 'tx_id': 1}, - wait_for_reply=True) + wait_for_reply=False) - self.assertEqual(result, True) - mock_call.assert_called_once_with( - self.driver, - 'tcp://localhost:%s' % self.conf['rpc_zmq_port'], - {}, 'testtopic.localhost', - {'tx_id': 1, 'method': 'hello-world'}, - None, False, [], True) + self.assertIsNone(result) + self.assertEqual(True, self.listener._received.isSet()) + msg_pattern = "{'method': 'hello-world', 'tx_id': 1}" + self.assertEqual(msg_pattern, self.listener.message) + def test_send_receive_direct(self): + """Call() without topic.""" -class TestZmqSocket(test_utils.BaseTestCase): + target = oslo_messaging.Target(server='127.0.0.1') + self.listener.listen(target) + message = {'method': 'hello-world', 'tx_id': 1} + context = {} + result = self.driver.send(target, context, message, + wait_for_reply=True) + self.assertTrue(result) - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqSocket, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pull(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' - - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PULL, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_sub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' - - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.SUB, bind=False, - subscribe=None) - self.assertTrue(sock.can_recv) - self.assertFalse(sock.can_send) - self.assertTrue(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_push(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' - - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUSH, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqSocket.subscribe') - @mock.patch('oslo_messaging._drivers.impl_zmq.zmq.Context') - def test_zmqsocket_init_type_pub(self, mock_context, mock_subscribe): - mock_ctxt = mock.Mock() - mock_context.return_value = mock_ctxt - mock_sock = mock.Mock() - mock_ctxt.socket = mock.Mock(return_value=mock_sock) - mock_sock.connect = mock.Mock() - mock_sock.bind = mock.Mock() - addr = '127.0.0.1' - - sock = impl_zmq.ZmqSocket(addr, impl_zmq.zmq.PUB, bind=False, - subscribe=None) - self.assertFalse(sock.can_recv) - self.assertTrue(sock.can_send) - self.assertFalse(sock.can_sub) - self.assertTrue(mock_sock.connect.called) - self.assertFalse(mock_sock.bind.called) - - -class TestZmqIncomingMessage(test_utils.BaseTestCase): +class TestPoller(test_utils.BaseTestCase): - @testtools.skipIf(zmq is None, "zmq not available") def setUp(self): - super(TestZmqIncomingMessage, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver + super(TestPoller, self).setUp() + self.poller = zmq_async.get_poller() + self.ctx = zmq.Context() + self.ADDR_REQ = "ipc://request1" + + def test_poll_blocking(self): + + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) + + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) + + def listener(): + incoming, socket = reply_poller.poll() + self.assertEqual(b'Hello', incoming[0]) + socket.send_string('Reply') + reply_poller.resume_polling(socket) + + executor = zmq_async.get_executor(listener) + executor.execute() + + req1 = self.ctx.socket(zmq.REQ) + req1.connect(self.ADDR_REQ) + + req2 = self.ctx.socket(zmq.REQ) + req2.connect(self.ADDR_REQ) + + req1.send_string('Hello') + req2.send_string('Hello') + + reply = req1.recv_string() + self.assertEqual('Reply', reply) + + reply = req2.recv_string() + self.assertEqual('Reply', reply) + + def test_poll_timeout(self): + rep = self.ctx.socket(zmq.REP) + rep.bind(self.ADDR_REQ) + + reply_poller = zmq_async.get_reply_poller() + reply_poller.register(rep) - def test_zmqincomingmessage(self): - msg = impl_zmq.ZmqIncomingMessage(mock.Mock(), None, 'msg.foo') - msg.reply("abc") - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertIsInstance( - msg.received, impl_zmq.ZmqIncomingMessage.ReceivedReply) - self.assertEqual(msg.received.reply, "abc") - msg.requeue() - - -class TestZmqConnection(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer(self, mock_reactor): - - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - # No Fanout - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.PULL, - subscribe=None, in_bind=False) - - # Reset for next bunch of checks - conn.reactor.register.reset_mock() - - # Fanout - inaddr = ('ipc://%s/zmq_topic_fanout~topic' % - (self.internal_ipc_dir)) - conn.create_consumer(topic, context, fanout='subscriber.foo') - conn.reactor.register.assert_called_with(context, inaddr, - impl_zmq.zmq.SUB, - subscribe='subscriber.foo', - in_bind=False) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_create_consumer_topic_exists(self, mock_reactor): - mock_reactor.register = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - topic = 'topic.foo' - context = mock.Mock() - inaddr = ('ipc://%s/zmq_topic_topic.127.0.0.1' % - (self.internal_ipc_dir)) - - conn.create_consumer(topic, context) - conn.reactor.register.assert_called_with( - context, inaddr, impl_zmq.zmq.PULL, subscribe=None, in_bind=False) - conn.reactor.register.reset_mock() - # Call again with same topic - conn.create_consumer(topic, context) - self.assertFalse(conn.reactor.register.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_close(self, mock_reactor, mock_getmatchmaker): - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.close = mock.Mock() - mock_getmatchmaker.return_value.stop_heartbeat = mock.Mock() - conn.close() - self.assertTrue(mock_getmatchmaker.return_value.stop_heartbeat.called) - self.assertTrue(conn.reactor.close.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_wait(self, mock_reactor): - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.wait = mock.Mock() - conn.wait() - self.assertTrue(conn.reactor.wait.called) - - @mock.patch('oslo_messaging._drivers.impl_zmq._get_matchmaker', - autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) - def test_zmqconnection_consume_in_thread(self, mock_reactor, - mock_getmatchmaker): - mock_getmatchmaker.return_value.start_heartbeat = mock.Mock() - conn = impl_zmq.Connection(self.driver.conf, self.driver) - conn.reactor.consume_in_thread = mock.Mock() - conn.consume_in_thread() - self.assertTrue(mock_getmatchmaker.return_value.start_heartbeat.called) - self.assertTrue(conn.reactor.consume_in_thread.called) - - -class TestZmqListener(ZmqBaseTestCase): - - def test_zmqlistener_no_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - # Timeout = 0 should return straight away since the queue is empty - listener.poll(timeout=0) - - def test_zmqlistener_w_msg(self): - listener = impl_zmq.ZmqListener(self.driver) - kwargs = {'a': 1, 'b': 2} - m = mock.Mock() - ctxt = mock.Mock(autospec=impl_zmq.RpcContext) - message = {'namespace': 'name.space', 'method': m.fake_method, - 'args': kwargs} - eventlet.spawn_n(listener.dispatch, ctxt, message) - resp = listener.poll(timeout=10) - msg = {'method': m.fake_method, 'namespace': 'name.space', - 'args': kwargs} - self.assertEqual(resp.message, msg) - - -class TestZmqDriver(ZmqBaseTestCase): - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic' - msg = 'jeronimo' - self.driver.send(messaging.Target(topic=topic), context, msg, - False, 0, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) - def test_zmqdriver_send_notification(self, mock_multi_send, mock_cast): - context = mock.Mock(autospec=impl_zmq.RpcContext) - topic = 'testtopic.foo' - topic_reformat = 'testtopic-foo' - msg = 'jeronimo' - self.driver.send_notification(messaging.Target(topic=topic), context, - msg, False, False) - mock_multi_send.assert_called_with(self.driver, mock_cast, context, - topic_reformat, msg, - allowed_remote_exmods=[], - envelope=False, pooled=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen(self, mock_connection, mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - self.driver.listen(messaging.Target(topic=topic)) - conn.create_consumer.assert_called_with(topic, listener, fanout=True) - - @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqListener', autospec=True) - @mock.patch('oslo_messaging._drivers.impl_zmq.Connection', autospec=True) - def test_zmqdriver_listen_for_notification(self, mock_connection, - mock_listener): - mock_listener.return_value = listener = mock.Mock() - mock_connection.return_value = conn = mock.Mock() - conn.create_consumer = mock.Mock() - conn.consume_in_thread = mock.Mock() - topic = 'testtopic.foo' - data = [(messaging.Target(topic=topic), 0)] - # NOTE(jamespage): Pooling not supported, just pass None for now. - self.driver.listen_for_notifications(data, None) - conn.create_consumer.assert_called_with("%s-%s" % (topic, 0), listener) + incoming, socket = reply_poller.poll(1) + self.assertIsNone(incoming) + self.assertIsNone(socket) |