summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgord chung <gord@live.ca>2016-12-09 18:31:06 +0000
committergordon chung <gord@live.ca>2017-02-10 13:21:22 +0000
commit5bacea1f42f7f5dc822b5f4f5968a3d8d3361b59 (patch)
tree3b8400ebb5f0e6302896bc844de8e91d99f04341
parentf3cc165dba2cabf44cf4759c18a7b65ca3e4a260 (diff)
downloadoslo-messaging-5bacea1f42f7f5dc822b5f4f5968a3d8d3361b59.tar.gz
support kombu4
- kombu4 wraps recoverable errors as OperationalErrors rather than raising amqp errors - also, raise a recoverable error and redeclare if for some reason a message is double acknowledged... previously, this was hidden. - ensure socket is not none - use connect method to ensure connection Depends-On: I9f980b51901ac31599b9651633956ad2eea6a1ac Change-Id: I73958c8057353a2eefe1baaa7a41148193d507f7
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py59
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py55
-rw-r--r--requirements.txt5
-rwxr-xr-xtools/tox_install.sh3
-rw-r--r--tox.ini4
5 files changed, 66 insertions, 60 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index eb6d11a..352fb53 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -335,12 +335,15 @@ class Consumer(object):
# bugs.launchpad.net/oslo.messaging/+bug/1609766
# bugs.launchpad.net/neutron/+bug/1318721
+ # 406 error code relates to messages that are doubled ack'd
+
# At any channel error, the RabbitMQ closes
# the channel, but the amqp-lib quietly re-open
# it. So, we must reset all tags and declare
# all consumers again.
conn._new_tags = set(conn._consumers.values())
- if exc.code == 404:
+ if exc.code == 404 or (exc.code == 406 and
+ exc.method_name == 'Basic.ack'):
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
@@ -593,6 +596,24 @@ class Connection(object):
' %(hostname)s:%(port)s',
self._get_connection_info())
+ # FIXME(gordc): wrapper to catch both kombu v3 and v4 errors
+ # remove this and only catch OperationalError when >4.0.0
+ if hasattr(kombu.exceptions, 'OperationalError'):
+ self.recoverable_errors = kombu.exceptions.OperationalError
+ else:
+ # NOTE(sileht): Some dummy driver like the in-memory one doesn't
+ # have notion of recoverable connection, so we must raise the
+ # original exception like kombu does in this case.
+ has_modern_errors = hasattr(
+ self.connection.transport, 'recoverable_connection_errors',
+ )
+ if has_modern_errors:
+ self.recoverable_errors = (
+ self.connection.recoverable_channel_errors +
+ self.connection.recoverable_connection_errors)
+ else:
+ self.recoverable_errors = ()
+
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
# so, to not lock to much this lock to most of the time do nothing
@@ -707,7 +728,7 @@ class Connection(object):
# NOTE(sileht): we reset the channel and ensure
# the kombu underlying connection works
self._set_current_channel(None)
- self.ensure(method=lambda: self.connection.connection)
+ self.ensure(method=self.connection.connect)
self.set_transport_socket_timeout()
def ensure(self, method, retry=None,
@@ -792,19 +813,6 @@ class Connection(object):
self._set_current_channel(channel)
method()
- # NOTE(sileht): Some dummy driver like the in-memory one doesn't
- # have notion of recoverable connection, so we must raise the original
- # exception like kombu does in this case.
- has_modern_errors = hasattr(
- self.connection.transport, 'recoverable_connection_errors',
- )
- if has_modern_errors:
- recoverable_errors = (
- self.connection.recoverable_channel_errors +
- self.connection.recoverable_connection_errors)
- else:
- recoverable_errors = ()
-
try:
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
@@ -817,7 +825,7 @@ class Connection(object):
ret, channel = autoretry_method()
self._set_current_channel(channel)
return ret
- except recoverable_errors as exc:
+ except self.recoverable_errors as exc:
LOG.debug("Received recoverable error from kombu:",
exc_info=True)
error_callback and error_callback(exc)
@@ -883,13 +891,11 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
- recoverable_errors = (self.connection.recoverable_channel_errors +
- self.connection.recoverable_connection_errors)
with self._connection_lock:
try:
for consumer, tag in self._consumers.items():
consumer.cancel(tag=tag)
- except recoverable_errors:
+ except self.recoverable_errors:
self.ensure_connection()
self._consumers.clear()
self._active_tags.clear()
@@ -987,10 +993,6 @@ class Connection(object):
while not self._heartbeat_exit_event.is_set():
with self._connection_lock.for_heartbeat():
- recoverable_errors = (
- self.connection.recoverable_channel_errors +
- self.connection.recoverable_connection_errors)
-
try:
try:
self._heartbeat_check()
@@ -1004,7 +1006,7 @@ class Connection(object):
self.connection.drain_events(timeout=0.001)
except socket.timeout:
pass
- except recoverable_errors as exc:
+ except self.recoverable_errors as exc:
LOG.info(_LI("A recoverable connection/channel error "
"occurred, trying to reconnect: %s"), exc)
self.ensure_connection()
@@ -1091,6 +1093,12 @@ class Connection(object):
except socket.timeout as exc:
poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self._poll_timeout)
+ except self.connection.channel_errors as exc:
+ if exc.code == 406 and exc.method_name == 'Basic.ack':
+ # NOTE(gordc): occasionally multiple workers will grab
+ # same message and acknowledge it. if it happens, meh.
+ raise self.connection.recoverable_channel_errors[0]
+ raise
with self._connection_lock:
self.ensure(_consume,
@@ -1172,7 +1180,8 @@ class Connection(object):
def _get_connection_info(self):
info = self.connection.info()
client_port = None
- if self.channel and hasattr(self.channel.connection, 'sock'):
+ if (self.channel and hasattr(self.channel.connection, 'sock')
+ and self.channel.connection.sock):
client_port = self.channel.connection.sock.getsockname()[1]
info.update({'client_port': client_port,
'connection_id': self.connection_id})
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index f3ddef6..6ab452e 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -24,9 +24,7 @@ import kombu
import kombu.transport.memory
from oslo_config import cfg
from oslo_serialization import jsonutils
-from oslo_utils import versionutils
from oslotest import mockpatch
-import pkg_resources
import testscenarios
import oslo_messaging
@@ -106,7 +104,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
def test_test_heartbeat_sent_connection_fail(self):
self._do_test_heartbeat_sent(
- heartbeat_side_effect=kombu.exceptions.ConnectionError,
+ heartbeat_side_effect=kombu.exceptions.OperationalError,
info='A recoverable connection/channel error occurred, '
'trying to reconnect: %s')
@@ -219,23 +217,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn._publish(exchange_mock, 'msg', routing_key='routing_key',
timeout=1)
- # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
- # version 3.0.25, so do conversion according to kombu version.
- # TODO(gcb) remove this workaround when all supported branches
- # with requirement kombu >=3.0.25
- kombu_version = pkg_resources.get_distribution('kombu').version
- if versionutils.is_compatible('3.0.25', kombu_version):
- fake_publish.assert_called_with(
- 'msg', expiration=1,
- exchange=exchange_mock,
- compression=self.conf.oslo_messaging_rabbit.kombu_compression,
- routing_key='routing_key')
- else:
- fake_publish.assert_called_with(
- 'msg', expiration=1000,
- exchange=exchange_mock,
- compression=self.conf.oslo_messaging_rabbit.kombu_compression,
- routing_key='routing_key')
+ fake_publish.assert_called_with(
+ 'msg', expiration=1,
+ exchange=exchange_mock,
+ compression=self.conf.oslo_messaging_rabbit.kombu_compression,
+ routing_key='routing_key')
@mock.patch('kombu.messaging.Producer.publish')
def test_send_no_timeout(self, fake_publish):
@@ -279,7 +265,8 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
with mock.patch('kombu.transport.virtual.Channel.close'):
# Ensure the exchange does not exists
- self.assertRaises(exc, try_send, e_passive)
+ self.assertRaises(oslo_messaging.MessageDeliveryFailure,
+ try_send, e_passive)
# Create it
try_send(e_active)
# Ensure it creates it
@@ -287,12 +274,14 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
with mock.patch('kombu.messaging.Producer.publish',
side_effect=exc):
- # Ensure the exchange is already in cache
- self.assertIn('foobar', conn._declared_exchanges)
- # Reset connection
- self.assertRaises(exc, try_send, e_passive)
- # Ensure the cache is empty
- self.assertEqual(0, len(conn._declared_exchanges))
+ with mock.patch('kombu.transport.virtual.Channel.close'):
+ # Ensure the exchange is already in cache
+ self.assertIn('foobar', conn._declared_exchanges)
+ # Reset connection
+ self.assertRaises(oslo_messaging.MessageDeliveryFailure,
+ try_send, e_passive)
+ # Ensure the cache is empty
+ self.assertEqual(0, len(conn._declared_exchanges))
try_send(e_active)
self.assertIn('foobar', conn._declared_exchanges)
@@ -336,7 +325,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
conn.connection.connection.recoverable_connection_errors = ()
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
- conn.connection.connection.transport.drain_events = mock.Mock()
+ conn.connection.connection.drain_events = mock.Mock()
# Ensure that a queue will be re-declared if the consume method
# of kombu.Queue raise amqp.NotFound
conn.consume()
@@ -360,7 +349,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
IOError,)
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
- conn.connection.connection.transport.drain_events = mock.Mock()
+ conn.connection.connection.drain_events = mock.Mock()
# Ensure that a queue will be re-declared after
# 'queue not found' exception despite on connection error.
conn.consume()
@@ -963,10 +952,6 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
- self.kombu_connect = mock.Mock()
- self.useFixture(mockpatch.Patch(
- 'kombu.connection.Connection.connect',
- side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connection'))
self.useFixture(mockpatch.Patch(
@@ -976,6 +961,10 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
url = oslo_messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url,
driver_common.PURPOSE_SEND)
+ self.kombu_connect = mock.Mock()
+ self.useFixture(mockpatch.Patch(
+ 'kombu.connection.Connection.connect',
+ side_effect=self.kombu_connect))
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):
diff --git a/requirements.txt b/requirements.txt
index 7ef1b53..b1a992e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -27,8 +27,9 @@ PyYAML>=3.10.0 # MIT
# rabbit driver is the default
# we set the amqp version to ensure heartbeat works
-amqp<2.0,>=1.4.0 # LGPL
-kombu<4.0.0,>=3.0.25 # BSD
+# FIXME(gordc): bump to amqp2 and kombu4 once requirements updated
+amqp>=1.4.0 # LGPL
+kombu>=3.0.25 # BSD
pika>=0.10.0 # BSD
pika-pool>=0.1.3 # BSD
diff --git a/tools/tox_install.sh b/tools/tox_install.sh
index 97a198d..48ccf96 100755
--- a/tools/tox_install.sh
+++ b/tools/tox_install.sh
@@ -27,5 +27,8 @@ pip install -c$localfile openstack-requirements
edit-constraints $localfile -- $CLIENT_NAME
pip install -c$localfile -U $*
+# NOTE(gordc): temporary override since kombu capped at <4.0.0
+pip install -U 'amqp>=2.0.0'
+pip install -U 'kombu>=4.0.0'
exit $?
diff --git a/tox.ini b/tox.ini
index beab5fa..5b0365d 100644
--- a/tox.ini
+++ b/tox.ini
@@ -32,12 +32,16 @@ commands = python setup.py build_sphinx
setenv =
{[testenv]setenv}
TRANSPORT_DRIVER=rabbit
+ amqp>=2.0.0
+ kombu>=4.0.0
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py35-func-rabbit]
setenv =
{[testenv]setenv}
TRANSPORT_DRIVER=rabbit
+ amqp>=2.0.0
+ kombu>=4.0.0
basepython = python3.5
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'