diff options
author | Dmitriy Ukhlov <dukhlov@mirantis.com> | 2016-04-02 14:58:29 +0300 |
---|---|---|
committer | Dmitriy Ukhlov <dukhlov@mirantis.com> | 2016-04-05 18:08:08 +0000 |
commit | 5d7d7253d10a4e60a1796ff4e99dd67176fb067f (patch) | |
tree | 96649e9d99badb0757d38f179124292703ea9d70 /oslo_messaging/_drivers/amqpdriver.py | |
parent | ee394d3c5b9b8a87de8783aeb9935b22a217e82d (diff) | |
download | oslo-messaging-5d7d7253d10a4e60a1796ff4e99dd67176fb067f.tar.gz |
Refactor driver's listener interface
Current Listener interface has poll() method which return messages
To use it we need have poller thread which is located in MessageHandlerServer
But my investigations of existing driver's code shows that some implemetations have
its own thread inside for processing connection event loop. This event loop received
messages and store in queue object. And then our poller's thread reads this queue
This situation can be improved. we can remove poller's thread, remove queue object
and just call on_message server's callback from connection eventloop thread
This path provide posibility to do this for one of drivers and leave as is other drivers
Change-Id: I3e3d4369d8fdadcecf079d10af58b1e4f5616047
Diffstat (limited to 'oslo_messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index d899713..6989e55 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -176,7 +176,7 @@ class ObsoleteReplyQueuesCache(object): 'msg_id': msg_id}) -class AMQPListener(base.Listener): +class AMQPListener(base.PollStyleListener): def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -473,7 +473,7 @@ class AMQPDriverBase(base.BaseDriver): return self._send(target, ctxt, message, envelope=(version == 2.0), notify=True, retry=retry) - def listen(self, target): + def listen(self, target, on_incoming_callback, batch_size, batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -487,9 +487,12 @@ class AMQPDriverBase(base.BaseDriver): callback=listener) conn.declare_fanout_consumer(target.topic, listener) - return listener + return base.PollStyleListenerAdapter(listener, on_incoming_callback, + batch_size, batch_timeout) - def listen_for_notifications(self, targets_and_priorities, pool): + def listen_for_notifications(self, targets_and_priorities, pool, + on_incoming_callback, batch_size, + batch_timeout): conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -498,7 +501,8 @@ class AMQPDriverBase(base.BaseDriver): exchange_name=self._get_exchange(target), topic='%s.%s' % (target.topic, priority), callback=listener, queue_name=pool) - return listener + return base.PollStyleListenerAdapter(listener, on_incoming_callback, + batch_size, batch_timeout) def cleanup(self): if self._connection_pool: |