summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-20 11:03:38 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-20 11:03:38 +0100
commit403d476e5e077b9753bccc4c1404a214517c4f5c (patch)
treeab2947d15eba7d03f94ddf86a3b0bd99995a87ac
parent33544e9bacdbe59ffdf6515a32abad29882aae78 (diff)
downloadrabbitmq-server-403d476e5e077b9753bccc4c1404a214517c4f5c.tar.gz
Add an intermediate supervisor so that if a queue exceeds its restart limit we don't bring the whole server down!
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_sup.erl29
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl51
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
4 files changed, 73 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 391da1ae..8f25bf2e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -210,9 +210,9 @@ recover() ->
BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
- {rabbit_amqqueue_sup,
- {rabbit_amqqueue_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}),
Recovered = recover_durable_queues(
lists:zip(DurableQueues, OrderedRecoveryTerms)),
unlink(Marker),
@@ -220,8 +220,8 @@ recover() ->
Recovered.
stop() ->
- ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
- ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup),
+ ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup),
+ ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:stop().
@@ -247,7 +247,7 @@ find_durable_queues() ->
recover_durable_queues(QueuesAndRecoveryTerms) ->
{Results, Failures} =
gen_server2:mcall(
- [{rabbit_amqqueue_sup:start_queue_process(node(), Q, recovery),
+ [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery),
{init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
[rabbit_log:error("Queue ~p failed to initialise: ~p~n",
[Pid, Error]) || {Pid, Error} <- Failures],
@@ -274,8 +274,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
down_slave_nodes = [],
gm_pids = []})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
- gen_server2:call(rabbit_amqqueue_sup:start_queue_process(Node, Q, declare),
- {init, new}, infinity).
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
+ {init, new}, infinity).
internal_declare(Q = #amqqueue{name = QueueName}) ->
case not_found_or_absent(QueueName) of
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 99909e55..591de408 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -18,35 +18,30 @@
-behaviour(supervisor2).
--export([start_link/0, start_queue_process/3]).
+-export([start_link/2]).
-export([init/1]).
-include("rabbit.hrl").
--define(SERVER, ?MODULE).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(),
- 'declare' | 'recovery' | 'slave') -> pid()).
+-spec(start_link/2 ::
+ (rabbit_types:amqqueue(), 'declare' | 'recovery' | 'slave') ->
+ {'ok', pid(), pid()}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_queue_process(Node, Q, Hint) ->
- {ok, Pid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]),
- Pid.
+start_link(Q, Hint) ->
+ ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, Hint]},
+ transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
+ rabbit_mirror_queue_slave]},
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
+ {ok, SupPid, QPid}.
-init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
- [{rabbit_amqqueue, {rabbit_prequeue, start_link, []},
- transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
- rabbit_mirror_queue_slave]}]}}.
+init([]) -> {ok, {{one_for_one, 5, 10}, []}}.
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
new file mode 100644
index 00000000..6870f7c4
--- /dev/null
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_amqqueue_sup_sup).
+
+-behaviour(supervisor2).
+
+-export([start_link/0, start_queue_process/3]).
+
+-export([init/1]).
+
+-include("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(),
+ 'declare' | 'recovery' | 'slave') -> pid()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_queue_process(Node, Q, Hint) ->
+ {ok, _SupPid, QPid} = supervisor2:start_child({?SERVER, Node}, [Q, Hint]),
+ QPid.
+
+init([]) ->
+ {ok, {{simple_one_for_one, 10, 10},
+ [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []},
+ temporary, ?MAX_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 7c7fbbeb..5cb871e8 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -206,7 +206,7 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup:start_queue_process(
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),