summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJie Li <jie.li@easystack.cn>2015-03-04 15:42:13 +0800
committerJie Li <jie.li@easystack.cn>2015-03-04 15:42:13 +0800
commit858353394dd15101ebcb1c4b39e672bc4aa2bd86 (patch)
tree316251e8c6d2b228780e6579b783cccd1a183289
parent569046e4265c47cca9436da9586b246a21f087e5 (diff)
downloadoslo-messaging-858353394dd15101ebcb1c4b39e672bc4aa2bd86.tar.gz
Fix _poll_connection not timeout issue (1/2)
_poll_connection could fall into a loop waiting for a reply message, if rabbit dies and up. This commit will set up a rpc_response_timeout timer for one connection polling; so the rpc will finally jump out with a timeout exception which is expected in such scenario. Related bug: #1338732 The title of the commit in master was: rabbit: more precise iterconsume timeout but this was changed since it didn't describe the actual change. This commit resolved some conflicts due to cherry-pick. Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502 (cherry picked from commit 023b7f44e2ccd77a7e9ee9ee78431a4646c88f13)
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py24
-rw-r--r--oslo/messaging/_drivers/common.py25
-rw-r--r--oslo/messaging/_drivers/impl_rabbit.py27
3 files changed, 60 insertions, 16 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index f9f1b06..2760a8a 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -26,6 +26,7 @@ from oslo import messaging
from oslo.messaging._drivers import amqp as rpc_amqp
from oslo.messaging._drivers import base
from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
@@ -202,6 +203,11 @@ class ReplyWaiter(object):
def unlisten(self, msg_id):
self.waiters.remove(msg_id)
+ @staticmethod
+ def _raise_timeout_exception(msg_id):
+ raise messaging.MessagingTimeout(
+ _('Timed out waiting for a reply to message ID %s.') % msg_id)
+
def _process_reply(self, data):
result = None
ending = False
@@ -216,7 +222,7 @@ class ReplyWaiter(object):
result = data['result']
return result, ending
- def _poll_connection(self, msg_id, timeout):
+ def _poll_connection(self, msg_id, timer):
while True:
while self.incoming:
message_data = self.incoming.pop(0)
@@ -227,15 +233,15 @@ class ReplyWaiter(object):
self.waiters.put(incoming_msg_id, message_data)
+ timeout = timer.check_return(self._raise_timeout_exception, msg_id)
try:
self.conn.consume(limit=1, timeout=timeout)
except rpc_common.Timeout:
- raise messaging.MessagingTimeout('Timed out waiting for a '
- 'reply to message ID %s'
- % msg_id)
+ self._raise_timeout_exception(msg_id)
- def _poll_queue(self, msg_id, timeout):
- message = self.waiters.get(msg_id, timeout)
+ def _poll_queue(self, msg_id, timer):
+ timeout = timer.check_return(self._raise_timeout_exception, msg_id)
+ message = self.waiters.get(msg_id, timeout=timeout)
if message is self.waiters.WAKE_UP:
return None, None, True # lock was released
@@ -264,6 +270,8 @@ class ReplyWaiter(object):
# have the first thread take responsibility for passing replies not
# intended for itself to the appropriate thread.
#
+ timer = rpc_common.DecayingTimer(duration=timeout)
+ timer.start()
final_reply = None
while True:
if self.conn_lock.acquire(False):
@@ -282,7 +290,7 @@ class ReplyWaiter(object):
# Now actually poll the connection
while True:
- reply, ending = self._poll_connection(msg_id, timeout)
+ reply, ending = self._poll_connection(msg_id, timer)
if not ending:
final_reply = reply
else:
@@ -295,7 +303,7 @@ class ReplyWaiter(object):
self.waiters.wake_all(msg_id)
else:
# We're going to wait for the first thread to pass us our reply
- reply, ending, trylock = self._poll_queue(msg_id, timeout)
+ reply, ending, trylock = self._poll_queue(msg_id, timer)
if trylock:
# The first thread got its reply, let's try and take over
# the responsibility for polling
diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py
index 71ca02b..24fee5e 100644
--- a/oslo/messaging/_drivers/common.py
+++ b/oslo/messaging/_drivers/common.py
@@ -18,6 +18,7 @@
import copy
import logging
import sys
+import time
import traceback
import six
@@ -347,3 +348,27 @@ def deserialize_msg(msg):
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg
+
+
+class DecayingTimer(object):
+ def __init__(self, duration=None):
+ self._duration = duration
+ self._ends_at = None
+
+ def start(self):
+ if self._duration is not None:
+ self._ends_at = time.time() + max(0, self._duration)
+
+ def check_return(self, timeout_callback, *args, **kwargs):
+ if self._duration is None:
+ return None
+ if self._ends_at is None:
+ raise RuntimeError(_("Can not check/return a timeout from a timer"
+ " that has not been started."))
+
+ maximum = kwargs.pop('maximum', None)
+ left = self._ends_at - time.time()
+ if left <= 0:
+ timeout_callback(*args, **kwargs)
+
+ return left if maximum is None else min(left, maximum)
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index c836eb9..d06b251 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -719,14 +719,18 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers."""
+ timer = rpc_common.DecayingTimer(duration=timeout)
+ timer.start()
+
+ def _raise_timeout(exc):
+ LOG.debug('Timed out waiting for RPC response: %s', exc)
+ raise rpc_common.Timeout()
+
def _error_callback(exc):
- if isinstance(exc, socket.timeout):
- LOG.debug('Timed out waiting for RPC response: %s', exc)
- raise rpc_common.Timeout()
- else:
- LOG.exception(_('Failed to consume message from queue: %s'),
- exc)
- self.do_consume = True
+ timer.check_return(_raise_timeout, exc)
+ LOG.exception(_('Failed to consume message from queue: %s'),
+ exc)
+ self.do_consume = True
def _consume():
if self.do_consume:
@@ -736,7 +740,14 @@ class Connection(object):
queue.consume(nowait=True)
queues_tail.consume(nowait=False)
self.do_consume = False
- return self.connection.drain_events(timeout=timeout)
+
+ poll_timeout = 1 if timeout is None else min(timeout, 1)
+ while True:
+ try:
+ return self.connection.drain_events(timeout=poll_timeout)
+ except socket.timeout as exc:
+ poll_timeout = timer.check_return(_raise_timeout, exc,
+ maximum=1)
for iteration in itertools.count(0):
if limit and iteration >= limit: