summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitriy Ukhlov <dukhlov@mirantis.com>2015-12-14 11:36:28 +0200
committerdukhlov <dukhlov@mirantis.com>2016-01-04 14:49:55 -0500
commit83a08d4b7ed36019f17a070a43ba6e4863cafe34 (patch)
treec94acfa294903c10baaf6c892499303ea0256600
parent5149461fd2523a0c2afc187bd023784438f7b81a (diff)
downloadoslo-messaging-83a08d4b7ed36019f17a070a43ba6e4863cafe34.tar.gz
Adds unit tests for pika_poll module
Change-Id: I69cc0e0302382ab45ba464bb5993300d44679106
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_engine.py3
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_message.py33
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py79
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py28
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py536
5 files changed, 605 insertions, 74 deletions
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index 6e877bb..4f38295 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -200,6 +200,9 @@ class PikaEngine(object):
self._connection_host_param_list = []
self._connection_host_status_list = []
+ if not url.hosts:
+ raise ValueError("You should provide at least one RabbitMQ host")
+
for transport_host in url.hosts:
pika_params = common_pika_params.copy()
pika_params.update(
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index edd5c73..eac2be9 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -72,18 +72,17 @@ class PikaIncomingMessage(object):
information from RabbitMQ message and provide access to it
"""
- def __init__(self, pika_engine, channel, method, properties, body, no_ack):
+ def __init__(self, pika_engine, channel, method, properties, body):
"""Parse RabbitMQ message
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
- this message delivery
+ this message delivery, used for sending ack back.
+ If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
- :param no_ack: Boolean, defines should this message be acked by
- consumer or not
"""
headers = getattr(properties, "headers", {})
version = headers.get(_VERSION_HEADER, None)
@@ -93,7 +92,6 @@ class PikaIncomingMessage(object):
"{}".format(version, _VERSION))
self._pika_engine = pika_engine
- self._no_ack = no_ack
self._channel = channel
self._delivery_tag = method.delivery_tag
@@ -128,12 +126,15 @@ class PikaIncomingMessage(object):
self.message = message_dict
self.ctxt = context_dict
+ def need_ack(self):
+ return self._channel is not None
+
def acknowledge(self):
"""Ack the message. Should be called by message processing logic when
it considered as consumed (means that we don't need redelivery of this
message anymore)
"""
- if not self._no_ack:
+ if self.need_ack():
self._channel.basic_ack(delivery_tag=self._delivery_tag)
def requeue(self):
@@ -141,7 +142,7 @@ class PikaIncomingMessage(object):
when it can not process the message right now and should be redelivered
later if it is possible
"""
- if not self._no_ack:
+ if self.need_ack():
return self._channel.basic_nack(delivery_tag=self._delivery_tag,
requeue=True)
@@ -152,22 +153,21 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
method added to allow consumer to send RPC reply back to the RPC client
"""
- def __init__(self, pika_engine, channel, method, properties, body, no_ack):
+ def __init__(self, pika_engine, channel, method, properties, body):
"""Defines default values of msg_id and reply_q fields and just call
super.__init__ method
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
- this message delivery
+ this message delivery, used for sending ack back.
+ If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
- :param no_ack: Boolean, defines should this message be acked by
- consumer or not
"""
super(RpcPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body, no_ack
+ pika_engine, channel, method, properties, body
)
self.reply_q = properties.reply_to
self.msg_id = properties.correlation_id
@@ -231,7 +231,7 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
"""PikaIncomingMessage implementation for RPC reply messages. It expects
extra RPC reply related fields in message body (result and failure).
"""
- def __init__(self, pika_engine, channel, method, properties, body, no_ack):
+ def __init__(self, pika_engine, channel, method, properties, body):
"""Defines default values of result and failure fields, call
super.__init__ method and then construct Exception object if failure is
not None
@@ -239,15 +239,14 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
- this message delivery
+ this message delivery, used for sending ack back.
+ If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
- :param no_ack: Boolean, defines should this message be acked by
- consumer or not
"""
super(RpcReplyPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body, no_ack
+ pika_engine, channel, method, properties, body
)
self.msg_id = properties.correlation_id
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 5aa948a..3533dad 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -31,8 +31,7 @@ class PikaPoller(object):
connectivity related problem detected
"""
- def __init__(self, pika_engine, prefetch_count,
- incoming_message_class=pika_drv_msg.PikaIncomingMessage):
+ def __init__(self, pika_engine, prefetch_count, incoming_message_class):
"""Initialize required fields
:param pika_engine: PikaEngine, shared object with configuration and
@@ -110,8 +109,7 @@ class PikaPoller(object):
"""
self._message_queue.append(
self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body,
- True
+ self._pika_engine, None, method, properties, body
)
)
@@ -121,8 +119,7 @@ class PikaPoller(object):
"""
self._message_queue.append(
self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body,
- False
+ self._pika_engine, self._channel, method, properties, body
)
)
@@ -146,6 +143,11 @@ class PikaPoller(object):
LOG.exception("Unexpected error during closing connection")
self._connection = None
+ for i in xrange(len(self._message_queue) - 1, -1, -1):
+ message = self._message_queue[i]
+ if message.need_ack():
+ del self._message_queue[i]
+
def poll(self, timeout=None, prefetch_size=1):
"""Main method of this class - consumes message from RabbitMQ
@@ -158,32 +160,29 @@ class PikaPoller(object):
"""
expiration_time = time.time() + timeout if timeout else None
- while len(self._message_queue) < prefetch_size:
+ while True:
with self._lock:
- if not self._started:
- return None
-
- try:
- if self._channel is None:
- self._reconnect()
- # we need some time_limit here, not too small to avoid a
- # lot of not needed iterations but not too large to release
- # lock time to time and give a chance to perform another
- # method waiting this lock
- self._connection.process_data_events(
- time_limit=0.25
- )
- except Exception as e:
- LOG.warn("Exception during consuming message. " + str(e))
- self._cleanup()
- if timeout is not None:
- timeout = expiration_time - time.time()
- if timeout <= 0:
- break
-
- result = self._message_queue[:prefetch_size]
- self._message_queue = self._message_queue[prefetch_size:]
- return result
+ if timeout is not None:
+ timeout = expiration_time - time.time()
+ if (len(self._message_queue) < prefetch_size and
+ self._started and ((timeout is None) or timeout > 0)):
+ try:
+ if self._channel is None:
+ self._reconnect()
+ # we need some time_limit here, not too small to avoid
+ # a lot of not needed iterations but not too large to
+ # release lock time to time and give a chance to
+ # perform another method waiting this lock
+ self._connection.process_data_events(
+ time_limit=0.25
+ )
+ except pika_pool.Connection.connectivity_errors:
+ self._cleanup()
+ raise
+ else:
+ result = self._message_queue[:prefetch_size]
+ del self._message_queue[:prefetch_size]
+ return result
def start(self):
"""Starts poller. Should be called before polling to allow message
@@ -201,7 +200,6 @@ class PikaPoller(object):
return
self._started = False
- self._cleanup()
def reconnect(self):
"""Safe version of _reconnect. Performs reconnection to the broker."""
@@ -249,9 +247,7 @@ class RpcServicePikaPoller(PikaPoller):
:return Dictionary, declared_queue_name -> no_ack_mode
"""
- queue_expiration = (
- self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
- )
+ queue_expiration = self._pika_engine.rpc_queue_expiration
queues_to_consume = {}
@@ -319,15 +315,11 @@ class RpcReplyPikaPoller(PikaPoller):
:return Dictionary, declared_queue_name -> no_ack_mode
"""
- queue_expiration = (
- self._pika_engine.conf.oslo_messaging_pika.rpc_queue_expiration
- )
-
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=self._exchange, queue=self._queue,
routing_key=self._queue, exchange_type='direct',
- queue_expiration=queue_expiration,
+ queue_expiration=self._pika_engine.rpc_queue_expiration,
durable=False
)
@@ -363,8 +355,8 @@ class NotificationPikaPoller(PikaPoller):
"""
def __init__(self, pika_engine, targets_and_priorities,
queue_name=None, prefetch_count=100):
- """Adds exchange and queue parameter for declaring exchange and queue
- used for RPC reply delivery
+ """Adds targets_and_priorities and queue_name parameter
+ for declaring exchanges and queues used for notification delivery
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
@@ -379,7 +371,8 @@ class NotificationPikaPoller(PikaPoller):
self._queue_name = queue_name
super(NotificationPikaPoller, self).__init__(
- pika_engine, prefetch_count=prefetch_count
+ pika_engine, prefetch_count=prefetch_count,
+ incoming_message_class=pika_drv_msg.PikaIncomingMessage
)
def _declare_queue_binding(self):
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
index 5008ce3..3c3f87e 100644
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -46,7 +46,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
def test_message_body_parsing(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -57,7 +57,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
def test_message_acknowledge(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, False
+ self._body
)
message.acknowledge()
@@ -68,8 +68,8 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
def test_message_acknowledge_no_ack(self):
message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._pika_engine, None, self._method, self._properties,
+ self._body
)
message.acknowledge()
@@ -79,7 +79,7 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
def test_message_requeue(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, False
+ self._body
)
message.requeue()
@@ -90,8 +90,8 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
def test_message_requeue_no_ack(self):
message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._pika_engine, None, self._method, self._properties,
+ self._body
)
message.requeue()
@@ -126,7 +126,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -140,7 +140,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
def test_cast_message_body_parsing(self):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -156,7 +156,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
def test_reply_for_cast_message(self, send_reply_mock):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -182,7 +182,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -218,7 +218,7 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- self._body, True
+ self._body
)
self.assertEqual(message.ctxt.get("key_context", None),
@@ -274,7 +274,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- body, True
+ body
)
self.assertEqual(message.msg_id, 123456789)
@@ -294,7 +294,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
- body, True
+ body
)
self.assertEqual(message.msg_id, 123456789)
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
new file mode 100644
index 0000000..77a3b6b
--- /dev/null
+++ b/oslo_messaging/tests/drivers/pika/test_poller.py
@@ -0,0 +1,536 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+import unittest
+
+import mock
+
+from oslo_messaging._drivers.pika_driver import pika_poller
+
+
+class PikaPollerTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._poller_connection_mock = mock.Mock()
+ self._poller_channel_mock = mock.Mock()
+ self._poller_connection_mock.channel.return_value = (
+ self._poller_channel_mock
+ )
+ self._pika_engine.create_connection.return_value = (
+ self._poller_connection_mock
+ )
+ self._prefetch_count = 123
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
+ "_declare_queue_binding")
+ def test_poll(self, declare_queue_binding_mock):
+ incoming_message_class_mock = mock.Mock()
+ poller = pika_poller.PikaPoller(
+ self._pika_engine, self._prefetch_count,
+ incoming_message_class=incoming_message_class_mock
+ )
+ unused = object()
+ method = object()
+ properties = object()
+ body = object()
+
+ self._poller_connection_mock.process_data_events.side_effect = (
+ lambda time_limit: poller._on_message_with_ack_callback(
+ unused, method, properties, body
+ )
+ )
+
+ poller.start()
+ res = poller.poll()
+
+ self.assertEqual(len(res), 1)
+
+ self.assertEqual(res[0], incoming_message_class_mock.return_value)
+ incoming_message_class_mock.assert_called_once_with(
+ self._pika_engine, self._poller_channel_mock, method, properties,
+ body
+ )
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ self.assertTrue(declare_queue_binding_mock.called)
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
+ "_declare_queue_binding")
+ def test_poll_after_stop(self, declare_queue_binding_mock):
+ incoming_message_class_mock = mock.Mock()
+ poller = pika_poller.PikaPoller(
+ self._pika_engine, self._prefetch_count,
+ incoming_message_class=incoming_message_class_mock
+ )
+
+ n = 10
+ params = []
+
+ for i in range(n):
+ params.append((object(), object(), object(), object()))
+
+ index = [0]
+
+ def f(time_limit):
+ for i in range(10):
+ poller._on_message_no_ack_callback(
+ *params[index[0]]
+ )
+ index[0] += 1
+
+ self._poller_connection_mock.process_data_events.side_effect = f
+
+ poller.start()
+ res = poller.poll(prefetch_size=1)
+ self.assertEqual(len(res), 1)
+ self.assertEqual(res[0], incoming_message_class_mock.return_value)
+ self.assertEqual(
+ incoming_message_class_mock.call_args_list[0][0],
+ (self._pika_engine, None) + params[0][1:]
+ )
+
+ poller.stop()
+
+ res2 = poller.poll(prefetch_size=n)
+
+ self.assertEqual(len(res2), n-1)
+ self.assertEqual(incoming_message_class_mock.call_count, n)
+
+ self.assertEqual(
+ self._poller_connection_mock.process_data_events.call_count, 1)
+
+ for i in range(n-1):
+ self.assertEqual(res2[i], incoming_message_class_mock.return_value)
+ self.assertEqual(
+ incoming_message_class_mock.call_args_list[i+1][0],
+ (self._pika_engine, None) + params[i+1][1:]
+ )
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ self.assertTrue(declare_queue_binding_mock.called)
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
+ "_declare_queue_binding")
+ def test_poll_batch(self, declare_queue_binding_mock):
+ incoming_message_class_mock = mock.Mock()
+ poller = pika_poller.PikaPoller(
+ self._pika_engine, self._prefetch_count,
+ incoming_message_class=incoming_message_class_mock
+ )
+
+ n = 10
+ params = []
+
+ for i in range(n):
+ params.append((object(), object(), object(), object()))
+
+ index = [0]
+
+ def f(time_limit):
+ poller._on_message_with_ack_callback(
+ *params[index[0]]
+ )
+ index[0] += 1
+
+ self._poller_connection_mock.process_data_events.side_effect = f
+
+ poller.start()
+ res = poller.poll(prefetch_size=n)
+
+ self.assertEqual(len(res), n)
+ self.assertEqual(incoming_message_class_mock.call_count, n)
+
+ for i in range(n):
+ self.assertEqual(res[i], incoming_message_class_mock.return_value)
+ self.assertEqual(
+ incoming_message_class_mock.call_args_list[i][0],
+ (self._pika_engine, self._poller_channel_mock) + params[i][1:]
+ )
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ self.assertTrue(declare_queue_binding_mock.called)
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
+ "_declare_queue_binding")
+ def test_poll_batch_with_timeout(self, declare_queue_binding_mock):
+ incoming_message_class_mock = mock.Mock()
+ poller = pika_poller.PikaPoller(
+ self._pika_engine, self._prefetch_count,
+ incoming_message_class=incoming_message_class_mock
+ )
+
+ n = 10
+ timeout = 1
+ sleep_time = 0.2
+ params = []
+
+ success_count = 5
+
+ for i in range(n):
+ params.append((object(), object(), object(), object()))
+
+ index = [0]
+
+ def f(time_limit):
+ time.sleep(sleep_time)
+ poller._on_message_with_ack_callback(
+ *params[index[0]]
+ )
+ index[0] += 1
+
+ self._poller_connection_mock.process_data_events.side_effect = f
+
+ poller.start()
+ res = poller.poll(prefetch_size=n, timeout=timeout)
+
+ self.assertEqual(len(res), success_count)
+ self.assertEqual(incoming_message_class_mock.call_count, success_count)
+
+ for i in range(success_count):
+ self.assertEqual(res[i], incoming_message_class_mock.return_value)
+ self.assertEqual(
+ incoming_message_class_mock.call_args_list[i][0],
+ (self._pika_engine, self._poller_channel_mock) + params[i][1:]
+ )
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ self.assertTrue(declare_queue_binding_mock.called)
+
+
+class RpcServicePikaPollerTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._poller_connection_mock = mock.Mock()
+ self._poller_channel_mock = mock.Mock()
+ self._poller_connection_mock.channel.return_value = (
+ self._poller_channel_mock
+ )
+ self._pika_engine.create_connection.return_value = (
+ self._poller_connection_mock
+ )
+
+ self._pika_engine.get_rpc_queue_name.side_effect = (
+ lambda topic, server, no_ack: "_".join(
+ [topic, str(server), str(no_ack)]
+ )
+ )
+
+ self._pika_engine.get_rpc_exchange_name.side_effect = (
+ lambda exchange, topic, fanout, no_ack: "_".join(
+ [exchange, topic, str(fanout), str(no_ack)]
+ )
+ )
+
+ self._prefetch_count = 123
+ self._target = mock.Mock(exchange="exchange", topic="topic",
+ server="server")
+ self._pika_engine.rpc_queue_expiration = 12345
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
+ "RpcPikaIncomingMessage")
+ def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock):
+ poller = pika_poller.RpcServicePikaPoller(
+ self._pika_engine, self._target, self._prefetch_count,
+ )
+ self._poller_connection_mock.process_data_events.side_effect = (
+ lambda time_limit: poller._on_message_with_ack_callback(
+ None, None, None, None
+ )
+ )
+
+ poller.start()
+ res = poller.poll()
+
+ self.assertEqual(len(res), 1)
+
+ self.assertEqual(res[0], rpc_pika_incoming_message_mock.return_value)
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ declare_queue_binding_by_channel_mock = (
+ self._pika_engine.declare_queue_binding_by_channel
+ )
+
+ self.assertEqual(
+ declare_queue_binding_by_channel_mock.call_count, 6
+ )
+
+ declare_queue_binding_by_channel_mock.assert_has_calls((
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_False_True",
+ exchange_type='direct',
+ queue="topic_None_True",
+ queue_expiration=12345,
+ routing_key="topic_None_True"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_False_True",
+ exchange_type='direct',
+ queue="topic_server_True",
+ queue_expiration=12345,
+ routing_key="topic_server_True"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_True_True",
+ exchange_type='fanout',
+ queue="topic_server_True",
+ queue_expiration=12345,
+ routing_key=''
+ ),
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_False_False",
+ exchange_type='direct',
+ queue="topic_None_False",
+ queue_expiration=12345,
+ routing_key="topic_None_False"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_False_False",
+ exchange_type='direct',
+ queue="topic_server_False",
+ queue_expiration=12345,
+ routing_key="topic_server_False"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock, durable=False,
+ exchange="exchange_topic_True_False",
+ exchange_type='fanout',
+ queue="topic_server_False",
+ queue_expiration=12345,
+ routing_key=''
+ ),
+ ))
+
+
+class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._poller_connection_mock = mock.Mock()
+ self._poller_channel_mock = mock.Mock()
+ self._poller_connection_mock.channel.return_value = (
+ self._poller_channel_mock
+ )
+ self._pika_engine.create_connection.return_value = (
+ self._poller_connection_mock
+ )
+
+ self._prefetch_count = 123
+ self._exchange = "rpc_reply_exchange"
+ self._queue = "rpc_reply_queue"
+
+ self._pika_engine.rpc_reply_retry_delay = 12132543456
+
+ self._pika_engine.rpc_queue_expiration = 12345
+ self._pika_engine.rpc_reply_retry_attempts = 3
+
+ def test_start(self):
+ poller = pika_poller.RpcReplyPikaPoller(
+ self._pika_engine, self._exchange, self._queue,
+ self._prefetch_count,
+ )
+
+ poller.start()
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ def test_declare_rpc_reply_queue_binding(self):
+ poller = pika_poller.RpcReplyPikaPoller(
+ self._pika_engine, self._exchange, self._queue,
+ self._prefetch_count,
+ )
+
+ poller.start()
+
+ declare_queue_binding_by_channel_mock = (
+ self._pika_engine.declare_queue_binding_by_channel
+ )
+
+ self.assertEqual(
+ declare_queue_binding_by_channel_mock.call_count, 1
+ )
+
+ declare_queue_binding_by_channel_mock.assert_called_once_with(
+ channel=self._poller_channel_mock, durable=False,
+ exchange='rpc_reply_exchange', exchange_type='direct',
+ queue='rpc_reply_queue', queue_expiration=12345,
+ routing_key='rpc_reply_queue'
+ )
+
+
+class NotificationPikaPollerTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._poller_connection_mock = mock.Mock()
+ self._poller_channel_mock = mock.Mock()
+ self._poller_connection_mock.channel.return_value = (
+ self._poller_channel_mock
+ )
+ self._pika_engine.create_connection.return_value = (
+ self._poller_connection_mock
+ )
+
+ self._prefetch_count = 123
+ self._target_and_priorities = (
+ (
+ mock.Mock(exchange="exchange1", topic="topic1",
+ server="server1"), 1
+ ),
+ (
+ mock.Mock(exchange="exchange1", topic="topic1"), 2
+ ),
+ (
+ mock.Mock(exchange="exchange2", topic="topic2",), 1
+ ),
+ )
+ self._pika_engine.notification_persistence = object()
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
+ "PikaIncomingMessage")
+ def test_declare_notification_queue_bindings_default_queue(
+ self, pika_incoming_message_mock):
+ poller = pika_poller.NotificationPikaPoller(
+ self._pika_engine, self._target_and_priorities, None,
+ self._prefetch_count,
+ )
+ self._poller_connection_mock.process_data_events.side_effect = (
+ lambda time_limit: poller._on_message_with_ack_callback(
+ None, None, None, None
+ )
+ )
+
+ poller.start()
+ res = poller.poll()
+
+ self.assertEqual(len(res), 1)
+
+ self.assertEqual(res[0], pika_incoming_message_mock.return_value)
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ declare_queue_binding_by_channel_mock = (
+ self._pika_engine.declare_queue_binding_by_channel
+ )
+
+ self.assertEqual(
+ declare_queue_binding_by_channel_mock.call_count, 3
+ )
+
+ declare_queue_binding_by_channel_mock.assert_has_calls((
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange1",
+ exchange_type='direct',
+ queue="topic1.1",
+ queue_expiration=None,
+ routing_key="topic1.1"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange1",
+ exchange_type='direct',
+ queue="topic1.2",
+ queue_expiration=None,
+ routing_key="topic1.2"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange2",
+ exchange_type='direct',
+ queue="topic2.1",
+ queue_expiration=None,
+ routing_key="topic2.1"
+ )
+ ))
+
+ @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
+ "PikaIncomingMessage")
+ def test_declare_notification_queue_bindings_custom_queue(
+ self, pika_incoming_message_mock):
+ poller = pika_poller.NotificationPikaPoller(
+ self._pika_engine, self._target_and_priorities,
+ "custom_queue_name", self._prefetch_count
+ )
+ self._poller_connection_mock.process_data_events.side_effect = (
+ lambda time_limit: poller._on_message_with_ack_callback(
+ None, None, None, None
+ )
+ )
+
+ poller.start()
+ res = poller.poll()
+
+ self.assertEqual(len(res), 1)
+
+ self.assertEqual(res[0], pika_incoming_message_mock.return_value)
+
+ self.assertTrue(self._pika_engine.create_connection.called)
+ self.assertTrue(self._poller_connection_mock.channel.called)
+
+ declare_queue_binding_by_channel_mock = (
+ self._pika_engine.declare_queue_binding_by_channel
+ )
+
+ self.assertEqual(
+ declare_queue_binding_by_channel_mock.call_count, 3
+ )
+
+ declare_queue_binding_by_channel_mock.assert_has_calls((
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange1",
+ exchange_type='direct',
+ queue="custom_queue_name",
+ queue_expiration=None,
+ routing_key="topic1.1"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange1",
+ exchange_type='direct',
+ queue="custom_queue_name",
+ queue_expiration=None,
+ routing_key="topic1.2"
+ ),
+ mock.call(
+ channel=self._poller_channel_mock,
+ durable=self._pika_engine.notification_persistence,
+ exchange="exchange2",
+ exchange_type='direct',
+ queue="custom_queue_name",
+ queue_expiration=None,
+ routing_key="topic2.1"
+ )
+ ))