summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_messaging/notify/listener.py6
-rw-r--r--oslo_messaging/notify/notifier.py9
-rw-r--r--oslo_messaging/rpc/client.py10
-rw-r--r--oslo_messaging/rpc/server.py5
-rw-r--r--oslo_messaging/rpc/transport.py5
-rw-r--r--oslo_messaging/tests/notify/test_listener.py31
-rw-r--r--oslo_messaging/tests/notify/test_log_handler.py3
-rw-r--r--oslo_messaging/tests/notify/test_logger.py7
-rwxr-xr-xoslo_messaging/tests/notify/test_notifier.py51
-rwxr-xr-xoslo_messaging/tests/rpc/test_client.py39
-rw-r--r--oslo_messaging/tests/rpc/test_server.py14
-rwxr-xr-xoslo_messaging/tests/test_transport.py1
-rw-r--r--oslo_messaging/transport.py22
13 files changed, 148 insertions, 55 deletions
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index 0142856..9de09f8 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -132,6 +132,7 @@ import logging
from oslo_messaging._i18n import _LE
from oslo_messaging.notify import dispatcher as notify_dispatcher
from oslo_messaging import server as msg_server
+from oslo_messaging import transport as msg_transport
LOG = logging.getLogger(__name__)
@@ -163,6 +164,11 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
class NotificationServer(NotificationServerBase):
def __init__(self, transport, targets, dispatcher, executor='blocking',
allow_requeue=True, pool=None):
+ if not isinstance(transport, msg_transport.NotificationTransport):
+ LOG.warning("Using RPC transport for notifications. Please use "
+ "get_notification_transport to obtain a "
+ "notification transport instance.")
+
super(NotificationServer, self).__init__(
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
None
diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py
index 7774dea..8bdb13c 100644
--- a/oslo_messaging/notify/notifier.py
+++ b/oslo_messaging/notify/notifier.py
@@ -171,8 +171,9 @@ def get_notification_transport(conf, url=None,
group='oslo_messaging_notifications')
if url is None:
url = conf.oslo_messaging_notifications.transport_url
- return msg_transport._get_transport(conf, url,
- allowed_remote_exmods, aliases)
+ return msg_transport._get_transport(
+ conf, url, allowed_remote_exmods, aliases,
+ transport_cls=msg_transport.NotificationTransport)
class Notifier(object):
@@ -245,6 +246,10 @@ class Notifier(object):
conf.register_opts(_notifier_opts,
group='oslo_messaging_notifications')
+ if not isinstance(transport, msg_transport.NotificationTransport):
+ _LOG.warning("Using RPC transport for notifications. Please use "
+ "get_notification_transport to obtain a "
+ "notification transport instance.")
self.transport = transport
self.publisher_id = publisher_id
if retry is not None:
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 2a27a9f..4bf7fc4 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -24,6 +24,7 @@ __all__ = [
]
import abc
+import logging
from oslo_config import cfg
import six
@@ -32,6 +33,10 @@ from oslo_messaging._drivers import base as driver_base
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
from oslo_messaging import serializer as msg_serializer
+from oslo_messaging import transport as msg_transport
+
+
+LOG = logging.getLogger(__name__)
_client_opts = [
cfg.IntOpt('rpc_response_timeout',
@@ -331,6 +336,11 @@ class RPCClient(_BaseCallContext):
if serializer is None:
serializer = msg_serializer.NoOpSerializer()
+ if not isinstance(transport, msg_transport.RPCTransport):
+ LOG.warning("Using notification transport for RPC. Please use "
+ "get_rpc_transport to obtain an RPC transport "
+ "instance.")
+
super(RPCClient, self).__init__(
transport, target, serializer, timeout, version_cap, retry
)
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index c94669f..b91fffe 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -135,6 +135,7 @@ from debtcollector.updating import updated_kwarg_default_value
from oslo_messaging._i18n import _LE
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from oslo_messaging import server as msg_server
+from oslo_messaging import transport as msg_transport
LOG = logging.getLogger(__name__)
@@ -142,6 +143,10 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
+ if not isinstance(transport, msg_transport.RPCTransport):
+ LOG.warning("Using notification transport for RPC. Please use "
+ "get_rpc_transport to obtain an RPC transport "
+ "instance.")
self._target = target
def _create_listener(self):
diff --git a/oslo_messaging/rpc/transport.py b/oslo_messaging/rpc/transport.py
index e3abe32..90815fc 100644
--- a/oslo_messaging/rpc/transport.py
+++ b/oslo_messaging/rpc/transport.py
@@ -43,5 +43,6 @@ def get_rpc_transport(conf, url=None,
from
:type allowed_remote_exmods: list
"""
- return msg_transport._get_transport(conf, url,
- allowed_remote_exmods)
+ return msg_transport._get_transport(
+ conf, url, allowed_remote_exmods,
+ transport_cls=msg_transport.RPCTransport)
diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py
index 8467c0b..44a48e0 100644
--- a/oslo_messaging/tests/notify/test_listener.py
+++ b/oslo_messaging/tests/notify/test_listener.py
@@ -23,7 +23,6 @@ import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import notifier as msg_notifier
from oslo_messaging.tests import utils as test_utils
-import six
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
@@ -187,7 +186,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertTrue(False)
def test_batch_timeout(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@@ -195,7 +195,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, 1))
notifier = self._setup_notifier(transport)
- for i in six.moves.range(12):
+ for _ in range(12):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(3)
@@ -213,7 +213,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mock.call(messages * 2)])
def test_batch_size(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@@ -221,7 +222,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, None))
notifier = self._setup_notifier(transport)
- for i in six.moves.range(10):
+ for _ in range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
@@ -238,7 +239,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mock.call(messages * 5)])
def test_batch_size_exception_path(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
endpoint = mock.Mock()
endpoint.info.side_effect = [None, Exception('boom!')]
@@ -246,7 +248,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
batch=(5, None))
notifier = self._setup_notifier(transport)
- for i in six.moves.range(10):
+ for _ in range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
@@ -510,3 +512,18 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
for call in mocked_endpoint1_calls:
self.assertIn(call, endpoint2.info.mock_calls +
endpoint3.info.mock_calls)
+
+
+class TestListenerTransportWarning(test_utils.BaseTestCase):
+
+ @mock.patch('oslo_messaging.notify.listener.LOG')
+ def test_warning_when_rpc_transport(self, log):
+ transport = oslo_messaging.get_rpc_transport(self.conf)
+ target = oslo_messaging.Target(topic='foo')
+ endpoints = [object()]
+ oslo_messaging.get_notification_listener(
+ transport, [target], endpoints)
+ log.warning.assert_called_once_with(
+ "Using RPC transport for notifications. Please use "
+ "get_notification_transport to obtain a "
+ "notification transport instance.")
diff --git a/oslo_messaging/tests/notify/test_log_handler.py b/oslo_messaging/tests/notify/test_log_handler.py
index 6ca5f68..40d1094 100644
--- a/oslo_messaging/tests/notify/test_log_handler.py
+++ b/oslo_messaging/tests/notify/test_log_handler.py
@@ -16,7 +16,6 @@ import fixtures
import oslo_messaging
from oslo_messaging.notify import log_handler
-from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@@ -34,7 +33,7 @@ class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
self.stub_flg = True
- transport = test_notifier._FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf)
notifier = oslo_messaging.Notifier(transport)
def fake_notifier(*args, **kwargs):
diff --git a/oslo_messaging/tests/notify/test_logger.py b/oslo_messaging/tests/notify/test_logger.py
index e7cd868..37ce82e 100644
--- a/oslo_messaging/tests/notify/test_logger.py
+++ b/oslo_messaging/tests/notify/test_logger.py
@@ -22,7 +22,6 @@ from oslo_utils import timeutils
import testscenarios
import oslo_messaging
-from oslo_messaging.tests.notify import test_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@@ -58,8 +57,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
+ fake_transport = oslo_messaging.get_notification_transport(self.conf)
with mock.patch('oslo_messaging.transport._get_transport',
- return_value=test_notifier._FakeTransport(self.conf)):
+ return_value=fake_transport):
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
mock_utcnow.return_value = datetime.datetime.utcnow()
@@ -102,8 +102,9 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
+ fake_transport = oslo_messaging.get_notification_transport(self.conf)
with mock.patch('oslo_messaging.transport._get_transport',
- return_value=test_notifier._FakeTransport(self.conf)):
+ return_value=fake_transport):
logging.config.dictConfig({
'version': 1,
'handlers': {
diff --git a/oslo_messaging/tests/notify/test_notifier.py b/oslo_messaging/tests/notify/test_notifier.py
index 364803e..02bb8a4 100755
--- a/oslo_messaging/tests/notify/test_notifier.py
+++ b/oslo_messaging/tests/notify/test_notifier.py
@@ -47,15 +47,6 @@ class JsonMessageMatcher(object):
return self.message == jsonutils.loads(other)
-class _FakeTransport(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def _send_notification(self, target, ctxt, message, version, retry=None):
- pass
-
-
class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
"""Record logged exceptions and re-raise in cleanup.
@@ -73,6 +64,9 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
def exception(self, msg, *args, **kwargs):
self.exceptions.append(sys.exc_info()[1])
+ def warning(self, msg, *args, **kwargs):
+ return
+
def setUp(self):
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
@@ -170,7 +164,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
topics=self.topics,
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
if hasattr(self, 'ctor_pub_id'):
notifier = oslo_messaging.Notifier(transport,
@@ -241,7 +236,8 @@ class TestSerializer(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_serializer(self, mock_utcnow):
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
serializer = msg_serializer.NoOpSerializer()
@@ -289,7 +285,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
self.config(topics=['topic1', 'topic2'],
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
self.assertEqual(['topic1', 'topic2'], notifier._topics)
@@ -297,7 +294,8 @@ class TestNotifierTopics(test_utils.BaseTestCase):
def test_topics_from_kwargs(self):
self.config(driver=['log'],
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost',
topics=['topic1', 'topic2'])
@@ -311,7 +309,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
self.config(driver=['log'],
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
@@ -386,7 +385,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
conf.set_override('retry', 3, group='oslo_messaging_notifications')
- transport = _FakeTransport(conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport)
self.assertEqual(3, notifier.retry)
@@ -397,7 +397,8 @@ class TestNotificationConfig(test_utils.BaseTestCase):
group='oslo_messaging_notifications')
conf.set_override('retry', 3, group='oslo_messaging_notifications')
- transport = _FakeTransport(conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport, retry=5)
self.assertEqual(5, notifier.retry)
@@ -409,7 +410,8 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
self.config(driver=['routing'],
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
self.notifier = oslo_messaging.Notifier(transport)
self.router = self.notifier._driver_mgr['routing'].obj
@@ -642,8 +644,21 @@ class TestNoOpNotifier(test_utils.BaseTestCase):
self.config(driver=['noop'],
group='oslo_messaging_notifications')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ url='fake:')
notifier = oslo_messaging.Notifier(transport, 'test.localhost')
self.assertFalse(notifier.is_enabled())
+
+
+class TestNotifierTransportWarning(test_utils.BaseTestCase):
+
+ @mock.patch('oslo_messaging.notify.notifier._LOG')
+ def test_warning_when_rpc_transport(self, log):
+ transport = oslo_messaging.get_rpc_transport(self.conf)
+ oslo_messaging.Notifier(transport, 'test.localhost')
+ log.warning.assert_called_once_with(
+ "Using RPC transport for notifications. Please use "
+ "get_notification_transport to obtain a "
+ "notification transport instance.")
diff --git a/oslo_messaging/tests/rpc/test_client.py b/oslo_messaging/tests/rpc/test_client.py
index e955167..f57de54 100755
--- a/oslo_messaging/tests/rpc/test_client.py
+++ b/oslo_messaging/tests/rpc/test_client.py
@@ -25,15 +25,6 @@ from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
-class _FakeTransport(object):
-
- def __init__(self, conf):
- self.conf = conf
-
- def _send(self, *args, **kwargs):
- pass
-
-
class TestCastCall(test_utils.BaseTestCase):
scenarios = [
@@ -52,7 +43,7 @@ class TestCastCall(test_utils.BaseTestCase):
def test_cast_call(self):
self.config(rpc_response_timeout=None)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target())
transport._send = mock.Mock()
@@ -191,7 +182,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
target = oslo_messaging.Target(**self.ctor)
expect_target = oslo_messaging.Target(**self.expect)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target)
transport._send = mock.Mock()
@@ -242,7 +233,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
def test_call_timeout(self):
self.config(rpc_response_timeout=self.confval)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
timeout=self.ctor)
@@ -273,7 +264,7 @@ class TestCallRetry(test_utils.BaseTestCase):
]
def test_call_retry(self):
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
retry=self.ctor)
@@ -302,7 +293,7 @@ class TestCallFanout(test_utils.BaseTestCase):
]
def test_call_fanout(self):
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport,
oslo_messaging.Target(**self.target))
@@ -331,7 +322,7 @@ class TestSerializer(test_utils.BaseTestCase):
def test_call_serializer(self):
self.config(rpc_response_timeout=None)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
serializer = msg_serializer.NoOpSerializer()
client = oslo_messaging.RPCClient(transport, oslo_messaging.Target(),
@@ -430,7 +421,7 @@ class TestVersionCap(test_utils.BaseTestCase):
def test_version_cap(self):
self.config(rpc_response_timeout=None)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target,
@@ -535,7 +526,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_version_cap(self):
self.config(rpc_response_timeout=None)
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(version=self.version)
client = oslo_messaging.RPCClient(transport, target,
@@ -561,7 +552,7 @@ class TestCanSendVersion(test_utils.BaseTestCase):
def test_invalid_version_type(self):
target = oslo_messaging.Target(topic='sometopic')
- transport = _FakeTransport(self.conf)
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = oslo_messaging.RPCClient(transport, target)
self.assertRaises(exceptions.MessagingException,
client.prepare, version='5')
@@ -569,3 +560,15 @@ class TestCanSendVersion(test_utils.BaseTestCase):
client.prepare, version='5.a')
self.assertRaises(exceptions.MessagingException,
client.prepare, version='5.5.a')
+
+
+class TestTransportWarning(test_utils.BaseTestCase):
+
+ @mock.patch('oslo_messaging.rpc.client.LOG')
+ def test_warning_when_notifier_transport(self, log):
+ transport = oslo_messaging.get_notification_transport(self.conf)
+ oslo_messaging.RPCClient(transport, oslo_messaging.Target())
+ log.warning.assert_called_once_with(
+ "Using notification transport for RPC. Please use "
+ "get_rpc_transport to obtain an RPC transport "
+ "instance.")
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 4d5c6d5..e720e6b 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -449,6 +449,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
+ @mock.patch('oslo_messaging.rpc.server.LOG')
+ def test_warning_when_notifier_transport(self, log):
+ transport = oslo_messaging.get_notification_transport(self.conf)
+ target = oslo_messaging.Target(topic='foo', server='bar')
+ endpoints = [object()]
+ serializer = object()
+
+ oslo_messaging.get_rpc_server(transport, target,
+ endpoints, serializer=serializer)
+ log.warning.assert_called_once_with(
+ "Using notification transport for RPC. Please use "
+ "get_rpc_transport to obtain an RPC transport "
+ "instance.")
+
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index 71ff9ec..e0d327f 100755
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -148,6 +148,7 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
self.assertIs(transport_._driver, drvr)
+ self.assertTrue(isinstance(transport_, transport.RPCTransport))
driver.DriverManager.assert_called_once_with('oslo.messaging.drivers',
self.expect['backend'],
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index f00c901..c2d0e25 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -153,6 +153,20 @@ class Transport(object):
self._driver.cleanup()
+class RPCTransport(Transport):
+ """Transport object for RPC."""
+
+ def __init__(self, driver):
+ super(RPCTransport, self).__init__(driver)
+
+
+class NotificationTransport(Transport):
+ """Transport object for notifications."""
+
+ def __init__(self, driver):
+ super(NotificationTransport, self).__init__(driver)
+
+
class InvalidTransportURL(exceptions.MessagingException):
"""Raised if transport URL is invalid."""
@@ -171,7 +185,8 @@ class DriverLoadFailure(exceptions.MessagingException):
self.ex = ex
-def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
+def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None,
+ transport_cls=RPCTransport):
allowed_remote_exmods = allowed_remote_exmods or []
conf.register_opts(_transport_opts)
@@ -190,7 +205,7 @@ def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
except RuntimeError as ex:
raise DriverLoadFailure(url.transport, ex)
- return Transport(mgr.driver)
+ return transport_cls(mgr.driver)
@removals.remove(
@@ -229,7 +244,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
:type aliases: dict
"""
return _get_transport(conf, url,
- allowed_remote_exmods, aliases)
+ allowed_remote_exmods, aliases,
+ transport_cls=RPCTransport)
class TransportHost(object):