summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-03-17 15:25:38 +0100
committerJohn Eckersberg <jeckersb@redhat.com>2016-03-23 14:08:33 -0400
commit36030e0955d4cb556b7442d22e12eb73a6430a87 (patch)
tree470e123933e05bee9d84fa121317f7cfc724069d
parent18327c1cd5d80fec4cb9987d7a3f68d315850ff6 (diff)
downloadoslo-messaging-36030e0955d4cb556b7442d22e12eb73a6430a87.tar.gz
Always set all socket timeouts
When exchange/queue are declared the timeout is the default (None) when it should be the no more than heartbeat timeout. Also we have to set TCP_USER_TIMEOUT to ensure our timeouts are respected. socket.settimeout() of python relies on select() that can be blocked because kernel doesn't return during a recv() because of TCP connection breakage. This change fixes that by always setting TCP_USER_TIMEOUT and python socket timeout each time the connection is establish. Conflicts: oslo_messaging/_drivers/impl_rabbit.py oslo_messaging/tests/drivers/test_impl_rabbit.py Change-Id: Ibf6139ac2c22d9eeda7030fb87b7f1139d92332e (cherry picked from commit 98bc5fc2fadce089b6f47421929651606e5e9ab6)
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py57
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py4
2 files changed, 33 insertions, 28 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 488a2f3..639ca8e 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -46,6 +46,9 @@ from oslo_messaging._i18n import _LW
from oslo_messaging import _utils
from oslo_messaging import exceptions
+# NOTE(sileht): don't exists in py2 socket module
+TCP_USER_TIMEOUT = 18
+
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
@@ -628,6 +631,7 @@ class Connection(object):
# the kombu underlying connection works
self._set_current_channel(None)
self.ensure(method=lambda: self.connection.connection)
+ self.set_transport_socket_timeout()
def ensure(self, method, retry=None,
recoverable_error_callback=None, error_callback=None,
@@ -695,6 +699,8 @@ class Connection(object):
"""Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers.
"""
+
+ self.set_transport_socket_timeout()
self._set_current_channel(new_channel)
for consumer in self._consumers:
consumer.declare(self)
@@ -813,8 +819,7 @@ class Connection(object):
self._heartbeat_support_log_emitted = True
return False
- @contextlib.contextmanager
- def _transport_socket_timeout(self, timeout):
+ def set_transport_socket_timeout(self, timeout=None):
# NOTE(sileht): they are some case where the heartbeat check
# or the producer.send return only when the system socket
# timeout if reach. kombu doesn't allow use to customise this
@@ -823,27 +828,37 @@ class Connection(object):
# kombu==3.0.33. Once the commit below is released, we should
# try to set the socket timeout in the constructor:
# https://github.com/celery/py-amqp/pull/64
+
+ heartbeat_timeout = self.heartbeat_timeout_threshold
+ if self._heartbeat_supported_and_enabled():
+ # NOTE(sileht): we are supposed to send heartbeat every
+ # heartbeat_timeout, no need to wait more otherwise will
+ # disconnect us, so raise timeout earlier ourself
+ if timeout is None:
+ timeout = heartbeat_timeout
+ else:
+ timeout = min(heartbeat_timeout, timeout)
+
try:
sock = self.channel.connection.sock
except AttributeError as e:
# Level is set to debug because otherwise we would spam the logs
LOG.debug('Failed to get socket attribute: %s' % str(e))
- sock = None
-
- if sock:
- orig_timeout = sock.gettimeout()
+ else:
sock.settimeout(timeout)
+ sock.setsockopt(socket.IPPROTO_TCP, TCP_USER_TIMEOUT,
+ timeout * 1000 if timeout is not None else 0)
+
+ @contextlib.contextmanager
+ def _transport_socket_timeout(self, timeout):
+ self.set_transport_socket_timeout(timeout)
yield
- if sock:
- sock.settimeout(orig_timeout)
+ self.set_transport_socket_timeout()
def _heartbeat_check(self):
# NOTE(sileht): we are suposed to send at least one heartbeat
# every heartbeat_timeout_threshold, so no need to way more
- with self._transport_socket_timeout(
- self.heartbeat_timeout_threshold):
- self.connection.heartbeat_check(
- rate=self.heartbeat_rate)
+ self.connection.heartbeat_check(rate=self.heartbeat_rate)
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
@@ -1061,25 +1076,15 @@ class Connection(object):
channel=self.channel,
routing_key=routing_key)
- # NOTE(sileht): no need to wait more, caller expects
- # a answer before timeout is reached
- transport_timeout = timeout
-
- heartbeat_timeout = self.heartbeat_timeout_threshold
- if (self._heartbeat_supported_and_enabled() and (
- transport_timeout is None or
- transport_timeout > heartbeat_timeout)):
- # NOTE(sileht): we are supposed to send heartbeat every
- # heartbeat_timeout, no need to wait more otherwise will
- # disconnect us, so raise timeout earlier ourself
- transport_timeout = heartbeat_timeout
-
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
- with self._transport_socket_timeout(transport_timeout):
+
+ # NOTE(sileht): no need to wait more, caller expects
+ # a answer before timeout is reached
+ with self._transport_socket_timeout(timeout):
producer.publish(msg, expiration=self._get_expiration(timeout))
# List of notification queue declared on the channel to avoid
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 1416716..ad84d11 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -93,11 +93,11 @@ class TestHeartbeat(test_utils.BaseTestCase):
if not heartbeat_side_effect:
self.assertEqual(1, fake_ensure_connection.call_count)
- self.assertEqual(1, fake_logger.debug.call_count)
+ self.assertEqual(0, fake_logger.debug.call_count)
self.assertEqual(2, fake_logger.info.call_count)
else:
self.assertEqual(2, fake_ensure_connection.call_count)
- self.assertEqual(1, fake_logger.debug.call_count)
+ self.assertEqual(0, fake_logger.debug.call_count)
self.assertEqual(3, fake_logger.info.call_count)
self.assertIn(mock.call(info, mock.ANY),
fake_logger.info.mock_calls)