summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChangBo Guo(gcb) <eric.guo@easystack.cn>2016-02-18 16:12:56 +0800
committerChangBo Guo(gcb) <eric.guo@easystack.cn>2016-02-18 16:41:24 +0800
commit9cfaf50c45cf82f102f157655359e9705dc5733c (patch)
tree57b52a0c7247fd1a4558424dcb7d87662ed3d278
parentb80fdc850dd76b3ef45b2d2deaa2d5d24678d1f8 (diff)
downloadoslo-messaging-9cfaf50c45cf82f102f157655359e9705dc5733c.tar.gz
Fix kombu accept different TTL since version 3.0.25
kombu accept TTL as seconds instead of millisecond since version 3.0.25, We remove TTL conversion in commit d49ddc3b9828f097d5bb39bca0381386a9de7762, which is valid only when kombu >=3.0.25, so need do conversion according to kombu version. Note: Remove this workaround when all supported branches with requirement kombu >=3.0.25 Closes-Bug: #1531148 Change-Id: I762265f23084a95f2d24cb434bad7d9556d4edd5
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py16
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py19
2 files changed, 31 insertions, 4 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index a9d0ba2..455767f 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -30,6 +30,8 @@ import kombu.messaging
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import netutils
+from oslo_utils import versionutils
+import pkg_resources
import six
from six.moves.urllib import parse
@@ -1042,6 +1044,18 @@ class Connection(object):
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
+ def _get_expiration(self, timeout):
+ # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
+ # version 3.0.25, so do conversion according to kombu version.
+ # TODO(gcb) remove this workaround when all supported branches
+ # with requirement kombu >=3.0.25
+ if timeout is not None:
+ kombu_version = pkg_resources.get_distribution('kombu').version
+ if not versionutils.is_compatible('3.0.25', kombu_version):
+ timeout = int(timeout * 1000)
+
+ return timeout
+
def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message."""
producer = kombu.messaging.Producer(exchange=exchange,
@@ -1068,7 +1082,7 @@ class Connection(object):
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
with self._transport_socket_timeout(transport_timeout):
- producer.publish(msg, expiration=timeout,
+ producer.publish(msg, expiration=self._get_expiration(timeout),
compression=self.kombu_compression)
# List of notification queue declared on the channel to avoid
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 2342734..bb44960 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -24,7 +24,9 @@ import kombu
import kombu.transport.memory
from oslo_config import cfg
from oslo_serialization import jsonutils
+from oslo_utils import versionutils
from oslotest import mockpatch
+import pkg_resources
import testscenarios
import oslo_messaging
@@ -206,9 +208,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
timeout=1)
- fake_publish.assert_called_with(
- 'msg', expiration=1,
- compression=self.conf.oslo_messaging_rabbit.kombu_compression)
+
+ # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
+ # version 3.0.25, so do conversion according to kombu version.
+ # TODO(gcb) remove this workaround when all supported branches
+ # with requirement kombu >=3.0.25
+ kombu_version = pkg_resources.get_distribution('kombu').version
+ if versionutils.is_compatible('3.0.25', kombu_version):
+ fake_publish.assert_called_with(
+ 'msg', expiration=1,
+ compression=self.conf.oslo_messaging_rabbit.kombu_compression)
+ else:
+ fake_publish.assert_called_with(
+ 'msg', expiration=1000,
+ compression=self.conf.oslo_messaging_rabbit.kombu_compression)
@mock.patch('kombu.messaging.Producer.publish')
def test_send_no_timeout(self, fake_publish):