summaryrefslogtreecommitdiff
path: root/oslo_messaging/notify
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-04-22 14:01:08 +0000
committerGerrit Code Review <review@openstack.org>2016-04-22 14:01:08 +0000
commitf22a1a4fecb35759f8bee19ba73b5095edf2f77c (patch)
tree6cc1d2eb2e1c35d1ac51d1dd96d4a877477c59e8 /oslo_messaging/notify
parent715b5b1c3f4e1d8c8fe7f0612637b7c8f498eb6c (diff)
parent6db00c77b054dc6584a68b0c81c13e309fc6c914 (diff)
downloadoslo-messaging-f22a1a4fecb35759f8bee19ba73b5095edf2f77c.tar.gz
Merge "Refactor base interfaces"
Diffstat (limited to 'oslo_messaging/notify')
-rw-r--r--oslo_messaging/notify/listener.py51
1 files changed, 24 insertions, 27 deletions
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index 89f42f3..4026e9c 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -113,11 +113,12 @@ from oslo_messaging import server as msg_server
LOG = logging.getLogger(__name__)
-class NotificationServer(msg_server.MessageHandlingServer):
+class NotificationServerBase(msg_server.MessageHandlingServer):
def __init__(self, transport, targets, dispatcher, executor='blocking',
- allow_requeue=True, pool=None):
- super(NotificationServer, self).__init__(transport, dispatcher,
- executor)
+ allow_requeue=True, pool=None, batch_size=1,
+ batch_timeout=None):
+ super(NotificationServerBase, self).__init__(transport, dispatcher,
+ executor)
self._allow_requeue = allow_requeue
self._pool = pool
self.targets = targets
@@ -126,46 +127,42 @@ class NotificationServer(msg_server.MessageHandlingServer):
self.dispatcher.supported_priorities)
)
+ self._batch_size = batch_size
+ self._batch_timeout = batch_timeout
+
def _create_listener(self):
return self.transport._listen_for_notifications(
- self._targets_priorities, self._pool,
- lambda incoming: self._on_incoming(incoming[0]), 1, None
+ self._targets_priorities, self._pool, self._batch_size,
+ self._batch_timeout
+ )
+
+
+class NotificationServer(NotificationServerBase):
+ def __init__(self, transport, targets, dispatcher, executor='blocking',
+ allow_requeue=True, pool=None):
+ super(NotificationServer, self).__init__(
+ transport, targets, dispatcher, executor, allow_requeue, pool, 1,
+ None
)
def _process_incoming(self, incoming):
- res = notify_dispatcher.NotificationResult.REQUEUE
+ message = incoming[0]
try:
- res = self.dispatcher.dispatch(incoming)
+ res = self.dispatcher.dispatch(message)
except Exception:
LOG.error(_LE('Exception during message handling'), exc_info=True)
try:
if (res == notify_dispatcher.NotificationResult.REQUEUE and
self._allow_requeue):
- incoming.requeue()
+ message.requeue()
else:
- incoming.acknowledge()
+ message.acknowledge()
except Exception:
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
-class BatchNotificationServer(NotificationServer):
- def __init__(self, transport, targets, dispatcher, executor='blocking',
- allow_requeue=True, pool=None, batch_size=1,
- batch_timeout=None):
- super(BatchNotificationServer, self).__init__(
- transport=transport, targets=targets, dispatcher=dispatcher,
- executor=executor, allow_requeue=allow_requeue, pool=pool
- )
-
- self._batch_size = batch_size
- self._batch_timeout = batch_timeout
-
- def _create_listener(self):
- return self.transport._listen_for_notifications(
- self._targets_priorities, self._pool, self._on_incoming,
- self._batch_size, self._batch_timeout,
- )
+class BatchNotificationServer(NotificationServerBase):
def _process_incoming(self, incoming):
try: