diff options
authorSimon MacMullen <>2014-08-20 18:15:43 +0100
committerSimon MacMullen <>2014-08-20 18:15:43 +0100
commitb4d0c121fc0d2507da17b7878dc7ffb7ebe4587e (patch)
parentf1c0c943194a6bfb8503a38e1dbf15382ab0e0ff (diff)
Switch to making our restart decisions based on explicitly determining whether we are starting for the first time or not. This is not very OTPish but it turns out to be necessary: there's no way to distinguish between losing a race to declare, starting a new slave, and restarting to a new slave otherwise. As an upside this code is shorter and more obviously correct (to me at least).
4 files changed, 78 insertions, 96 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8f25bf2e..0ad6af51 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,8 +195,6 @@
recover() ->
- Marker = spawn_link(fun() -> receive stop -> ok end end),
- register(rabbit_recovery, Marker),
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
@@ -213,11 +211,7 @@ recover() ->
{rabbit_amqqueue_sup_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
- Recovered = recover_durable_queues(
- lists:zip(DurableQueues, OrderedRecoveryTerms)),
- unlink(Marker),
- Marker ! stop,
- Recovered.
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 591de408..9f7060d7 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -36,12 +36,16 @@
-start_link(Q, Hint) ->
- ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]},
+start_link(Q, StartMode) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ ChildSpec = {rabbit_amqqueue,
+ {rabbit_prequeue, start_link, [Q, StartMode, Marker]},
transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
+ unlink(Marker),
+ Marker ! stop,
{ok, SupPid, QPid}.
init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 6870f7c4..793cb7c9 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -41,8 +41,9 @@
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_queue_process(Node, Q, Hint) ->
- {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]),
+start_queue_process(Node, Q, StartMode) ->
+ {ok, _SupPid, QPid} = supervisor2:start_child(
+ {?SERVER, Node}, [Q, StartMode]),
init([]) ->
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 059297cb..b084967d 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -22,7 +22,7 @@
%% new queue, or whether we are in recovery. Thus a crashing queue
%% process can restart from here and always do the right thing.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -41,27 +41,29 @@
-start_link(Q, Hint) ->
- gen_server2:start_link(?MODULE, {Q, Hint}, []).
+start_link(Q, StartMode, Marker) ->
+ gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []).
-init({Q, Hint}) ->
+init({Q, StartMode0, Marker}) ->
%% Hand back to supervisor ASAP
gen_server2:cast(self(), init),
- {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate,
+ StartMode = case is_process_alive(Marker) of
+ true -> StartMode0;
+ false -> restart
+ end,
+ {ok, {Q#amqqueue{pid = self()}, StartMode}, hibernate,
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
-handle_cast(init, {Q, Hint}) ->
- case whereis(rabbit_recovery) of
- undefined -> init_non_recovery(Q, Hint);
- _Pid -> recovery = Hint, %% assertion
- init_recovery(Q)
- end;
+handle_cast(init, {Q, declare}) -> init_declared(Q);
+handle_cast(init, {Q, recovery}) -> init_recovery(Q);
+handle_cast(init, {Q, slave}) -> init_slave(Q);
+handle_cast(init, {Q, restart}) -> init_restart(Q);
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -77,82 +79,27 @@ code_change(_OldVsn, State, _Extra) ->
-init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) ->
- Result = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> init_missing(Q, Hint);
- [ExistingQ] -> init_existing(ExistingQ)
- end
- end),
- case Result of
- {declared, DeclResult} ->
- %% We have just been declared. Block waiting for an init
- %% call so that we don't respond to any other message first
- receive {'$gen_call', From, {init, new}} ->
- case DeclResult of
- {new, Fun} ->
- Q1 = Fun(),
- rabbit_amqqueue_process:init_declared(new,From, Q1);
- {F, _} when F =:= absent; F =:= existing ->
- gen_server2:reply(From, DeclResult),
- {stop, normal, Q}
- end
- end;
- new_slave ->
- rabbit_mirror_queue_slave:init_slave(Q);
- {crash_restart, Q1} ->
- rabbit_log:error(
- "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- ok = rabbit_amqqueue:store_queue(Q1#amqqueue{pid = Self})
- end),
- rabbit_amqqueue_process:init_declared(
- {no_barrier, non_clean_shutdown}, none, Q1);
- sleep_retry ->
- timer:sleep(25),
- init_non_recovery(Q, Hint);
- master_in_recovery ->
- {stop, normal, Q}
+init_declared(Q = #amqqueue{name = QueueName}) ->
+ Decl = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> rabbit_amqqueue:internal_declare(Q);
+ [ExistingQ] -> {existing, ExistingQ}
+ end
+ end),
+ %% We have just been declared. Block waiting for an init
+ %% call so that we don't respond to any other message first
+ receive {'$gen_call', From, {init, new}} ->
+ case Decl of
+ {new, Fun} ->
+ Q1 = Fun(),
+ rabbit_amqqueue_process:init_declared(new,From, Q1);
+ {F, _} when F =:= absent; F =:= existing ->
+ gen_server2:reply(From, Decl),
+ {stop, normal, Q}
+ end
-%% The Hint is how we were originally started. Of course, if we
-%% crashed it might no longer be true - but we can only get here if
-%% there is no Mnesia record, which should mean we can't be here if we
-%% crashed.
-init_missing(Q, Hint) ->
- case Hint of
- declare -> {declared, rabbit_amqqueue:internal_declare(Q)};
- slave -> master_in_recovery %% [1]
- end.
-%% [1] This is the same concept as the master_in_recovery case in the
-%% slave startup code. Unfortunately since we start slaves with two
-%% transactions we need to check twice.
-init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) ->
- Alive = fun rabbit_misc:is_process_alive/1,
- case {Alive(QPid), node(QPid) =:= node()} of
- {true, true} -> {declared, {existing, Q}}; %% [1]
- {true, false} -> new_slave; %% [2]
- {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of
- [] -> {crash_restart, Q}; %% [3]
- _ -> sleep_retry %% [4]
- end
- end.
-%% [1] Lost a race to declare a queue - just return the winner.
-%% [2] There is a master on another node. Regardless of whether we
-%% just crashed (as a master or slave) and restarted or were asked to
-%% start as a slave, we are now a new slave.
-%% [3] Nothing is alive. We must have just died. Try to restart as a master.
-%% [4] The current master is dead but there are alive slaves. This is
-%% not a stable situation. Sleep and wait for somebody else to make a
-%% move - those slaves should either promote one of their own or die.
init_recovery(Q) ->
fun () -> ok = rabbit_amqqueue:store_queue(Q) end),
@@ -160,3 +107,39 @@ init_recovery(Q) ->
receive {'$gen_call', From, {init, Terms}} ->
rabbit_amqqueue_process:init_declared(Terms, From, Q)
+init_slave(Q) ->
+ rabbit_mirror_queue_slave:init_slave(Q).
+init_restart(#amqqueue{name = QueueName}) ->
+ {ok, Q = #amqqueue{pid = QPid,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
+ Local = node(QPid) =:= node(),
+ Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> false = Local, %% assertion
+ rabbit_mirror_queue_slave:init_slave(Q); %% [1]
+ false -> case Local andalso Slaves =:= [] of
+ true -> crash_restart(Q); %% [2]
+ false -> timer:sleep(25),
+ init_restart(Q) %% [3]
+ end
+ end.
+%% [1] There is a master on another node. Regardless of whether we
+%% were originally a master or a slave, we are now a new slave.
+%% [2] Nothing is alive. We are the last best hope. Try to restart as a master.
+%% [3] The current master is dead but either there are alive slaves to
+%% take over or it's all happening on a different node anyway. This is
+%% not a stable situation. Sleep and wait for somebody else to make a
+%% move.
+crash_restart(Q = #amqqueue{name = QueueName}) ->
+ rabbit_log:error(
+ "Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]),
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok = rabbit_amqqueue:store_queue(Q#amqqueue{pid = Self}) end),
+ rabbit_amqqueue_process:init_declared(
+ {no_barrier, non_clean_shutdown}, none, Q).