summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl170
-rw-r--r--src/mirrored_supervisor.erl5
-rw-r--r--src/pmon.erl42
-rw-r--r--src/priority_queue.erl37
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_access_control.erl17
-rw-r--r--src/rabbit_alarm.erl72
-rw-r--r--src/rabbit_amqqueue.erl65
-rw-r--r--src/rabbit_amqqueue_process.erl281
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_binding.erl41
-rw-r--r--src/rabbit_channel.erl34
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_connection_helper_sup.erl (renamed from src/rabbit_intermediate_sup.erl)23
-rw-r--r--src/rabbit_connection_sup.erl20
-rw-r--r--src/rabbit_control_main.erl51
-rw-r--r--src/rabbit_direct.erl31
-rw-r--r--src/rabbit_disk_monitor.erl77
-rw-r--r--src/rabbit_event.erl4
-rw-r--r--src/rabbit_exchange.erl9
-rw-r--r--src/rabbit_exchange_type_headers.erl12
-rw-r--r--src/rabbit_heartbeat.erl81
-rw-r--r--src/rabbit_memory_monitor.erl136
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl27
-rw-r--r--src/rabbit_mirror_queue_misc.erl26
-rw-r--r--src/rabbit_mirror_queue_slave.erl90
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_mnesia.erl62
-rw-r--r--src/rabbit_networking.erl24
-rw-r--r--src/rabbit_plugins_main.erl20
-rw-r--r--src/rabbit_policies.erl81
-rw-r--r--src/rabbit_policy.erl118
-rw-r--r--src/rabbit_queue_decorator.erl48
-rw-r--r--src/rabbit_queue_index.erl32
-rw-r--r--src/rabbit_reader.erl175
-rw-r--r--src/rabbit_registry.erl1
-rw-r--r--src/rabbit_tests.erl71
-rw-r--r--src/rabbit_upgrade_functions.erl40
-rw-r--r--src/rabbit_writer.erl70
-rw-r--r--src/supervisor2.erl1200
-rw-r--r--src/supervisor2_tests.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl28
46 files changed, 2182 insertions, 1167 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 4e1dcd2e..0331ca01 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,23 +18,32 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2,
+ monitor/2, demonitor/1, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(state, {node, monitors, name}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), pid()}).
+-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
--spec(invoke/2 ::
- ( pid(), fun ((pid()) -> A)) -> A;
- ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
- [{pid(), term()}]}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A;
+ ([pid()], fun_or_mfa(A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok').
+-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -50,26 +59,27 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
-invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- Fun(Pid);
-invoke(Pid, Fun) when is_pid(Pid) ->
- case invoke([Pid], Fun) of
+invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ apply1(FunOrMFA, Pid);
+invoke(Pid, FunOrMFA) when is_pid(Pid) ->
+ case invoke([Pid], FunOrMFA) of
{[{Pid, Result}], []} ->
Result;
{[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
-invoke([], _Fun) -> %% optimisation
+invoke([], _FunOrMFA) -> %% optimisation
{[], []};
-invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, Fun) of
+invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
{ok, _, Result} -> {[{Pid, Result}], []};
{error, _, Error} -> {[], [{Pid, Error}]}
end;
-invoke(Pids, Fun) when is_list(Pids) ->
+invoke(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
%% infinity, and thus there is no process spawned in order to do
@@ -78,45 +88,58 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped}, infinity)
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, FunOrMFA, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
[Results || {_Node, Results} <- Replies]]),
lists:foldl(
fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- safe_invoke(Pid, Fun), %% we don't care about any error
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, FunOrMFA), %% we don't care about any error
ok;
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result([Pid], Fun);
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
+ invoke_no_result([Pid], FunOrMFA);
-invoke_no_result([], _Fun) -> %% optimisation
+invoke_no_result([], _FunOrMFA) -> %% optimisation
ok;
-invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- safe_invoke(Pid, Fun), %% must not die
+invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, FunOrMFA), %% must not die
ok;
-invoke_no_result(Pids, Fun) when is_list(Pids) ->
+invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(
+ RemoteNodes, delegate(self(), RemoteNodes),
+ {invoke, FunOrMFA, Grouped})
end,
- safe_invoke(LocalPids, Fun), %% must not die
+ safe_invoke(LocalPids, FunOrMFA), %% must not die
ok.
+monitor(process, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(process, Pid);
+monitor(process, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ gen_server2:cast(Name, {monitor, self(), Pid}),
+ {Name, Pid}.
+
+demonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref);
+demonitor({Name, Pid}) ->
+ gen_server2:cast(Name, {demonitor, self(), Pid}).
+
call(PidOrPids, Msg) ->
- invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+ invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+ invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
%%----------------------------------------------------------------------------
@@ -134,43 +157,88 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(RemoteNodes) ->
+delegate(Pid, RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(),
+ erlang:phash2(Pid,
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
end.
-safe_invoke(Pids, Fun) when is_list(Pids) ->
- [safe_invoke(Pid, Fun) || Pid <- Pids];
-safe_invoke(Pid, Fun) when is_pid(Pid) ->
+safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
+ [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
+safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
try
- {ok, Pid, Fun(Pid)}
+ {ok, Pid, apply1(FunOrMFA, Pid)}
catch Class:Reason ->
{error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
+apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
+apply1(Fun, Arg) -> Fun(Arg).
+
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, node(), hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-
-handle_cast({invoke, Fun, Grouped}, Node) ->
- safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, Node, hibernate}.
-
-handle_info(_Info, Node) ->
- {noreply, Node, hibernate}.
+handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State,
+ hibernate}.
+
+handle_cast({monitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:add_element(MonitoringPid, Pids),
+ dict:store(Pid, {Ref, Pids1}, Monitors);
+ error ->
+ Ref = erlang:monitor(process, Pid),
+ Pids = gb_sets:singleton(MonitoringPid),
+ dict:store(Pid, {Ref, Pids}, Monitors)
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({demonitor, MonitoringPid, Pid},
+ State = #state{monitors = Monitors}) ->
+ Monitors1 = case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Pids1 = gb_sets:del_element(MonitoringPid, Pids),
+ case gb_sets:is_empty(Pids1) of
+ true -> erlang:demonitor(Ref),
+ dict:erase(Pid, Monitors);
+ false -> dict:store(Pid, {Ref, Pids1}, Monitors)
+ end;
+ error ->
+ Monitors
+ end,
+ {noreply, State#state{monitors = Monitors1}, hibernate};
+
+handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) ->
+ safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
+ {noreply, State, hibernate}.
+
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply,
+ case dict:find(Pid, Monitors) of
+ {ok, {Ref, Pids}} ->
+ Msg = {'DOWN', {Name, Pid}, process, Pid, Info},
+ gb_sets:fold(fun (MonitoringPid, _) -> MonitoringPid ! Msg end,
+ none, Pids),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
+
+handle_info(_Info, State) ->
+ {noreply, State, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, Node, _Extra) ->
- {ok, Node}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 3b16c53a..d5f51db0 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) ->
init(Mod, Args) ->
case Mod:init(Args) of
{ok, {{Bad, _, _}, _ChildSpecs}} when
- Bad =:= simple_one_for_one orelse
- Bad =:= simple_one_for_one_terminate -> erlang:error(badarg);
- Init -> Init
+ Bad =:= simple_one_for_one -> erlang:error(badarg);
+ Init -> Init
end.
start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
diff --git a/src/pmon.erl b/src/pmon.erl
index b9db66fb..86308167 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,22 +16,26 @@
-module(pmon).
--export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
- monitored/1, is_empty/1]).
+-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
+ is_monitored/2, erase/2, monitored/1, is_empty/1]).
-compile({no_auto_import, [monitor/2]}).
+-record(state, {dict, module}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
-export_type([?MODULE/0]).
--opaque(?MODULE() :: dict()).
+-opaque(?MODULE() :: #state{dict :: dict(),
+ module :: atom()}).
-type(item() :: pid() | {atom(), node()}).
-spec(new/0 :: () -> ?MODULE()).
+-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
@@ -42,29 +46,33 @@
-endif.
-new() -> dict:new().
+new() -> new(erlang).
+
+new(Module) -> #state{dict = dict:new(),
+ module = Module}.
-monitor(Item, M) ->
+monitor(Item, S = #state{dict = M, module = Module}) ->
case dict:is_key(Item, M) of
- true -> M;
- false -> dict:store(Item, erlang:monitor(process, Item), M)
+ true -> S;
+ false -> S#state{dict = dict:store(
+ Item, Module:monitor(process, Item), M)}
end.
-monitor_all([], M) -> M; %% optimisation
-monitor_all([Item], M) -> monitor(Item, M); %% optimisation
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], S) -> S; %% optimisation
+monitor_all([Item], S) -> monitor(Item, S); %% optimisation
+monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
-demonitor(Item, M) ->
+demonitor(Item, S = #state{dict = M, module = Module}) ->
case dict:find(Item, M) of
- {ok, MRef} -> erlang:demonitor(MRef),
- dict:erase(Item, M);
+ {ok, MRef} -> Module:demonitor(MRef),
+ S#state{dict = dict:erase(Item, M)};
error -> M
end.
-is_monitored(Item, M) -> dict:is_key(Item, M).
+is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
-erase(Item, M) -> dict:erase(Item, M).
+erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
-monitored(M) -> dict:fetch_keys(M).
+monitored(#state{dict = M}) -> dict:fetch_keys(M).
-is_empty(M) -> dict:size(M) == 0.
+is_empty(#state{dict = M}) -> dict:size(M) == 0.
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 6995c3be..c76c0d33 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -40,8 +40,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
- out/1, join/2]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, from_list/1,
+ in/2, in/3, out/1, out_p/1, join/2, filter/2, fold/3, highest/1]).
%%----------------------------------------------------------------------------
@@ -59,10 +59,16 @@
-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(from_list/1 :: ([{priority(), any()}]) -> pqueue()).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(out_p/1 :: (pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
+-spec(filter/2 :: (fun ((any()) -> boolean()), pqueue()) -> pqueue()).
+-spec(fold/3 ::
+ (fun ((any(), priority(), A) -> A), A, pqueue()) -> A).
+-spec(highest/1 :: (pqueue()) -> priority() | 'empty').
-endif.
@@ -96,6 +102,9 @@ to_list({pqueue, Queues}) ->
[{maybe_negate_priority(P), V} || {P, Q} <- Queues,
{0, V} <- to_list(Q)].
+from_list(L) ->
+ lists:foldl(fun ({P, E}, Q) -> in(E, P, Q) end, new(), L).
+
in(Item, Q) ->
in(Item, 0, Q).
@@ -147,6 +156,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
+out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
+
+add_p(R, P) -> case R of
+ {empty, Q} -> {empty, Q};
+ {{value, V}, Q} -> {{value, V, P}, Q}
+ end.
+
join(A, {queue, [], [], 0}) ->
A;
join({queue, [], [], 0}, B) ->
@@ -185,6 +202,22 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity ->
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
+filter(Pred, Q) -> fold(fun(V, P, Acc) ->
+ case Pred(V) of
+ true -> in(V, P, Acc);
+ false -> Acc
+ end
+ end, new(), Q).
+
+fold(Fun, Init, Q) -> case out_p(Q) of
+ {empty, _Q} -> Init;
+ {{value, V, P}, Q1} -> fold(Fun, Fun(V, P, Init), Q1)
+ end.
+
+highest({queue, [], [], 0}) -> empty;
+highest({queue, _, _, _}) -> 0;
+highest({pqueue, [{P, _} | _]}) -> maybe_negate_priority(P).
+
r2f([], 0) -> {queue, [], [], 0};
r2f([_] = R, 1) -> {queue, [], R, 1};
r2f([X,Y], 2) -> {queue, [X], [Y], 2};
diff --git a/src/rabbit.erl b/src/rabbit.erl
index cb9e6376..1b7fe6da 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -588,6 +588,7 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
+ rabbit_policy:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 90be4f80..d54c2a8d 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -74,9 +74,7 @@ check_vhost_access(User = #user{ username = Username,
true -> Module:check_vhost_access(User, VHostPath)
end
end,
- "~s failed checking vhost access to ~s for ~s: ~p~n",
- [Module, VHostPath, Username],
- "access to vhost '~s' refused for user '~s'",
+ Module, "access to vhost '~s' refused for user '~s'",
[VHostPath, Username]).
check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
@@ -87,15 +85,14 @@ check_resource_access(User = #user{username = Username, auth_backend = Module},
Resource, Permission) ->
check_access(
fun() -> Module:check_resource_access(User, Resource, Permission) end,
- "~s failed checking resource access to ~p for ~s: ~p~n",
- [Module, Resource, Username],
- "access to ~s refused for user '~s'",
+ Module, "access to ~s refused for user '~s'",
[rabbit_misc:rs(Resource), Username]).
-check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
+check_access(Fun, Module, ErrStr, ErrArgs) ->
Allow = case Fun() of
- {error, _} = E ->
- rabbit_log:error(ErrStr, ErrArgs ++ [E]),
+ {error, E} ->
+ rabbit_log:error(ErrStr ++ " by ~s: ~p~n",
+ ErrArgs ++ [Module, E]),
false;
Else ->
Else
@@ -104,5 +101,5 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
true ->
ok;
false ->
- rabbit_misc:protocol_error(access_refused, RefStr, RefArgs)
+ rabbit_misc:protocol_error(access_refused, ErrStr, ErrArgs)
end.
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 6607c4f6..cd1d125b 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -37,7 +37,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
-spec(set_alarm/1 :: (any()) -> 'ok').
-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -93,8 +93,8 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, AlertMFA}, State) ->
- {ok, 0 < dict:size(State#alarms.alarmed_nodes),
+handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
+ {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
@@ -104,11 +104,20 @@ handle_call(_Request, State) ->
{ok, not_understood, State}.
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]});
+ case lists:member(Alarm, Alarms) of
+ true -> {ok, State};
+ false -> UpdatedAlarms = lists:usort([Alarm|Alarms]),
+ handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
+ end;
handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1,
- Alarms)});
+ case lists:keymember(Alarm, 1, Alarms) of
+ true -> handle_clear_alarm(
+ Alarm, State#alarms{alarms = lists:keydelete(
+ Alarm, 1, Alarms)});
+ false -> {ok, State}
+
+ end;
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
@@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+dict_append(Key, Val, Dict) ->
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ dict:store(Key, lists:usort([Val|L]), Dict).
+
dict_unappend_all(Key, _Val, Dict) ->
dict:erase(Key, Dict).
dict_unappend(Key, Val, Dict) ->
- case lists:delete(Val, dict:fetch(Key, Dict)) of
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+
+ case lists:delete(Val, L) of
[] -> dict:erase(Key, Dict);
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Alert,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
-
- %% If we have changed our alarm state, inform the remotes.
- IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
- ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
- ok = alert_remote(false, Alertees, Source);
- true ->
- ok
- end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case node() of
+ Node -> ok = alert_remote(Alert, Alertees, Source);
+ _ -> ok
end,
+ ok = alert_local(Alert, Alertees, Source),
State#alarms{alarmed_nodes = AN1}.
alert_local(Alert, Alertees, Source) ->
@@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 32feac30..8a84c9f4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -24,9 +24,9 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([force_event_refresh/0, wake_up/1]).
+-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -79,7 +79,8 @@
-> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())).
-spec(update/2 ::
(name(),
- fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue()))
+ -> 'not_found' | rabbit_types:amqqueue()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -111,7 +112,7 @@
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
--spec(wake_up/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -149,12 +150,13 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
+-spec(notify_decorators/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
@@ -184,7 +186,7 @@
%%----------------------------------------------------------------------------
-define(CONSUMER_INFO_KEYS,
- [queue_name, channel_pid, consumer_tag, ack_required]).
+ [queue_name, channel_pid, consumer_tag, ack_required, arguments]).
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
@@ -278,9 +280,10 @@ update(Name, Fun) ->
case Durable of
true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
_ -> ok
- end;
+ end,
+ Q1;
[] ->
- ok
+ not_found
end.
store_queue(Q = #amqqueue{durable = true}) ->
@@ -294,11 +297,15 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) ->
+policy_changed(Q1 = #amqqueue{decorators = Decorators1},
+ Q2 = #amqqueue{decorators = Decorators2}) ->
rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ D1 = rabbit_queue_decorator:select(Decorators1),
+ D2 = rabbit_queue_decorator:select(Decorators2),
+ [ok = M:policy_changed(Q1, Q2) || M <- lists:usort(D1 ++ D2)],
%% Make sure we emit a stats event even if nothing
%% mirroring-related has changed - the policy may have changed anyway.
- wake_up(Q1).
+ notify_policy_changed(Q1).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -393,9 +400,15 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
- [Key || {Key, _Fun} <- args()]).
+ [Key || {Key, _Fun} <- declare_args()]).
check_declare_arguments(QueueName, Args) ->
+ check_arguments(QueueName, Args, declare_args()).
+
+check_consume_arguments(QueueName, Args) ->
+ check_arguments(QueueName, Args, consume_args()).
+
+check_arguments(QueueName, Args, Validators) ->
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
TypeVal -> case Fun(TypeVal, Args) of
@@ -406,15 +419,17 @@ check_declare_arguments(QueueName, Args) ->
[Key, rabbit_misc:rs(QueueName),
Error])
end
- end || {Key, Fun} <- args()],
+ end || {Key, Fun} <- Validators],
ok.
-args() ->
+declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_max_length_arg/2}].
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
+
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
@@ -492,7 +507,8 @@ force_event_refresh(QNames) ->
force_event_refresh(Failed)
end.
-wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
+notify_policy_changed(#amqqueue{pid = QPid}) ->
+ gen_server2:cast(QPid, policy_changed).
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
@@ -504,8 +520,8 @@ consumers_all(VHostPath) ->
map(VHostPath,
fun (Q) ->
[lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, ConsumerTag, AckRequired]) ||
- {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) ||
+ {ChPid, CTag, AckRequired, Args} <- consumers(Q)]
end)).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
@@ -549,14 +565,20 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
+ LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
+ ok = check_consume_arguments(QName, OtherArgs),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
+ OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+notify_decorators(#amqqueue{pid = QPid}) ->
+ delegate:cast(QPid, notify_decorators).
+
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
put(Key, case get(Key) of
@@ -620,8 +642,7 @@ forget_all_durable(Node) ->
internal_delete1(Name)) ||
#amqqueue{name = Name, pid = Pid} = Q <- Qs,
node(Pid) =:= Node,
- rabbit_policy:get(<<"ha-mode">>, Q)
- =:= {error, not_found}],
+ rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
ok
end),
ok.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1baef6d8..4ff30ce0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,10 +52,11 @@
dlx,
dlx_routing_key,
max_length,
+ args_policy_version,
status
}).
--record(consumer, {tag, ack_required}).
+-record(consumer, {tag, ack_required, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -136,19 +137,22 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
senders = Senders,
msg_id_to_channel = MTC},
- State2 = process_args(State1),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State2, Deliveries).
+ State2 = process_args_policy(State1),
+ State3 = lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries),
+ notify_decorators(startup, [], State3),
+ State3.
init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- active_consumers = queue:new(),
- senders = pmon:new(),
+ active_consumers = priority_queue:new(),
+ senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
- status = running},
+ status = running,
+ args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -196,8 +200,10 @@ declare(Recover, From, State = #q{q = Q,
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
recovery_barrier(Recover),
- State1 = process_args(State#q{backing_queue = BQ,
- backing_queue_state = BQS}),
+ State1 = process_args_policy(
+ State#q{backing_queue = BQ,
+ backing_queue_state = BQS}),
+ notify_decorators(startup, [], State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -222,6 +228,27 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
+notify_decorators(Event, Props, State) when Event =:= startup;
+ Event =:= shutdown ->
+ decorator_callback(qname(State), Event, Props);
+
+notify_decorators(Event, Props, State = #q{active_consumers = ACs,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ decorator_callback(
+ qname(State), notify,
+ [Event, [{max_active_consumer_priority, priority_queue:highest(ACs)},
+ {is_empty, BQ:is_empty(BQS)} | Props]]).
+
+decorator_callback(QName, F, A) ->
+ %% Look up again in case policy and hence decorators have changed
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q = #amqqueue{decorators = Ds}} ->
+ [ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
+ {error, not_found} ->
+ ok
+ end.
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover =/= new,
@@ -238,32 +265,51 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- drop_expired_msgs(
- lists:foldl(
- fun({Arg, Fun}, State1) ->
- case rabbit_misc:table_lookup(Arguments, Arg) of
- {_Type, Val} -> Fun(Val, State1);
- undefined -> State1
- end
- end, State,
- [{<<"x-expires">>, fun init_expires/2},
- {<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
- {<<"x-max-length">>, fun init_max_length/2}])).
-
-init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-
-init_ttl(TTL, State) -> State#q{ttl = TTL}.
+process_args_policy(State = #q{q = Q,
+ args_policy_version = N}) ->
+ ArgsTable =
+ [{<<"expires">>, fun res_min/2, fun init_exp/2},
+ {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
+ {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
+ {<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2}],
+ drop_expired_msgs(
+ lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
+ Fun(args_policy_lookup(Name, Resolve, Q), StateN)
+ end, State#q{args_policy_version = N + 1}, ArgsTable)).
+
+args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
+ AName = <<"x-", Name/binary>>,
+ case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of
+ {undefined, undefined} -> undefined;
+ {undefined, {_Type, Val}} -> Val;
+ {Val, undefined} -> Val;
+ {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
+ end.
+
+res_arg(_PolVal, ArgVal) -> ArgVal.
+res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal).
+
+%% In both these we init with the undefined variant first to stop any
+%% existing timer, then start a new one which may fire after a
+%% different time.
+init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined});
+init_exp(Expires, State) -> State1 = init_exp(undefined, State),
+ ensure_expiry_timer(State1#q{expires = Expires}).
+init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined});
+init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}.
+
+init_dlx(undefined, State) ->
+ State#q{dlx = undefined};
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
-init_dlx_routing_key(RoutingKey, State) ->
- State#q{dlx_routing_key = RoutingKey}.
+init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}.
-init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+init_max_length(MaxLen, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
+ State1.
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
@@ -276,6 +322,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
+ notify_decorators(shutdown, [], State),
[emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
@@ -324,11 +371,12 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% configured period.
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
-ensure_expiry_timer(State = #q{expires = Expires}) ->
+ensure_expiry_timer(State = #q{expires = Expires,
+ args_policy_version = Version}) ->
case is_unused(State) of
true -> NewState = stop_expiry_timer(State),
rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
- Expires, maybe_expire);
+ Expires, {maybe_expire, Version});
false -> State
end.
@@ -336,12 +384,13 @@ stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
ensure_ttl_timer(undefined, State) ->
State;
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
+ args_policy_version = Version}) ->
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
+ TRef = erlang:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
@@ -359,7 +408,7 @@ ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(State = #q{active_consumers = AC}) ->
- true = (queue:is_empty(AC) orelse is_empty(State)).
+ true = (priority_queue:is_empty(AC) orelse is_empty(State)).
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
@@ -378,7 +427,7 @@ ch_record(ChPid, LimiterPid) ->
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
- blocked_consumers = queue:new(),
+ blocked_consumers = priority_queue:new(),
limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
@@ -406,15 +455,18 @@ erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
-block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
- update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+block_consumer(C = #cr{blocked_consumers = Blocked},
+ {_ChPid, #consumer{tag = CTag}} = QEntry, State) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}),
+ notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> [send_drained(C) || C <- all_ch_record()];
+ true -> notify_decorators(queue_empty, [], State),
+ [send_drained(C) || C <- all_ch_record()];
false -> ok
end,
State.
@@ -431,42 +483,43 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case queue:out(ActiveConsumers) of
+ case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
{false, State};
- {{value, QEntry}, Tail} ->
+ {{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
- DeliverFun, QEntry,
+ DeliverFun, QEntry, Priority,
State#q{active_consumers = Tail}),
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, Priority, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
+ true -> block_consumer(C, E, State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
- block_consumer(C#cr{limiter = Limiter}, E),
+ block_consumer(C#cr{limiter = Limiter}, E, State),
{false, State};
{continue, Limiter} ->
- AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
+ AC1 = priority_queue:in(E, Priority,
+ State#q.active_consumers),
+ deliver_msg_to_consumer0(
DeliverFun, Consumer, C#cr{limiter = Limiter},
State#q{active_consumers = AC1})
end
end.
-deliver_msg_to_consumer(DeliverFun,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired},
- C = #cr{ch_pid = ChPid,
- acktags = ChAckTags,
- unsent_message_count = Count},
- State = #q{q = #amqqueue{name = QName}}) ->
+deliver_msg_to_consumer0(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ State = #q{q = #amqqueue{name = QName}}) ->
{{Message, IsDelivered, AckTag}, Stop, State1} =
DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
@@ -534,6 +587,13 @@ run_message_queue(State) ->
is_empty(State), State),
State1.
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, ActiveConsumers) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, ActiveConsumers).
+
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -630,17 +690,17 @@ requeue(AckTags, ChPid, State) ->
fun (State1) -> requeue_and_run(AckTags, State1) end).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue, QName) ->
- queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag, QName),
- false;
- (_) ->
- true
- end, Queue).
+ priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag, QName),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -655,18 +715,22 @@ possibly_unblock(State, ChPid, Update) ->
unblock(State, C = #cr{limiter = Limiter}) ->
case lists:partition(
- fun({_ChPid, #consumer{tag = CTag}}) ->
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
- end, queue:to_list(C#cr.blocked_consumers)) of
+ end, priority_queue:to_list(C#cr.blocked_consumers)) of
{_, []} ->
update_ch_record(C),
State;
{Blocked, Unblocked} ->
- BlockedQ = queue:from_list(Blocked),
- UnblockedQ = queue:from_list(Unblocked),
+ BlockedQ = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = queue:join(State#q.active_consumers, UnblockedQ),
- run_message_queue(State#q{active_consumers = AC1})
+ AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
+ State1 = State#q{active_consumers = AC1},
+ [notify_decorators(
+ consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -951,8 +1015,7 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
-i(policy, #q{q = #amqqueue{name = Name}}) ->
- {ok, Q} = rabbit_amqqueue:lookup(Name),
+i(policy, #q{q = Q}) ->
case rabbit_policy:name(Q) of
none -> '';
Policy -> Policy
@@ -1003,9 +1066,10 @@ consumers(#q{active_consumers = ActiveConsumers}) ->
consumers(ActiveConsumers, []), all_ch_record()).
consumers(Consumers, Acc) ->
- rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
- [{ChPid, CTag, AckRequired} | Acc1]
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
end, Acc, Consumers).
emit_stats(State) ->
@@ -1014,13 +1078,14 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
- [{consumer_tag, ConsumerTag},
+ [{consumer_tag, CTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, QName}]).
+ {queue, QName},
+ {arguments, Args}]).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
@@ -1052,8 +1117,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
+ {maybe_expire, _Version} -> 8;
+ {drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
_ -> 0
@@ -1063,6 +1128,10 @@ handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
+%% You used to be able to declare an exclusive durable queue. Sadly we
+%% need to still tidy up after that case, there could be the remnants
+%% of one left over from an upgrade. So that's why we don't enforce
+%% Recover = false here.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rabbit_misc:is_process_alive(Owner) of
@@ -1129,7 +1198,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
+ ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
@@ -1152,8 +1221,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
true -> send_drained(C1);
false -> ok
end,
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not NoAck},
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck,
+ args = OtherArgs},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
end,
@@ -1161,9 +1231,12 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1)),
- AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
- reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
+ not NoAck, qname(State1), OtherArgs),
+ AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
+ State2 = State1#q{active_consumers = AC1},
+ notify_decorators(
+ basic_consume, [{consumer_tag, ConsumerTag}], State2),
+ reply(ok, run_message_queue(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -1193,6 +1266,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_decorators(
+ basic_cancel, [{consumer_tag, ConsumerTag}], State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1259,10 +1334,11 @@ handle_call(force_event_refresh, _From,
QName = qname(State),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName) ||
- {Ch, CTag, AckRequired} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired, QName)
+ Ch, CTag, false, AckRequired, QName, Args) ||
+ {Ch, CTag, AckRequired, Args} <- consumers(State)];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State),
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Args)
end,
reply(ok, State).
@@ -1369,20 +1445,37 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
end
end);
-handle_cast(wake_up, State) ->
- noreply(State).
+handle_cast(notify_decorators, State) ->
+ notify_decorators(refresh, [], State),
+ noreply(State);
+
+handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
+ %% We depend on the #q.q field being up to date at least WRT
+ %% policy (but not slave pids) in various places, so when it
+ %% changes we go and read it from Mnesia again.
+ %%
+ %% This also has the side effect of waking us up so we emit a
+ %% stats event - so event consumers see the changed policy.
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ noreply(process_args_policy(State#q{q = Q})).
-handle_info(maybe_expire, State) ->
+handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
true -> stop(State);
false -> noreply(State#q{expiry_timer_ref = undefined})
end;
-handle_info(drop_expired, State) ->
+handle_info({maybe_expire, _Vsn}, State) ->
+ noreply(State);
+
+handle_info({drop_expired, Vsn}, State = #q{args_policy_version = Vsn}) ->
WasEmpty = is_empty(State),
State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
noreply(maybe_send_drained(WasEmpty, State1));
+handle_info({drop_expired, _Vsn}, State) ->
+ noreply(State);
+
handle_info(emit_stats, State) ->
emit_stats(State),
%% Don't call noreply/1, we don't want to set timers
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0515e82e..74ae59da 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -47,6 +47,6 @@ start_child(Node, Args) ->
supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index a7e8fb36..5ab22e75 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -31,9 +31,8 @@
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
%% apparently, by OpenAMQ.
-%% TODO: once the minimum erlang becomes R13B03, reimplement this
-%% using the binary module - that makes use of BIFs to do binary
-%% matching and will thus be much faster.
+%% TODO: reimplement this using the binary module? - that makes use of
+%% BIFs to do binary matching and will thus be much faster.
description() ->
[{description, <<"SASL PLAIN authentication mechanism">>}].
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 5ff96c23..11e6bd38 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -152,7 +152,7 @@ exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
- end).
+ end, fun not_found_or_absent_errs/1).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
@@ -177,7 +177,7 @@ add(Binding, InnerFun) ->
{error, _} = Err ->
rabbit_misc:const(Err)
end
- end).
+ end, fun not_found_or_absent_errs/1).
add(Src, Dst, B) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
@@ -200,14 +200,15 @@ remove(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- case mnesia:read(rabbit_route, B, write) of
- [] -> rabbit_misc:const({error, binding_not_found});
- [_] -> case InnerFun(Src, Dst) of
- ok -> remove(Src, Dst, B);
- {error, _} = Err -> rabbit_misc:const(Err)
- end
+ case mnesia:read(rabbit_route, B, write) =:= [] andalso
+ mnesia:read(rabbit_durable_route, B, write) =/= [] of
+ true -> rabbit_misc:const({error, binding_not_found});
+ false -> case InnerFun(Src, Dst) of
+ ok -> remove(Src, Dst, B);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
end
- end).
+ end, fun absent_errs_only/1).
remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
@@ -308,13 +309,13 @@ durable(#amqqueue{durable = D}) -> D.
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
- args = Arguments}, Fun) ->
+ args = Arguments}, Fun, ErrFun) ->
call_with_source_and_destination(
SrcName, DstName,
fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
Fun(Src, Dst, Binding#binding{args = SortedArgs})
- end).
+ end, ErrFun).
delete_object(Tab, Record, LockKind) ->
%% this 'guarded' delete prevents unnecessary writes to the mnesia
@@ -339,13 +340,9 @@ sync_transient_route(Route, Fun) ->
ok = Fun(rabbit_route, Route, write),
ok = Fun(rabbit_reverse_route, reverse_route(Route), write).
-call_with_source_and_destination(SrcName, DstName, Fun) ->
+call_with_source_and_destination(SrcName, DstName, Fun, ErrFun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Names) ->
- Errs = [not_found_or_absent(Name) || Name <- Names],
- rabbit_misc:const({error, {resources_missing, Errs}})
- end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
@@ -357,6 +354,18 @@ call_with_source_and_destination(SrcName, DstName, Fun) ->
end
end).
+not_found_or_absent_errs(Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}}).
+
+absent_errs_only(Names) ->
+ Errs = [E || Name <- Names,
+ {absent, _Q} = E <- [not_found_or_absent(Name)]],
+ rabbit_misc:const(case Errs of
+ [] -> ok;
+ _ -> {error, {resources_missing, Errs}}
+ end).
+
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6c04f4cd..dc37959b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -734,7 +734,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
- arguments = Arguments},
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -755,12 +755,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
+ {CreditArgs, OtherArgs} = parse_credit_args(Args),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Arguments),
+ CreditArgs, OtherArgs,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -934,7 +935,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
- rabbit_misc:not_found(ExchangeName);
+ return_ok(State, NoWait, #'exchange.delete_ok'{});
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
@@ -961,7 +962,7 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin,
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
- durable = Durable,
+ durable = DurableDeclare,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
@@ -973,6 +974,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ConnPid;
false -> none
end,
+ Durable = DurableDeclare andalso not ExclusiveDeclare,
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
"amq.gen");
@@ -1052,9 +1054,15 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
+ end,
+ fun (not_found) -> {ok, 0};
+ ({absent, Q}) -> rabbit_misc:absent(Q)
+ end) of
{error, in_use} ->
precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
@@ -1246,12 +1254,12 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
- _ -> none
- end;
- undefined -> none
+ {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
+ undefined -> {none, Arguments}
end.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 1d9ba48b..e2c255db 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,6 +43,6 @@ start_channel(Pid, Args) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{channel_sup, {rabbit_channel_sup, start_link, []},
temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index d6536e16..843bb615 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -49,8 +49,9 @@ start_link_worker(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, {Callback, worker}).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}};
init({{M,F,A}, worker}) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}.
+
diff --git a/src/rabbit_intermediate_sup.erl b/src/rabbit_connection_helper_sup.erl
index a9381f20..e51615e8 100644
--- a/src/rabbit_intermediate_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -11,21 +11,27 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2013-2013 GoPivotal, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
--module(rabbit_intermediate_sup).
+-module(rabbit_connection_helper_sup).
-behaviour(supervisor2).
-export([start_link/0]).
+-export([start_channel_sup_sup/1,
+ start_queue_collector/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -33,7 +39,20 @@
start_link() ->
supervisor2:start_link(?MODULE, []).
+start_channel_sup_sup(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
+
+start_queue_collector(SupPid) ->
+ supervisor2:start_child(
+ SupPid,
+ {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, {{one_for_one, 10, 10}, []}}.
+
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index c1fa17aa..9ed5dc77 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -37,27 +37,25 @@
start_link() ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
- {ok, Collector} =
- supervisor2:start_child(
- SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
- intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
%% We need to get channels in the hierarchy here so they get shut
%% down after the reader, so the reader gets a chance to terminate
%% them cleanly. But for 1.0 readers we can't start the real
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
%% so we add another supervisor into the hierarchy.
- {ok, ChannelSup3Pid} =
+ %%
+ %% This supervisor also acts as an intermediary for heartbeaters and
+ %% the queue collector process, since these must not be siblings of the
+ %% reader due to the potential for deadlock if they are added/restarted
+ %% whilst the supervision tree is shutting down.
+ {ok, HelperSup} =
supervisor2:start_child(
SupPid,
- {channel_sup3, {rabbit_intermediate_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_intermediate_sup]}),
+ {helper_sup, {rabbit_connection_helper_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_connection_helper_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
- {reader, {rabbit_reader, start_link,
- [ChannelSup3Pid, Collector,
- rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
+ {reader, {rabbit_reader, start_link, [HelperSup]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0b666a36..6f36f99d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,8 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
+-export([start/0, stop/0, parse_arguments/2, action/5,
+ sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -25,12 +26,16 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(PRIORITY_OPT, "--priority").
+-define(APPLY_TO_OPT, "--apply-to").
-define(RAM_OPT, "--ram").
-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}).
+-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}).
-define(RAM_DEF, {?RAM_OPT, flag}).
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
@@ -72,7 +77,7 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
- {set_policy, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
@@ -127,19 +132,13 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
end,
- Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
- _ -> {K, V}
- end || {K, V} <- Opts],
- Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
- Node = proplists:get_value(?NODE_OPT, Opts1),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts),
+ Node = proplists:get_value(?NODE_OPT, Opts),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -230,6 +229,19 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
rabbit_misc:quit(1).
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
%%----------------------------------------------------------------------------
action(stop, Node, Args, _Opts, Inform) ->
@@ -484,16 +496,15 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
-action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
- when Prio == [] orelse length(Prio) == 1 ->
- Msg = "Setting policy ~p for pattern ~p to ~p",
- {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
- [P] -> {Msg ++ " with priority ~s", P}
- end,
+action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
+ Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
- rpc_call(Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+ PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
+ ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
+ Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
+ rpc_call(
+ Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 9002514f..5a004792 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -35,8 +35,10 @@
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ rabbit_types:ok_or_error2(
+ {rabbit_types:user(), rabbit_framing:amqp_table()},
+ 'broker_not_found_on_node' |
+ {'auth_failure', string()} | 'access_refused')).
-spec(start_channel/9 ::
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
@@ -76,21 +78,24 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
end;
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
- Infos);
+ connect0(fun () -> rabbit_access_control:check_user_pass_login(
+ Username, Password) end,
+ VHost, Protocol, Pid, Infos);
connect(Username, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+ connect0(fun () -> rabbit_access_control:check_user_login(
+ Username, []) end,
+ VHost, Protocol, Pid, Infos).
-connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
+connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
- true ->
- case rabbit_access_control:FunctionName(U, P) of
- {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
- {refused, _M, _A} -> {error, auth_failure}
- end;
- false ->
- {error, broker_not_found_on_node}
+ true -> case AuthFun() of
+ {ok, User} ->
+ connect(User, VHost, Protocol, Pid, Infos);
+ {refused, _M, _A} ->
+ {error, {auth_failure, "Refused"}}
+ end;
+ false -> {error, broker_not_found_on_node}
end.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 5aaa1b2d..f153641e 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -23,16 +23,22 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([get_disk_free_limit/0, set_disk_free_limit/1, get_check_interval/0,
- set_check_interval/1, get_disk_free/0]).
+-export([get_disk_free_limit/0, set_disk_free_limit/1,
+ get_min_check_interval/0, set_min_check_interval/1,
+ get_max_check_interval/0, set_max_check_interval/1,
+ get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
+-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
+-define(DEFAULT_MAX_DISK_CHECK_INTERVAL, 10000).
+%% 250MB/s i.e. 250kB/ms
+-define(FAST_RATE, (250 * 1000)).
-record(state, {dir,
limit,
actual,
- timeout,
+ min_interval,
+ max_interval,
timer,
alarmed
}).
@@ -45,8 +51,10 @@
-spec(start_link/1 :: (disk_free_limit()) -> rabbit_types:ok_pid_or_error()).
-spec(get_disk_free_limit/0 :: () -> integer()).
-spec(set_disk_free_limit/1 :: (disk_free_limit()) -> 'ok').
--spec(get_check_interval/0 :: () -> integer()).
--spec(set_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_min_check_interval/0 :: () -> integer()).
+-spec(set_min_check_interval/1 :: (integer()) -> 'ok').
+-spec(get_max_check_interval/0 :: () -> integer()).
+-spec(set_max_check_interval/1 :: (integer()) -> 'ok').
-spec(get_disk_free/0 :: () -> (integer() | 'unknown')).
-endif.
@@ -61,11 +69,17 @@ get_disk_free_limit() ->
set_disk_free_limit(Limit) ->
gen_server:call(?MODULE, {set_disk_free_limit, Limit}, infinity).
-get_check_interval() ->
- gen_server:call(?MODULE, get_check_interval, infinity).
+get_min_check_interval() ->
+ gen_server:call(?MODULE, get_min_check_interval, infinity).
-set_check_interval(Interval) ->
- gen_server:call(?MODULE, {set_check_interval, Interval}, infinity).
+set_min_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_min_check_interval, Interval}, infinity).
+
+get_max_check_interval() ->
+ gen_server:call(?MODULE, get_max_check_interval, infinity).
+
+set_max_check_interval(Interval) ->
+ gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
get_disk_free() ->
gen_server:call(?MODULE, get_disk_free, infinity).
@@ -78,16 +92,15 @@ start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
init([Limit]) ->
- TRef = start_timer(?DEFAULT_DISK_CHECK_INTERVAL),
Dir = dir(),
- State = #state { dir = Dir,
- timeout = ?DEFAULT_DISK_CHECK_INTERVAL,
- timer = TRef,
- alarmed = false},
+ State = #state{dir = Dir,
+ min_interval = ?DEFAULT_MIN_DISK_CHECK_INTERVAL,
+ max_interval = ?DEFAULT_MAX_DISK_CHECK_INTERVAL,
+ alarmed = false},
case {catch get_disk_free(Dir),
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
- {ok, set_disk_limits(State, Limit)};
+ {ok, start_timer(set_disk_limits(State, Limit))};
Err ->
rabbit_log:info("Disabling disk free space monitoring "
"on unsupported platform: ~p~n", [Err]),
@@ -100,12 +113,17 @@ handle_call(get_disk_free_limit, _From, State) ->
handle_call({set_disk_free_limit, Limit}, _From, State) ->
{reply, ok, set_disk_limits(State, Limit)};
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
+handle_call(get_min_check_interval, _From, State) ->
+ {reply, State#state.min_interval, State};
+
+handle_call(get_max_check_interval, _From, State) ->
+ {reply, State#state.max_interval, State};
+
+handle_call({set_min_check_interval, MinInterval}, _From, State) ->
+ {reply, ok, State#state{min_interval = MinInterval}};
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+handle_call({set_max_check_interval, MaxInterval}, _From, State) ->
+ {reply, ok, State#state{max_interval = MaxInterval}};
handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
{reply, Actual, State};
@@ -117,7 +135,7 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info(update, State) ->
- {noreply, internal_update(State)};
+ {noreply, start_timer(internal_update(State))};
handle_info(_Info, State) ->
{noreply, State}.
@@ -193,6 +211,15 @@ emit_update_info(StateStr, CurrentFree, Limit) ->
"Disk free space ~s. Free bytes:~p Limit:~p~n",
[StateStr, CurrentFree, Limit]).
-start_timer(Timeout) ->
- {ok, TRef} = timer:send_interval(Timeout, update),
- TRef.
+start_timer(State) ->
+ State#state{timer = erlang:send_after(interval(State), self(), update)}.
+
+interval(#state{alarmed = true,
+ max_interval = MaxInterval}) ->
+ MaxInterval;
+interval(#state{limit = Limit,
+ actual = Actual,
+ min_interval = MinInterval,
+ max_interval = MaxInterval}) ->
+ IdealInterval = 2 * (Actual - Limit) / ?FAST_RATE,
+ trunc(erlang:max(MinInterval, erlang:min(MaxInterval, IdealInterval))).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 4d3ddc79..a713d76b 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -141,8 +141,6 @@ notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
- %% TODO: switch to os:timestamp() when we drop support for
- %% Erlang/OTP < R13B01
gen_event:notify(?MODULE, #event{type = Type,
props = Props,
- timestamp = now()}).
+ timestamp = os:timestamp()}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 49952a4d..fc131519 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -347,12 +347,11 @@ route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
AlternateDests ++ DecorateDests ++ ExchangeDests)).
-process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation
- [];
-process_alternate(#exchange{name = XName, arguments = Args}, []) ->
- case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+process_alternate(X = #exchange{name = XName}, []) ->
+ case rabbit_policy:get_arg(
+ <<"alternate-exchange">>, <<"alternate-exchange">>, X) of
undefined -> [];
- AName -> [AName]
+ AName -> [rabbit_misc:r(XName, exchange, AName)]
end;
process_alternate(_X, _Results) ->
[].
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index baec9c29..c841560e 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -103,17 +103,15 @@ headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
AllMatch, AnyMatch, MatchKind) when PK == DK ->
{AllMatch1, AnyMatch1} =
- if
+ case rabbit_misc:type_class(PT) == rabbit_misc:type_class(DT) of
%% It's not properly specified, but a "no value" in a
%% pattern field is supposed to mean simple presence of
%% the corresponding data field. I've interpreted that to
%% mean a type of "void" for the pattern field.
- PT == void -> {AllMatch, true};
- %% Similarly, it's not specified, but I assume that a
- %% mismatched type causes a mismatched value.
- PT =/= DT -> {false, AnyMatch};
- PV == DV -> {AllMatch, true};
- true -> {false, AnyMatch}
+ _ when PT == void -> {AllMatch, true};
+ false -> {false, AnyMatch};
+ _ when PV == DV -> {AllMatch, true};
+ _ -> {false, AnyMatch}
end,
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index df9baed9..ca67254b 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,11 @@
-module(rabbit_heartbeat).
+-export([start/6]).
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
- start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
+ pause_monitor/1, resume_monitor/1]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
-include("rabbit.hrl").
@@ -26,16 +29,15 @@
-ifdef(use_specs).
-export_type([heartbeaters/0]).
--export_type([start_heartbeat_fun/0]).
-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}).
-type(heartbeat_callback() :: fun (() -> any())).
--type(start_heartbeat_fun() ::
- fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
- non_neg_integer(), heartbeat_callback()) ->
- no_return())).
+-spec(start/6 ::
+ (pid(), rabbit_net:socket(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
-spec(start_heartbeat_sender/3 ::
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
@@ -44,17 +46,28 @@
(rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
rabbit_types:ok(pid())).
--spec(start_heartbeat_fun/1 ::
- (pid()) -> start_heartbeat_fun()).
-
-
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,{_, _}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
-endif.
%%----------------------------------------------------------------------------
+start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ {ok, Sender} =
+ start_heartbeater(SendTimeoutSec, SupPid, Sock,
+ SendFun, heartbeat_sender,
+ start_heartbeat_sender),
+ {ok, Receiver} =
+ start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
+ ReceiveFun, heartbeat_receiver,
+ start_heartbeat_receiver),
+ {Sender, Receiver}.
+
start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
@@ -69,25 +82,21 @@ start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
fun () -> ReceiveFun(), stop end}).
-start_heartbeat_fun(SupPid) ->
- fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
- {ok, Sender} =
- start_heartbeater(SendTimeoutSec, SupPid, Sock,
- SendFun, heartbeat_sender,
- start_heartbeat_sender),
- {ok, Receiver} =
- start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
- ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
- {Sender, Receiver}
- end.
-
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
resume_monitor({_Sender, none}) -> ok;
resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok.
+system_continue(_Parent, Deb, {Params, State}) ->
+ heartbeater(Params, Deb, State).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
%%----------------------------------------------------------------------------
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
{ok, none};
@@ -98,17 +107,27 @@ start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, V) end,
+ Deb, {StatVal, SameCount} = State) ->
+ Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
+ System = fun (From, Req) ->
+ sys:handle_system_msg(
+ Req, From, self(), ?MODULE, Deb, {Params, State})
+ end,
receive
- pause -> receive
- resume -> Recurse({0, 0});
- Other -> exit({unexpected_message, Other})
- end;
- Other -> exit({unexpected_message, Other})
+ pause ->
+ receive
+ resume -> Recurse({0, 0});
+ {system, From, Req} -> System(From, Req);
+ Other -> exit({unexpected_message, Other})
+ end;
+ {system, From, Req} ->
+ System(From, Req);
+ Other ->
+ exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 495f6fdd..4bd1a575 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -25,7 +25,7 @@
-behaviour(gen_server2).
-export([start_link/0, register/2, deregister/1,
- report_ram_duration/2, stop/0]).
+ report_ram_duration/2, stop/0, conserve_resources/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,24 +36,14 @@
queue_durations, %% ets #process
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
- desired_duration %% the desired queue duration
+ desired_duration, %% the desired queue duration
+ disk_alarm %% disable paging, disk alarm has fired
}).
-define(SERVER, ?MODULE).
-define(DEFAULT_UPDATE_INTERVAL, 2500).
-define(TABLE_NAME, ?MODULE).
-%% Because we have a feedback loop here, we need to ensure that we
-%% have some space for when the queues don't quite respond as fast as
-%% we would like, or when there is buffering going on in other parts
-%% of the system. In short, we aim to stay some distance away from
-%% when the memory alarms will go off, which cause backpressure (of
-%% some sort) on producers. Note that all other Thresholds are
-%% relative to this scaling.
--define(MEMORY_LIMIT_SCALING, 0.4).
-
--define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
-
%% If all queues are pushed to disk (duration 0), then the sum of
%% their reported lengths will be 0. If memory then becomes available,
%% unless we manually intervene, the sum will remain 0, and the queues
@@ -97,6 +87,11 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
+conserve_resources(Pid, disk, Conserve) ->
+ gen_server2:cast(Pid, {disk_alarm, Conserve});
+conserve_resources(_Pid, _Source, _Conserve) ->
+ ok.
+
%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------
@@ -105,13 +100,14 @@ init([]) ->
{ok, TRef} = timer:send_interval(?DEFAULT_UPDATE_INTERVAL, update),
Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
-
+ Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
{ok, internal_update(
#state { timer = TRef,
queue_durations = Ets,
queue_duration_sum = 0.0,
queue_duration_count = 0,
- desired_duration = infinity })}.
+ desired_duration = infinity,
+ disk_alarm = lists:member(disk, Alarms)})}.
handle_call({report_ram_duration, Pid, QueueDuration}, From,
State = #state { queue_duration_sum = Sum,
@@ -148,6 +144,12 @@ handle_call({register, Pid, MFA}, _From,
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({disk_alarm, Alarm}, State = #state{disk_alarm = Alarm}) ->
+ {noreply, State};
+
+handle_cast({disk_alarm, Alarm}, State) ->
+ {noreply, internal_update(State#state{disk_alarm = Alarm})};
+
handle_cast({deregister, Pid}, State) ->
{noreply, internal_deregister(Pid, true, State)};
@@ -203,55 +205,65 @@ internal_deregister(Pid, Demonitor,
queue_duration_count = Count1 }
end.
-internal_update(State = #state { queue_durations = Durations,
- desired_duration = DesiredDurationAvg,
- queue_duration_sum = Sum,
- queue_duration_count = Count }) ->
- MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
+internal_update(State = #state{queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ disk_alarm = DiskAlarm}) ->
+ DesiredDurationAvg1 = desired_duration_average(State),
+ ShouldInform = should_inform_predicate(DiskAlarm),
+ case ShouldInform(DesiredDurationAvg, DesiredDurationAvg1) of
+ true -> inform_queues(ShouldInform, DesiredDurationAvg1, Durations);
+ false -> ok
+ end,
+ State#state{desired_duration = DesiredDurationAvg1}.
+
+desired_duration_average(#state{disk_alarm = true}) ->
+ infinity;
+desired_duration_average(#state{disk_alarm = false,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count}) ->
+ {ok, LimitThreshold} =
+ application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
+ MemoryLimit = vm_memory_monitor:get_memory_limit(),
MemoryRatio = case MemoryLimit > 0.0 of
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
end,
- DesiredDurationAvg1 =
- if MemoryRatio =:= infinity ->
- 0.0;
- MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 ->
- infinity;
- MemoryRatio < ?SUM_INC_THRESHOLD ->
- ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
- true ->
- (Sum / Count) / MemoryRatio
- end,
- State1 = State #state { desired_duration = DesiredDurationAvg1 },
-
- %% only inform queues immediately if the desired duration has
- %% decreased
- case DesiredDurationAvg1 == infinity orelse
- (DesiredDurationAvg /= infinity andalso
- DesiredDurationAvg1 >= DesiredDurationAvg) of
- true ->
- ok;
- false ->
- true =
- ets:foldl(
- fun (Proc = #process { reported = QueueDuration,
- sent = PrevSendDuration,
- callback = {M, F, A} }, true) ->
- case should_send(QueueDuration, PrevSendDuration,
- DesiredDurationAvg1) of
- true -> ok = erlang:apply(
- M, F, A ++ [DesiredDurationAvg1]),
- ets:insert(
- Durations,
- Proc #process {
- sent = DesiredDurationAvg1});
- false -> true
- end
- end, true, Durations)
- end,
- State1.
+ if MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < LimitThreshold orelse Count == 0 ->
+ infinity;
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
+ end.
-should_send(infinity, infinity, _) -> true;
-should_send(infinity, D, DD) -> DD < D;
-should_send(D, infinity, DD) -> DD < D;
-should_send(D1, D2, DD) -> DD < lists:min([D1, D2]).
+inform_queues(ShouldInform, DesiredDurationAvg, Durations) ->
+ true =
+ ets:foldl(
+ fun (Proc = #process{reported = QueueDuration,
+ sent = PrevSendDuration,
+ callback = {M, F, A}}, true) ->
+ case ShouldInform(PrevSendDuration, DesiredDurationAvg)
+ andalso ShouldInform(QueueDuration, DesiredDurationAvg) of
+ true -> ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg]),
+ ets:insert(
+ Durations,
+ Proc#process{sent = DesiredDurationAvg});
+ false -> true
+ end
+ end, true, Durations).
+
+%% In normal use, we only inform queues immediately if the desired
+%% duration has decreased, we want to ensure timely paging.
+should_inform_predicate(false) -> fun greater_than/2;
+%% When the disk alarm has gone off though, we want to inform queues
+%% immediately if the desired duration has *increased* - we want to
+%% ensure timely stopping paging.
+should_inform_predicate(true) -> fun (D1, D2) -> greater_than(D2, D1) end.
+
+greater_than(infinity, infinity) -> false;
+greater_than(infinity, _D2) -> true;
+greater_than(_D1, infinity) -> false;
+greater_than(D1, D2) -> D1 > D2.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index ab466c3c..a0e8bcc6 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -222,20 +222,19 @@
%% sender_death message to all the slaves, saying the sender has
%% died. Once the slaves receive the sender_death message, they know
%% that they're not going to receive any more instructions from the gm
-%% regarding that sender, thus they throw away any publications from
-%% the sender pending publication instructions. However, it is
-%% possible that the coordinator receives the DOWN and communicates
-%% that to the master before the master has finished receiving and
-%% processing publishes from the sender. This turns out not to be a
-%% problem: the sender has actually died, and so will not need to
-%% receive confirms or other feedback, and should further messages be
-%% "received" from the sender, the master will ask the coordinator to
-%% set up a new monitor, and will continue to process the messages
-%% normally. Slaves may thus receive publishes via gm from previously
-%% declared "dead" senders, but again, this is fine: should the slave
-%% have just thrown out the message it had received directly from the
-%% sender (due to receiving a sender_death message via gm), it will be
-%% able to cope with the publication purely from the master via gm.
+%% regarding that sender. However, it is possible that the coordinator
+%% receives the DOWN and communicates that to the master before the
+%% master has finished receiving and processing publishes from the
+%% sender. This turns out not to be a problem: the sender has actually
+%% died, and so will not need to receive confirms or other feedback,
+%% and should further messages be "received" from the sender, the
+%% master will ask the coordinator to set up a new monitor, and
+%% will continue to process the messages normally. Slaves may thus
+%% receive publishes via gm from previously declared "dead" senders,
+%% but again, this is fine: should the slave have just thrown out the
+%% message it had received directly from the sender (due to receiving
+%% a sender_death message via gm), it will be able to cope with the
+%% publication purely from the master via gm.
%%
%% When a slave receives a DOWN message for a sender, if it has not
%% received the sender_death message from the master via gm already,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4ea1d984..8ad7c62f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) ->
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
- rabbit_event:notify(queue_mirror_deaths, [{name, QueueName},
- {pids, DeadPids}]),
rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
[rabbit_misc:rs(QueueName),
case IsMaster of
@@ -223,7 +221,7 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
Q1 = Q#amqqueue{sync_slave_pids = SSPids1},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
- rabbit_amqqueue:wake_up(Q1),
+ rabbit_amqqueue:notify_policy_changed(Q1),
Q1.
%%----------------------------------------------------------------------------
@@ -239,28 +237,32 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, All) ->
+suggested_queue_nodes(Q = #amqqueue{exclusive_owner = Owner}, All) ->
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
- Params = policy(<<"ha-params">>, Q),
- case module(Q) of
- {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All);
- _ -> {MNode, []}
+ case Owner of
+ none -> Params = policy(<<"ha-params">>, Q),
+ case module(Q) of
+ {ok, M} -> M:suggested_queue_nodes(
+ Params, MNode, SNodes, SSNodes, All);
+ _ -> {MNode, []}
+ end;
+ _ -> {MNode, []}
end.
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
- {ok, P} -> P;
- _ -> none
+ undefined -> none;
+ P -> P
end.
module(#amqqueue{} = Q) ->
case rabbit_policy:get(<<"ha-mode">>, Q) of
- {ok, Mode} -> module(Mode);
- _ -> not_mirrored
+ undefined -> not_mirrored;
+ Mode -> module(Mode)
end;
module(Mode) when is_binary(Mode) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a5d1f68e..b1a86493 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -41,15 +41,13 @@
%%----------------------------------------------------------------------------
--define(CREATION_EVENT_KEYS,
+-define(INFO_KEYS,
[pid,
name,
master_pid,
is_synchronised
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -61,7 +59,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId, ChState}
msg_id_ack, %% :: MsgId -> AckTag
msg_id_status,
@@ -120,12 +118,10 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- known_senders = pmon:new(),
+ known_senders = pmon:new(delegate),
depth_delta = undefined
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
@@ -284,7 +280,8 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
- noreply(local_sender_death(ChPid, State));
+ local_sender_death(ChPid, State),
+ noreply(maybe_forget_sender(ChPid, down_from_ch, State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -574,10 +571,15 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- Delivery <- queue:to_list(PubQ)],
+ Deliveries = [Delivery ||
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ Delivery <- queue:to_list(PubQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ KS1 = lists:foldl(fun (ChPid0, KS0) ->
+ pmon:demonitor(ChPid0, KS0)
+ end, KS, AwaitGmDown),
rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
noreply(State) ->
@@ -617,7 +619,7 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
-local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+local_sender_death(ChPid, #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
@@ -625,8 +627,7 @@ local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
- end,
- State.
+ end.
confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
@@ -655,12 +656,38 @@ confirm_sender_death(Pid) ->
State
end,
%% Note that we do not remove our knowledge of this ChPid until we
- %% get the sender_death from GM.
+ %% get the sender_death from GM as well as a DOWN notification.
{ok, _TRef} = timer:apply_after(
?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue,
[self(), rabbit_mirror_queue_master, Fun]),
ok.
+forget_sender(_, running) -> false;
+forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+
+%% Record and process lifetime events from channels. Forget all about a channel
+%% only when down notifications are received from both the channel and from gm.
+maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ State;
+ {ok, {MQ, PendCh, ChStateRecord}} ->
+ case forget_sender(ChState, ChStateRecord) of
+ true ->
+ credit_flow:peer_down(ChPid),
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = lists:foldl(
+ fun dict:erase/2,
+ MS, sets:to_list(PendCh)),
+ known_senders = pmon:demonitor(ChPid, KS) };
+ false ->
+ SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ State #state { sender_queues = SQ1 }
+ end
+ end.
+
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
@@ -669,9 +696,9 @@ maybe_enqueue_message(
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
@@ -683,7 +710,7 @@ maybe_enqueue_message(
get_sender_queue(ChPid, SQ) ->
case dict:find(ChPid, SQ) of
- error -> {queue:new(), sets:new()};
+ error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
@@ -691,19 +718,20 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
case dict:find(ChPid, SQ) of
error ->
SQ;
- {ok, {MQ, PendingCh}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ {ok, {MQ, PendingCh, ChState}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ SQ)
end.
publish_or_discard(Status, ChPid, MsgId,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
%% We really are going to do the publish/discard right now, even
%% though we may not have seen it directly from the channel. But
- %% we cannot issues confirms until the latter has happened. So we
+ %% we cannot issue confirms until the latter has happened. So we
%% need to keep track of the MsgId and its confirmation status in
%% the meantime.
State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
@@ -723,7 +751,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -784,25 +812,13 @@ process_instruction({requeue, MsgIds},
{ok, State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
- State = #state { sender_queues = SQ,
- msg_id_status = MS,
- known_senders = KS }) ->
+ State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a message
%% from it. In this case we just want to avoid doing work if we
%% never got any messages.
{ok, case pmon:is_monitored(ChPid, KS) of
false -> State;
- true -> MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = pmon:demonitor(ChPid, KS) }
+ true -> maybe_forget_sender(ChPid, down_from_gm, State)
end};
process_instruction({depth, Depth},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 43dbb4e9..6fba99db 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_mirror_queue_slave,
{rabbit_mirror_queue_slave, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index bca9d5ce..00c4eaf3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -21,7 +21,8 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1, absent/1, assert_args_equivalence/4]).
+-export([not_found/1, absent/1]).
+-export([type_class/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
-export([table_lookup/2, set_table_value/4]).
-export([r/3, r/2, r_arg/4, rs/1]).
@@ -75,9 +76,6 @@
R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal;
R =:= shutdown).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -120,6 +118,7 @@
(rabbit_types:amqp_error()) -> channel_or_connection_exit()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
-spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()).
+-spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 0ed6c7b2..3a8fae7f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -396,7 +396,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
- delegate_beam_hash(), cluster_status_from_mnesia()}.
+ cluster_status_from_mnesia()}.
node_type() ->
{_AllNodes, DiscNodes, _RunningNodes} =
@@ -460,10 +460,11 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
%% about the cluster
case NodeType of
ram -> start_mnesia(),
- change_extra_db_nodes(ClusterNodes, false),
- rabbit_table:wait_for_replicated();
+ change_extra_db_nodes(ClusterNodes, false);
disc -> ok
end,
+ %% ...and all nodes will need to wait for tables
+ rabbit_table:wait_for_replicated(),
ok.
init_db_with_mnesia(ClusterNodes, NodeType,
@@ -570,16 +571,16 @@ check_cluster_consistency(Node) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, _Hash, {error, _}} ->
+ {_OTP, _Rabbit, {error, _}} ->
{error, not_found};
- {_OTP, Rabbit, _Status} ->
- %% pre-2013/04 format implies version mismatch
- version_error("Rabbit", rabbit_misc:version(), Rabbit);
- {OTP, Rabbit, Hash, {ok, Status}} ->
- case check_consistency(OTP, Rabbit, Hash, Node, Status) of
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
- end
+ end;
+ {_OTP, Rabbit, _Hash, _Status} ->
+ %% delegate hash checking implies version mismatch
+ version_error("Rabbit", rabbit_misc:version(), Rabbit)
end.
%%--------------------------------------------------------------------
@@ -743,17 +744,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit, Hash) ->
+check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
- check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash)]).
+ check_rabbit_consistency(Rabbit)]).
-check_consistency(OTP, Rabbit, Hash, Node, Status) ->
+check_consistency(OTP, Rabbit, Node, Status) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash),
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
@@ -784,31 +783,11 @@ version_error(Name, This, Remote) ->
check_otp_consistency(Remote) ->
check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
-%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be
-%% removed after 3.1.0 is released.
-check_rabbit_consistency("3.0.0") ->
- version_error("Rabbit", rabbit_misc:version(), "3.0.0");
-
check_rabbit_consistency(Remote) ->
check_version_consistency(
rabbit_misc:version(), Remote, "Rabbit",
fun rabbit_misc:version_minor_equivalent/2).
-check_beam_compatibility(RemoteHash) ->
- case RemoteHash == delegate_beam_hash() of
- true -> ok;
- false -> {error, {incompatible_bytecode,
- "Incompatible Erlang bytecode found on nodes"}}
- end.
-
-%% The delegate module sends functions across the cluster; if it is
-%% out of sync (say due to mixed compilers), we will get badfun
-%% exceptions when trying to do so. Let's detect that at startup.
-delegate_beam_hash() ->
- {delegate, Obj, _} = code:get_object_code(delegate),
- {ok, {delegate, Hash}} = beam_lib:md5(Obj),
- Hash.
-
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
%% mnesia tables aren't there because restarted RAM nodes won't have
@@ -834,12 +813,13 @@ find_good_node([]) ->
none;
find_good_node([Node | Nodes]) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
- {_OTP, _Rabbit, _} -> find_good_node(Nodes);
- {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ %% old delegate hash check
+ {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 46cfabe3..91be4dcb 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -149,14 +149,22 @@ ensure_ssl() ->
ok = app_utils:start_applications(SslAppsConfig),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
- % unknown_ca errors are silently ignored prior to R14B unless we
- % supply this verify_fun - remove when at least R14B is required
- case proplists:get_value(verify, SslOptsConfig, verify_none) of
- verify_none -> SslOptsConfig;
- verify_peer -> [{verify_fun, fun([]) -> true;
- ([_|_]) -> false
- end}
- | SslOptsConfig]
+ case rabbit_misc:pget(verify_fun, SslOptsConfig) of
+ {Module, Function} ->
+ rabbit_misc:pset(verify_fun,
+ fun (ErrorList) ->
+ Module:Function(ErrorList)
+ end, SslOptsConfig);
+ undefined ->
+ % unknown_ca errors are silently ignored prior to R14B unless we
+ % supply this verify_fun - remove when at least R14B is required
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end
end.
ssl_transform_fun(SslOpts) ->
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 6355f935..948d2ab0 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -121,7 +121,6 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
fmt_missing("dependencies", MissingDeps)})
end,
write_enabled_plugins(PluginsFile, NewEnabled),
- maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
@@ -263,25 +262,6 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-maybe_warn_mochiweb(Enabled) ->
- V = erlang:system_info(otp_release),
- case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
- true ->
- Stars = string:copies("*", 80),
- io:format("~n~n~s~n"
- " Warning: Mochiweb enabled and Erlang version ~s "
- "detected.~n"
- " Enabling plugins that depend on Mochiweb is not "
- "supported on this Erlang~n"
- " version. At least R13B01 is required.~n~n"
- " RabbitMQ will not start successfully in this "
- "configuration. You *must*~n"
- " disable the Mochiweb plugin, or upgrade Erlang.~n"
- "~s~n~n~n", [Stars, V, Stars]);
- false ->
- ok
- end.
-
report_change() ->
io:format("Plugin configuration has changed. "
"Restart RabbitMQ for changes to take effect.~n").
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
new file mode 100644
index 00000000..c4a37e7a
--- /dev/null
+++ b/src/rabbit_policies.erl
@@ -0,0 +1,81 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_policies).
+-behaviour(rabbit_policy_validator).
+
+-include("rabbit.hrl").
+
+-export([register/0, validate_policy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "internal policies"},
+ {mfa, {rabbit_policies, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ [rabbit_registry:register(Class, Name, ?MODULE) ||
+ {Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
+ {policy_validator, <<"dead-letter-exchange">>},
+ {policy_validator, <<"dead-letter-routing-key">>},
+ {policy_validator, <<"message-ttl">>},
+ {policy_validator, <<"expires">>},
+ {policy_validator, <<"max-length">>}]],
+ ok.
+
+validate_policy(Terms) ->
+ lists:foldl(fun ({Key, Value}, ok) -> validate_policy0(Key, Value);
+ (_, Error) -> Error
+ end, ok, Terms).
+
+validate_policy0(<<"alternate-exchange">>, Value)
+ when is_binary(Value) ->
+ ok;
+validate_policy0(<<"alternate-exchange">>, Value) ->
+ {error, "~p is not a valid alternate exchange name", [Value]};
+
+validate_policy0(<<"dead-letter-exchange">>, Value)
+ when is_binary(Value) ->
+ ok;
+validate_policy0(<<"dead-letter-exchange">>, Value) ->
+ {error, "~p is not a valid dead letter exchange name", [Value]};
+
+validate_policy0(<<"dead-letter-routing-key">>, Value)
+ when is_binary(Value) ->
+ ok;
+validate_policy0(<<"dead-letter-routing-key">>, Value) ->
+ {error, "~p is not a valid dead letter routing key", [Value]};
+
+validate_policy0(<<"message-ttl">>, Value)
+ when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ ok;
+validate_policy0(<<"message-ttl">>, Value) ->
+ {error, "~p is not a valid message TTL", [Value]};
+
+validate_policy0(<<"expires">>, Value)
+ when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ ok;
+validate_policy0(<<"expires">>, Value) ->
+ {error, "~p is not a valid queue expiry", [Value]};
+
+validate_policy0(<<"max-length">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"max-length">>, Value) ->
+ {error, "~p is not a valid maximum length", [Value]}.
+
+
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 924737f6..db6d042b 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -25,9 +25,10 @@
-import(rabbit_misc, [pget/2]).
-export([register/0]).
--export([name/1, get/2, set/1]).
+-export([invalidate/0, recover/0]).
+-export([name/1, get/2, get_arg/3, set/1]).
-export([validate/4, notify/4, notify_clear/3]).
--export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
@@ -45,55 +46,106 @@ name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
name0(Policy) -> pget(name, Policy).
-set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(Q = #amqqueue{name = Name}) -> rabbit_queue_decorator:set(
+ Q#amqqueue{policy = set0(Name)});
set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
X#exchange{policy = set0(Name)}).
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
+set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
+set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Ps)}).
+
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
get(Name, EntityName = #resource{virtual_host = VHost}) ->
get0(Name, match(EntityName, list(VHost))).
-get0(_Name, undefined) -> {error, not_found};
+get0(_Name, undefined) -> undefined;
get0(Name, List) -> case pget(definition, List) of
- undefined -> {error, not_found};
- Policy -> case pget(Name, Policy) of
- undefined -> {error, not_found};
- Value -> {ok, Value}
- end
+ undefined -> undefined;
+ Policy -> pget(Name, Policy)
end.
+%% Many heads for optimisation
+get_arg(_AName, _PName, #exchange{arguments = [], policy = undefined}) ->
+ undefined;
+get_arg(_AName, PName, X = #exchange{arguments = []}) ->
+ get(PName, X);
+get_arg(AName, PName, X = #exchange{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, AName) of
+ undefined -> get(PName, X);
+ {_Type, Arg} -> Arg
+ end.
+
+%%----------------------------------------------------------------------------
+
+%% Gets called during upgrades - therefore must not assume anything about the
+%% state of Mnesia
+invalidate() ->
+ rabbit_file:write_file(invalid_file(), <<"">>).
+
+recover() ->
+ case rabbit_file:is_file(invalid_file()) of
+ true -> recover0(),
+ rabbit_file:delete(invalid_file());
+ false -> ok
+ end.
+
+%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
+%% the first node starting. So we can rewrite the whole database. Note that
+%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
+%% variants.
+recover0() ->
+ Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
+ Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
+ Policies = list(),
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
+ end) || X <- Xs],
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
+ end) || Q <- Qs],
+ ok.
+
+invalid_file() ->
+ filename:join(rabbit_mnesia:dir(), "policies_are_invalid").
+
%%----------------------------------------------------------------------------
-parse_set(VHost, Name, Pattern, Definition, undefined) ->
- parse_set0(VHost, Name, Pattern, Definition, 0);
-parse_set(VHost, Name, Pattern, Definition, Priority) ->
+parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
case rabbit_misc:json_decode(Defn) of
{ok, JSON} ->
set0(VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
{<<"definition">>, rabbit_misc:json_to_term(JSON)},
- {<<"priority">>, Priority}]);
+ {<<"priority">>, Priority},
+ {<<"apply-to">>, ApplyTo}]);
error ->
{error_string, "JSON decoding error"}
end.
-set(VHost, Name, Pattern, Definition, Priority) ->
+set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
undefined -> 0;
_ -> Priority
+ end},
+ {<<"apply-to">>, case ApplyTo of
+ undefined -> <<"all">>;
+ _ -> ApplyTo
end}],
set0(VHost, Name, PolicyProps).
@@ -130,6 +182,7 @@ p(Parameter, DefnFun) ->
[{vhost, pget(vhost, Parameter)},
{name, pget(name, Parameter)},
{pattern, pget(<<"pattern">>, Value)},
+ {'apply-to', pget(<<"apply-to">>, Value)},
{definition, DefnFun(pget(<<"definition">>, Value))},
{priority, pget(<<"priority">>, Value)}].
@@ -139,7 +192,7 @@ format(Term) ->
ident(X) -> X.
-info_keys() -> [vhost, name, pattern, definition, priority].
+info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
%%----------------------------------------------------------------------------
@@ -211,9 +264,14 @@ update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
case match(QName, Policies) of
OldPolicy -> no_change;
- NewPolicy -> rabbit_amqqueue:update(
- QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
- {Q, Q#amqqueue{policy = NewPolicy}}
+ NewPolicy -> case rabbit_amqqueue:update(
+ QName, fun(Q1) ->
+ rabbit_queue_decorator:set(
+ Q1#amqqueue{policy = NewPolicy})
+ end) of
+ #amqqueue{} = Q1 -> {Q, Q1};
+ not_found -> {Q, Q }
+ end
end.
notify(no_change)->
@@ -229,8 +287,16 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
+matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
+ matches_type(Kind, pget('apply-to', Policy)) andalso
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
+ VHost =:= pget(vhost, Policy).
+
+matches_type(exchange, <<"exchanges">>) -> true;
+matches_type(queue, <<"queues">>) -> true;
+matches_type(exchange, <<"all">>) -> true;
+matches_type(queue, <<"all">>) -> true;
+matches_type(_, _) -> false.
sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
@@ -239,6 +305,7 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"apply-to">>, fun apply_to_validation/2, optional},
{<<"definition">>, fun validation/2, mandatory}].
validation(_Name, []) ->
@@ -284,3 +351,10 @@ a2b(A) -> list_to_binary(atom_to_list(A)).
dups(L) -> L -- lists:usort(L).
is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
+
+apply_to_validation(_Name, <<"all">>) -> ok;
+apply_to_validation(_Name, <<"exchanges">>) -> ok;
+apply_to_validation(_Name, <<"queues">>) -> ok;
+apply_to_validation(_Name, Term) ->
+ {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
+ "or 'all'", [Term]}.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
new file mode 100644
index 00000000..8f6375a5
--- /dev/null
+++ b/src/rabbit_queue_decorator.erl
@@ -0,0 +1,48 @@
+-module(rabbit_queue_decorator).
+
+-include("rabbit.hrl").
+
+-export([select/1, set/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(notify_event() :: 'consumer_blocked' |
+ 'consumer_unblocked' |
+ 'queue_empty' |
+ 'basic_consume' |
+ 'basic_cancel' |
+ 'refresh').
+
+-callback startup(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
+
+-callback policy_changed(rabbit_types:amqqueue(), rabbit_types:amqqueue()) ->
+ 'ok'.
+
+-callback active_for(rabbit_types:amqqueue()) -> boolean().
+
+-callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
+ {active_for, 1}, {notify, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+select(Modules) ->
+ [M || M <- Modules, code:which(M) =/= non_existing].
+
+set(Q) -> Q#amqqueue{decorators = [D || D <- list(), D:active_for(Q)]}.
+
+list() -> [M || {_, M} <- rabbit_registry:lookup_all(queue_decorator)].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0908fb73..f69d8355 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
-export([scan/3]).
--export([add_queue_ttl/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -123,9 +123,9 @@
-define(REL_SEQ_BITS, 14).
-define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
-%% seq only is binary 00 followed by 14 bits of rel seq id
+%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
--define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
@@ -171,6 +171,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-ifdef(use_specs).
@@ -715,7 +716,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- {ok, Bin} ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
+ State;
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
{MsgId, MsgProps} = parse_pub_record_body(Bin),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
@@ -1068,6 +1073,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
add_queue_ttl_segment(_) ->
stop.
+avoid_zeroes() ->
+ foreach_queue_index({none, fun avoid_zeroes_segment/1}).
+
+avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
+avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+avoid_zeroes_segment(_) ->
+ stop.
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1092,7 +1112,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
|| Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
-transform_file(Path, Fun) ->
+transform_file(_Path, none) ->
+ ok;
+transform_file(Path, Fun) when is_function(Fun)->
PathTmp = Path ++ ".upgrade",
case rabbit_file:file_size(Path) of
0 -> ok;
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9b6039d1..e00732fd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,12 +18,12 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1,
+-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/1,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -36,16 +36,16 @@
%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
- connection_state, queue_collector, heartbeater, stats_timer,
- ch_sup3_pid, channel_sup_sup_pid, start_heartbeat_fun,
- buf, buf_len, throttle}).
+ connection_state, helper_sup, queue_collector, heartbeater,
+ stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
+ blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -73,8 +73,7 @@
-ifdef(use_specs).
--spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) ->
- rabbit_types:ok(pid())).
+-spec(start_link/1 :: (pid()) -> rabbit_types:ok(pid())).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
@@ -85,11 +84,9 @@
rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
--spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
- -> no_return()).
--spec(start_connection/7 ::
- (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(),
- rabbit_net:socket(),
+-spec(init/2 :: (pid(), pid()) -> no_return()).
+-spec(start_connection/5 ::
+ (pid(), pid(), any(), rabbit_net:socket(),
fun ((rabbit_net:socket()) ->
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
@@ -103,24 +100,21 @@
%%--------------------------------------------------------------------------
-start_link(ChannelSup3Pid, Collector, StartHeartbeatFun) ->
- {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSup3Pid,
- Collector, StartHeartbeatFun])}.
+start_link(HelperSup) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [self(), HelperSup])}.
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
+init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
- start_connection(
- Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, Sock,
- SockTransform)
+ start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
+ mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -142,8 +136,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, _Source, Conserve) ->
- Pid ! {conserve_resources, Conserve},
+conserve_resources(Pid, Source, Conserve) ->
+ Pid ! {conserve_resources, Source, Conserve},
ok.
server_properties(Protocol) ->
@@ -175,10 +169,13 @@ server_properties(Protocol) ->
NormalizedConfigServerProps).
server_capabilities(rabbit_framing_amqp_0_9_1) ->
- [{<<"publisher_confirms">>, bool, true},
- {<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true}];
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true},
+ {<<"consumer_priorities">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true}];
server_capabilities(_) ->
[].
@@ -201,8 +198,7 @@ socket_op(Sock, Fun) ->
exit(normal)
end.
-start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
- Sock, SockTransform) ->
+start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Str} -> Str;
@@ -238,17 +234,17 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
recv_len = 0,
pending_recv = false,
connection_state = pre_init,
- queue_collector = Collector,
+ queue_collector = undefined, %% started on tune-ok
+ helper_sup = HelperSup,
heartbeater = none,
- ch_sup3_pid = ChSup3Pid,
channel_sup_sup_pid = none,
- start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
throttle = #throttle{
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never}},
+ alarmed_by = [],
+ last_blocked_by = none,
+ last_blocked_at = never,
+ blocked_sent = false}},
try
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -321,9 +317,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
-handle_other({conserve_resources, Conserve},
- State = #v1{throttle = Throttle}) ->
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle =
+ #throttle{alarmed_by = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ Throttle1 = Throttle#throttle{alarmed_by = CR1},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
@@ -409,30 +410,61 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, (Throttle#throttle.conserve_resources orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- State#v1{connection_state = running};
+ maybe_send_unblocked(State),
+ State#v1{connection_state = running,
+ throttle = Throttle#throttle{
+ blocked_sent = false}};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ Sent = maybe_send_blocked(State),
State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now()})};
+ Throttle#throttle{last_blocked_at = erlang:now(),
+ blocked_sent = Sent})};
maybe_block(State) ->
State.
-update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
- Throttle#throttle{last_blocked_by = resource};
-update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
- Throttle#throttle{last_blocked_by = flow}.
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
+ false;
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{
+ protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol),
+ true;
+ _ ->
+ false
+ end.
+
+maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
+ ok;
+maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+
+update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -810,8 +842,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
- sock = Sock,
- start_heartbeat_fun = SHF}) ->
+ helper_sup = SupPid,
+ sock = Sock}) ->
ServerFrameMax = server_frame_max(),
if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
rabbit_misc:protocol_error(
@@ -822,16 +854,20 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ServerFrameMax]);
true ->
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
- ClientHeartbeat, ReceiveFun),
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
timeout_sec = ClientHeartbeat,
frame_max = FrameMax},
+ queue_collector = Collector,
heartbeater = Heartbeater}
end;
@@ -840,19 +876,16 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- ch_sup3_pid = ChSup3Pid,
+ helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
- supervisor2:start_child(
- ChSup3Pid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ rabbit_connection_helper_sup:start_channel_sup_sup(SupPid),
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
@@ -925,14 +958,29 @@ auth_mechanisms_binary(Sock) ->
auth_phase(Response,
State = #v1{connection = Connection =
#connection{protocol = Protocol,
+ capabilities = Capabilities,
auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
- rabbit_misc:protocol_error(
- access_refused, "~s login refused: ~s",
- [Name, io_lib:format(Msg, Args)]);
+ AmqpError = rabbit_misc:amqp_error(
+ access_refused, "~s login refused: ~s",
+ [Name, io_lib:format(Msg, Args)], none),
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ SafeMsg = io_lib:format(
+ "Login was refused using authentication "
+ "mechanism ~s. For details see the broker "
+ "logfile.", [Name]),
+ AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError1, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ _ -> ok
+ end,
+ rabbit_misc:protocol_error(AmqpError);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
@@ -1050,10 +1098,7 @@ pack_for_1_0(#v1{parent = Parent,
sock = Sock,
recv_len = RecvLen,
pending_recv = PendingRecv,
- queue_collector = QueueCollector,
- ch_sup3_pid = ChSup3Pid,
- start_heartbeat_fun = SHF,
+ helper_sup = SupPid,
buf = Buf,
buf_len = BufLen}) ->
- {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ChSup3Pid, SHF,
- Buf, BufLen}.
+ {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index f933e4e9..3014aeb7 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -130,6 +130,7 @@ class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
class_module(policy_validator) -> rabbit_policy_validator;
class_module(ha_mode) -> rabbit_mirror_queue_mode.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 30cf9114..76421d1a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -60,6 +60,7 @@ all_tests() ->
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_policy_validation(),
+ passed = test_policy_opts_validation(),
passed = test_ha_policy_validation(),
passed = test_server_status(),
passed = test_amqp_connection_refusal(),
@@ -1083,29 +1084,57 @@ test_runtime_parameters() ->
test_policy_validation() ->
rabbit_runtime_parameters_test:register_policy_validator(),
- SetPol =
- fun (Key, Val) ->
- control_action(
- set_policy,
- ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
- end,
+ SetPol = fun (Key, Val) ->
+ control_action_opts(
+ ["set_policy", "name", ".*",
+ rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
- ok = SetPol("testeven", []),
- ok = SetPol("testeven", [1, 2]),
- ok = SetPol("testeven", [1, 2, 3, 4]),
- ok = SetPol("testpos", [2, 5, 5678]),
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
- {error_string, _} = SetPol("testpos", [-1, 0, 1]),
- {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ error = SetPol("testpos", [-1, 0, 1]),
+ error = SetPol("testeven", [ 1, 2, 3]),
ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.
+test_policy_opts_validation() ->
+ Set = fun (Extra) -> control_action_opts(
+ ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}"
+ | Extra]) end,
+ OK = fun (Extra) -> ok = Set(Extra) end,
+ Fail = fun (Extra) -> error = Set(Extra) end,
+
+ OK ([]),
+
+ OK (["--priority", "0"]),
+ OK (["--priority", "3"]),
+ Fail(["--priority", "banana"]),
+ Fail(["--priority"]),
+
+ OK (["--apply-to", "all"]),
+ OK (["--apply-to", "queues"]),
+ Fail(["--apply-to", "bananas"]),
+ Fail(["--apply-to"]),
+
+ OK (["--priority", "3", "--apply-to", "queues"]),
+ Fail(["--priority", "banana", "--apply-to", "queues"]),
+ Fail(["--priority", "3", "--apply-to", "bananas"]),
+
+ Fail(["--offline"]),
+
+ ok = control_action(clear_policy, ["name"]),
+ passed.
+
test_ha_policy_validation() ->
- Set = fun (JSON) -> control_action(set_policy, ["name", ".*", JSON]) end,
+ Set = fun (JSON) -> control_action_opts(
+ ["set_policy", "name", ".*", JSON]) end,
OK = fun (JSON) -> ok = Set(JSON) end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
+ Fail = fun (JSON) -> error = Set(JSON) end,
OK ("{\"ha-mode\":\"all\"}"),
Fail("{\"ha-mode\":\"made_up\"}"),
@@ -1139,7 +1168,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1611,6 +1640,18 @@ control_action(Command, Node, Args, Opts) ->
Other
end.
+control_action_opts(Raw) ->
+ NodeStr = atom_to_list(node()),
+ case rabbit_control_main:parse_arguments(Raw, NodeStr) of
+ {ok, {Cmd, Opts, Args}} ->
+ case control_action(Cmd, node(), Args, Opts) of
+ ok -> ok;
+ _ -> error
+ end;
+ _ ->
+ error
+ end.
+
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1613838c..6f95ef60 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -44,6 +44,8 @@
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
+-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
+-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
%% -------------------------------------------------------------------
@@ -70,6 +72,8 @@
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
+-spec(policy_apply_to/0 :: () -> 'ok').
+-spec(queue_decorators/0 :: () -> 'ok').
-endif.
@@ -299,6 +303,42 @@ exchange_decorators(Table) ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+policy_apply_to() ->
+ transform(
+ rabbit_runtime_parameters,
+ fun ({runtime_parameters, Key = {_VHost, <<"policy">>, _Name}, Value}) ->
+ ApplyTo = apply_to(proplists:get_value(<<"definition">>, Value)),
+ {runtime_parameters, Key, [{<<"apply-to">>, ApplyTo} | Value]};
+ ({runtime_parameters, Key, Value}) ->
+ {runtime_parameters, Key, Value}
+ end,
+ [key, value]),
+ rabbit_policy:invalidate(),
+ ok.
+
+apply_to(Def) ->
+ case [proplists:get_value(K, Def) ||
+ K <- [<<"federation-upstream-set">>, <<"ha-mode">>]] of
+ [undefined, undefined] -> <<"all">>;
+ [_, undefined] -> <<"exchanges">>;
+ [undefined, _] -> <<"queues">>;
+ [_, _] -> <<"all">>
+ end.
+
+queue_decorators() ->
+ ok = queue_decorators(rabbit_queue),
+ ok = queue_decorators(rabbit_durable_queue).
+
+queue_decorators(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, Policy, GmPids, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, policy, gm_pids, decorators]).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index c0b1f8e4..34dd3d3b 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -19,14 +19,18 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, start/6, start_link/6]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5,
+ send_command_flow/2, send_command_flow/3,
flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/1, mainloop1/1]).
+-export([mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -53,6 +57,11 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
-> rabbit_types:ok(pid())).
+
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,#wstate{}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -70,6 +79,11 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(send_command_flow/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command_flow/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
@@ -94,12 +108,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -113,32 +129,54 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
pending = []},
#wstate.stats_timer).
-mainloop(State) ->
+system_continue(Parent, Deb, State) ->
+ mainloop(Deb, State#wstate{reader = Parent}).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
+mainloop(Deb, State) ->
try
- mainloop1(State)
+ mainloop1(Deb, State)
catch
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(State = #wstate{pending = []}) ->
+mainloop1(Deb, State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [Deb, State])
end;
-mainloop1(State) ->
+mainloop1(Deb, State) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after 0 ->
- ?MODULE:mainloop1(internal_flush(State))
+ ?MODULE:mainloop1(Deb, internal_flush(State))
end.
+handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
+handle_message(Deb, Message, State) ->
+ {Deb, handle_message(Message, State)}.
+
handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
+handle_message({send_command_flow, MethodRecord, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, State);
+handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
+ credit_flow:ack(Sender),
+ internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
@@ -186,6 +224,16 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_flow(W, MethodRecord) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, self()},
+ ok.
+
+send_command_flow(W, MethodRecord, Content) ->
+ credit_flow:send(W),
+ W ! {send_command_flow, MethodRecord, Content, self()},
+ ok.
+
send_command_sync(W, MethodRecord) ->
call(W, {send_command_sync, MethodRecord}).
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 23bfe7f1..db4c388a 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -1,14 +1,18 @@
-%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP
+%% This file is a copy of supervisor.erl from the R16B Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is supervisor2
%%
-%% 2) there is a new strategy called
-%% simple_one_for_one_terminate. This is exactly the same as for
-%% simple_one_for_one, except that children *are* explicitly
-%% terminated as per the shutdown component of the child_spec.
+%% 2) a find_child/2 utility function has been added
%%
-%% 3) child specifications can contain, as the restart type, a tuple
+%% 3) Added an 'intrinsic' restart type. Like the transient type, this
+%% type means the child should only be restarted if the child exits
+%% abnormally. Unlike the transient type, if the child exits
+%% normally, the supervisor itself also exits normally. If the
+%% child is a supervisor and it exits normally (i.e. with reason of
+%% 'shutdown') then the child's parent also exits normally.
+%%
+%% 4) child specifications can contain, as the restart type, a tuple
%% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0 (see point (4) below for intrinsic). The delay,
%% in seconds, indicates what should happen if a child, upon being
@@ -41,13 +45,6 @@
%% perspective it's a normal exit, whilst from supervisor's
%% perspective, it's an abnormal exit.
%%
-%% 4) Added an 'intrinsic' restart type. Like the transient type, this
-%% type means the child should only be restarted if the child exits
-%% abnormally. Unlike the transient type, if the child exits
-%% normally, the supervisor itself also exits normally. If the
-%% child is a supervisor and it exits normally (i.e. with reason of
-%% 'shutdown') then the child's parent also exits normally.
-%%
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
@@ -55,7 +52,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -75,61 +72,34 @@
-behaviour(gen_server).
%% External exports
--export([start_link/2,start_link/3,
+-export([start_link/2, start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1, find_child/2,
- check_childspecs/1]).
+ which_children/1, count_children/1,
+ find_child/2, check_childspecs/1]).
%% Internal exports
--export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
--export([handle_cast/2]).
-
--define(DICT, dict).
-
--record(state, {name,
- strategy,
- children = [],
- dynamics = ?DICT:new(),
- intensity,
- period,
- restarts = [],
- module,
- args}).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([try_again_restart/3]).
--record(child, {pid = undefined, % pid is undefined when child is not running
- name,
- mfa,
- restart_type,
- shutdown,
- child_type,
- modules = []}).
-
--define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
- State#state.strategy =:= simple_one_for_one_terminate).
--define(is_terminate_simple(State),
- State#state.strategy =:= simple_one_for_one_terminate).
-
--ifdef(use_specs).
-
-%%--------------------------------------------------------------------------
-%% Types
%%--------------------------------------------------------------------------
-
+-ifdef(use_specs).
-export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]).
+-endif.
+%%--------------------------------------------------------------------------
--type child() :: 'undefined' | pid().
+-ifdef(use_specs).
+-type child() :: 'undefined' | pid().
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
--type modules() :: [module()] | 'dynamic'.
--type delay() :: non_neg_integer().
--type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic'
- | {'permanent', delay()} | {'transient', delay()}
- | {'intrinsic', delay()}.
+-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
+-type modules() :: [module()] | 'dynamic'.
+-type delay() :: non_neg_integer().
+-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}.
-type shutdown() :: 'brutal_kill' | timeout().
--type worker() :: 'worker' | 'supervisor'.
+-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
--type sup_ref() :: (Name :: atom())
+-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
@@ -140,40 +110,110 @@
Type :: worker(),
Modules :: modules()}.
-
-type strategy() :: 'one_for_all' | 'one_for_one'
- | 'rest_for_one' | 'simple_one_for_one'
- | 'simple_one_for_one_terminate'.
-
--type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()],
- name :: child_id(),
- mfa :: mfargs(),
- restart_type :: restart(),
- shutdown :: shutdown(),
- child_type :: worker(),
- modules :: modules()}.
-
--type state() :: #state{strategy :: strategy(),
- children :: [child_rec()],
- dynamics :: ?DICT(),
- intensity :: non_neg_integer(),
- period :: pos_integer()}.
+ | 'rest_for_one' | 'simple_one_for_one'.
+-endif.
%%--------------------------------------------------------------------------
-%% Callback behaviour
-%%--------------------------------------------------------------------------
+-ifdef(use_specs).
+-record(child, {% pid is undefined when child is not running
+ pid = undefined :: child() | {restarting,pid()} | [pid()],
+ name :: child_id(),
+ mfargs :: mfargs(),
+ restart_type :: restart(),
+ shutdown :: shutdown(),
+ child_type :: worker(),
+ modules = [] :: modules()}).
+-type child_rec() :: #child{}.
+-else.
+-record(child, {
+ pid = undefined,
+ name,
+ mfargs,
+ restart_type,
+ shutdown,
+ child_type,
+ modules = []}).
+-endif.
+
+-define(DICT, dict).
+-define(SETS, sets).
+-define(SET, set).
+
+-ifdef(use_specs).
+-record(state, {name,
+ strategy :: strategy(),
+ children = [] :: [child_rec()],
+ dynamics :: ?DICT() | ?SET(),
+ intensity :: non_neg_integer(),
+ period :: pos_integer(),
+ restarts = [],
+ module,
+ args}).
+-type state() :: #state{}.
+-else.
+-record(state, {name,
+ strategy,
+ children = [],
+ dynamics,
+ intensity,
+ period,
+ restarts = [],
+ module,
+ args}).
+-endif.
+
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one).
+-define(is_permanent(R), ((R =:= permanent) orelse
+ (is_tuple(R) andalso
+ tuple_size(R) == 2 andalso
+ element(1, R) =:= permanent))).
+-define(is_explicit_restart(R),
+ R == {shutdown, restart}).
+
+-ifdef(use_specs).
-callback init(Args :: term()) ->
{ok, {{RestartStrategy :: strategy(),
- MaxR :: non_neg_integer(),
- MaxT :: non_neg_integer()},
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
[ChildSpec :: child_spec()]}}
| ignore.
+-endif.
+-define(restarting(_Pid_), {restarting,_Pid_}).
-%%--------------------------------------------------------------------------
-%% Specs
-%%--------------------------------------------------------------------------
+%%% ---------------------------------------------------
+%%% This is a general process supervisor built upon gen_server.erl.
+%%% Servers/processes should/could also be built using gen_server.erl.
+%%% SupName = {local, atom()} | {global, atom()}.
+%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type startlink_err() :: {'already_started', pid()}
+ | {'shutdown', term()}
+ | term().
+-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
+
+-spec start_link(Module, Args) -> startlink_ret() when
+ Module :: module(),
+ Args :: term().
+-endif.
+start_link(Mod, Args) ->
+ gen_server:start_link(?MODULE, {self, Mod, Args}, []).
+
+-ifdef(use_specs).
+-spec start_link(SupName, Module, Args) -> startlink_ret() when
+ SupName :: sup_name(),
+ Module :: module(),
+ Args :: term().
+-endif.
+start_link(SupName, Mod, Args) ->
+ gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
+
+%%% ---------------------------------------------------
+%%% Interface functions.
+%%% ---------------------------------------------------
+-ifdef(use_specs).
-type startchild_err() :: 'already_present'
| {'already_started', Child :: child()} | term().
-type startchild_ret() :: {'ok', Child :: child()}
@@ -183,91 +223,30 @@
-spec start_child(SupRef, ChildSpec) -> startchild_ret() when
SupRef :: sup_ref(),
ChildSpec :: child_spec() | (List :: [term()]).
+-endif.
+start_child(Supervisor, ChildSpec) ->
+ call(Supervisor, {start_child, ChildSpec}).
+-ifdef(use_specs).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
Id :: child_id(),
Result :: {'ok', Child :: child()}
| {'ok', Child :: child(), Info :: term()}
| {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one' | term().
+ Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one' |
+ term().
+-endif.
+restart_child(Supervisor, Name) ->
+ call(Supervisor, {restart_child, Name}).
+-ifdef(use_specs).
-spec delete_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
Id :: child_id(),
Result :: 'ok' | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one'.
-
--spec terminate_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: pid() | child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'not_found' | 'simple_one_for_one'.
-
--spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
- SupRef :: sup_ref(),
- Id :: child_id() | 'undefined',
- Child :: child(),
- Type :: worker(),
- Modules :: modules().
-
--spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
- Result :: 'ok' | {'error', Error :: term()}.
-
--type init_sup_name() :: sup_name() | 'self'.
-
--type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}}
- | {'bad_start_spec', term()} | {'start_spec', term()}
- | {'supervisor_data', term()}.
-
--spec init({init_sup_name(), module(), [term()]}) ->
- {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
-
--type call() :: 'which_children'.
--spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
-
--spec handle_cast('null', state()) -> {'noreply', state()}.
-
--spec handle_info(term(), state()) ->
- {'noreply', state()} | {'stop', 'shutdown', state()}.
-
--spec terminate(term(), state()) -> 'ok'.
-
--spec code_change(term(), state(), term()) ->
- {'ok', state()} | {'error', term()}.
-
--else.
-
--export([behaviour_info/1]).
-
-behaviour_info(callbacks) ->
- [{init,1}];
-behaviour_info(_Other) ->
- undefined.
-
+ Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one'.
-endif.
-
-%%% ---------------------------------------------------
-%%% This is a general process supervisor built upon gen_server.erl.
-%%% Servers/processes should/could also be built using gen_server.erl.
-%%% SupName = {local, atom()} | {global, atom()}.
-%%% ---------------------------------------------------
-start_link(Mod, Args) ->
- gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
-start_link(SupName, Mod, Args) ->
- gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
-%%% ---------------------------------------------------
-%%% Interface functions.
-%%% ---------------------------------------------------
-start_child(Supervisor, ChildSpec) ->
- call(Supervisor, {start_child, ChildSpec}).
-
-restart_child(Supervisor, Name) ->
- call(Supervisor, {restart_child, Name}).
-
delete_child(Supervisor, Name) ->
call(Supervisor, {delete_child, Name}).
@@ -277,12 +256,44 @@ delete_child(Supervisor, Name) ->
%% Note that the child is *always* terminated in some
%% way (maybe killed).
%%-----------------------------------------------------------------
+-ifdef(use_specs).
+-spec terminate_child(SupRef, Id) -> Result when
+ SupRef :: sup_ref(),
+ Id :: pid() | child_id(),
+ Result :: 'ok' | {'error', Error},
+ Error :: 'not_found' | 'simple_one_for_one'.
+-endif.
terminate_child(Supervisor, Name) ->
call(Supervisor, {terminate_child, Name}).
+-ifdef(use_specs).
+-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
+ SupRef :: sup_ref(),
+ Id :: child_id() | undefined,
+ Child :: child() | 'restarting',
+ Type :: worker(),
+ Modules :: modules().
+-endif.
which_children(Supervisor) ->
call(Supervisor, which_children).
+-ifdef(use_specs).
+-spec count_children(SupRef) -> PropListOfCounts when
+ SupRef :: sup_ref(),
+ PropListOfCounts :: [Count],
+ Count :: {specs, ChildSpecCount :: non_neg_integer()}
+ | {active, ActiveProcessCount :: non_neg_integer()}
+ | {supervisors, ChildSupervisorCount :: non_neg_integer()}
+ |{workers, ChildWorkerCount :: non_neg_integer()}.
+-endif.
+count_children(Supervisor) ->
+ call(Supervisor, count_children).
+
+-ifdef(use_specs).
+-spec find_child(Supervisor, Name) -> [pid()] when
+ Supervisor :: sup_ref(),
+ Name :: child_id().
+-endif.
find_child(Supervisor, Name) ->
[Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
Name1 =:= Name].
@@ -290,6 +301,11 @@ find_child(Supervisor, Name) ->
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
+-ifdef(use_specs).
+-spec check_childspecs(ChildSpecs) -> Result when
+ ChildSpecs :: [child_spec()],
+ Result :: 'ok' | {'error', Error :: term()}.
+-endif.
check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
case check_startspec(ChildSpecs) of
{ok, _} -> ok;
@@ -297,11 +313,37 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
+%%%-----------------------------------------------------------------
+%%% Called by timer:apply_after from restart/2
+-ifdef(use_specs).
+-spec try_again_restart(SupRef, Child, Reason) -> ok when
+ SupRef :: sup_ref(),
+ Child :: child_id() | pid(),
+ Reason :: term().
+-endif.
+try_again_restart(Supervisor, Child, Reason) ->
+ cast(Supervisor, {try_again_restart, Child, Reason}).
+
+cast(Supervisor, Req) ->
+ gen_server:cast(Supervisor, Req).
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type init_sup_name() :: sup_name() | 'self'.
+
+-type stop_rsn() :: {'shutdown', term()}
+ | {'bad_return', {module(),'init', term()}}
+ | {'bad_start_spec', term()}
+ | {'start_spec', term()}
+ | {'supervisor_data', term()}.
+
+-spec init({init_sup_name(), module(), [term()]}) ->
+ {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
+-endif.
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
case Mod:init(Args) of
@@ -327,9 +369,9 @@ init_children(State, StartSpec) ->
case start_children(Children, SupName) of
{ok, NChildren} ->
{ok, State#state{children = NChildren}};
- {error, NChildren} ->
+ {error, NChildren, Reason} ->
terminate_children(NChildren, SupName),
- {stop, shutdown}
+ {stop, {shutdown, Reason}}
end;
Error ->
{stop, {start_spec, Error}}
@@ -347,32 +389,35 @@ init_dynamic(_State, StartSpec) ->
%%-----------------------------------------------------------------
%% Func: start_children/2
-%% Args: Children = [#child] in start order
-%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Args: Children = [child_rec()] in start order
+%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod}
%% Purpose: Start all children. The new list contains #child's
%% with pids.
-%% Returns: {ok, NChildren} | {error, NChildren}
-%% NChildren = [#child] in termination order (reversed
+%% Returns: {ok, NChildren} | {error, NChildren, Reason}
+%% NChildren = [child_rec()] in termination order (reversed
%% start order)
%%-----------------------------------------------------------------
start_children(Children, SupName) -> start_children(Children, [], SupName).
start_children([Child|Chs], NChildren, SupName) ->
case do_start_child(SupName, Child) of
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ start_children(Chs, NChildren, SupName);
{ok, Pid} ->
start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
{ok, Pid, _Extra} ->
start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
{error, Reason} ->
report_error(start_error, Reason, Child, SupName),
- {error, lists:reverse(Chs) ++ [Child | NChildren]}
+ {error, lists:reverse(Chs) ++ [Child | NChildren],
+ {failed_to_start_child,Child#child.name,Reason}}
end;
start_children([], NChildren, _SupName) ->
{ok, NChildren}.
do_start_child(SupName, Child) ->
- #child{mfa = {M, F, A}} = Child,
- case catch apply(M, F, A) of
+ #child{mfargs = {M, F, Args}} = Child,
+ case catch apply(M, F, Args) of
{ok, Pid} when is_pid(Pid) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
@@ -401,35 +446,54 @@ do_start_child_i(M, F, A) ->
{error, What}
end.
-
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine
+-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
+-endif.
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
- #child{mfa = {M, F, A}} = hd(State#state.children),
+ Child = hd(State#state.children),
+ #child{mfargs = {M, F, A}} = Child,
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
- {ok, undefined} ->
- {reply, {ok, undefined}, State};
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ {reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid, Extra}, NState};
What ->
{reply, What, State}
end;
-%%% The requests terminate_child, delete_child and restart_child are
-%%% invalid for simple_one_for_one and simple_one_for_one_terminate
-%%% supervisors.
+%% terminate_child for simple_one_for_one can only be done with pid
+handle_call({terminate_child, Name}, _From, State) when not is_pid(Name),
+ ?is_simple(State) ->
+ {reply, {error, simple_one_for_one}, State};
+
+handle_call({terminate_child, Name}, _From, State) ->
+ case get_child(Name, State, ?is_simple(State)) of
+ {value, Child} ->
+ case do_terminate(Child, State#state.name) of
+ #child{restart_type=RT} when RT=:=temporary; ?is_simple(State) ->
+ {reply, ok, state_del_child(Child, State)};
+ NChild ->
+ {reply, ok, replace_child(NChild, State)}
+ end;
+ false ->
+ {reply, {error, not_found}, State}
+ end;
+
+%%% The requests delete_child and restart_child are invalid for
+%%% simple_one_for_one supervisors.
handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
- {reply, {error, State#state.strategy}, State};
+ {reply, {error, simple_one_for_one}, State};
handle_call({start_child, ChildSpec}, _From, State) ->
case check_childspec(ChildSpec) of
@@ -453,6 +517,8 @@ handle_call({restart_child, Name}, _From, State) ->
Error ->
{reply, Error, State}
end;
+ {value, #child{pid=?restarting(_)}} ->
+ {reply, {error, restarting}, State};
{value, _} ->
{reply, {error, running}, State};
_ ->
@@ -464,60 +530,146 @@ handle_call({delete_child, Name}, _From, State) ->
{value, Child} when Child#child.pid =:= undefined ->
NState = remove_child(Child, State),
{reply, ok, NState};
+ {value, #child{pid=?restarting(_)}} ->
+ {reply, {error, restarting}, State};
{value, _} ->
{reply, {error, running}, State};
_ ->
{reply, {error, not_found}, State}
end;
-handle_call({terminate_child, Name}, _From, State) ->
- case get_child(Name, State) of
- {value, Child} ->
- NChild = do_terminate(Child, State#state.name),
- {reply, ok, replace_child(NChild, State)};
- _ ->
- {reply, {error, not_found}, State}
- end;
+handle_call(which_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
+ Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end,
+ ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))),
+ {reply, Reply, State};
-handle_call(which_children, _From, State) when ?is_simple(State) ->
- [#child{child_type = CT, modules = Mods}] = State#state.children,
- Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
- ?DICT:to_list(State#state.dynamics)),
+handle_call(which_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
+ Reply = lists:map(fun({?restarting(_),_}) -> {undefined,restarting,CT,Mods};
+ ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ ?DICT:to_list(dynamics_db(RType, State#state.dynamics))),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
Resp =
- lists:map(fun (#child{pid = Pid, name = Name,
+ lists:map(fun(#child{pid = ?restarting(_), name = Name,
+ child_type = ChildType, modules = Mods}) ->
+ {Name, restarting, ChildType, Mods};
+ (#child{pid = Pid, name = Name,
child_type = ChildType, modules = Mods}) ->
- {Name, Pid, ChildType, Mods}
+ {Name, Pid, ChildType, Mods}
end,
State#state.children),
- {reply, Resp, State}.
+ {reply, Resp, State};
-%%% Hopefully cause a function-clause as there is no API function
-%%% that utilizes cast.
-handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
- []),
- {noreply, State}.
+handle_call(count_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT}]} = State)
+ when ?is_simple(State) ->
+ {Active, Count} =
+ ?SETS:fold(fun(Pid, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->{Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(temporary, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
-handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
+handle_call(count_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT}]} = State)
when ?is_simple(State) ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
- {noreply, NState};
-handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
- case get_child(Child#child.name, State) of
- {value, Child1} ->
- {ok, NState} = do_restart(RestartType, Reason, Child1, State),
- {noreply, NState};
- _ ->
+ {Active, Count} =
+ ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->
+ {Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(RType, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
+
+handle_call(count_children, _From, State) ->
+ %% Specs and children are together on the children list...
+ {Specs, Active, Supers, Workers} =
+ lists:foldl(fun(Child, Counts) ->
+ count_child(Child, Counts)
+ end, {0,0,0,0}, State#state.children),
+
+ %% Reformat counts to a property list.
+ Reply = [{specs, Specs}, {active, Active},
+ {supervisors, Supers}, {workers, Workers}],
+ {reply, Reply, State}.
+
+
+count_child(#child{pid = Pid, child_type = worker},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers, Workers+1};
+ false -> {Specs+1, Active, Supers, Workers+1}
+ end;
+count_child(#child{pid = Pid, child_type = supervisor},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers+1, Workers};
+ false -> {Specs+1, Active, Supers+1, Workers}
+ end.
+
+
+%%% If a restart attempt failed, this message is sent via
+%%% timer:apply_after(0,...) in order to give gen_server the chance to
+%%% check it's inbox before trying again.
+-ifdef(use_specs).
+-spec handle_cast({try_again_restart, child_id() | pid(), term()}, state()) ->
+ {'noreply', state()} | {stop, shutdown, state()}.
+-endif.
+handle_cast({try_again_restart,Pid,Reason}, #state{children=[Child]}=State)
+ when ?is_simple(State) ->
+ RT = Child#child.restart_type,
+ RPid = restarting(Pid),
+ case dynamic_child_args(RPid, dynamics_db(RT, State#state.dynamics)) of
+ {ok, Args} ->
+ {M, F, _} = Child#child.mfargs,
+ NChild = Child#child{pid = RPid, mfargs = {M, F, Args}},
+ try_restart(Child#child.restart_type, Reason, NChild, State);
+ error ->
{noreply, State}
end;
+handle_cast({try_again_restart,Name,Reason}, State) ->
+ %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist
+ case lists:keysearch(Name,#child.name,State#state.children) of
+ {value, Child = #child{pid=?restarting(_), restart_type=RestartType}} ->
+ try_restart(RestartType, Reason, Child, State);
+ _ ->
+ {noreply,State}
+ end.
+
%%
%% Take care of terminated children.
%%
+-ifdef(use_specs).
+-spec handle_info(term(), state()) ->
+ {'noreply', state()} | {'stop', 'shutdown', state()}.
+-endif.
handle_info({'EXIT', Pid, Reason}, State) ->
case restart_child(Pid, Reason, State) of
{ok, State1} ->
@@ -526,20 +678,34 @@ handle_info({'EXIT', Pid, Reason}, State) ->
{stop, shutdown, State1}
end;
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
+ when ?is_simple(State) ->
+ try_restart(RestartType, Reason, Child, State);
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
+ case get_child(Child#child.name, State) of
+ {value, Child1} ->
+ try_restart(RestartType, Reason, Child1, State);
+ _What ->
+ {noreply, State}
+ end;
+
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
+
%%
%% Terminate this server.
%%
-terminate(_Reason, State) when ?is_terminate_simple(State) ->
- terminate_simple_children(
- hd(State#state.children), State#state.dynamics, State#state.name),
- ok;
+-ifdef(use_specs).
+-spec terminate(term(), state()) -> 'ok'.
+-endif.
+terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) ->
+ terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type,
+ State#state.dynamics),
+ State#state.name);
terminate(_Reason, State) ->
- terminate_children(State#state.children, State#state.name),
- ok.
+ terminate_children(State#state.children, State#state.name).
%%
%% Change code for the supervisor.
@@ -550,6 +716,10 @@ terminate(_Reason, State) ->
%% NOTE: This requires that the init function of the call-back module
%% does not have any side effects.
%%
+-ifdef(use_specs).
+-spec code_change(term(), state(), term()) ->
+ {'ok', state()} | {'error', term()}.
+-endif.
code_change(_, State, _) ->
case (State#state.module):init(State#state.args) of
{ok, {SupFlags, StartSpec}} ->
@@ -577,14 +747,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
+update_childspec(State, StartSpec) when ?is_simple(State) ->
case check_startspec(StartSpec) of
{ok, [Child]} ->
{ok, State#state{children = [Child]}};
Error ->
{error, Error}
end;
-
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
{ok, Children} ->
@@ -603,11 +772,11 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
update_childspec1(OldC, Children, [Child|KeepOld])
end;
update_childspec1([], Children, KeepOld) ->
- % Return them in (keeped) reverse start order.
+ %% Return them in (kept) reverse start order.
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
- case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
+ case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
Ch#child{pid = OldCh#child.pid};
(Ch) ->
Ch
@@ -618,7 +787,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -627,20 +796,16 @@ handle_start_child(Child, State) ->
case get_child(Child#child.name, State) of
false ->
case do_start_child(State#state.name, Child) of
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ {{ok, undefined}, State};
{ok, Pid} ->
- Children = State#state.children,
- {{ok, Pid},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid}, save_child(Child#child{pid = Pid}, State)};
{ok, Pid, Extra} ->
- Children = State#state.children,
- {{ok, Pid, Extra},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)};
{error, What} ->
{{error, {What, Child}}, State}
end;
- {value, OldChild} when OldChild#child.pid =/= undefined ->
+ {value, OldChild} when is_pid(OldChild#child.pid) ->
{{error, {already_started, OldChild#child.pid}}, State};
{value, _OldChild} ->
{{error, already_present}, State}
@@ -648,105 +813,145 @@ handle_start_child(Child, State) ->
%%% ---------------------------------------------------
%%% Restart. A process has terminated.
-%%% Returns: {ok, #state} | {shutdown, #state}
+%%% Returns: {ok, state()} | {shutdown, state()}
%%% ---------------------------------------------------
-restart_child(Pid, Reason, State) when ?is_simple(State) ->
- case ?DICT:find(Pid, State#state.dynamics) of
+restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) ->
+ RestartType = Child#child.restart_type,
+ case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of
{ok, Args} ->
- [Child] = State#state.children,
- RestartType = Child#child.restart_type,
- {M, F, _} = Child#child.mfa,
- NChild = Child#child{pid = Pid, mfa = {M, F, Args}},
+ {M, F, _} = Child#child.mfargs,
+ NChild = Child#child{pid = Pid, mfargs = {M, F, Args}},
do_restart(RestartType, Reason, NChild, State);
error ->
- {ok, State}
+ {ok, State}
end;
+
restart_child(Pid, Reason, State) ->
Children = State#state.children,
+ %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist
case lists:keysearch(Pid, #child.pid, Children) of
- {value, Child} ->
- RestartType = Child#child.restart_type,
+ {value, #child{restart_type = RestartType} = Child} ->
do_restart(RestartType, Reason, Child, State);
- _ ->
+ false ->
{ok, State}
end.
-do_restart({permanent = RestartType, Delay}, Reason, Child, State) ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(permanent, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
- restart(Child, State);
-do_restart(Type, normal, Child, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State)
- when RestartType =:= transient orelse RestartType =:= intrinsic ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(Type, {shutdown, _}, Child, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart({RestartType, Delay}, Reason, Child, State)
- when RestartType =:= transient orelse RestartType =:= intrinsic ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(Type, Reason, Child, State) when Type =:= transient orelse
- Type =:= intrinsic ->
- report_error(child_terminated, Reason, Child, State#state.name),
+try_restart(RestartType, Reason, Child, State) ->
+ case handle_restart(RestartType, Reason, Child, State) of
+ {ok, NState} -> {noreply, NState};
+ {shutdown, State2} -> {stop, shutdown, State2}
+ end.
+
+do_restart(RestartType, Reason, Child, State) ->
+ maybe_report_error(RestartType, Reason, Child, State),
+ handle_restart(RestartType, Reason, Child, State).
+
+maybe_report_error(permanent, Reason, Child, State) ->
+ report_child_termination(Reason, Child, State);
+maybe_report_error({permanent, _}, Reason, Child, State) ->
+ report_child_termination(Reason, Child, State);
+maybe_report_error(_Type, Reason, Child, State) ->
+ case is_abnormal_termination(Reason) of
+ true -> report_child_termination(Reason, Child, State);
+ false -> ok
+ end.
+
+report_child_termination(Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name).
+
+handle_restart(permanent, _Reason, Child, State) ->
restart(Child, State);
-do_restart(temporary, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
- NState = state_del_child(Child, State),
- {ok, NState}.
+handle_restart(transient, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(fun restart/2,
+ fun delete_child_and_continue/2,
+ Reason, Child, State);
+handle_restart(intrinsic, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(fun restart/2,
+ fun delete_child_and_stop/2,
+ Reason, Child, State);
+handle_restart(temporary, _Reason, Child, State) ->
+ delete_child_and_continue(Child, State);
+handle_restart({permanent, _Delay}=Restart, Reason, Child, State) ->
+ do_restart_delay(Restart, Reason, Child, State);
+handle_restart({transient, _Delay}=Restart, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason),
+ fun delete_child_and_continue/2,
+ Reason, Child, State);
+handle_restart({intrinsic, _Delay}=Restart, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason),
+ fun delete_child_and_stop/2,
+ Reason, Child, State).
+
+restart_if_explicit_or_abnormal(RestartHow, Otherwise, Reason, Child, State) ->
+ case ?is_explicit_restart(Reason) orelse is_abnormal_termination(Reason) of
+ true -> RestartHow(Child, State);
+ false -> Otherwise(Child, State)
+ end.
+
+defer_to_restart_delay(Restart, Reason) ->
+ fun(Child, State) -> do_restart_delay(Restart, Reason, Child, State) end.
+
+delete_child_and_continue(Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
+delete_child_and_stop(Child, State) ->
+ {shutdown, state_del_child(Child, State)}.
+
+is_abnormal_termination(normal) -> false;
+is_abnormal_termination(shutdown) -> false;
+is_abnormal_termination({shutdown, _}) -> false;
+is_abnormal_termination(_Other) -> true.
do_restart_delay({RestartType, Delay}, Reason, Child, State) ->
- case restart1(Child, State) of
+ case add_restart(State) of
{ok, NState} ->
- {ok, NState};
- {terminate, NState} ->
+ maybe_restart(NState#state.strategy, Child, NState);
+ {terminate, _NState} ->
+ %% we've reached the max restart intensity, but the
+ %% add_restart will have added to the restarts
+ %% field. Given we don't want to die here, we need to go
+ %% back to the old restarts field otherwise we'll never
+ %% attempt to restart later, which is why we ignore
+ %% NState for this clause.
_TRef = erlang:send_after(trunc(Delay*1000), self(),
{delayed_restart,
{{RestartType, Delay}, Reason, Child}}),
- {ok, state_del_child(Child, NState)}
+ {ok, state_del_child(Child, State)}
end.
-del_child_and_maybe_shutdown(intrinsic, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-del_child_and_maybe_shutdown(_, Child, State) ->
- {ok, state_del_child(Child, State)}.
-
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
- restart(NState#state.strategy, Child, NState, fun restart/2);
+ maybe_restart(NState#state.strategy, Child, NState);
{terminate, NState} ->
report_error(shutdown, reached_max_restart_intensity,
Child, State#state.name),
- {shutdown, state_del_child(Child, NState)}
+ {shutdown, remove_child(Child, NState)}
end.
-restart1(Child, State) ->
- case add_restart(State) of
- {ok, NState} ->
- restart(NState#state.strategy, Child, NState, fun restart1/2);
- {terminate, _NState} ->
- %% we've reached the max restart intensity, but the
- %% add_restart will have added to the restarts
- %% field. Given we don't want to die here, we need to go
- %% back to the old restarts field otherwise we'll never
- %% attempt to restart later.
- {terminate, State}
+maybe_restart(Strategy, Child, State) ->
+ case restart(Strategy, Child, State) of
+ {try_again, Reason, NState2} ->
+ %% Leaving control back to gen_server before
+ %% trying again. This way other incoming requsts
+ %% for the supervisor can be handled - e.g. a
+ %% shutdown request for the supervisor or the
+ %% child.
+ Id = if ?is_simple(State) -> Child#child.pid;
+ true -> Child#child.name
+ end,
+ timer:apply_after(0,?MODULE,try_again_restart,[self(),Id,Reason]),
+ {ok,NState2};
+ Other ->
+ Other
end.
-restart(Strategy, Child, State, Restart)
- when Strategy =:= simple_one_for_one orelse
- Strategy =:= simple_one_for_one_terminate ->
- #child{mfa = {M, F, A}} = Child,
- Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
+restart(simple_one_for_one, Child, State) ->
+ #child{pid = OldPid, mfargs = {M, F, A}} = Child,
+ Dynamics = ?DICT:erase(OldPid, dynamics_db(Child#child.restart_type,
+ State#state.dynamics)),
case do_start_child_i(M, F, A) of
- {ok, undefined} ->
- {ok, State};
{ok, Pid} ->
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};
@@ -754,10 +959,13 @@ restart(Strategy, Child, State, Restart)
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};
{error, Error} ->
+ NState = State#state{dynamics = ?DICT:store(restarting(OldPid), A,
+ Dynamics)},
report_error(start_error, Error, Child, State#state.name),
- Restart(Child, State)
+ {try_again, Error, NState}
end;
-restart(one_for_one, Child, State, Restart) ->
+restart(one_for_one, Child, State) ->
+ OldPid = Child#child.pid,
case do_start_child(State#state.name, Child) of
{ok, Pid} ->
NState = replace_child(Child#child{pid = Pid}, State),
@@ -766,139 +974,83 @@ restart(one_for_one, Child, State, Restart) ->
NState = replace_child(Child#child{pid = Pid}, State),
{ok, NState};
{error, Reason} ->
+ NState = replace_child(Child#child{pid = restarting(OldPid)}, State),
report_error(start_error, Reason, Child, State#state.name),
- Restart(Child, State)
+ {try_again, Reason, NState}
end;
-restart(rest_for_one, Child, State, Restart) ->
+restart(rest_for_one, Child, State) ->
{ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
ChAfter2 = terminate_children(ChAfter, State#state.name),
case start_children(ChAfter2, State#state.name) of
{ok, ChAfter3} ->
{ok, State#state{children = ChAfter3 ++ ChBefore}};
- {error, ChAfter3} ->
- Restart(Child, State#state{children = ChAfter3 ++ ChBefore})
+ {error, ChAfter3, Reason} ->
+ NChild = Child#child{pid=restarting(Child#child.pid)},
+ NState = State#state{children = ChAfter3 ++ ChBefore},
+ {try_again, Reason, replace_child(NChild,NState)}
end;
-restart(one_for_all, Child, State, Restart) ->
+restart(one_for_all, Child, State) ->
Children1 = del_child(Child#child.pid, State#state.children),
Children2 = terminate_children(Children1, State#state.name),
case start_children(Children2, State#state.name) of
{ok, NChs} ->
{ok, State#state{children = NChs}};
- {error, NChs} ->
- Restart(Child, State#state{children = NChs})
+ {error, NChs, Reason} ->
+ NChild = Child#child{pid=restarting(Child#child.pid)},
+ NState = State#state{children = NChs},
+ {try_again, Reason, replace_child(NChild,NState)}
end.
+restarting(Pid) when is_pid(Pid) -> ?restarting(Pid);
+restarting(RPid) -> RPid.
+
%%-----------------------------------------------------------------
%% Func: terminate_children/2
-%% Args: Children = [#child] in termination order
+%% Args: Children = [child_rec()] in termination order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Returns: NChildren = [#child] in
+%% Returns: NChildren = [child_rec()] in
%% startup order (reversed termination order)
%%-----------------------------------------------------------------
terminate_children(Children, SupName) ->
terminate_children(Children, SupName, []).
+%% Temporary children should not be restarted and thus should
+%% be skipped when building the list of terminated children, although
+%% we do want them to be shut down as many functions from this module
+%% use this function to just clear everything.
+terminate_children([Child = #child{restart_type=temporary} | Children], SupName, Res) ->
+ do_terminate(Child, SupName),
+ terminate_children(Children, SupName, Res);
terminate_children([Child | Children], SupName, Res) ->
NChild = do_terminate(Child, SupName),
terminate_children(Children, SupName, [NChild | Res]);
terminate_children([], _SupName, Res) ->
Res.
-terminate_simple_children(Child, Dynamics, SupName) ->
- Pids = dict:fold(fun (Pid, _Args, Pids) ->
- erlang:monitor(process, Pid),
- unlink(Pid),
- exit(Pid, child_exit_reason(Child)),
- [Pid | Pids]
- end, [], Dynamics),
- TimeoutMsg = {timeout, make_ref()},
- TRef = timeout_start(Child, TimeoutMsg),
- {Replies, Timedout} =
- lists:foldl(
- fun (_Pid, {Replies, Timedout}) ->
- {Pid1, Reason1, Timedout1} =
- receive
- TimeoutMsg ->
- Remaining = Pids -- [P || {P, _} <- Replies],
- [exit(P, kill) || P <- Remaining],
- receive
- {'DOWN', _MRef, process, Pid, Reason} ->
- {Pid, Reason, true}
- end;
- {'DOWN', _MRef, process, Pid, Reason} ->
- {Pid, Reason, Timedout}
- end,
- {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
- Timedout1}
- end, {[], false}, Pids),
- timeout_stop(Child, TRef, TimeoutMsg, Timedout),
- ReportError = shutdown_error_reporter(SupName),
- Report = fun(_, ok) -> ok;
- (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
- end,
- [receive
- {'EXIT', Pid, Reason} ->
- Report(Pid, child_res(Child, Reason, Timedout))
- after
- 0 -> Report(Pid, Reply)
- end || {Pid, Reply} <- Replies],
- ok.
-
-child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
-child_exit_reason(#child{}) -> shutdown.
-
-child_res(#child{shutdown=brutal_kill}, killed, false) -> ok;
-child_res(#child{}, shutdown, false) -> ok;
-child_res(#child{restart_type=permanent}, normal, false) -> {error, normal};
-child_res(#child{restart_type={permanent,_}},normal, false) -> {error, normal};
-child_res(#child{}, normal, false) -> ok;
-child_res(#child{}, R, _) -> {error, R}.
-
-timeout_start(#child{shutdown = Time}, Msg) when is_integer(Time) ->
- erlang:send_after(Time, self(), Msg);
-timeout_start(#child{}, _Msg) ->
- ok.
-
-timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
- erlang:cancel_timer(TRef),
- receive
- Msg -> ok
- after
- 0 -> ok
- end;
-timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
- ok.
-
-do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
- ReportError = shutdown_error_reporter(SupName),
+do_terminate(Child, SupName) when is_pid(Child#child.pid) ->
case shutdown(Child#child.pid, Child#child.shutdown) of
ok ->
ok;
- {error, normal} ->
- case Child#child.restart_type of
- permanent -> ReportError(normal, Child);
- {permanent, _Delay} -> ReportError(normal, Child);
- _ -> ok
- end;
+ {error, normal} when not ?is_permanent(Child#child.restart_type) ->
+ ok;
{error, OtherReason} ->
- ReportError(OtherReason, Child)
+ report_error(shutdown_error, OtherReason, Child, SupName)
end,
Child#child{pid = undefined};
do_terminate(Child, _SupName) ->
- Child.
+ Child#child{pid = undefined}.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -908,16 +1060,14 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
-
shutdown(Pid, Time) ->
-
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -929,14 +1079,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -946,34 +1096,177 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
%%-----------------------------------------------------------------
+%% Func: terminate_dynamic_children/3
+%% Args: Child = child_rec()
+%% Dynamics = ?DICT() | ?SET()
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Returns: ok
+%%
+%%
+%% Shutdown all dynamic children. This happens when the supervisor is
+%% stopped. Because the supervisor can have millions of dynamic children, we
+%% can have an significative overhead here.
+%%-----------------------------------------------------------------
+terminate_dynamic_children(Child, Dynamics, SupName) ->
+ {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics),
+ Sz = ?SETS:size(Pids),
+ EStack = case Child#child.shutdown of
+ brutal_kill ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ infinity ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ Time ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ TRef = erlang:start_timer(Time, self(), kill),
+ wait_dynamic_children(Child, Pids, Sz, TRef, EStack0)
+ end,
+ %% Unroll stacked errors and report them
+ ?DICT:fold(fun(Reason, Ls, _) ->
+ report_error(shutdown_error, Reason,
+ Child#child{pid=Ls}, SupName)
+ end, ok, EStack).
+
+
+monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) ->
+ ?SETS:fold(fun(P, {Pids, EStack}) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end
+ end, {?SETS:new(), ?DICT:new()}, Dynamics);
+monitor_dynamic_children(#child{restart_type=RType}, Dynamics) ->
+ ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} when not ?is_permanent(RType) ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end;
+ (?restarting(_), _, {Pids, EStack}) ->
+ {Pids, EStack}
+ end, {?SETS:new(), ?DICT:new()}, Dynamics).
+
+wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) ->
+ EStack;
+wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) ->
+ %% If the timer has expired before its cancellation, we must empty the
+ %% mail-box of the 'timeout'-message.
+ erlang:cancel_timer(TRef),
+ receive
+ {timeout, TRef, kill} ->
+ EStack
+ after 0 ->
+ EStack
+ end;
+wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack))
+ end;
+wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, normal} when not ?is_permanent(RType) ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack));
+
+ {timeout, TRef, kill} ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack)
+ end.
+
+%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
-state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
- NDynamics = ?DICT:erase(Pid, State#state.dynamics),
+
+%% Note we do not want to save the parameter list for temporary processes as
+%% they will not be restarted, and hence we do not need this information.
+%% Especially for dynamic children to simple_one_for_one supervisors
+%% it could become very costly as it is not uncommon to spawn
+%% very many such processes.
+save_child(#child{restart_type = temporary,
+ mfargs = {M, F, _}} = Child, #state{children = Children} = State) ->
+ State#state{children = [Child#child{mfargs = {M, F, undefined}} |Children]};
+save_child(Child, #state{children = Children} = State) ->
+ State#state{children = [Child |Children]}.
+
+save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))};
+save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}.
+
+dynamics_db(temporary, undefined) ->
+ ?SETS:new();
+dynamics_db(_, undefined) ->
+ ?DICT:new();
+dynamics_db(_,Dynamics) ->
+ Dynamics.
+
+dynamic_child_args(Pid, Dynamics) ->
+ case ?SETS:is_set(Dynamics) of
+ true ->
+ {ok, undefined};
+ false ->
+ ?DICT:find(Pid, Dynamics)
+ end.
+
+state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) ->
+ NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)),
+ State#state{dynamics = NDynamics};
+state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) ->
+ NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)),
State#state{dynamics = NDynamics};
state_del_child(Child, State) ->
NChildren = del_child(Child#child.name, State#state.children),
State#state{children = NChildren}.
+del_child(Name, [Ch=#child{pid = ?restarting(_)}|_]=Chs) when Ch#child.name =:= Name ->
+ Chs;
+del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name, Ch#child.restart_type =:= temporary ->
+ Chs;
del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name ->
[Ch#child{pid = undefined} | Chs];
+del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid, Ch#child.restart_type =:= temporary ->
+ Chs;
del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid ->
[Ch#child{pid = undefined} | Chs];
del_child(Name, [Ch|Chs]) ->
@@ -996,7 +1289,38 @@ split_child(_, [], After) ->
{lists:reverse(After), []}.
get_child(Name, State) ->
+ get_child(Name, State, false).
+get_child(Pid, State, AllowPid) when AllowPid, is_pid(Pid) ->
+ get_dynamic_child(Pid, State);
+get_child(Name, State, _) ->
lists:keysearch(Name, #child.name, State#state.children).
+
+get_dynamic_child(Pid, #state{children=[Child], dynamics=Dynamics}) ->
+ DynamicsDb = dynamics_db(Child#child.restart_type, Dynamics),
+ case is_dynamic_pid(Pid, DynamicsDb) of
+ true ->
+ {value, Child#child{pid=Pid}};
+ false ->
+ RPid = restarting(Pid),
+ case is_dynamic_pid(RPid, DynamicsDb) of
+ true ->
+ {value, Child#child{pid=RPid}};
+ false ->
+ case erlang:is_process_alive(Pid) of
+ true -> false;
+ false -> {value, Child}
+ end
+ end
+ end.
+
+is_dynamic_pid(Pid, Dynamics) ->
+ case ?SETS:is_set(Dynamics) of
+ true ->
+ ?SETS:is_element(Pid, Dynamics);
+ false ->
+ ?DICT:is_key(Pid, Dynamics)
+ end.
+
replace_child(Child, State) ->
Chs = do_replace_child(Child, State#state.children),
State#state{children = Chs}.
@@ -1016,12 +1340,12 @@ remove_child(Child, State) ->
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
%% rest_for_one
-%% MaxIntensity = integer()
-%% Period = integer()
+%% MaxIntensity = integer() >= 0
+%% Period = integer() > 0
%% Mod :== atom()
-%% Arsg :== term()
+%% Args :== term()
%% Purpose: Check that Type is of correct type (!)
-%% Returns: {ok, #state} | Error
+%% Returns: {ok, state()} | Error
%%-----------------------------------------------------------------
init_state(SupName, Type, Mod, Args) ->
case catch init_state1(SupName, Type, Mod, Args) of
@@ -1036,46 +1360,45 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
validIntensity(MaxIntensity),
validPeriod(Period),
{ok, #state{name = supname(SupName,Mod),
- strategy = Strategy,
- intensity = MaxIntensity,
- period = Period,
- module = Mod,
- args = Args}};
+ strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period,
+ module = Mod,
+ args = Args}};
init_state1(_SupName, Type, _, _) ->
{invalid_type, Type}.
-validStrategy(simple_one_for_one_terminate) -> true;
-validStrategy(simple_one_for_one) -> true;
-validStrategy(one_for_one) -> true;
-validStrategy(one_for_all) -> true;
-validStrategy(rest_for_one) -> true;
-validStrategy(What) -> throw({invalid_strategy, What}).
+validStrategy(simple_one_for_one) -> true;
+validStrategy(one_for_one) -> true;
+validStrategy(one_for_all) -> true;
+validStrategy(rest_for_one) -> true;
+validStrategy(What) -> throw({invalid_strategy, What}).
validIntensity(Max) when is_integer(Max),
Max >= 0 -> true;
-validIntensity(What) -> throw({invalid_intensity, What}).
+validIntensity(What) -> throw({invalid_intensity, What}).
validPeriod(Period) when is_integer(Period),
Period > 0 -> true;
validPeriod(What) -> throw({invalid_period, What}).
-supname(self,Mod) -> {self(),Mod};
-supname(N,_) -> N.
+supname(self, Mod) -> {self(), Mod};
+supname(N, _) -> N.
%%% ------------------------------------------------------
%%% Check that the children start specification is valid.
%%% Shall be a six (6) tuple
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
-%%% Func is {Mod, Fun, Args} == {atom, atom, list}
+%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()}
%%% RestartType is permanent | temporary | transient |
%%% intrinsic | {permanent, Delay} |
%%% {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0
-%%% Shutdown = integer() | infinity | brutal_kill
+%%% Shutdown = integer() > 0 | infinity | brutal_kill
%%% ChildType = supervisor | worker
%%% Modules = [atom()] | dynamic
-%%% Returns: {ok, [#child]} | Error
+%%% Returns: {ok, [child_rec()]} | Error
%%% ------------------------------------------------------
check_startspec(Children) -> check_startspec(Children, []).
@@ -1103,7 +1426,7 @@ check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) ->
validChildType(ChildType),
validShutdown(Shutdown, ChildType),
validMods(Mods),
- {ok, #child{name = Name, mfa = Func, restart_type = RestartType,
+ {ok, #child{name = Name, mfargs = Func, restart_type = RestartType,
shutdown = Shutdown, child_type = ChildType, modules = Mods}}.
validChildType(supervisor) -> true;
@@ -1112,8 +1435,8 @@ validChildType(What) -> throw({invalid_child_type, What}).
validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1131,15 +1454,15 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
-validShutdown(infinity, supervisor) -> true;
+validShutdown(infinity, _) -> true;
validShutdown(brutal_kill, _) -> true;
validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
- lists:foreach(fun (Mod) ->
+ lists:foreach(fun(Mod) ->
if
is_atom(Mod) -> ok;
true -> throw({invalid_module, Mod})
@@ -1157,7 +1480,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
@@ -1213,15 +1536,18 @@ report_error(Error, Reason, Child, SupName) ->
{offender, extract_child(Child)}],
error_logger:error_report(supervisor_report, ErrorMsg).
-shutdown_error_reporter(SupName) ->
- fun(Reason, Child) ->
- report_error(shutdown_error, Reason, Child, SupName)
- end.
+extract_child(Child) when is_list(Child#child.pid) ->
+ [{nb_children, length(Child#child.pid)},
+ {name, Child#child.name},
+ {mfargs, Child#child.mfargs},
+ {restart_type, Child#child.restart_type},
+ {shutdown, Child#child.shutdown},
+ {child_type, Child#child.child_type}];
extract_child(Child) ->
[{pid, Child#child.pid},
{name, Child#child.name},
- {mfa, Child#child.mfa},
+ {mfargs, Child#child.mfargs},
{restart_type, Child#child.restart_type},
{shutdown, Child#child.shutdown},
{child_type, Child#child.child_type}].
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
index a841b1f0..5a47e309 100644
--- a/src/supervisor2_tests.erl
+++ b/src/supervisor2_tests.erl
@@ -65,6 +65,6 @@ init([Timeout]) ->
[{local, ?MODULE}, ?MODULE, []]},
transient, Timeout, supervisor, [?MODULE]}]}};
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{test_worker, {?MODULE, start_link, []},
temporary, 1000, worker, [?MODULE]}]}}.
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 51ff7b4e..da325f1e 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -34,7 +34,7 @@
%%----------------------------------------------------------------------------
test_supervisor_delayed_restart() ->
- passed = with_sup(simple_one_for_one_terminate,
+ passed = with_sup(simple_one_for_one,
fun (SupPid) ->
{ok, _ChildPid} =
supervisor2:start_child(SupPid, []),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index d60b7fec..a07f6c65 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -263,29 +263,11 @@ get_total_memory({unix,openbsd}) ->
sysctl("hw.usermem");
get_total_memory({win32,_OSname}) ->
- %% Due to the Erlang print format bug, on Windows boxes the memory
- %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM
- %% we get negative memory size:
- %% > os_mon_sysinfo:get_mem_info().
- %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"]
- %% Due to this bug, we don't actually know anything. Even if the
- %% number is postive we can't be sure if it's correct. This only
- %% affects us on os_mon versions prior to 2.2.1.
- case application:get_key(os_mon, vsn) of
- undefined ->
- unknown;
- {ok, Version} ->
- case rabbit_misc:version_compare(Version, "2.2.1", lt) of
- true -> %% os_mon is < 2.2.1, so we know nothing
- unknown;
- false ->
- [Result|_] = os_mon_sysinfo:get_mem_info(),
- {ok, [_MemLoad, TotPhys, _AvailPhys,
- _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
- io_lib:fread("~d~d~d~d~d~d~d", Result),
- TotPhys
- end
- end;
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys, _TotPage, _AvailPage, _TotV, _AvailV],
+ _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys;
get_total_memory({unix, linux}) ->
File = read_proc_file("/proc/meminfo"),