summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/base.py')
-rw-r--r--oslo_messaging/_drivers/base.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index 607821f..9c2cb87 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -15,9 +15,12 @@
import abc
+from oslo_config import cfg
+from oslo_utils import timeutils
import six
+from six.moves import range as compat_range
+
-from oslo_config import cfg
from oslo_messaging import exceptions
base_opts = [
@@ -28,6 +31,27 @@ base_opts = [
]
+def batch_poll_helper(func):
+ """Decorator to poll messages in batch
+
+ This decorator helps driver that polls message one by one,
+ to returns a list of message.
+ """
+ def wrapper(in_self, timeout=None, prefetch_size=1):
+ incomings = []
+ watch = timeutils.StopWatch(duration=timeout)
+ with watch:
+ for __ in compat_range(prefetch_size):
+ msg = func(in_self, timeout=watch.leftover(return_none=True))
+ if msg is not None:
+ incomings.append(msg)
+ else:
+ # timeout reached or listener stopped
+ break
+ return incomings
+ return wrapper
+
+
class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""
@@ -61,8 +85,9 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
- def poll(self, timeout=None):
- """Blocking until a message is pending and return IncomingMessage.
+ def poll(self, timeout=None, prefetch_size=1):
+ """Blocking until 'prefetch_size' message is pending and return
+ [IncomingMessage].
Return None after timeout seconds if timeout is set and no message is
ending or if the listener have been stopped.
"""