diff options
author | Loïc Hoguin <lhoguin@vmware.com> | 2021-10-26 14:25:29 +0200 |
---|---|---|
committer | Loïc Hoguin <lhoguin@vmware.com> | 2021-10-26 14:25:29 +0200 |
commit | f751cc1bae3beb3f6fda29fdd9b06ba8bfa34885 (patch) | |
tree | da542247c292a83f77ccf2b4c0f341294e10abb6 | |
parent | 7f0c1982a3a217cd7bd4f5d59ea396a0995335c5 (diff) | |
download | rabbitmq-server-git-f751cc1bae3beb3f6fda29fdd9b06ba8bfa34885.tar.gz |
WIP
-rw-r--r-- | deps/rabbit/src/rabbit.erl | 54 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | deps/rabbit_common/src/supervisor2.erl | 10 |
4 files changed, 74 insertions, 9 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 23719c517b..df70eb929d 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -10,6 +10,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/logging.hrl"). +-include_lib("stdlib/include/qlc.hrl"). -ignore_xref({rabbit_direct, force_event_refresh, 1}). -ignore_xref({rabbit_networking, force_connection_event_refresh, 1}). @@ -479,22 +480,63 @@ do_stop() -> Apps0 = ?APPS ++ rabbit_plugins:active(), %% We ensure that Mnesia is stopped last (or more exactly, after rabbit). Apps1 = app_utils:app_dependency_order(Apps0, true) -- [mnesia], - Apps = [mnesia | Apps1], + Apps = [mnesia|Apps1], %% this will also perform unregistration with the peer discovery backend %% as needed - stop_apps(Apps). + stop_apps(Apps), +% rabbit_amqqueue:delete_queues_on_node_down(node()), +% stop_apps([mnesia]), + ok. -spec stop_and_halt() -> no_return(). +queues_to_delete_on_stop(NodeDown) -> + rabbit_misc:execute_mnesia_transaction(fun () -> + qlc:e(qlc:q([amqqueue:get_name(Q) || + Q <- mnesia:table(rabbit_queue), + amqqueue:qnode(Q) == NodeDown andalso + (not rabbit_amqqueue:is_replicated(Q) orelse + rabbit_amqqueue:is_exclusive(Q))] + )) + end). + +delete_queues_on_stop(Node) -> + QueuesToDelete = queues_to_delete_on_stop(Node), + _ = [begin + {ok, Queue} = rabbit_amqqueue:lookup(QueueName), + exit(amqqueue:get_pid(Queue), shutdown), + MRef = monitor(process, amqqueue:get_pid(Queue)), + receive {'DOWN', MRef, _, _, _} -> ok after 600000 -> exit(amqqueue:get_pid(Queue), kill) end + end || QueueName <- QueuesToDelete], + lists:unzip(lists:flatten([ + rabbit_misc:execute_mnesia_transaction( + fun () -> + [begin + {Queue, rabbit_amqqueue:delete_queue(Queue)} + end || Queue <- Queues] + end + ) || Queues <- rabbit_amqqueue:partition_queues(QueuesToDelete) + ])). + + stop_and_halt() -> try + {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_stop(node()) end), + logger:error("STOP DELETIONS TIME ~p LENGTH ~p", [Time, length(QueueNames)]), + + rabbit_amqqueue:notify_queue_binding_deletions(QueueDeletions), + rabbit_core_metrics:queues_deleted(QueueNames), + rabbit_amqqueue:notify_queues_deleted(QueueNames), + + %rabbit_amqqueue:on_node_down(node()), + stop() - catch Type:Reason -> + catch Type:Reason:Stacktrace -> ?LOG_ERROR( - "Error trying to stop ~s: ~p:~p", - [product_name(), Type, Reason], + "Error trying to stop ~s: ~p:~p ~0p", + [product_name(), Type, Reason, Stacktrace], #{domain => ?RMQLOG_DOMAIN_PRELAUNCH}), - error({Type, Reason}) + erlang:raise(Type, Reason, Stacktrace) after %% Enclose all the logging in the try block. %% init:stop() will be called regardless of any errors. diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 410105e310..19b185959c 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -6,6 +6,10 @@ %% -module(rabbit_amqqueue). +-compile(export_all). +-compile(nowarn_export_all). +-export([delete_queues_on_node_down/1]). +-export([delete_queue/1]). -export([warn_file_limit/0]). -export([recover/1, stop/1, start/1, declare/6, declare/7, @@ -1905,7 +1909,8 @@ maybe_clear_recoverable_node(Node, Q) -> -spec on_node_down(node()) -> 'ok'. on_node_down(Node) -> - {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node), + {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_node_down(Node) end), + logger:error("TIME ~p LENGTH ~p", [Time, length(QueueNames)]), notify_queue_binding_deletions(QueueDeletions), rabbit_core_metrics:queues_deleted(QueueNames), notify_queues_deleted(QueueNames), diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index d8b29d1d32..387fa98083 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -278,7 +278,11 @@ init_with_backing_queue_state(Q, BQ, BQS, notify_decorators(startup, State3), State3. +-define(DBGLOG(Format, Args), file:write_file("/tmp/rabbit_amqqueue_process.log", io_lib:format(Format ++ "~n", Args), [append])). + +%% Other terminates here. terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, R, State]), QName = amqqueue:get_name(Q0), rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown( @@ -295,12 +299,16 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, missing_owner} = Reason, State) -> + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, Reason, State]), %% if the owner was missing then there will be no queue, so don't emit stats terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, R, State]), rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +%% Queue that gets deleted goes here. terminate(normal, State = #q{status = {terminated_by, auto_delete}}) -> + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, normal, State]), %% auto_delete case %% To increase performance we want to avoid a mnesia_sync:sync call %% after every transaction, as we could be deleting simultaneously @@ -309,9 +317,11 @@ terminate(normal, State = #q{status = {terminated_by, auto_delete}}) -> %% operation on `rabbit_durable_queue` terminate_shutdown(terminate_delete(true, auto_delete, State), State); terminate(normal, State) -> %% delete case + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, normal, State]), terminate_shutdown(terminate_delete(true, normal, State), State); %% If we crashed don't try to clean up the BQS, probably best to leave it. -terminate(_Reason, State = #q{q = Q}) -> +terminate(Reason, State = #q{q = Q}) -> + ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, Reason, State]), terminate_shutdown(fun (BQS) -> Q2 = amqqueue:set_state(Q, crashed), rabbit_misc:execute_mnesia_transaction( diff --git a/deps/rabbit_common/src/supervisor2.erl b/deps/rabbit_common/src/supervisor2.erl index d6e8b82262..4705af8a40 100644 --- a/deps/rabbit_common/src/supervisor2.erl +++ b/deps/rabbit_common/src/supervisor2.erl @@ -1080,6 +1080,8 @@ monitor_child(Pid) -> ok end. +-define(DBGLOG(Format, Args), file:write_file("/tmp/supervisor2.log", io_lib:format(Format ++ "~n", Args), [append])). + %%----------------------------------------------------------------- %% Func: terminate_dynamic_children/1 %% Args: State @@ -1093,12 +1095,13 @@ terminate_dynamic_children(State) -> Child = get_dynamic_child(State), {Pids, EStack0} = monitor_dynamic_children(Child,State), Sz = sets:size(Pids), + ?DBGLOG("~p: ~p ~0p", [?MODULE, Sz, Child]), EStack = case Child#child.shutdown of brutal_kill -> sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); infinity -> - sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + sets:fold(fun(P, _) -> ?DBGLOG("~p: shutdown ~p", [?MODULE, P]), exit(P, shutdown) end, ok, Pids), wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); Time -> sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), @@ -1151,22 +1154,27 @@ wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz, wait_dynamic_children(Child, Pids, Sz, TRef, EStack) -> receive {'DOWN', _MRef, process, Pid, shutdown} -> + ?DBGLOG("~p: did shutdown shutdown ~0p", [?MODULE, Pid]), wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); {'DOWN', _MRef, process, Pid, {shutdown, _}} -> + ?DBGLOG("~p: did shutdown tuple ~0p", [?MODULE, Pid]), wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); {'DOWN', _MRef, process, Pid, normal} when not (?is_permanent(Child)) -> + ?DBGLOG("~p: did shutdown normal ~0p", [?MODULE, Pid]), wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1, TRef, EStack); {'DOWN', _MRef, process, Pid, Reason} -> + ?DBGLOG("~p: did shutdown ~0p ~0p", [?MODULE, Reason, Pid]), wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1, TRef, dict:append(Reason, Pid, EStack)); {timeout, TRef, kill} -> + ?DBGLOG("~p: kill ~p ~0p", [?MODULE, Sz, Pids]), sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), wait_dynamic_children(Child, Pids, Sz, undefined, EStack) end. |