summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/amqpdriver.py
diff options
context:
space:
mode:
authorDmitriy Ukhlov <dukhlov@mirantis.com>2016-04-02 14:58:29 +0300
committerDmitriy Ukhlov <dukhlov@mirantis.com>2016-04-05 18:08:08 +0000
commit5d7d7253d10a4e60a1796ff4e99dd67176fb067f (patch)
tree96649e9d99badb0757d38f179124292703ea9d70 /oslo_messaging/_drivers/amqpdriver.py
parentee394d3c5b9b8a87de8783aeb9935b22a217e82d (diff)
downloadoslo-messaging-5d7d7253d10a4e60a1796ff4e99dd67176fb067f.tar.gz
Refactor driver's listener interface
Current Listener interface has poll() method which return messages To use it we need have poller thread which is located in MessageHandlerServer But my investigations of existing driver's code shows that some implemetations have its own thread inside for processing connection event loop. This event loop received messages and store in queue object. And then our poller's thread reads this queue This situation can be improved. we can remove poller's thread, remove queue object and just call on_message server's callback from connection eventloop thread This path provide posibility to do this for one of drivers and leave as is other drivers Change-Id: I3e3d4369d8fdadcecf079d10af58b1e4f5616047
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index d899713..6989e55 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -176,7 +176,7 @@ class ObsoleteReplyQueuesCache(object):
'msg_id': msg_id})
-class AMQPListener(base.Listener):
+class AMQPListener(base.PollStyleListener):
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
@@ -473,7 +473,7 @@ class AMQPDriverBase(base.BaseDriver):
return self._send(target, ctxt, message,
envelope=(version == 2.0), notify=True, retry=retry)
- def listen(self, target):
+ def listen(self, target, on_incoming_callback, batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@@ -487,9 +487,12 @@ class AMQPDriverBase(base.BaseDriver):
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
- return listener
+ return base.PollStyleListenerAdapter(listener, on_incoming_callback,
+ batch_size, batch_timeout)
- def listen_for_notifications(self, targets_and_priorities, pool):
+ def listen_for_notifications(self, targets_and_priorities, pool,
+ on_incoming_callback, batch_size,
+ batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@@ -498,7 +501,8 @@ class AMQPDriverBase(base.BaseDriver):
exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, priority),
callback=listener, queue_name=pool)
- return listener
+ return base.PollStyleListenerAdapter(listener, on_incoming_callback,
+ batch_size, batch_timeout)
def cleanup(self):
if self._connection_pool: