summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Sergeyev <vsergeyev@mirantis.com>2015-05-12 14:25:47 +0300
committerVictor Sergeyev <vsergeyev@mirantis.com>2015-05-28 15:32:37 +0300
commitc1c0af206944ee283509fbf49a932ff623d78a0c (patch)
tree50341149f9aefa6577f7f108e31a4bb85c1db5e5
parentbaddce34a2f94487f162014df7423a63d58fcd18 (diff)
downloadoslo-messaging-c1c0af206944ee283509fbf49a932ff623d78a0c.tar.gz
Don't create a new channel in RabbitMQ Connection.reset()
Current implementation of RabbitMQ driver in in Connection.reset() change the channel to use and create a new channel for it. This happens after the each message send. There no big need to create a new channel each time, so we can cancel all consumer queues, instead of creating a channel in reset(). Test test_connection_reset_always_succeed() removed, because we are not create channel on reset() anymore. Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com> Change-Id: Ie164840e6c055b01525b13aabdb8b9c7f5d1b98b
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py8
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py12
2 files changed, 6 insertions, 14 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 8c696cb..75041cd 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -231,6 +231,9 @@ class Consumer(object):
consumer_tag=six.text_type(tag),
nowait=self.nowait)
+ def cancel(self, tag):
+ self.queue.cancel(six.text_type(tag))
+
def _callback(self, message):
"""Call callback with deserialized message.
@@ -689,11 +692,12 @@ class Connection(object):
with self._connection_lock:
try:
- self._set_current_channel(self.connection.channel())
+ for tag, consumer in enumerate(self._consumers):
+ consumer.cancel(tag=tag)
except recoverable_errors:
self._set_current_channel(None)
self.ensure_connection()
- self._consumers = []
+ self._consumers = []
def _heartbeat_supported_and_enabled(self):
if self.driver_conf.heartbeat_timeout_threshold <= 0:
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 1e942a9..48ffffc 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -257,18 +257,6 @@ class TestRabbitConsume(test_utils.BaseTestCase):
self.assertEqual(0, int(deadline - time.time()))
- def test_connection_reset_always_succeed(self):
- transport = oslo_messaging.get_transport(self.conf,
- 'kombu+memory:////')
- self.addCleanup(transport.cleanup)
- channel = mock.Mock()
- with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
- conn.connection.connection.recoverable_channel_errors = (IOError,)
- with mock.patch.object(conn.connection.connection, 'channel',
- side_effect=[IOError, IOError, channel]):
- conn.connection.reset()
- self.assertEqual(channel, conn.connection.channel)
-
def test_connection_ack_have_disconnected_kombu_connection(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')