diff options
author | Mehdi Abaakouk <mehdi.abaakouk@enovance.com> | 2014-05-06 13:47:12 +0200 |
---|---|---|
committer | Mehdi Abaakouk <mehdi.abaakouk@enovance.com> | 2014-06-18 18:41:33 +0200 |
commit | 1ea9c35ab4d7446cc819490b33be802e5b2886ea (patch) | |
tree | 0db655192c642ec34516bc64866ac6ec774da250 /oslo | |
parent | e349c5e6f2d3e6909856a493b7b05827d7eec7c8 (diff) | |
download | oslo-messaging-1ea9c35ab4d7446cc819490b33be802e5b2886ea.tar.gz |
Transport reconnection retries for notification
This patch add support of reconnection retries for the
messaging notifier.
Related bug #1282639
Change-Id: Ia30331f8306ff0f6952d83ef42ff8bee6b900427
Diffstat (limited to 'oslo')
-rw-r--r-- | oslo/messaging/_drivers/amqpdriver.py | 4 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_fake.py | 4 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_qpid.py | 4 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_rabbit.py | 4 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_zmq.py | 4 | ||||
-rw-r--r-- | oslo/messaging/notify/_impl_log.py | 2 | ||||
-rw-r--r-- | oslo/messaging/notify/_impl_messaging.py | 5 | ||||
-rw-r--r-- | oslo/messaging/notify/_impl_noop.py | 2 | ||||
-rw-r--r-- | oslo/messaging/notify/_impl_routing.py | 13 | ||||
-rw-r--r-- | oslo/messaging/notify/_impl_test.py | 4 | ||||
-rw-r--r-- | oslo/messaging/notify/notifier.py | 42 | ||||
-rw-r--r-- | oslo/messaging/transport.py | 5 |
12 files changed, 62 insertions, 31 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 6d0b583..820c3f7 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -389,9 +389,9 @@ class AMQPDriverBase(base.BaseDriver): return self._send(target, ctxt, message, wait_for_reply, timeout, retry=retry) - def send_notification(self, target, ctxt, message, version): + def send_notification(self, target, ctxt, message, version, retry=None): return self._send(target, ctxt, message, - envelope=(version == 2.0), notify=True) + envelope=(version == 2.0), notify=True, retry=retry) def listen(self, target): conn = self._get_connection(pooled=False) diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index b99c1b3..ffc0b79 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -168,7 +168,9 @@ class FakeDriver(base.BaseDriver): # transport always works return self._send(target, ctxt, message, wait_for_reply, timeout) - def send_notification(self, target, ctxt, message, version): + def send_notification(self, target, ctxt, message, version, retry=None): + # NOTE(sileht): retry doesn't need to be implemented, the fake + # transport always works self._send(target, ctxt, message) def listen(self, target): diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index a8881ad..0d62192 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -697,10 +697,10 @@ class Connection(object): """Send a 'fanout' message.""" self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry) - def notify_send(self, exchange_name, topic, msg, **kwargs): + def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): """Send a notify message on a topic.""" self.publisher_send(NotifyPublisher, topic=topic, msg=msg, - exchange_name=exchange_name) + exchange_name=exchange_name, retry=retry) def consume(self, limit=None, timeout=None): """Consume from all queues/consumers.""" diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index bfae2c9..3ced930 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -788,10 +788,10 @@ class Connection(object): """Send a 'fanout' message.""" self.publisher_send(FanoutPublisher, topic, msg, retry=retry) - def notify_send(self, exchange_name, topic, msg, **kwargs): + def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): """Send a notify message on a topic.""" self.publisher_send(NotifyPublisher, topic, msg, timeout=None, - exchange_name=exchange_name, **kwargs) + exchange_name=exchange_name, retry=retry, **kwargs) def consume(self, limit=None, timeout=None): """Consume from all queues/consumers.""" diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index f4360d5..5427319 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -945,9 +945,11 @@ class ZmqDriver(base.BaseDriver): # retry anything return self._send(target, ctxt, message, wait_for_reply, timeout) - def send_notification(self, target, ctxt, message, version): + def send_notification(self, target, ctxt, message, version, retry=None): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. + # NOTE(sileht): retry is not implemented because this driver never + # retry anything target = target(topic=target.topic.replace('.', '-')) return self._send(target, ctxt, message, envelope=(version == 2.0)) diff --git a/oslo/messaging/notify/_impl_log.py b/oslo/messaging/notify/_impl_log.py index 5f6c1ed..3fb62c6 100644 --- a/oslo/messaging/notify/_impl_log.py +++ b/oslo/messaging/notify/_impl_log.py @@ -27,7 +27,7 @@ class LogDriver(notifier._Driver): LOGGER_BASE = 'oslo.messaging.notification' - def notify(self, ctxt, message, priority): + def notify(self, ctxt, message, priority, retry): logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE, message['event_type'])) method = getattr(logger, priority.lower(), None) diff --git a/oslo/messaging/notify/_impl_messaging.py b/oslo/messaging/notify/_impl_messaging.py index 1eca98e..e562f79 100644 --- a/oslo/messaging/notify/_impl_messaging.py +++ b/oslo/messaging/notify/_impl_messaging.py @@ -38,13 +38,14 @@ class MessagingDriver(notifier._Driver): super(MessagingDriver, self).__init__(conf, topics, transport) self.version = version - def notify(self, ctxt, message, priority): + def notify(self, ctxt, message, priority, retry): priority = priority.lower() for topic in self.topics: target = messaging.Target(topic='%s.%s' % (topic, priority)) try: self.transport._send_notification(target, ctxt, message, - version=self.version) + version=self.version, + retry=retry) except Exception: LOG.exception("Could not send notification to %(topic)s. " "Payload=%(message)s", diff --git a/oslo/messaging/notify/_impl_noop.py b/oslo/messaging/notify/_impl_noop.py index d5bb1e7..e6acb62 100644 --- a/oslo/messaging/notify/_impl_noop.py +++ b/oslo/messaging/notify/_impl_noop.py @@ -20,5 +20,5 @@ from oslo.messaging.notify import notifier class NoOpDriver(notifier._Driver): - def notify(self, ctxt, message, priority): + def notify(self, ctxt, message, priority, retry): pass diff --git a/oslo/messaging/notify/_impl_routing.py b/oslo/messaging/notify/_impl_routing.py index 15f989c..e7d9db5 100644 --- a/oslo/messaging/notify/_impl_routing.py +++ b/oslo/messaging/notify/_impl_routing.py @@ -104,21 +104,23 @@ class RoutingDriver(notifier._Driver): return list(accepted_drivers) - def _filter_func(self, ext, context, message, priority, accepted_drivers): + def _filter_func(self, ext, context, message, priority, retry, + accepted_drivers): """True/False if the driver should be called for this message. """ # context is unused here, but passed in by map() return ext.name in accepted_drivers - def _call_notify(self, ext, context, message, priority, accepted_drivers): + def _call_notify(self, ext, context, message, priority, retry, + accepted_drivers): """Emit the notification. """ # accepted_drivers is passed in as a result of the map() function LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") % {'event': message.get('event_type'), 'driver': ext.name}) - ext.obj.notify(context, message, priority) + ext.obj.notify(context, message, priority, retry) - def notify(self, context, message, priority): + def notify(self, context, message, priority, retry): if not self.plugin_manager: self._load_notifiers() @@ -131,4 +133,5 @@ class RoutingDriver(notifier._Driver): self._get_drivers_for_message(group, event_type, priority.lower())) self.plugin_manager.map(self._filter_func, self._call_notify, context, - message, priority, list(accepted_drivers)) + message, priority, retry, + list(accepted_drivers)) diff --git a/oslo/messaging/notify/_impl_test.py b/oslo/messaging/notify/_impl_test.py index 10d69e6..0c861d1 100644 --- a/oslo/messaging/notify/_impl_test.py +++ b/oslo/messaging/notify/_impl_test.py @@ -30,5 +30,5 @@ class TestDriver(notifier._Driver): "Store notifications in memory for test verification." - def notify(self, ctxt, message, priority): - NOTIFICATIONS.append((ctxt, message, priority)) + def notify(self, ctxt, message, priority, retry): + NOTIFICATIONS.append((ctxt, message, priority, retry)) diff --git a/oslo/messaging/notify/notifier.py b/oslo/messaging/notify/notifier.py index 8068030..d5ab9c1 100644 --- a/oslo/messaging/notify/notifier.py +++ b/oslo/messaging/notify/notifier.py @@ -49,7 +49,7 @@ class _Driver(object): self.transport = transport @abc.abstractmethod - def notify(self, ctxt, msg, priority): + def notify(self, ctxt, msg, priority, retry): pass @@ -96,7 +96,7 @@ class Notifier(object): def __init__(self, transport, publisher_id=None, driver=None, topic=None, - serializer=None): + serializer=None, retry=None): """Construct a Notifier object. :param transport: the transport to use for sending messages @@ -109,11 +109,17 @@ class Notifier(object): :type topic: str :param serializer: an optional entity serializer :type serializer: Serializer + :param retry: an connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int """ transport.conf.register_opts(_notifier_opts) self.transport = transport self.publisher_id = publisher_id + self.retry = retry self._driver_names = ([driver] if driver is not None else transport.conf.notification_driver) @@ -130,12 +136,12 @@ class Notifier(object): invoke_kwds={ 'topics': self._topics, 'transport': self.transport, - }, + } ) _marker = object() - def prepare(self, publisher_id=_marker): + def prepare(self, publisher_id=_marker, retry=_marker): """Return a specialized Notifier instance. Returns a new Notifier instance with the supplied publisher_id. Allows @@ -144,10 +150,16 @@ class Notifier(object): :param publisher_id: field in notifications sent, e.g. 'compute.host1' :type publisher_id: str + :param retry: an connection retries configuration + None or -1 means to retry forever + 0 means no retry + N means N retries + :type retry: int """ - return _SubNotifier._prepare(self, publisher_id) + return _SubNotifier._prepare(self, publisher_id, retry=retry) - def _notify(self, ctxt, event_type, payload, priority, publisher_id=None): + def _notify(self, ctxt, event_type, payload, priority, publisher_id=None, + retry=None): payload = self._serializer.serialize_entity(ctxt, payload) ctxt = self._serializer.serialize_context(ctxt) @@ -160,7 +172,7 @@ class Notifier(object): def do_notify(ext): try: - ext.obj.notify(ctxt, msg, priority) + ext.obj.notify(ctxt, msg, priority, retry or self.retry) except Exception as e: _LOG.exception("Problem '%(e)s' attempting to send to " "notification system. Payload=%(payload)s", @@ -178,6 +190,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'AUDIT') @@ -190,6 +203,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'DEBUG') @@ -202,6 +216,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'INFO') @@ -214,6 +229,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'WARN') @@ -228,6 +244,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'ERROR') @@ -240,6 +257,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'CRITICAL') @@ -258,6 +276,7 @@ class Notifier(object): :type event_type: str :param payload: the notification payload :type payload: dict + :raises: MessageDeliveryFailure """ self._notify(ctxt, event_type, payload, 'SAMPLE') @@ -266,10 +285,11 @@ class _SubNotifier(Notifier): _marker = Notifier._marker - def __init__(self, base, publisher_id): + def __init__(self, base, publisher_id, retry): self._base = base self.transport = base.transport self.publisher_id = publisher_id + self.retry = retry self._serializer = self._base._serializer self._driver_mgr = self._base._driver_mgr @@ -278,7 +298,9 @@ class _SubNotifier(Notifier): super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority) @classmethod - def _prepare(cls, base, publisher_id=_marker): + def _prepare(cls, base, publisher_id=_marker, retry=_marker): if publisher_id is cls._marker: publisher_id = base.publisher_id - return cls(base, publisher_id) + if retry is cls._marker: + retry = base.retry + return cls(base, publisher_id, retry=retry) diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 9df1486..baeeafd 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -89,11 +89,12 @@ class Transport(object): wait_for_reply=wait_for_reply, timeout=timeout, retry=retry) - def _send_notification(self, target, ctxt, message, version): + def _send_notification(self, target, ctxt, message, version, retry=None): if not target.topic: raise exceptions.InvalidTarget('A topic is required to send', target) - self._driver.send_notification(target, ctxt, message, version) + self._driver.send_notification(target, ctxt, message, version, + retry=retry) def _listen(self, target): if not (target.topic and target.server): |