diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-06-20 18:50:16 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-06-20 18:50:16 +0000 |
commit | a64692689a5beef6a70dfe0afa2eba3ddced2a84 (patch) | |
tree | c42144bfd5b37a38374f0715650e41efef4ad041 /oslo | |
parent | 830b00d11c4198bfee44c21ad54d6505ca7778bd (diff) | |
parent | 7fe2ef7334ce14c2255683c28b01e7d70a4e761f (diff) | |
download | oslo-messaging-a64692689a5beef6a70dfe0afa2eba3ddced2a84.tar.gz |
Merge "Add an optional timeout parameter to Listener.poll"
Diffstat (limited to 'oslo')
-rw-r--r-- | oslo/messaging/_drivers/amqpdriver.py | 15 | ||||
-rw-r--r-- | oslo/messaging/_drivers/base.py | 7 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_fake.py | 16 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_zmq.py | 9 |
4 files changed, 38 insertions, 9 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index b9b7a94..033d84b 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase'] import logging import threading +import time import uuid from six import moves @@ -103,11 +104,21 @@ class AMQPListener(base.Listener): ctxt.msg_id, ctxt.reply_q)) - def poll(self): + def poll(self, timeout=None): + if timeout is not None: + deadline = time.time() + timeout + else: + deadline = None while True: if self.incoming: return self.incoming.pop(0) - self.conn.consume(limit=1) + if deadline is not None: + timeout = deadline - time.time() + if timeout < 0: + return None + self.conn.consume(limit=1, timeout=timeout) + else: + self.conn.consume(limit=1) class ReplyWaiters(object): diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 82b3641..a35f776 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -53,8 +53,11 @@ class Listener(object): self.driver = driver @abc.abstractmethod - def poll(self): - "Blocking until a message is pending and return IncomingMessage." + def poll(self, timeout=None): + """Blocking until a message is pending and return IncomingMessage. + Return None after timeout seconds if timeout is set and no message is + ending. + """ @six.add_metaclass(abc.ABCMeta) diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index ffc0b79..4f974e9 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -45,7 +45,11 @@ class FakeListener(base.Listener): self._exchange_manager = exchange_manager self._targets = targets - def poll(self): + def poll(self, timeout=None): + if timeout is not None: + deadline = time.time() + timeout + else: + deadline = None while True: for target in self._targets: exchange = self._exchange_manager.get_exchange(target.exchange) @@ -54,7 +58,15 @@ class FakeListener(base.Listener): message = FakeIncomingMessage(self, ctxt, message, reply_q, requeue) return message - time.sleep(.05) + if deadline is not None: + pause = deadline - time.time() + if pause < 0: + break + pause = min(pause, 0.050) + else: + pause = 0.050 + time.sleep(pause) + return None class FakeExchange(object): diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 804fed0..e2a9613 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -879,9 +879,12 @@ class ZmqListener(base.Listener): else: return incoming.received.reply - def poll(self): - while True: - return self.incoming_queue.get() + def poll(self, timeout=None): + try: + return self.incoming_queue.get(timeout=timeout) + except six.moves.queue.Empty: + # timeout + return None class ZmqDriver(base.BaseDriver): |