summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark McLoughlin <markmc@redhat.com>2013-10-23 07:28:58 +0100
committerMark McLoughlin <markmc@redhat.com>2013-10-25 07:18:36 +0100
commit7914181398630cbcbc25543d72871ccf812df517 (patch)
tree02154a27fb66f460de787f92046eae76470d4560
parent9853fc6ac534a5251010e2695490d22845a17c2e (diff)
downloadoslo-messaging-7914181398630cbcbc25543d72871ccf812df517.tar.gz
Properly handle transport URL config on the client1.3.0a1
On the client side, in the rabbit and qpid drivers, we use a connection pool to avoid opening a connection for each message we send. However, there is only currently one connection pool per process: def get_connection_pool(conf, connection_cls): with _pool_create_sem: # Make sure only one thread tries to create the connection pool. if not connection_cls.pool: connection_cls.pool = ConnectionPool(conf, connection_cls) return connection_cls.pool This is a nasty artifact of the original RPC having no conectp of a transport context - everything was a global. We'll fix this soon enough. In the meantime, we need to make sure we only use this connection pool where we're not using the default transport configuration from the config file - i.e. where we supply a transport URL. The use case here is cells - we send messages to a remote cell by connecting to it using a transport URL. In our devstack testing, the two cells are on the same Rabbit broker but under different virtual hosts. Because we were always using the connection pool on the client side, we were seeing both cells always send messages to the '/' virtual host. Note - avoiding the connection pool in the case of cells is the same behaviour as the current RPC code: def cast_to_server(conf, context, server_params, topic, msg, connection_pool): ... with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: Change-Id: I2f35b45ef237bb85ab8faf58a408c03fcb1de9d7
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py8
-rw-r--r--tests/test_rabbit.py31
2 files changed, 25 insertions, 14 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 903f848..a1cfd85 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -309,10 +309,16 @@ class AMQPDriverBase(base.BaseDriver):
return sp
def _get_connection(self, pooled=True):
+ # FIXME(markmc): we don't yet have a connection pool for each
+ # Transport instance, so we'll only use the pool with the
+ # transport configuration from the config file
+ server_params = self._server_params or None
+ if server_params:
+ pooled = False
return rpc_amqp.ConnectionContext(self.conf,
self._connection_pool,
pooled=pooled,
- server_params=self._server_params)
+ server_params=server_params)
def _get_reply_q(self):
with self._reply_q_lock:
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index fc753c1..dd346fc 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -48,7 +48,7 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [
- ('none', dict(url=None, expected={})),
+ ('none', dict(url=None, expected=None)),
('empty',
dict(url='rabbit:///',
expected=dict(virtual_host=''))),
@@ -84,28 +84,33 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
+
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
- def test_transport_url(self):
+ self._server_params = []
cnx_init = rabbit_driver.Connection.__init__
- passed_params = []
-
- def record_params(self, conf, server_params=None):
- passed_params.append(server_params)
- return cnx_init(self, conf, server_params)
- self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
+ def record_params(cnx, conf, server_params=None):
+ self._server_params.append(server_params)
+ return cnx_init(cnx, conf, server_params)
- transport = messaging.get_transport(self.conf, self.url)
+ def dummy_send(cnx, topic, msg, timeout=None):
+ pass
- driver = transport._driver
+ self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
+ self.stubs.Set(rabbit_driver.Connection, 'topic_send', dummy_send)
- target = messaging.Target(topic='testtopic')
+ self._driver = messaging.get_transport(self.conf, self.url)._driver
+ self._target = messaging.Target(topic='testtopic')
- driver.listen(target)
+ def test_transport_url_listen(self):
+ self._driver.listen(self._target)
+ self.assertEqual(self._server_params[0], self.expected)
- self.assertEqual(passed_params[0], self.expected)
+ def test_transport_url_send(self):
+ self._driver.send(self._target, {}, {})
+ self.assertEqual(self._server_params[0], self.expected)
class TestSendReceive(test_utils.BaseTestCase):