summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/base.py
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2015-08-02 10:26:02 +0200
committerMehdi Abaakouk <sileht@redhat.com>2015-12-08 09:14:20 +0100
commit4dd644ac201ee0fe247d648a2f735998416bf2c7 (patch)
treeea234cea1d4f1cc6c834f1caec64459a3596081f /oslo_messaging/_drivers/base.py
parentbdf287e847024368e20f5f806380e97070c9561c (diff)
downloadoslo-messaging-4dd644ac201ee0fe247d648a2f735998416bf2c7.tar.gz
batch notification listener
Gnocchi performs better if measurements are write in batch When Ceilometer is used with Gnocchi, this is not possible. This change introduce a new notification listener that allows that. On the driver side, a default batch implementation is provided. It's just call the legacy poll method many times. Driver can override it to provide a better implementation. For example, kafka handles batch natively and take benefit of this. Change-Id: I16184da24b8661aff7f4fba6196ecf33165f1a77
Diffstat (limited to 'oslo_messaging/_drivers/base.py')
-rw-r--r--oslo_messaging/_drivers/base.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index 607821f..9c2cb87 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -15,9 +15,12 @@
import abc
+from oslo_config import cfg
+from oslo_utils import timeutils
import six
+from six.moves import range as compat_range
+
-from oslo_config import cfg
from oslo_messaging import exceptions
base_opts = [
@@ -28,6 +31,27 @@ base_opts = [
]
+def batch_poll_helper(func):
+ """Decorator to poll messages in batch
+
+ This decorator helps driver that polls message one by one,
+ to returns a list of message.
+ """
+ def wrapper(in_self, timeout=None, prefetch_size=1):
+ incomings = []
+ watch = timeutils.StopWatch(duration=timeout)
+ with watch:
+ for __ in compat_range(prefetch_size):
+ msg = func(in_self, timeout=watch.leftover(return_none=True))
+ if msg is not None:
+ incomings.append(msg)
+ else:
+ # timeout reached or listener stopped
+ break
+ return incomings
+ return wrapper
+
+
class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""
@@ -61,8 +85,9 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
- def poll(self, timeout=None):
- """Blocking until a message is pending and return IncomingMessage.
+ def poll(self, timeout=None, prefetch_size=1):
+ """Blocking until 'prefetch_size' message is pending and return
+ [IncomingMessage].
Return None after timeout seconds if timeout is set and no message is
ending or if the listener have been stopped.
"""