diff options
author | Matt Riedemann <mriedem.os@gmail.com> | 2017-06-02 13:19:28 +0000 |
---|---|---|
committer | Matt Riedemann <mriedem.os@gmail.com> | 2017-06-02 13:19:28 +0000 |
commit | a4203b79d77ceee3d884fbb376b1e32dfc54530f (patch) | |
tree | 8fd2a0e7c8c62bfa02ea487735eb26566e6b2198 | |
parent | b3316669263ad5f76e03bb7b54f1704f64c8c17f (diff) | |
download | oslo-messaging-a4203b79d77ceee3d884fbb376b1e32dfc54530f.tar.gz |
Revert "rabbit: restore synchronous ack/requeue"
This reverts commit b3316669263ad5f76e03bb7b54f1704f64c8c17f.
It was reported on master (pike) that this change broke
the default "blocking" executor so we should revert it on
stable/ocata.
Change-Id: Ia4bd74aa3df059e00b209d66afa8e327b76fe6ca
Related-Bug: #1694728
-rw-r--r-- | oslo_messaging/_drivers/amqpdriver.py | 105 |
1 files changed, 23 insertions, 82 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index c5613b0..46e91c9 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -42,58 +42,20 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001 ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0 -class MessageOperationsHandler(object): - """Queue used by message operations to ensure that all tasks are - serialized and run in the same thread, since underlying drivers like kombu - are not thread safe. - """ - def __init__(self, name): - self.name = "%s (%s)" % (name, hex(id(self))) - self._tasks = moves.queue.Queue() - - self._shutdown = threading.Event() - self._shutdown_thread = threading.Thread( - target=self._process_in_background) - self._shutdown_thread.daemon = True - - def stop(self): - self._shutdown.set() - - def process_in_background(self): - """Run all pending tasks queued by do() in an thread during the - shutdown process. - """ - self._shutdown_thread.start() - - def _process_in_background(self): - while not self._shutdown.is_set(): - self.process() - time.sleep(ACK_REQUEUE_EVERY_SECONDS_MIN) - - def process(self): - "Run all pending tasks queued by do()." - - while True: - try: - task, event = self._tasks.get(block=False) - except moves.queue.Empty: - break - try: - task() - finally: - event.set() - - def do(self, task): - "Put the task in the queue and waits until the task is completed." - event = threading.Event() - self._tasks.put((task, event)) - event.wait() +def do_pending_tasks(tasks): + while True: + try: + task = tasks.get(block=False) + except moves.queue.Empty: + break + else: + task() class AMQPIncomingMessage(base.RpcIncomingMessage): def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q, - obsolete_reply_queues, message_operations_handler): + obsolete_reply_queues, pending_message_actions): super(AMQPIncomingMessage, self).__init__(ctxt, message) self.listener = listener @@ -101,7 +63,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self.msg_id = msg_id self.reply_q = reply_q self._obsolete_reply_queues = obsolete_reply_queues - self._message_operations_handler = message_operations_handler + self._pending_tasks = pending_message_actions self.stopwatch = timeutils.StopWatch() self.stopwatch.start() @@ -171,7 +133,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): return def acknowledge(self): - self._message_operations_handler.do(self.message.acknowledge) + self._pending_tasks.put(self.message.acknowledge) self.listener.msg_id_cache.add(self.unique_id) def requeue(self): @@ -181,7 +143,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): # msg_id_cache, the message will be reconsumed, the only difference is # the message stay at the beginning of the queue instead of moving to # the end. - self._message_operations_handler.do(self.message.requeue) + self._pending_tasks.put(self.message.requeue) class ObsoleteReplyQueuesCache(object): @@ -237,11 +199,9 @@ class AMQPListener(base.PollStyleListener): self.conn = conn self.msg_id_cache = rpc_amqp._MsgIdCache() self.incoming = [] - self._shutdown = threading.Event() - self._shutoff = threading.Event() + self._stopped = threading.Event() self._obsolete_reply_queues = ObsoleteReplyQueuesCache() - self._message_operations_handler = MessageOperationsHandler( - "AMQPListener") + self._pending_tasks = moves.queue.Queue() self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): @@ -262,14 +222,14 @@ class AMQPListener(base.PollStyleListener): ctxt.msg_id, ctxt.reply_q, self._obsolete_reply_queues, - self._message_operations_handler)) + self._pending_tasks)) @base.batch_poll_helper def poll(self, timeout=None): stopwatch = timeutils.StopWatch(duration=timeout).start() - while not self._shutdown.is_set(): - self._message_operations_handler.process() + while not self._stopped.is_set(): + do_pending_tasks(self._pending_tasks) if self.incoming: return self.incoming.pop(0) @@ -288,30 +248,12 @@ class AMQPListener(base.PollStyleListener): else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN - # NOTE(sileht): listener is stopped, just processes remaining messages - # and operations - self._message_operations_handler.process() - if self.incoming: - return self.incoming.pop(0) - - self._shutoff.set() - def stop(self): - self._shutdown.set() + self._stopped.set() self.conn.stop_consuming() - self._shutoff.wait() - - # NOTE(sileht): Here, the listener is stopped, but some incoming - # messages may still live on server side, because callback is still - # running and message is not yet ack/requeue. It's safe to do the ack - # into another thread, side the polling thread is now terminated. - self._message_operations_handler.process_in_background() + do_pending_tasks(self._pending_tasks) def cleanup(self): - # NOTE(sileht): server executor is now stopped, we are sure that no - # more incoming messages in live, we can acknowledge - # remaining messages and stop the thread - self._message_operations_handler.stop() # Closes listener connection self.conn.close() @@ -364,6 +306,7 @@ class ReplyWaiter(object): self.allowed_remote_exmods = allowed_remote_exmods self.msg_id_cache = rpc_amqp._MsgIdCache() self.waiters = ReplyWaiters() + self._pending_tasks = moves.queue.Queue() self.conn.declare_direct_consumer(reply_q, self) @@ -378,10 +321,12 @@ class ReplyWaiter(object): self.conn.stop_consuming() self._thread.join() self._thread = None + do_pending_tasks(self._pending_tasks) def poll(self): current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN while not self._thread_exit_event.is_set(): + do_pending_tasks(self._pending_tasks) try: # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds self.conn.consume(timeout=current_timeout) @@ -395,11 +340,7 @@ class ReplyWaiter(object): current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN def __call__(self, message): - # NOTE(sileht): __call__ is running within the polling thread, - # (conn.consume -> conn.conn.drain_events() -> __call__ callback) - # it's threadsafe to acknowledge the message here, no need to wait - # the next polling - message.acknowledge() + self._pending_tasks.put(message.acknowledge) incoming_msg_id = message.pop('_msg_id', None) if message.get('ending'): LOG.debug("received reply msg_id: %s", incoming_msg_id) |