summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-04 14:37:53 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-04 14:37:53 +0100
commitd726495570aa4a7773c68d8ebb753ddc1a2959e3 (patch)
treed9f6995ea1442ee91005497dafe8a8517cf55e59
parente41f42aee9a6b95ca95c6dc63d2310c336f7d51f (diff)
parentf36c506dc7f8ad5bb52fa91f23de76d8fc216a27 (diff)
downloadrabbitmq-server-d726495570aa4a7773c68d8ebb753ddc1a2959e3.tar.gz
Merge default
-rw-r--r--docs/rabbitmqctl.1.xml27
-rw-r--r--packaging/macports/Portfile.in2
-rw-r--r--src/rabbit_amqqueue_process.erl62
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_control_main.erl22
-rw-r--r--src/rabbit_heartbeat.erl45
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl148
-rw-r--r--src/rabbit_net.erl33
-rw-r--r--src/rabbit_networking.erl12
-rw-r--r--src/rabbit_policy.erl56
-rw-r--r--src/rabbit_reader.erl19
-rw-r--r--src/rabbit_runtime_parameter.erl18
-rw-r--r--src/rabbit_runtime_parameters.erl134
-rw-r--r--src/rabbit_runtime_parameters_test.erl18
-rw-r--r--src/rabbit_tests.erl15
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/rabbit_vhost.erl13
-rw-r--r--src/supervisor2.erl135
-rw-r--r--src/supervisor2_tests.erl79
22 files changed, 493 insertions, 371 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c74ec785..4a038da0 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -657,7 +657,7 @@
</para>
<para>
Deleting a virtual host deletes all its exchanges,
- queues, user mappings and associated permissions.
+ queues, bindings, user permissions and parameters.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl delete_vhost test</screen>
@@ -826,15 +826,16 @@
Certain features of RabbitMQ (such as the federation plugin)
are controlled by dynamic,
cluster-wide <emphasis>parameters</emphasis>. Each parameter
- consists of a component name, a key and a value. The
- component name and key are strings, and the value is an
- Erlang term. Parameters can be set, cleared and listed. In
- general you should refer to the documentation for the feature
- in question to see how to set parameters.
+ consists of a component name, a key and a value, and is
+ associated with a virtual host. The component name and key are
+ strings, and the value is an Erlang term. Parameters can be
+ set, cleared and listed. In general you should refer to the
+ documentation for the feature in question to see how to set
+ parameters.
</para>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Sets a parameter.
@@ -865,12 +866,12 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl set_parameter federation local_username '&lt;&lt;"guest">>'</screen>
<para role="example">
- This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command>&lt;&lt;"guest">></command>.
+ This command sets the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host to the Erlang term <command>&lt;&lt;"guest">></command>.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>clear_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Clears a parameter.
@@ -893,20 +894,20 @@
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl clear_parameter federation local_username</screen>
<para role="example">
- This command clears the parameter <command>local_username</command> for the <command>federation</command> component.
+ This command clears the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term>
+ <term><cmdsynopsis><command>list_parameters</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
- Lists all parameters.
+ Lists all parameters for a virtual host.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_parameters</screen>
<para role="example">
- This command lists all parameters.
+ This command lists all parameters in the default virtual host.
</para>
</listitem>
</varlistentry>
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e461e49e..82c1fb0c 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -59,7 +59,7 @@ set mandest ${destroot}${prefix}/share/man
use_configure no
-use_parallel_build yes
+use_parallel_build no
build.env-append HOME=${workpath}
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6bf290de..f5a3a5f1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ ttl_timer_expiry,
senders,
publish_seqno,
unconfirmed,
@@ -107,7 +108,7 @@
]).
-define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%%----------------------------------------------------------------------------
@@ -576,7 +577,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,30 +718,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- BQS1 = case DLXFun of
- undefined -> {undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- BQS2;
- _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
Msgs),
- BQS2
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
- when TTL =/= undefined ->
- case BQ:is_empty(BQS) of
- true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
- State#q{ttl_timer_ref = TRef}
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 95523bed..ed5340fe 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,9 +124,11 @@
%% necessitate an ack or not. If they do, the function returns a list of
%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
- -> {[{rabbit_types:basic_message(), ack()}], state()};
+ -> {rabbit_types:message_properties() | undefined,
+ [{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
- -> {undefined, state()}.
+ -> {rabbit_types:message_properties() | undefined,
+ undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a84800c0..e40d9b29 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [2, Res]},
+ BQ = {call, erlang, element, [3, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0caa5be6..997b8e4d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -66,9 +66,9 @@
{list_permissions, [?VHOST_DEF]},
list_user_permissions,
- set_parameter,
- clear_parameter,
- list_parameters,
+ {set_parameter, [?VHOST_DEF]},
+ {clear_parameter, [?VHOST_DEF]},
+ {list_parameters, [?VHOST_DEF]},
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
@@ -434,21 +434,25 @@ action(list_permissions, Node, [], Opts, Inform) ->
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
-action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) ->
+action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting runtime parameter ~p for component ~p to ~p",
[Key, Component, Value]),
rpc_call(Node, rabbit_runtime_parameters, parse_set,
- [list_to_binary(Component), list_to_binary(Key), Value]);
+ [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]);
-action(clear_parameter, Node, [Component, Key], _Opts, Inform) ->
+action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
- rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component),
+ rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg,
+ list_to_binary(Component),
list_to_binary(Key)]);
-action(list_parameters, Node, Args = [], _Opts, Inform) ->
+action(list_parameters, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
display_info_list(
- rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args),
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
action(report, Node, _Args, _Opts, Inform) ->
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 80b4e768..05aad8c9 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -59,21 +59,15 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
- heartbeater(
- {Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () ->
- SendFun(),
- continue
- end}).
+ heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
+ fun () -> SendFun(), continue end}).
start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
- heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () ->
- ReceiveFun(),
- stop
- end}).
+ heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
+ fun () -> ReceiveFun(), stop end}).
start_heartbeat_fun(SupPid) ->
fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
@@ -88,17 +82,11 @@ start_heartbeat_fun(SupPid) ->
{Sender, Receiver}
end.
-pause_monitor({_Sender, none}) ->
- ok;
-pause_monitor({_Sender, Receiver}) ->
- Receiver ! pause,
- ok.
+pause_monitor({_Sender, none}) -> ok;
+pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
-resume_monitor({_Sender, none}) ->
- ok;
-resume_monitor({_Sender, Receiver}) ->
- Receiver ! resume,
- ok.
+resume_monitor({_Sender, none}) -> ok;
+resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok.
%%----------------------------------------------------------------------------
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
@@ -106,8 +94,7 @@ start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback,
- [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
@@ -117,15 +104,11 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
{StatVal, SameCount}) ->
Recurse = fun (V) -> heartbeater(Params, V) end,
receive
- pause ->
- receive
- resume ->
- Recurse({0, 0});
- Other ->
- exit({unexpected_message, Other})
- end;
- Other ->
- exit({unexpected_message, Other})
+ pause -> receive
+ resume -> Recurse({0, 0});
+ Other -> exit({unexpected_message, Other})
+ end;
+ Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56..477449e3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+ {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 29e2d29f..89e334dd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -64,9 +64,7 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid),
- DeadNodes) orelse
- rabbit_misc:is_process_alive(Pid)],
+ not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e4d78c45..964c3e24 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -89,28 +80,33 @@
synchronised
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
+info(QPid) -> gen_server2:call(QPid, info, infinity).
init(#amqqueue { name = QueueName } = Q) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(fun() ->
- init_it(Self, Node,
- QueueName)
- end) of
+ case rabbit_misc:execute_mnesia_transaction(
+ fun() -> init_it(Self, Node, QueueName) end) of
{new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
@@ -153,24 +149,21 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid};
- [QPid] ->
- case rabbit_misc:is_process_alive(QPid) of
- true -> duplicate_live_master;
- false -> {stale, QPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
- {new, QPid}
- end
+ [] -> MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
@@ -356,14 +349,10 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
handle_msg([_SPid], _From, master_changed) ->
ok;
@@ -584,9 +573,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -680,26 +669,24 @@ maybe_enqueue_message(
%% msg_seq_no was at the time. We do now!
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
+ {MS1, SQ1} =
+ case needs_confirming(Delivery, State1) of
+ never -> {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)};
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ {dict:store(MsgId, MMS, MS), SQ};
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ {dict:erase(MsgId, MS),
+ remove_from_pending_ch(MsgId, ChPid, SQ)}
+ end,
+ State1 #state { msg_id_status = MS1,
+ sender_queues = SQ1 };
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
@@ -748,18 +735,17 @@ process_instruction(
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } },
_EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never -> MS;
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS , MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ MS
+ end};
{{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index bedf5142..038154c3 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ close/1, fast_close/1, sockname/1, peername/1, peercert/1,
tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -59,7 +59,7 @@
-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
+-spec(fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -77,6 +77,8 @@
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 94a5a2b7..2d0ded12 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
+ {error, timeout} ->
+ {error, {ssl_upgrade_error, timeout}};
{error, Reason} ->
+ %% We have no idea what state the ssl_connection
+ %% process is in - it could still be happily
+ %% going, it might be stuck, or it could be just
+ %% about to fail. There is little that our caller
+ %% can do but close the TCP socket, but this could
+ %% cause ssl alerts to get dropped (which is bad
+ %% form, according to the TLS spec). So we give
+ %% the ssl_connection a little bit of time to send
+ %% such alerts.
+ timer:sleep(?SSL_TIMEOUT * 1000),
{error, {ssl_upgrade_error, Reason}};
{'EXIT', Reason} ->
{error, {ssl_upgrade_failure, Reason}}
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 1551795f..05b43a2e 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -26,7 +26,7 @@
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -46,12 +46,13 @@ name0(Policy) -> pget(<<"name">>, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
-set0(Name) -> match(Name, list()).
+set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
-get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())).
+get(Name, EntityName = #resource{virtual_host = VHost}) ->
+ get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
get0(Name, List) -> case pget(<<"policy">>, List) of
@@ -64,35 +65,33 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
-validate(<<"policy">>, Name, Term) ->
+validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(<<"policy">>, _Name) ->
+validate_clear(_VHost, <<"policy">>, _Name) ->
ok.
-notify(<<"policy">>, _Name, _Term) ->
- update_policies().
+notify(VHost, <<"policy">>, _Name, _Term) ->
+ update_policies(VHost).
-notify_clear(<<"policy">>, _Name) ->
- update_policies().
+notify_clear(VHost, <<"policy">>, _Name) ->
+ update_policies(VHost).
%%----------------------------------------------------------------------------
-list() ->
+list(VHost) ->
[[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+ || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-update_policies() ->
- Policies = list(),
+update_policies(VHost) ->
+ Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
{[update_exchange(X, Policies) ||
- VHost <- rabbit_vhost:list(),
- X <- rabbit_exchange:list(VHost)],
+ X <- rabbit_exchange:list(VHost)],
[update_queue(Q, Policies) ||
- VHost <- rabbit_vhost:list(),
- Q <- rabbit_amqqueue:list(VHost)]}
+ Q <- rabbit_amqqueue:list(VHost)]}
end),
[notify(X) || X <- Xs],
[notify(Q) || Q <- Qs],
@@ -129,28 +128,15 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
- Prefix = pget(<<"prefix">>, Policy),
- case pget(<<"vhost">>, Policy) of
- undefined -> prefix(Prefix, Name);
- VHost -> prefix(Prefix, Name);
- _ -> false
- end.
-
-prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+matches(#resource{name = Name}, Policy) ->
+ lists:prefix(binary_to_list(pget(<<"prefix">>, Policy)),
+ binary_to_list(Name)).
sort_pred(A, B) ->
- R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
- case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
- {undefined, undefined} -> R;
- {undefined, _} -> true;
- {_, undefined} -> false;
- _ -> R
- end.
+ size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
- {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
+ [{<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
{<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 19dac70c..aef48b20 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -184,6 +184,8 @@ socket_op(Sock, Fun) ->
{ok, Res} -> Res;
{error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
[self(), Reason]),
+ %% NB: this is tcp socket, even in case of ssl
+ rabbit_net:fast_close(Sock),
exit(normal)
end.
@@ -236,15 +238,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% The reader is the controlling process and hence its
- %% termination will close the socket. Furthermore,
- %% gen_tcp:close/1 waits for pending output to be sent, which
- %% results in unnecessary delays. However, to keep the
- %% file_handle_cache accounting as accurate as possible it
- %% would be good to close the socket immediately if we
- %% can. But we can only do this for non-ssl sockets.
- %%
- rabbit_net:maybe_fast_close(ClientSock),
+ %% We don't call gen_tcp:close/1 here since it waits for
+ %% pending output to be sent, which results in unnecessary
+ %% delays. We could just terminate - the reader is the
+ %% controlling process and hence its termination will close
+ %% the socket. However, to keep the file_handle_cache
+ %% accounting as accurate as possible we ought to close the
+ %% socket w/o delay before termination.
+ rabbit_net:fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index c7d30116..18668049 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -21,10 +21,12 @@
-type(validate_results() ::
'ok' | {error, string(), [term()]} | [validate_results()]).
--callback validate(binary(), binary(), term()) -> validate_results().
--callback validate_clear(binary(), binary()) -> validate_results().
--callback notify(binary(), binary(), term()) -> 'ok'.
--callback notify_clear(binary(), binary()) -> 'ok'.
+-callback validate(rabbit_types:vhost(), binary(), binary(),
+ term()) -> validate_results().
+-callback validate_clear(rabbit_types:vhost(), binary(),
+ binary()) -> validate_results().
+-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
+-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
-else.
@@ -32,10 +34,10 @@
behaviour_info(callbacks) ->
[
- {validate, 3},
- {validate_clear, 2},
- {notify, 3},
- {notify_clear, 2}
+ {validate, 4},
+ {validate_clear, 3},
+ {notify, 4},
+ {notify_clear, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 3a54e8f6..0707193c 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,8 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1,
- list_formatted/0, lookup/2, value/2, value/3, info_keys/0]).
+-export([parse_set/4, set/4, clear/3,
+ list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
+ lookup/3, value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -27,16 +28,23 @@
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
--spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()).
--spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
--spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
+-spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string())
+ -> ok_or_error_string()).
+-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
+-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
--spec(list/1 :: (binary()) -> [rabbit_types:infos()]).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
--spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
--spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
--spec(value/2 :: (binary(), binary()) -> term()).
--spec(value/3 :: (binary(), binary(), term()) -> term()).
+-spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]).
+-spec(list_strict/2 :: (rabbit_types:vhost(), binary())
+ -> [rabbit_types:infos()] | 'not_found').
+-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
+-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> rabbit_types:infos()).
+-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
+-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-endif.
@@ -49,14 +57,14 @@
%%---------------------------------------------------------------------------
-parse_set(Component, Key, String) ->
+parse_set(VHost, Component, Key, String) ->
case parse(String) of
- {ok, Term} -> set(Component, Key, Term);
+ {ok, Term} -> set(VHost, Component, Key, Term);
{errors, L} -> format_error(L)
end.
-set(Component, Key, Term) ->
- case set0(Component, Key, Term) of
+set(VHost, Component, Key, Term) ->
+ case set0(VHost, Component, Key, Term) of
ok -> ok;
{errors, L} -> format_error(L)
end.
@@ -64,16 +72,18 @@ set(Component, Key, Term) ->
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(Component, Key, Term) ->
+set0(VHost, Component, Key, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
case flatten_errors(validate(Term)) of
ok ->
- case flatten_errors(Mod:validate(Component, Key, Term)) of
+ case flatten_errors(
+ Mod:validate(VHost, Component, Key, Term)) of
ok ->
- case mnesia_update(Component, Key, Term) of
+ case mnesia_update(VHost, Component, Key, Term) of
{old, Term} -> ok;
- _ -> Mod:notify(Component, Key, Term)
+ _ -> Mod:notify(
+ VHost, Component, Key, Term)
end,
ok;
E ->
@@ -86,95 +96,103 @@ set0(Component, Key, Term) ->
E
end.
-mnesia_update(Component, Key, Term) ->
+mnesia_update(VHost, Component, Key, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
Res
end).
-clear(Component, Key) ->
- case clear0(Component, Key) of
+clear(VHost, Component, Key) ->
+ case clear0(VHost, Component, Key) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(Component, Key) ->
+clear0(VHost, Component, Key) ->
case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of
- ok -> mnesia_clear(Component, Key),
- Mod:notify_clear(Component, Key),
+ {ok, Mod} -> case flatten_errors(
+ Mod:validate_clear(VHost, Component, Key)) of
+ ok -> mnesia_clear(VHost, Component, Key),
+ Mod:notify_clear(VHost, Component, Key),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(Component, Key) ->
+mnesia_clear(VHost, Component, Key) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
end).
list() ->
[p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
-list(Component) -> list(Component, []).
-list_strict(Component) -> list(Component, not_found).
-
-list(Component, Default) ->
- case lookup_component(Component) of
- {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
- _ -> Default
+list(VHost) -> list(VHost, '_', []).
+list_strict(Component) -> list('_', Component, not_found).
+list(VHost, Component) -> list(VHost, Component, []).
+list_strict(VHost, Component) -> list(VHost, Component, not_found).
+
+list(VHost, Component, Default) ->
+ case component_good(Component) of
+ true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
+ _ = '_'},
+ [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ _ -> Default
end.
-list_formatted() ->
- [pset(value, format(pget(value, P)), P) || P <- list()].
+list_formatted(VHost) ->
+ [pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(Component, Key) ->
- case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Key) ->
+ case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(Component, Key, Default) ->
- Params = lookup0(Component, Key,
- fun () -> lookup_missing(Component, Key, Default) end),
+value(VHost, Component, Key, Default) ->
+ Params = lookup0(VHost, Component, Key,
+ fun () ->
+ lookup_missing(VHost, Component, Key, Default)
+ end),
Params#runtime_parameters.value.
-lookup0(Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {Component, Key}) of
+lookup0(VHost, Component, Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(Component, Key, Default) ->
+lookup_missing(VHost, Component, Key, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {Component, Key}, read) of
- [] -> Record = c(Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ [] -> Record = c(VHost, Component, Key, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key},
- value = Default}.
+c(VHost, Component, Key, Default) ->
+ #runtime_parameters{key = {VHost, Component, Key},
+ value = Default}.
-p(#runtime_parameters{key = {Component, Key}, value = Value}) ->
- [{component, Component},
+p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+ [{vhost, VHost},
+ {component, Component},
{key, Key},
{value, Value}].
@@ -182,6 +200,12 @@ info_keys() -> [component, key, value].
%%---------------------------------------------------------------------------
+component_good('_') -> true;
+component_good(Component) -> case lookup_component(Component) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
lookup_component(Component) ->
case rabbit_registry:lookup_module(
runtime_parameter, list_to_atom(binary_to_list(Component))) of
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index f23b3227..5224ccaa 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -17,7 +17,7 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
--export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
register() ->
@@ -26,13 +26,13 @@ register() ->
unregister() ->
rabbit_registry:unregister(runtime_parameter, <<"test">>).
-validate(<<"test">>, <<"good">>, _Term) -> ok;
-validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok;
-validate(<<"test">>, _, _) -> {error, "meh", []}.
+validate(_, <<"test">>, <<"good">>, _Term) -> ok;
+validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
+validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(<<"test">>, <<"good">>) -> ok;
-validate_clear(<<"test">>, <<"maybe">>) -> ok;
-validate_clear(<<"test">>, _) -> {error, "meh", []}.
+validate_clear(_, <<"test">>, <<"good">>) -> ok;
+validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
+validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-notify(_, _, _) -> ok.
-notify_clear(_, _) -> ok.
+notify(_, _, _, _) -> ok.
+notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7fbe6bab..1fe655e7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -33,6 +33,7 @@
all_tests() ->
ok = setup_cluster(),
+ ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
@@ -2287,10 +2288,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2307,11 +2308,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {undefined, VQ6} =
+ {_, undefined, VQ6} =
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
VQ6.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c95..bd606dfb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 5548ef6d..03dfbe24 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -90,12 +90,13 @@ delete(VHostPath) ->
R.
internal_delete(VHostPath) ->
- lists:foreach(
- fun (Info) ->
- ok = rabbit_auth_backend_internal:clear_permissions(
- proplists:get_value(user, Info), VHostPath)
- end,
- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)),
+ [ok = rabbit_auth_backend_internal:clear_permissions(
+ proplists:get_value(user, Info), VHostPath)
+ || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
+ [ok = rabbit_runtime_parameters:clear(VHostPath,
+ proplists:get_value(component, Info),
+ proplists:get_value(key, Info))
+ || Info <- rabbit_runtime_parameters:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 3d3623d7..5af38573 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -255,10 +255,10 @@ behaviour_info(_Other) ->
%%% ---------------------------------------------------
start_link(Mod, Args) ->
gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
+
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
+
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
@@ -298,9 +298,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
check_childspecs(X) -> {error, {badarg, X}}.
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
@@ -319,7 +319,7 @@ init({SupName, Mod, Args}) ->
Error ->
{stop, {bad_return, {Mod, init, Error}}}
end.
-
+
init_children(State, StartSpec) ->
SupName = State#state.name,
case check_startspec(StartSpec) of
@@ -349,7 +349,7 @@ init_dynamic(_State, StartSpec) ->
%% Func: start_children/2
%% Args: Children = [#child] in start order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Purpose: Start all children. The new list contains #child's
+%% Purpose: Start all children. The new list contains #child's
%% with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
%% NChildren = [#child] in termination order (reversed
@@ -381,7 +381,7 @@ do_start_child(SupName, Child) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
{ok, Pid, Extra};
- ignore ->
+ ignore ->
{ok, undefined};
{error, What} -> {error, What};
What -> {error, What}
@@ -400,12 +400,12 @@ do_start_child_i(M, F, A) ->
What ->
{error, What}
end.
-
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
@@ -414,11 +414,11 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
+ NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid, Extra}, NState};
What ->
@@ -497,7 +497,7 @@ handle_call(which_children, _From, State) ->
%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
[]),
{noreply, State}.
@@ -527,7 +527,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
end;
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
%%
@@ -577,13 +577,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
- case check_startspec(StartSpec) of
- {ok, [Child]} ->
- {ok, State#state{children = [Child]}};
- Error ->
- {error, Error}
- end;
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
@@ -604,7 +604,7 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
end;
update_childspec1([], Children, KeepOld) ->
% Return them in (keeped) reverse start order.
- lists:reverse(Children ++ KeepOld).
+ lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
@@ -618,7 +618,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -630,12 +630,12 @@ handle_start_child(Child, State) ->
{ok, Pid} ->
Children = State#state.children,
{{ok, Pid},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{ok, Pid, Extra} ->
Children = State#state.children,
{{ok, Pid, Extra},
- State#state{children =
+ State#state{children =
[Child#child{pid = Pid}|Children]}};
{error, What} ->
{{error, {What, Child}}, State}
@@ -816,29 +816,32 @@ terminate_simple_children(Child, Dynamics, SupName) ->
{Replies, Timedout} =
lists:foldl(
fun (_Pid, {Replies, Timedout}) ->
- {Reply, Timedout1} =
+ {Pid1, Reason1, Timedout1} =
receive
TimeoutMsg ->
Remaining = Pids -- [P || {P, _} <- Replies],
[exit(P, kill) || P <- Remaining],
- receive {'DOWN', _MRef, process, Pid, Reason} ->
- {{error, Reason}, true}
+ receive
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {Pid, Reason, true}
end;
{'DOWN', _MRef, process, Pid, Reason} ->
- {child_res(Child, Reason, Timedout), Timedout};
- {'EXIT', Pid, Reason} ->
- receive {'DOWN', _MRef, process, Pid, _} ->
- {{error, Reason}, Timedout}
- end
+ {Pid, Reason, Timedout}
end,
- {[{Pid, Reply} | Replies], Timedout1}
+ {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
+ Timedout1}
end, {[], false}, Pids),
timeout_stop(Child, TRef, TimeoutMsg, Timedout),
ReportError = shutdown_error_reporter(SupName),
- [case Reply of
- {_Pid, ok} -> ok;
- {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid})
- end || Reply <- Replies],
+ Report = fun(_, ok) -> ok;
+ (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
+ end,
+ [receive
+ {'EXIT', Pid, Reason} ->
+ Report(Pid, child_res(Child, Reason, Timedout))
+ after
+ 0 -> Report(Pid, Reply)
+ end || {Pid, Reply} <- Replies],
ok.
child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
@@ -863,7 +866,7 @@ timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
after
0 -> ok
end;
-timeout_stop(#child{}, ok, _Msg, _Timedout) ->
+timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
ok.
do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
@@ -885,17 +888,17 @@ do_terminate(Child, _SupName) ->
Child.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -905,16 +908,16 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
shutdown(Pid, Time) ->
-
+
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -926,14 +929,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -943,22 +946,22 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
-
-
+
+
%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
@@ -1012,7 +1015,7 @@ remove_child(Child, State) ->
%% Args: SupName = {local, atom()} | {global, atom()} | self
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
-%% rest_for_one
+%% rest_for_one
%% MaxIntensity = integer()
%% Period = integer()
%% Mod :== atom()
@@ -1107,10 +1110,10 @@ validChildType(supervisor) -> true;
validChildType(worker) -> true;
validChildType(What) -> throw({invalid_child_type, What}).
-validName(_Name) -> true.
+validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1128,7 +1131,7 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity, supervisor) -> true;
validShutdown(brutal_kill, _) -> true;
@@ -1154,7 +1157,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
new file mode 100644
index 00000000..5c5e6d85
--- /dev/null
+++ b/src/supervisor2_tests.erl
@@ -0,0 +1,79 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(supervisor2_tests).
+-behaviour(supervisor2).
+
+-export([test_all/0, start_link/0]).
+-export([init/1]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TEST_RUNS, 2000).
+-define(SLOW_TEST_RUNS, 45).
+-define(CHILDREN, 100).
+
+test_all() ->
+ eunit:test(?MODULE, [verbose]).
+
+simple_child_shutdown_without_deadlock_test_() ->
+ [{timeout, ?TEST_RUNS * 10,
+ check_shutdown_handling(stop, ?TEST_RUNS, ?CHILDREN)}].
+
+simple_child_shutdown_with_timeout_test_() ->
+ [{timeout, ?SLOW_TEST_RUNS * 10,
+ check_shutdown_handling(ignored, ?SLOW_TEST_RUNS, ?CHILDREN)}].
+
+check_shutdown_handling(SigStop, Iterations, ChildCount) ->
+ fun() ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, [?CHILDREN]),
+ [begin
+ TestSupPid = erlang:whereis(?MODULE),
+ ChildPids = [begin
+ {ok, ChildPid} =
+ supervisor2:start_child(TestSupPid, []),
+ ChildPid
+ end || _ <- lists:seq(1, ChildCount)],
+ erlang:monitor(process, TestSupPid),
+ [P ! SigStop || P <- ChildPids],
+ ?assertEqual(ok, supervisor2:terminate_child(Sup, test_sup)),
+ {ok, _} = supervisor2:restart_child(Sup, test_sup),
+ receive
+ {'DOWN', _MRef, process, TestSupPid, Reason} ->
+ ?assertEqual(shutdown, Reason)
+ end
+ end || _ <- lists:seq(1, Iterations)],
+ unlink(Sup),
+ exit(Sup, shutdown)
+ end.
+
+start_link() ->
+ Pid = spawn_link(fun () ->
+ process_flag(trap_exit, true),
+ receive stop -> ok end
+ end),
+ {ok, Pid}.
+
+init([N]) ->
+ {ok, {{one_for_one, 0, 1},
+ [{test_sup, {supervisor2, start_link,
+ [{local, ?MODULE}, ?MODULE, []]},
+ transient, N * 100, supervisor, [?MODULE]}]}};
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{test_worker, {?MODULE, start_link, []},
+ temporary, 1000, worker, [?MODULE]}]}}.
+