summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2019-06-17 16:35:55 +0000
committerGerrit Code Review <review@openstack.org>2019-06-17 16:35:55 +0000
commite7420afa2f8eb42d17f9af2ad90ed0090a89ed41 (patch)
tree0d3392f9452548bf6593e0fefb83ca723131ffdb
parente45dfa7ea207b8f95ab09eeb0c863840db02ff7f (diff)
parent83266cc6ea7dbe682f8fdb960a548c68fd1235b5 (diff)
downloadoslo-messaging-e7420afa2f8eb42d17f9af2ad90ed0090a89ed41.tar.gz
Merge "Add transport_options parameter"
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py10
-rw-r--r--oslo_messaging/_drivers/base.py12
-rw-r--r--oslo_messaging/_drivers/impl_fake.py8
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py25
-rw-r--r--oslo_messaging/rpc/client.py39
-rwxr-xr-xoslo_messaging/tests/rpc/test_client.py43
-rwxr-xr-xoslo_messaging/tests/test_transport.py6
-rw-r--r--oslo_messaging/transport.py5
8 files changed, 102 insertions, 46 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 30b4d2b..1cdea3a 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -580,7 +580,7 @@ class AMQPDriverBase(base.BaseDriver):
def _send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
- envelope=True, notify=False, retry=None):
+ envelope=True, notify=False, retry=None, transport_options=None):
msg = message
@@ -626,7 +626,8 @@ class AMQPDriverBase(base.BaseDriver):
" topic '%(topic)s'", {'exchange': exchange,
'topic': topic})
conn.topic_send(exchange_name=exchange, topic=topic,
- msg=msg, timeout=timeout, retry=retry)
+ msg=msg, timeout=timeout, retry=retry,
+ transport_options=transport_options)
if wait_for_reply:
result = self._waiter.wait(msg_id, timeout,
@@ -639,9 +640,10 @@ class AMQPDriverBase(base.BaseDriver):
self._waiter.unlisten(msg_id)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- call_monitor_timeout=None, retry=None):
+ call_monitor_timeout=None, retry=None, transport_options=None):
return self._send(target, ctxt, message, wait_for_reply, timeout,
- call_monitor_timeout, retry=retry)
+ call_monitor_timeout, retry=retry,
+ transport_options=transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index 82dcda5..fb44540 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -20,7 +20,6 @@ from oslo_utils import excutils
from oslo_utils import timeutils
import six
-
from oslo_messaging import exceptions
base_opts = [
@@ -41,6 +40,7 @@ def batch_poll_helper(func):
:py:meth:`PollStyleListener.poll` implementation that only polls for a
single message per call.
"""
+
def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None):
incomings = []
driver_prefetch = in_self.prefetch_size
@@ -57,6 +57,7 @@ def batch_poll_helper(func):
break
return incomings
+
return wrapper
@@ -244,9 +245,9 @@ class Listener(object):
all backend implementations.
:type prefetch_size: int
"""
+
def __init__(self, batch_size, batch_timeout,
prefetch_size=-1):
-
self.on_incoming_callback = None
self.batch_timeout = batch_timeout
self.prefetch_size = prefetch_size
@@ -283,6 +284,7 @@ class PollStyleListenerAdapter(Listener):
"""A Listener that uses a PollStyleListener for message transfer. A
dedicated thread is created to do message polling.
"""
+
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
@@ -364,7 +366,7 @@ class BaseDriver(object):
@abc.abstractmethod
def send(self, target, ctxt, message,
wait_for_reply=None, timeout=None, call_monitor_timeout=None,
- retry=None):
+ retry=None, transport_options=None):
"""Send a message to the given target and optionally wait for a reply.
This method is used by the RPC client when sending RPC requests to a
server.
@@ -434,6 +436,10 @@ class BaseDriver(object):
:type call_monitor_timeout: float
:param retry: maximum message send attempts permitted
:type retry: int
+ :param transport_options: additional parameters to configure the driver
+ for example to send parameters as "mandatory"
+ flag in RabbitMQ
+ :type transport_options: dictionary
:returns: A reply message or None if no reply expected
:raises: :py:exc:`MessagingException`, any exception thrown by the
remote server when executing the RPC call.
diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py
index fccfebe..05300e7 100644
--- a/oslo_messaging/_drivers/impl_fake.py
+++ b/oslo_messaging/_drivers/impl_fake.py
@@ -188,7 +188,8 @@ class FakeDriver(base.BaseDriver):
"""
json.dumps(message)
- def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
+ def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
+ transport_options=None):
self._check_serialize(message)
exchange = self._exchange_manager.get_exchange(target.exchange)
@@ -216,10 +217,11 @@ class FakeDriver(base.BaseDriver):
return None
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- call_monitor_timeout=None, retry=None):
+ call_monitor_timeout=None, retry=None, transport_options=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
- return self._send(target, ctxt, message, wait_for_reply, timeout)
+ return self._send(target, ctxt, message, wait_for_reply, timeout,
+ transport_options)
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index b08b6ef..aedb163 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -1104,7 +1104,7 @@ class Connection(object):
self.declare_consumer(consumer)
def _ensure_publishing(self, method, exchange, msg, routing_key=None,
- timeout=None, retry=None):
+ timeout=None, retry=None, transport_options=None):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
@@ -1113,7 +1113,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s", log_info)
LOG.debug('Exception', exc_info=exc)
- method = functools.partial(method, exchange, msg, routing_key, timeout)
+ method = functools.partial(method, exchange, msg, routing_key,
+ timeout, transport_options)
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
@@ -1135,7 +1136,8 @@ class Connection(object):
'connection_id': self.connection_id})
return info
- def _publish(self, exchange, msg, routing_key=None, timeout=None):
+ def _publish(self, exchange, msg, routing_key=None, timeout=None,
+ transport_options=None):
"""Publish a message."""
if not (exchange.passive or exchange.name in self._declared_exchanges):
@@ -1144,7 +1146,8 @@ class Connection(object):
log_info = {'msg': msg,
'who': exchange or 'default',
- 'key': routing_key}
+ 'key': routing_key,
+ 'transport_options': str(transport_options)}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
@@ -1158,7 +1161,8 @@ class Connection(object):
compression=self.kombu_compression)
def _publish_and_creates_default_queue(self, exchange, msg,
- routing_key=None, timeout=None):
+ routing_key=None, timeout=None,
+ transport_options=None):
"""Publisher that declares a default queue
When the exchange is missing instead of silently creates an exchange
@@ -1195,7 +1199,8 @@ class Connection(object):
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
- timeout=None):
+ timeout=None,
+ transport_options=None):
"""Publisher that raises exception if exchange is missing."""
if not exchange.passive:
raise RuntimeError("_publish_and_retry_on_missing_exchange() must "
@@ -1203,7 +1208,7 @@ class Connection(object):
try:
self._publish(exchange, msg, routing_key=routing_key,
- timeout=timeout)
+ timeout=timeout, transport_options=transport_options)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
@@ -1230,7 +1235,8 @@ class Connection(object):
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
- def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
+ def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
+ transport_options=None):
"""Send a 'topic' message."""
exchange = kombu.entity.Exchange(
name=exchange_name,
@@ -1240,7 +1246,8 @@ class Connection(object):
self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, timeout=timeout,
- retry=retry)
+ retry=retry,
+ transport_options=transport_options)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 636b2a1..ea5f54e 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -1,4 +1,3 @@
-
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@@ -45,7 +44,6 @@ _client_opts = [
class RemoteError(exceptions.MessagingException):
-
"""Signifies that a remote endpoint method has raised an exception.
Contains a string representation of the type of the original exception,
@@ -94,7 +92,7 @@ class _BaseCallContext(object):
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None, retry=None,
- call_monitor_timeout=None):
+ call_monitor_timeout=None, transport_options=None):
self.conf = transport.conf
self.transport = transport
@@ -104,6 +102,7 @@ class _BaseCallContext(object):
self.call_monitor_timeout = call_monitor_timeout
self.retry = retry
self.version_cap = version_cap
+ self.transport_options = transport_options
super(_BaseCallContext, self).__init__()
@@ -150,7 +149,9 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
- self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
+ self.transport._send(self.target, msg_ctxt, msg,
+ retry=self.retry,
+ transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@@ -172,10 +173,12 @@ class _BaseCallContext(object):
self._check_version_cap(msg.get('version'))
try:
- result = self.transport._send(self.target, msg_ctxt, msg,
- wait_for_reply=True, timeout=timeout,
- call_monitor_timeout=cm_timeout,
- retry=self.retry)
+ result = \
+ self.transport._send(self.target, msg_ctxt, msg,
+ wait_for_reply=True, timeout=timeout,
+ call_monitor_timeout=cm_timeout,
+ retry=self.retry,
+ transport_options=self.transport_options)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
@@ -190,7 +193,6 @@ class _BaseCallContext(object):
class _CallContext(_BaseCallContext):
-
_marker = _BaseCallContext._marker
@classmethod
@@ -198,7 +200,7 @@ class _CallContext(_BaseCallContext):
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
- call_monitor_timeout=_marker):
+ call_monitor_timeout=_marker, transport_options=_marker):
cls._check_version(version)
kwargs = dict(
exchange=exchange,
@@ -219,11 +221,13 @@ class _CallContext(_BaseCallContext):
retry = call_context.retry
if call_monitor_timeout is cls._marker:
call_monitor_timeout = call_context.call_monitor_timeout
+ if transport_options is cls._marker:
+ transport_options = call_context.transport_options
return _CallContext(call_context.transport, target,
call_context.serializer,
timeout, version_cap, retry,
- call_monitor_timeout)
+ call_monitor_timeout, transport_options)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
@@ -237,7 +241,6 @@ class _CallContext(_BaseCallContext):
class RPCClient(_BaseCallContext):
-
"""A class for invoking methods on remote RPC servers.
The RPCClient class is responsible for sending method invocations to and
@@ -326,7 +329,7 @@ class RPCClient(_BaseCallContext):
def __init__(self, transport, target,
timeout=None, version_cap=None, serializer=None, retry=None,
- call_monitor_timeout=None):
+ call_monitor_timeout=None, transport_options=None):
"""Construct an RPC client.
:param transport: a messaging transport handle
@@ -362,7 +365,7 @@ class RPCClient(_BaseCallContext):
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry,
- call_monitor_timeout
+ call_monitor_timeout, transport_options
)
self.conf.register_opts(_client_opts)
@@ -370,7 +373,7 @@ class RPCClient(_BaseCallContext):
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker,
- call_monitor_timeout=_marker):
+ call_monitor_timeout=_marker, transport_options=_marker):
"""Prepare a method invocation context.
Use this method to override client properties for an individual method
@@ -401,6 +404,10 @@ class RPCClient(_BaseCallContext):
0 means no retry is attempted.
N means attempt at most N retries.
:type retry: int
+ :param transport_options: additional parameters to configure the driver
+ for example to send parameters as "mandatory"
+ flag in RabbitMQ
+ :type transport_options: dictionary
:param call_monitor_timeout: an optional timeout (in seconds) for
active call heartbeating. If specified,
requires the server to heartbeat
@@ -413,7 +420,7 @@ class RPCClient(_BaseCallContext):
exchange, topic, namespace,
version, server, fanout,
timeout, version_cap, retry,
- call_monitor_timeout)
+ call_monitor_timeout, transport_options)
def cast(self, ctxt, method, **kwargs):
"""Invoke a method without blocking for a return value.
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index 9fa40db..ec22d70 100755
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -49,7 +49,31 @@ class TestCastCall(test_utils.BaseTestCase):
transport._send = mock.Mock()
msg = dict(method='foo', args=self.args)
- kwargs = {'retry': None}
+ kwargs = {'retry': None, 'transport_options': None}
+ if self.call:
+ kwargs['wait_for_reply'] = True
+ kwargs['timeout'] = None
+ kwargs['call_monitor_timeout'] = None
+
+ method = client.call if self.call else client.cast
+ method(self.ctxt, 'foo', **self.args)
+
+ transport._send.assert_called_once_with(oslo_messaging.Target(),
+ self.ctxt,
+ msg,
+ **kwargs)
+
+ def test_cast_call_with_transport_options(self):
+ self.config(rpc_response_timeout=None)
+
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
+ client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
+ transport_options={'my_k': 'my_val'})
+
+ transport._send = mock.Mock()
+
+ msg = dict(method='foo', args=self.args)
+ kwargs = {'retry': None, 'transport_options': {'my_k': 'my_val'}}
if self.call:
kwargs['wait_for_reply'] = True
kwargs['timeout'] = None
@@ -203,7 +227,8 @@ class TestCastToTarget(test_utils.BaseTestCase):
transport._send.assert_called_once_with(expect_target,
{},
msg,
- retry=None)
+ retry=None,
+ transport_options=None)
TestCastToTarget.generate_scenarios()
@@ -245,7 +270,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=self.expect, retry=None,
- call_monitor_timeout=self.cm)
+ call_monitor_timeout=self.cm, transport_options=None)
if self.prepare is not _notset:
client = client.prepare(timeout=self.prepare)
@@ -277,7 +302,8 @@ class TestCallRetry(test_utils.BaseTestCase):
msg = dict(method='foo', args={})
kwargs = dict(wait_for_reply=True, timeout=60,
- retry=self.expect, call_monitor_timeout=None)
+ retry=self.expect, call_monitor_timeout=None,
+ transport_options=None)
if self.prepare is not _notset:
client = client.prepare(retry=self.prepare)
@@ -334,8 +360,8 @@ class TestSerializer(test_utils.BaseTestCase):
serializer=serializer)
transport._send = mock.Mock()
-
- kwargs = dict(wait_for_reply=True, timeout=None) if self.call else {}
+ kwargs = dict(wait_for_reply=True,
+ timeout=None) if self.call else {}
kwargs['retry'] = None
if self.call:
kwargs['call_monitor_timeout'] = None
@@ -367,6 +393,7 @@ class TestSerializer(test_utils.BaseTestCase):
transport._send.assert_called_once_with(oslo_messaging.Target(),
dict(user='alice'),
msg,
+ transport_options=None,
**kwargs)
expected_calls = [mock.call(self.ctxt, arg) for arg in self.args]
self.assertEqual(expected_calls,
@@ -466,7 +493,9 @@ class TestVersionCap(test_utils.BaseTestCase):
self.assertFalse(self.success)
else:
self.assertTrue(self.success)
- transport._send.assert_called_once_with(target, {}, msg, **kwargs)
+ transport._send.assert_called_once_with(target, {}, msg,
+ transport_options=None,
+ **kwargs)
TestVersionCap.generate_scenarios()
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index a2c17f9..02a19f7 100755
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -231,7 +231,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply=None,
timeout=None,
call_monitor_timeout=None,
- retry=None)
+ retry=None,
+ transport_options=None)
def test_send_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
@@ -250,7 +251,8 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
wait_for_reply='wait_for_reply',
timeout='timeout',
call_monitor_timeout='cm_timeout',
- retry='retry')
+ retry='retry',
+ transport_options=None)
def test_send_notification(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index e44cb26..c263e35 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -116,7 +116,7 @@ class Transport(object):
self._driver.require_features(requeue=requeue)
def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- call_monitor_timeout=None, retry=None):
+ call_monitor_timeout=None, retry=None, transport_options=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
@@ -124,7 +124,8 @@ class Transport(object):
wait_for_reply=wait_for_reply,
timeout=timeout,
call_monitor_timeout=call_monitor_timeout,
- retry=retry)
+ retry=retry,
+ transport_options=transport_options)
def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic: