summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2021-10-26 14:25:29 +0200
committerLoïc Hoguin <lhoguin@vmware.com>2021-10-26 14:25:29 +0200
commitf751cc1bae3beb3f6fda29fdd9b06ba8bfa34885 (patch)
treeda542247c292a83f77ccf2b4c0f341294e10abb6
parent7f0c1982a3a217cd7bd4f5d59ea396a0995335c5 (diff)
downloadrabbitmq-server-git-f751cc1bae3beb3f6fda29fdd9b06ba8bfa34885.tar.gz
WIP
-rw-r--r--deps/rabbit/src/rabbit.erl54
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl7
-rw-r--r--deps/rabbit/src/rabbit_amqqueue_process.erl12
-rw-r--r--deps/rabbit_common/src/supervisor2.erl10
4 files changed, 74 insertions, 9 deletions
diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl
index 23719c517b..df70eb929d 100644
--- a/deps/rabbit/src/rabbit.erl
+++ b/deps/rabbit/src/rabbit.erl
@@ -10,6 +10,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/logging.hrl").
+-include_lib("stdlib/include/qlc.hrl").
-ignore_xref({rabbit_direct, force_event_refresh, 1}).
-ignore_xref({rabbit_networking, force_connection_event_refresh, 1}).
@@ -479,22 +480,63 @@ do_stop() ->
Apps0 = ?APPS ++ rabbit_plugins:active(),
%% We ensure that Mnesia is stopped last (or more exactly, after rabbit).
Apps1 = app_utils:app_dependency_order(Apps0, true) -- [mnesia],
- Apps = [mnesia | Apps1],
+ Apps = [mnesia|Apps1],
%% this will also perform unregistration with the peer discovery backend
%% as needed
- stop_apps(Apps).
+ stop_apps(Apps),
+% rabbit_amqqueue:delete_queues_on_node_down(node()),
+% stop_apps([mnesia]),
+ ok.
-spec stop_and_halt() -> no_return().
+%% Collects, inside a single Mnesia transaction, the names of the
+%% queues in the rabbit_queue table that must be removed when NodeDown
+%% stops: those whose amqqueue:qnode/1 equals NodeDown and that are
+%% either not replicated or exclusive.
+queues_to_delete_on_stop(NodeDown) ->
+    SelectNames =
+        fun () ->
+            Query = qlc:q([amqqueue:get_name(Queue)
+                           || Queue <- mnesia:table(rabbit_queue),
+                              amqqueue:qnode(Queue) == NodeDown,
+                              (not rabbit_amqqueue:is_replicated(Queue) orelse
+                               rabbit_amqqueue:is_exclusive(Queue))]),
+            qlc:e(Query)
+        end,
+    rabbit_misc:execute_mnesia_transaction(SelectNames).
+
+%% Stops and deletes every queue selected by queues_to_delete_on_stop/1
+%% for Node.
+%%
+%% Phase 1: each queue process is asked to shut down, one at a time,
+%% and we wait for it to exit (see stop_queue_process/1).
+%% Phase 2: the queues are deleted, one Mnesia transaction per group
+%% returned by rabbit_amqqueue:partition_queues/1.
+%%
+%% Returns {QueueNames, Deletions}, where Deletions holds the result of
+%% rabbit_amqqueue:delete_queue/1 for the queue name at the same
+%% position in QueueNames.
+delete_queues_on_stop(Node) ->
+    QueuesToDelete = queues_to_delete_on_stop(Node),
+    lists:foreach(fun stop_queue_process/1, QueuesToDelete),
+    lists:unzip(lists:flatten([
+        rabbit_misc:execute_mnesia_transaction(
+          fun () ->
+              [{QueueName, rabbit_amqqueue:delete_queue(QueueName)}
+               || QueueName <- Queues]
+          end
+        ) || Queues <- rabbit_amqqueue:partition_queues(QueuesToDelete)
+    ])).
+
+%% Asks the process behind QueueName to shut down and waits until it is
+%% gone. A queue that disappeared since it was selected is skipped
+%% instead of crashing the whole stop sequence with a badmatch.
+stop_queue_process(QueueName) ->
+    case rabbit_amqqueue:lookup(QueueName) of
+        {ok, Queue} ->
+            Pid = amqqueue:get_pid(Queue),
+            %% Monitor before signalling so the 'DOWN' message always
+            %% carries the real exit reason.
+            MRef = monitor(process, Pid),
+            exit(Pid, shutdown),
+            receive
+                {'DOWN', MRef, process, _, _} -> ok
+            after 600000 ->
+                %% No clean shutdown within 10 minutes: kill the process
+                %% and consume its 'DOWN', so that deletion only starts
+                %% once it is gone and no stray message is left in our
+                %% mailbox.
+                exit(Pid, kill),
+                receive {'DOWN', MRef, process, _, _} -> ok end
+            end;
+        {error, not_found} ->
+            ok
+    end.
+
+
stop_and_halt() ->
try
+ {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_stop(node()) end),
+ logger:error("STOP DELETIONS TIME ~p LENGTH ~p", [Time, length(QueueNames)]),
+
+ rabbit_amqqueue:notify_queue_binding_deletions(QueueDeletions),
+ rabbit_core_metrics:queues_deleted(QueueNames),
+ rabbit_amqqueue:notify_queues_deleted(QueueNames),
+
+ %rabbit_amqqueue:on_node_down(node()),
+
stop()
- catch Type:Reason ->
+ catch Type:Reason:Stacktrace ->
?LOG_ERROR(
- "Error trying to stop ~s: ~p:~p",
- [product_name(), Type, Reason],
+ "Error trying to stop ~s: ~p:~p ~0p",
+ [product_name(), Type, Reason, Stacktrace],
#{domain => ?RMQLOG_DOMAIN_PRELAUNCH}),
- error({Type, Reason})
+ erlang:raise(Type, Reason, Stacktrace)
after
%% Enclose all the logging in the try block.
%% init:stop() will be called regardless of any errors.
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 410105e310..19b185959c 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -6,6 +6,10 @@
%%
-module(rabbit_amqqueue).
+-compile(export_all).
+-compile(nowarn_export_all).
+-export([delete_queues_on_node_down/1]).
+-export([delete_queue/1]).
-export([warn_file_limit/0]).
-export([recover/1, stop/1, start/1, declare/6, declare/7,
@@ -1905,7 +1909,8 @@ maybe_clear_recoverable_node(Node, Q) ->
-spec on_node_down(node()) -> 'ok'.
on_node_down(Node) ->
- {QueueNames, QueueDeletions} = delete_queues_on_node_down(Node),
+ {Time, {QueueNames, QueueDeletions}} = timer:tc(fun() -> delete_queues_on_node_down(Node) end),
+ logger:error("TIME ~p LENGTH ~p", [Time, length(QueueNames)]),
notify_queue_binding_deletions(QueueDeletions),
rabbit_core_metrics:queues_deleted(QueueNames),
notify_queues_deleted(QueueNames),
diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl
index d8b29d1d32..387fa98083 100644
--- a/deps/rabbit/src/rabbit_amqqueue_process.erl
+++ b/deps/rabbit/src/rabbit_amqqueue_process.erl
@@ -278,7 +278,11 @@ init_with_backing_queue_state(Q, BQ, BQS,
notify_decorators(startup, State3),
State3.
+-define(DBGLOG(Format, Args), file:write_file("/tmp/rabbit_amqqueue_process.log", io_lib:format(Format ++ "~n", Args), [append])). %% Debug tracing: appends one formatted line per call to a fixed file under /tmp.
+
+%% Terminations that are not queue deletions (e.g. shutdown) go through the clauses below.
terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, R, State]),
QName = amqqueue:get_name(Q0),
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(
@@ -295,12 +299,16 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
BQ:terminate(R, BQS)
end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, Reason, State]),
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, R, State]),
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+%% A queue that is being deleted terminates through the `normal` clauses below.
terminate(normal, State = #q{status = {terminated_by, auto_delete}}) ->
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, normal, State]),
%% auto_delete case
%% To increase performance we want to avoid a mnesia_sync:sync call
%% after every transaction, as we could be deleting simultaneously
@@ -309,9 +317,11 @@ terminate(normal, State = #q{status = {terminated_by, auto_delete}}) ->
%% operation on `rabbit_durable_queue`
terminate_shutdown(terminate_delete(true, auto_delete, State), State);
terminate(normal, State) -> %% delete case
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, normal, State]),
terminate_shutdown(terminate_delete(true, normal, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
-terminate(_Reason, State = #q{q = Q}) ->
+terminate(Reason, State = #q{q = Q}) ->
+ ?DBGLOG("~p:terminate ~p ~0p", [?MODULE, Reason, State]),
terminate_shutdown(fun (BQS) ->
Q2 = amqqueue:set_state(Q, crashed),
rabbit_misc:execute_mnesia_transaction(
diff --git a/deps/rabbit_common/src/supervisor2.erl b/deps/rabbit_common/src/supervisor2.erl
index d6e8b82262..4705af8a40 100644
--- a/deps/rabbit_common/src/supervisor2.erl
+++ b/deps/rabbit_common/src/supervisor2.erl
@@ -1080,6 +1080,8 @@ monitor_child(Pid) ->
ok
end.
+-define(DBGLOG(Format, Args), file:write_file("/tmp/supervisor2.log", io_lib:format(Format ++ "~n", Args), [append])). %% Debug tracing: appends one formatted line per call to a fixed file under /tmp.
+
%%-----------------------------------------------------------------
%% Func: terminate_dynamic_children/1
%% Args: State
@@ -1093,12 +1095,13 @@ terminate_dynamic_children(State) ->
Child = get_dynamic_child(State),
{Pids, EStack0} = monitor_dynamic_children(Child,State),
Sz = sets:size(Pids),
+ ?DBGLOG("~p: ~p ~0p", [?MODULE, Sz, Child]),
EStack = case Child#child.shutdown of
brutal_kill ->
sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
infinity ->
- sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ sets:fold(fun(P, _) -> ?DBGLOG("~p: shutdown ~p", [?MODULE, P]), exit(P, shutdown) end, ok, Pids),
wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
Time ->
sets:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
@@ -1151,22 +1154,27 @@ wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz,
wait_dynamic_children(Child, Pids, Sz, TRef, EStack) ->
receive
{'DOWN', _MRef, process, Pid, shutdown} ->
+ ?DBGLOG("~p: did shutdown shutdown ~0p", [?MODULE, Pid]),
wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1,
TRef, EStack);
{'DOWN', _MRef, process, Pid, {shutdown, _}} ->
+ ?DBGLOG("~p: did shutdown tuple ~0p", [?MODULE, Pid]),
wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1,
TRef, EStack);
{'DOWN', _MRef, process, Pid, normal} when not (?is_permanent(Child)) ->
+ ?DBGLOG("~p: did shutdown normal ~0p", [?MODULE, Pid]),
wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1,
TRef, EStack);
{'DOWN', _MRef, process, Pid, Reason} ->
+ ?DBGLOG("~p: did shutdown ~0p ~0p", [?MODULE, Reason, Pid]),
wait_dynamic_children(Child, sets:del_element(Pid, Pids), Sz-1,
TRef, dict:append(Reason, Pid, EStack));
{timeout, TRef, kill} ->
+ ?DBGLOG("~p: kill ~p ~0p", [?MODULE, Sz, Pids]),
sets:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
wait_dynamic_children(Child, Pids, Sz, undefined, EStack)
end.