diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 11:03:38 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-20 11:03:38 +0100 |
commit | 403d476e5e077b9753bccc4c1404a214517c4f5c (patch) | |
tree | ab2947d15eba7d03f94ddf86a3b0bd99995a87ac | |
parent | 33544e9bacdbe59ffdf6515a32abad29882aae78 (diff) | |
download | rabbitmq-server-403d476e5e077b9753bccc4c1404a214517c4f5c.tar.gz |
Add an intermediate supervisor so that if a queue exceeds its restart limit we don't bring the whole server down!
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 29 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 51 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 2 |
4 files changed, 73 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 391da1ae..8f25bf2e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -210,9 +210,9 @@ recover() -> BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, - {rabbit_amqqueue_sup, - {rabbit_amqqueue_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), Recovered = recover_durable_queues( lists:zip(DurableQueues, OrderedRecoveryTerms)), unlink(Marker), @@ -220,8 +220,8 @@ recover() -> Recovered. stop() -> - ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), - ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup), + ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), + ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:stop(). @@ -247,7 +247,7 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = gen_server2:mcall( - [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery), + [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery), {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", [Pid, Error]) || {Pid, Error} <- Failures], @@ -274,8 +274,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare), - {init, new}, infinity). + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), + {init, new}, infinity). internal_declare(Q = #amqqueue{name = QueueName}) -> case not_found_or_absent(QueueName) of diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 99909e55..591de408 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,35 +18,30 @@ -behaviour(supervisor2). --export([start_link/0, start_queue_process/3]). +-export([start_link/2]). -export([init/1]). -include("rabbit.hrl"). --define(SERVER, ?MODULE). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), - 'declare' | 'recovery' | 'slave') -> pid()). +-spec(start_link/2 :: + (rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') -> + {'ok', pid(), pid()}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_queue_process(Node, Q, Hint) -> - {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), - Pid. +start_link(Q, Hint) -> + ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]}, + transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + rabbit_mirror_queue_slave]}, + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), + {ok, SupPid, QPid}. -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, - rabbit_mirror_queue_slave]}]}}. +init([]) -> {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl new file mode 100644 index 00000000..6870f7c4 --- /dev/null +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_queue_process/3]). + +-export([init/1]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), + 'declare' | 'recovery' | 'slave') -> pid()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_queue_process(Node, Q, Hint) -> + {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]), + QPid. + +init([]) -> + {ok, {{simple_one_for_one, 10, 10}, + [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, + temporary, ?MAX_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 7c7fbbeb..5cb871e8 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -206,7 +206,7 @@ add_mirror(QName, MirrorNode, SyncMode) -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> - SPid = rabbit_amqqueue_sup:start_queue_process( + SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), |