summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-19 15:33:24 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-19 15:33:24 +0100
commita34e13465725855a8707a8897a12088ec49d1a29 (patch)
tree3b9823826a311a300be9fe1ff5085121956480f8
parent640fc3b30d9044e94b8466f233b58fe9dd5876cd (diff)
downloadrabbitmq-server-a34e13465725855a8707a8897a12088ec49d1a29.tar.gz
Another copy of the master_in_recovery check. Not sure that's very elegant, but I don't think we have much choice.
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_sup.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl3
-rw-r--r--src/rabbit_prequeue.erl52
4 files changed, 50 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e25e0f97..391da1ae 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -246,9 +246,9 @@ find_durable_queues() ->
recover_durable_queues(QueuesAndRecoveryTerms) ->
{Results, Failures} =
- gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q),
- {init, {self(), Terms}}} ||
- {Q, Terms} <- QueuesAndRecoveryTerms]),
+ gen_server2:mcall(
+ [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery),
+ {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
[rabbit_log:error("Queue ~p failed to initialise: ~p~n",
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
@@ -274,8 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
down_slave_nodes = [],
gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
- gen_server2:call(
- rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity).
+ gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare),
+ {init, new}, infinity).
internal_declare(Q = #amqqueue{name = QueueName}) ->
case not_found_or_absent(QueueName) of
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 149014e8..8b6fcc01 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/0, start_queue_process/2]).
+-export([start_link/0, start_queue_process/3]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()).
+-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(),
+ 'declare' | 'recovery' | 'slave') -> pid()).
-endif.
@@ -40,8 +41,8 @@
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_queue_process(Node, Q) ->
- {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]),
+start_queue_process(Node, Q, Hint) ->
+ {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]),
Pid.
init([]) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 86f73366..017a5187 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -220,7 +220,8 @@ start_child(Name, MirrorNode, Q, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q),
+ SPid = rabbit_amqqueue_sup:start_queue_process(
+ MirrorNode, Q, slave),
log_info(Name, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
case SyncMode of
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index ddf14326..2bfffa28 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -22,7 +22,7 @@
%% new queue, or whether we are in recovery. Thus a crashing queue
%% process can restart from here and always do the right thing.
--export([start_link/1]).
+-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -31,25 +31,36 @@
-include("rabbit.hrl").
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+%%-spec(start_link/2 :: () -> rabbit_types:ok_pid_or_error()).
+
+-endif.
%%----------------------------------------------------------------------------
-init(Q) ->
+start_link(Q, Hint) ->
+ gen_server2:start_link(?MODULE, {Q, Hint}, []).
+
+%%----------------------------------------------------------------------------
+
+init({Q, Hint}) ->
%% Hand back to supervisor ASAP
gen_server2:cast(self(), init),
- {ok, Q#amqqueue{pid = self()}, hibernate,
+ {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}.
-handle_cast(init, Q) ->
+handle_cast(init, {Q, Hint}) ->
case whereis(rabbit_recovery) of
- undefined -> init_non_recovery(Q);
- _Pid -> init_recovery(Q)
+ undefined -> init_non_recovery(Q, Hint);
+ _Pid -> recovery = Hint, %% assertion
+ init_recovery(Q)
end;
handle_cast(Msg, State) ->
@@ -66,14 +77,12 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-init_non_recovery(Q = #amqqueue{name = QueueName}) ->
+init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) ->
Result = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- {declared, rabbit_amqqueue:internal_declare(Q)};
- [ExistingQ] ->
- init_existing(ExistingQ)
+ [] -> init_missing(Q, Hint);
+ [ExistingQ] -> init_existing(ExistingQ)
end
end),
case Result of
@@ -96,8 +105,23 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) ->
exit(todo);
sleep_retry ->
timer:sleep(25),
- init_non_recovery(Q)
+ init_non_recovery(Q, Hint);
+ master_in_recovery ->
+ {stop, normal, Q}
+ end.
+
+%% The Hint is how we were originally started. Of course, if we
+%% crashed it might no longer be true - but we can only get here if
+%% there is no Mnesia record, which should mean we can't be here if we
+%% crashed.
+init_missing(Q, Hint) ->
+ case Hint of
+ declare -> {declared, rabbit_amqqueue:internal_declare(Q)};
+ slave -> master_in_recovery %% [1]
end.
+%% [1] This is the same concept as the master_in_recovery case in the
+%% slave startup code. Unfortunately since we start slaves with two
+%% transactions we need to check twice.
init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) ->
Alive = fun rabbit_misc:is_process_alive/1,