diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-04 14:37:53 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-04 14:37:53 +0100 |
commit | d726495570aa4a7773c68d8ebb753ddc1a2959e3 (patch) | |
tree | d9f6995ea1442ee91005497dafe8a8517cf55e59 | |
parent | e41f42aee9a6b95ca95c6dc63d2310c336f7d51f (diff) | |
parent | f36c506dc7f8ad5bb52fa91f23de76d8fc216a27 (diff) | |
download | rabbitmq-server-d726495570aa4a7773c68d8ebb753ddc1a2959e3.tar.gz |
Merge default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 27 | ||||
-rw-r--r-- | packaging/macports/Portfile.in | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 22 | ||||
-rw-r--r-- | src/rabbit_heartbeat.erl | 45 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 148 | ||||
-rw-r--r-- | src/rabbit_net.erl | 33 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 12 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 56 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 19 | ||||
-rw-r--r-- | src/rabbit_runtime_parameter.erl | 18 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 134 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters_test.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 15 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 13 | ||||
-rw-r--r-- | src/supervisor2.erl | 135 | ||||
-rw-r--r-- | src/supervisor2_tests.erl | 79 |
22 files changed, 493 insertions, 371 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c74ec785..4a038da0 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -657,7 +657,7 @@ </para> <para> Deleting a virtual host deletes all its exchanges, - queues, user mappings and associated permissions. + queues, bindings, user permissions and parameters. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl delete_vhost test</screen> @@ -826,15 +826,16 @@ Certain features of RabbitMQ (such as the federation plugin) are controlled by dynamic, cluster-wide <emphasis>parameters</emphasis>. Each parameter - consists of a component name, a key and a value. The - component name and key are strings, and the value is an - Erlang term. Parameters can be set, cleared and listed. In - general you should refer to the documentation for the feature - in question to see how to set parameters. + consists of a component name, a key and a value, and is + associated with a virtual host. The component name and key are + strings, and the value is an Erlang term. Parameters can be + set, cleared and listed. In general you should refer to the + documentation for the feature in question to see how to set + parameters. </para> <variablelist> <varlistentry> - <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>set_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> <listitem> <para> Sets a parameter. @@ -865,12 +866,12 @@ <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl set_parameter federation local_username '<<"guest">>'</screen> <para role="example"> - This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command><<"guest">></command>. + This command sets the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host to the Erlang term <command><<"guest">></command>. </para> </listitem> </varlistentry> <varlistentry> - <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>clear_parameter</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term> <listitem> <para> Clears a parameter. @@ -893,20 +894,20 @@ <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl clear_parameter federation local_username</screen> <para role="example"> - This command clears the parameter <command>local_username</command> for the <command>federation</command> component. + This command clears the parameter <command>local_username</command> for the <command>federation</command> component in the default virtual host. </para> </listitem> </varlistentry> <varlistentry> - <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term> + <term><cmdsynopsis><command>list_parameters</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> <listitem> <para> - Lists all parameters. + Lists all parameters for a virtual host. </para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl list_parameters</screen> <para role="example"> - This command lists all parameters. + This command lists all parameters in the default virtual host. </para> </listitem> </varlistentry> diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index e461e49e..82c1fb0c 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -59,7 +59,7 @@ set mandest ${destroot}${prefix}/share/man use_configure no -use_parallel_build yes +use_parallel_build no build.env-append HOME=${workpath} diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6bf290de..f5a3a5f1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + ttl_timer_expiry, senders, publish_seqno, unconfirmed, @@ -107,7 +108,7 @@ ]). -define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]). + ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). %%---------------------------------------------------------------------------- @@ -576,7 +577,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -716,30 +718,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), DLXFun = dead_letter_fun(expired, State), - ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - BQS1 = case DLXFun of - undefined -> {undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - BQS2; - _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach( - fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, + ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, + {Props, BQS1} = + case DLXFun of + undefined -> + {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> + {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), - BQS2 - end, - ensure_ttl_timer(State#q{backing_queue_state = BQS1}). - -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) - when TTL =/= undefined -> - case BQ:is_empty(BQS) of - true -> State; - false -> TRef = erlang:send_after(TTL, self(), drop_expired), - State#q{ttl_timer_ref = TRef} + {Next, BQS2} + end, + ensure_ttl_timer(case Props of + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp + end, State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(undefined, State) -> + State; +ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> + State; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> + After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), + State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, + ttl_timer_expiry = TExpiry}) + when Expiry + 1000 < TExpiry -> + case erlang:cancel_timer(TRef) of + false -> State; + _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; -ensure_ttl_timer(State) -> +ensure_ttl_timer(_Expiry, State) -> State. ack_if_no_dlx(AckTags, State = #q{dlx = undefined, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 95523bed..ed5340fe 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -124,9 +124,11 @@ %% necessitate an ack or not. If they do, the function returns a list of %% messages with the respective acktags. -callback dropwhile(msg_pred(), true, state()) - -> {[{rabbit_types:basic_message(), ack()}], state()}; + -> {rabbit_types:message_properties() | undefined, + [{rabbit_types:basic_message(), ack()}], state()}; (msg_pred(), false, state()) - -> {undefined, state()}. + -> {rabbit_types:message_properties() | undefined, + undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a84800c0..e40d9b29 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [2, Res]}, + BQ = {call, erlang, element, [3, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 0caa5be6..997b8e4d 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -66,9 +66,9 @@ {list_permissions, [?VHOST_DEF]}, list_user_permissions, - set_parameter, - clear_parameter, - list_parameters, + {set_parameter, [?VHOST_DEF]}, + {clear_parameter, [?VHOST_DEF]}, + {list_parameters, [?VHOST_DEF]}, {list_queues, [?VHOST_DEF]}, {list_exchanges, [?VHOST_DEF]}, @@ -434,21 +434,25 @@ action(list_permissions, Node, [], Opts, Inform) -> list_vhost_permissions, [VHost]}), rabbit_auth_backend_internal:vhost_perms_info_keys()); -action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) -> +action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting runtime parameter ~p for component ~p to ~p", [Key, Component, Value]), rpc_call(Node, rabbit_runtime_parameters, parse_set, - [list_to_binary(Component), list_to_binary(Key), Value]); + [VHostArg, list_to_binary(Component), list_to_binary(Key), Value]); -action(clear_parameter, Node, [Component, Key], _Opts, Inform) -> +action(clear_parameter, Node, [Component, Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]), - rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component), + rpc_call(Node, rabbit_runtime_parameters, clear, [VHostArg, + list_to_binary(Component), list_to_binary(Key)]); -action(list_parameters, Node, Args = [], _Opts, Inform) -> +action(list_parameters, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), display_info_list( - rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args), + rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); action(report, Node, _Args, _Opts, Inform) -> diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 80b4e768..05aad8c9 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -59,21 +59,15 @@ start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - heartbeater( - {Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - SendFun(), - continue - end}). + heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> SendFun(), continue end}). start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - ReceiveFun(), - stop - end}). + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, + fun () -> ReceiveFun(), stop end}). start_heartbeat_fun(SupPid) -> fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> @@ -88,17 +82,11 @@ start_heartbeat_fun(SupPid) -> {Sender, Receiver} end. -pause_monitor({_Sender, none}) -> - ok; -pause_monitor({_Sender, Receiver}) -> - Receiver ! pause, - ok. +pause_monitor({_Sender, none}) -> ok; +pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor({_Sender, none}) -> - ok; -resume_monitor({_Sender, Receiver}) -> - Receiver ! resume, - ok. +resume_monitor({_Sender, none}) -> ok; +resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> @@ -106,8 +94,7 @@ start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> supervisor2:start_child( SupPid, {Name, - {rabbit_heartbeat, Callback, - [Sock, TimeoutSec, TimeoutFun]}, + {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> @@ -117,15 +104,11 @@ heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, {StatVal, SameCount}) -> Recurse = fun (V) -> heartbeater(Params, V) end, receive - pause -> - receive - resume -> - Recurse({0, 0}); - Other -> - exit({unexpected_message, Other}) - end; - Other -> - exit({unexpected_message, Other}) + pause -> receive + resume -> Recurse({0, 0}); + Other -> exit({unexpected_message, Other}) + end; + Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..477449e3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), + {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - {Msgs, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 } }. + {Next, Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 29e2d29f..89e334dd 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -64,9 +64,7 @@ remove_from_queue(QueueName, DeadPids) -> slave_pids = SPids }] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), - DeadNodes) orelse - rabbit_misc:is_process_alive(Pid)], + not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, []}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e4d78c45..964c3e24 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -19,17 +19,8 @@ %% For general documentation of HA design, see %% rabbit_mirror_queue_coordinator %% -%% We join the GM group before we add ourselves to the amqqueue -%% record. As a result: -%% 1. We can receive msgs from GM that correspond to messages we will -%% never receive from publishers. -%% 2. When we receive a message from publishers, we must receive a -%% message from the GM group for it. -%% 3. However, that instruction from the GM group can arrive either -%% before or after the actual message. We need to be able to -%% distinguish between GM instructions arriving early, and case (1) -%% above. -%% +%% We receive messages from GM and from publishers, and the gm +%% messages can arrive either before or after the 'actual' message. %% All instructions from the GM group must be processed in the order %% in which they're received. @@ -89,28 +80,33 @@ synchronised }). -start_link(Q) -> - gen_server2:start_link(?MODULE, Q, []). +start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -info(QPid) -> - gen_server2:call(QPid, info, infinity). +info(QPid) -> gen_server2:call(QPid, info, infinity). init(#amqqueue { name = QueueName } = Q) -> + %% We join the GM group before we add ourselves to the amqqueue + %% record. As a result: + %% 1. We can receive msgs from GM that correspond to messages we will + %% never receive from publishers. + %% 2. When we receive a message from publishers, we must receive a + %% message from the GM group for it. + %% 3. However, that instruction from the GM group can arrive either + %% before or after the actual message. We need to be able to + %% distinguish between GM instructions arriving early, and case (1) + %% above. + %% + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> ok end, Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction(fun() -> - init_it(Self, Node, - QueueName) - end) of + case rabbit_misc:execute_mnesia_transaction( + fun() -> init_it(Self, Node, QueueName) end) of {new, MPid} -> - process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), - receive {joined, GM} -> - ok - end, erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), @@ -153,24 +149,21 @@ init_it(Self, Node, QueueName) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> - MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), - {new, QPid}; - [QPid] -> - case rabbit_misc:is_process_alive(QPid) of - true -> duplicate_live_master; - false -> {stale, QPid} - end; - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), - {new, QPid} - end + [] -> MPids1 = MPids ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid}; + [QPid] -> case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> MPids1 = (MPids -- [SPid]) ++ [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{slave_pids = MPids1}), + {new, QPid} + end end. handle_call({deliver, Delivery = #delivery { immediate = true }}, @@ -356,14 +349,10 @@ prioritise_info(Msg, _State) -> %% GM %% --------------------------------------------------------------------------- -joined([SPid], _Members) -> - SPid ! {joined, self()}, - ok. +joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, []) -> - ok; -members_changed([SPid], _Births, Deaths) -> - inform_deaths(SPid, Deaths). +members_changed([_SPid], _Births, []) -> ok; +members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). handle_msg([_SPid], _From, master_changed) -> ok; @@ -584,9 +573,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 })), case BQ:needs_timeout(BQS1) of - false -> {stop_sync_timer(State1), hibernate}; - idle -> {stop_sync_timer(State1), 0 }; - timed -> {ensure_sync_timer(State1), 0 } + false -> {stop_sync_timer(State1), hibernate }; + idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; + timed -> {ensure_sync_timer(State1), 0 } end. backing_queue_timeout(State = #state { backing_queue = BQ }) -> @@ -680,26 +669,24 @@ maybe_enqueue_message( %% msg_seq_no was at the time. We do now! ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { sender_queues = SQ1, - msg_id_status = dict:erase(MsgId, MS) }; + State1 #state { msg_id_status = dict:erase(MsgId, MS), + sender_queues = SQ1 }; {ok, {published, ChPid}} -> %% It was published to the BQ and we didn't know the %% msg_seq_no so couldn't confirm it at the time. - case needs_confirming(Delivery, State1) of - never -> - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - eventually -> - State1 #state { - msg_id_status = - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 } - end; + {MS1, SQ1} = + case needs_confirming(Delivery, State1) of + never -> {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)}; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + {dict:store(MsgId, MMS, MS), SQ}; + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + {dict:erase(MsgId, MS), + remove_from_pending_ch(MsgId, ChPid, SQ)} + end, + State1 #state { msg_id_status = MS1, + sender_queues = SQ1 }; {ok, discarded} -> %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. @@ -748,18 +735,17 @@ process_instruction( msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId } }, _EnqueueOnPromotion}}, MQ2} -> - %% We received the msg from the channel first. Thus we - %% need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> - {MQ2, PendingCh, MS}; - eventually -> - {MQ2, PendingCh, - dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; - immediately -> - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), - {MQ2, PendingCh, MS} - end; + {MQ2, PendingCh, + %% We received the msg from the channel first. Thus + %% we need to deal with confirms here. + case needs_confirming(Delivery, State1) of + never -> MS; + eventually -> MMS = {published, ChPid, MsgSeqNo}, + dict:store(MsgId, MMS , MS); + immediately -> ok = rabbit_misc:confirm_to_sender( + ChPid, [MsgSeqNo]), + MS + end}; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index bedf5142..038154c3 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -19,7 +19,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, - close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1, + close/1, fast_close/1, sockname/1, peername/1, peercert/1, tune_buffer_size/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -59,7 +59,7 @@ -spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). --spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). +-spec(fast_close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). @@ -77,6 +77,8 @@ %%--------------------------------------------------------------------------- +-define(SSL_CLOSE_TIMEOUT, 5000). + -define(IS_SSL(Sock), is_record(Sock, ssl_socket)). is_ssl(Sock) -> ?IS_SSL(Sock). @@ -148,8 +150,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). -maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok; -maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok. +fast_close(Sock) when ?IS_SSL(Sock) -> + %% We cannot simply port_close the underlying tcp socket since the + %% TLS protocol is quite insistent that a proper closing handshake + %% should take place (see RFC 5245 s7.2.1). So we call ssl:close + %% instead, but that can block for a very long time, e.g. when + %% there is lots of pending output and there is tcp backpressure, + %% or the ssl_connection process has entered the the + %% workaround_transport_delivery_problems function during + %% termination, which, inexplicably, does a gen_tcp:recv(Socket, + %% 0), which may never return if the client doesn't send a FIN or + %% that gets swallowed by the network. Since there is no timeout + %% variant of ssl:close, we construct our own. + {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end), + erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}), + receive + {Pid, ssl_close_timeout} -> + erlang:demonitor(MRef, [flush]), + exit(Pid, kill); + {'DOWN', MRef, process, Pid, _Reason} -> + ok + end, + catch port_close(Sock#ssl_socket.tcp), + ok; +fast_close(Sock) when is_port(Sock) -> + catch port_close(Sock), ok. sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 94a5a2b7..2d0ded12 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -160,7 +160,19 @@ ssl_transform_fun(SslOpts) -> case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of {ok, SslSock} -> {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; + {error, timeout} -> + {error, {ssl_upgrade_error, timeout}}; {error, Reason} -> + %% We have no idea what state the ssl_connection + %% process is in - it could still be happily + %% going, it might be stuck, or it could be just + %% about to fail. There is little that our caller + %% can do but close the TCP socket, but this could + %% cause ssl alerts to get dropped (which is bad + %% form, according to the TLS spec). So we give + %% the ssl_connection a little bit of time to send + %% such alerts. + timer:sleep(?SSL_TIMEOUT * 1000), {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 1551795f..05b43a2e 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -26,7 +26,7 @@ -export([register/0]). -export([name/1, get/2, set/1]). --export([validate/3, validate_clear/2, notify/3, notify_clear/2]). +-export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -46,12 +46,13 @@ name0(Policy) -> pget(<<"name">>, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. -set0(Name) -> match(Name, list()). +set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. -get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())). +get(Name, EntityName = #resource{virtual_host = VHost}) -> + get0(Name, match(EntityName, list(VHost))). get0(_Name, undefined) -> {error, not_found}; get0(Name, List) -> case pget(<<"policy">>, List) of @@ -64,35 +65,33 @@ get0(Name, List) -> case pget(<<"policy">>, List) of %%---------------------------------------------------------------------------- -validate(<<"policy">>, Name, Term) -> +validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). -validate_clear(<<"policy">>, _Name) -> +validate_clear(_VHost, <<"policy">>, _Name) -> ok. -notify(<<"policy">>, _Name, _Term) -> - update_policies(). +notify(VHost, <<"policy">>, _Name, _Term) -> + update_policies(VHost). -notify_clear(<<"policy">>, _Name) -> - update_policies(). +notify_clear(VHost, <<"policy">>, _Name) -> + update_policies(VHost). %%---------------------------------------------------------------------------- -list() -> +list(VHost) -> [[{<<"name">>, pget(key, P)} | pget(value, P)] - || P <- rabbit_runtime_parameters:list(<<"policy">>)]. + || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. -update_policies() -> - Policies = list(), +update_policies(VHost) -> + Policies = list(VHost), {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( fun() -> {[update_exchange(X, Policies) || - VHost <- rabbit_vhost:list(), - X <- rabbit_exchange:list(VHost)], + X <- rabbit_exchange:list(VHost)], [update_queue(Q, Policies) || - VHost <- rabbit_vhost:list(), - Q <- rabbit_amqqueue:list(VHost)]} + Q <- rabbit_amqqueue:list(VHost)]} end), [notify(X) || X <- Xs], [notify(Q) || Q <- Qs], @@ -129,28 +128,15 @@ match(Name, Policies) -> [Policy | _Rest] -> Policy end. -matches(#resource{name = Name, virtual_host = VHost}, Policy) -> - Prefix = pget(<<"prefix">>, Policy), - case pget(<<"vhost">>, Policy) of - undefined -> prefix(Prefix, Name); - VHost -> prefix(Prefix, Name); - _ -> false - end. - -prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)). +matches(#resource{name = Name}, Policy) -> + lists:prefix(binary_to_list(pget(<<"prefix">>, Policy)), + binary_to_list(Name)). sort_pred(A, B) -> - R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)), - case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of - {undefined, undefined} -> R; - {undefined, _} -> true; - {_, undefined} -> false; - _ -> R - end. + size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)). %%---------------------------------------------------------------------------- policy_validation() -> - [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional}, - {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory}, + [{<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory}, {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 19dac70c..aef48b20 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -184,6 +184,8 @@ socket_op(Sock, Fun) -> {ok, Res} -> Res; {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", [self(), Reason]), + %% NB: this is tcp socket, even in case of ssl + rabbit_net:fast_close(Sock), exit(normal) end. @@ -236,15 +238,14 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, "closing AMQP connection ~p (~s):~n~p~n", [self(), ConnStr, Ex]) after - %% The reader is the controlling process and hence its - %% termination will close the socket. Furthermore, - %% gen_tcp:close/1 waits for pending output to be sent, which - %% results in unnecessary delays. However, to keep the - %% file_handle_cache accounting as accurate as possible it - %% would be good to close the socket immediately if we - %% can. But we can only do this for non-ssl sockets. - %% - rabbit_net:maybe_fast_close(ClientSock), + %% We don't call gen_tcp:close/1 here since it waits for + %% pending output to be sent, which results in unnecessary + %% delays. We could just terminate - the reader is the + %% controlling process and hence its termination will close + %% the socket. However, to keep the file_handle_cache + %% accounting as accurate as possible we ought to close the + %% socket w/o delay before termination. + rabbit_net:fast_close(ClientSock), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index c7d30116..18668049 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -21,10 +21,12 @@ -type(validate_results() :: 'ok' | {error, string(), [term()]} | [validate_results()]). --callback validate(binary(), binary(), term()) -> validate_results(). --callback validate_clear(binary(), binary()) -> validate_results(). --callback notify(binary(), binary(), term()) -> 'ok'. --callback notify_clear(binary(), binary()) -> 'ok'. +-callback validate(rabbit_types:vhost(), binary(), binary(), + term()) -> validate_results(). +-callback validate_clear(rabbit_types:vhost(), binary(), + binary()) -> validate_results(). +-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'. +-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'. -else. @@ -32,10 +34,10 @@ behaviour_info(callbacks) -> [ - {validate, 3}, - {validate_clear, 2}, - {notify, 3}, - {notify_clear, 2} + {validate, 4}, + {validate_clear, 3}, + {notify, 4}, + {notify_clear, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 3a54e8f6..0707193c 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,8 +18,9 @@ -include("rabbit.hrl"). --export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1, - list_formatted/0, lookup/2, value/2, value/3, info_keys/0]). +-export([parse_set/4, set/4, clear/3, + list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1, + lookup/3, value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -27,16 +28,23 @@ -type(ok_or_error_string() :: 'ok' | {'error_string', string()}). --spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()). --spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()). --spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()). +-spec(parse_set/4 :: (rabbit_types:vhost(), binary(), binary(), string()) + -> ok_or_error_string()). +-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) + -> ok_or_error_string()). +-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) + -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). --spec(list/1 :: (binary()) -> [rabbit_types:infos()]). +-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). --spec(list_formatted/0 :: () -> [rabbit_types:infos()]). --spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()). --spec(value/2 :: (binary(), binary()) -> term()). --spec(value/3 :: (binary(), binary(), term()) -> term()). +-spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]). +-spec(list_strict/2 :: (rabbit_types:vhost(), binary()) + -> [rabbit_types:infos()] | 'not_found'). +-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). +-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary()) + -> rabbit_types:infos()). +-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). +-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -endif. @@ -49,14 +57,14 @@ %%--------------------------------------------------------------------------- -parse_set(Component, Key, String) -> +parse_set(VHost, Component, Key, String) -> case parse(String) of - {ok, Term} -> set(Component, Key, Term); + {ok, Term} -> set(VHost, Component, Key, Term); {errors, L} -> format_error(L) end. -set(Component, Key, Term) -> - case set0(Component, Key, Term) of +set(VHost, Component, Key, Term) -> + case set0(VHost, Component, Key, Term) of ok -> ok; {errors, L} -> format_error(L) end. @@ -64,16 +72,18 @@ set(Component, Key, Term) -> format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. -set0(Component, Key, Term) -> +set0(VHost, Component, Key, Term) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors(validate(Term)) of ok -> - case flatten_errors(Mod:validate(Component, Key, Term)) of + case flatten_errors( + Mod:validate(VHost, Component, Key, Term)) of ok -> - case mnesia_update(Component, Key, Term) of + case mnesia_update(VHost, Component, Key, Term) of {old, Term} -> ok; - _ -> Mod:notify(Component, Key, Term) + _ -> Mod:notify( + VHost, Component, Key, Term) end, ok; E -> @@ -86,95 +96,103 @@ set0(Component, Key, Term) -> E end. -mnesia_update(Component, Key, Term) -> +mnesia_update(VHost, Component, Key, Term) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Res = case mnesia:read(?TABLE, {Component, Key}, read) of + Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of [] -> new; [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(Component, Key, Term), write), + ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write), Res end). -clear(Component, Key) -> - case clear0(Component, Key) of +clear(VHost, Component, Key) -> + case clear0(VHost, Component, Key) of ok -> ok; {errors, L} -> format_error(L) end. -clear0(Component, Key) -> +clear0(VHost, Component, Key) -> case lookup_component(Component) of - {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of - ok -> mnesia_clear(Component, Key), - Mod:notify_clear(Component, Key), + {ok, Mod} -> case flatten_errors( + Mod:validate_clear(VHost, Component, Key)) of + ok -> mnesia_clear(VHost, Component, Key), + Mod:notify_clear(VHost, Component, Key), ok; E -> E end; E -> E end. -mnesia_clear(Component, Key) -> +mnesia_clear(VHost, Component, Key) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> - ok = mnesia:delete(?TABLE, {Component, Key}, write) + ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write) end). list() -> [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. -list(Component) -> list(Component, []). -list_strict(Component) -> list(Component, not_found). - -list(Component, Default) -> - case lookup_component(Component) of - {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'}, - [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; - _ -> Default +list(VHost) -> list(VHost, '_', []). +list_strict(Component) -> list('_', Component, not_found). +list(VHost, Component) -> list(VHost, Component, []). +list_strict(VHost, Component) -> list(VHost, Component, not_found). + +list(VHost, Component, Default) -> + case component_good(Component) of + true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, + _ = '_'}, + [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + _ -> Default end. -list_formatted() -> - [pset(value, format(pget(value, P)), P) || P <- list()]. +list_formatted(VHost) -> + [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. -lookup(Component, Key) -> - case lookup0(Component, Key, rabbit_misc:const(not_found)) of +lookup(VHost, Component, Key) -> + case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(Component, Key) -> - case lookup0(Component, Key, rabbit_misc:const(not_found)) of +value(VHost, Component, Key) -> + case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(Component, Key, Default) -> - Params = lookup0(Component, Key, - fun () -> lookup_missing(Component, Key, Default) end), +value(VHost, Component, Key, Default) -> + Params = lookup0(VHost, Component, Key, + fun () -> + lookup_missing(VHost, Component, Key, Default) + end), Params#runtime_parameters.value. -lookup0(Component, Key, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {Component, Key}) of +lookup0(VHost, Component, Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(Component, Key, Default) -> +lookup_missing(VHost, Component, Key, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {Component, Key}, read) of - [] -> Record = c(Component, Key, Default), + case mnesia:read(?TABLE, {VHost, Component, Key}, read) of + [] -> Record = c(VHost, Component, Key, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key}, - value = Default}. +c(VHost, Component, Key, Default) -> + #runtime_parameters{key = {VHost, Component, Key}, + value = Default}. -p(#runtime_parameters{key = {Component, Key}, value = Value}) -> - [{component, Component}, +p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) -> + [{vhost, VHost}, + {component, Component}, {key, Key}, {value, Value}]. @@ -182,6 +200,12 @@ info_keys() -> [component, key, value]. %%--------------------------------------------------------------------------- +component_good('_') -> true; +component_good(Component) -> case lookup_component(Component) of + {ok, _} -> true; + _ -> false + end. + lookup_component(Component) -> case rabbit_registry:lookup_module( runtime_parameter, list_to_atom(binary_to_list(Component))) of diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index f23b3227..5224ccaa 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -17,7 +17,7 @@ -module(rabbit_runtime_parameters_test). -behaviour(rabbit_runtime_parameter). --export([validate/3, validate_clear/2, notify/3, notify_clear/2]). +-export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -export([register/0, unregister/0]). register() -> @@ -26,13 +26,13 @@ register() -> unregister() -> rabbit_registry:unregister(runtime_parameter, <<"test">>). -validate(<<"test">>, <<"good">>, _Term) -> ok; -validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok; -validate(<<"test">>, _, _) -> {error, "meh", []}. +validate(_, <<"test">>, <<"good">>, _Term) -> ok; +validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok; +validate(_, <<"test">>, _, _) -> {error, "meh", []}. -validate_clear(<<"test">>, <<"good">>) -> ok; -validate_clear(<<"test">>, <<"maybe">>) -> ok; -validate_clear(<<"test">>, _) -> {error, "meh", []}. +validate_clear(_, <<"test">>, <<"good">>) -> ok; +validate_clear(_, <<"test">>, <<"maybe">>) -> ok; +validate_clear(_, <<"test">>, _) -> {error, "meh", []}. -notify(_, _, _) -> ok. -notify_clear(_, _) -> ok. +notify(_, _, _, _) -> ok. +notify_clear(_, _, _) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7fbe6bab..1fe655e7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -33,6 +33,7 @@ all_tests() -> ok = setup_cluster(), + ok = supervisor2_tests:test_all(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), @@ -2287,10 +2288,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - {undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), + {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2307,11 +2308,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {undefined, VQ6} = + {_, undefined, VQ6} = rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), VQ6. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49213c95..bd606dfb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; - (S) -> {undefined, S} + End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; + (Next, S) -> {Next, undefined, S} end, case queue_out(State) of {empty, State1} -> - End(a(State1)); + End(undefined, a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> @@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - End(a(in_r(MsgStatus, State1))) + End(MsgProps, a(in_r(MsgStatus, State1))) end end. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 5548ef6d..03dfbe24 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -90,12 +90,13 @@ delete(VHostPath) -> R. internal_delete(VHostPath) -> - lists:foreach( - fun (Info) -> - ok = rabbit_auth_backend_internal:clear_permissions( - proplists:get_value(user, Info), VHostPath) - end, - rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)), + [ok = rabbit_auth_backend_internal:clear_permissions( + proplists:get_value(user, Info), VHostPath) + || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], + [ok = rabbit_runtime_parameters:clear(VHostPath, + proplists:get_value(component, Info), + proplists:get_value(key, Info)) + || Info <- rabbit_runtime_parameters:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 3d3623d7..5af38573 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -255,10 +255,10 @@ behaviour_info(_Other) -> %%% --------------------------------------------------- start_link(Mod, Args) -> gen_server:start_link(?MODULE, {self, Mod, Args}, []). - + start_link(SupName, Mod, Args) -> gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). - + %%% --------------------------------------------------- %%% Interface functions. %%% --------------------------------------------------- @@ -298,9 +298,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> check_childspecs(X) -> {error, {badarg, X}}. %%% --------------------------------------------------- -%%% +%%% %%% Initialize the supervisor. -%%% +%%% %%% --------------------------------------------------- init({SupName, Mod, Args}) -> process_flag(trap_exit, true), @@ -319,7 +319,7 @@ init({SupName, Mod, Args}) -> Error -> {stop, {bad_return, {Mod, init, Error}}} end. - + init_children(State, StartSpec) -> SupName = State#state.name, case check_startspec(StartSpec) of @@ -349,7 +349,7 @@ init_dynamic(_State, StartSpec) -> %% Func: start_children/2 %% Args: Children = [#child] in start order %% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} -%% Purpose: Start all children. The new list contains #child's +%% Purpose: Start all children. The new list contains #child's %% with pids. %% Returns: {ok, NChildren} | {error, NChildren} %% NChildren = [#child] in termination order (reversed @@ -381,7 +381,7 @@ do_start_child(SupName, Child) -> NChild = Child#child{pid = Pid}, report_progress(NChild, SupName), {ok, Pid, Extra}; - ignore -> + ignore -> {ok, undefined}; {error, What} -> {error, What}; What -> {error, What} @@ -400,12 +400,12 @@ do_start_child_i(M, F, A) -> What -> {error, What} end. - + %%% --------------------------------------------------- -%%% +%%% %%% Callback functions. -%%% +%%% %%% --------------------------------------------------- handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> #child{mfa = {M, F, A}} = hd(State#state.children), @@ -414,11 +414,11 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> {ok, undefined} -> {reply, {ok, undefined}, State}; {ok, Pid} -> - NState = State#state{dynamics = + NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, {reply, {ok, Pid}, NState}; {ok, Pid, Extra} -> - NState = State#state{dynamics = + NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, {reply, {ok, Pid, Extra}, NState}; What -> @@ -497,7 +497,7 @@ handle_call(which_children, _From, State) -> %%% Hopefully cause a function-clause as there is no API function %%% that utilizes cast. handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", + error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", []), {noreply, State}. @@ -527,7 +527,7 @@ handle_info({'EXIT', Pid, Reason}, State) -> end; handle_info(Msg, State) -> - error_logger:error_msg("Supervisor received unexpected message: ~p~n", + error_logger:error_msg("Supervisor received unexpected message: ~p~n", [Msg]), {noreply, State}. %% @@ -577,13 +577,13 @@ check_flags({Strategy, MaxIntensity, Period}) -> check_flags(What) -> {bad_flags, What}. -update_childspec(State, StartSpec) when ?is_simple(State) -> - case check_startspec(StartSpec) of - {ok, [Child]} -> - {ok, State#state{children = [Child]}}; - Error -> - {error, Error} - end; +update_childspec(State, StartSpec) when ?is_simple(State) -> + case check_startspec(StartSpec) of + {ok, [Child]} -> + {ok, State#state{children = [Child]}}; + Error -> + {error, Error} + end; update_childspec(State, StartSpec) -> case check_startspec(StartSpec) of @@ -604,7 +604,7 @@ update_childspec1([Child|OldC], Children, KeepOld) -> end; update_childspec1([], Children, KeepOld) -> % Return them in (keeped) reverse start order. - lists:reverse(Children ++ KeepOld). + lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> @@ -618,7 +618,7 @@ update_chsp(OldCh, Children) -> NewC -> {ok, NewC} end. - + %%% --------------------------------------------------- %%% Start a new child. %%% --------------------------------------------------- @@ -630,12 +630,12 @@ handle_start_child(Child, State) -> {ok, Pid} -> Children = State#state.children, {{ok, Pid}, - State#state{children = + State#state{children = [Child#child{pid = Pid}|Children]}}; {ok, Pid, Extra} -> Children = State#state.children, {{ok, Pid, Extra}, - State#state{children = + State#state{children = [Child#child{pid = Pid}|Children]}}; {error, What} -> {{error, {What, Child}}, State} @@ -816,29 +816,32 @@ terminate_simple_children(Child, Dynamics, SupName) -> {Replies, Timedout} = lists:foldl( fun (_Pid, {Replies, Timedout}) -> - {Reply, Timedout1} = + {Pid1, Reason1, Timedout1} = receive TimeoutMsg -> Remaining = Pids -- [P || {P, _} <- Replies], [exit(P, kill) || P <- Remaining], - receive {'DOWN', _MRef, process, Pid, Reason} -> - {{error, Reason}, true} + receive + {'DOWN', _MRef, process, Pid, Reason} -> + {Pid, Reason, true} end; {'DOWN', _MRef, process, Pid, Reason} -> - {child_res(Child, Reason, Timedout), Timedout}; - {'EXIT', Pid, Reason} -> - receive {'DOWN', _MRef, process, Pid, _} -> - {{error, Reason}, Timedout} - end + {Pid, Reason, Timedout} end, - {[{Pid, Reply} | Replies], Timedout1} + {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies], + Timedout1} end, {[], false}, Pids), timeout_stop(Child, TRef, TimeoutMsg, Timedout), ReportError = shutdown_error_reporter(SupName), - [case Reply of - {_Pid, ok} -> ok; - {Pid, {error, R}} -> ReportError(R, Child#child{pid = Pid}) - end || Reply <- Replies], + Report = fun(_, ok) -> ok; + (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid}) + end, + [receive + {'EXIT', Pid, Reason} -> + Report(Pid, child_res(Child, Reason, Timedout)) + after + 0 -> Report(Pid, Reply) + end || {Pid, Reply} <- Replies], ok. child_exit_reason(#child{shutdown = brutal_kill}) -> kill; @@ -863,7 +866,7 @@ timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) -> after 0 -> ok end; -timeout_stop(#child{}, ok, _Msg, _Timedout) -> +timeout_stop(#child{}, _TRef, _Msg, _Timedout) -> ok. do_terminate(Child, SupName) when Child#child.pid =/= undefined -> @@ -885,17 +888,17 @@ do_terminate(Child, _SupName) -> Child. %%----------------------------------------------------------------- -%% Shutdowns a child. We must check the EXIT value +%% Shutdowns a child. We must check the EXIT value %% of the child, because it might have died with another reason than -%% the wanted. In that case we want to report the error. We put a -%% monitor on the child an check for the 'DOWN' message instead of -%% checking for the 'EXIT' message, because if we check the 'EXIT' -%% message a "naughty" child, who does unlink(Sup), could hang the -%% supervisor. +%% the wanted. In that case we want to report the error. We put a +%% monitor on the child an check for the 'DOWN' message instead of +%% checking for the 'EXIT' message, because if we check the 'EXIT' +%% message a "naughty" child, who does unlink(Sup), could hang the +%% supervisor. %% Returns: ok | {error, OtherReason} (this should be reported) %%----------------------------------------------------------------- shutdown(Pid, brutal_kill) -> - + case monitor_child(Pid) of ok -> exit(Pid, kill), @@ -905,16 +908,16 @@ shutdown(Pid, brutal_kill) -> {'DOWN', _MRef, process, Pid, OtherReason} -> {error, OtherReason} end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end; shutdown(Pid, Time) -> - + case monitor_child(Pid) of ok -> exit(Pid, shutdown), %% Try to shutdown gracefully - receive + receive {'DOWN', _MRef, process, Pid, shutdown} -> ok; {'DOWN', _MRef, process, Pid, OtherReason} -> @@ -926,14 +929,14 @@ shutdown(Pid, Time) -> {error, OtherReason} end end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end. %% Help function to shutdown/2 switches from link to monitor approach monitor_child(Pid) -> - - %% Do the monitor operation first so that if the child dies + + %% Do the monitor operation first so that if the child dies %% before the monitoring is done causing a 'DOWN'-message with %% reason noproc, we will get the real reason in the 'EXIT'-message %% unless a naughty child has already done unlink... @@ -943,22 +946,22 @@ monitor_child(Pid) -> receive %% If the child dies before the unlik we must empty %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. - {'EXIT', Pid, Reason} -> - receive + {'EXIT', Pid, Reason} -> + receive {'DOWN', _, process, Pid, _} -> {error, Reason} end - after 0 -> + after 0 -> %% If a naughty child did unlink and the child dies before - %% monitor the result will be that shutdown/2 receives a + %% monitor the result will be that shutdown/2 receives a %% 'DOWN'-message with reason noproc. %% If the child should die after the unlink there %% will be a 'DOWN'-message with a correct reason - %% that will be handled in shutdown/2. - ok + %% that will be handled in shutdown/2. + ok end. - - + + %%----------------------------------------------------------------- %% Child/State manipulating functions. %%----------------------------------------------------------------- @@ -1012,7 +1015,7 @@ remove_child(Child, State) -> %% Args: SupName = {local, atom()} | {global, atom()} | self %% Type = {Strategy, MaxIntensity, Period} %% Strategy = one_for_one | one_for_all | simple_one_for_one | -%% rest_for_one +%% rest_for_one %% MaxIntensity = integer() %% Period = integer() %% Mod :== atom() @@ -1107,10 +1110,10 @@ validChildType(supervisor) -> true; validChildType(worker) -> true; validChildType(What) -> throw({invalid_child_type, What}). -validName(_Name) -> true. +validName(_Name) -> true. -validFunc({M, F, A}) when is_atom(M), - is_atom(F), +validFunc({M, F, A}) when is_atom(M), + is_atom(F), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). @@ -1128,7 +1131,7 @@ validDelay(Delay) when is_number(Delay), Delay >= 0 -> true; validDelay(What) -> throw({invalid_delay, What}). -validShutdown(Shutdown, _) +validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; validShutdown(infinity, supervisor) -> true; validShutdown(brutal_kill, _) -> true; @@ -1154,7 +1157,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}). %%% Returns: {ok, State'} | {terminate, State'} %%% ------------------------------------------------------ -add_restart(State) -> +add_restart(State) -> I = State#state.intensity, P = State#state.period, R = State#state.restarts, diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl new file mode 100644 index 00000000..5c5e6d85 --- /dev/null +++ b/src/supervisor2_tests.erl @@ -0,0 +1,79 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(supervisor2_tests). +-behaviour(supervisor2). + +-export([test_all/0, start_link/0]). +-export([init/1]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(TEST_RUNS, 2000). +-define(SLOW_TEST_RUNS, 45). +-define(CHILDREN, 100). + +test_all() -> + eunit:test(?MODULE, [verbose]). + +simple_child_shutdown_without_deadlock_test_() -> + [{timeout, ?TEST_RUNS * 10, + check_shutdown_handling(stop, ?TEST_RUNS, ?CHILDREN)}]. + +simple_child_shutdown_with_timeout_test_() -> + [{timeout, ?SLOW_TEST_RUNS * 10, + check_shutdown_handling(ignored, ?SLOW_TEST_RUNS, ?CHILDREN)}]. + +check_shutdown_handling(SigStop, Iterations, ChildCount) -> + fun() -> + {ok, Sup} = supervisor2:start_link(?MODULE, [?CHILDREN]), + [begin + TestSupPid = erlang:whereis(?MODULE), + ChildPids = [begin + {ok, ChildPid} = + supervisor2:start_child(TestSupPid, []), + ChildPid + end || _ <- lists:seq(1, ChildCount)], + erlang:monitor(process, TestSupPid), + [P ! SigStop || P <- ChildPids], + ?assertEqual(ok, supervisor2:terminate_child(Sup, test_sup)), + {ok, _} = supervisor2:restart_child(Sup, test_sup), + receive + {'DOWN', _MRef, process, TestSupPid, Reason} -> + ?assertEqual(shutdown, Reason) + end + end || _ <- lists:seq(1, Iterations)], + unlink(Sup), + exit(Sup, shutdown) + end. + +start_link() -> + Pid = spawn_link(fun () -> + process_flag(trap_exit, true), + receive stop -> ok end + end), + {ok, Pid}. + +init([N]) -> + {ok, {{one_for_one, 0, 1}, + [{test_sup, {supervisor2, start_link, + [{local, ?MODULE}, ?MODULE, []]}, + transient, N * 100, supervisor, [?MODULE]}]}}; +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{test_worker, {?MODULE, start_link, []}, + temporary, 1000, worker, [?MODULE]}]}}. + |