summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-05-17 10:09:26 +0000
committerGerrit Code Review <review@openstack.org>2017-05-17 10:09:26 +0000
commit6feb4a6f5463c86fd8078449206dfdbde4b06a32 (patch)
tree4387e74fe7a3329577661abd88bed2a53c12002b
parentdb01a0eec00c9c8f65a0069d10d384fb09aaaa95 (diff)
parentec4d6639bc7556afaf439d261b716818b02a1605 (diff)
downloadoslo-messaging-6feb4a6f5463c86fd8078449206dfdbde4b06a32.tar.gz
Merge "Add get_rpc_transport call"
-rw-r--r--doc/source/transport.rst2
-rw-r--r--oslo_messaging/notify/notifier.py4
-rw-r--r--oslo_messaging/rpc/__init__.py2
-rw-r--r--oslo_messaging/rpc/client.py14
-rw-r--r--oslo_messaging/rpc/server.py8
-rw-r--r--oslo_messaging/rpc/transport.py47
-rw-r--r--oslo_messaging/tests/notify/test_logger.py4
-rw-r--r--oslo_messaging/tests/rpc/test_server.py32
-rw-r--r--oslo_messaging/transport.py75
-rw-r--r--releasenotes/notes/get_rpc_transport-4aa3511ad9754a60.yaml10
10 files changed, 144 insertions, 54 deletions
diff --git a/doc/source/transport.rst b/doc/source/transport.rst
index 3449e9b..7dfca97 100644
--- a/doc/source/transport.rst
+++ b/doc/source/transport.rst
@@ -4,8 +4,6 @@ Transport
.. currentmodule:: oslo_messaging
-.. autofunction:: get_transport
-
.. autoclass:: Transport
.. autoclass:: TransportURL
diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py
index 8af1142..a544c7a 100644
--- a/oslo_messaging/notify/notifier.py
+++ b/oslo_messaging/notify/notifier.py
@@ -171,8 +171,8 @@ def get_notification_transport(conf, url=None,
group='oslo_messaging_notifications')
if url is None:
url = conf.oslo_messaging_notifications.transport_url
- return msg_transport.get_transport(conf, url,
- allowed_remote_exmods, aliases)
+ return msg_transport._get_transport(conf, url,
+ allowed_remote_exmods, aliases)
class Notifier(object):
diff --git a/oslo_messaging/rpc/__init__.py b/oslo_messaging/rpc/__init__.py
index 7a6b235..9a320a8 100644
--- a/oslo_messaging/rpc/__init__.py
+++ b/oslo_messaging/rpc/__init__.py
@@ -28,10 +28,12 @@ __all__ = [
'RemoteError',
'UnsupportedVersion',
'expected_exceptions',
+ 'get_rpc_transport',
'get_rpc_server',
'expose'
]
from .client import *
from .dispatcher import *
+from .transport import *
from .server import *
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index b746fbe..2a27a9f 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -282,7 +282,7 @@ class RPCClient(_BaseCallContext):
However, this class can be used directly without wrapping it another class.
For example::
- transport = messaging.get_transport(cfg.CONF)
+ transport = messaging.get_rpc_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg)
@@ -440,12 +440,12 @@ class RPCClient(_BaseCallContext):
method are handled are quite subtle.
Firstly, if the remote exception is contained in one of the modules
- listed in the allow_remote_exmods messaging.get_transport() parameter,
- then it this exception will be re-raised by call(). However, such
- locally re-raised remote exceptions are distinguishable from the same
- exception type raised locally because re-raised remote exceptions are
- modified such that their class name ends with the '_Remote' suffix so
- you may do::
+ listed in the allow_remote_exmods messaging.get_rpc_transport()
+ parameter, then it this exception will be re-raised by call(). However,
+ such locally re-raised remote exceptions are distinguishable from the
+ same exception type raised locally because re-raised remote exceptions
+ are modified such that their class name ends with the '_Remote' suffix
+ so you may do::
if ex.__class__.__name__.endswith('_Remote'):
# Some special case for locally re-raised remote exceptions
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index 8ee9426..195eb10 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -20,12 +20,12 @@ methods which may be invoked remotely by clients over a given transport.
To create an RPC server, you supply a transport, target and a list of
endpoints.
-A transport can be obtained simply by calling the get_transport() method::
+A transport can be obtained simply by calling the get_rpc_transport() method::
- transport = messaging.get_transport(conf)
+ transport = messaging.get_rpc_transport(conf)
which will load the appropriate transport driver according to the user's
-messaging configuration. See get_transport() for more details.
+messaging configuration. See get_rpc_transport() for more details.
The target supplied when creating an RPC server expresses the topic, server
name and - optionally - the exchange to listen on. See Target for more details
@@ -98,7 +98,7 @@ A simple example of an RPC server with multiple endpoints might be::
def test(self, ctx, arg):
return arg
- transport = oslo_messaging.get_transport(cfg.CONF)
+ transport = oslo_messaging.get_rpc_transport(cfg.CONF)
target = oslo_messaging.Target(topic='test', server='server1')
endpoints = [
ServerControlEndpoint(None),
diff --git a/oslo_messaging/rpc/transport.py b/oslo_messaging/rpc/transport.py
new file mode 100644
index 0000000..06a7c38
--- /dev/null
+++ b/oslo_messaging/rpc/transport.py
@@ -0,0 +1,47 @@
+# Copyright 2017 OpenStack Foundation.
+# All Rights Reserved.
+# Copyright 2017 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+__all__ = [
+ 'get_rpc_transport'
+]
+
+from oslo_messaging import transport as msg_transport
+
+
+def get_rpc_transport(conf, url=None,
+ allowed_remote_exmods=None):
+ """A factory method for Transport objects for RPCs.
+
+ This method should be used to ensure the correct messaging functionality
+ for RPCs. RPCs and Notifications may use separate messaging systems
+ that utilize different drivers, different access permissions,
+ message delivery, etc.
+
+ Presently, this function works exactly the same as get_transport. It's
+ use is recommended as disambiguates the intended use for the transport
+ and may in the future extend functionality related to the separation of
+ messaging backends.
+
+ :param conf: the user configuration
+ :type conf: cfg.ConfigOpts
+ :param url: a transport URL
+ :type url: str or TransportURL
+ :param allowed_remote_exmods: a list of modules which a client using this
+ transport will deserialize remote exceptions
+ from
+ :type allowed_remote_exmods: list
+ """
+ return msg_transport._get_transport(conf, url,
+ allowed_remote_exmods)
diff --git a/oslo_messaging/tests/notify/test_logger.py b/oslo_messaging/tests/notify/test_logger.py
index ede3fdc..e7cd868 100644
--- a/oslo_messaging/tests/notify/test_logger.py
+++ b/oslo_messaging/tests/notify/test_logger.py
@@ -58,7 +58,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logger(self, mock_utcnow):
- with mock.patch('oslo_messaging.transport.get_transport',
+ with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
self.logger = oslo_messaging.LoggingNotificationHandler('test://')
@@ -102,7 +102,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_logging_conf(self, mock_utcnow):
- with mock.patch('oslo_messaging.transport.get_transport',
+ with mock.patch('oslo_messaging.transport._get_transport',
return_value=test_notifier._FakeTransport(self.conf)):
logging.config.dictConfig({
'version': 1,
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index f44cd8b..3d32dde 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -113,7 +113,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
def test_constructor(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@@ -135,7 +135,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual('blocking', server.executor_type)
def test_constructor_without_explicit_RPCAccessPolicy(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@@ -148,7 +148,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(FutureWarning, w.category)
def test_server_wait_method(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
@@ -180,7 +180,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
server = oslo_messaging.get_rpc_server(
transport,
@@ -195,7 +195,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def test_no_server_topic(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(server='testserver')
server = oslo_messaging.get_rpc_server(transport, target, [])
try:
@@ -207,7 +207,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def _test_no_client_topic(self, call=True):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
client = self._setup_client(transport, topic=None)
@@ -228,7 +228,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._test_no_client_topic(call=False)
def test_client_call_timeout(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
finished = False
wait = threading.Condition()
@@ -256,7 +256,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_unknown_executor(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
try:
oslo_messaging.get_rpc_server(transport, None, [], executor='foo')
@@ -267,7 +267,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertTrue(False)
def test_cast(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def __init__(self):
@@ -288,7 +288,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@@ -307,7 +307,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_direct_call(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@@ -327,7 +327,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_context(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ctxt_check(self, ctxt, key):
@@ -344,7 +344,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_failure(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
@@ -384,7 +384,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self._stop_server(client, server_thread)
def test_expected_failure(self):
- transport = oslo_messaging.get_transport(self.conf, url='fake:')
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
debugs = []
errors = []
@@ -529,9 +529,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
url1 = 'fake:///' + (self.exchange1 or '')
url2 = 'fake:///' + (self.exchange2 or '')
- transport1 = oslo_messaging.get_transport(self.conf, url=url1)
+ transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
if url1 != url2:
- transport2 = oslo_messaging.get_transport(self.conf, url=url1)
+ transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
else:
transport2 = transport1
diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py
index c42ffa7..040cea7 100644
--- a/oslo_messaging/transport.py
+++ b/oslo_messaging/transport.py
@@ -77,8 +77,33 @@ class Transport(object):
This is a mostly opaque handle for an underlying messaging transport
driver.
- It has a single 'conf' property which is the cfg.ConfigOpts instance used
- to construct the transport object.
+ RPCs and Notifications may use separate messaging systems that utilize
+ different drivers, access permissions, message delivery, etc. To ensure
+ the correct messaging functionality, the corresponding method should be
+ used to construct a Transport object from transport configuration
+ gleaned from the user's configuration and, optionally, a transport URL.
+
+ The factory method for RPC Transport objects::
+
+ def get_rpc_transport(conf, url=None,
+ allowed_remote_exmods=None)
+
+ If a transport URL is supplied as a parameter, any transport configuration
+ contained in it takes precedence. If no transport URL is supplied, but
+ there is a transport URL supplied in the user's configuration then that
+ URL will take the place of the URL parameter.
+
+ The factory method for Notification Transport objects::
+
+ def get_notification_transport(conf, url=None,
+ allowed_remote_exmods=None)
+
+ If no transport URL is provided, the URL in the notifications section of
+ the config file will be used. If that URL is also absent, the same
+ transport as specified in the user's default section will be used.
+
+ The Transport has a single 'conf' property which is the cfg.ConfigOpts
+ instance used to construct the transport object.
"""
def __init__(self, driver):
@@ -146,6 +171,31 @@ class DriverLoadFailure(exceptions.MessagingException):
self.ex = ex
+def _get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
+ allowed_remote_exmods = allowed_remote_exmods or []
+ conf.register_opts(_transport_opts)
+
+ if not isinstance(url, TransportURL):
+ url = TransportURL.parse(conf, url, aliases)
+
+ kwargs = dict(default_exchange=conf.control_exchange,
+ allowed_remote_exmods=allowed_remote_exmods)
+
+ try:
+ mgr = driver.DriverManager('oslo.messaging.drivers',
+ url.transport.split('+')[0],
+ invoke_on_load=True,
+ invoke_args=[conf, url],
+ invoke_kwds=kwargs)
+ except RuntimeError as ex:
+ raise DriverLoadFailure(url.transport, ex)
+
+ return Transport(mgr.driver)
+
+
+@removals.remove(
+ message='use get_rpc_transport or get_notification_transport'
+)
@removals.removed_kwarg('aliases',
'Parameter aliases is deprecated for removal.')
def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
@@ -178,25 +228,8 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
:param aliases: DEPRECATED: A map of transport alias to transport name
:type aliases: dict
"""
- allowed_remote_exmods = allowed_remote_exmods or []
- conf.register_opts(_transport_opts)
-
- if not isinstance(url, TransportURL):
- url = TransportURL.parse(conf, url, aliases)
-
- kwargs = dict(default_exchange=conf.control_exchange,
- allowed_remote_exmods=allowed_remote_exmods)
-
- try:
- mgr = driver.DriverManager('oslo.messaging.drivers',
- url.transport.split('+')[0],
- invoke_on_load=True,
- invoke_args=[conf, url],
- invoke_kwds=kwargs)
- except RuntimeError as ex:
- raise DriverLoadFailure(url.transport, ex)
-
- return Transport(mgr.driver)
+ return _get_transport(conf, url,
+ allowed_remote_exmods, aliases)
class TransportHost(object):
diff --git a/releasenotes/notes/get_rpc_transport-4aa3511ad9754a60.yaml b/releasenotes/notes/get_rpc_transport-4aa3511ad9754a60.yaml
new file mode 100644
index 0000000..416ffac
--- /dev/null
+++ b/releasenotes/notes/get_rpc_transport-4aa3511ad9754a60.yaml
@@ -0,0 +1,10 @@
+---
+features:
+ - |
+ Add get_rpc_transport call to make the API clear for the separation
+ of RPC and Notification messaging backends.
+deprecations:
+ - |
+ Deprecate get_transport and use get_rpc_transport or
+ get_notification_transport to make the API usage clear for the
+ separation of RPC and Notification messaging backends.