summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/base.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-12-11 04:16:57 +0000
committerGerrit Code Review <review@openstack.org>2015-12-11 04:16:57 +0000
commit213176657d55a65ecd2dfebc7320651f5442761c (patch)
tree6eda9c30c6bbbf11a175fc7509452a4044b61377 /oslo_messaging/_drivers/base.py
parent4b6144a3db4b2d10b0b4fa7e437ed264e7460df1 (diff)
parent4dd644ac201ee0fe247d648a2f735998416bf2c7 (diff)
downloadoslo-messaging-213176657d55a65ecd2dfebc7320651f5442761c.tar.gz
Merge "batch notification listener"
Diffstat (limited to 'oslo_messaging/_drivers/base.py')
-rw-r--r--oslo_messaging/_drivers/base.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index 607821f..9c2cb87 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -15,9 +15,12 @@
import abc
+from oslo_config import cfg
+from oslo_utils import timeutils
import six
+from six.moves import range as compat_range
+
-from oslo_config import cfg
from oslo_messaging import exceptions
base_opts = [
@@ -28,6 +31,27 @@ base_opts = [
]
+def batch_poll_helper(func):
+ """Decorator to poll messages in batch
+
+ This decorator helps driver that polls message one by one,
+ to returns a list of message.
+ """
+ def wrapper(in_self, timeout=None, prefetch_size=1):
+ incomings = []
+ watch = timeutils.StopWatch(duration=timeout)
+ with watch:
+ for __ in compat_range(prefetch_size):
+ msg = func(in_self, timeout=watch.leftover(return_none=True))
+ if msg is not None:
+ incomings.append(msg)
+ else:
+ # timeout reached or listener stopped
+ break
+ return incomings
+ return wrapper
+
+
class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""
@@ -61,8 +85,9 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
- def poll(self, timeout=None):
- """Blocking until a message is pending and return IncomingMessage.
+ def poll(self, timeout=None, prefetch_size=1):
+ """Blocking until 'prefetch_size' message is pending and return
+ [IncomingMessage].
Return None after timeout seconds if timeout is set and no message is
ending or if the listener have been stopped.
"""