summaryrefslogtreecommitdiff
path: root/oslo
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-06-20 18:50:16 +0000
committerGerrit Code Review <review@openstack.org>2014-06-20 18:50:16 +0000
commita64692689a5beef6a70dfe0afa2eba3ddced2a84 (patch)
treec42144bfd5b37a38374f0715650e41efef4ad041 /oslo
parent830b00d11c4198bfee44c21ad54d6505ca7778bd (diff)
parent7fe2ef7334ce14c2255683c28b01e7d70a4e761f (diff)
downloadoslo-messaging-a64692689a5beef6a70dfe0afa2eba3ddced2a84.tar.gz
Merge "Add an optional timeout parameter to Listener.poll"
Diffstat (limited to 'oslo')
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py15
-rw-r--r--oslo/messaging/_drivers/base.py7
-rw-r--r--oslo/messaging/_drivers/impl_fake.py16
-rw-r--r--oslo/messaging/_drivers/impl_zmq.py9
4 files changed, 38 insertions, 9 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index b9b7a94..033d84b 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
+import time
import uuid
from six import moves
@@ -103,11 +104,21 @@ class AMQPListener(base.Listener):
ctxt.msg_id,
ctxt.reply_q))
- def poll(self):
+ def poll(self, timeout=None):
+ if timeout is not None:
+ deadline = time.time() + timeout
+ else:
+ deadline = None
while True:
if self.incoming:
return self.incoming.pop(0)
- self.conn.consume(limit=1)
+ if deadline is not None:
+ timeout = deadline - time.time()
+ if timeout < 0:
+ return None
+ self.conn.consume(limit=1, timeout=timeout)
+ else:
+ self.conn.consume(limit=1)
class ReplyWaiters(object):
diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py
index 82b3641..a35f776 100644
--- a/oslo/messaging/_drivers/base.py
+++ b/oslo/messaging/_drivers/base.py
@@ -53,8 +53,11 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
- def poll(self):
- "Blocking until a message is pending and return IncomingMessage."
+ def poll(self, timeout=None):
+ """Blocking until a message is pending and return IncomingMessage.
+ Return None after timeout seconds if timeout is set and no message is
+ ending.
+ """
@six.add_metaclass(abc.ABCMeta)
diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py
index ffc0b79..4f974e9 100644
--- a/oslo/messaging/_drivers/impl_fake.py
+++ b/oslo/messaging/_drivers/impl_fake.py
@@ -45,7 +45,11 @@ class FakeListener(base.Listener):
self._exchange_manager = exchange_manager
self._targets = targets
- def poll(self):
+ def poll(self, timeout=None):
+ if timeout is not None:
+ deadline = time.time() + timeout
+ else:
+ deadline = None
while True:
for target in self._targets:
exchange = self._exchange_manager.get_exchange(target.exchange)
@@ -54,7 +58,15 @@ class FakeListener(base.Listener):
message = FakeIncomingMessage(self, ctxt, message,
reply_q, requeue)
return message
- time.sleep(.05)
+ if deadline is not None:
+ pause = deadline - time.time()
+ if pause < 0:
+ break
+ pause = min(pause, 0.050)
+ else:
+ pause = 0.050
+ time.sleep(pause)
+ return None
class FakeExchange(object):
diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index 804fed0..e2a9613 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -879,9 +879,12 @@ class ZmqListener(base.Listener):
else:
return incoming.received.reply
- def poll(self):
- while True:
- return self.incoming_queue.get()
+ def poll(self, timeout=None):
+ try:
+ return self.incoming_queue.get(timeout=timeout)
+ except six.moves.queue.Empty:
+ # timeout
+ return None
class ZmqDriver(base.BaseDriver):