summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <mehdi.abaakouk@enovance.com>2014-07-22 09:42:52 -0700
committerEdward Hope-Morley <edward.hope-morley@canonical.com>2015-04-16 17:27:33 +0100
commit33f3b18539da88ddf984ada4b63e2767d64438f9 (patch)
tree01f2b62bf139ef1afa48b002e2f25012c9ce3174
parent106dd46101f536359df984394b5994f911df4912 (diff)
downloadoslo-messaging-stable/icehouse.tar.gz
Declare DirectPublisher exchanges with passive=Trueicehouse-eolstable/icehouse
If rabbit dies, the consumer can be disconnected before the publisher sends, and if the consumer hasn't declared the queue, the publisher's will send a message to an exchange that's not bound to a queue, and the message wll be lost. Setting passive=True will cause the publisher to fail and retry if the consumer hasn't declared the receiving queue yet. Co-Authored-By: Noel Burton-Krahn <noel@burton-krahn.com> Closes-Bug: #1338732 (cherry picked from commit 434b5c8781b36cd65b7b642d723c0502e7093795) Conflicts: oslo_messaging/_drivers/common.py oslo_messaging/tests/test_utils.py Change-Id: I5ba4d311b97236d3a85a9f5badff61f12b08c12d
-rw-r--r--oslo/messaging/_drivers/common.py4
-rw-r--r--oslo/messaging/_drivers/impl_rabbit.py30
-rw-r--r--tests/test_rabbit.py23
-rw-r--r--tests/test_utils.py26
4 files changed, 73 insertions, 10 deletions
diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py
index 49fd712..6d5f1fb 100644
--- a/oslo/messaging/_drivers/common.py
+++ b/oslo/messaging/_drivers/common.py
@@ -520,7 +520,7 @@ class DecayingTimer(object):
self._ends_at = time.time() + max(0, self._duration)
return self
- def check_return(self, timeout_callback, *args, **kwargs):
+ def check_return(self, timeout_callback=None, *args, **kwargs):
maximum = kwargs.pop('maximum', None)
if self._duration is None:
return None if maximum is None else maximum
@@ -529,7 +529,7 @@ class DecayingTimer(object):
" that has not been started")
left = self._ends_at - time.time()
- if left <= 0:
+ if left <= 0 and timeout_callback is not None:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 0114951..0182800 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -354,7 +354,8 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
- 'exclusive': False}
+ 'exclusive': False,
+ 'passive': True}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
@@ -370,6 +371,7 @@ class TopicPublisher(Publisher):
options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete,
'exclusive': False}
+
options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(channel,
@@ -760,7 +762,31 @@ class Connection(object):
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
- self.publisher_send(DirectPublisher, msg_id, msg)
+
+ timer = rpc_common.DecayingTimer(duration=60)
+ timer.start()
+ # NOTE(sileht): retry at least 60sec, after we have a good change
+ # that the caller is really dead too...
+
+ while True:
+ try:
+ self.publisher_send(DirectPublisher, msg_id, msg)
+ except self.connection.channel_errors as exc:
+ # NOTE(noelbk/sileht):
+ # If rabbit dies, the consumer can be disconnected before the
+ # publisher sends, and if the consumer hasn't declared the
+ # queue, the publisher's will send a message to an exchange
+ # that's not bound to a queue, and the message wll be lost.
+ # So we set passive=True to the publisher exchange and catch
+ # the 404 kombu ChannelError and retry until the exchange
+ # appears
+ if exc.code == 404 and timer.check_return() > 0:
+ LOG.info(_("The exchange to reply to %s doesn't "
+ "exist yet, retrying...") % msg_id)
+ time.sleep(1)
+ continue
+ raise
+ return
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message."""
diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py
index a680697..88c8481 100644
--- a/tests/test_rabbit.py
+++ b/tests/test_rabbit.py
@@ -221,8 +221,27 @@ class TestSendReceive(test_utils.BaseTestCase):
raise ZeroDivisionError
except Exception:
failure = sys.exc_info()
- msgs[i].reply(failure=failure,
- log_failure=not self.expected)
+
+ # NOTE(noelbk) confirm that Publisher exchanges
+ # are always declared with passive=True
+ outer_self = self
+ test_exchange_was_called = [False]
+ old_init = kombu.entity.Exchange.__init__
+
+ def new_init(self, *args, **kwargs):
+ test_exchange_was_called[0] = True
+ outer_self.assertTrue(kwargs['passive'])
+ old_init(self, *args, **kwargs)
+ kombu.entity.Exchange.__init__ = new_init
+
+ try:
+ msgs[i].reply(failure=failure,
+ log_failure=not self.expected)
+ finally:
+ kombu.entity.Exchange.__init__ = old_init
+
+ self.assertTrue(test_exchange_was_called[0])
+
elif self.rx_id:
msgs[i].reply({'rx_id': i})
else:
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 38ee1d0..ad5fb0d 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import time
+
+import mock
+
from oslo.messaging._drivers import common
from oslo.messaging import _utils as utils
from tests import utils as test_utils
@@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
class TimerTestCase(test_utils.BaseTestCase):
- def test_duration_is_none(self):
+ def test_no_duration_no_callback(self):
t = common.DecayingTimer()
t.start()
- remaining = t.check_return(None)
+ remaining = t.check_return()
self.assertEqual(None, remaining)
- def test_duration_is_none_and_maximun_set(self):
+ def test_no_duration_but_maximun(self):
t = common.DecayingTimer()
t.start()
- remaining = t.check_return(None, maximum=2)
+ remaining = t.check_return(maximum=2)
self.assertEqual(2, remaining)
+
+ def test_duration_expired_no_callback(self):
+ t = common.DecayingTimer(2)
+ t._ends_at = time.time() - 10
+ remaining = t.check_return()
+ self.assertAlmostEqual(-10, remaining, 0)
+
+ def test_duration_callback(self):
+ t = common.DecayingTimer(2)
+ t._ends_at = time.time() - 10
+ callback = mock.Mock()
+ remaining = t.check_return(callback)
+ self.assertAlmostEqual(-10, remaining, 0)
+ callback.assert_called_once