diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 14:55:42 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 14:55:42 +0100 |
commit | 640fc3b30d9044e94b8466f233b58fe9dd5876cd (patch) | |
tree | bd6b03456c7a4d1009acd2ac7d93041ab97cb1f9 | |
parent | f0ee2e3a51f3635c69b0058283cb58d1ef35530a (diff) | |
download | rabbitmq-server-640fc3b30d9044e94b8466f233b58fe9dd5876cd.tar.gz |
Roll slave startup into the new mechanism.
-rw-r--r-- | src/rabbit.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 15 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 45 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 37 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 42 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 2 |
9 files changed, 65 insertions, 105 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index b00a1ad7..bd34cf8b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -134,17 +134,10 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({mirror_queue_slave_sup, - [{description, "mirror queue slave sup"}, - {mfa, {rabbit_sup, start_supervisor_child, - [rabbit_mirror_queue_slave_sup]}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({mirrored_queues, [{description, "adding mirrors to queues"}, {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, mirror_queue_slave_sup}, + {requires, recovery}, {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b93b6be6..e25e0f97 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -246,7 +246,7 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = - gen_server2:mcall([{start_queue_process(node(), Q), + gen_server2:mcall([{rabbit_amqqueue_sup:start_queue_process(node(), Q), {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", @@ -274,7 +274,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). + gen_server2:call( + rabbit_amqqueue_sup:start_queue_process(Node, Q), {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of @@ -331,10 +332,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -start_queue_process(Node, Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Pid. - add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 951542f8..84832f9f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -105,8 +105,7 @@ statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(_) -> - exit(cannot_be_called_directly). +init(_) -> exit(cannot_be_called_directly). %% We have just been declared or recovered init_declared(Recover, From, Q = #amqqueue{name = QName, diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 137422d4..149014e8 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start_link/0, start_child/2]). +-export([start_link/0, start_queue_process/2]). -export([init/1]). @@ -31,10 +31,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_child/2 :: - (node(), [any()]) -> rabbit_types:ok(pid() | undefined) | - rabbit_types:ok({pid(), any()}) | - rabbit_types:error(any())). +-spec(start_queue_process/2 :: (node(), rabbit_types:amqqueue()) -> pid()). -endif. @@ -43,10 +40,12 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Node, Args) -> - supervisor2:start_child({?SERVER, Node}, Args). +start_queue_process(Node, Q) -> + {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q]), + Pid. init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. + temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9e8c4a18..86f73366 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -220,11 +220,13 @@ start_child(Name, MirrorNode, Q, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), + SPid = rabbit_amqqueue_sup:start_queue_process(MirrorNode, Q), log_info(Name, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) + case SyncMode of + sync -> rabbit_mirror_queue_slave:await(SPid); + async -> ok + end end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6d0064ab..7f65af65 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1, go/2]). +-export([set_maximum_since_use/2, info/1, init_slave/1, await/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -71,23 +71,17 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q) -> - ?store_proc_name(Q#amqqueue.name), - {ok, {not_started, Q}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}. +await(SPid) -> gen_server2:call(SPid, await, infinity). -go(SPid, sync) -> gen_server2:call(SPid, go, infinity); -go(SPid, async) -> gen_server2:cast(SPid, go). +init(_) -> exit(cannot_be_called_directly). -handle_go(Q = #amqqueue{name = QName}) -> +init_slave(Q = #amqqueue{name = QName}) -> + ?store_proc_name(QName), %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -141,25 +135,26 @@ handle_go(Q = #amqqueue{name = QName}) -> ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), rabbit_mirror_queue_misc:maybe_auto_sync(Q1), - {ok, State}; + {become, ?MODULE, State, hibernate}; {stale, StalePid} -> rabbit_mirror_queue_misc:log_warning( QName, "Detected stale HA master: ~p~n", [StalePid]), gm:leave(GM), - {error, {stale_master_pid, StalePid}}; + {stop, {stale_master_pid, StalePid}, Q}; duplicate_live_master -> gm:leave(GM), - {error, {duplicate_live_master, Node}}; + {stop, {duplicate_live_master, Node}, Q}; existing -> gm:leave(GM), - {error, normal}; + {stop, normal, Q}; + %% TODO what about this case? master_in_recovery -> gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - {error, normal} + {stop, normal, Q} end. init_it(Self, GM, Node, QName) -> @@ -193,11 +188,8 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). -handle_call(go, _From, {not_started, Q} = NotStarted) -> - case handle_go(Q) of - {ok, State} -> {reply, ok, State}; - {error, Error} -> {stop, Error, NotStarted} - end; +handle_call(await, _From, State) -> + {reply, ok, State}; handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { @@ -235,12 +227,6 @@ handle_call({gm_deaths, DeadGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). -handle_cast(go, {not_started, Q} = NotStarted) -> - case handle_go(Q) of - {ok, State} -> {noreply, State}; - {error, Error} -> {stop, Error, NotStarted} - end; - handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -321,8 +307,6 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. -terminate(_Reason, {not_started, _Q}) -> - ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -361,9 +345,6 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_pre_hibernate({not_started, _Q} = State) -> - {hibernate, State}; - handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl deleted file mode 100644 index b631cc31..00000000 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ /dev/null @@ -1,37 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. -%% - --module(rabbit_mirror_queue_slave_sup). - --behaviour(supervisor2). - --export([start_link/0, start_child/2]). - --export([init/1]). - --include_lib("rabbit.hrl"). - --define(SERVER, ?MODULE). - -start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). - -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_mirror_queue_slave, - {rabbit_mirror_queue_slave, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 07df581b..ddf14326 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -71,16 +71,13 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> - {decl, rabbit_amqqueue:internal_declare(Q)}; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> {decl, {existing, ExistingQ}}; - false -> exit(todo) - end + {declared, rabbit_amqqueue:internal_declare(Q)}; + [ExistingQ] -> + init_existing(ExistingQ) end end), case Result of - {decl, DeclResult} -> + {declared, DeclResult} -> %% We have just been declared. Block waiting for an init %% call so that we don't respond to any other message first receive {'$gen_call', From, {init, new}} -> @@ -92,9 +89,38 @@ init_non_recovery(Q = #amqqueue{name = QueueName}) -> gen_server2:reply(From, DeclResult), {stop, normal, Q} end - end + end; + new_slave -> + rabbit_mirror_queue_slave:init_slave(Q); + crash_restart -> + exit(todo); + sleep_retry -> + timer:sleep(25), + init_non_recovery(Q) end. +init_existing(Q = #amqqueue{pid = QPid, slave_pids = SPids}) -> + Alive = fun rabbit_misc:is_process_alive/1, + case {Alive(QPid), node(QPid) =:= node()} of + {true, true} -> {declared, {existing, Q}}; %% [1] + {true, false} -> new_slave; %% [2] + {false, _} -> case [SPid || SPid <- SPids, Alive(SPid)] of + [] -> crash_restart; %% [3] + _ -> sleep_retry %% [4] + end + end. +%% [1] Lost a race to declare a queue - just return the winner. +%% +%% [2] There is a master on another node. Regardless of whether we +%% just crashed (as a master or slave) and restarted or were asked to +%% start as a slave, we are now a new slave. +%% +%% [3] Nothing is alive. We must have just died. Try to restart as a master. +%% +%% [4] The current master is dead but there are alive slaves. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move - those slaves should either promote one of their own or die. + init_recovery(Q) -> rabbit_misc:execute_mnesia_transaction( fun () -> ok = rabbit_amqqueue:store_queue(Q) end), diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 6fe65c12..212cf973 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -34,7 +34,7 @@ %% Like erlang:memory(), but with awareness of rabbit-y things memory() -> ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], - QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup], + QProcs = [rabbit_amqqueue_sup], MsgIndexProcs = [msg_store_transient, msg_store_persistent], MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), |