diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-04-22 14:01:08 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-04-22 14:01:08 +0000 |
commit | f22a1a4fecb35759f8bee19ba73b5095edf2f77c (patch) | |
tree | 6cc1d2eb2e1c35d1ac51d1dd96d4a877477c59e8 /oslo_messaging/notify | |
parent | 715b5b1c3f4e1d8c8fe7f0612637b7c8f498eb6c (diff) | |
parent | 6db00c77b054dc6584a68b0c81c13e309fc6c914 (diff) | |
download | oslo-messaging-f22a1a4fecb35759f8bee19ba73b5095edf2f77c.tar.gz |
Merge "Refactor base interfaces"
Diffstat (limited to 'oslo_messaging/notify')
-rw-r--r-- | oslo_messaging/notify/listener.py | 51 |
1 files changed, 24 insertions, 27 deletions
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py index 89f42f3..4026e9c 100644 --- a/oslo_messaging/notify/listener.py +++ b/oslo_messaging/notify/listener.py @@ -113,11 +113,12 @@ from oslo_messaging import server as msg_server LOG = logging.getLogger(__name__) -class NotificationServer(msg_server.MessageHandlingServer): +class NotificationServerBase(msg_server.MessageHandlingServer): def __init__(self, transport, targets, dispatcher, executor='blocking', - allow_requeue=True, pool=None): - super(NotificationServer, self).__init__(transport, dispatcher, - executor) + allow_requeue=True, pool=None, batch_size=1, + batch_timeout=None): + super(NotificationServerBase, self).__init__(transport, dispatcher, + executor) self._allow_requeue = allow_requeue self._pool = pool self.targets = targets @@ -126,46 +127,42 @@ class NotificationServer(msg_server.MessageHandlingServer): self.dispatcher.supported_priorities) ) + self._batch_size = batch_size + self._batch_timeout = batch_timeout + def _create_listener(self): return self.transport._listen_for_notifications( - self._targets_priorities, self._pool, - lambda incoming: self._on_incoming(incoming[0]), 1, None + self._targets_priorities, self._pool, self._batch_size, + self._batch_timeout + ) + + +class NotificationServer(NotificationServerBase): + def __init__(self, transport, targets, dispatcher, executor='blocking', + allow_requeue=True, pool=None): + super(NotificationServer, self).__init__( + transport, targets, dispatcher, executor, allow_requeue, pool, 1, + None ) def _process_incoming(self, incoming): - res = notify_dispatcher.NotificationResult.REQUEUE + message = incoming[0] try: - res = self.dispatcher.dispatch(incoming) + res = self.dispatcher.dispatch(message) except Exception: LOG.error(_LE('Exception during message handling'), exc_info=True) try: if (res == notify_dispatcher.NotificationResult.REQUEUE and self._allow_requeue): - incoming.requeue() + message.requeue() else: - incoming.acknowledge() + message.acknowledge() except Exception: LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) -class BatchNotificationServer(NotificationServer): - def __init__(self, transport, targets, dispatcher, executor='blocking', - allow_requeue=True, pool=None, batch_size=1, - batch_timeout=None): - super(BatchNotificationServer, self).__init__( - transport=transport, targets=targets, dispatcher=dispatcher, - executor=executor, allow_requeue=allow_requeue, pool=pool - ) - - self._batch_size = batch_size - self._batch_timeout = batch_timeout - - def _create_listener(self): - return self.transport._listen_for_notifications( - self._targets_priorities, self._pool, self._on_incoming, - self._batch_size, self._batch_timeout, - ) +class BatchNotificationServer(NotificationServerBase): def _process_incoming(self, incoming): try: |