summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Riedemann <mriedem.os@gmail.com>2017-06-02 13:19:28 +0000
committerMatt Riedemann <mriedem.os@gmail.com>2017-06-02 13:19:28 +0000
commita4203b79d77ceee3d884fbb376b1e32dfc54530f (patch)
tree8fd2a0e7c8c62bfa02ea487735eb26566e6b2198
parentb3316669263ad5f76e03bb7b54f1704f64c8c17f (diff)
downloadoslo-messaging-a4203b79d77ceee3d884fbb376b1e32dfc54530f.tar.gz
Revert "rabbit: restore synchronous ack/requeue"
This reverts commit b3316669263ad5f76e03bb7b54f1704f64c8c17f. It was reported on master (pike) that this change broke the default "blocking" executor so we should revert it on stable/ocata. Change-Id: Ia4bd74aa3df059e00b209d66afa8e327b76fe6ca Related-Bug: #1694728
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py105
1 files changed, 23 insertions, 82 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index c5613b0..46e91c9 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -42,58 +42,20 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0
-class MessageOperationsHandler(object):
- """Queue used by message operations to ensure that all tasks are
- serialized and run in the same thread, since underlying drivers like kombu
- are not thread safe.
- """
- def __init__(self, name):
- self.name = "%s (%s)" % (name, hex(id(self)))
- self._tasks = moves.queue.Queue()
-
- self._shutdown = threading.Event()
- self._shutdown_thread = threading.Thread(
- target=self._process_in_background)
- self._shutdown_thread.daemon = True
-
- def stop(self):
- self._shutdown.set()
-
- def process_in_background(self):
- """Run all pending tasks queued by do() in an thread during the
- shutdown process.
- """
- self._shutdown_thread.start()
-
- def _process_in_background(self):
- while not self._shutdown.is_set():
- self.process()
- time.sleep(ACK_REQUEUE_EVERY_SECONDS_MIN)
-
- def process(self):
- "Run all pending tasks queued by do()."
-
- while True:
- try:
- task, event = self._tasks.get(block=False)
- except moves.queue.Empty:
- break
- try:
- task()
- finally:
- event.set()
-
- def do(self, task):
- "Put the task in the queue and waits until the task is completed."
- event = threading.Event()
- self._tasks.put((task, event))
- event.wait()
+def do_pending_tasks(tasks):
+ while True:
+ try:
+ task = tasks.get(block=False)
+ except moves.queue.Empty:
+ break
+ else:
+ task()
class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
- obsolete_reply_queues, message_operations_handler):
+ obsolete_reply_queues, pending_message_actions):
super(AMQPIncomingMessage, self).__init__(ctxt, message)
self.listener = listener
@@ -101,7 +63,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
self.msg_id = msg_id
self.reply_q = reply_q
self._obsolete_reply_queues = obsolete_reply_queues
- self._message_operations_handler = message_operations_handler
+ self._pending_tasks = pending_message_actions
self.stopwatch = timeutils.StopWatch()
self.stopwatch.start()
@@ -171,7 +133,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
return
def acknowledge(self):
- self._message_operations_handler.do(self.message.acknowledge)
+ self._pending_tasks.put(self.message.acknowledge)
self.listener.msg_id_cache.add(self.unique_id)
def requeue(self):
@@ -181,7 +143,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
- self._message_operations_handler.do(self.message.requeue)
+ self._pending_tasks.put(self.message.requeue)
class ObsoleteReplyQueuesCache(object):
@@ -237,11 +199,9 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
- self._shutdown = threading.Event()
- self._shutoff = threading.Event()
+ self._stopped = threading.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
- self._message_operations_handler = MessageOperationsHandler(
- "AMQPListener")
+ self._pending_tasks = moves.queue.Queue()
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
@@ -262,14 +222,14 @@ class AMQPListener(base.PollStyleListener):
ctxt.msg_id,
ctxt.reply_q,
self._obsolete_reply_queues,
- self._message_operations_handler))
+ self._pending_tasks))
@base.batch_poll_helper
def poll(self, timeout=None):
stopwatch = timeutils.StopWatch(duration=timeout).start()
- while not self._shutdown.is_set():
- self._message_operations_handler.process()
+ while not self._stopped.is_set():
+ do_pending_tasks(self._pending_tasks)
if self.incoming:
return self.incoming.pop(0)
@@ -288,30 +248,12 @@ class AMQPListener(base.PollStyleListener):
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
- # NOTE(sileht): listener is stopped, just processes remaining messages
- # and operations
- self._message_operations_handler.process()
- if self.incoming:
- return self.incoming.pop(0)
-
- self._shutoff.set()
-
def stop(self):
- self._shutdown.set()
+ self._stopped.set()
self.conn.stop_consuming()
- self._shutoff.wait()
-
- # NOTE(sileht): Here, the listener is stopped, but some incoming
- # messages may still live on server side, because callback is still
- # running and message is not yet ack/requeue. It's safe to do the ack
- # into another thread, side the polling thread is now terminated.
- self._message_operations_handler.process_in_background()
+ do_pending_tasks(self._pending_tasks)
def cleanup(self):
- # NOTE(sileht): server executor is now stopped, we are sure that no
- # more incoming messages in live, we can acknowledge
- # remaining messages and stop the thread
- self._message_operations_handler.stop()
# Closes listener connection
self.conn.close()
@@ -364,6 +306,7 @@ class ReplyWaiter(object):
self.allowed_remote_exmods = allowed_remote_exmods
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
+ self._pending_tasks = moves.queue.Queue()
self.conn.declare_direct_consumer(reply_q, self)
@@ -378,10 +321,12 @@ class ReplyWaiter(object):
self.conn.stop_consuming()
self._thread.join()
self._thread = None
+ do_pending_tasks(self._pending_tasks)
def poll(self):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
while not self._thread_exit_event.is_set():
+ do_pending_tasks(self._pending_tasks)
try:
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
@@ -395,11 +340,7 @@ class ReplyWaiter(object):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
- # NOTE(sileht): __call__ is running within the polling thread,
- # (conn.consume -> conn.conn.drain_events() -> __call__ callback)
- # it's threadsafe to acknowledge the message here, no need to wait
- # the next polling
- message.acknowledge()
+ self._pending_tasks.put(message.acknowledge)
incoming_msg_id = message.pop('_msg_id', None)
if message.get('ending'):
LOG.debug("received reply msg_id: %s", incoming_msg_id)