From 33f3b18539da88ddf984ada4b63e2767d64438f9 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 22 Jul 2014 09:42:52 -0700 Subject: Declare DirectPublisher exchanges with passive=True If rabbit dies, the consumer can be disconnected before the publisher sends, and if the consumer hasn't declared the queue, the publisher's will send a message to an exchange that's not bound to a queue, and the message wll be lost. Setting passive=True will cause the publisher to fail and retry if the consumer hasn't declared the receiving queue yet. Co-Authored-By: Noel Burton-Krahn Closes-Bug: #1338732 (cherry picked from commit 434b5c8781b36cd65b7b642d723c0502e7093795) Conflicts: oslo_messaging/_drivers/common.py oslo_messaging/tests/test_utils.py Change-Id: I5ba4d311b97236d3a85a9f5badff61f12b08c12d --- oslo/messaging/_drivers/common.py | 4 ++-- oslo/messaging/_drivers/impl_rabbit.py | 30 ++++++++++++++++++++++++++++-- tests/test_rabbit.py | 23 +++++++++++++++++++++-- tests/test_utils.py | 26 ++++++++++++++++++++++---- 4 files changed, 73 insertions(+), 10 deletions(-) diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py index 49fd712..6d5f1fb 100644 --- a/oslo/messaging/_drivers/common.py +++ b/oslo/messaging/_drivers/common.py @@ -520,7 +520,7 @@ class DecayingTimer(object): self._ends_at = time.time() + max(0, self._duration) return self - def check_return(self, timeout_callback, *args, **kwargs): + def check_return(self, timeout_callback=None, *args, **kwargs): maximum = kwargs.pop('maximum', None) if self._duration is None: return None if maximum is None else maximum @@ -529,7 +529,7 @@ class DecayingTimer(object): " that has not been started") left = self._ends_at - time.time() - if left <= 0: + if left <= 0 and timeout_callback is not None: timeout_callback(*args, **kwargs) return left if maximum is None else min(left, maximum) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 0114951..0182800 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -354,7 +354,8 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': False} + 'exclusive': False, + 'passive': True} options.update(kwargs) super(DirectPublisher, self).__init__(channel, msg_id, msg_id, type='direct', **options) @@ -370,6 +371,7 @@ class TopicPublisher(Publisher): options = {'durable': conf.amqp_durable_queues, 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} + options.update(kwargs) exchange_name = rpc_amqp.get_control_exchange(conf) super(TopicPublisher, self).__init__(channel, @@ -760,7 +762,31 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - self.publisher_send(DirectPublisher, msg_id, msg) + + timer = rpc_common.DecayingTimer(duration=60) + timer.start() + # NOTE(sileht): retry at least 60sec, after we have a good change + # that the caller is really dead too... + + while True: + try: + self.publisher_send(DirectPublisher, msg_id, msg) + except self.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_("The exchange to reply to %s doesn't " + "exist yet, retrying...") % msg_id) + time.sleep(1) + continue + raise + return def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message.""" diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index a680697..88c8481 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -221,8 +221,27 @@ class TestSendReceive(test_utils.BaseTestCase): raise ZeroDivisionError except Exception: failure = sys.exc_info() - msgs[i].reply(failure=failure, - log_failure=not self.expected) + + # NOTE(noelbk) confirm that Publisher exchanges + # are always declared with passive=True + outer_self = self + test_exchange_was_called = [False] + old_init = kombu.entity.Exchange.__init__ + + def new_init(self, *args, **kwargs): + test_exchange_was_called[0] = True + outer_self.assertTrue(kwargs['passive']) + old_init(self, *args, **kwargs) + kombu.entity.Exchange.__init__ = new_init + + try: + msgs[i].reply(failure=failure, + log_failure=not self.expected) + finally: + kombu.entity.Exchange.__init__ = old_init + + self.assertTrue(test_exchange_was_called[0]) + elif self.rx_id: msgs[i].reply({'rx_id': i}) else: diff --git a/tests/test_utils.py b/tests/test_utils.py index 38ee1d0..ad5fb0d 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import time + +import mock + from oslo.messaging._drivers import common from oslo.messaging import _utils as utils from tests import utils as test_utils @@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): class TimerTestCase(test_utils.BaseTestCase): - def test_duration_is_none(self): + def test_no_duration_no_callback(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None) + remaining = t.check_return() self.assertEqual(None, remaining) - def test_duration_is_none_and_maximun_set(self): + def test_no_duration_but_maximun(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None, maximum=2) + remaining = t.check_return(maximum=2) self.assertEqual(2, remaining) + + def test_duration_expired_no_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + remaining = t.check_return() + self.assertAlmostEqual(-10, remaining, 0) + + def test_duration_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + callback = mock.Mock() + remaining = t.check_return(callback) + self.assertAlmostEqual(-10, remaining, 0) + callback.assert_called_once -- cgit v1.2.1