summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@sileht.net>2017-05-10 09:19:38 +0200
committerMatt Riedemann <mriedem.os@gmail.com>2017-06-15 19:51:44 +0000
commit2775568e01b8429dc90fade4e38673085098da34 (patch)
tree19b25b7a355d5da5e6d1d37d182ae1ea95f333ac
parenta4203b79d77ceee3d884fbb376b1e32dfc54530f (diff)
downloadoslo-messaging-5.17.2.tar.gz
rabbit: restore synchronous ack/requeue 5.17.2
Note: this change also contains the fix for the regression it introduced. In https://review.openstack.org/#/c/436958, we fixed a thread-safety issue, but we made the ack/requeue of messages asynchronous. In the nominal case this works, but if a network/rabbit connection issue occurs, it can result in an RPC call being handled twice. By chance, we already double-check processed message ids and drop duplicates, but if the message goes to another node, that mitigation won't work. This restores the previous behavior, to ensure we run the application callback of rpc.call/rpc.cast only when the message has been successfully acked. (cherry picked from commit da02bc2169b09959d857c605961ead1bbba1019d) Fix rabbitmq driver with blocking executor We recently moved the ack/requeue of messages into the main/polling thread of the rabbitmq drivers, and broke the blocking executor. That executor is not covered by any tests and is now deprecated. This change works around the issue until we completely remove the blocking executor. Closes-bug: #1694728 (cherry picked from commit 8ee5ae135a6ecb918f40619982e3dc7e38ed0bbf) Change-Id: I62b9e09513e3ebfebc64a941d4b21b6c053b511d
-rw-r--r--oslo_messaging/_drivers/amqpdriver.py117
-rw-r--r--oslo_messaging/server.py12
2 files changed, 107 insertions, 22 deletions
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index 46e91c9..539e48b 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -42,20 +42,72 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 1.0
-def do_pending_tasks(tasks):
- while True:
- try:
- task = tasks.get(block=False)
- except moves.queue.Empty:
- break
- else:
+class MessageOperationsHandler(object):
+ """Queue used by message operations to ensure that all tasks are
+ serialized and run in the same thread, since underlying drivers like kombu
+ are not thread safe.
+ """
+ def __init__(self, name):
+ self.name = "%s (%s)" % (name, hex(id(self)))
+ self._tasks = moves.queue.Queue()
+
+ self._shutdown = threading.Event()
+ self._shutdown_thread = threading.Thread(
+ target=self._process_in_background)
+ self._shutdown_thread.daemon = True
+
+ # HACK(sileht): this is set by the server.Server temporary
+ # to not have to rewrite the entire internal API to pass
+ # executor everywhere to make Listener aware of the server
+ # executor. All this hack is only for the blocking executor.
+ # And it's deprecated so...
+ self._executor = None
+
+ def stop(self):
+ self._shutdown.set()
+
+ def process_in_background(self):
+ """Run all pending tasks queued by do() in an thread during the
+ shutdown process.
+ """
+ self._shutdown_thread.start()
+
+ def _process_in_background(self):
+ while not self._shutdown.is_set():
+ self.process()
+ time.sleep(ACK_REQUEUE_EVERY_SECONDS_MIN)
+
+ def process(self):
+ "Run all pending tasks queued by do()."
+
+ while True:
+ try:
+ task, event = self._tasks.get(block=False)
+ except moves.queue.Empty:
+ break
+ try:
+ task()
+ finally:
+ event.set()
+
+ def do(self, task):
+ "Put the task in the queue and waits until the task is completed."
+ if self._executor is None:
+ raise RuntimeError("Unexpected error, no executor is setuped")
+ elif self._executor == "blocking":
+ # NOTE(sileht): Blocking will hang forever if we waiting the
+ # polling thread
task()
+ else:
+ event = threading.Event()
+ self._tasks.put((task, event))
+ event.wait()
class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
- obsolete_reply_queues, pending_message_actions):
+ obsolete_reply_queues, message_operations_handler):
super(AMQPIncomingMessage, self).__init__(ctxt, message)
self.listener = listener
@@ -63,7 +115,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
self.msg_id = msg_id
self.reply_q = reply_q
self._obsolete_reply_queues = obsolete_reply_queues
- self._pending_tasks = pending_message_actions
+ self._message_operations_handler = message_operations_handler
self.stopwatch = timeutils.StopWatch()
self.stopwatch.start()
@@ -133,7 +185,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
return
def acknowledge(self):
- self._pending_tasks.put(self.message.acknowledge)
+ self._message_operations_handler.do(self.message.acknowledge)
self.listener.msg_id_cache.add(self.unique_id)
def requeue(self):
@@ -143,7 +195,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
# msg_id_cache, the message will be reconsumed, the only difference is
# the message stay at the beginning of the queue instead of moving to
# the end.
- self._pending_tasks.put(self.message.requeue)
+ self._message_operations_handler.do(self.message.requeue)
class ObsoleteReplyQueuesCache(object):
@@ -199,9 +251,11 @@ class AMQPListener(base.PollStyleListener):
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
- self._stopped = threading.Event()
+ self._shutdown = threading.Event()
+ self._shutoff = threading.Event()
self._obsolete_reply_queues = ObsoleteReplyQueuesCache()
- self._pending_tasks = moves.queue.Queue()
+ self._message_operations_handler = MessageOperationsHandler(
+ "AMQPListener")
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
@@ -222,14 +276,14 @@ class AMQPListener(base.PollStyleListener):
ctxt.msg_id,
ctxt.reply_q,
self._obsolete_reply_queues,
- self._pending_tasks))
+ self._message_operations_handler))
@base.batch_poll_helper
def poll(self, timeout=None):
stopwatch = timeutils.StopWatch(duration=timeout).start()
- while not self._stopped.is_set():
- do_pending_tasks(self._pending_tasks)
+ while not self._shutdown.is_set():
+ self._message_operations_handler.process()
if self.incoming:
return self.incoming.pop(0)
@@ -248,12 +302,30 @@ class AMQPListener(base.PollStyleListener):
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
+ # NOTE(sileht): listener is stopped, just processes remaining messages
+ # and operations
+ self._message_operations_handler.process()
+ if self.incoming:
+ return self.incoming.pop(0)
+
+ self._shutoff.set()
+
def stop(self):
- self._stopped.set()
+ self._shutdown.set()
self.conn.stop_consuming()
- do_pending_tasks(self._pending_tasks)
+ self._shutoff.wait()
+
+ # NOTE(sileht): Here, the listener is stopped, but some incoming
+ # messages may still live on server side, because callback is still
+ # running and message is not yet ack/requeue. It's safe to do the ack
+ # into another thread, side the polling thread is now terminated.
+ self._message_operations_handler.process_in_background()
def cleanup(self):
+ # NOTE(sileht): server executor is now stopped, we are sure that no
+ # more incoming messages in live, we can acknowledge
+ # remaining messages and stop the thread
+ self._message_operations_handler.stop()
# Closes listener connection
self.conn.close()
@@ -306,7 +378,6 @@ class ReplyWaiter(object):
self.allowed_remote_exmods = allowed_remote_exmods
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.waiters = ReplyWaiters()
- self._pending_tasks = moves.queue.Queue()
self.conn.declare_direct_consumer(reply_q, self)
@@ -321,12 +392,10 @@ class ReplyWaiter(object):
self.conn.stop_consuming()
self._thread.join()
self._thread = None
- do_pending_tasks(self._pending_tasks)
def poll(self):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
while not self._thread_exit_event.is_set():
- do_pending_tasks(self._pending_tasks)
try:
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
@@ -340,7 +409,11 @@ class ReplyWaiter(object):
current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
def __call__(self, message):
- self._pending_tasks.put(message.acknowledge)
+ # NOTE(sileht): __call__ is running within the polling thread,
+ # (conn.consume -> conn.conn.drain_events() -> __call__ callback)
+ # it's threadsafe to acknowledge the message here, no need to wait
+ # the next polling
+ message.acknowledge()
incoming_msg_id = message.pop('_msg_id', None)
if message.get('ending'):
LOG.debug("received reply msg_id: %s", incoming_msg_id)
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index ac4a964..0bc5b85 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -421,6 +421,18 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
+ # HACK(sileht): We temporary pass the executor to the rabbit
+ # listener to fix a race with the deprecated blocking executor.
+ # We do this hack because this is need only for 'synchronous'
+ # executor like blocking. And this one is deprecated. Making
+ # driver working in an sync and an async way is complicated
+ # and blocking have 0% tests coverage.
+ if hasattr(self.listener, '_poll_style_listener'):
+ l = self.listener._poll_style_listener
+ if hasattr(l, "_message_operations_handler"):
+ l._message_operations_handler._executor = (
+ self.executor_type)
+
self.listener.start(self._on_incoming)
@ordered(after='start')