summaryrefslogtreecommitdiff
path: root/oslo_messaging/tests/rpc
diff options
context:
space:
mode:
authordparalen <vetrisko@gmail.com>2017-08-31 16:32:53 +0200
committerdparalen <vetrisko@gmail.com>2017-08-02 08:23:34 +0200
commitd1dac1c11d357aa8391de7e62f4d003eb820948d (patch)
treebef26f11f37352f4c54f47c5d8c4ddfc1a6cca7a /oslo_messaging/tests/rpc
parentba30a3067d7db81cc864ae9e52b2761df44efddd (diff)
downloadoslo-messaging-d1dac1c11d357aa8391de7e62f4d003eb820948d.tar.gz
Class-level _exchanges in FakeExchangeManager
The FakeExchangeManager uses an instance-level storage for FakeExchanges mapping[1]. When a client--server pair is created, each keeps their own instance of FakeDriver -> FakeExchangeManager -> FakeExchange, each of which has their own (instance-level) copy of e.g _server_queues[2], making it impossible for them to communicate. This patch makes the _exchanges mapping a class-level attribute in order to keep the registered exchanges shared between all Manager instances, allowing client and server communication (within a single process). The test_server unit-tests had to be refactored to explicitly pass an exchange name when building a target. This is required for an exchange name change to have any effect during a test case run time when compared to passing the exchange name through the URL. This issue was revealed with this patch. [1] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L145,#L148 [2] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L88,#L92 Change-Id: I8dff66f4cafeb1f4c57dbfbfaba5d49e50f55fee Closes-Bug: #1714055
Diffstat (limited to 'oslo_messaging/tests/rpc')
-rw-r--r--oslo_messaging/tests/rpc/test_server.py75
1 files changed, 48 insertions, 27 deletions
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 2279324..4d5c6d5 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -35,9 +35,11 @@ load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class Server(object):
- def __init__(self, transport, topic, server, endpoint, serializer):
+ def __init__(self, transport, topic, server, endpoint, serializer,
+ exchange):
self.controller = ServerSetupMixin.ServerController()
- target = oslo_messaging.Target(topic=topic, server=server)
+ target = oslo_messaging.Target(topic=topic, server=server,
+ exchange=exchange)
self.server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint,
@@ -81,25 +83,25 @@ class ServerSetupMixin(object):
def __init__(self):
self.serializer = self.TestSerializer()
- def _setup_server(self, transport, endpoint, topic=None, server=None):
+ def _setup_server(self, transport, endpoint, topic=None, server=None,
+ exchange=None):
server = self.Server(transport,
topic=topic or 'testtopic',
server=server or 'testserver',
endpoint=endpoint,
- serializer=self.serializer)
+ serializer=self.serializer,
+ exchange=exchange)
server.start()
return server
- def _stop_server(self, client, server, topic=None):
- if topic is not None:
- client = client.prepare(topic=topic)
+ def _stop_server(self, client, server, topic=None, exchange=None):
client.cast({}, 'stop')
server.wait()
- def _setup_client(self, transport, topic='testtopic'):
- return oslo_messaging.RPCClient(transport,
- oslo_messaging.Target(topic=topic),
+ def _setup_client(self, transport, topic='testtopic', exchange=None):
+ target = oslo_messaging.Target(topic=topic, exchange=exchange)
+ return oslo_messaging.RPCClient(transport, target=target,
serializer=self.serializer)
@@ -111,6 +113,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def setUp(self):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
+ # FakeExchangeManager uses a class-level exchanges mapping; "reset" it
+ # before tests assert amount of items stored
+ self.useFixture(fixtures.MonkeyPatch(
+ 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
+ new_value={}))
@mock.patch('warnings.warn')
def test_constructor(self, warn):
@@ -300,14 +307,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
def test_call(self):
- transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
+ # NOTE(milan): using a separate transport instance for each the client
+ # and the server to be able to check independent transport instances
+ # can communicate over same exchange&topic
+ transport_srv = oslo_messaging.get_rpc_transport(self.conf,
+ url='fake:')
+ transport_cli = oslo_messaging.get_rpc_transport(self.conf,
+ url='fake:')
class TestEndpoint(object):
def ping(self, ctxt, arg):
return arg
- server_thread = self._setup_server(transport, TestEndpoint())
- client = self._setup_client(transport)
+ server_thread = self._setup_server(transport_srv, TestEndpoint())
+ client = self._setup_client(transport_cli)
self.assertIsNone(client.call({}, 'ping', arg=None))
self.assertEqual(0, client.call({}, 'ping', arg=0))
@@ -498,8 +511,8 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
single_server = params['server1'] == params['server2']
return not (single_topic and single_server)
- # fanout to multiple servers on same topic and exchange
- # each endpoint will receive both messages
+ # fanout to multiple servers on same topic and exchange each endpoint
+ # will receive both messages
def fanout_to_servers(scenario):
params = scenario[1]
fanout = params['fanout1'] or params['fanout2']
@@ -536,14 +549,16 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
def setUp(self):
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
+ self.useFixture(fixtures.MonkeyPatch(
+ 'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
+ new_value={}))
def test_multiple_servers(self):
- url1 = 'fake:///' + (self.exchange1 or '')
- url2 = 'fake:///' + (self.exchange2 or '')
-
- transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
- if url1 != url2:
- transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
+ transport1 = oslo_messaging.get_rpc_transport(self.conf,
+ url='fake:')
+ if self.exchange1 != self.exchange2:
+ transport2 = oslo_messaging.get_rpc_transport(self.conf,
+ url='fake:')
else:
transport2 = transport1
@@ -563,12 +578,18 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
endpoint1 = endpoint2 = TestEndpoint()
server1 = self._setup_server(transport1, endpoint1,
- topic=self.topic1, server=self.server1)
+ topic=self.topic1,
+ exchange=self.exchange1,
+ server=self.server1)
server2 = self._setup_server(transport2, endpoint2,
- topic=self.topic2, server=self.server2)
+ topic=self.topic2,
+ exchange=self.exchange2,
+ server=self.server2)
- client1 = self._setup_client(transport1, topic=self.topic1)
- client2 = self._setup_client(transport2, topic=self.topic2)
+ client1 = self._setup_client(transport1, topic=self.topic1,
+ exchange=self.exchange1)
+ client2 = self._setup_client(transport2, topic=self.topic2,
+ exchange=self.exchange2)
client1 = client1.prepare(server=self.server1)
client2 = client2.prepare(server=self.server2)
@@ -584,9 +605,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
self._stop_server(client1.prepare(fanout=None),
- server1, topic=self.topic1)
+ server1, topic=self.topic1, exchange=self.exchange1)
self._stop_server(client2.prepare(fanout=None),
- server2, topic=self.topic2)
+ server2, topic=self.topic2, exchange=self.exchange2)
def check(pings, expect):
self.assertEqual(len(expect), len(pings))