summaryrefslogtreecommitdiff
path: root/openstack/common/rpc/impl_qpid.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/impl_qpid.py')
-rw-r--r--openstack/common/rpc/impl_qpid.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 97ac7c7a..1b81cd9f 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
- def __init__(self, conf, session, topic, callback, name=None):
+ def __init__(self, conf, session, topic, callback, name=None,
+ exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
@@ -180,7 +181,7 @@ class TopicConsumer(ConsumerBase):
:param name: optional queue name, defaults to topic
"""
- exchange_name = rpc_amqp.get_control_exchange(conf)
+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
@@ -464,10 +465,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
- def declare_topic_consumer(self, topic, callback=None, queue_name=None):
+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+ exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
+ exchange_name=exchange_name,
),
topic, callback)