summaryrefslogtreecommitdiff
path: root/oslo/messaging/_drivers/amqpdriver.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-08-26 13:16:06 +0000
committerGerrit Code Review <review@openstack.org>2013-08-26 13:16:06 +0000
commitfd98ebadbb4f3fae7d31bb1d90598a259fac0794 (patch)
tree8dd50fa7d6246c9f8b6007ba5795bd5986cd13d6 /oslo/messaging/_drivers/amqpdriver.py
parent59299dc20243e52b2234b2beb11a23a318fb1012 (diff)
parentaebe53f242f40df1b77c61ae60e2ba6a575a1329 (diff)
downloadoslo-messaging-fd98ebadbb4f3fae7d31bb1d90598a259fac0794.tar.gz
Merge "Fix race-condition in rabbit reply processing"
Diffstat (limited to 'oslo/messaging/_drivers/amqpdriver.py')
-rw-r--r--oslo/messaging/_drivers/amqpdriver.py36
1 files changed, 33 insertions, 3 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py
index cfbd161..8c36162 100644
--- a/oslo/messaging/_drivers/amqpdriver.py
+++ b/oslo/messaging/_drivers/amqpdriver.py
@@ -93,6 +93,8 @@ class AMQPListener(base.Listener):
class ReplyWaiters(object):
+ WAKE_UP = object()
+
def __init__(self):
self._queues = {}
self._wrn_threshhold = 10
@@ -104,6 +106,12 @@ class ReplyWaiters(object):
raise messaging.MessagingTimeout('Timed out waiting for a reply '
'to message ID %s' % msg_id)
+ def check(self, msg_id):
+ try:
+ return self._queues[msg_id].get(block=False)
+ except Queue.Empty:
+ return None
+
def put(self, msg_id, message_data):
queue = self._queues.get(msg_id)
if not queue:
@@ -117,7 +125,7 @@ class ReplyWaiters(object):
def wake_all(self, except_id):
msg_ids = [i for i in self._queues.keys() if i != except_id]
for msg_id in msg_ids:
- self.put(msg_id, None)
+ self.put(msg_id, self.WAKE_UP)
def add(self, msg_id, queue):
self._queues[msg_id] = queue
@@ -189,10 +197,20 @@ class ReplyWaiter(object):
% msg_id)
def _poll_queue(self, msg_id, timeout):
+ message = self.waiters.get(msg_id, timeout)
+ if message is self.waiters.WAKE_UP:
+ return None, None, True # lock was released
+
+ reply, ending = self._process_reply(message)
+ return reply, ending, False
+
+ def _check_queue(self, msg_id):
while True:
- message = self.waiters.get(msg_id, timeout)
+ message = self.waiters.check(msg_id)
+ if message is self.waiters.WAKE_UP:
+ continue
if message is None:
- return None, None, True # lock was released
+ return None, None, True # queue is empty
reply, ending = self._process_reply(message)
return reply, ending, False
@@ -213,6 +231,18 @@ class ReplyWaiter(object):
if self.conn_lock.acquire(False):
# Ok, we're the thread responsible for polling the connection
try:
+ # Check the queue to see if a previous lock-holding thread
+ # queued up a reply already
+ while True:
+ reply, ending, empty = self._check_queue(msg_id)
+ if empty:
+ break
+ if not ending:
+ final_reply = reply
+ else:
+ return final_reply
+
+ # Now actually poll the connection
while True:
reply, ending = self._poll_connection(msg_id, timeout)
if not ending: