summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py41
-rw-r--r--tests/test_rabbit.py60
2 files changed, 100 insertions, 1 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 4a16452..930afe7 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -24,6 +24,7 @@ from oslo import messaging
from oslo.messaging._drivers import amqp as rpc_amqp
from oslo.messaging._drivers import base
from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging import _urls as urls
LOG = logging.getLogger(__name__)
@@ -245,6 +246,8 @@ class AMQPDriverBase(base.BaseDriver):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
+ self._server_params = self._parse_url(self._url)
+
self._default_exchange = default_exchange
# FIXME(markmc): temp hack
@@ -258,10 +261,46 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
+ @staticmethod
+ def _parse_url(url):
+ if url is None:
+ return None
+
+ parsed = urls.parse_url(url)
+
+ # Make sure there's not a query string; that could identify
+ # requirements we can't comply with (e.g., ssl), so reject it if
+ # it's present
+ if parsed['parameters']:
+ raise messaging.InvalidTransportURL(
+ url, "Cannot comply with query string in transport URL")
+
+ if not parsed['hosts']:
+ return None
+
+ sp = {
+ 'virtual_host': parsed['virtual_host'],
+ }
+
+ # FIXME(markmc): support multiple hosts
+ host = parsed['hosts'][0]
+
+ if ':' in host['host']:
+ (sp['hostname'], sp['port']) = host['host'].split(':', 1)
+ sp['port'] = int(sp['port'])
+ else:
+ sp['hostname'] = host['host']
+
+ sp['username'] = host['username']
+ sp['password'] = host['password']
+
+ return sp
+
def _get_connection(self, pooled=True):
return rpc_amqp.ConnectionContext(self.conf,
self._connection_pool,
- pooled=pooled)
+ pooled=pooled,
+ server_params=self._server_params)
def _get_reply_q(self):
with self._reply_q_lock:
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index 06d21a0..4b6b28d 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -47,6 +47,66 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
+class TestRabbitTransportURL(test_utils.BaseTestCase):
+
+ scenarios = [
+ ('none', dict(url=None, expected=None)),
+ ('empty', dict(url='rabbit:///', expected=None)),
+ ('localhost',
+ dict(url='rabbit://localhost/',
+ expected=dict(hostname='localhost',
+ username='',
+ password='',
+ virtual_host=''))),
+ ('no_creds',
+ dict(url='rabbit://host/virtual_host',
+ expected=dict(hostname='host',
+ username='',
+ password='',
+ virtual_host='virtual_host'))),
+ ('no_port',
+ dict(url='rabbit://user:password@host/virtual_host',
+ expected=dict(hostname='host',
+ username='user',
+ password='password',
+ virtual_host='virtual_host'))),
+ ('full_url',
+ dict(url='rabbit://user:password@host:10/virtual_host',
+ expected=dict(hostname='host',
+ port=10,
+ username='user',
+ password='password',
+ virtual_host='virtual_host'))),
+ ]
+
+ def setUp(self):
+ super(TestRabbitTransportURL, self).setUp()
+ self.conf.register_opts(msg_transport._transport_opts)
+ self.conf.register_opts(rabbit_driver.rabbit_opts)
+ self.config(rpc_backend='rabbit')
+ self.config(fake_rabbit=True)
+
+ def test_transport_url(self):
+ cnx_init = rabbit_driver.Connection.__init__
+ passed_params = []
+
+ def record_params(self, conf, server_params=None):
+ passed_params.append(server_params)
+ return cnx_init(self, conf, server_params)
+
+ self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
+
+ transport = messaging.get_transport(self.conf, self.url)
+
+ driver = transport._driver
+
+ target = messaging.Target(topic='testtopic')
+
+ driver.send(target, {}, {})
+
+ self.assertEquals(passed_params[0], self.expected)
+
+
class TestSendReceive(test_utils.BaseTestCase):
_n_senders = [