diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-08-26 13:16:06 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-08-26 13:16:06 +0000 |
commit | fd98ebadbb4f3fae7d31bb1d90598a259fac0794 (patch) | |
tree | 8dd50fa7d6246c9f8b6007ba5795bd5986cd13d6 /oslo/messaging/_drivers/amqpdriver.py | |
parent | 59299dc20243e52b2234b2beb11a23a318fb1012 (diff) | |
parent | aebe53f242f40df1b77c61ae60e2ba6a575a1329 (diff) | |
download | oslo-messaging-fd98ebadbb4f3fae7d31bb1d90598a259fac0794.tar.gz |
Merge "Fix race-condition in rabbit reply processing"
Diffstat (limited to 'oslo/messaging/_drivers/amqpdriver.py')
-rw-r--r-- | oslo/messaging/_drivers/amqpdriver.py | 36 |
1 files changed, 33 insertions, 3 deletions
diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index cfbd161..8c36162 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -93,6 +93,8 @@ class AMQPListener(base.Listener): class ReplyWaiters(object): + WAKE_UP = object() + def __init__(self): self._queues = {} self._wrn_threshhold = 10 @@ -104,6 +106,12 @@ class ReplyWaiters(object): raise messaging.MessagingTimeout('Timed out waiting for a reply ' 'to message ID %s' % msg_id) + def check(self, msg_id): + try: + return self._queues[msg_id].get(block=False) + except Queue.Empty: + return None + def put(self, msg_id, message_data): queue = self._queues.get(msg_id) if not queue: @@ -117,7 +125,7 @@ class ReplyWaiters(object): def wake_all(self, except_id): msg_ids = [i for i in self._queues.keys() if i != except_id] for msg_id in msg_ids: - self.put(msg_id, None) + self.put(msg_id, self.WAKE_UP) def add(self, msg_id, queue): self._queues[msg_id] = queue @@ -189,10 +197,20 @@ class ReplyWaiter(object): % msg_id) def _poll_queue(self, msg_id, timeout): + message = self.waiters.get(msg_id, timeout) + if message is self.waiters.WAKE_UP: + return None, None, True # lock was released + + reply, ending = self._process_reply(message) + return reply, ending, False + + def _check_queue(self, msg_id): while True: - message = self.waiters.get(msg_id, timeout) + message = self.waiters.check(msg_id) + if message is self.waiters.WAKE_UP: + continue if message is None: - return None, None, True # lock was released + return None, None, True # queue is empty reply, ending = self._process_reply(message) return reply, ending, False @@ -213,6 +231,18 @@ class ReplyWaiter(object): if self.conn_lock.acquire(False): # Ok, we're the thread responsible for polling the connection try: + # Check the queue to see if a previous lock-holding thread + # queued up a reply already + while True: + reply, ending, empty = self._check_queue(msg_id) + if empty: + break + if not ending: + final_reply = reply + else: + return final_reply + + # Now actually poll the connection while True: reply, ending = self._poll_connection(msg_id, timeout) if not ending: |