summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDmitriy Ukhlov <dukhlov@mirantis.com>2015-12-14 11:36:28 +0200
committerdukhlov <dukhlov@mirantis.com>2015-12-19 18:42:10 -0500
commit5149461fd2523a0c2afc187bd023784438f7b81a (patch)
tree50b5fa1f2b13469e1583f62b171a3666ad3f8a35
parent3976a2ff81408b7f86e898eb0a87634a3f9ed2c0 (diff)
downloadoslo-messaging-5149461fd2523a0c2afc187bd023784438f7b81a.tar.gz
Adds tests for pika_message.py
Also small mistakes were fixed, msg_id, unique_id and reply_q fields were moved to corresponding AMQP properties Change-Id: I5147c35c1a2ce0205e08ca81db164a3cc879fb0a
-rw-r--r--oslo_messaging/_drivers/impl_pika.py4
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_engine.py12
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_message.py261
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py3
-rw-r--r--oslo_messaging/tests/drivers/pika/__init__.py0
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py622
6 files changed, 781 insertions, 121 deletions
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index eaa4a76..3d633a5 100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
@@ -198,8 +198,8 @@ class PikaDriver(object):
"Timeout for current operation was expired."
)
try:
- with self._pika_engine.connection_pool.acquire(
- timeout=timeout) as conn:
+ with (self._pika_engine.connection_without_confirmation_pool
+ .acquire)(timeout=timeout) as conn:
self._pika_engine.declare_queue_binding_by_channel(
conn.channel,
exchange=(
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
index 06dfcdb..6e877bb 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import random
import socket
import sys
import threading
@@ -44,7 +45,7 @@ def _is_eventlet_monkey_patched(module):
return eventlet.patcher.is_monkey_patched(module)
-def _create__select_poller_connection_impl(
+def _create_select_poller_connection_impl(
parameters, on_open_callback, on_open_error_callback,
on_close_callback, stop_ioloop_on_close):
"""Used for disabling autochoise of poller ('select', 'poll', 'epool', etc)
@@ -198,7 +199,6 @@ class PikaEngine(object):
self._connection_host_param_list = []
self._connection_host_status_list = []
- self._next_connection_host_num = 0
for transport_host in url.hosts:
pika_params = common_pika_params.copy()
@@ -215,9 +215,13 @@ class PikaEngine(object):
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0
})
+ self._next_connection_host_num = random.randint(
+ 0, len(self._connection_host_param_list) - 1
+ )
+
# initializing 2 connection pools: 1st for connections without
# confirmations, 2nd - with confirmations
- self.connection_pool = pika_pool.QueuedPool(
+ self.connection_without_confirmation_pool = pika_pool.QueuedPool(
create=self.create_connection,
max_size=self.conf.oslo_messaging_pika.pool_max_size,
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
@@ -336,7 +340,7 @@ class PikaEngine(object):
),
**base_host_params
),
- _impl_class=(_create__select_poller_connection_impl
+ _impl_class=(_create_select_poller_connection_impl
if self._force_select_poller_use else None)
)
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
index 9bf9feb..edd5c73 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_message.py
@@ -95,40 +95,36 @@ class PikaIncomingMessage(object):
self._pika_engine = pika_engine
self._no_ack = no_ack
self._channel = channel
- self.delivery_tag = method.delivery_tag
+ self._delivery_tag = method.delivery_tag
- self.version = version
+ self._version = version
- self.content_type = getattr(properties, "content_type",
- "application/json")
- self.content_encoding = getattr(properties, "content_encoding",
- "utf-8")
+ self._content_type = properties.content_type
+ self._content_encoding = properties.content_encoding
+ self.unique_id = properties.message_id
self.expiration_time = (
None if properties.expiration is None else
time.time() + float(properties.expiration) / 1000
)
- if self.content_type != "application/json":
+ if self._content_type != "application/json":
raise NotImplementedError(
"Content-type['{}'] is not valid, "
"'application/json' only is supported.".format(
- self.content_type
+ self._content_type
)
)
- message_dict = jsonutils.loads(body, encoding=self.content_encoding)
+ message_dict = jsonutils.loads(body, encoding=self._content_encoding)
context_dict = {}
for key in list(message_dict.keys()):
key = six.text_type(key)
- if key.startswith('_context_'):
+ if key.startswith('_$_'):
value = message_dict.pop(key)
- context_dict[key[9:]] = value
- elif key.startswith('_'):
- value = message_dict.pop(key)
- setattr(self, key[1:], value)
+ context_dict[key[3:]] = value
self.message = message_dict
self.ctxt = context_dict
@@ -138,7 +134,7 @@ class PikaIncomingMessage(object):
message anymore)
"""
if not self._no_ack:
- self._channel.basic_ack(delivery_tag=self.delivery_tag)
+ self._channel.basic_ack(delivery_tag=self._delivery_tag)
def requeue(self):
"""Rollback the message. Should be called by message processing logic
@@ -146,7 +142,7 @@ class PikaIncomingMessage(object):
later if it is possible
"""
if not self._no_ack:
- return self._channel.basic_nack(delivery_tag=self.delivery_tag,
+ return self._channel.basic_nack(delivery_tag=self._delivery_tag,
requeue=True)
@@ -170,58 +166,30 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
:param no_ack: Boolean, defines should this message be acked by
consumer or not
"""
- self.msg_id = None
- self.reply_q = None
-
super(RpcPikaIncomingMessage, self).__init__(
pika_engine, channel, method, properties, body, no_ack
)
+ self.reply_q = properties.reply_to
+ self.msg_id = properties.correlation_id
def reply(self, reply=None, failure=None, log_failure=True):
"""Send back reply to the RPC client
- :param reply - Dictionary, reply. In case of exception should be None
- :param failure - Exception, exception, raised during processing RPC
- request. Should be None if RPC request was successfully processed
- :param log_failure, Boolean, not used in this implementation.
+ :param reply: Dictionary, reply. In case of exception should be None
+ :param failure: Tuple, should be a sys.exc_info() tuple.
+ Should be None if RPC request was successfully processed.
+ :param log_failure: Boolean, not used in this implementation.
It present here to be compatible with driver API
- """
- if not (self.msg_id and self.reply_q):
- return
-
- msg = {
- '_msg_id': self.msg_id,
- }
- if failure is not None:
- if isinstance(failure, RemoteExceptionMixin):
- failure_data = {
- 'class': failure.clazz,
- 'module': failure.module,
- 'message': failure.message,
- 'tb': failure.trace
- }
- else:
- tb = traceback.format_exception(*failure)
- failure = failure[1]
-
- cls_name = six.text_type(failure.__class__.__name__)
- mod_name = six.text_type(failure.__class__.__module__)
-
- failure_data = {
- 'class': cls_name,
- 'module': mod_name,
- 'message': six.text_type(failure),
- 'tb': tb
- }
-
- msg['_failure'] = failure_data
+ :return RpcReplyPikaIncomingMessage, message with reply
+ """
- if reply is not None:
- msg['_result'] = reply
+ if self.reply_q is None:
+ return
- reply_outgoing_message = PikaOutgoingMessage(
- self._pika_engine, msg, self.ctxt, content_type=self.content_type,
- content_encoding=self.content_encoding
+ reply_outgoing_message = RpcReplyPikaOutgoingMessage(
+ self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
+ content_type=self._content_type,
+ content_encoding=self._content_encoding
)
def on_exception(ex):
@@ -242,11 +210,7 @@ class RpcPikaIncomingMessage(PikaIncomingMessage):
try:
reply_outgoing_message.send(
- exchange=self._pika_engine.rpc_reply_exchange,
- routing_key=self.reply_q,
- confirm=True,
- mandatory=False,
- persistent=False,
+ reply_q=self.reply_q,
expiration_time=self.expiration_time,
retrier=retrier
)
@@ -282,18 +246,20 @@ class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
:param no_ack: Boolean, defines should this message be acked by
consumer or not
"""
- self.result = None
- self.failure = None
-
super(RpcReplyPikaIncomingMessage, self).__init__(
pika_engine, channel, method, properties, body, no_ack
)
+ self.msg_id = properties.correlation_id
+
+ self.result = self.message.get("s", None)
+ self.failure = self.message.get("e", None)
+
if self.failure is not None:
- trace = self.failure.get('tb', [])
- message = self.failure.get('message', "")
- class_name = self.failure.get('class')
- module_name = self.failure.get('module')
+ trace = self.failure.get('t', [])
+ message = self.failure.get('s', "")
+ class_name = self.failure.get('c')
+ module_name = self.failure.get('m')
res_exc = None
@@ -343,14 +309,14 @@ class PikaOutgoingMessage(object):
self._pika_engine = pika_engine
- self.content_type = content_type
- self.content_encoding = content_encoding
+ self._content_type = content_type
+ self._content_encoding = content_encoding
- if self.content_type != "application/json":
+ if self._content_type != "application/json":
raise NotImplementedError(
"Content-type['{}'] is not valid, "
"'application/json' only is supported.".format(
- self.content_type
+ self._content_type
)
)
@@ -362,23 +328,21 @@ class PikaOutgoingMessage(object):
def _prepare_message_to_send(self):
"""Combine user's message fields an system fields (_unique_id,
context's data etc)
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param message: Dictionary, user's message fields
- :param context: Dictionary, request context's fields
- :param content_type: String, content-type header, defines serialization
- mechanism
- :param content_encoding: String, defines encoding for text data
"""
msg = self.message.copy()
- msg['_unique_id'] = self.unique_id
+ if self.context:
+ for key, value in six.iteritems(self.context):
+ key = six.text_type(key)
+ msg['_$_' + key] = value
- for key, value in self.context.iteritems():
- key = six.text_type(key)
- msg['_context_' + key] = value
- return msg
+ props = pika_spec.BasicProperties(
+ content_encoding=self._content_encoding,
+ content_type=self._content_type,
+ headers={_VERSION_HEADER: _VERSION},
+ message_id=self.unique_id,
+ )
+ return msg, props
@staticmethod
def _publish(pool, exchange, routing_key, body, properties, mandatory,
@@ -456,14 +420,15 @@ class PikaOutgoingMessage(object):
"Socket timeout exceeded."
)
- def _do_send(self, exchange, routing_key, msg_dict, confirm=True,
- mandatory=True, persistent=False, expiration_time=None,
- retrier=None):
+ def _do_send(self, exchange, routing_key, msg_dict, msg_props,
+ confirm=True, mandatory=True, persistent=False,
+ expiration_time=None, retrier=None):
"""Send prepared message with configured retrying
:param exchange: String, RabbitMQ exchange name for message sending
:param routing_key: String, RabbitMQ routing key for message routing
:param msg_dict: Dictionary, message payload
+ :param msg_props: Properties, message properties
:param confirm: Boolean, enable publisher confirmation if True
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
exception if it is not possible to deliver message to any queue)
@@ -474,29 +439,26 @@ class PikaOutgoingMessage(object):
:param retrier: retrying.Retrier, configured retrier object for sending
message, if None no retrying is performed
"""
- properties = pika_spec.BasicProperties(
- content_encoding=self.content_encoding,
- content_type=self.content_type,
- headers={_VERSION_HEADER: _VERSION},
- delivery_mode=2 if persistent else 1
- )
+ msg_props.delivery_mode = 2 if persistent else 1
pool = (self._pika_engine.connection_with_confirmation_pool
- if confirm else self._pika_engine.connection_pool)
+ if confirm else
+ self._pika_engine.connection_without_confirmation_pool)
- body = jsonutils.dumps(msg_dict, encoding=self.content_encoding)
+ body = jsonutils.dump_as_bytes(msg_dict,
+ encoding=self._content_encoding)
LOG.debug(
"Sending message:[body:{}; properties: {}] to target: "
"[exchange:{}; routing_key:{}]".format(
- body, properties, exchange, routing_key
+ body, msg_props, exchange, routing_key
)
)
publish = (self._publish if retrier is None else
retrier(self._publish))
- return publish(pool, exchange, routing_key, body, properties,
+ return publish(pool, exchange, routing_key, body, msg_props,
mandatory, expiration_time)
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
@@ -515,10 +477,11 @@ class PikaOutgoingMessage(object):
:param retrier: retrying.Retrier, configured retrier object for sending
message, if None no retrying is performed
"""
- msg_dict = self._prepare_message_to_send()
+ msg_dict, msg_props = self._prepare_message_to_send()
- return self._do_send(exchange, routing_key, msg_dict, confirm,
- mandatory, persistent, expiration_time, retrier)
+ return self._do_send(exchange, routing_key, msg_dict, msg_props,
+ confirm, mandatory, persistent, expiration_time,
+ retrier)
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
@@ -554,23 +517,25 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
target.topic, target.server, retrier is None
)
- msg_dict = self._prepare_message_to_send()
+ msg_dict, msg_props = self._prepare_message_to_send()
if reply_listener:
- msg_id = uuid.uuid4().hex
- msg_dict["_msg_id"] = msg_id
- LOG.debug('MSG_ID is %s', msg_id)
+ self.msg_id = uuid.uuid4().hex
+ msg_props.correlation_id = self.msg_id
+ LOG.debug('MSG_ID is %s', self.msg_id)
- msg_dict["_reply_q"] = reply_listener.get_reply_qname(
+ self.reply_q = reply_listener.get_reply_qname(
expiration_time - time.time()
)
+ msg_props.reply_to = self.reply_q
- future = reply_listener.register_reply_waiter(msg_id=msg_id)
+ future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
self._do_send(
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
- confirm=True, mandatory=True, persistent=False,
- expiration_time=expiration_time, retrier=retrier
+ msg_props=msg_props, confirm=True, mandatory=True,
+ persistent=False, expiration_time=expiration_time,
+ retrier=retrier
)
try:
@@ -580,10 +545,78 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
if isinstance(e, futures.TimeoutError):
e = exceptions.MessagingTimeout()
raise e
-
else:
self._do_send(
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
- confirm=True, mandatory=True, persistent=False,
- expiration_time=expiration_time, retrier=retrier
+ msg_props=msg_props, confirm=True, mandatory=True,
+ persistent=False, expiration_time=expiration_time,
+ retrier=retrier
)
+
+
+class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
+ """PikaOutgoingMessage implementation for RPC reply messages. It sets
+ correlation_id AMQP property to link this reply with response
+ """
+ def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
+ content_type="application/json", content_encoding="utf-8"):
+ """Initialize with reply information for sending
+
+ :param pika_engine: PikaEngine, shared object with configuration and
+ shared driver functionality
+ :param msg_id: String, msg_id of RPC request, which waits for reply
+ :param reply: Dictionary, reply. In case of exception should be None
+ :param failure_info: Tuple, should be a sys.exc_info() tuple.
+ Should be None if RPC request was successfully processed.
+ :param content_type: String, content-type header, defines serialization
+ mechanism
+ :param content_encoding: String, defines encoding for text data
+ """
+ self.msg_id = msg_id
+
+ if failure_info is not None:
+ ex_class = failure_info[0]
+ ex = failure_info[1]
+ tb = traceback.format_exception(*failure_info)
+ if issubclass(ex_class, RemoteExceptionMixin):
+ failure_data = {
+ 'c': ex.clazz,
+ 'm': ex.module,
+ 's': ex.message,
+ 't': tb
+ }
+ else:
+ failure_data = {
+ 'c': six.text_type(ex_class.__name__),
+ 'm': six.text_type(ex_class.__module__),
+ 's': six.text_type(ex),
+ 't': tb
+ }
+
+ msg = {'e': failure_data}
+ else:
+ msg = {'s': reply}
+
+ super(RpcReplyPikaOutgoingMessage, self).__init__(
+ pika_engine, msg, None, content_type, content_encoding
+ )
+
+ def send(self, reply_q, expiration_time=None, retrier=None):
+ """Send RPC message with configured retrying
+
+ :param reply_q: String, queue name for sending reply
+ :param expiration_time: Float, expiration time in seconds
+ (like time.time())
+ :param retrier: retrying.Retrier, configured retrier object for sending
+ message, if None no retrying is performed
+ """
+
+ msg_dict, msg_props = self._prepare_message_to_send()
+ msg_props.correlation_id = self.msg_id
+
+ self._do_send(
+ exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
+ msg_dict=msg_dict, msg_props=msg_props, confirm=True,
+ mandatory=True, persistent=False, expiration_time=expiration_time,
+ retrier=retrier
+ )
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
index 1390ced..5aa948a 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py
@@ -18,6 +18,7 @@ import time
from oslo_log import log as logging
import pika_pool
import retrying
+import six
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
@@ -68,7 +69,7 @@ class PikaPoller(object):
if self._queues_to_consume is None:
self._queues_to_consume = self._declare_queue_binding()
- for queue, no_ack in self._queues_to_consume.iteritems():
+ for queue, no_ack in six.iteritems(self._queues_to_consume):
self._start_consuming(queue, no_ack)
def _declare_queue_binding(self):
diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/tests/drivers/pika/__init__.py
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
new file mode 100644
index 0000000..5008ce3
--- /dev/null
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -0,0 +1,622 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import functools
+import time
+import unittest
+
+from concurrent import futures
+from mock import mock, patch
+from oslo_serialization import jsonutils
+import pika
+from pika import spec
+
+import oslo_messaging
+from oslo_messaging._drivers.pika_driver import pika_engine
+from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
+
+
+class PikaIncomingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._channel = mock.Mock()
+
+ self._delivery_tag = 12345
+
+ self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+ self._properties = pika.BasicProperties(
+ content_type="application/json",
+ headers={"version": "1.0"},
+ )
+ self._body = (
+ b'{"_$_key_context":"context_value",'
+ b'"payload_key": "payload_value"}'
+ )
+
+ def test_message_body_parsing(self):
+ message = pika_drv_msg.PikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+
+ def test_message_acknowledge(self):
+ message = pika_drv_msg.PikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, False
+ )
+
+ message.acknowledge()
+
+ self.assertEqual(1, self._channel.basic_ack.call_count)
+ self.assertEqual({"delivery_tag": self._delivery_tag},
+ self._channel.basic_ack.call_args[1])
+
+ def test_message_acknowledge_no_ack(self):
+ message = pika_drv_msg.PikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ message.acknowledge()
+
+ self.assertEqual(0, self._channel.basic_ack.call_count)
+
+ def test_message_requeue(self):
+ message = pika_drv_msg.PikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, False
+ )
+
+ message.requeue()
+
+ self.assertEqual(1, self._channel.basic_nack.call_count)
+ self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
+ self._channel.basic_nack.call_args[1])
+
+ def test_message_requeue_no_ack(self):
+ message = pika_drv_msg.PikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ message.requeue()
+
+ self.assertEqual(0, self._channel.basic_nack.call_count)
+
+
+class RpcPikaIncomingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._pika_engine.rpc_reply_retry_attempts = 3
+ self._pika_engine.rpc_reply_retry_delay = 0.25
+
+ self._channel = mock.Mock()
+
+ self._delivery_tag = 12345
+
+ self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+ self._body = (
+ b'{"_$_key_context":"context_value",'
+ b'"payload_key":"payload_value"}'
+ )
+ self._properties = pika.BasicProperties(
+ content_type="application/json",
+ content_encoding="utf-8",
+ headers={"version": "1.0"},
+ )
+
+ def test_call_message_body_parsing(self):
+ self._properties.correlation_id = 123456789
+ self._properties.reply_to = "reply_queue"
+
+ message = pika_drv_msg.RpcPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.msg_id, 123456789)
+ self.assertEqual(message.reply_q, "reply_queue")
+
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+
+ def test_cast_message_body_parsing(self):
+ message = pika_drv_msg.RpcPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.msg_id, None)
+ self.assertEqual(message.reply_q, None)
+
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+
+ @patch(("oslo_messaging._drivers.pika_driver.pika_message."
+ "PikaOutgoingMessage.send"))
+ def test_reply_for_cast_message(self, send_reply_mock):
+ message = pika_drv_msg.RpcPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.msg_id, None)
+ self.assertEqual(message.reply_q, None)
+
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+
+ message.reply(reply=object())
+
+ self.assertEqual(send_reply_mock.call_count, 0)
+
+ @patch("oslo_messaging._drivers.pika_driver.pika_message."
+ "RpcReplyPikaOutgoingMessage")
+ @patch("retrying.retry")
+ def test_positive_reply_for_call_message(self,
+ retry_mock,
+ outgoing_message_mock):
+ self._properties.correlation_id = 123456789
+ self._properties.reply_to = "reply_queue"
+
+ message = pika_drv_msg.RpcPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.msg_id, 123456789)
+ self.assertEqual(message.reply_q, "reply_queue")
+
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+ reply = "all_fine"
+ message.reply(reply=reply)
+
+ outgoing_message_mock.assert_called_once_with(
+ self._pika_engine, 123456789, failure_info=None, reply='all_fine',
+ content_encoding='utf-8', content_type='application/json'
+ )
+ outgoing_message_mock().send.assert_called_once_with(
+ expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
+ )
+ retry_mock.assert_called_once_with(
+ retry_on_exception=mock.ANY, stop_max_attempt_number=3,
+ wait_fixed=250.0
+ )
+
+ @patch("oslo_messaging._drivers.pika_driver.pika_message."
+ "RpcReplyPikaOutgoingMessage")
+ @patch("retrying.retry")
+ def test_negative_reply_for_call_message(self,
+ retry_mock,
+ outgoing_message_mock):
+ self._properties.correlation_id = 123456789
+ self._properties.reply_to = "reply_queue"
+
+ message = pika_drv_msg.RpcPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ self._body, True
+ )
+
+ self.assertEqual(message.ctxt.get("key_context", None),
+ "context_value")
+ self.assertEqual(message.msg_id, 123456789)
+ self.assertEqual(message.reply_q, "reply_queue")
+
+ self.assertEqual(message.message.get("payload_key", None),
+ "payload_value")
+
+ failure_info = object()
+ message.reply(failure=failure_info)
+
+ outgoing_message_mock.assert_called_once_with(
+ self._pika_engine, 123456789,
+ failure_info=failure_info,
+ reply=None,
+ content_encoding='utf-8',
+ content_type='application/json'
+ )
+ outgoing_message_mock().send.assert_called_once_with(
+ expiration_time=None, reply_q='reply_queue', retrier=mock.ANY
+ )
+ retry_mock.assert_called_once_with(
+ retry_on_exception=mock.ANY, stop_max_attempt_number=3,
+ wait_fixed=250.0
+ )
+
+
+class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.Mock()
+ self._pika_engine.allowed_remote_exmods = [
+ pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
+ ]
+
+ self._channel = mock.Mock()
+
+ self._delivery_tag = 12345
+
+ self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
+
+ self._properties = pika.BasicProperties(
+ content_type="application/json",
+ content_encoding="utf-8",
+ headers={"version": "1.0"},
+ correlation_id=123456789
+ )
+
+ def test_positive_reply_message_body_parsing(self):
+
+ body = b'{"s": "all fine"}'
+
+ message = pika_drv_msg.RpcReplyPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ body, True
+ )
+
+ self.assertEqual(message.msg_id, 123456789)
+ self.assertIsNone(message.failure)
+ self.assertEquals(message.result, "all fine")
+
+ def test_negative_reply_message_body_parsing(self):
+
+ body = (b'{'
+ b' "e": {'
+ b' "s": "Error message",'
+ b' "t": ["TRACE HERE"],'
+ b' "c": "MessagingException",'
+ b' "m": "oslo_messaging.exceptions"'
+ b' }'
+ b'}')
+
+ message = pika_drv_msg.RpcReplyPikaIncomingMessage(
+ self._pika_engine, self._channel, self._method, self._properties,
+ body, True
+ )
+
+ self.assertEqual(message.msg_id, 123456789)
+ self.assertIsNone(message.result)
+ self.assertEquals(
+ str(message.failure),
+ 'Error message\n'
+ 'TRACE HERE'
+ )
+ self.assertIsInstance(message.failure,
+ oslo_messaging.MessagingException)
+
+
+class PikaOutgoingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._pika_engine = mock.MagicMock()
+ self._exchange = "it is exchange"
+ self._routing_key = "it is routing key"
+ self._expiration = 1
+ self._expiration_time = time.time() + self._expiration
+ self._mandatory = object()
+
+ self._message = {"msg_type": 1, "msg_str": "hello"}
+ self._context = {"request_id": 555, "token": "it is a token"}
+
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_send_with_confirmation(self):
+ message = pika_drv_msg.PikaOutgoingMessage(
+ self._pika_engine, self._message, self._context
+ )
+
+ message.send(
+ exchange=self._exchange,
+ routing_key=self._routing_key,
+ confirm=True,
+ mandatory=self._mandatory,
+ persistent=True,
+ expiration_time=self._expiration_time,
+ retrier=None
+ )
+
+ self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=mock.ANY,
+ exchange=self._exchange, mandatory=self._mandatory,
+ properties=mock.ANY,
+ routing_key=self._routing_key
+ )
+
+ body = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["body"]
+
+ self.assertEqual(
+ b'{"_$_request_id": 555, "_$_token": "it is a token", '
+ b'"msg_str": "hello", "msg_type": 1}',
+ body
+ )
+
+ props = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 2)
+ self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+ 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertTrue(props.message_id)
+
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_send_without_confirmation(self):
+ message = pika_drv_msg.PikaOutgoingMessage(
+ self._pika_engine, self._message, self._context
+ )
+
+ message.send(
+ exchange=self._exchange,
+ routing_key=self._routing_key,
+ confirm=False,
+ mandatory=self._mandatory,
+ persistent=False,
+ expiration_time=self._expiration_time,
+ retrier=None
+ )
+
+ self._pika_engine.connection_without_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=mock.ANY,
+ exchange=self._exchange, mandatory=self._mandatory,
+ properties=mock.ANY,
+ routing_key=self._routing_key
+ )
+
+ body = self._pika_engine.connection_without_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["body"]
+
+ self.assertEqual(
+ b'{"_$_request_id": 555, "_$_token": "it is a token", '
+ b'"msg_str": "hello", "msg_type": 1}',
+ body
+ )
+
+ props = self._pika_engine.connection_without_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 1)
+ self.assertTrue(self._expiration * 1000 - float(props.expiration)
+ < 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertTrue(props.message_id)
+
+
+class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._exchange = "it is exchange"
+ self._routing_key = "it is routing key"
+
+ self._pika_engine = mock.MagicMock()
+ self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
+ self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
+
+ self._message = {"msg_type": 1, "msg_str": "hello"}
+ self._context = {"request_id": 555, "token": "it is a token"}
+
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_send_cast_message(self):
+ message = pika_drv_msg.RpcPikaOutgoingMessage(
+ self._pika_engine, self._message, self._context
+ )
+
+ expiration = 1
+ expiration_time = time.time() + expiration
+
+ message.send(
+ target=oslo_messaging.Target(exchange=self._exchange,
+ topic=self._routing_key),
+ reply_listener=None,
+ expiration_time=expiration_time,
+ retrier=None
+ )
+
+ self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=mock.ANY,
+ exchange=self._exchange, mandatory=True,
+ properties=mock.ANY,
+ routing_key=self._routing_key
+ )
+
+ body = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["body"]
+
+ self.assertEqual(
+ b'{"_$_request_id": 555, "_$_token": "it is a token", '
+ b'"msg_str": "hello", "msg_type": 1}',
+ body
+ )
+
+ props = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 1)
+ self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertIsNone(props.correlation_id)
+ self.assertIsNone(props.reply_to)
+ self.assertTrue(props.message_id)
+
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_send_call_message(self):
+ message = pika_drv_msg.RpcPikaOutgoingMessage(
+ self._pika_engine, self._message, self._context
+ )
+
+ expiration = 1
+ expiration_time = time.time() + expiration
+
+ result = "it is a result"
+ reply_queue_name = "reply_queue_name"
+
+ future = futures.Future()
+ future.set_result(result)
+ reply_listener = mock.Mock()
+ reply_listener.register_reply_waiter.return_value = future
+ reply_listener.get_reply_qname.return_value = reply_queue_name
+
+ res = message.send(
+ target=oslo_messaging.Target(exchange=self._exchange,
+ topic=self._routing_key),
+ reply_listener=reply_listener,
+ expiration_time=expiration_time,
+ retrier=None
+ )
+
+ self.assertEqual(result, res)
+
+ self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=mock.ANY,
+ exchange=self._exchange, mandatory=True,
+ properties=mock.ANY,
+ routing_key=self._routing_key
+ )
+
+ body = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["body"]
+
+ self.assertEqual(
+ b'{"_$_request_id": 555, "_$_token": "it is a token", '
+ b'"msg_str": "hello", "msg_type": 1}',
+ body
+ )
+
+ props = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 1)
+ self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual(props.correlation_id, message.msg_id)
+ self.assertEquals(props.reply_to, reply_queue_name)
+ self.assertTrue(props.message_id)
+
+
+class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
+ def setUp(self):
+ self._reply_q = "reply_queue_name"
+
+ self._expiration = 1
+ self._expiration_time = time.time() + self._expiration
+
+ self._pika_engine = mock.MagicMock()
+
+ self._rpc_reply_exchange = "rpc_reply_exchange"
+ self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
+
+ self._msg_id = 12345567
+
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_success_message_send(self):
+ message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
+ self._pika_engine, self._msg_id, reply="all_fine"
+ )
+
+ message.send(self._reply_q, expiration_time=self._expiration_time,
+ retrier=None)
+
+ self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=b'{"s": "all_fine"}',
+ exchange=self._rpc_reply_exchange, mandatory=True,
+ properties=mock.ANY,
+ routing_key=self._reply_q
+ )
+
+ props = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 1)
+ self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+ 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual(props.correlation_id, message.msg_id)
+ self.assertIsNone(props.reply_to)
+ self.assertTrue(props.message_id)
+
+ @patch("traceback.format_exception", new=lambda x,y,z:z)
+ @patch("oslo_serialization.jsonutils.dumps",
+ new=functools.partial(jsonutils.dumps, sort_keys=True))
+ def test_failure_message_send(self):
+ failure_info = (oslo_messaging.MessagingException,
+ oslo_messaging.MessagingException("Error message"),
+ ['It is a trace'])
+
+
+ message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
+ self._pika_engine, self._msg_id, failure_info=failure_info
+ )
+
+ message.send(self._reply_q, expiration_time=self._expiration_time,
+ retrier=None)
+
+ self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.assert_called_once_with(
+ body=mock.ANY,
+ exchange=self._rpc_reply_exchange,
+ mandatory=True,
+ properties=mock.ANY,
+ routing_key=self._reply_q
+ )
+
+ body = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["body"]
+ self.assertEqual(
+ b'{"e": {"c": "MessagingException", '
+ b'"m": "oslo_messaging.exceptions", "s": "Error message", '
+ b'"t": ["It is a trace"]}}',
+ body
+ )
+
+ props = self._pika_engine.connection_with_confirmation_pool.acquire(
+ ).__enter__().channel.publish.call_args[1]["properties"]
+
+ self.assertEqual(props.content_encoding, 'utf-8')
+ self.assertEqual(props.content_type, 'application/json')
+ self.assertEqual(props.delivery_mode, 1)
+ self.assertTrue(self._expiration * 1000 - float(props.expiration) <
+ 100)
+ self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual(props.correlation_id, message.msg_id)
+ self.assertIsNone(props.reply_to)
+ self.assertTrue(props.message_id)