diff options
author | ChangBo Guo(gcb) <eric.guo@easystack.cn> | 2016-02-18 16:12:56 +0800 |
---|---|---|
committer | ChangBo Guo(gcb) <eric.guo@easystack.cn> | 2016-02-19 13:20:03 +0800 |
commit | ee46dda59f605f721c9b31641d1950c454856131 (patch) | |
tree | bdd7a38aec21b6671943ccc85ee79d68bd0fdf22 /oslo_messaging | |
parent | 26c85209af04c73246e1aa695a79bb45793fe6b4 (diff) | |
download | oslo-messaging-ee46dda59f605f721c9b31641d1950c454856131.tar.gz |
Fix kombu accept different TTL since version 3.0.25
kombu accept TTL as seconds instead of millisecond since version 3.0.25,
We remove TTL conversion in commit d49ddc3b9828f097d5bb39bca0381386a9de7762,
which is valid only when kombu >=3.0.25, so need do conversion according
to kombu version.
Note: Remove this workaround when all supported branches with
requirement kombu >=3.0.25
Closes-Bug: #1531148
Change-Id: I762265f23084a95f2d24cb434bad7d9556d4edd5
(cherry picked from commit 9cfaf50c45cf82f102f157655359e9705dc5733c)
Diffstat (limited to 'oslo_messaging')
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 16 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 13 |
2 files changed, 27 insertions, 2 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d5ab80..ca525ff 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -30,6 +30,8 @@ import kombu.messaging from oslo_config import cfg from oslo_log import log as logging from oslo_utils import netutils +from oslo_utils import versionutils +import pkg_resources import six from six.moves.urllib import parse @@ -997,6 +999,18 @@ class Connection(object): with self._connection_lock: self.ensure(method, retry=retry, error_callback=_error_callback) + def _get_expiration(self, timeout): + # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since + # version 3.0.25, so do conversion according to kombu version. + # TODO(gcb) remove this workaround when all supported branches + # with requirement kombu >=3.0.25 + if timeout is not None: + kombu_version = pkg_resources.get_distribution('kombu').version + if not versionutils.is_compatible('3.0.25', kombu_version): + timeout = int(timeout * 1000) + + return timeout + def _publish(self, exchange, msg, routing_key=None, timeout=None): """Publish a message.""" producer = kombu.messaging.Producer(exchange=exchange, @@ -1022,7 +1036,7 @@ class Connection(object): LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) with self._transport_socket_timeout(transport_timeout): - producer.publish(msg, expiration=timeout) + producer.publish(msg, expiration=self._get_expiration(timeout)) # List of notification queue declared on the channel to avoid # unnecessary redeclaration. This list is resetted each time diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 6d21e3a..8a7c396 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -24,7 +24,9 @@ import kombu import kombu.transport.memory from oslo_config import cfg from oslo_serialization import jsonutils +from oslo_utils import versionutils from oslotest import mockpatch +import pkg_resources import testscenarios import oslo_messaging @@ -179,7 +181,16 @@ class TestRabbitPublisher(test_utils.BaseTestCase): conn = pool_conn.connection conn._publish(mock.Mock(), 'msg', routing_key='routing_key', timeout=1) - fake_publish.assert_called_with('msg', expiration=1) + + # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since + # version 3.0.25, so do conversion according to kombu version. + # TODO(gcb) remove this workaround when all supported branches + # with requirement kombu >=3.0.25 + kombu_version = pkg_resources.get_distribution('kombu').version + if versionutils.is_compatible('3.0.25', kombu_version): + fake_publish.assert_called_with('msg', expiration=1) + else: + fake_publish.assert_called_with('msg', expiration=1000) @mock.patch('kombu.messaging.Producer.publish') def test_send_no_timeout(self, fake_publish): |