diff options
author | Jie Li <jie.li@easystack.cn> | 2015-03-04 15:42:13 +0800 |
---|---|---|
committer | Jie Li <jie.li@easystack.cn> | 2015-03-04 15:42:13 +0800 |
commit | 858353394dd15101ebcb1c4b39e672bc4aa2bd86 (patch) | |
tree | 316251e8c6d2b228780e6579b783cccd1a183289 | |
parent | 569046e4265c47cca9436da9586b246a21f087e5 (diff) | |
download | oslo-messaging-858353394dd15101ebcb1c4b39e672bc4aa2bd86.tar.gz |
Fix _poll_connection not timeout issue (1/2)
_poll_connection could fall into a loop waiting for a reply message if
rabbit dies and comes back up. This commit sets up an rpc_response_timeout
timer for the connection polling, so the rpc will finally break out with a
timeout exception, which is the expected behavior in such a scenario.
Related bug: #1338732
The title of the commit in master was:
rabbit: more precise iterconsume timeout
but this was changed since it didn't describe the actual change.
This commit resolved some conflicts due to cherry-pick.
Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502
(cherry picked from commit 023b7f44e2ccd77a7e9ee9ee78431a4646c88f13)
-rw-r--r-- | oslo/messaging/_drivers/amqpdriver.py | 24 | ||||
-rw-r--r-- | oslo/messaging/_drivers/common.py | 25 | ||||
-rw-r--r-- | oslo/messaging/_drivers/impl_rabbit.py | 27 |
3 files changed, 60 insertions, 16 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index f9f1b06..2760a8a 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -26,6 +26,7 @@ from oslo import messaging from oslo.messaging._drivers import amqp as rpc_amqp from oslo.messaging._drivers import base from oslo.messaging._drivers import common as rpc_common +from oslo.messaging.openstack.common.gettextutils import _ LOG = logging.getLogger(__name__) @@ -202,6 +203,11 @@ class ReplyWaiter(object): def unlisten(self, msg_id): self.waiters.remove(msg_id) + @staticmethod + def _raise_timeout_exception(msg_id): + raise messaging.MessagingTimeout( + _('Timed out waiting for a reply to message ID %s.') % msg_id) + def _process_reply(self, data): result = None ending = False @@ -216,7 +222,7 @@ class ReplyWaiter(object): result = data['result'] return result, ending - def _poll_connection(self, msg_id, timeout): + def _poll_connection(self, msg_id, timer): while True: while self.incoming: message_data = self.incoming.pop(0) @@ -227,15 +233,15 @@ class ReplyWaiter(object): self.waiters.put(incoming_msg_id, message_data) + timeout = timer.check_return(self._raise_timeout_exception, msg_id) try: self.conn.consume(limit=1, timeout=timeout) except rpc_common.Timeout: - raise messaging.MessagingTimeout('Timed out waiting for a ' - 'reply to message ID %s' - % msg_id) + self._raise_timeout_exception(msg_id) - def _poll_queue(self, msg_id, timeout): - message = self.waiters.get(msg_id, timeout) + def _poll_queue(self, msg_id, timer): + timeout = timer.check_return(self._raise_timeout_exception, msg_id) + message = self.waiters.get(msg_id, timeout=timeout) if message is self.waiters.WAKE_UP: return None, None, True # lock was released @@ -264,6 +270,8 @@ class ReplyWaiter(object): # have the first thread take responsibility for passing replies not # intended for itself to the appropriate thread. 
# + timer = rpc_common.DecayingTimer(duration=timeout) + timer.start() final_reply = None while True: if self.conn_lock.acquire(False): @@ -282,7 +290,7 @@ class ReplyWaiter(object): # Now actually poll the connection while True: - reply, ending = self._poll_connection(msg_id, timeout) + reply, ending = self._poll_connection(msg_id, timer) if not ending: final_reply = reply else: @@ -295,7 +303,7 @@ class ReplyWaiter(object): self.waiters.wake_all(msg_id) else: # We're going to wait for the first thread to pass us our reply - reply, ending, trylock = self._poll_queue(msg_id, timeout) + reply, ending, trylock = self._poll_queue(msg_id, timer) if trylock: # The first thread got its reply, let's try and take over # the responsibility for polling diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py index 71ca02b..24fee5e 100644 --- a/oslo/messaging/_drivers/common.py +++ b/oslo/messaging/_drivers/common.py @@ -18,6 +18,7 @@ import copy import logging import sys +import time import traceback import six @@ -347,3 +348,27 @@ def deserialize_msg(msg): raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) return raw_msg + + +class DecayingTimer(object): + def __init__(self, duration=None): + self._duration = duration + self._ends_at = None + + def start(self): + if self._duration is not None: + self._ends_at = time.time() + max(0, self._duration) + + def check_return(self, timeout_callback, *args, **kwargs): + if self._duration is None: + return None + if self._ends_at is None: + raise RuntimeError(_("Can not check/return a timeout from a timer" + " that has not been started.")) + + maximum = kwargs.pop('maximum', None) + left = self._ends_at - time.time() + if left <= 0: + timeout_callback(*args, **kwargs) + + return left if maximum is None else min(left, maximum) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index c836eb9..d06b251 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ 
b/oslo/messaging/_drivers/impl_rabbit.py @@ -719,14 +719,18 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" + timer = rpc_common.DecayingTimer(duration=timeout) + timer.start() + + def _raise_timeout(exc): + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() + def _error_callback(exc): - if isinstance(exc, socket.timeout): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - else: - LOG.exception(_('Failed to consume message from queue: %s'), - exc) - self.do_consume = True + timer.check_return(_raise_timeout, exc) + LOG.exception(_('Failed to consume message from queue: %s'), + exc) + self.do_consume = True def _consume(): if self.do_consume: @@ -736,7 +740,14 @@ class Connection(object): queue.consume(nowait=True) queues_tail.consume(nowait=False) self.do_consume = False - return self.connection.drain_events(timeout=timeout) + + poll_timeout = 1 if timeout is None else min(timeout, 1) + while True: + try: + return self.connection.drain_events(timeout=poll_timeout) + except socket.timeout as exc: + poll_timeout = timer.check_return(_raise_timeout, exc, + maximum=1) for iteration in itertools.count(0): if limit and iteration >= limit: |