diff options
Diffstat (limited to 'oslo_messaging/_drivers/base.py')
-rw-r--r-- | oslo_messaging/_drivers/base.py | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 607821f..9c2cb87 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -15,9 +15,12 @@ import abc +from oslo_config import cfg +from oslo_utils import timeutils import six +from six.moves import range as compat_range + -from oslo_config import cfg from oslo_messaging import exceptions base_opts = [ @@ -28,6 +31,27 @@ base_opts = [ ] +def batch_poll_helper(func): + """Decorator to poll messages in batch + + This decorator helps driver that polls message one by one, + to returns a list of message. + """ + def wrapper(in_self, timeout=None, prefetch_size=1): + incomings = [] + watch = timeutils.StopWatch(duration=timeout) + with watch: + for __ in compat_range(prefetch_size): + msg = func(in_self, timeout=watch.leftover(return_none=True)) + if msg is not None: + incomings.append(msg) + else: + # timeout reached or listener stopped + break + return incomings + return wrapper + + class TransportDriverError(exceptions.MessagingException): """Base class for transport driver specific exceptions.""" @@ -61,8 +85,9 @@ class Listener(object): self.driver = driver @abc.abstractmethod - def poll(self, timeout=None): - """Blocking until a message is pending and return IncomingMessage. + def poll(self, timeout=None, prefetch_size=1): + """Blocking until 'prefetch_size' message is pending and return + [IncomingMessage]. Return None after timeout seconds if timeout is set and no message is ending or if the listener have been stopped. """ |