diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 15:33:24 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 15:33:24 +0100 |
commit | a34e13465725855a8707a8897a12088ec49d1a29 (patch) | |
tree | 3b9823826a311a300be9fe1ff5085121956480f8 | |
parent | 640fc3b30d9044e94b8466f233b58fe9dd5876cd (diff) | |
download | rabbitmq-server-a34e13465725855a8707a8897a12088ec49d1a29.tar.gz |
Another copy of the master_in_recovery check. Not sure that's very elegant, but I don't think we have much choice.
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 3 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 52 |
4 files changed, 50 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e25e0f97..391da1ae 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -246,9 +246,9 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = - gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q), - {init, {self(), Terms}}} || - {Q, Terms} <- QueuesAndRecoveryTerms]), + gen_server2:mcall( + [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery), + {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. @@ -274,8 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call( - rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity). + gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare), + {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 149014e8..8b6fcc01 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_queue_process/2]). +-export([start_link/0, start_queue_process/3]). -export([init/1]). @@ -31,7 +31,8 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()). +-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), + 'declare' | 'recovery' | 'slave') -> pid()). -endif. @@ -40,8 +41,8 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_queue_process(Node, Q) -> - {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]), +start_queue_process(Node, Q, Hint) -> + {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), Pid. init([]) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 86f73366..017a5187 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -220,7 +220,8 @@ start_child(Name, MirrorNode, Q, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q), + SPid = rabbit_amqqueue_sup:start_queue_process( + MirrorNode, Q, slave), log_info(Name, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), case SyncMode of diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index ddf14326..2bfffa28 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -22,7 +22,7 @@ %% new queue, or whether we are in recovery. Thus a crashing queue %% process can restart from here and always do the right thing. --export([start_link/1]). +-export([start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -31,25 +31,36 @@ -include("rabbit.hrl"). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +%%-spec(start_link/2 :: () -> rabbit_types:ok_pid_or_error()). + +-endif. %%---------------------------------------------------------------------------- -init(Q) -> +start_link(Q, Hint) -> + gen_server2:start_link(?MODULE, {Q, Hint}, []). + +%%---------------------------------------------------------------------------- + +init({Q, Hint}) -> %% Hand back to supervisor ASAP gen_server2:cast(self(), init), - {ok, Q#amqqueue{pid = self()}, hibernate, + {ok, {Q#amqqueue{pid = self()}, Hint}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, {unexpected_call, Msg}, State}. -handle_cast(init, Q) -> +handle_cast(init, {Q, Hint}) -> case whereis(rabbit_recovery) of - undefined -> init_non_recovery(Q); - _Pid -> init_recovery(Q) + undefined -> init_non_recovery(Q, Hint); + _Pid -> recovery = Hint, %% assertion + init_recovery(Q) end; handle_cast(Msg, State) -> @@ -66,14 +77,12 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_non_recovery(Q = #amqqueue{name = QueueName}) -> +init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> Result = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - {declared, rabbit_amqqueue:internal_declare(Q)}; - [ExistingQ] -> - init_existing(ExistingQ) + [] -> init_missing(Q, Hint); + [ExistingQ] -> init_existing(ExistingQ) end end), case Result of @@ -96,8 +105,23 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> exit(todo); sleep_retry -> timer:sleep(25), - init_non_recovery(Q) + init_non_recovery(Q, Hint); + master_in_recovery -> + {stop, normal, Q} + end. + +%% The Hint is how we were originally started. Of course, if we +%% crashed it might no longer be true - but we can only get here if +%% there is no Mnesia record, which should mean we can't be here if we +%% crashed. +init_missing(Q, Hint) -> + case Hint of + declare -> {declared, rabbit_amqqueue:internal_declare(Q)}; + slave -> master_in_recovery %% [1] end. +%% [1] This is the same concept as the master_in_recovery case in the +%% slave startup code. Unfortunately since we start slaves with two +%% transactions we need to check twice. init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> Alive = fun rabbit_misc:is_process_alive/1, |