diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2016-03-17 15:25:38 +0100 |
---|---|---|
committer | John Eckersberg <jeckersb@redhat.com> | 2016-03-23 14:08:33 -0400 |
commit | 36030e0955d4cb556b7442d22e12eb73a6430a87 (patch) | |
tree | 470e123933e05bee9d84fa121317f7cfc724069d | |
parent | 18327c1cd5d80fec4cb9987d7a3f68d315850ff6 (diff) | |
download | oslo-messaging-36030e0955d4cb556b7442d22e12eb73a6430a87.tar.gz |
Always set all socket timeouts
When exchange/queue are declared the timeout is the default (None)
when it should be the no more than heartbeat timeout.
Also we have to set TCP_USER_TIMEOUT to ensure our timeouts are
respected. socket.settimeout() of python relies on select() that can
be blocked because kernel doesn't return during a recv() because of
TCP connection breakage.
This change fixes that by always setting TCP_USER_TIMEOUT and python
socket timeout each time the connection is establish.
Conflicts:
oslo_messaging/_drivers/impl_rabbit.py
oslo_messaging/tests/drivers/test_impl_rabbit.py
Change-Id: Ibf6139ac2c22d9eeda7030fb87b7f1139d92332e
(cherry picked from commit 98bc5fc2fadce089b6f47421929651606e5e9ab6)
-rw-r--r-- | oslo_messaging/_drivers/impl_rabbit.py | 57 | ||||
-rw-r--r-- | oslo_messaging/tests/drivers/test_impl_rabbit.py | 4 |
2 files changed, 33 insertions, 28 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 488a2f3..639ca8e 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -46,6 +46,9 @@ from oslo_messaging._i18n import _LW from oslo_messaging import _utils from oslo_messaging import exceptions +# NOTE(sileht): don't exists in py2 socket module +TCP_USER_TIMEOUT = 18 + rabbit_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -628,6 +631,7 @@ class Connection(object): # the kombu underlying connection works self._set_current_channel(None) self.ensure(method=lambda: self.connection.connection) + self.set_transport_socket_timeout() def ensure(self, method, retry=None, recoverable_error_callback=None, error_callback=None, @@ -695,6 +699,8 @@ class Connection(object): """Callback invoked when the kombu reconnects and creates a new channel, we use it the reconfigure our consumers. """ + + self.set_transport_socket_timeout() self._set_current_channel(new_channel) for consumer in self._consumers: consumer.declare(self) @@ -813,8 +819,7 @@ class Connection(object): self._heartbeat_support_log_emitted = True return False - @contextlib.contextmanager - def _transport_socket_timeout(self, timeout): + def set_transport_socket_timeout(self, timeout=None): # NOTE(sileht): they are some case where the heartbeat check # or the producer.send return only when the system socket # timeout if reach. kombu doesn't allow use to customise this @@ -823,27 +828,37 @@ class Connection(object): # kombu==3.0.33. Once the commit below is released, we should # try to set the socket timeout in the constructor: # https://github.com/celery/py-amqp/pull/64 + + heartbeat_timeout = self.heartbeat_timeout_threshold + if self._heartbeat_supported_and_enabled(): + # NOTE(sileht): we are supposed to send heartbeat every + # heartbeat_timeout, no need to wait more otherwise will + # disconnect us, so raise timeout earlier ourself + if timeout is None: + timeout = heartbeat_timeout + else: + timeout = min(heartbeat_timeout, timeout) + try: sock = self.channel.connection.sock except AttributeError as e: # Level is set to debug because otherwise we would spam the logs LOG.debug('Failed to get socket attribute: %s' % str(e)) - sock = None - - if sock: - orig_timeout = sock.gettimeout() + else: sock.settimeout(timeout) + sock.setsockopt(socket.IPPROTO_TCP, TCP_USER_TIMEOUT, + timeout * 1000 if timeout is not None else 0) + + @contextlib.contextmanager + def _transport_socket_timeout(self, timeout): + self.set_transport_socket_timeout(timeout) yield - if sock: - sock.settimeout(orig_timeout) + self.set_transport_socket_timeout() def _heartbeat_check(self): # NOTE(sileht): we are suposed to send at least one heartbeat # every heartbeat_timeout_threshold, so no need to way more - with self._transport_socket_timeout( - self.heartbeat_timeout_threshold): - self.connection.heartbeat_check( - rate=self.heartbeat_rate) + self.connection.heartbeat_check(rate=self.heartbeat_rate) def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): @@ -1061,25 +1076,15 @@ class Connection(object): channel=self.channel, routing_key=routing_key) - # NOTE(sileht): no need to wait more, caller expects - # a answer before timeout is reached - transport_timeout = timeout - - heartbeat_timeout = self.heartbeat_timeout_threshold - if (self._heartbeat_supported_and_enabled() and ( - transport_timeout is None or - transport_timeout > heartbeat_timeout)): - # NOTE(sileht): we are supposed to send heartbeat every - # heartbeat_timeout, no need to wait more otherwise will - # disconnect us, so raise timeout earlier ourself - transport_timeout = heartbeat_timeout - log_info = {'msg': msg, 'who': exchange or 'default', 'key': routing_key} LOG.trace('Connection._publish: sending message %(msg)s to' ' %(who)s with routing key %(key)s', log_info) - with self._transport_socket_timeout(transport_timeout): + + # NOTE(sileht): no need to wait more, caller expects + # a answer before timeout is reached + with self._transport_socket_timeout(timeout): producer.publish(msg, expiration=self._get_expiration(timeout)) # List of notification queue declared on the channel to avoid diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 1416716..ad84d11 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -93,11 +93,11 @@ class TestHeartbeat(test_utils.BaseTestCase): if not heartbeat_side_effect: self.assertEqual(1, fake_ensure_connection.call_count) - self.assertEqual(1, fake_logger.debug.call_count) + self.assertEqual(0, fake_logger.debug.call_count) self.assertEqual(2, fake_logger.info.call_count) else: self.assertEqual(2, fake_ensure_connection.call_count) - self.assertEqual(1, fake_logger.debug.call_count) + self.assertEqual(0, fake_logger.debug.call_count) self.assertEqual(3, fake_logger.info.call_count) self.assertIn(mock.call(info, mock.ANY), fake_logger.info.mock_calls) |