summaryrefslogtreecommitdiff
path: root/oslo
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2014-05-06 13:47:12 +0200
committerMehdi Abaakouk <mehdi.abaakouk@enovance.com>2014-06-18 18:41:33 +0200
commit1ea9c35ab4d7446cc819490b33be802e5b2886ea (patch)
tree0db655192c642ec34516bc64866ac6ec774da250 /oslo
parente349c5e6f2d3e6909856a493b7b05827d7eec7c8 (diff)
downloadoslo-messaging-1ea9c35ab4d7446cc819490b33be802e5b2886ea.tar.gz
Transport reconnection retries for notification
This patch add support of reconnection retries for the messaging notifier. Related bug #1282639 Change-Id: Ia30331f8306ff0f6952d83ef42ff8bee6b900427
Diffstat (limited to 'oslo')
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py4
-rw-r--r--oslo/messaging/_drivers/impl_fake.py4
-rw-r--r--oslo/messaging/_drivers/impl_qpid.py4
-rw-r--r--oslo/messaging/_drivers/impl_rabbit.py4
-rw-r--r--oslo/messaging/_drivers/impl_zmq.py4
-rw-r--r--oslo/messaging/notify/_impl_log.py2
-rw-r--r--oslo/messaging/notify/_impl_messaging.py5
-rw-r--r--oslo/messaging/notify/_impl_noop.py2
-rw-r--r--oslo/messaging/notify/_impl_routing.py13
-rw-r--r--oslo/messaging/notify/_impl_test.py4
-rw-r--r--oslo/messaging/notify/notifier.py42
-rw-r--r--oslo/messaging/transport.py5
12 files changed, 62 insertions, 31 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index 6d0b583..820c3f7 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -389,9 +389,9 @@ class AMQPDriverBase(base.BaseDriver):
return self._send(target, ctxt, message, wait_for_reply, timeout,
retry=retry)
- def send_notification(self, target, ctxt, message, version):
+ def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,
- envelope=(version == 2.0), notify=True)
+ envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
conn = self._get_connection(pooled=False)
diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py
index b99c1b3..ffc0b79 100644
--- a/oslo/messaging/_drivers/impl_fake.py
+++ b/oslo/messaging/_drivers/impl_fake.py
@@ -168,7 +168,9 @@ class FakeDriver(base.BaseDriver):
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)
- def send_notification(self, target, ctxt, message, version):
+ def send_notification(self, target, ctxt, message, version, retry=None):
+ # NOTE(sileht): retry doesn't need to be implemented, the fake
+ # transport always works
self._send(target, ctxt, message)
def listen(self, target):
diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py
index a8881ad..0d62192 100644
--- a/oslo/messaging/_drivers/impl_qpid.py
+++ b/oslo/messaging/_drivers/impl_qpid.py
@@ -697,10 +697,10 @@ class Connection(object):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
- def notify_send(self, exchange_name, topic, msg, **kwargs):
+ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
- exchange_name=exchange_name)
+ exchange_name=exchange_name, retry=retry)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index bfae2c9..3ced930 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -788,10 +788,10 @@ class Connection(object):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
- def notify_send(self, exchange_name, topic, msg, **kwargs):
+ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
- exchange_name=exchange_name, **kwargs)
+ exchange_name=exchange_name, retry=retry, **kwargs)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py
index f4360d5..5427319 100644
--- a/oslo/messaging/_drivers/impl_zmq.py
+++ b/oslo/messaging/_drivers/impl_zmq.py
@@ -945,9 +945,11 @@ class ZmqDriver(base.BaseDriver):
# retry anything
return self._send(target, ctxt, message, wait_for_reply, timeout)
- def send_notification(self, target, ctxt, message, version):
+ def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
+ # NOTE(sileht): retry is not implemented because this driver never
+ # retry anything
target = target(topic=target.topic.replace('.', '-'))
return self._send(target, ctxt, message, envelope=(version == 2.0))
diff --git a/oslo/messaging/notify/_impl_log.py b/oslo/messaging/notify/_impl_log.py
index 5f6c1ed..3fb62c6 100644
--- a/oslo/messaging/notify/_impl_log.py
+++ b/oslo/messaging/notify/_impl_log.py
@@ -27,7 +27,7 @@ class LogDriver(notifier._Driver):
LOGGER_BASE = 'oslo.messaging.notification'
- def notify(self, ctxt, message, priority):
+ def notify(self, ctxt, message, priority, retry):
logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE,
message['event_type']))
method = getattr(logger, priority.lower(), None)
diff --git a/oslo/messaging/notify/_impl_messaging.py b/oslo/messaging/notify/_impl_messaging.py
index 1eca98e..e562f79 100644
--- a/oslo/messaging/notify/_impl_messaging.py
+++ b/oslo/messaging/notify/_impl_messaging.py
@@ -38,13 +38,14 @@ class MessagingDriver(notifier._Driver):
super(MessagingDriver, self).__init__(conf, topics, transport)
self.version = version
- def notify(self, ctxt, message, priority):
+ def notify(self, ctxt, message, priority, retry):
priority = priority.lower()
for topic in self.topics:
target = messaging.Target(topic='%s.%s' % (topic, priority))
try:
self.transport._send_notification(target, ctxt, message,
- version=self.version)
+ version=self.version,
+ retry=retry)
except Exception:
LOG.exception("Could not send notification to %(topic)s. "
"Payload=%(message)s",
diff --git a/oslo/messaging/notify/_impl_noop.py b/oslo/messaging/notify/_impl_noop.py
index d5bb1e7..e6acb62 100644
--- a/oslo/messaging/notify/_impl_noop.py
+++ b/oslo/messaging/notify/_impl_noop.py
@@ -20,5 +20,5 @@ from oslo.messaging.notify import notifier
class NoOpDriver(notifier._Driver):
- def notify(self, ctxt, message, priority):
+ def notify(self, ctxt, message, priority, retry):
pass
diff --git a/oslo/messaging/notify/_impl_routing.py b/oslo/messaging/notify/_impl_routing.py
index 15f989c..e7d9db5 100644
--- a/oslo/messaging/notify/_impl_routing.py
+++ b/oslo/messaging/notify/_impl_routing.py
@@ -104,21 +104,23 @@ class RoutingDriver(notifier._Driver):
return list(accepted_drivers)
- def _filter_func(self, ext, context, message, priority, accepted_drivers):
+ def _filter_func(self, ext, context, message, priority, retry,
+ accepted_drivers):
"""True/False if the driver should be called for this message.
"""
# context is unused here, but passed in by map()
return ext.name in accepted_drivers
- def _call_notify(self, ext, context, message, priority, accepted_drivers):
+ def _call_notify(self, ext, context, message, priority, retry,
+ accepted_drivers):
"""Emit the notification.
"""
# accepted_drivers is passed in as a result of the map() function
LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
{'event': message.get('event_type'), 'driver': ext.name})
- ext.obj.notify(context, message, priority)
+ ext.obj.notify(context, message, priority, retry)
- def notify(self, context, message, priority):
+ def notify(self, context, message, priority, retry):
if not self.plugin_manager:
self._load_notifiers()
@@ -131,4 +133,5 @@ class RoutingDriver(notifier._Driver):
self._get_drivers_for_message(group, event_type,
priority.lower()))
self.plugin_manager.map(self._filter_func, self._call_notify, context,
- message, priority, list(accepted_drivers))
+ message, priority, retry,
+ list(accepted_drivers))
diff --git a/oslo/messaging/notify/_impl_test.py b/oslo/messaging/notify/_impl_test.py
index 10d69e6..0c861d1 100644
--- a/oslo/messaging/notify/_impl_test.py
+++ b/oslo/messaging/notify/_impl_test.py
@@ -30,5 +30,5 @@ class TestDriver(notifier._Driver):
"Store notifications in memory for test verification."
- def notify(self, ctxt, message, priority):
- NOTIFICATIONS.append((ctxt, message, priority))
+ def notify(self, ctxt, message, priority, retry):
+ NOTIFICATIONS.append((ctxt, message, priority, retry))
diff --git a/oslo/messaging/notify/notifier.py b/oslo/messaging/notify/notifier.py
index 8068030..d5ab9c1 100644
--- a/oslo/messaging/notify/notifier.py
+++ b/oslo/messaging/notify/notifier.py
@@ -49,7 +49,7 @@ class _Driver(object):
self.transport = transport
@abc.abstractmethod
- def notify(self, ctxt, msg, priority):
+ def notify(self, ctxt, msg, priority, retry):
pass
@@ -96,7 +96,7 @@ class Notifier(object):
def __init__(self, transport, publisher_id=None,
driver=None, topic=None,
- serializer=None):
+ serializer=None, retry=None):
"""Construct a Notifier object.
:param transport: the transport to use for sending messages
@@ -109,11 +109,17 @@ class Notifier(object):
:type topic: str
:param serializer: an optional entity serializer
:type serializer: Serializer
+ :param retry: an connection retries configuration
+ None or -1 means to retry forever
+ 0 means no retry
+ N means N retries
+ :type retry: int
"""
transport.conf.register_opts(_notifier_opts)
self.transport = transport
self.publisher_id = publisher_id
+ self.retry = retry
self._driver_names = ([driver] if driver is not None
else transport.conf.notification_driver)
@@ -130,12 +136,12 @@ class Notifier(object):
invoke_kwds={
'topics': self._topics,
'transport': self.transport,
- },
+ }
)
_marker = object()
- def prepare(self, publisher_id=_marker):
+ def prepare(self, publisher_id=_marker, retry=_marker):
"""Return a specialized Notifier instance.
Returns a new Notifier instance with the supplied publisher_id. Allows
@@ -144,10 +150,16 @@ class Notifier(object):
:param publisher_id: field in notifications sent, e.g. 'compute.host1'
:type publisher_id: str
+ :param retry: an connection retries configuration
+ None or -1 means to retry forever
+ 0 means no retry
+ N means N retries
+ :type retry: int
"""
- return _SubNotifier._prepare(self, publisher_id)
+ return _SubNotifier._prepare(self, publisher_id, retry=retry)
- def _notify(self, ctxt, event_type, payload, priority, publisher_id=None):
+ def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
+ retry=None):
payload = self._serializer.serialize_entity(ctxt, payload)
ctxt = self._serializer.serialize_context(ctxt)
@@ -160,7 +172,7 @@ class Notifier(object):
def do_notify(ext):
try:
- ext.obj.notify(ctxt, msg, priority)
+ ext.obj.notify(ctxt, msg, priority, retry or self.retry)
except Exception as e:
_LOG.exception("Problem '%(e)s' attempting to send to "
"notification system. Payload=%(payload)s",
@@ -178,6 +190,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'AUDIT')
@@ -190,6 +203,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'DEBUG')
@@ -202,6 +216,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'INFO')
@@ -214,6 +229,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'WARN')
@@ -228,6 +244,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'ERROR')
@@ -240,6 +257,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'CRITICAL')
@@ -258,6 +276,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
+ :raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'SAMPLE')
@@ -266,10 +285,11 @@ class _SubNotifier(Notifier):
_marker = Notifier._marker
- def __init__(self, base, publisher_id):
+ def __init__(self, base, publisher_id, retry):
self._base = base
self.transport = base.transport
self.publisher_id = publisher_id
+ self.retry = retry
self._serializer = self._base._serializer
self._driver_mgr = self._base._driver_mgr
@@ -278,7 +298,9 @@ class _SubNotifier(Notifier):
super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority)
@classmethod
- def _prepare(cls, base, publisher_id=_marker):
+ def _prepare(cls, base, publisher_id=_marker, retry=_marker):
if publisher_id is cls._marker:
publisher_id = base.publisher_id
- return cls(base, publisher_id)
+ if retry is cls._marker:
+ retry = base.retry
+ return cls(base, publisher_id, retry=retry)
diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py
index 9df1486..baeeafd 100644
--- a/oslo/messaging/transport.py
+++ b/oslo/messaging/transport.py
@@ -89,11 +89,12 @@ class Transport(object):
wait_for_reply=wait_for_reply,
timeout=timeout, retry=retry)
- def _send_notification(self, target, ctxt, message, version):
+ def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
- self._driver.send_notification(target, ctxt, message, version)
+ self._driver.send_notification(target, ctxt, message, version,
+ retry=retry)
def _listen(self, target):
if not (target.topic and target.server):