summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-15 14:56:52 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-15 14:56:52 +0100
commit0f52df179315cad77348ac6a541cd747991abaf6 (patch)
treeb6c3f6de8e341a035ce24f6b4d9bdb0cb0b13b1f
parent3402700d7f8ade8555754cbc3c944e538bf33e76 (diff)
parent84e8fb5834a754b5f2e4b5b9a293f028ee10d3d3 (diff)
downloadrabbitmq-server-0f52df179315cad77348ac6a541cd747991abaf6.tar.gz
stable to default
-rw-r--r--.hgignore1
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec4
-rw-r--r--packaging/debs/Debian/debian/control4
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rw-r--r--src/delegate.erl75
-rw-r--r--src/mirrored_supervisor.erl5
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_access_control.erl17
-rw-r--r--src/rabbit_alarm.erl72
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_control_main.erl51
-rw-r--r--src/rabbit_direct.erl30
-rw-r--r--src/rabbit_event.erl4
-rw-r--r--src/rabbit_heartbeat.erl41
-rw-r--r--src/rabbit_memory_monitor.erl17
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl6
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_mnesia.erl62
-rw-r--r--src/rabbit_plugins_main.erl20
-rw-r--r--src/rabbit_policy.erl82
-rw-r--r--src/rabbit_queue_index.erl32
-rw-r--r--src/rabbit_reader.erl81
-rw-r--r--src/rabbit_tests.erl69
-rw-r--r--src/rabbit_upgrade_functions.erl23
-rw-r--r--src/rabbit_writer.erl48
-rw-r--r--src/supervisor2.erl1200
-rw-r--r--src/supervisor2_tests.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl28
35 files changed, 1259 insertions, 749 deletions
diff --git a/.hgignore b/.hgignore
index 912b4a56..cd017298 100644
--- a/.hgignore
+++ b/.hgignore
@@ -3,6 +3,7 @@ syntax: glob
*~
*.swp
*.patch
+*.orig
erl_crash.dump
deps.mk
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 1d641144..b2361cde 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -960,7 +960,7 @@
</para>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg> <arg choice="opt"><replaceable>priority</replaceable></arg> </cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">--priority <replaceable>priority</replaceable></arg> <arg choice="opt">--apply-to <replaceable>apply-to</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Sets a policy.
@@ -989,7 +989,13 @@
<varlistentry>
<term>priority</term>
<listitem><para>
- The priority of the policy as an integer, defaulting to 0. Higher numbers indicate greater precedence.
+ The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>apply-to</term>
+ <listitem><para>
+ Which types of object this policy should apply to - "queues", "exchanges" or "all". The default is "all".
</para></listitem>
</varlistentry>
</variablelist>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index a4582e2d..7d0766f8 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -18,6 +18,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {vm_memory_high_watermark_paging_ratio, 0.5},
{disk_free_limit, 1000000000}, %% 1GB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f2195d84..2df8ee9d 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -12,8 +12,8 @@ Source3: rabbitmq-server.logrotate
Source4: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt
-Requires: erlang >= R12B-3, logrotate
+BuildRequires: erlang >= R13B03, python-simplejson, xmlto, libxslt
+Requires: erlang >= R13B03, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
Requires(post): %%REQUIRES%%
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 3a15c4b6..02d23547 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -4,12 +4,12 @@ Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
Uploaders: Emile Joubert <emile@rabbitmq.com>
DM-Upload-Allowed: yes
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip
Standards-Version: 3.9.2
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox (>= 1:13.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends}
Description: AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 2ab8eee7..85625a9d 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -213,7 +213,7 @@ Function findErlang
abort:
Abort
${Else}
- ${VersionCompare} $2 "5.6.3" $0
+ ${VersionCompare} $2 "5.7.4" $0
${VersionCompare} $2 "5.8.1" $1
${If} $0 = 2
diff --git a/src/delegate.erl b/src/delegate.erl
index 4e1dcd2e..f680d94a 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -27,14 +27,13 @@
-ifdef(use_specs).
+-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
--spec(invoke/2 ::
- ( pid(), fun ((pid()) -> A)) -> A;
- ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
- [{pid(), term()}]}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A;
+ ([pid()], fun_or_mfa(A)) -> {[{pid(), A}],
+ [{pid(), term()}]}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok').
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -52,24 +51,24 @@
start_link(Num) ->
gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
-invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- Fun(Pid);
-invoke(Pid, Fun) when is_pid(Pid) ->
- case invoke([Pid], Fun) of
+invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ apply1(FunOrMFA, Pid);
+invoke(Pid, FunOrMFA) when is_pid(Pid) ->
+ case invoke([Pid], FunOrMFA) of
{[{Pid, Result}], []} ->
Result;
{[], [{Pid, {Class, Reason, StackTrace}}]} ->
erlang:raise(Class, Reason, StackTrace)
end;
-invoke([], _Fun) -> %% optimisation
+invoke([], _FunOrMFA) -> %% optimisation
{[], []};
-invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- case safe_invoke(Pid, Fun) of
+invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
{ok, _, Result} -> {[{Pid, Result}], []};
{error, _, Error} -> {[], [{Pid, Error}]}
end;
-invoke(Pids, Fun) when is_list(Pids) ->
+invoke(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
%% infinity, and thus there is no process spawned in order to do
@@ -79,44 +78,44 @@ invoke(Pids, Fun) when is_list(Pids) ->
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped}, infinity)
+ {invoke, FunOrMFA, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
Pid <- orddict:fetch(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) |
+ ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
[Results || {_Node, Results} <- Replies]]),
lists:foldl(
fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]}
end, {[], BadPids}, ResultsNoNode).
-invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
- safe_invoke(Pid, Fun), %% we don't care about any error
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
+ safe_invoke(Pid, FunOrMFA), %% we don't care about any error
ok;
-invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result([Pid], Fun);
+invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
+ invoke_no_result([Pid], FunOrMFA);
-invoke_no_result([], _Fun) -> %% optimisation
+invoke_no_result([], _FunOrMFA) -> %% optimisation
ok;
-invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
- safe_invoke(Pid, Fun), %% must not die
+invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, FunOrMFA), %% must not die
ok;
-invoke_no_result(Pids, Fun) when is_list(Pids) ->
+invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
- {invoke, Fun, Grouped})
+ {invoke, FunOrMFA, Grouped})
end,
- safe_invoke(LocalPids, Fun), %% must not die
+ safe_invoke(LocalPids, FunOrMFA), %% must not die
ok.
call(PidOrPids, Msg) ->
- invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+ invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}).
cast(PidOrPids, Msg) ->
- invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+ invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}).
%%----------------------------------------------------------------------------
@@ -144,26 +143,30 @@ delegate(RemoteNodes) ->
Name -> Name
end.
-safe_invoke(Pids, Fun) when is_list(Pids) ->
- [safe_invoke(Pid, Fun) || Pid <- Pids];
-safe_invoke(Pid, Fun) when is_pid(Pid) ->
+safe_invoke(Pids, FunOrMFA) when is_list(Pids) ->
+ [safe_invoke(Pid, FunOrMFA) || Pid <- Pids];
+safe_invoke(Pid, FunOrMFA) when is_pid(Pid) ->
try
- {ok, Pid, Fun(Pid)}
+ {ok, Pid, apply1(FunOrMFA, Pid)}
catch Class:Reason ->
{error, Pid, {Class, Reason, erlang:get_stacktrace()}}
end.
+apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]);
+apply1(Fun, Arg) -> Fun(Arg).
+
%%----------------------------------------------------------------------------
init([]) ->
{ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, Node) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
+handle_call({invoke, FunOrMFA, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), Node,
+ hibernate}.
-handle_cast({invoke, Fun, Grouped}, Node) ->
- safe_invoke(orddict:fetch(Node, Grouped), Fun),
+handle_cast({invoke, FunOrMFA, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA),
{noreply, Node, hibernate}.
handle_info(_Info, Node) ->
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 3b16c53a..d5f51db0 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) ->
init(Mod, Args) ->
case Mod:init(Args) of
{ok, {{Bad, _, _}, _ChildSpecs}} when
- Bad =:= simple_one_for_one orelse
- Bad =:= simple_one_for_one_terminate -> erlang:error(badarg);
- Init -> Init
+ Bad =:= simple_one_for_one -> erlang:error(badarg);
+ Init -> Init
end.
start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2baec885..a9974711 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -588,6 +588,7 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
+ rabbit_policy:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 90be4f80..d54c2a8d 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -74,9 +74,7 @@ check_vhost_access(User = #user{ username = Username,
true -> Module:check_vhost_access(User, VHostPath)
end
end,
- "~s failed checking vhost access to ~s for ~s: ~p~n",
- [Module, VHostPath, Username],
- "access to vhost '~s' refused for user '~s'",
+ Module, "access to vhost '~s' refused for user '~s'",
[VHostPath, Username]).
check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
@@ -87,15 +85,14 @@ check_resource_access(User = #user{username = Username, auth_backend = Module},
Resource, Permission) ->
check_access(
fun() -> Module:check_resource_access(User, Resource, Permission) end,
- "~s failed checking resource access to ~p for ~s: ~p~n",
- [Module, Resource, Username],
- "access to ~s refused for user '~s'",
+ Module, "access to ~s refused for user '~s'",
[rabbit_misc:rs(Resource), Username]).
-check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
+check_access(Fun, Module, ErrStr, ErrArgs) ->
Allow = case Fun() of
- {error, _} = E ->
- rabbit_log:error(ErrStr, ErrArgs ++ [E]),
+ {error, E} ->
+ rabbit_log:error(ErrStr ++ " by ~s: ~p~n",
+ ErrArgs ++ [Module, E]),
false;
Else ->
Else
@@ -104,5 +101,5 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) ->
true ->
ok;
false ->
- rabbit_misc:protocol_error(access_refused, RefStr, RefArgs)
+ rabbit_misc:protocol_error(access_refused, ErrStr, ErrArgs)
end.
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 6607c4f6..cd1d125b 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -37,7 +37,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
-spec(set_alarm/1 :: (any()) -> 'ok').
-spec(clear_alarm/1 :: (any()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -93,8 +93,8 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, AlertMFA}, State) ->
- {ok, 0 < dict:size(State#alarms.alarmed_nodes),
+handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
+ {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
@@ -104,11 +104,20 @@ handle_call(_Request, State) ->
{ok, not_understood, State}.
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]});
+ case lists:member(Alarm, Alarms) of
+ true -> {ok, State};
+ false -> UpdatedAlarms = lists:usort([Alarm|Alarms]),
+ handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
+ end;
handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
- handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1,
- Alarms)});
+ case lists:keymember(Alarm, 1, Alarms) of
+ true -> handle_clear_alarm(
+ Alarm, State#alarms{alarms = lists:keydelete(
+ Alarm, 1, Alarms)});
+ false -> {ok, State}
+
+ end;
handle_event({node_up, Node}, State) ->
%% Must do this via notify and not call to avoid possible deadlock.
@@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+dict_append(Key, Val, Dict) ->
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ dict:store(Key, lists:usort([Val|L]), Dict).
+
dict_unappend_all(Key, _Val, Dict) ->
dict:erase(Key, Dict).
dict_unappend(Key, Val, Dict) ->
- case lists:delete(Val, dict:fetch(Key, Dict)) of
+ L = case dict:find(Key, Dict) of
+ {ok, V} -> V;
+ error -> []
+ end,
+
+ case lists:delete(Val, L) of
[] -> dict:erase(Key, Dict);
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Alert,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
-
- %% If we have changed our alarm state, inform the remotes.
- IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
- ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
- ok = alert_remote(false, Alertees, Source);
- true ->
- ok
- end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case node() of
+ Node -> ok = alert_remote(Alert, Alertees, Source);
+ _ -> ok
end,
+ ok = alert_local(Alert, Alertees, Source),
State#alarms{alarmed_nodes = AN1}.
alert_local(Alert, Alertees, Source) ->
@@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict:append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0515e82e..74ae59da 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -47,6 +47,6 @@ start_child(Node, Args) ->
supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index a7e8fb36..5ab22e75 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -31,9 +31,8 @@
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
%% apparently, by OpenAMQ.
-%% TODO: once the minimum erlang becomes R13B03, reimplement this
-%% using the binary module - that makes use of BIFs to do binary
-%% matching and will thus be much faster.
+%% TODO: reimplement this using the binary module? - that makes use of
+%% BIFs to do binary matching and will thus be much faster.
description() ->
[{description, <<"SASL PLAIN authentication mechanism">>}].
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 1d9ba48b..e2c255db 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,6 +43,6 @@ start_channel(Pid, Args) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{channel_sup, {rabbit_channel_sup, start_link, []},
temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index d6536e16..843bb615 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -49,8 +49,9 @@ start_link_worker(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, {Callback, worker}).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}};
init({{M,F,A}, worker}) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}.
+
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0b666a36..6f36f99d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,8 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
+-export([start/0, stop/0, parse_arguments/2, action/5,
+ sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -25,12 +26,16 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(PRIORITY_OPT, "--priority").
+-define(APPLY_TO_OPT, "--apply-to").
-define(RAM_OPT, "--ram").
-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}).
+-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}).
-define(RAM_DEF, {?RAM_OPT, flag}).
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
@@ -72,7 +77,7 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
- {set_policy, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
@@ -127,19 +132,13 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
end,
- Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
- _ -> {K, V}
- end || {K, V} <- Opts],
- Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
- Node = proplists:get_value(?NODE_OPT, Opts1),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts),
+ Node = proplists:get_value(?NODE_OPT, Opts),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -230,6 +229,19 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
rabbit_misc:quit(1).
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
%%----------------------------------------------------------------------------
action(stop, Node, Args, _Opts, Inform) ->
@@ -484,16 +496,15 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
-action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
- when Prio == [] orelse length(Prio) == 1 ->
- Msg = "Setting policy ~p for pattern ~p to ~p",
- {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
- [P] -> {Msg ++ " with priority ~s", P}
- end,
+action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
+ Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
- rpc_call(Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+ PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
+ ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
+ Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
+ rpc_call(
+ Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 9002514f..a7ee3276 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -35,8 +35,10 @@
{rabbit_types:username(), rabbit_types:password()}),
rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ rabbit_types:ok_or_error2(
+ {rabbit_types:user(), rabbit_framing:amqp_table()},
+ 'broker_not_found_on_node' | 'auth_failure' |
+ 'access_refused')).
-spec(start_channel/9 ::
(rabbit_channel:channel_number(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
@@ -76,21 +78,23 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
end;
connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
- Infos);
+ connect0(fun () -> rabbit_access_control:check_user_pass_login(
+ Username, Password) end,
+ VHost, Protocol, Pid, Infos);
connect(Username, VHost, Protocol, Pid, Infos) ->
- connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+ connect0(fun () -> rabbit_access_control:check_user_login(
+ Username, []) end,
+ VHost, Protocol, Pid, Infos).
-connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
+connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
- true ->
- case rabbit_access_control:FunctionName(U, P) of
- {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
- {refused, _M, _A} -> {error, auth_failure}
- end;
- false ->
- {error, broker_not_found_on_node}
+ true -> case AuthFun() of
+ {ok, User} -> connect(User, VHost, Protocol, Pid,
+ Infos);
+ {refused, _M, _A} -> {error, auth_failure}
+ end;
+ false -> {error, broker_not_found_on_node}
end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 4d3ddc79..a713d76b 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -141,8 +141,6 @@ notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
- %% TODO: switch to os:timestamp() when we drop support for
- %% Erlang/OTP < R13B01
gen_event:notify(?MODULE, #event{type = Type,
props = Props,
- timestamp = now()}).
+ timestamp = os:timestamp()}).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index df9baed9..fac74edb 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -19,6 +19,8 @@
-export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]).
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -51,6 +53,10 @@
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,{_, _}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -88,6 +94,15 @@ pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
resume_monitor({_Sender, none}) -> ok;
resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok.
+system_continue(_Parent, Deb, {Params, State}) ->
+ heartbeater(Params, Deb, State).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
%%----------------------------------------------------------------------------
start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
{ok, none};
@@ -98,17 +113,27 @@ start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
heartbeater(Params) ->
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
- {StatVal, SameCount}) ->
- Recurse = fun (V) -> heartbeater(Params, V) end,
+ Deb, {StatVal, SameCount} = State) ->
+ Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end,
+ System = fun (From, Req) ->
+ sys:handle_system_msg(
+ Req, From, self(), ?MODULE, Deb, {Params, State})
+ end,
receive
- pause -> receive
- resume -> Recurse({0, 0});
- Other -> exit({unexpected_message, Other})
- end;
- Other -> exit({unexpected_message, Other})
+ pause ->
+ receive
+ resume -> Recurse({0, 0});
+ {system, From, Req} -> System(From, Req);
+ Other -> exit({unexpected_message, Other})
+ end;
+ {system, From, Req} ->
+ System(From, Req);
+ Other ->
+ exit({unexpected_message, Other})
after TimeoutMillisec ->
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 495f6fdd..b8d8023e 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -43,17 +43,6 @@
-define(DEFAULT_UPDATE_INTERVAL, 2500).
-define(TABLE_NAME, ?MODULE).
-%% Because we have a feedback loop here, we need to ensure that we
-%% have some space for when the queues don't quite respond as fast as
-%% we would like, or when there is buffering going on in other parts
-%% of the system. In short, we aim to stay some distance away from
-%% when the memory alarms will go off, which cause backpressure (of
-%% some sort) on producers. Note that all other Thresholds are
-%% relative to this scaling.
--define(MEMORY_LIMIT_SCALING, 0.4).
-
--define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
-
%% If all queues are pushed to disk (duration 0), then the sum of
%% their reported lengths will be 0. If memory then becomes available,
%% unless we manually intervene, the sum will remain 0, and the queues
@@ -207,7 +196,9 @@ internal_update(State = #state { queue_durations = Durations,
desired_duration = DesiredDurationAvg,
queue_duration_sum = Sum,
queue_duration_count = Count }) ->
- MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
+ {ok, LimitThreshold} =
+ application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
+ MemoryLimit = vm_memory_monitor:get_memory_limit(),
MemoryRatio = case MemoryLimit > 0.0 of
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
@@ -215,7 +206,7 @@ internal_update(State = #state { queue_durations = Durations,
DesiredDurationAvg1 =
if MemoryRatio =:= infinity ->
0.0;
- MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 ->
+ MemoryRatio < LimitThreshold orelse Count == 0 ->
infinity;
MemoryRatio < ?SUM_INC_THRESHOLD ->
((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 335b7c81..eded0411 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) ->
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
- rabbit_event:notify(queue_mirror_deaths, [{name, QueueName},
- {pids, DeadPids}]),
rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
[rabbit_misc:rs(QueueName),
case IsMaster of
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 54ee7129..b74ae3a5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -41,15 +41,13 @@
%%----------------------------------------------------------------------------
--define(CREATION_EVENT_KEYS,
+-define(INFO_KEYS,
[pid,
name,
master_pid,
is_synchronised
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -124,8 +122,6 @@ init(Q = #amqqueue { name = QName }) ->
depth_delta = undefined
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
{ok, State, hibernate,
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 43dbb4e9..6fba99db 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
+ {ok, {{simple_one_for_one, 10, 10},
[{rabbit_mirror_queue_slave,
{rabbit_mirror_queue_slave, start_link, []},
temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 5fa29b7e..85958400 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -396,7 +396,7 @@ cluster_status(WhichNodes) ->
node_info() ->
{erlang:system_info(otp_release), rabbit_misc:version(),
- delegate_beam_hash(), cluster_status_from_mnesia()}.
+ cluster_status_from_mnesia()}.
node_type() ->
DiscNodes = cluster_nodes(disc),
@@ -459,10 +459,11 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
%% about the cluster
case NodeType of
ram -> start_mnesia(),
- change_extra_db_nodes(ClusterNodes, false),
- rabbit_table:wait_for_replicated();
+ change_extra_db_nodes(ClusterNodes, false);
disc -> ok
end,
+ %% ...and all nodes will need to wait for tables
+ rabbit_table:wait_for_replicated(),
ok.
init_db_with_mnesia(ClusterNodes, NodeType,
@@ -569,16 +570,16 @@ check_cluster_consistency(Node) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
- {_OTP, _Rabbit, _Hash, {error, _}} ->
+ {_OTP, _Rabbit, {error, _}} ->
{error, not_found};
- {_OTP, Rabbit, _Status} ->
- %% pre-2013/04 format implies version mismatch
- version_error("Rabbit", rabbit_misc:version(), Rabbit);
- {OTP, Rabbit, Hash, {ok, Status}} ->
- case check_consistency(OTP, Rabbit, Hash, Node, Status) of
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
- end
+ end;
+ {_OTP, Rabbit, _Hash, _Status} ->
+ %% delegate hash checking implies version mismatch
+ version_error("Rabbit", rabbit_misc:version(), Rabbit)
end.
%%--------------------------------------------------------------------
@@ -742,17 +743,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-check_consistency(OTP, Rabbit, Hash) ->
+check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
- check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash)]).
+ check_rabbit_consistency(Rabbit)]).
-check_consistency(OTP, Rabbit, Hash, Node, Status) ->
+check_consistency(OTP, Rabbit, Node, Status) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP),
check_rabbit_consistency(Rabbit),
- check_beam_compatibility(Hash),
check_nodes_consistency(Node, Status)]).
check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
@@ -783,31 +782,11 @@ version_error(Name, This, Remote) ->
check_otp_consistency(Remote) ->
check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
-%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be
-%% removed after 3.1.0 is released.
-check_rabbit_consistency("3.0.0") ->
- version_error("Rabbit", rabbit_misc:version(), "3.0.0");
-
check_rabbit_consistency(Remote) ->
check_version_consistency(
rabbit_misc:version(), Remote, "Rabbit",
fun rabbit_misc:version_minor_equivalent/2).
-check_beam_compatibility(RemoteHash) ->
- case RemoteHash == delegate_beam_hash() of
- true -> ok;
- false -> {error, {incompatible_bytecode,
- "Incompatible Erlang bytecode found on nodes"}}
- end.
-
-%% The delegate module sends functions across the cluster; if it is
-%% out of sync (say due to mixed compilers), we will get badfun
-%% exceptions when trying to do so. Let's detect that at startup.
-delegate_beam_hash() ->
- {delegate, Obj, _} = code:get_object_code(delegate),
- {ok, {delegate, Hash}} = beam_lib:md5(Obj),
- Hash.
-
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
%% mnesia tables aren't there because restarted RAM nodes won't have
@@ -833,12 +812,13 @@ find_good_node([]) ->
none;
find_good_node([Node | Nodes]) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _Reason} -> find_good_node(Nodes);
- {_OTP, _Rabbit, _} -> find_good_node(Nodes);
- {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of
- {error, _} -> find_good_node(Nodes);
- ok -> {ok, Node}
- end
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ %% old delegate hash check
+ {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 6355f935..948d2ab0 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -121,7 +121,6 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
fmt_missing("dependencies", MissingDeps)})
end,
write_enabled_plugins(PluginsFile, NewEnabled),
- maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
@@ -263,25 +262,6 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-maybe_warn_mochiweb(Enabled) ->
- V = erlang:system_info(otp_release),
- case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
- true ->
- Stars = string:copies("*", 80),
- io:format("~n~n~s~n"
- " Warning: Mochiweb enabled and Erlang version ~s "
- "detected.~n"
- " Enabling plugins that depend on Mochiweb is not "
- "supported on this Erlang~n"
- " version. At least R13B01 is required.~n~n"
- " RabbitMQ will not start successfully in this "
- "configuration. You *must*~n"
- " disable the Mochiweb plugin, or upgrade Erlang.~n"
- "~s~n~n~n", [Stars, V, Stars]);
- false ->
- ok
- end.
-
report_change() ->
io:format("Plugin configuration has changed. "
"Restart RabbitMQ for changes to take effect.~n").
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 91ca88dd..0785d278 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -25,9 +25,10 @@
-import(rabbit_misc, [pget/2]).
-export([register/0]).
+-export([invalidate/0, recover/0]).
-export([name/1, get/2, set/1]).
-export([validate/4, notify/4, notify_clear/3]).
--export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
@@ -51,6 +52,10 @@ set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
+set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
+set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Ps)}).
+
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -68,32 +73,70 @@ get0(Name, List) -> case pget(definition, List) of
%%----------------------------------------------------------------------------
-parse_set(VHost, Name, Pattern, Definition, undefined) ->
- parse_set0(VHost, Name, Pattern, Definition, 0);
-parse_set(VHost, Name, Pattern, Definition, Priority) ->
+%% Gets called during upgrades - therefore must not assume anything about the
+%% state of Mnesia
+invalidate() ->
+ rabbit_file:write_file(invalid_file(), <<"">>).
+
+recover() ->
+ case rabbit_file:is_file(invalid_file()) of
+ true -> recover0(),
+ rabbit_file:delete(invalid_file());
+ false -> ok
+ end.
+
+%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
+%% the first node starting. So we can rewrite the whole database. Note that
+%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
+%% variants.
+recover0() ->
+ Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
+ Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
+ Policies = list(),
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
+ end) || X <- Xs],
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
+ end) || Q <- Qs],
+ ok.
+
+invalid_file() ->
+ filename:join(rabbit_mnesia:dir(), "policies_are_invalid").
+
+%%----------------------------------------------------------------------------
+
+parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
case rabbit_misc:json_decode(Defn) of
{ok, JSON} ->
set0(VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
{<<"definition">>, rabbit_misc:json_to_term(JSON)},
- {<<"priority">>, Priority}]);
+ {<<"priority">>, Priority},
+ {<<"apply-to">>, ApplyTo}]);
error ->
{error_string, "JSON decoding error"}
end.
-set(VHost, Name, Pattern, Definition, Priority) ->
+set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
undefined -> 0;
_ -> Priority
+ end},
+ {<<"apply-to">>, case ApplyTo of
+ undefined -> <<"all">>;
+ _ -> ApplyTo
end}],
set0(VHost, Name, PolicyProps).
@@ -130,6 +173,7 @@ p(Parameter, DefnFun) ->
[{vhost, pget(vhost, Parameter)},
{name, pget(name, Parameter)},
{pattern, pget(<<"pattern">>, Value)},
+ {'apply-to', pget(<<"apply-to">>, Value)},
{definition, DefnFun(pget(<<"definition">>, Value))},
{priority, pget(<<"priority">>, Value)}].
@@ -139,7 +183,7 @@ format(Term) ->
ident(X) -> X.
-info_keys() -> [vhost, name, pattern, definition, priority].
+info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
%%----------------------------------------------------------------------------
@@ -202,8 +246,16 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
+matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
+ matches_type(Kind, pget('apply-to', Policy)) andalso
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
+ VHost =:= pget(vhost, Policy).
+
+matches_type(exchange, <<"exchanges">>) -> true;
+matches_type(queue, <<"queues">>) -> true;
+matches_type(exchange, <<"all">>) -> true;
+matches_type(queue, <<"all">>) -> true;
+matches_type(_, _) -> false.
sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
@@ -212,6 +264,7 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"apply-to">>, fun apply_to_validation/2, optional},
{<<"definition">>, fun validation/2, mandatory}].
validation(_Name, []) ->
@@ -257,3 +310,10 @@ a2b(A) -> list_to_binary(atom_to_list(A)).
dups(L) -> L -- lists:usort(L).
is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
+
+apply_to_validation(_Name, <<"all">>) -> ok;
+apply_to_validation(_Name, <<"exchanges">>) -> ok;
+apply_to_validation(_Name, <<"queues">>) -> ok;
+apply_to_validation(_Name, Term) ->
+ {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
+ "or 'all'", [Term]}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0908fb73..f69d8355 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
-export([scan/3]).
--export([add_queue_ttl/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -123,9 +123,9 @@
-define(REL_SEQ_BITS, 14).
-define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
-%% seq only is binary 00 followed by 14 bits of rel seq id
+%% seq only is binary 01 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
--define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX, 01).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
@@ -171,6 +171,7 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-ifdef(use_specs).
@@ -715,7 +716,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- {ok, Bin} ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
+ State;
+ {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
{MsgId, MsgProps} = parse_pub_record_body(Bin),
IsPersistent = case Prefix of
?PUB_PERSIST_JPREFIX -> true;
@@ -1068,6 +1073,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
add_queue_ttl_segment(_) ->
stop.
+avoid_zeroes() ->
+ foreach_queue_index({none, fun avoid_zeroes_segment/1}).
+
+avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
+avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+avoid_zeroes_segment(_) ->
+ stop.
+
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1092,7 +1112,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
|| Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
-transform_file(Path, Fun) ->
+transform_file(_Path, none) ->
+ ok;
+transform_file(Path, Fun) when is_function(Fun)->
PathTmp = Path ++ ".upgrade",
case rabbit_file:file_size(Path) of
0 -> ok;
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9b6039d1..9c902703 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -45,7 +45,8 @@
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
+ blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -120,7 +121,7 @@ init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) ->
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
+ mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -142,8 +143,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, _Source, Conserve) ->
- Pid ! {conserve_resources, Conserve},
+conserve_resources(Pid, Source, Conserve) ->
+ Pid ! {conserve_resources, Source, Conserve},
ok.
server_properties(Protocol) ->
@@ -178,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
[{<<"publisher_confirms">>, bool, true},
{<<"exchange_exchange_bindings">>, bool, true},
{<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true}];
+ {<<"consumer_cancel_notify">>, bool, true},
+ {<<"connection.blocked">>, bool, true}];
server_capabilities(_) ->
[].
@@ -246,9 +248,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
throttle = #throttle{
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never}},
+ alarmed_by = [],
+ last_blocked_by = none,
+ last_blocked_at = never,
+ blocked_sent = false}},
try
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -321,9 +324,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.
-handle_other({conserve_resources, Conserve},
- State = #v1{throttle = Throttle}) ->
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+handle_other({conserve_resources, Source, Conserve},
+ State = #v1{throttle = Throttle =
+ #throttle{alarmed_by = CR}}) ->
+ CR1 = case Conserve of
+ true -> lists:usort([Source | CR]);
+ false -> CR -- [Source]
+ end,
+ Throttle1 = Throttle#throttle{alarmed_by = CR1},
control_throttle(State#v1{throttle = Throttle1});
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
@@ -409,30 +417,61 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, (Throttle#throttle.conserve_resources orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- State#v1{connection_state = running};
+ maybe_send_unblocked(State),
+ State#v1{connection_state = running,
+ throttle = Throttle#throttle{
+ blocked_sent = false}};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
+maybe_block(State = #v1{connection_state = blocking,
+ throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ Sent = maybe_send_blocked(State),
State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now()})};
+ Throttle#throttle{last_blocked_at = erlang:now(),
+ blocked_sent = Sent})};
maybe_block(State) ->
State.
-update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
- Throttle#throttle{last_blocked_by = resource};
-update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
- Throttle#throttle{last_blocked_by = flow}.
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) ->
+ false;
+maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
+ connection = #connection{
+ protocol = Protocol,
+ capabilities = Capabilities},
+ sock = Sock}) ->
+ case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
+ {bool, true} ->
+ RStr = string:join([atom_to_list(A) || A <- CR], " & "),
+ Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])),
+ ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
+ Protocol),
+ true;
+ _ ->
+ false
+ end.
+
+maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
+ ok;
+maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
+ ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
+
+update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->
+ Throttle#throttle{last_blocked_by = flow};
+update_last_blocked_by(Throttle) ->
+ Throttle#throttle{last_blocked_by = resource}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -847,7 +886,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
- Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ Throttle1 = Throttle#throttle{alarmed_by = Conserve},
{ok, ChannelSupSupPid} =
supervisor2:start_child(
ChSup3Pid,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 30cf9114..5af4969a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -60,6 +60,7 @@ all_tests() ->
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_policy_validation(),
+ passed = test_policy_opts_validation(),
passed = test_ha_policy_validation(),
passed = test_server_status(),
passed = test_amqp_connection_refusal(),
@@ -1083,29 +1084,57 @@ test_runtime_parameters() ->
test_policy_validation() ->
rabbit_runtime_parameters_test:register_policy_validator(),
- SetPol =
- fun (Key, Val) ->
- control_action(
- set_policy,
- ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
- end,
+ SetPol = fun (Key, Val) ->
+ control_action_opts(
+ ["set_policy", "name", ".*",
+ rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
- ok = SetPol("testeven", []),
- ok = SetPol("testeven", [1, 2]),
- ok = SetPol("testeven", [1, 2, 3, 4]),
- ok = SetPol("testpos", [2, 5, 5678]),
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
- {error_string, _} = SetPol("testpos", [-1, 0, 1]),
- {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ error = SetPol("testpos", [-1, 0, 1]),
+ error = SetPol("testeven", [ 1, 2, 3]),
ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.
+test_policy_opts_validation() ->
+ Set = fun (Extra) -> control_action_opts(
+ ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}"
+ | Extra]) end,
+ OK = fun (Extra) -> ok = Set(Extra) end,
+ Fail = fun (Extra) -> error = Set(Extra) end,
+
+ OK ([]),
+
+ OK (["--priority", "0"]),
+ OK (["--priority", "3"]),
+ Fail(["--priority", "banana"]),
+ Fail(["--priority"]),
+
+ OK (["--apply-to", "all"]),
+ OK (["--apply-to", "queues"]),
+ Fail(["--apply-to", "bananas"]),
+ Fail(["--apply-to"]),
+
+ OK (["--priority", "3", "--apply-to", "queues"]),
+ Fail(["--priority", "banana", "--apply-to", "queues"]),
+ Fail(["--priority", "3", "--apply-to", "bananas"]),
+
+ Fail(["--offline"]),
+
+ ok = control_action(clear_policy, ["name"]),
+ passed.
+
test_ha_policy_validation() ->
- Set = fun (JSON) -> control_action(set_policy, ["name", ".*", JSON]) end,
+ Set = fun (JSON) -> control_action_opts(
+ ["set_policy", "name", ".*", JSON]) end,
OK = fun (JSON) -> ok = Set(JSON) end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
+ Fail = fun (JSON) -> error = Set(JSON) end,
OK ("{\"ha-mode\":\"all\"}"),
Fail("{\"ha-mode\":\"made_up\"}"),
@@ -1611,6 +1640,18 @@ control_action(Command, Node, Args, Opts) ->
Other
end.
+control_action_opts(Raw) ->
+ NodeStr = atom_to_list(node()),
+ case rabbit_control_main:parse_arguments(Raw, NodeStr) of
+ {ok, {Cmd, Opts, Args}} ->
+ case control_action(Cmd, node(), Args, Opts) of
+ ok -> ok;
+ _ -> error
+ end;
+ _ ->
+ error
+ end.
+
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1613838c..d50cb282 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -44,6 +44,7 @@
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
+-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
%% -------------------------------------------------------------------
@@ -70,6 +71,7 @@
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
+-spec(policy_apply_to/0 :: () -> 'ok').
-endif.
@@ -299,6 +301,27 @@ exchange_decorators(Table) ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+policy_apply_to() ->
+ transform(
+ rabbit_runtime_parameters,
+ fun ({runtime_parameters, Key = {_VHost, <<"policy">>, _Name}, Value}) ->
+ ApplyTo = apply_to(proplists:get_value(<<"definition">>, Value)),
+ {runtime_parameters, Key, [{<<"apply-to">>, ApplyTo} | Value]};
+ ({runtime_parameters, Key, Value}) ->
+ {runtime_parameters, Key, Value}
+ end,
+ [key, value]),
+ rabbit_policy:invalidate(),
+ ok.
+
+apply_to(Def) ->
+ case [proplists:get_value(K, Def) ||
+ K <- [<<"federation-upstream-set">>, <<"ha-mode">>]] of
+ [undefined, undefined] -> <<"all">>;
+ [_, undefined] -> <<"exchanges">>;
+ [undefined, _] -> <<"queues">>;
+ [_, _] -> <<"all">>
+ end.
%%--------------------------------------------------------------------
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index c0b1f8e4..bf6964d8 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -19,6 +19,9 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, start/6, start_link/6]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5,
@@ -26,7 +29,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/1, mainloop1/1]).
+-export([mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -53,6 +56,11 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
-> rabbit_types:ok(pid())).
+
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,#wstate{}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -94,12 +102,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -113,28 +123,44 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
pending = []},
#wstate.stats_timer).
-mainloop(State) ->
+system_continue(Parent, Deb, State) ->
+ mainloop(Deb, State#wstate{reader = Parent}).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
+mainloop(Deb, State) ->
try
- mainloop1(State)
+ mainloop1(Deb, State)
catch
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(State = #wstate{pending = []}) ->
+mainloop1(Deb, State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [State])
+ erlang:hibernate(?MODULE, mainloop, [Deb, State])
end;
-mainloop1(State) ->
+mainloop1(Deb, State) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after 0 ->
- ?MODULE:mainloop1(internal_flush(State))
+ ?MODULE:mainloop1(Deb, internal_flush(State))
end.
+handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
+handle_message(Deb, Message, State) ->
+ {Deb, handle_message(Message, State)}.
+
handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 23bfe7f1..5a6dc887 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -1,14 +1,18 @@
-%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP
+%% This file is a copy of supervisor.erl from the R16B Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is supervisor2
%%
-%% 2) there is a new strategy called
-%% simple_one_for_one_terminate. This is exactly the same as for
-%% simple_one_for_one, except that children *are* explicitly
-%% terminated as per the shutdown component of the child_spec.
+%% 2) a find_child/2 utility function has been added
%%
-%% 3) child specifications can contain, as the restart type, a tuple
+%% 3) Added an 'intrinsic' restart type. Like the transient type, this
+%% type means the child should only be restarted if the child exits
+%% abnormally. Unlike the transient type, if the child exits
+%% normally, the supervisor itself also exits normally. If the
+%% child is a supervisor and it exits normally (i.e. with reason of
+%% 'shutdown') then the child's parent also exits normally.
+%%
+%% 4) child specifications can contain, as the restart type, a tuple
%% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0 (see point (4) below for intrinsic). The delay,
%% in seconds, indicates what should happen if a child, upon being
@@ -41,13 +45,6 @@
%% perspective it's a normal exit, whilst from supervisor's
%% perspective, it's an abnormal exit.
%%
-%% 4) Added an 'intrinsic' restart type. Like the transient type, this
-%% type means the child should only be restarted if the child exits
-%% abnormally. Unlike the transient type, if the child exits
-%% normally, the supervisor itself also exits normally. If the
-%% child is a supervisor and it exits normally (i.e. with reason of
-%% 'shutdown') then the child's parent also exits normally.
-%%
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
@@ -55,7 +52,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -75,61 +72,34 @@
-behaviour(gen_server).
%% External exports
--export([start_link/2,start_link/3,
+-export([start_link/2, start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1, find_child/2,
- check_childspecs/1]).
+ which_children/1, count_children/1,
+ find_child/2, check_childspecs/1]).
%% Internal exports
--export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
--export([handle_cast/2]).
-
--define(DICT, dict).
-
--record(state, {name,
- strategy,
- children = [],
- dynamics = ?DICT:new(),
- intensity,
- period,
- restarts = [],
- module,
- args}).
-
--record(child, {pid = undefined, % pid is undefined when child is not running
- name,
- mfa,
- restart_type,
- shutdown,
- child_type,
- modules = []}).
-
--define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
- State#state.strategy =:= simple_one_for_one_terminate).
--define(is_terminate_simple(State),
- State#state.strategy =:= simple_one_for_one_terminate).
-
--ifdef(use_specs).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([try_again_restart/3]).
%%--------------------------------------------------------------------------
-%% Types
+-ifdef(use_specs).
+-export_type([child_spec/0, startchild_ret/0, strategy/0]).
+-endif.
%%--------------------------------------------------------------------------
--export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]).
-
--type child() :: 'undefined' | pid().
+-ifdef(use_specs).
+-type child() :: 'undefined' | pid().
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
--type modules() :: [module()] | 'dynamic'.
--type delay() :: non_neg_integer().
--type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic'
- | {'permanent', delay()} | {'transient', delay()}
- | {'intrinsic', delay()}.
+-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
+-type modules() :: [module()] | 'dynamic'.
+-type delay() :: non_neg_integer().
+-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}.
-type shutdown() :: 'brutal_kill' | timeout().
--type worker() :: 'worker' | 'supervisor'.
+-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
--type sup_ref() :: (Name :: atom())
+-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
@@ -140,40 +110,110 @@
Type :: worker(),
Modules :: modules()}.
-
-type strategy() :: 'one_for_all' | 'one_for_one'
- | 'rest_for_one' | 'simple_one_for_one'
- | 'simple_one_for_one_terminate'.
-
--type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()],
- name :: child_id(),
- mfa :: mfargs(),
- restart_type :: restart(),
- shutdown :: shutdown(),
- child_type :: worker(),
- modules :: modules()}.
-
--type state() :: #state{strategy :: strategy(),
- children :: [child_rec()],
- dynamics :: ?DICT(),
- intensity :: non_neg_integer(),
- period :: pos_integer()}.
+ | 'rest_for_one' | 'simple_one_for_one'.
+-endif.
%%--------------------------------------------------------------------------
-%% Callback behaviour
-%%--------------------------------------------------------------------------
+-ifdef(use_specs).
+-record(child, {% pid is undefined when child is not running
+ pid = undefined :: child() | {restarting,pid()} | [pid()],
+ name :: child_id(),
+ mfargs :: mfargs(),
+ restart_type :: restart(),
+ shutdown :: shutdown(),
+ child_type :: worker(),
+ modules = [] :: modules()}).
+-type child_rec() :: #child{}.
+-else.
+-record(child, {
+ pid = undefined,
+ name,
+ mfargs,
+ restart_type,
+ shutdown,
+ child_type,
+ modules = []}).
+-endif.
+
+-define(DICT, dict).
+-define(SETS, sets).
+-define(SET, set).
+
+-ifdef(use_specs).
+-record(state, {name,
+ strategy :: strategy(),
+ children = [] :: [child_rec()],
+ dynamics :: ?DICT() | ?SET(),
+ intensity :: non_neg_integer(),
+ period :: pos_integer(),
+ restarts = [],
+ module,
+ args}).
+-type state() :: #state{}.
+-else.
+-record(state, {name,
+ strategy,
+ children = [],
+ dynamics,
+ intensity,
+ period,
+ restarts = [],
+ module,
+ args}).
+-endif.
+
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one).
+-define(is_permanent(R), ((R =:= permanent) orelse
+ (is_tuple(R) andalso
+ tuple_size(R) == 2 andalso
+ element(1, R) =:= permanent))).
+-define(is_explicit_restart(R),
+ R == {shutdown, restart}).
+
+-ifdef(use_specs).
-callback init(Args :: term()) ->
{ok, {{RestartStrategy :: strategy(),
- MaxR :: non_neg_integer(),
- MaxT :: non_neg_integer()},
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
[ChildSpec :: child_spec()]}}
| ignore.
+-endif.
+-define(restarting(_Pid_), {restarting,_Pid_}).
-%%--------------------------------------------------------------------------
-%% Specs
-%%--------------------------------------------------------------------------
+%%% ---------------------------------------------------
+%%% This is a general process supervisor built upon gen_server.erl.
+%%% Servers/processes should/could also be built using gen_server.erl.
+%%% SupName = {local, atom()} | {global, atom()}.
+%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type startlink_err() :: {'already_started', pid()}
+ | {'shutdown', term()}
+ | term().
+-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
+-spec start_link(Module, Args) -> startlink_ret() when
+ Module :: module(),
+ Args :: term().
+
+-endif.
+start_link(Mod, Args) ->
+ gen_server:start_link(?MODULE, {self, Mod, Args}, []).
+
+-ifdef(use_specs).
+-spec start_link(SupName, Module, Args) -> startlink_ret() when
+ SupName :: sup_name(),
+ Module :: module(),
+ Args :: term().
+-endif.
+start_link(SupName, Mod, Args) ->
+ gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
+
+%%% ---------------------------------------------------
+%%% Interface functions.
+%%% ---------------------------------------------------
+-ifdef(use_specs).
-type startchild_err() :: 'already_present'
| {'already_started', Child :: child()} | term().
-type startchild_ret() :: {'ok', Child :: child()}
@@ -183,91 +223,30 @@
-spec start_child(SupRef, ChildSpec) -> startchild_ret() when
SupRef :: sup_ref(),
ChildSpec :: child_spec() | (List :: [term()]).
+-endif.
+start_child(Supervisor, ChildSpec) ->
+ call(Supervisor, {start_child, ChildSpec}).
+-ifdef(use_specs).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
Id :: child_id(),
Result :: {'ok', Child :: child()}
| {'ok', Child :: child(), Info :: term()}
| {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one' | term().
+ Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one' |
+ term().
+-endif.
+restart_child(Supervisor, Name) ->
+ call(Supervisor, {restart_child, Name}).
+-ifdef(use_specs).
-spec delete_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
Id :: child_id(),
Result :: 'ok' | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one'.
-
--spec terminate_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: pid() | child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'not_found' | 'simple_one_for_one'.
-
--spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
- SupRef :: sup_ref(),
- Id :: child_id() | 'undefined',
- Child :: child(),
- Type :: worker(),
- Modules :: modules().
-
--spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
- Result :: 'ok' | {'error', Error :: term()}.
-
--type init_sup_name() :: sup_name() | 'self'.
-
--type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}}
- | {'bad_start_spec', term()} | {'start_spec', term()}
- | {'supervisor_data', term()}.
-
--spec init({init_sup_name(), module(), [term()]}) ->
- {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
-
--type call() :: 'which_children'.
--spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
-
--spec handle_cast('null', state()) -> {'noreply', state()}.
-
--spec handle_info(term(), state()) ->
- {'noreply', state()} | {'stop', 'shutdown', state()}.
-
--spec terminate(term(), state()) -> 'ok'.
-
--spec code_change(term(), state(), term()) ->
- {'ok', state()} | {'error', term()}.
-
--else.
-
--export([behaviour_info/1]).
-
-behaviour_info(callbacks) ->
- [{init,1}];
-behaviour_info(_Other) ->
- undefined.
-
+ Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one'.
-endif.
-
-%%% ---------------------------------------------------
-%%% This is a general process supervisor built upon gen_server.erl.
-%%% Servers/processes should/could also be built using gen_server.erl.
-%%% SupName = {local, atom()} | {global, atom()}.
-%%% ---------------------------------------------------
-start_link(Mod, Args) ->
- gen_server:start_link(?MODULE, {self, Mod, Args}, []).
-
-start_link(SupName, Mod, Args) ->
- gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
-%%% ---------------------------------------------------
-%%% Interface functions.
-%%% ---------------------------------------------------
-start_child(Supervisor, ChildSpec) ->
- call(Supervisor, {start_child, ChildSpec}).
-
-restart_child(Supervisor, Name) ->
- call(Supervisor, {restart_child, Name}).
-
delete_child(Supervisor, Name) ->
call(Supervisor, {delete_child, Name}).
@@ -277,12 +256,44 @@ delete_child(Supervisor, Name) ->
%% Note that the child is *always* terminated in some
%% way (maybe killed).
%%-----------------------------------------------------------------
+-ifdef(use_specs).
+-spec terminate_child(SupRef, Id) -> Result when
+ SupRef :: sup_ref(),
+ Id :: pid() | child_id(),
+ Result :: 'ok' | {'error', Error},
+ Error :: 'not_found' | 'simple_one_for_one'.
+-endif.
terminate_child(Supervisor, Name) ->
call(Supervisor, {terminate_child, Name}).
+-ifdef(use_specs).
+-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
+ SupRef :: sup_ref(),
+ Id :: child_id() | undefined,
+ Child :: child() | 'restarting',
+ Type :: worker(),
+ Modules :: modules().
+-endif.
which_children(Supervisor) ->
call(Supervisor, which_children).
+-ifdef(use_specs).
+-spec count_children(SupRef) -> PropListOfCounts when
+ SupRef :: sup_ref(),
+ PropListOfCounts :: [Count],
+ Count :: {specs, ChildSpecCount :: non_neg_integer()}
+ | {active, ActiveProcessCount :: non_neg_integer()}
+ | {supervisors, ChildSupervisorCount :: non_neg_integer()}
+ |{workers, ChildWorkerCount :: non_neg_integer()}.
+-endif.
+count_children(Supervisor) ->
+ call(Supervisor, count_children).
+
+-ifdef(use_specs).
+-spec find_child(Supervisor, Name) -> [pid()] when
+ Supervisor :: sup_ref(),
+ Name :: child_id().
+-endif.
find_child(Supervisor, Name) ->
[Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
Name1 =:= Name].
@@ -290,6 +301,11 @@ find_child(Supervisor, Name) ->
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
+-ifdef(use_specs).
+-spec check_childspecs(ChildSpecs) -> Result when
+ ChildSpecs :: [child_spec()],
+ Result :: 'ok' | {'error', Error :: term()}.
+-endif.
check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
case check_startspec(ChildSpecs) of
{ok, _} -> ok;
@@ -297,11 +313,37 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
+%%%-----------------------------------------------------------------
+%%% Called by timer:apply_after from restart/2
+-ifdef(use_specs).
+-spec try_again_restart(SupRef, Child, Reason) -> ok when
+ SupRef :: sup_ref(),
+ Child :: child_id() | pid(),
+ Reason :: term().
+-endif.
+try_again_restart(Supervisor, Child, Reason) ->
+ cast(Supervisor, {try_again_restart, Child, Reason}).
+
+cast(Supervisor, Req) ->
+ gen_server:cast(Supervisor, Req).
+
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type init_sup_name() :: sup_name() | 'self'.
+
+-type stop_rsn() :: {'shutdown', term()}
+ | {'bad_return', {module(),'init', term()}}
+ | {'bad_start_spec', term()}
+ | {'start_spec', term()}
+ | {'supervisor_data', term()}.
+
+-spec init({init_sup_name(), module(), [term()]}) ->
+ {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
+-endif.
init({SupName, Mod, Args}) ->
process_flag(trap_exit, true),
case Mod:init(Args) of
@@ -327,9 +369,9 @@ init_children(State, StartSpec) ->
case start_children(Children, SupName) of
{ok, NChildren} ->
{ok, State#state{children = NChildren}};
- {error, NChildren} ->
+ {error, NChildren, Reason} ->
terminate_children(NChildren, SupName),
- {stop, shutdown}
+ {stop, {shutdown, Reason}}
end;
Error ->
{stop, {start_spec, Error}}
@@ -347,32 +389,35 @@ init_dynamic(_State, StartSpec) ->
%%-----------------------------------------------------------------
%% Func: start_children/2
-%% Args: Children = [#child] in start order
-%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Args: Children = [child_rec()] in start order
+%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod}
%% Purpose: Start all children. The new list contains #child's
%% with pids.
-%% Returns: {ok, NChildren} | {error, NChildren}
-%% NChildren = [#child] in termination order (reversed
+%% Returns: {ok, NChildren} | {error, NChildren, Reason}
+%% NChildren = [child_rec()] in termination order (reversed
%% start order)
%%-----------------------------------------------------------------
start_children(Children, SupName) -> start_children(Children, [], SupName).
start_children([Child|Chs], NChildren, SupName) ->
case do_start_child(SupName, Child) of
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ start_children(Chs, NChildren, SupName);
{ok, Pid} ->
start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
{ok, Pid, _Extra} ->
start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
{error, Reason} ->
report_error(start_error, Reason, Child, SupName),
- {error, lists:reverse(Chs) ++ [Child | NChildren]}
+ {error, lists:reverse(Chs) ++ [Child | NChildren],
+ {failed_to_start_child,Child#child.name,Reason}}
end;
start_children([], NChildren, _SupName) ->
{ok, NChildren}.
do_start_child(SupName, Child) ->
- #child{mfa = {M, F, A}} = Child,
- case catch apply(M, F, A) of
+ #child{mfargs = {M, F, Args}} = Child,
+ case catch apply(M, F, Args) of
{ok, Pid} when is_pid(Pid) ->
NChild = Child#child{pid = Pid},
report_progress(NChild, SupName),
@@ -401,35 +446,54 @@ do_start_child_i(M, F, A) ->
{error, What}
end.
-
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
+-ifdef(use_specs).
+-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine
+-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
+-endif.
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
- #child{mfa = {M, F, A}} = hd(State#state.children),
+ Child = hd(State#state.children),
+ #child{mfargs = {M, F, A}} = Child,
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
- {ok, undefined} ->
- {reply, {ok, undefined}, State};
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ {reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State),
{reply, {ok, Pid, Extra}, NState};
What ->
{reply, What, State}
end;
-%%% The requests terminate_child, delete_child and restart_child are
-%%% invalid for simple_one_for_one and simple_one_for_one_terminate
-%%% supervisors.
+%% terminate_child for simple_one_for_one can only be done with pid
+handle_call({terminate_child, Name}, _From, State) when not is_pid(Name),
+ ?is_simple(State) ->
+ {reply, {error, simple_one_for_one}, State};
+
+handle_call({terminate_child, Name}, _From, State) ->
+ case get_child(Name, State, ?is_simple(State)) of
+ {value, Child} ->
+ case do_terminate(Child, State#state.name) of
+ #child{restart_type=RT} when RT=:=temporary; ?is_simple(State) ->
+ {reply, ok, state_del_child(Child, State)};
+ NChild ->
+ {reply, ok, replace_child(NChild, State)}
+ end;
+ false ->
+ {reply, {error, not_found}, State}
+ end;
+
+%%% The requests delete_child and restart_child are invalid for
+%%% simple_one_for_one supervisors.
handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
- {reply, {error, State#state.strategy}, State};
+ {reply, {error, simple_one_for_one}, State};
handle_call({start_child, ChildSpec}, _From, State) ->
case check_childspec(ChildSpec) of
@@ -453,6 +517,8 @@ handle_call({restart_child, Name}, _From, State) ->
Error ->
{reply, Error, State}
end;
+ {value, #child{pid=?restarting(_)}} ->
+ {reply, {error, restarting}, State};
{value, _} ->
{reply, {error, running}, State};
_ ->
@@ -464,60 +530,146 @@ handle_call({delete_child, Name}, _From, State) ->
{value, Child} when Child#child.pid =:= undefined ->
NState = remove_child(Child, State),
{reply, ok, NState};
+ {value, #child{pid=?restarting(_)}} ->
+ {reply, {error, restarting}, State};
{value, _} ->
{reply, {error, running}, State};
_ ->
{reply, {error, not_found}, State}
end;
-handle_call({terminate_child, Name}, _From, State) ->
- case get_child(Name, State) of
- {value, Child} ->
- NChild = do_terminate(Child, State#state.name),
- {reply, ok, replace_child(NChild, State)};
- _ ->
- {reply, {error, not_found}, State}
- end;
+handle_call(which_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
+ Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end,
+ ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))),
+ {reply, Reply, State};
-handle_call(which_children, _From, State) when ?is_simple(State) ->
- [#child{child_type = CT, modules = Mods}] = State#state.children,
- Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
- ?DICT:to_list(State#state.dynamics)),
+handle_call(which_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT,
+ modules = Mods}]} =
+ State) when ?is_simple(State) ->
+ Reply = lists:map(fun({?restarting(_),_}) -> {undefined,restarting,CT,Mods};
+ ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ ?DICT:to_list(dynamics_db(RType, State#state.dynamics))),
{reply, Reply, State};
handle_call(which_children, _From, State) ->
Resp =
- lists:map(fun (#child{pid = Pid, name = Name,
+ lists:map(fun(#child{pid = ?restarting(_), name = Name,
+ child_type = ChildType, modules = Mods}) ->
+ {Name, restarting, ChildType, Mods};
+ (#child{pid = Pid, name = Name,
child_type = ChildType, modules = Mods}) ->
- {Name, Pid, ChildType, Mods}
+ {Name, Pid, ChildType, Mods}
end,
State#state.children),
- {reply, Resp, State}.
+ {reply, Resp, State};
-%%% Hopefully cause a function-clause as there is no API function
-%%% that utilizes cast.
-handle_cast(null, State) ->
- error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
- []),
- {noreply, State}.
+handle_call(count_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT}]} = State)
+ when ?is_simple(State) ->
+ {Active, Count} =
+ ?SETS:fold(fun(Pid, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->{Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(temporary, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
-handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
+handle_call(count_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT}]} = State)
when ?is_simple(State) ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
- {noreply, NState};
-handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
- case get_child(Child#child.name, State) of
- {value, Child1} ->
- {ok, NState} = do_restart(RestartType, Reason, Child1, State),
- {noreply, NState};
- _ ->
+ {Active, Count} =
+ ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->
+ {Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(RType, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
+
+handle_call(count_children, _From, State) ->
+ %% Specs and children are together on the children list...
+ {Specs, Active, Supers, Workers} =
+ lists:foldl(fun(Child, Counts) ->
+ count_child(Child, Counts)
+ end, {0,0,0,0}, State#state.children),
+
+ %% Reformat counts to a property list.
+ Reply = [{specs, Specs}, {active, Active},
+ {supervisors, Supers}, {workers, Workers}],
+ {reply, Reply, State}.
+
+
+count_child(#child{pid = Pid, child_type = worker},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers, Workers+1};
+ false -> {Specs+1, Active, Supers, Workers+1}
+ end;
+count_child(#child{pid = Pid, child_type = supervisor},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers+1, Workers};
+ false -> {Specs+1, Active, Supers+1, Workers}
+ end.
+
+
+%%% If a restart attempt failed, this message is sent via
+%%% timer:apply_after(0,...) in order to give gen_server the chance to
+%%% check it's inbox before trying again.
+-ifdef(use_specs).
+-spec handle_cast({try_again_restart, child_id() | pid(), term()}, state()) ->
+ {'noreply', state()} | {stop, shutdown, state()}.
+-endif.
+handle_cast({try_again_restart,Pid,Reason}, #state{children=[Child]}=State)
+ when ?is_simple(State) ->
+ RT = Child#child.restart_type,
+ RPid = restarting(Pid),
+ case dynamic_child_args(RPid, dynamics_db(RT, State#state.dynamics)) of
+ {ok, Args} ->
+ {M, F, _} = Child#child.mfargs,
+ NChild = Child#child{pid = RPid, mfargs = {M, F, Args}},
+ try_restart(Child#child.restart_type, Reason, NChild, State);
+ error ->
{noreply, State}
end;
+handle_cast({try_again_restart,Name,Reason}, State) ->
+ %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist
+ case lists:keysearch(Name,#child.name,State#state.children) of
+ {value, Child = #child{pid=?restarting(_), restart_type=RestartType}} ->
+ try_restart(RestartType, Reason, Child, State);
+ _ ->
+ {noreply,State}
+ end.
+
%%
%% Take care of terminated children.
%%
+-ifdef(use_specs).
+-spec handle_info(term(), state()) ->
+ {'noreply', state()} | {'stop', 'shutdown', state()}.
+-endif.
handle_info({'EXIT', Pid, Reason}, State) ->
case restart_child(Pid, Reason, State) of
{ok, State1} ->
@@ -526,20 +678,34 @@ handle_info({'EXIT', Pid, Reason}, State) ->
{stop, shutdown, State1}
end;
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
+ when ?is_simple(State) ->
+ try_restart(RestartType, Reason, Child, State);
+handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
+ case get_child(Child#child.name, State) of
+ {value, Child1} ->
+ try_restart(RestartType, Reason, Child1, State);
+ _What ->
+ {noreply, State}
+ end;
+
handle_info(Msg, State) ->
- error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
+
%%
%% Terminate this server.
%%
-terminate(_Reason, State) when ?is_terminate_simple(State) ->
- terminate_simple_children(
- hd(State#state.children), State#state.dynamics, State#state.name),
- ok;
+-ifdef(use_specs).
+-spec terminate(term(), state()) -> 'ok'.
+-endif.
+terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) ->
+ terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type,
+ State#state.dynamics),
+ State#state.name);
terminate(_Reason, State) ->
- terminate_children(State#state.children, State#state.name),
- ok.
+ terminate_children(State#state.children, State#state.name).
%%
%% Change code for the supervisor.
@@ -550,6 +716,10 @@ terminate(_Reason, State) ->
%% NOTE: This requires that the init function of the call-back module
%% does not have any side effects.
%%
+-ifdef(use_specs).
+-spec code_change(term(), state(), term()) ->
+ {'ok', state()} | {'error', term()}.
+-endif.
code_change(_, State, _) ->
case (State#state.module):init(State#state.args) of
{ok, {SupFlags, StartSpec}} ->
@@ -577,14 +747,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
+update_childspec(State, StartSpec) when ?is_simple(State) ->
case check_startspec(StartSpec) of
{ok, [Child]} ->
{ok, State#state{children = [Child]}};
Error ->
{error, Error}
end;
-
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
{ok, Children} ->
@@ -603,11 +772,11 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
update_childspec1(OldC, Children, [Child|KeepOld])
end;
update_childspec1([], Children, KeepOld) ->
- % Return them in (keeped) reverse start order.
+ %% Return them in (kept) reverse start order.
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
- case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name ->
+ case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
Ch#child{pid = OldCh#child.pid};
(Ch) ->
Ch
@@ -618,7 +787,7 @@ update_chsp(OldCh, Children) ->
NewC ->
{ok, NewC}
end.
-
+
%%% ---------------------------------------------------
%%% Start a new child.
%%% ---------------------------------------------------
@@ -627,20 +796,16 @@ handle_start_child(Child, State) ->
case get_child(Child#child.name, State) of
false ->
case do_start_child(State#state.name, Child) of
+ {ok, undefined} when Child#child.restart_type =:= temporary ->
+ {{ok, undefined}, State};
{ok, Pid} ->
- Children = State#state.children,
- {{ok, Pid},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid}, save_child(Child#child{pid = Pid}, State)};
{ok, Pid, Extra} ->
- Children = State#state.children,
- {{ok, Pid, Extra},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)};
{error, What} ->
{{error, {What, Child}}, State}
end;
- {value, OldChild} when OldChild#child.pid =/= undefined ->
+ {value, OldChild} when is_pid(OldChild#child.pid) ->
{{error, {already_started, OldChild#child.pid}}, State};
{value, _OldChild} ->
{{error, already_present}, State}
@@ -648,105 +813,145 @@ handle_start_child(Child, State) ->
%%% ---------------------------------------------------
%%% Restart. A process has terminated.
-%%% Returns: {ok, #state} | {shutdown, #state}
+%%% Returns: {ok, state()} | {shutdown, state()}
%%% ---------------------------------------------------
-restart_child(Pid, Reason, State) when ?is_simple(State) ->
- case ?DICT:find(Pid, State#state.dynamics) of
+restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) ->
+ RestartType = Child#child.restart_type,
+ case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of
{ok, Args} ->
- [Child] = State#state.children,
- RestartType = Child#child.restart_type,
- {M, F, _} = Child#child.mfa,
- NChild = Child#child{pid = Pid, mfa = {M, F, Args}},
+ {M, F, _} = Child#child.mfargs,
+ NChild = Child#child{pid = Pid, mfargs = {M, F, Args}},
do_restart(RestartType, Reason, NChild, State);
error ->
- {ok, State}
+ {ok, State}
end;
+
restart_child(Pid, Reason, State) ->
Children = State#state.children,
+ %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist
case lists:keysearch(Pid, #child.pid, Children) of
- {value, Child} ->
- RestartType = Child#child.restart_type,
+ {value, #child{restart_type = RestartType} = Child} ->
do_restart(RestartType, Reason, Child, State);
- _ ->
+ false ->
{ok, State}
end.
-do_restart({permanent = RestartType, Delay}, Reason, Child, State) ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(permanent, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
- restart(Child, State);
-do_restart(Type, normal, Child, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State)
- when RestartType =:= transient orelse RestartType =:= intrinsic ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(Type, {shutdown, _}, Child, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
- del_child_and_maybe_shutdown(Type, Child, State);
-do_restart({RestartType, Delay}, Reason, Child, State)
- when RestartType =:= transient orelse RestartType =:= intrinsic ->
- do_restart_delay({RestartType, Delay}, Reason, Child, State);
-do_restart(Type, Reason, Child, State) when Type =:= transient orelse
- Type =:= intrinsic ->
- report_error(child_terminated, Reason, Child, State#state.name),
+try_restart(RestartType, Reason, Child, State) ->
+ case handle_restart(RestartType, Reason, Child, State) of
+ {ok, NState} -> {noreply, NState};
+ {shutdown, State2} -> {stop, shutdown, State2}
+ end.
+
+do_restart(RestartType, Reason, Child, State) ->
+ maybe_report_error(RestartType, Reason, Child, State),
+ handle_restart(RestartType, Reason, Child, State).
+
+maybe_report_error(permanent, Reason, Child, State) ->
+ report_child_termination(Reason, Child, State);
+maybe_report_error({permanent, _}, Reason, Child, State) ->
+ report_child_termination(Reason, Child, State);
+maybe_report_error(_Type, Reason, Child, State) ->
+ case is_abnormal_termination(Reason) of
+ true -> report_child_termination(Reason, Child, State);
+ false -> ok
+ end.
+
+report_child_termination(Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name).
+
+handle_restart(permanent, _Reason, Child, State) ->
restart(Child, State);
-do_restart(temporary, Reason, Child, State) ->
- report_error(child_terminated, Reason, Child, State#state.name),
- NState = state_del_child(Child, State),
- {ok, NState}.
+handle_restart(transient, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(fun restart/2,
+ fun delete_child_and_continue/2,
+ Reason, Child, State);
+handle_restart(intrinsic, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(fun restart/2,
+ fun delete_child_and_stop/2,
+ Reason, Child, State);
+handle_restart(temporary, _Reason, Child, State) ->
+ delete_child_and_continue(Child, State);
+handle_restart({permanent, _Delay}=Restart, Reason, Child, State) ->
+ do_restart_delay(Restart, Reason, Child, State);
+handle_restart({transient, _Delay}=Restart, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason),
+ fun delete_child_and_continue/2,
+ Reason, Child, State);
+handle_restart({intrinsic, _Delay}=Restart, Reason, Child, State) ->
+ restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason),
+ fun delete_child_and_stop/2,
+ Reason, Child, State).
+
+restart_if_explicit_or_abnormal(RestartHow, Otherwise, Reason, Child, State) ->
+ case ?is_explicit_restart(Reason) orelse is_abnormal_termination(Reason) of
+ true -> RestartHow(Child, State);
+ false -> Otherwise(Child, State)
+ end.
+
+defer_to_restart_delay(Restart, Reason) ->
+ fun(Child, State) -> do_restart_delay(Restart, Reason, Child, State) end.
+
+delete_child_and_continue(Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
+delete_child_and_stop(Child, State) ->
+ {shutdown, state_del_child(Child, State)}.
+
+is_abnormal_termination(normal) -> false;
+is_abnormal_termination(shutdown) -> false;
+is_abnormal_termination({shutdown, _}) -> false;
+is_abnormal_termination(_Other) -> true.
do_restart_delay({RestartType, Delay}, Reason, Child, State) ->
- case restart1(Child, State) of
+ case add_restart(State) of
{ok, NState} ->
- {ok, NState};
- {terminate, NState} ->
+ maybe_restart(NState#state.strategy, Child, NState);
+ {terminate, _NState} ->
+ %% we've reached the max restart intensity, but the
+ %% add_restart will have added to the restarts
+ %% field. Given we don't want to die here, we need to go
+ %% back to the old restarts field otherwise we'll never
+ %% attempt to restart later, which is why we ignore
+ %% NState for this clause.
_TRef = erlang:send_after(trunc(Delay*1000), self(),
{delayed_restart,
{{RestartType, Delay}, Reason, Child}}),
- {ok, state_del_child(Child, NState)}
+ {ok, state_del_child(Child, State)}
end.
-del_child_and_maybe_shutdown(intrinsic, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-del_child_and_maybe_shutdown(_, Child, State) ->
- {ok, state_del_child(Child, State)}.
-
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
- restart(NState#state.strategy, Child, NState, fun restart/2);
+ maybe_restart(NState#state.strategy, Child, NState);
{terminate, NState} ->
report_error(shutdown, reached_max_restart_intensity,
Child, State#state.name),
- {shutdown, state_del_child(Child, NState)}
+ {shutdown, remove_child(Child, NState)}
end.
-restart1(Child, State) ->
- case add_restart(State) of
- {ok, NState} ->
- restart(NState#state.strategy, Child, NState, fun restart1/2);
- {terminate, _NState} ->
- %% we've reached the max restart intensity, but the
- %% add_restart will have added to the restarts
- %% field. Given we don't want to die here, we need to go
- %% back to the old restarts field otherwise we'll never
- %% attempt to restart later.
- {terminate, State}
+maybe_restart(Strategy, Child, State) ->
+ case restart(Strategy, Child, State) of
+ {try_again, Reason, NState2} ->
+ %% Leaving control back to gen_server before
+ %% trying again. This way other incoming requsts
+ %% for the supervisor can be handled - e.g. a
+ %% shutdown request for the supervisor or the
+ %% child.
+ Id = if ?is_simple(State) -> Child#child.pid;
+ true -> Child#child.name
+ end,
+ timer:apply_after(0,?MODULE,try_again_restart,[self(),Id,Reason]),
+ {ok,NState2};
+ Other ->
+ Other
end.
-restart(Strategy, Child, State, Restart)
- when Strategy =:= simple_one_for_one orelse
- Strategy =:= simple_one_for_one_terminate ->
- #child{mfa = {M, F, A}} = Child,
- Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
+restart(simple_one_for_one, Child, State) ->
+ #child{pid = OldPid, mfargs = {M, F, A}} = Child,
+ Dynamics = ?DICT:erase(OldPid, dynamics_db(Child#child.restart_type,
+ State#state.dynamics)),
case do_start_child_i(M, F, A) of
- {ok, undefined} ->
- {ok, State};
{ok, Pid} ->
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};
@@ -754,10 +959,13 @@ restart(Strategy, Child, State, Restart)
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};
{error, Error} ->
+ NState = State#state{dynamics = ?DICT:store(restarting(OldPid), A,
+ Dynamics)},
report_error(start_error, Error, Child, State#state.name),
- Restart(Child, State)
+ {try_again, Error, NState}
end;
-restart(one_for_one, Child, State, Restart) ->
+restart(one_for_one, Child, State) ->
+ OldPid = Child#child.pid,
case do_start_child(State#state.name, Child) of
{ok, Pid} ->
NState = replace_child(Child#child{pid = Pid}, State),
@@ -766,139 +974,83 @@ restart(one_for_one, Child, State, Restart) ->
NState = replace_child(Child#child{pid = Pid}, State),
{ok, NState};
{error, Reason} ->
+ NState = replace_child(Child#child{pid = restarting(OldPid)}, State),
report_error(start_error, Reason, Child, State#state.name),
- Restart(Child, State)
+ {try_again, Reason, NState}
end;
-restart(rest_for_one, Child, State, Restart) ->
+restart(rest_for_one, Child, State) ->
{ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
ChAfter2 = terminate_children(ChAfter, State#state.name),
case start_children(ChAfter2, State#state.name) of
{ok, ChAfter3} ->
{ok, State#state{children = ChAfter3 ++ ChBefore}};
- {error, ChAfter3} ->
- Restart(Child, State#state{children = ChAfter3 ++ ChBefore})
+ {error, ChAfter3, Reason} ->
+ NChild = Child#child{pid=restarting(Child#child.pid)},
+ NState = State#state{children = ChAfter3 ++ ChBefore},
+ {try_again, Reason, replace_child(NChild,NState)}
end;
-restart(one_for_all, Child, State, Restart) ->
+restart(one_for_all, Child, State) ->
Children1 = del_child(Child#child.pid, State#state.children),
Children2 = terminate_children(Children1, State#state.name),
case start_children(Children2, State#state.name) of
{ok, NChs} ->
{ok, State#state{children = NChs}};
- {error, NChs} ->
- Restart(Child, State#state{children = NChs})
+ {error, NChs, Reason} ->
+ NChild = Child#child{pid=restarting(Child#child.pid)},
+ NState = State#state{children = NChs},
+ {try_again, Reason, replace_child(NChild,NState)}
end.
+restarting(Pid) when is_pid(Pid) -> ?restarting(Pid);
+restarting(RPid) -> RPid.
+
%%-----------------------------------------------------------------
%% Func: terminate_children/2
-%% Args: Children = [#child] in termination order
+%% Args: Children = [child_rec()] in termination order
%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Returns: NChildren = [#child] in
+%% Returns: NChildren = [child_rec()] in
%% startup order (reversed termination order)
%%-----------------------------------------------------------------
terminate_children(Children, SupName) ->
terminate_children(Children, SupName, []).
+%% Temporary children should not be restarted and thus should
+%% be skipped when building the list of terminated children, although
+%% we do want them to be shut down as many functions from this module
+%% use this function to just clear everything.
+terminate_children([Child = #child{restart_type=temporary} | Children], SupName, Res) ->
+ do_terminate(Child, SupName),
+ terminate_children(Children, SupName, Res);
terminate_children([Child | Children], SupName, Res) ->
NChild = do_terminate(Child, SupName),
terminate_children(Children, SupName, [NChild | Res]);
terminate_children([], _SupName, Res) ->
Res.
-terminate_simple_children(Child, Dynamics, SupName) ->
- Pids = dict:fold(fun (Pid, _Args, Pids) ->
- erlang:monitor(process, Pid),
- unlink(Pid),
- exit(Pid, child_exit_reason(Child)),
- [Pid | Pids]
- end, [], Dynamics),
- TimeoutMsg = {timeout, make_ref()},
- TRef = timeout_start(Child, TimeoutMsg),
- {Replies, Timedout} =
- lists:foldl(
- fun (_Pid, {Replies, Timedout}) ->
- {Pid1, Reason1, Timedout1} =
- receive
- TimeoutMsg ->
- Remaining = Pids -- [P || {P, _} <- Replies],
- [exit(P, kill) || P <- Remaining],
- receive
- {'DOWN', _MRef, process, Pid, Reason} ->
- {Pid, Reason, true}
- end;
- {'DOWN', _MRef, process, Pid, Reason} ->
- {Pid, Reason, Timedout}
- end,
- {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies],
- Timedout1}
- end, {[], false}, Pids),
- timeout_stop(Child, TRef, TimeoutMsg, Timedout),
- ReportError = shutdown_error_reporter(SupName),
- Report = fun(_, ok) -> ok;
- (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid})
- end,
- [receive
- {'EXIT', Pid, Reason} ->
- Report(Pid, child_res(Child, Reason, Timedout))
- after
- 0 -> Report(Pid, Reply)
- end || {Pid, Reply} <- Replies],
- ok.
-
-child_exit_reason(#child{shutdown = brutal_kill}) -> kill;
-child_exit_reason(#child{}) -> shutdown.
-
-child_res(#child{shutdown=brutal_kill}, killed, false) -> ok;
-child_res(#child{}, shutdown, false) -> ok;
-child_res(#child{restart_type=permanent}, normal, false) -> {error, normal};
-child_res(#child{restart_type={permanent,_}},normal, false) -> {error, normal};
-child_res(#child{}, normal, false) -> ok;
-child_res(#child{}, R, _) -> {error, R}.
-
-timeout_start(#child{shutdown = Time}, Msg) when is_integer(Time) ->
- erlang:send_after(Time, self(), Msg);
-timeout_start(#child{}, _Msg) ->
- ok.
-
-timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) ->
- erlang:cancel_timer(TRef),
- receive
- Msg -> ok
- after
- 0 -> ok
- end;
-timeout_stop(#child{}, _TRef, _Msg, _Timedout) ->
- ok.
-
-do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
- ReportError = shutdown_error_reporter(SupName),
+do_terminate(Child, SupName) when is_pid(Child#child.pid) ->
case shutdown(Child#child.pid, Child#child.shutdown) of
ok ->
ok;
- {error, normal} ->
- case Child#child.restart_type of
- permanent -> ReportError(normal, Child);
- {permanent, _Delay} -> ReportError(normal, Child);
- _ -> ok
- end;
+ {error, normal} when not ?is_permanent(Child#child.restart_type) ->
+ ok;
{error, OtherReason} ->
- ReportError(OtherReason, Child)
+ report_error(shutdown_error, OtherReason, Child, SupName)
end,
Child#child{pid = undefined};
do_terminate(Child, _SupName) ->
- Child.
+ Child#child{pid = undefined}.
%%-----------------------------------------------------------------
-%% Shutdowns a child. We must check the EXIT value
+%% Shutdowns a child. We must check the EXIT value
%% of the child, because it might have died with another reason than
-%% the wanted. In that case we want to report the error. We put a
-%% monitor on the child an check for the 'DOWN' message instead of
-%% checking for the 'EXIT' message, because if we check the 'EXIT'
-%% message a "naughty" child, who does unlink(Sup), could hang the
-%% supervisor.
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
%% Returns: ok | {error, OtherReason} (this should be reported)
%%-----------------------------------------------------------------
shutdown(Pid, brutal_kill) ->
-
case monitor_child(Pid) of
ok ->
exit(Pid, kill),
@@ -908,16 +1060,14 @@ shutdown(Pid, brutal_kill) ->
{'DOWN', _MRef, process, Pid, OtherReason} ->
{error, OtherReason}
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end;
-
shutdown(Pid, Time) ->
-
case monitor_child(Pid) of
ok ->
exit(Pid, shutdown), %% Try to shutdown gracefully
- receive
+ receive
{'DOWN', _MRef, process, Pid, shutdown} ->
ok;
{'DOWN', _MRef, process, Pid, OtherReason} ->
@@ -929,14 +1079,14 @@ shutdown(Pid, Time) ->
{error, OtherReason}
end
end;
- {error, Reason} ->
+ {error, Reason} ->
{error, Reason}
end.
%% Help function to shutdown/2 switches from link to monitor approach
monitor_child(Pid) ->
-
- %% Do the monitor operation first so that if the child dies
+
+ %% Do the monitor operation first so that if the child dies
%% before the monitoring is done causing a 'DOWN'-message with
%% reason noproc, we will get the real reason in the 'EXIT'-message
%% unless a naughty child has already done unlink...
@@ -946,34 +1096,177 @@ monitor_child(Pid) ->
receive
%% If the child dies before the unlik we must empty
%% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
- {'EXIT', Pid, Reason} ->
- receive
+ {'EXIT', Pid, Reason} ->
+ receive
{'DOWN', _, process, Pid, _} ->
{error, Reason}
end
- after 0 ->
+ after 0 ->
%% If a naughty child did unlink and the child dies before
- %% monitor the result will be that shutdown/2 receives a
+ %% monitor the result will be that shutdown/2 receives a
%% 'DOWN'-message with reason noproc.
%% If the child should die after the unlink there
%% will be a 'DOWN'-message with a correct reason
- %% that will be handled in shutdown/2.
- ok
+ %% that will be handled in shutdown/2.
+ ok
end.
%%-----------------------------------------------------------------
+%% Func: terminate_dynamic_children/3
+%% Args: Child = child_rec()
+%% Dynamics = ?DICT() | ?SET()
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Returns: ok
+%%
+%%
+%% Shutdown all dynamic children. This happens when the supervisor is
+%% stopped. Because the supervisor can have millions of dynamic children, we
+%% can have an significative overhead here.
+%%-----------------------------------------------------------------
+terminate_dynamic_children(Child, Dynamics, SupName) ->
+ {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics),
+ Sz = ?SETS:size(Pids),
+ EStack = case Child#child.shutdown of
+ brutal_kill ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ infinity ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz, undefined, EStack0);
+ Time ->
+ ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids),
+ TRef = erlang:start_timer(Time, self(), kill),
+ wait_dynamic_children(Child, Pids, Sz, TRef, EStack0)
+ end,
+ %% Unroll stacked errors and report them
+ ?DICT:fold(fun(Reason, Ls, _) ->
+ report_error(shutdown_error, Reason,
+ Child#child{pid=Ls}, SupName)
+ end, ok, EStack).
+
+
+monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) ->
+ ?SETS:fold(fun(P, {Pids, EStack}) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end
+ end, {?SETS:new(), ?DICT:new()}, Dynamics);
+monitor_dynamic_children(#child{restart_type=RType}, Dynamics) ->
+ ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) ->
+ case monitor_child(P) of
+ ok ->
+ {?SETS:add_element(P, Pids), EStack};
+ {error, normal} when not ?is_permanent(RType) ->
+ {Pids, EStack};
+ {error, Reason} ->
+ {Pids, ?DICT:append(Reason, P, EStack)}
+ end;
+ (?restarting(_), _, {Pids, EStack}) ->
+ {Pids, EStack}
+ end, {?SETS:new(), ?DICT:new()}, Dynamics).
+
+wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) ->
+ EStack;
+wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) ->
+ %% If the timer has expired before its cancellation, we must empty the
+ %% mail-box of the 'timeout'-message.
+ erlang:cancel_timer(TRef),
+ receive
+ {timeout, TRef, kill} ->
+ EStack
+ after 0 ->
+ EStack
+ end;
+wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack))
+ end;
+wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz,
+ TRef, EStack) ->
+ receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, normal} when not ?is_permanent(RType) ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, EStack);
+
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1,
+ TRef, ?DICT:append(Reason, Pid, EStack));
+
+ {timeout, TRef, kill} ->
+ ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids),
+ wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack)
+ end.
+
+%%-----------------------------------------------------------------
%% Child/State manipulating functions.
%%-----------------------------------------------------------------
-state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
- NDynamics = ?DICT:erase(Pid, State#state.dynamics),
+
+%% Note we do not want to save the parameter list for temporary processes as
+%% they will not be restarted, and hence we do not need this information.
+%% Especially for dynamic children to simple_one_for_one supervisors
+%% it could become very costly as it is not uncommon to spawn
+%% very many such processes.
+save_child(#child{restart_type = temporary,
+ mfargs = {M, F, _}} = Child, #state{children = Children} = State) ->
+ State#state{children = [Child#child{mfargs = {M, F, undefined}} |Children]};
+save_child(Child, #state{children = Children} = State) ->
+ State#state{children = [Child |Children]}.
+
+save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))};
+save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) ->
+ State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}.
+
+dynamics_db(temporary, undefined) ->
+ ?SETS:new();
+dynamics_db(_, undefined) ->
+ ?DICT:new();
+dynamics_db(_,Dynamics) ->
+ Dynamics.
+
+dynamic_child_args(Pid, Dynamics) ->
+ case ?SETS:is_set(Dynamics) of
+ true ->
+ {ok, undefined};
+ false ->
+ ?DICT:find(Pid, Dynamics)
+ end.
+
+state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) ->
+ NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)),
+ State#state{dynamics = NDynamics};
+state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) ->
+ NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)),
State#state{dynamics = NDynamics};
state_del_child(Child, State) ->
NChildren = del_child(Child#child.name, State#state.children),
State#state{children = NChildren}.
+del_child(Name, [Ch=#child{pid = ?restarting(_)}|_]=Chs) when Ch#child.name =:= Name ->
+ Chs;
+del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name, Ch#child.restart_type =:= temporary ->
+ Chs;
del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name ->
[Ch#child{pid = undefined} | Chs];
+del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid, Ch#child.restart_type =:= temporary ->
+ Chs;
del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid ->
[Ch#child{pid = undefined} | Chs];
del_child(Name, [Ch|Chs]) ->
@@ -996,7 +1289,38 @@ split_child(_, [], After) ->
{lists:reverse(After), []}.
get_child(Name, State) ->
+ get_child(Name, State, false).
+get_child(Pid, State, AllowPid) when AllowPid, is_pid(Pid) ->
+ get_dynamic_child(Pid, State);
+get_child(Name, State, _) ->
lists:keysearch(Name, #child.name, State#state.children).
+
+get_dynamic_child(Pid, #state{children=[Child], dynamics=Dynamics}) ->
+ DynamicsDb = dynamics_db(Child#child.restart_type, Dynamics),
+ case is_dynamic_pid(Pid, DynamicsDb) of
+ true ->
+ {value, Child#child{pid=Pid}};
+ false ->
+ RPid = restarting(Pid),
+ case is_dynamic_pid(RPid, DynamicsDb) of
+ true ->
+ {value, Child#child{pid=RPid}};
+ false ->
+ case erlang:is_process_alive(Pid) of
+ true -> false;
+ false -> {value, Child}
+ end
+ end
+ end.
+
+is_dynamic_pid(Pid, Dynamics) ->
+ case ?SETS:is_set(Dynamics) of
+ true ->
+ ?SETS:is_element(Pid, Dynamics);
+ false ->
+ ?DICT:is_key(Pid, Dynamics)
+ end.
+
replace_child(Child, State) ->
Chs = do_replace_child(Child, State#state.children),
State#state{children = Chs}.
@@ -1016,12 +1340,12 @@ remove_child(Child, State) ->
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
%% rest_for_one
-%% MaxIntensity = integer()
-%% Period = integer()
+%% MaxIntensity = integer() >= 0
+%% Period = integer() > 0
%% Mod :== atom()
-%% Arsg :== term()
+%% Args :== term()
%% Purpose: Check that Type is of correct type (!)
-%% Returns: {ok, #state} | Error
+%% Returns: {ok, state()} | Error
%%-----------------------------------------------------------------
init_state(SupName, Type, Mod, Args) ->
case catch init_state1(SupName, Type, Mod, Args) of
@@ -1036,46 +1360,45 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
validIntensity(MaxIntensity),
validPeriod(Period),
{ok, #state{name = supname(SupName,Mod),
- strategy = Strategy,
- intensity = MaxIntensity,
- period = Period,
- module = Mod,
- args = Args}};
+ strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period,
+ module = Mod,
+ args = Args}};
init_state1(_SupName, Type, _, _) ->
{invalid_type, Type}.
-validStrategy(simple_one_for_one_terminate) -> true;
-validStrategy(simple_one_for_one) -> true;
-validStrategy(one_for_one) -> true;
-validStrategy(one_for_all) -> true;
-validStrategy(rest_for_one) -> true;
-validStrategy(What) -> throw({invalid_strategy, What}).
+validStrategy(simple_one_for_one) -> true;
+validStrategy(one_for_one) -> true;
+validStrategy(one_for_all) -> true;
+validStrategy(rest_for_one) -> true;
+validStrategy(What) -> throw({invalid_strategy, What}).
validIntensity(Max) when is_integer(Max),
Max >= 0 -> true;
-validIntensity(What) -> throw({invalid_intensity, What}).
+validIntensity(What) -> throw({invalid_intensity, What}).
validPeriod(Period) when is_integer(Period),
Period > 0 -> true;
validPeriod(What) -> throw({invalid_period, What}).
-supname(self,Mod) -> {self(),Mod};
-supname(N,_) -> N.
+supname(self, Mod) -> {self(), Mod};
+supname(N, _) -> N.
%%% ------------------------------------------------------
%%% Check that the children start specification is valid.
%%% Shall be a six (6) tuple
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
-%%% Func is {Mod, Fun, Args} == {atom, atom, list}
+%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()}
%%% RestartType is permanent | temporary | transient |
%%% intrinsic | {permanent, Delay} |
%%% {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0
-%%% Shutdown = integer() | infinity | brutal_kill
+%%% Shutdown = integer() > 0 | infinity | brutal_kill
%%% ChildType = supervisor | worker
%%% Modules = [atom()] | dynamic
-%%% Returns: {ok, [#child]} | Error
+%%% Returns: {ok, [child_rec()]} | Error
%%% ------------------------------------------------------
check_startspec(Children) -> check_startspec(Children, []).
@@ -1103,7 +1426,7 @@ check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) ->
validChildType(ChildType),
validShutdown(Shutdown, ChildType),
validMods(Mods),
- {ok, #child{name = Name, mfa = Func, restart_type = RestartType,
+ {ok, #child{name = Name, mfargs = Func, restart_type = RestartType,
shutdown = Shutdown, child_type = ChildType, modules = Mods}}.
validChildType(supervisor) -> true;
@@ -1112,8 +1435,8 @@ validChildType(What) -> throw({invalid_child_type, What}).
validName(_Name) -> true.
-validFunc({M, F, A}) when is_atom(M),
- is_atom(F),
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
@@ -1131,15 +1454,15 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
-validShutdown(infinity, supervisor) -> true;
+validShutdown(infinity, _) -> true;
validShutdown(brutal_kill, _) -> true;
validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
validMods(dynamic) -> true;
validMods(Mods) when is_list(Mods) ->
- lists:foreach(fun (Mod) ->
+ lists:foreach(fun(Mod) ->
if
is_atom(Mod) -> ok;
true -> throw({invalid_module, Mod})
@@ -1157,7 +1480,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}).
%%% Returns: {ok, State'} | {terminate, State'}
%%% ------------------------------------------------------
-add_restart(State) ->
+add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
@@ -1213,15 +1536,18 @@ report_error(Error, Reason, Child, SupName) ->
{offender, extract_child(Child)}],
error_logger:error_report(supervisor_report, ErrorMsg).
-shutdown_error_reporter(SupName) ->
- fun(Reason, Child) ->
- report_error(shutdown_error, Reason, Child, SupName)
- end.
+extract_child(Child) when is_list(Child#child.pid) ->
+ [{nb_children, length(Child#child.pid)},
+ {name, Child#child.name},
+ {mfargs, Child#child.mfargs},
+ {restart_type, Child#child.restart_type},
+ {shutdown, Child#child.shutdown},
+ {child_type, Child#child.child_type}];
extract_child(Child) ->
[{pid, Child#child.pid},
{name, Child#child.name},
- {mfa, Child#child.mfa},
+ {mfargs, Child#child.mfargs},
{restart_type, Child#child.restart_type},
{shutdown, Child#child.shutdown},
{child_type, Child#child.child_type}].
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
index a841b1f0..5a47e309 100644
--- a/src/supervisor2_tests.erl
+++ b/src/supervisor2_tests.erl
@@ -65,6 +65,6 @@ init([Timeout]) ->
[{local, ?MODULE}, ?MODULE, []]},
transient, Timeout, supervisor, [?MODULE]}]}};
init([]) ->
- {ok, {{simple_one_for_one_terminate, 0, 1},
+ {ok, {{simple_one_for_one, 0, 1},
[{test_worker, {?MODULE, start_link, []},
temporary, 1000, worker, [?MODULE]}]}}.
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 51ff7b4e..da325f1e 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -34,7 +34,7 @@
%%----------------------------------------------------------------------------
test_supervisor_delayed_restart() ->
- passed = with_sup(simple_one_for_one_terminate,
+ passed = with_sup(simple_one_for_one,
fun (SupPid) ->
{ok, _ChildPid} =
supervisor2:start_child(SupPid, []),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index d60b7fec..a07f6c65 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -263,29 +263,11 @@ get_total_memory({unix,openbsd}) ->
sysctl("hw.usermem");
get_total_memory({win32,_OSname}) ->
- %% Due to the Erlang print format bug, on Windows boxes the memory
- %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM
- %% we get negative memory size:
- %% > os_mon_sysinfo:get_mem_info().
- %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"]
- %% Due to this bug, we don't actually know anything. Even if the
- %% number is postive we can't be sure if it's correct. This only
- %% affects us on os_mon versions prior to 2.2.1.
- case application:get_key(os_mon, vsn) of
- undefined ->
- unknown;
- {ok, Version} ->
- case rabbit_misc:version_compare(Version, "2.2.1", lt) of
- true -> %% os_mon is < 2.2.1, so we know nothing
- unknown;
- false ->
- [Result|_] = os_mon_sysinfo:get_mem_info(),
- {ok, [_MemLoad, TotPhys, _AvailPhys,
- _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} =
- io_lib:fread("~d~d~d~d~d~d~d", Result),
- TotPhys
- end
- end;
+ [Result|_] = os_mon_sysinfo:get_mem_info(),
+ {ok, [_MemLoad, TotPhys, _AvailPhys, _TotPage, _AvailPage, _TotV, _AvailV],
+ _RestStr} =
+ io_lib:fread("~d~d~d~d~d~d~d", Result),
+ TotPhys;
get_total_memory({unix, linux}) ->
File = read_proc_file("/proc/meminfo"),