summary refs log tree commit diff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2014-08-19 14:55:42 +0100
committer Simon MacMullen <simon@rabbitmq.com> 2014-08-19 14:55:42 +0100
commit 640fc3b30d9044e94b8466f233b58fe9dd5876cd (patch)
tree bd6b03456c7a4d1009acd2ac7d93041ab97cb1f9
parent f0ee2e3a51f3635c69b0058283cb58d1ef35530a (diff)
download rabbitmq-server-640fc3b30d9044e94b8466f233b58fe9dd5876cd.tar.gz
Roll slave startup into the new mechanism.
-rw-r--r-- src/rabbit.erl 9
-rw-r--r-- src/rabbit_amqqueue.erl 9
-rw-r--r-- src/rabbit_amqqueue_process.erl 3
-rw-r--r-- src/rabbit_amqqueue_sup.erl 15
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 8
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 45
-rw-r--r-- src/rabbit_mirror_queue_slave_sup.erl 37
-rw-r--r-- src/rabbit_prequeue.erl 42
-rw-r--r-- src/rabbit_vm.erl 2
9 files changed, 65 insertions, 105 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b00a1ad7..bd34cf8b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -134,17 +134,10 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({mirror_queue_slave_sup,
- [{description, "mirror queue slave sup"},
- {mfa, {rabbit_sup, start_supervisor_child,
- [rabbit_mirror_queue_slave_sup]}},
- {requires, recovery},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({mirrored_queues,
[{description, "adding mirrors to queues"},
{mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
- {requires, mirror_queue_slave_sup},
+ {requires, recovery},
{enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b93b6be6..e25e0f97 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -246,7 +246,7 @@ find_durable_queues() ->
recover_durable_queues(QueuesAndRecoveryTerms) ->
{Results, Failures} =
- gen_server2:mcall([{start_queue_process(node(), Q),
+ gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q),
{init, {self(), Terms}}} ||
{Q, Terms} <- QueuesAndRecoveryTerms]),
[rabbit_log:error("Queue ~p failed to initialise: ~p~n",
@@ -274,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
down_slave_nodes = [],
gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
- gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity).
+ gen_server2:call(
+ rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity).
internal_declare(Q = #amqqueue{name = QueueName}) ->
case not_found_or_absent(QueueName) of
@@ -331,10 +332,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
-start_queue_process(Node, Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
- Pid.
-
add_default_binding(#amqqueue{name = QueueName}) ->
ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 951542f8..84832f9f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -105,8 +105,7 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
-init(_) ->
- exit(cannot_be_called_directly).
+init(_) -> exit(cannot_be_called_directly).
%% We have just been declared or recovered
init_declared(Recover, From, Q = #amqqueue{name = QName,
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 137422d4..149014e8 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start_link/0, start_child/2]).
+-export([start_link/0, start_queue_process/2]).
-export([init/1]).
@@ -31,10 +31,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(start_child/2 ::
- (node(), [any()]) -> rabbit_types:ok(pid() | undefined) |
- rabbit_types:ok({pid(), any()}) |
- rabbit_types:error(any())).
+-spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()).
-endif.
@@ -43,10 +40,12 @@
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_child(Node, Args) ->
- supervisor2:start_child({?SERVER, Node}, Args).
+start_queue_process(Node, Q) ->
+ {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]),
+ Pid.
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_prequeue, start_link, []},
- temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
+ temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
+ rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9e8c4a18..86f73366 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -220,11 +220,13 @@ start_child(Name, MirrorNode, Q, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
+ SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q),
log_info(Name, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ case SyncMode of
+ sync -> rabbit_mirror_queue_slave:await(SPid);
+ async -> ok
+ end
end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6d0064ab..7f65af65 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2, info/1, go/2]).
+-export([set_maximum_since_use/2, info/1, init_slave/1, await/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -71,23 +71,17 @@
%%----------------------------------------------------------------------------
-start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(Q) ->
- ?store_proc_name(Q#amqqueue.name),
- {ok, {not_started, Q}, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}}.
+await(SPid) -> gen_server2:call(SPid, await, infinity).
-go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
-go(SPid, async) -> gen_server2:cast(SPid, go).
+init(_) -> exit(cannot_be_called_directly).
-handle_go(Q = #amqqueue{name = QName}) ->
+init_slave(Q = #amqqueue{name = QName}) ->
+ ?store_proc_name(QName),
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -141,25 +135,26 @@ handle_go(Q = #amqqueue{name = QName}) ->
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
- {ok, State};
+ {become, ?MODULE, State, hibernate};
{stale, StalePid} ->
rabbit_mirror_queue_misc:log_warning(
QName, "Detected stale HA master: ~p~n", [StalePid]),
gm:leave(GM),
- {error, {stale_master_pid, StalePid}};
+ {stop, {stale_master_pid, StalePid}, Q};
duplicate_live_master ->
gm:leave(GM),
- {error, {duplicate_live_master, Node}};
+ {stop, {duplicate_live_master, Node}, Q};
existing ->
gm:leave(GM),
- {error, normal};
+ {stop, normal, Q};
+ %% TODO what about this case?
master_in_recovery ->
gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- {error, normal}
+ {stop, normal, Q}
end.
init_it(Self, GM, Node, QName) ->
@@ -193,11 +188,8 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
-handle_call(go, _From, {not_started, Q} = NotStarted) ->
- case handle_go(Q) of
- {ok, State} -> {reply, ok, State};
- {error, Error} -> {stop, Error, NotStarted}
- end;
+handle_call(await, _From, State) ->
+ {reply, ok, State};
handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
@@ -235,12 +227,6 @@ handle_call({gm_deaths, DeadGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
-handle_cast(go, {not_started, Q} = NotStarted) ->
- case handle_go(Q) of
- {ok, State} -> {noreply, State};
- {error, Error} -> {stop, Error, NotStarted}
- end;
-
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -321,8 +307,6 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
-terminate(_Reason, {not_started, _Q}) ->
- ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
@@ -361,9 +345,6 @@ terminate_common(State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_pre_hibernate({not_started, _Q} = State) ->
- {hibernate, State};
-
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
deleted file mode 100644
index b631cc31..00000000
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ /dev/null
@@ -1,37 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_mirror_queue_slave_sup).
-
--behaviour(supervisor2).
-
--export([start_link/0, start_child/2]).
-
--export([init/1]).
-
--include_lib("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
-start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
-
-init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
- [{rabbit_mirror_queue_slave,
- {rabbit_mirror_queue_slave, start_link, []},
- temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 07df581b..ddf14326 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -71,16 +71,13 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
- {decl, rabbit_amqqueue:internal_declare(Q)};
- [ExistingQ = #amqqueue{pid = QPid}] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> {decl, {existing, ExistingQ}};
- false -> exit(todo)
- end
+ {declared, rabbit_amqqueue:internal_declare(Q)};
+ [ExistingQ] ->
+ init_existing(ExistingQ)
end
end),
case Result of
- {decl, DeclResult} ->
+ {declared, DeclResult} ->
%% We have just been declared. Block waiting for an init
%% call so that we don't respond to any other message first
receive {'$gen_call', From, {init, new}} ->
@@ -92,9 +89,38 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) ->
gen_server2:reply(From, DeclResult),
{stop, normal, Q}
end
- end
+ end;
+ new_slave ->
+ rabbit_mirror_queue_slave:init_slave(Q);
+ crash_restart ->
+ exit(todo);
+ sleep_retry ->
+ timer:sleep(25),
+ init_non_recovery(Q)
end.
+init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) ->
+ Alive = fun rabbit_misc:is_process_alive/1,
+ case {Alive(QPid), node(QPid) =:= node()} of
+ {true, true} -> {declared, {existing, Q}}; %% [1]
+ {true, false} -> new_slave; %% [2]
+ {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of
+ [] -> crash_restart; %% [3]
+ _ -> sleep_retry %% [4]
+ end
+ end.
+%% [1] Lost a race to declare a queue - just return the winner.
+%%
+%% [2] There is a master on another node. Regardless of whether we
+%% just crashed (as a master or slave) and restarted or were asked to
+%% start as a slave, we are now a new slave.
+%%
+%% [3] Nothing is alive. We must have just died. Try to restart as a master.
+%%
+%% [4] The current master is dead but there are alive slaves. This is
+%% not a stable situation. Sleep and wait for somebody else to make a
+%% move - those slaves should either promote one of their own or die.
+
init_recovery(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> ok = rabbit_amqqueue:store_queue(Q) end),
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 6fe65c12..212cf973 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -34,7 +34,7 @@
%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
- QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
+ QProcs = [rabbit_amqqueue_sup],
MsgIndexProcs = [msg_store_transient, msg_store_persistent],
MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),