diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-13 15:39:34 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-13 15:39:34 +0100 |
commit | fae24bbac8719b12b0dec0154a61d267cab3ab4b (patch) | |
tree | eddba98aafe6e14b477af88421eef5bf4ae70ad3 | |
parent | 6893e45ee15fdd6c8d7dcc35d81ec65a56e8867d (diff) | |
parent | cfd2381c8133957ce66db1761096c983994d43d1 (diff) | |
download | rabbitmq-server-fae24bbac8719b12b0dec0154a61d267cab3ab4b.tar.gz |
stable to default
29 files changed, 1070 insertions, 702 deletions
@@ -3,6 +3,7 @@ syntax: glob *~ *.swp *.patch +*.orig erl_crash.dump deps.mk diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index a4582e2d..7d0766f8 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -18,6 +18,7 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {vm_memory_high_watermark_paging_ratio, 0.5}, {disk_free_limit, 1000000000}, %% 1GB {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 028b4ec2..7a97556d 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -12,8 +12,8 @@ Source3: rabbitmq-server.logrotate Source4: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt -Requires: erlang >= R12B-3, logrotate +BuildRequires: erlang >= R13B03, python-simplejson, xmlto, libxslt +Requires: erlang >= R13B03, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server Requires(post): %%REQUIRES%% diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 3a15c4b6..02d23547 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -4,12 +4,12 @@ Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> Uploaders: Emile Joubert <emile@rabbitmq.com> DM-Upload-Allowed: yes -Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip +Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:13.b.3), erlang-src (>= 1:13.b.3), unzip, zip Standards-Version: 3.9.2 Package: rabbitmq-server Architecture: all -Depends: erlang-nox (>= 1:12.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends} +Depends: erlang-nox (>= 1:13.b.3) | esl-erlang, adduser, logrotate, ${misc:Depends} Description: AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 2ab8eee7..85625a9d 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -213,7 +213,7 @@ Function findErlang abort: Abort ${Else} - ${VersionCompare} $2 "5.6.3" $0 + ${VersionCompare} $2 "5.7.4" $0 ${VersionCompare} $2 "5.8.1" $1 ${If} $0 = 2 diff --git a/src/delegate.erl b/src/delegate.erl index 7a06c1e4..5277e59f 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -33,15 +33,14 @@ -export_type([monitor_ref/0]). -type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A; + ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok'). -spec(monitor/2 :: ('process', pid()) -> monitor_ref()). -spec(demonitor/1 :: (monitor_ref()) -> 'true'). -spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). @@ -64,24 +63,24 @@ start_link(Num) -> Name = delegate_name(Num), gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + apply1(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -91,38 +90,38 @@ invoke(Pids, Fun) when is_list(Pids) -> [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}, infinity) + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; RemoteNodes -> gen_server2:abcast( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}) + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. monitor(Type, Pid) when node(Pid) =:= node() -> @@ -140,10 +139,10 @@ demonitor({Name, Pid}, Options) -> gen_server2:cast(Name, {demonitor, Pid, Options}). call(PidOrPids, Msg) -> - invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). %%---------------------------------------------------------------------------- @@ -171,23 +170,27 @@ delegate(Pid, RemoteNodes) -> Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, apply1(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); +apply1(Fun, Arg) -> Fun(Arg). + %%---------------------------------------------------------------------------- init([Name]) -> {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> @@ -205,8 +208,8 @@ handle_cast({demonitor, Pid, Options}, State end, hibernate}; -handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), {noreply, State, hibernate}. handle_info({'DOWN', Ref, process, Pid, Info}, diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 3b16c53a..d5f51db0 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -212,9 +212,8 @@ start_link0(Prefix, Group, Init) -> init(Mod, Args) -> case Mod:init(Args) of {ok, {{Bad, _, _}, _ChildSpecs}} when - Bad =:= simple_one_for_one orelse - Bad =:= simple_one_for_one_terminate -> erlang:error(badarg); - Init -> Init + Bad =:= simple_one_for_one -> erlang:error(badarg); + Init -> Init end. start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 90be4f80..d54c2a8d 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -74,9 +74,7 @@ check_vhost_access(User = #user{ username = Username, true -> Module:check_vhost_access(User, VHostPath) end end, - "~s failed checking vhost access to ~s for ~s: ~p~n", - [Module, VHostPath, Username], - "access to vhost '~s' refused for user '~s'", + Module, "access to vhost '~s' refused for user '~s'", [VHostPath, Username]). check_resource_access(User, R = #resource{kind = exchange, name = <<"">>}, @@ -87,15 +85,14 @@ check_resource_access(User = #user{username = Username, auth_backend = Module}, Resource, Permission) -> check_access( fun() -> Module:check_resource_access(User, Resource, Permission) end, - "~s failed checking resource access to ~p for ~s: ~p~n", - [Module, Resource, Username], - "access to ~s refused for user '~s'", + Module, "access to ~s refused for user '~s'", [rabbit_misc:rs(Resource), Username]). -check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> +check_access(Fun, Module, ErrStr, ErrArgs) -> Allow = case Fun() of - {error, _} = E -> - rabbit_log:error(ErrStr, ErrArgs ++ [E]), + {error, E} -> + rabbit_log:error(ErrStr ++ " by ~s: ~p~n", + ErrArgs ++ [Module, E]), false; Else -> Else @@ -104,5 +101,5 @@ check_access(Fun, ErrStr, ErrArgs, RefStr, RefArgs) -> true -> ok; false -> - rabbit_misc:protocol_error(access_refused, RefStr, RefArgs) + rabbit_misc:protocol_error(access_refused, ErrStr, ErrArgs) end. diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 6607c4f6..cd1d125b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -37,7 +37,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). -spec(set_alarm/1 :: (any()) -> 'ok'). -spec(clear_alarm/1 :: (any()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -93,8 +93,8 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, AlertMFA}, State) -> - {ok, 0 < dict:size(State#alarms.alarmed_nodes), +handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> + {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> @@ -104,11 +104,20 @@ handle_call(_Request, State) -> {ok, not_understood, State}. handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_set_alarm(Alarm, State#alarms{alarms = [Alarm|Alarms]}); + case lists:member(Alarm, Alarms) of + true -> {ok, State}; + false -> UpdatedAlarms = lists:usort([Alarm|Alarms]), + handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms}) + end; handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> - handle_clear_alarm(Alarm, State#alarms{alarms = lists:keydelete(Alarm, 1, - Alarms)}); + case lists:keymember(Alarm, 1, Alarms) of + true -> handle_clear_alarm( + Alarm, State#alarms{alarms = lists:keydelete( + Alarm, 1, Alarms)}); + false -> {ok, State} + + end; handle_event({node_up, Node}, State) -> %% Must do this via notify and not call to avoid possible deadlock. @@ -118,7 +127,7 @@ handle_event({node_up, Node}, State) -> {ok, State}; handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; + {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -141,45 +150,36 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +dict_append(Key, Val, Dict) -> + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + dict:store(Key, lists:usort([Val|L]), Dict). + dict_unappend_all(Key, _Val, Dict) -> dict:erase(Key, Dict). dict_unappend(Key, Val, Dict) -> - case lists:delete(Val, dict:fetch(Key, Dict)) of + L = case dict:find(Key, Dict) of + {ok, V} -> V; + error -> [] + end, + + case lists:delete(Val, L) of [] -> dict:erase(Key, Dict); X -> dict:store(Key, X, Dict) end. -count_dict_values(Val, Dict) -> - dict:fold(fun (_Node, List, Count) -> - Count + case lists:member(Val, List) of - true -> 1; - false -> 0 - end - end, 0, Dict). - -maybe_alert(UpdateFun, Node, Source, +maybe_alert(UpdateFun, Node, Source, Alert, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), - BeforeSz = count_dict_values(Source, AN), - AfterSz = count_dict_values(Source, AN1), - - %% If we have changed our alarm state, inform the remotes. - IsLocal = Node =:= node(), - if IsLocal andalso BeforeSz < AfterSz -> - ok = alert_remote(true, Alertees, Source); - IsLocal andalso BeforeSz > AfterSz -> - ok = alert_remote(false, Alertees, Source); - true -> - ok - end, - %% If the overall alarm state has changed, inform the locals. - case {dict:size(AN), dict:size(AN1)} of - {0, 1} -> ok = alert_local(true, Alertees, Source); - {1, 0} -> ok = alert_local(false, Alertees, Source); - {_, _} -> ok + case node() of + Node -> ok = alert_remote(Alert, Alertees, Source); + _ -> ok end, + ok = alert_local(Alert, Alertees, Source), State#alarms{alarmed_nodes = AN1}. alert_local(Alert, Alertees, Source) -> @@ -214,7 +214,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -229,7 +229,7 @@ handle_set_alarm(Alarm, State) -> handle_clear_alarm({resource_limit, Source, Node}, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}; handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0515e82e..74ae59da 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -47,6 +47,6 @@ start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index a7e8fb36..5ab22e75 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -31,9 +31,8 @@ %% SASL PLAIN, as used by the Qpid Java client and our clients. Also, %% apparently, by OpenAMQ. -%% TODO: once the minimum erlang becomes R13B03, reimplement this -%% using the binary module - that makes use of BIFs to do binary -%% matching and will thus be much faster. +%% TODO: reimplement this using the binary module? - that makes use of +%% BIFs to do binary matching and will thus be much faster. description() -> [{description, <<"SASL PLAIN authentication mechanism">>}]. diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 1d9ba48b..e2c255db 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -43,6 +43,6 @@ start_channel(Pid, Args) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{channel_sup, {rabbit_channel_sup, start_link, []}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index d6536e16..843bb615 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -49,8 +49,9 @@ start_link_worker(SupName, Callback) -> supervisor2:start_link(SupName, ?MODULE, {Callback, worker}). init({M,F,A}) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}; init({{M,F,A}, worker}) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{client, {M,F,A}, temporary, ?MAX_WAIT, worker, [M]}]}}. + diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 9002514f..a7ee3276 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -35,8 +35,10 @@ {rabbit_types:username(), rabbit_types:password()}), rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> - {'ok', {rabbit_types:user(), - rabbit_framing:amqp_table()}}). + rabbit_types:ok_or_error2( + {rabbit_types:user(), rabbit_framing:amqp_table()}, + 'broker_not_found_on_node' | 'auth_failure' | + 'access_refused')). -spec(start_channel/9 :: (rabbit_channel:channel_number(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), @@ -76,21 +78,23 @@ connect(User = #user{}, VHost, Protocol, Pid, Infos) -> end; connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid, - Infos); + connect0(fun () -> rabbit_access_control:check_user_pass_login( + Username, Password) end, + VHost, Protocol, Pid, Infos); connect(Username, VHost, Protocol, Pid, Infos) -> - connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos). + connect0(fun () -> rabbit_access_control:check_user_login( + Username, []) end, + VHost, Protocol, Pid, Infos). -connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) -> +connect0(AuthFun, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of - true -> - case rabbit_access_control:FunctionName(U, P) of - {ok, User} -> connect(User, VHost, Protocol, Pid, Infos); - {refused, _M, _A} -> {error, auth_failure} - end; - false -> - {error, broker_not_found_on_node} + true -> case AuthFun() of + {ok, User} -> connect(User, VHost, Protocol, Pid, + Infos); + {refused, _M, _A} -> {error, auth_failure} + end; + false -> {error, broker_not_found_on_node} end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 4d3ddc79..a713d76b 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -141,8 +141,6 @@ notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> - %% TODO: switch to os:timestamp() when we drop support for - %% Erlang/OTP < R13B01 gen_event:notify(?MODULE, #event{type = Type, props = Props, - timestamp = now()}). + timestamp = os:timestamp()}). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index df9baed9..fac74edb 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -19,6 +19,8 @@ -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). +-export([system_continue/3, system_terminate/4, system_code_change/4]). + -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -51,6 +53,10 @@ -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,{_, _}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -endif. %%---------------------------------------------------------------------------- @@ -88,6 +94,15 @@ pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. +system_continue(_Parent, Deb, {Params, State}) -> + heartbeater(Params, Deb, State). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + %%---------------------------------------------------------------------------- start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> {ok, none}; @@ -98,17 +113,27 @@ start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> - {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, V) end, + Deb, {StatVal, SameCount} = State) -> + Recurse = fun (State1) -> heartbeater(Params, Deb, State1) end, + System = fun (From, Req) -> + sys:handle_system_msg( + Req, From, self(), ?MODULE, Deb, {Params, State}) + end, receive - pause -> receive - resume -> Recurse({0, 0}); - Other -> exit({unexpected_message, Other}) - end; - Other -> exit({unexpected_message, Other}) + pause -> + receive + resume -> Recurse({0, 0}); + {system, From, Req} -> System(From, Req); + Other -> exit({unexpected_message, Other}) + end; + {system, From, Req} -> + System(From, Req); + Other -> + exit({unexpected_message, Other}) after TimeoutMillisec -> case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 495f6fdd..b8d8023e 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -43,17 +43,6 @@ -define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). -%% Because we have a feedback loop here, we need to ensure that we -%% have some space for when the queues don't quite respond as fast as -%% we would like, or when there is buffering going on in other parts -%% of the system. In short, we aim to stay some distance away from -%% when the memory alarms will go off, which cause backpressure (of -%% some sort) on producers. Note that all other Thresholds are -%% relative to this scaling. --define(MEMORY_LIMIT_SCALING, 0.4). - --define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this - %% If all queues are pushed to disk (duration 0), then the sum of %% their reported lengths will be 0. If memory then becomes available, %% unless we manually intervene, the sum will remain 0, and the queues @@ -207,7 +196,9 @@ internal_update(State = #state { queue_durations = Durations, desired_duration = DesiredDurationAvg, queue_duration_sum = Sum, queue_duration_count = Count }) -> - MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(), + {ok, LimitThreshold} = + application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), + MemoryLimit = vm_memory_monitor:get_memory_limit(), MemoryRatio = case MemoryLimit > 0.0 of true -> erlang:memory(total) / MemoryLimit; false -> infinity @@ -215,7 +206,7 @@ internal_update(State = #state { queue_durations = Durations, DesiredDurationAvg1 = if MemoryRatio =:= infinity -> 0.0; - MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 -> + MemoryRatio < LimitThreshold orelse Count == 0 -> infinity; MemoryRatio < ?SUM_INC_THRESHOLD -> ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 335b7c81..eded0411 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -204,8 +204,6 @@ start_child(Name, MirrorNode, Q) -> report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - rabbit_event:notify(queue_mirror_deaths, [{name, QueueName}, - {pids, DeadPids}]), rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", [rabbit_misc:rs(QueueName), case IsMaster of diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1996fd0a..82819cbe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -41,15 +41,13 @@ %%---------------------------------------------------------------------------- --define(CREATION_EVENT_KEYS, +-define(INFO_KEYS, [pid, name, master_pid, is_synchronised ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS). - -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -124,8 +122,6 @@ init(Q = #amqqueue { name = QName }) -> depth_delta = undefined }, - rabbit_event:notify(queue_slave_created, - infos(?CREATION_EVENT_KEYS, State)), ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), {ok, State, hibernate, diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 43dbb4e9..6fba99db 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -31,7 +31,7 @@ start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> - {ok, {{simple_one_for_one_terminate, 10, 10}, + {ok, {{simple_one_for_one, 10, 10}, [{rabbit_mirror_queue_slave, {rabbit_mirror_queue_slave, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 5fa29b7e..85958400 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -396,7 +396,7 @@ cluster_status(WhichNodes) -> node_info() -> {erlang:system_info(otp_release), rabbit_misc:version(), - delegate_beam_hash(), cluster_status_from_mnesia()}. + cluster_status_from_mnesia()}. node_type() -> DiscNodes = cluster_nodes(disc), @@ -459,10 +459,11 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> %% about the cluster case NodeType of ram -> start_mnesia(), - change_extra_db_nodes(ClusterNodes, false), - rabbit_table:wait_for_replicated(); + change_extra_db_nodes(ClusterNodes, false); disc -> ok end, + %% ...and all nodes will need to wait for tables + rabbit_table:wait_for_replicated(), ok. init_db_with_mnesia(ClusterNodes, NodeType, @@ -569,16 +570,16 @@ check_cluster_consistency(Node) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, _Hash, {error, _}} -> + {_OTP, _Rabbit, {error, _}} -> {error, not_found}; - {_OTP, Rabbit, _Status} -> - %% pre-2013/04 format implies version mismatch - version_error("Rabbit", rabbit_misc:version(), Rabbit); - {OTP, Rabbit, Hash, {ok, Status}} -> - case check_consistency(OTP, Rabbit, Hash, Node, Status) of + {OTP, Rabbit, {ok, Status}} -> + case check_consistency(OTP, Rabbit, Node, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} - end + end; + {_OTP, Rabbit, _Hash, _Status} -> + %% delegate hash checking implies version mismatch + version_error("Rabbit", rabbit_misc:version(), Rabbit) end. %%-------------------------------------------------------------------- @@ -742,17 +743,15 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit, Hash) -> +check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), - check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash)]). + check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Hash, Node, Status) -> +check_consistency(OTP, Rabbit, Node, Status) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit), - check_beam_compatibility(Hash), check_nodes_consistency(Node, Status)]). check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> @@ -783,31 +782,11 @@ version_error(Name, This, Remote) -> check_otp_consistency(Remote) -> check_version_consistency(erlang:system_info(otp_release), Remote, "OTP"). -%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be -%% removed after 3.1.0 is released. -check_rabbit_consistency("3.0.0") -> - version_error("Rabbit", rabbit_misc:version(), "3.0.0"); - check_rabbit_consistency(Remote) -> check_version_consistency( rabbit_misc:version(), Remote, "Rabbit", fun rabbit_misc:version_minor_equivalent/2). -check_beam_compatibility(RemoteHash) -> - case RemoteHash == delegate_beam_hash() of - true -> ok; - false -> {error, {incompatible_bytecode, - "Incompatible Erlang bytecode found on nodes"}} - end. - -%% The delegate module sends functions across the cluster; if it is -%% out of sync (say due to mixed compilers), we will get badfun -%% exceptions when trying to do so. Let's detect that at startup. -delegate_beam_hash() -> - {delegate, Obj, _} = code:get_object_code(delegate), - {ok, {delegate, Hash}} = beam_lib:md5(Obj), - Hash. - %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the %% mnesia tables aren't there because restarted RAM nodes won't have @@ -833,12 +812,13 @@ find_good_node([]) -> none; find_good_node([Node | Nodes]) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _Reason} -> find_good_node(Nodes); - {_OTP, _Rabbit, _} -> find_good_node(Nodes); - {OTP, Rabbit, Hash, _} -> case check_consistency(OTP, Rabbit, Hash) of - {error, _} -> find_good_node(Nodes); - ok -> {ok, Node} - end + {badrpc, _Reason} -> find_good_node(Nodes); + %% old delegate hash check + {_OTP, _Rabbit, _Hash, _} -> find_good_node(Nodes); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> find_good_node(Nodes); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 6355f935..948d2ab0 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -121,7 +121,6 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> fmt_missing("dependencies", MissingDeps)}) end, write_enabled_plugins(PluginsFile, NewEnabled), - maybe_warn_mochiweb(NewImplicitlyEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", @@ -263,25 +262,6 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -maybe_warn_mochiweb(Enabled) -> - V = erlang:system_info(otp_release), - case lists:member(mochiweb, Enabled) andalso V < "R13B01" of - true -> - Stars = string:copies("*", 80), - io:format("~n~n~s~n" - " Warning: Mochiweb enabled and Erlang version ~s " - "detected.~n" - " Enabling plugins that depend on Mochiweb is not " - "supported on this Erlang~n" - " version. At least R13B01 is required.~n~n" - " RabbitMQ will not start successfully in this " - "configuration. You *must*~n" - " disable the Mochiweb plugin, or upgrade Erlang.~n" - "~s~n~n~n", [Stars, V, Stars]); - false -> - ok - end. - report_change() -> io:format("Plugin configuration has changed. " "Restart RabbitMQ for changes to take effect.~n"). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0908fb73..f69d8355 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,7 +23,7 @@ -export([scan/3]). --export([add_queue_ttl/0]). +-export([add_queue_ttl/0, avoid_zeroes/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -123,9 +123,9 @@ -define(REL_SEQ_BITS, 14). -define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). -%% seq only is binary 00 followed by 14 bits of rel seq id +%% seq only is binary 01 followed by 14 bits of rel seq id %% (range: 0 - 16383) --define(REL_SEQ_ONLY_PREFIX, 00). +-define(REL_SEQ_ONLY_PREFIX, 01). -define(REL_SEQ_ONLY_PREFIX_BITS, 2). -define(REL_SEQ_ONLY_RECORD_BYTES, 2). @@ -171,6 +171,7 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({add_queue_ttl, local, []}). +-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}). -ifdef(use_specs). @@ -715,7 +716,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of - {ok, Bin} -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} -> + State; + {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} -> {MsgId, MsgProps} = parse_pub_record_body(Bin), IsPersistent = case Prefix of ?PUB_PERSIST_JPREFIX -> true; @@ -1068,6 +1073,21 @@ add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, add_queue_ttl_segment(_) -> stop. +avoid_zeroes() -> + foreach_queue_index({none, fun avoid_zeroes_segment/1}). + +avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, + RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, + Expiry:?EXPIRY_BITS, Rest/binary>>) -> + {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS, + MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest}; +avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> + {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, + Rest}; +avoid_zeroes_segment(_) -> + stop. + %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> @@ -1092,7 +1112,9 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)], ok = gatherer:finish(Gatherer). -transform_file(Path, Fun) -> +transform_file(_Path, none) -> + ok; +transform_file(Path, Fun) when is_function(Fun)-> PathTmp = Path ++ ".upgrade", case rabbit_file:file_size(Path) of 0 -> ok; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9b6039d1..9c902703 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -45,7 +45,8 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, + blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -120,7 +121,7 @@ init(Parent, ChSup3Pid, Collector, StartHeartbeatFun) -> end. system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Deb, State#v1{parent = Parent}). + mainloop(Deb, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -142,8 +143,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, _Source, Conserve) -> - Pid ! {conserve_resources, Conserve}, +conserve_resources(Pid, Source, Conserve) -> + Pid ! {conserve_resources, Source, Conserve}, ok. server_properties(Protocol) -> @@ -178,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> [{<<"publisher_confirms">>, bool, true}, {<<"exchange_exchange_bindings">>, bool, true}, {<<"basic.nack">>, bool, true}, - {<<"consumer_cancel_notify">>, bool, true}]; + {<<"consumer_cancel_notify">>, bool, true}, + {<<"connection.blocked">>, bool, true}]; server_capabilities(_) -> []. @@ -246,9 +248,10 @@ start_connection(Parent, ChSup3Pid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, throttle = #throttle{ - conserve_resources = false, - last_blocked_by = none, - last_blocked_at = never}}, + alarmed_by = [], + last_blocked_by = none, + last_blocked_at = never, + blocked_sent = false}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -321,9 +324,14 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. -handle_other({conserve_resources, Conserve}, - State = #v1{throttle = Throttle}) -> - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, +handle_other({conserve_resources, Source, Conserve}, + State = #v1{throttle = Throttle = + #throttle{alarmed_by = CR}}) -> + CR1 = case Conserve of + true -> lists:usort([Source | CR]); + false -> CR -- [Source] + end, + Throttle1 = Throttle#throttle{alarmed_by = CR1}, control_throttle(State#v1{throttle = Throttle1}); handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), @@ -409,30 +417,61 @@ terminate(_Explanation, State) -> {force, State}. control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> - case {CS, (Throttle#throttle.conserve_resources orelse - credit_flow:blocked())} of + IsThrottled = ((Throttle#throttle.alarmed_by =/= []) orelse + credit_flow:blocked()), + case {CS, IsThrottled} of {running, true} -> State#v1{connection_state = blocking}; {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - State#v1{connection_state = running}; + maybe_send_unblocked(State), + State#v1{connection_state = running, + throttle = Throttle#throttle{ + blocked_sent = false}}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State end. -maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) -> +maybe_block(State = #v1{connection_state = blocking, + throttle = Throttle}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + Sent = maybe_send_blocked(State), State#v1{connection_state = blocked, throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now()})}; + Throttle#throttle{last_blocked_at = erlang:now(), + blocked_sent = Sent})}; maybe_block(State) -> State. -update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) -> - Throttle#throttle{last_blocked_by = resource}; -update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) -> - Throttle#throttle{last_blocked_by = flow}. +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = []}}) -> + false; +maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, + connection = #connection{ + protocol = Protocol, + capabilities = Capabilities}, + sock = Sock}) -> + case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of + {bool, true} -> + RStr = string:join([atom_to_list(A) || A <- CR], " & "), + Reason = list_to_binary(rabbit_misc:format("low on ~s", [RStr])), + ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason}, + Protocol), + true; + _ -> + false + end. + +maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> + ok; +maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> + ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). + +update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> + Throttle#throttle{last_blocked_by = flow}; +update_last_blocked_by(Throttle) -> + Throttle#throttle{last_blocked_by = resource}. %%-------------------------------------------------------------------------- %% error handling / termination @@ -847,7 +886,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), - Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + Throttle1 = Throttle#throttle{alarmed_by = Conserve}, {ok, ChannelSupSupPid} = supervisor2:start_child( ChSup3Pid, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index c0b1f8e4..bf6964d8 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -19,6 +19,9 @@ -include("rabbit_framing.hrl"). -export([start/5, start_link/5, start/6, start_link/6]). + +-export([system_continue/3, system_terminate/4, system_code_change/4]). + -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, @@ -26,7 +29,7 @@ -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/1, mainloop1/1]). +-export([mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -53,6 +56,11 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) -> rabbit_types:ok(pid())). + +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,#wstate{}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -94,12 +102,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -113,28 +123,44 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> pending = []}, #wstate.stats_timer). -mainloop(State) -> +system_continue(Parent, Deb, State) -> + mainloop(Deb, State#wstate{reader = Parent}). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + +mainloop(Deb, State) -> try - mainloop1(State) + mainloop1(Deb, State) catch exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(State = #wstate{pending = []}) -> +mainloop1(Deb, State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [Deb, State]) end; -mainloop1(State) -> +mainloop1(Deb, State) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after 0 -> - ?MODULE:mainloop1(internal_flush(State)) + ?MODULE:mainloop1(Deb, internal_flush(State)) end. +handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State); +handle_message(Deb, Message, State) -> + {Deb, handle_message(Message, State)}. + handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 23bfe7f1..5a6dc887 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -1,14 +1,18 @@ -%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP +%% This file is a copy of supervisor.erl from the R16B Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is supervisor2 %% -%% 2) there is a new strategy called -%% simple_one_for_one_terminate. This is exactly the same as for -%% simple_one_for_one, except that children *are* explicitly -%% terminated as per the shutdown component of the child_spec. +%% 2) a find_child/2 utility function has been added %% -%% 3) child specifications can contain, as the restart type, a tuple +%% 3) Added an 'intrinsic' restart type. Like the transient type, this +%% type means the child should only be restarted if the child exits +%% abnormally. Unlike the transient type, if the child exits +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. +%% +%% 4) child specifications can contain, as the restart type, a tuple %% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 (see point (4) below for intrinsic). The delay, %% in seconds, indicates what should happen if a child, upon being @@ -41,13 +45,6 @@ %% perspective it's a normal exit, whilst from supervisor's %% perspective, it's an abnormal exit. %% -%% 4) Added an 'intrinsic' restart type. Like the transient type, this -%% type means the child should only be restarted if the child exits -%% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. If the -%% child is a supervisor and it exits normally (i.e. with reason of -%% 'shutdown') then the child's parent also exits normally. -%% %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% @@ -55,7 +52,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% Copyright Ericsson AB 1996-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -75,61 +72,34 @@ -behaviour(gen_server). %% External exports --export([start_link/2,start_link/3, +-export([start_link/2, start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, find_child/2, - check_childspecs/1]). + which_children/1, count_children/1, + find_child/2, check_childspecs/1]). %% Internal exports --export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). --export([handle_cast/2]). - --define(DICT, dict). - --record(state, {name, - strategy, - children = [], - dynamics = ?DICT:new(), - intensity, - period, - restarts = [], - module, - args}). - --record(child, {pid = undefined, % pid is undefined when child is not running - name, - mfa, - restart_type, - shutdown, - child_type, - modules = []}). - --define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse - State#state.strategy =:= simple_one_for_one_terminate). --define(is_terminate_simple(State), - State#state.strategy =:= simple_one_for_one_terminate). - --ifdef(use_specs). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([try_again_restart/3]). %%-------------------------------------------------------------------------- -%% Types +-ifdef(use_specs). +-export_type([child_spec/0, startchild_ret/0, strategy/0]). +-endif. %%-------------------------------------------------------------------------- --export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]). - --type child() :: 'undefined' | pid(). +-ifdef(use_specs). +-type child() :: 'undefined' | pid(). -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. --type modules() :: [module()] | 'dynamic'. --type delay() :: non_neg_integer(). --type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' - | {'permanent', delay()} | {'transient', delay()} - | {'intrinsic', delay()}. +-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. +-type modules() :: [module()] | 'dynamic'. +-type delay() :: non_neg_integer(). +-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}. -type shutdown() :: 'brutal_kill' | timeout(). --type worker() :: 'worker' | 'supervisor'. +-type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. --type sup_ref() :: (Name :: atom()) +-type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). @@ -140,40 +110,110 @@ Type :: worker(), Modules :: modules()}. - -type strategy() :: 'one_for_all' | 'one_for_one' - | 'rest_for_one' | 'simple_one_for_one' - | 'simple_one_for_one_terminate'. - --type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()], - name :: child_id(), - mfa :: mfargs(), - restart_type :: restart(), - shutdown :: shutdown(), - child_type :: worker(), - modules :: modules()}. - --type state() :: #state{strategy :: strategy(), - children :: [child_rec()], - dynamics :: ?DICT(), - intensity :: non_neg_integer(), - period :: pos_integer()}. + | 'rest_for_one' | 'simple_one_for_one'. +-endif. %%-------------------------------------------------------------------------- -%% Callback behaviour -%%-------------------------------------------------------------------------- +-ifdef(use_specs). +-record(child, {% pid is undefined when child is not running + pid = undefined :: child() | {restarting,pid()} | [pid()], + name :: child_id(), + mfargs :: mfargs(), + restart_type :: restart(), + shutdown :: shutdown(), + child_type :: worker(), + modules = [] :: modules()}). +-type child_rec() :: #child{}. +-else. +-record(child, { + pid = undefined, + name, + mfargs, + restart_type, + shutdown, + child_type, + modules = []}). +-endif. + +-define(DICT, dict). +-define(SETS, sets). +-define(SET, set). + +-ifdef(use_specs). +-record(state, {name, + strategy :: strategy(), + children = [] :: [child_rec()], + dynamics :: ?DICT() | ?SET(), + intensity :: non_neg_integer(), + period :: pos_integer(), + restarts = [], + module, + args}). +-type state() :: #state{}. +-else. +-record(state, {name, + strategy, + children = [], + dynamics, + intensity, + period, + restarts = [], + module, + args}). +-endif. + +-define(is_simple(State), State#state.strategy =:= simple_one_for_one). +-define(is_permanent(R), ((R =:= permanent) orelse + (is_tuple(R) andalso + tuple_size(R) == 2 andalso + element(1, R) =:= permanent))). +-define(is_explicit_restart(R), + R == {shutdown, restart}). + +-ifdef(use_specs). -callback init(Args :: term()) -> {ok, {{RestartStrategy :: strategy(), - MaxR :: non_neg_integer(), - MaxT :: non_neg_integer()}, + MaxR :: non_neg_integer(), + MaxT :: non_neg_integer()}, [ChildSpec :: child_spec()]}} | ignore. +-endif. +-define(restarting(_Pid_), {restarting,_Pid_}). -%%-------------------------------------------------------------------------- -%% Specs -%%-------------------------------------------------------------------------- +%%% --------------------------------------------------- +%%% This is a general process supervisor built upon gen_server.erl. +%%% Servers/processes should/could also be built using gen_server.erl. +%%% SupName = {local, atom()} | {global, atom()}. +%%% --------------------------------------------------- +-ifdef(use_specs). +-type startlink_err() :: {'already_started', pid()} + | {'shutdown', term()} + | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. +-spec start_link(Module, Args) -> startlink_ret() when + Module :: module(), + Args :: term(). + +-endif. +start_link(Mod, Args) -> + gen_server:start_link(?MODULE, {self, Mod, Args}, []). + +-ifdef(use_specs). +-spec start_link(SupName, Module, Args) -> startlink_ret() when + SupName :: sup_name(), + Module :: module(), + Args :: term(). +-endif. +start_link(SupName, Mod, Args) -> + gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). + +%%% --------------------------------------------------- +%%% Interface functions. +%%% --------------------------------------------------- +-ifdef(use_specs). -type startchild_err() :: 'already_present' | {'already_started', Child :: child()} | term(). -type startchild_ret() :: {'ok', Child :: child()} @@ -183,91 +223,30 @@ -spec start_child(SupRef, ChildSpec) -> startchild_ret() when SupRef :: sup_ref(), ChildSpec :: child_spec() | (List :: [term()]). +-endif. +start_child(Supervisor, ChildSpec) -> + call(Supervisor, {start_child, ChildSpec}). +-ifdef(use_specs). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), Id :: child_id(), Result :: {'ok', Child :: child()} | {'ok', Child :: child(), Info :: term()} | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one' | term(). + Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one' | + term(). +-endif. +restart_child(Supervisor, Name) -> + call(Supervisor, {restart_child, Name}). +-ifdef(use_specs). -spec delete_child(SupRef, Id) -> Result when SupRef :: sup_ref(), Id :: child_id(), Result :: 'ok' | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one'. - --spec terminate_child(SupRef, Id) -> Result when - SupRef :: sup_ref(), - Id :: pid() | child_id(), - Result :: 'ok' | {'error', Error}, - Error :: 'not_found' | 'simple_one_for_one'. - --spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when - SupRef :: sup_ref(), - Id :: child_id() | 'undefined', - Child :: child(), - Type :: worker(), - Modules :: modules(). - --spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], - Result :: 'ok' | {'error', Error :: term()}. - --type init_sup_name() :: sup_name() | 'self'. - --type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}} - | {'bad_start_spec', term()} | {'start_spec', term()} - | {'supervisor_data', term()}. - --spec init({init_sup_name(), module(), [term()]}) -> - {'ok', state()} | 'ignore' | {'stop', stop_rsn()}. - --type call() :: 'which_children'. --spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. - --spec handle_cast('null', state()) -> {'noreply', state()}. - --spec handle_info(term(), state()) -> - {'noreply', state()} | {'stop', 'shutdown', state()}. - --spec terminate(term(), state()) -> 'ok'. - --spec code_change(term(), state(), term()) -> - {'ok', state()} | {'error', term()}. - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{init,1}]; -behaviour_info(_Other) -> - undefined. - + Error :: 'running' | 'restarting' | 'not_found' | 'simple_one_for_one'. -endif. - -%%% --------------------------------------------------- -%%% This is a general process supervisor built upon gen_server.erl. -%%% Servers/processes should/could also be built using gen_server.erl. -%%% SupName = {local, atom()} | {global, atom()}. -%%% --------------------------------------------------- -start_link(Mod, Args) -> - gen_server:start_link(?MODULE, {self, Mod, Args}, []). - -start_link(SupName, Mod, Args) -> - gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). - -%%% --------------------------------------------------- -%%% Interface functions. -%%% --------------------------------------------------- -start_child(Supervisor, ChildSpec) -> - call(Supervisor, {start_child, ChildSpec}). - -restart_child(Supervisor, Name) -> - call(Supervisor, {restart_child, Name}). - delete_child(Supervisor, Name) -> call(Supervisor, {delete_child, Name}). @@ -277,12 +256,44 @@ delete_child(Supervisor, Name) -> %% Note that the child is *always* terminated in some %% way (maybe killed). %%----------------------------------------------------------------- +-ifdef(use_specs). +-spec terminate_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: pid() | child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'not_found' | 'simple_one_for_one'. +-endif. terminate_child(Supervisor, Name) -> call(Supervisor, {terminate_child, Name}). +-ifdef(use_specs). +-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when + SupRef :: sup_ref(), + Id :: child_id() | undefined, + Child :: child() | 'restarting', + Type :: worker(), + Modules :: modules(). +-endif. which_children(Supervisor) -> call(Supervisor, which_children). +-ifdef(use_specs). +-spec count_children(SupRef) -> PropListOfCounts when + SupRef :: sup_ref(), + PropListOfCounts :: [Count], + Count :: {specs, ChildSpecCount :: non_neg_integer()} + | {active, ActiveProcessCount :: non_neg_integer()} + | {supervisors, ChildSupervisorCount :: non_neg_integer()} + |{workers, ChildWorkerCount :: non_neg_integer()}. +-endif. +count_children(Supervisor) -> + call(Supervisor, count_children). + +-ifdef(use_specs). +-spec find_child(Supervisor, Name) -> [pid()] when + Supervisor :: sup_ref(), + Name :: child_id(). +-endif. find_child(Supervisor, Name) -> [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), Name1 =:= Name]. @@ -290,6 +301,11 @@ find_child(Supervisor, Name) -> call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). +-ifdef(use_specs). +-spec check_childspecs(ChildSpecs) -> Result when + ChildSpecs :: [child_spec()], + Result :: 'ok' | {'error', Error :: term()}. +-endif. check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> case check_startspec(ChildSpecs) of {ok, _} -> ok; @@ -297,11 +313,37 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +%%%----------------------------------------------------------------- +%%% Called by timer:apply_after from restart/2 +-ifdef(use_specs). +-spec try_again_restart(SupRef, Child, Reason) -> ok when + SupRef :: sup_ref(), + Child :: child_id() | pid(), + Reason :: term(). +-endif. +try_again_restart(Supervisor, Child, Reason) -> + cast(Supervisor, {try_again_restart, Child, Reason}). + +cast(Supervisor, Req) -> + gen_server:cast(Supervisor, Req). + %%% --------------------------------------------------- -%%% +%%% %%% Initialize the supervisor. -%%% +%%% %%% --------------------------------------------------- +-ifdef(use_specs). +-type init_sup_name() :: sup_name() | 'self'. + +-type stop_rsn() :: {'shutdown', term()} + | {'bad_return', {module(),'init', term()}} + | {'bad_start_spec', term()} + | {'start_spec', term()} + | {'supervisor_data', term()}. + +-spec init({init_sup_name(), module(), [term()]}) -> + {'ok', state()} | 'ignore' | {'stop', stop_rsn()}. +-endif. init({SupName, Mod, Args}) -> process_flag(trap_exit, true), case Mod:init(Args) of @@ -327,9 +369,9 @@ init_children(State, StartSpec) -> case start_children(Children, SupName) of {ok, NChildren} -> {ok, State#state{children = NChildren}}; - {error, NChildren} -> + {error, NChildren, Reason} -> terminate_children(NChildren, SupName), - {stop, shutdown} + {stop, {shutdown, Reason}} end; Error -> {stop, {start_spec, Error}} @@ -347,32 +389,35 @@ init_dynamic(_State, StartSpec) -> %%----------------------------------------------------------------- %% Func: start_children/2 -%% Args: Children = [#child] in start order -%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Args: Children = [child_rec()] in start order +%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod} %% Purpose: Start all children. The new list contains #child's %% with pids. -%% Returns: {ok, NChildren} | {error, NChildren} -%% NChildren = [#child] in termination order (reversed +%% Returns: {ok, NChildren} | {error, NChildren, Reason} +%% NChildren = [child_rec()] in termination order (reversed %% start order) %%----------------------------------------------------------------- start_children(Children, SupName) -> start_children(Children, [], SupName). start_children([Child|Chs], NChildren, SupName) -> case do_start_child(SupName, Child) of + {ok, undefined} when Child#child.restart_type =:= temporary -> + start_children(Chs, NChildren, SupName); {ok, Pid} -> start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); {ok, Pid, _Extra} -> start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); {error, Reason} -> report_error(start_error, Reason, Child, SupName), - {error, lists:reverse(Chs) ++ [Child | NChildren]} + {error, lists:reverse(Chs) ++ [Child | NChildren], + {failed_to_start_child,Child#child.name,Reason}} end; start_children([], NChildren, _SupName) -> {ok, NChildren}. do_start_child(SupName, Child) -> - #child{mfa = {M, F, A}} = Child, - case catch apply(M, F, A) of + #child{mfargs = {M, F, Args}} = Child, + case catch apply(M, F, Args) of {ok, Pid} when is_pid(Pid) -> NChild = Child#child{pid = Pid}, report_progress(NChild, SupName), @@ -401,35 +446,54 @@ do_start_child_i(M, F, A) -> {error, What} end. - %%% --------------------------------------------------- -%%% +%%% %%% Callback functions. -%%% +%%% %%% --------------------------------------------------- +-ifdef(use_specs). +-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine +-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. +-endif. handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> - #child{mfa = {M, F, A}} = hd(State#state.children), + Child = hd(State#state.children), + #child{mfargs = {M, F, A}} = Child, Args = A ++ EArgs, case do_start_child_i(M, F, Args) of - {ok, undefined} -> - {reply, {ok, undefined}, State}; + {ok, undefined} when Child#child.restart_type =:= temporary -> + {reply, {ok, undefined}, State}; {ok, Pid} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid}, NState}; {ok, Pid, Extra} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = save_dynamic_child(Child#child.restart_type, Pid, Args, State), {reply, {ok, Pid, Extra}, NState}; What -> {reply, What, State} end; -%%% The requests terminate_child, delete_child and restart_child are -%%% invalid for simple_one_for_one and simple_one_for_one_terminate -%%% supervisors. +%% terminate_child for simple_one_for_one can only be done with pid +handle_call({terminate_child, Name}, _From, State) when not is_pid(Name), + ?is_simple(State) -> + {reply, {error, simple_one_for_one}, State}; + +handle_call({terminate_child, Name}, _From, State) -> + case get_child(Name, State, ?is_simple(State)) of + {value, Child} -> + case do_terminate(Child, State#state.name) of + #child{restart_type=RT} when RT=:=temporary; ?is_simple(State) -> + {reply, ok, state_del_child(Child, State)}; + NChild -> + {reply, ok, replace_child(NChild, State)} + end; + false -> + {reply, {error, not_found}, State} + end; + +%%% The requests delete_child and restart_child are invalid for +%%% simple_one_for_one supervisors. handle_call({_Req, _Data}, _From, State) when ?is_simple(State) -> - {reply, {error, State#state.strategy}, State}; + {reply, {error, simple_one_for_one}, State}; handle_call({start_child, ChildSpec}, _From, State) -> case check_childspec(ChildSpec) of @@ -453,6 +517,8 @@ handle_call({restart_child, Name}, _From, State) -> Error -> {reply, Error, State} end; + {value, #child{pid=?restarting(_)}} -> + {reply, {error, restarting}, State}; {value, _} -> {reply, {error, running}, State}; _ -> @@ -464,60 +530,146 @@ handle_call({delete_child, Name}, _From, State) -> {value, Child} when Child#child.pid =:= undefined -> NState = remove_child(Child, State), {reply, ok, NState}; + {value, #child{pid=?restarting(_)}} -> + {reply, {error, restarting}, State}; {value, _} -> {reply, {error, running}, State}; _ -> {reply, {error, not_found}, State} end; -handle_call({terminate_child, Name}, _From, State) -> - case get_child(Name, State) of - {value, Child} -> - NChild = do_terminate(Child, State#state.name), - {reply, ok, replace_child(NChild, State)}; - _ -> - {reply, {error, not_found}, State} - end; +handle_call(which_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> + Reply = lists:map(fun(Pid) -> {undefined, Pid, CT, Mods} end, + ?SETS:to_list(dynamics_db(temporary, State#state.dynamics))), + {reply, Reply, State}; -handle_call(which_children, _From, State) when ?is_simple(State) -> - [#child{child_type = CT, modules = Mods}] = State#state.children, - Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, - ?DICT:to_list(State#state.dynamics)), +handle_call(which_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT, + modules = Mods}]} = + State) when ?is_simple(State) -> + Reply = lists:map(fun({?restarting(_),_}) -> {undefined,restarting,CT,Mods}; + ({Pid, _}) -> {undefined, Pid, CT, Mods} end, + ?DICT:to_list(dynamics_db(RType, State#state.dynamics))), {reply, Reply, State}; handle_call(which_children, _From, State) -> Resp = - lists:map(fun (#child{pid = Pid, name = Name, + lists:map(fun(#child{pid = ?restarting(_), name = Name, + child_type = ChildType, modules = Mods}) -> + {Name, restarting, ChildType, Mods}; + (#child{pid = Pid, name = Name, child_type = ChildType, modules = Mods}) -> - {Name, Pid, ChildType, Mods} + {Name, Pid, ChildType, Mods} end, State#state.children), - {reply, Resp, State}. + {reply, Resp, State}; -%%% Hopefully cause a function-clause as there is no API function -%%% that utilizes cast. -handle_cast(null, State) -> - error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", - []), - {noreply, State}. +handle_call(count_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT}]} = State) + when ?is_simple(State) -> + {Active, Count} = + ?SETS:fold(fun(Pid, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true ->{Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(temporary, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; -handle_info({delayed_restart, {RestartType, Reason, Child}}, State) +handle_call(count_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT}]} = State) when ?is_simple(State) -> - {ok, NState} = do_restart(RestartType, Reason, Child, State), - {noreply, NState}; -handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> - case get_child(Child#child.name, State) of - {value, Child1} -> - {ok, NState} = do_restart(RestartType, Reason, Child1, State), - {noreply, NState}; - _ -> + {Active, Count} = + ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> + {Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(RType, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; + +handle_call(count_children, _From, State) -> + %% Specs and children are together on the children list... + {Specs, Active, Supers, Workers} = + lists:foldl(fun(Child, Counts) -> + count_child(Child, Counts) + end, {0,0,0,0}, State#state.children), + + %% Reformat counts to a property list. + Reply = [{specs, Specs}, {active, Active}, + {supervisors, Supers}, {workers, Workers}], + {reply, Reply, State}. + + +count_child(#child{pid = Pid, child_type = worker}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers, Workers+1}; + false -> {Specs+1, Active, Supers, Workers+1} + end; +count_child(#child{pid = Pid, child_type = supervisor}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers+1, Workers}; + false -> {Specs+1, Active, Supers+1, Workers} + end. + + +%%% If a restart attempt failed, this message is sent via +%%% timer:apply_after(0,...) in order to give gen_server the chance to +%%% check it's inbox before trying again. +-ifdef(use_specs). +-spec handle_cast({try_again_restart, child_id() | pid(), term()}, state()) -> + {'noreply', state()} | {stop, shutdown, state()}. +-endif. +handle_cast({try_again_restart,Pid,Reason}, #state{children=[Child]}=State) + when ?is_simple(State) -> + RT = Child#child.restart_type, + RPid = restarting(Pid), + case dynamic_child_args(RPid, dynamics_db(RT, State#state.dynamics)) of + {ok, Args} -> + {M, F, _} = Child#child.mfargs, + NChild = Child#child{pid = RPid, mfargs = {M, F, Args}}, + try_restart(Child#child.restart_type, Reason, NChild, State); + error -> {noreply, State} end; +handle_cast({try_again_restart,Name,Reason}, State) -> + %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist + case lists:keysearch(Name,#child.name,State#state.children) of + {value, Child = #child{pid=?restarting(_), restart_type=RestartType}} -> + try_restart(RestartType, Reason, Child, State); + _ -> + {noreply,State} + end. + %% %% Take care of terminated children. %% +-ifdef(use_specs). +-spec handle_info(term(), state()) -> + {'noreply', state()} | {'stop', 'shutdown', state()}. +-endif. handle_info({'EXIT', Pid, Reason}, State) -> case restart_child(Pid, Reason, State) of {ok, State1} -> @@ -526,20 +678,34 @@ handle_info({'EXIT', Pid, Reason}, State) -> {stop, shutdown, State1} end; +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) + when ?is_simple(State) -> + try_restart(RestartType, Reason, Child, State); +handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> + case get_child(Child#child.name, State) of + {value, Child1} -> + try_restart(RestartType, Reason, Child1, State); + _What -> + {noreply, State} + end; + handle_info(Msg, State) -> - error_logger:error_msg("Supervisor received unexpected message: ~p~n", + error_logger:error_msg("Supervisor received unexpected message: ~p~n", [Msg]), {noreply, State}. + %% %% Terminate this server. %% -terminate(_Reason, State) when ?is_terminate_simple(State) -> - terminate_simple_children( - hd(State#state.children), State#state.dynamics, State#state.name), - ok; +-ifdef(use_specs). +-spec terminate(term(), state()) -> 'ok'. +-endif. +terminate(_Reason, #state{children=[Child]} = State) when ?is_simple(State) -> + terminate_dynamic_children(Child, dynamics_db(Child#child.restart_type, + State#state.dynamics), + State#state.name); terminate(_Reason, State) -> - terminate_children(State#state.children, State#state.name), - ok. + terminate_children(State#state.children, State#state.name). %% %% Change code for the supervisor. @@ -550,6 +716,10 @@ terminate(_Reason, State) -> %% NOTE: This requires that the init function of the call-back module %% does not have any side effects. %% +-ifdef(use_specs). +-spec code_change(term(), state(), term()) -> + {'ok', state()} | {'error', term()}. +-endif. code_change(_, State, _) -> case (State#state.module):init(State#state.args) of {ok, {SupFlags, StartSpec}} -> @@ -577,14 +747,13 @@ check_flags({Strategy, MaxIntensity, Period}) -> check_flags(What) -> {bad_flags, What}. -update_childspec(State, StartSpec) when ?is_simple(State) -> +update_childspec(State, StartSpec) when ?is_simple(State) -> case check_startspec(StartSpec) of {ok, [Child]} -> {ok, State#state{children = [Child]}}; Error -> {error, Error} end; - update_childspec(State, StartSpec) -> case check_startspec(StartSpec) of {ok, Children} -> @@ -603,11 +772,11 @@ update_childspec1([Child|OldC], Children, KeepOld) -> update_childspec1(OldC, Children, [Child|KeepOld]) end; update_childspec1([], Children, KeepOld) -> - % Return them in (keeped) reverse start order. + %% Return them in (kept) reverse start order. lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> - case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> + case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> Ch#child{pid = OldCh#child.pid}; (Ch) -> Ch @@ -618,7 +787,7 @@ update_chsp(OldCh, Children) -> NewC -> {ok, NewC} end. - + %%% --------------------------------------------------- %%% Start a new child. %%% --------------------------------------------------- @@ -627,20 +796,16 @@ handle_start_child(Child, State) -> case get_child(Child#child.name, State) of false -> case do_start_child(State#state.name, Child) of + {ok, undefined} when Child#child.restart_type =:= temporary -> + {{ok, undefined}, State}; {ok, Pid} -> - Children = State#state.children, - {{ok, Pid}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid}, save_child(Child#child{pid = Pid}, State)}; {ok, Pid, Extra} -> - Children = State#state.children, - {{ok, Pid, Extra}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid, Extra}, save_child(Child#child{pid = Pid}, State)}; {error, What} -> {{error, {What, Child}}, State} end; - {value, OldChild} when OldChild#child.pid =/= undefined -> + {value, OldChild} when is_pid(OldChild#child.pid) -> {{error, {already_started, OldChild#child.pid}}, State}; {value, _OldChild} -> {{error, already_present}, State} @@ -648,105 +813,145 @@ handle_start_child(Child, State) -> %%% --------------------------------------------------- %%% Restart. A process has terminated. -%%% Returns: {ok, #state} | {shutdown, #state} +%%% Returns: {ok, state()} | {shutdown, state()} %%% --------------------------------------------------- -restart_child(Pid, Reason, State) when ?is_simple(State) -> - case ?DICT:find(Pid, State#state.dynamics) of +restart_child(Pid, Reason, #state{children = [Child]} = State) when ?is_simple(State) -> + RestartType = Child#child.restart_type, + case dynamic_child_args(Pid, dynamics_db(RestartType, State#state.dynamics)) of {ok, Args} -> - [Child] = State#state.children, - RestartType = Child#child.restart_type, - {M, F, _} = Child#child.mfa, - NChild = Child#child{pid = Pid, mfa = {M, F, Args}}, + {M, F, _} = Child#child.mfargs, + NChild = Child#child{pid = Pid, mfargs = {M, F, Args}}, do_restart(RestartType, Reason, NChild, State); error -> - {ok, State} + {ok, State} end; + restart_child(Pid, Reason, State) -> Children = State#state.children, + %% we still support >= R12-B3 in which lists:keyfind/3 doesn't exist case lists:keysearch(Pid, #child.pid, Children) of - {value, Child} -> - RestartType = Child#child.restart_type, + {value, #child{restart_type = RestartType} = Child} -> do_restart(RestartType, Reason, Child, State); - _ -> + false -> {ok, State} end. -do_restart({permanent = RestartType, Delay}, Reason, Child, State) -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(permanent, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), - restart(Child, State); -do_restart(Type, normal, Child, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State) - when RestartType =:= transient orelse RestartType =:= intrinsic -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(Type, {shutdown, _}, Child, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) -> - del_child_and_maybe_shutdown(Type, Child, State); -do_restart({RestartType, Delay}, Reason, Child, State) - when RestartType =:= transient orelse RestartType =:= intrinsic -> - do_restart_delay({RestartType, Delay}, Reason, Child, State); -do_restart(Type, Reason, Child, State) when Type =:= transient orelse - Type =:= intrinsic -> - report_error(child_terminated, Reason, Child, State#state.name), +try_restart(RestartType, Reason, Child, State) -> + case handle_restart(RestartType, Reason, Child, State) of + {ok, NState} -> {noreply, NState}; + {shutdown, State2} -> {stop, shutdown, State2} + end. + +do_restart(RestartType, Reason, Child, State) -> + maybe_report_error(RestartType, Reason, Child, State), + handle_restart(RestartType, Reason, Child, State). + +maybe_report_error(permanent, Reason, Child, State) -> + report_child_termination(Reason, Child, State); +maybe_report_error({permanent, _}, Reason, Child, State) -> + report_child_termination(Reason, Child, State); +maybe_report_error(_Type, Reason, Child, State) -> + case is_abnormal_termination(Reason) of + true -> report_child_termination(Reason, Child, State); + false -> ok + end. + +report_child_termination(Reason, Child, State) -> + report_error(child_terminated, Reason, Child, State#state.name). + +handle_restart(permanent, _Reason, Child, State) -> restart(Child, State); -do_restart(temporary, Reason, Child, State) -> - report_error(child_terminated, Reason, Child, State#state.name), - NState = state_del_child(Child, State), - {ok, NState}. +handle_restart(transient, Reason, Child, State) -> + restart_if_explicit_or_abnormal(fun restart/2, + fun delete_child_and_continue/2, + Reason, Child, State); +handle_restart(intrinsic, Reason, Child, State) -> + restart_if_explicit_or_abnormal(fun restart/2, + fun delete_child_and_stop/2, + Reason, Child, State); +handle_restart(temporary, _Reason, Child, State) -> + delete_child_and_continue(Child, State); +handle_restart({permanent, _Delay}=Restart, Reason, Child, State) -> + do_restart_delay(Restart, Reason, Child, State); +handle_restart({transient, _Delay}=Restart, Reason, Child, State) -> + restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason), + fun delete_child_and_continue/2, + Reason, Child, State); +handle_restart({intrinsic, _Delay}=Restart, Reason, Child, State) -> + restart_if_explicit_or_abnormal(defer_to_restart_delay(Restart, Reason), + fun delete_child_and_stop/2, + Reason, Child, State). + +restart_if_explicit_or_abnormal(RestartHow, Otherwise, Reason, Child, State) -> + case ?is_explicit_restart(Reason) orelse is_abnormal_termination(Reason) of + true -> RestartHow(Child, State); + false -> Otherwise(Child, State) + end. + +defer_to_restart_delay(Restart, Reason) -> + fun(Child, State) -> do_restart_delay(Restart, Reason, Child, State) end. + +delete_child_and_continue(Child, State) -> + {ok, state_del_child(Child, State)}. + +delete_child_and_stop(Child, State) -> + {shutdown, state_del_child(Child, State)}. + +is_abnormal_termination(normal) -> false; +is_abnormal_termination(shutdown) -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_Other) -> true. do_restart_delay({RestartType, Delay}, Reason, Child, State) -> - case restart1(Child, State) of + case add_restart(State) of {ok, NState} -> - {ok, NState}; - {terminate, NState} -> + maybe_restart(NState#state.strategy, Child, NState); + {terminate, _NState} -> + %% we've reached the max restart intensity, but the + %% add_restart will have added to the restarts + %% field. Given we don't want to die here, we need to go + %% back to the old restarts field otherwise we'll never + %% attempt to restart later, which is why we ignore + %% NState for this clause. _TRef = erlang:send_after(trunc(Delay*1000), self(), {delayed_restart, {{RestartType, Delay}, Reason, Child}}), - {ok, state_del_child(Child, NState)} + {ok, state_del_child(Child, State)} end. -del_child_and_maybe_shutdown(intrinsic, Child, State) -> - {shutdown, state_del_child(Child, State)}; -del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) -> - {shutdown, state_del_child(Child, State)}; -del_child_and_maybe_shutdown(_, Child, State) -> - {ok, state_del_child(Child, State)}. - restart(Child, State) -> case add_restart(State) of {ok, NState} -> - restart(NState#state.strategy, Child, NState, fun restart/2); + maybe_restart(NState#state.strategy, Child, NState); {terminate, NState} -> report_error(shutdown, reached_max_restart_intensity, Child, State#state.name), - {shutdown, state_del_child(Child, NState)} + {shutdown, remove_child(Child, NState)} end. -restart1(Child, State) -> - case add_restart(State) of - {ok, NState} -> - restart(NState#state.strategy, Child, NState, fun restart1/2); - {terminate, _NState} -> - %% we've reached the max restart intensity, but the - %% add_restart will have added to the restarts - %% field. Given we don't want to die here, we need to go - %% back to the old restarts field otherwise we'll never - %% attempt to restart later. - {terminate, State} +maybe_restart(Strategy, Child, State) -> + case restart(Strategy, Child, State) of + {try_again, Reason, NState2} -> + %% Leaving control back to gen_server before + %% trying again. This way other incoming requsts + %% for the supervisor can be handled - e.g. a + %% shutdown request for the supervisor or the + %% child. + Id = if ?is_simple(State) -> Child#child.pid; + true -> Child#child.name + end, + timer:apply_after(0,?MODULE,try_again_restart,[self(),Id,Reason]), + {ok,NState2}; + Other -> + Other end. -restart(Strategy, Child, State, Restart) - when Strategy =:= simple_one_for_one orelse - Strategy =:= simple_one_for_one_terminate -> - #child{mfa = {M, F, A}} = Child, - Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), +restart(simple_one_for_one, Child, State) -> + #child{pid = OldPid, mfargs = {M, F, A}} = Child, + Dynamics = ?DICT:erase(OldPid, dynamics_db(Child#child.restart_type, + State#state.dynamics)), case do_start_child_i(M, F, A) of - {ok, undefined} -> - {ok, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; @@ -754,10 +959,13 @@ restart(Strategy, Child, State, Restart) NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; {error, Error} -> + NState = State#state{dynamics = ?DICT:store(restarting(OldPid), A, + Dynamics)}, report_error(start_error, Error, Child, State#state.name), - Restart(Child, State) + {try_again, Error, NState} end; -restart(one_for_one, Child, State, Restart) -> +restart(one_for_one, Child, State) -> + OldPid = Child#child.pid, case do_start_child(State#state.name, Child) of {ok, Pid} -> NState = replace_child(Child#child{pid = Pid}, State), @@ -766,139 +974,83 @@ restart(one_for_one, Child, State, Restart) -> NState = replace_child(Child#child{pid = Pid}, State), {ok, NState}; {error, Reason} -> + NState = replace_child(Child#child{pid = restarting(OldPid)}, State), report_error(start_error, Reason, Child, State#state.name), - Restart(Child, State) + {try_again, Reason, NState} end; -restart(rest_for_one, Child, State, Restart) -> +restart(rest_for_one, Child, State) -> {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children), ChAfter2 = terminate_children(ChAfter, State#state.name), case start_children(ChAfter2, State#state.name) of {ok, ChAfter3} -> {ok, State#state{children = ChAfter3 ++ ChBefore}}; - {error, ChAfter3} -> - Restart(Child, State#state{children = ChAfter3 ++ ChBefore}) + {error, ChAfter3, Reason} -> + NChild = Child#child{pid=restarting(Child#child.pid)}, + NState = State#state{children = ChAfter3 ++ ChBefore}, + {try_again, Reason, replace_child(NChild,NState)} end; -restart(one_for_all, Child, State, Restart) -> +restart(one_for_all, Child, State) -> Children1 = del_child(Child#child.pid, State#state.children), Children2 = terminate_children(Children1, State#state.name), case start_children(Children2, State#state.name) of {ok, NChs} -> {ok, State#state{children = NChs}}; - {error, NChs} -> - Restart(Child, State#state{children = NChs}) + {error, NChs, Reason} -> + NChild = Child#child{pid=restarting(Child#child.pid)}, + NState = State#state{children = NChs}, + {try_again, Reason, replace_child(NChild,NState)} end. +restarting(Pid) when is_pid(Pid) -> ?restarting(Pid); +restarting(RPid) -> RPid. + %%----------------------------------------------------------------- %% Func: terminate_children/2 -%% Args: Children = [#child] in termination order +%% Args: Children = [child_rec()] in termination order %% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} -%% Returns: NChildren = [#child] in +%% Returns: NChildren = [child_rec()] in %% startup order (reversed termination order) %%----------------------------------------------------------------- terminate_children(Children, SupName) -> terminate_children(Children, SupName, []). +%% Temporary children should not be restarted and thus should +%% be skipped when building the list of terminated children, although +%% we do want them to be shut down as many functions from this module +%% use this function to just clear everything. +terminate_children([Child = #child{restart_type=temporary} | Children], SupName, Res) -> + do_terminate(Child, SupName), + terminate_children(Children, SupName, Res); terminate_children([Child | Children], SupName, Res) -> NChild = do_terminate(Child, SupName), terminate_children(Children, SupName, [NChild | Res]); terminate_children([], _SupName, Res) -> Res. -terminate_simple_children(Child, Dynamics, SupName) -> - Pids = dict:fold(fun (Pid, _Args, Pids) -> - erlang:monitor(process, Pid), - unlink(Pid), - exit(Pid, child_exit_reason(Child)), - [Pid | Pids] - end, [], Dynamics), - TimeoutMsg = {timeout, make_ref()}, - TRef = timeout_start(Child, TimeoutMsg), - {Replies, Timedout} = - lists:foldl( - fun (_Pid, {Replies, Timedout}) -> - {Pid1, Reason1, Timedout1} = - receive - TimeoutMsg -> - Remaining = Pids -- [P || {P, _} <- Replies], - [exit(P, kill) || P <- Remaining], - receive - {'DOWN', _MRef, process, Pid, Reason} -> - {Pid, Reason, true} - end; - {'DOWN', _MRef, process, Pid, Reason} -> - {Pid, Reason, Timedout} - end, - {[{Pid1, child_res(Child, Reason1, Timedout1)} | Replies], - Timedout1} - end, {[], false}, Pids), - timeout_stop(Child, TRef, TimeoutMsg, Timedout), - ReportError = shutdown_error_reporter(SupName), - Report = fun(_, ok) -> ok; - (Pid, {error, R}) -> ReportError(R, Child#child{pid = Pid}) - end, - [receive - {'EXIT', Pid, Reason} -> - Report(Pid, child_res(Child, Reason, Timedout)) - after - 0 -> Report(Pid, Reply) - end || {Pid, Reply} <- Replies], - ok. - -child_exit_reason(#child{shutdown = brutal_kill}) -> kill; -child_exit_reason(#child{}) -> shutdown. - -child_res(#child{shutdown=brutal_kill}, killed, false) -> ok; -child_res(#child{}, shutdown, false) -> ok; -child_res(#child{restart_type=permanent}, normal, false) -> {error, normal}; -child_res(#child{restart_type={permanent,_}},normal, false) -> {error, normal}; -child_res(#child{}, normal, false) -> ok; -child_res(#child{}, R, _) -> {error, R}. - -timeout_start(#child{shutdown = Time}, Msg) when is_integer(Time) -> - erlang:send_after(Time, self(), Msg); -timeout_start(#child{}, _Msg) -> - ok. - -timeout_stop(#child{shutdown = Time}, TRef, Msg, false) when is_integer(Time) -> - erlang:cancel_timer(TRef), - receive - Msg -> ok - after - 0 -> ok - end; -timeout_stop(#child{}, _TRef, _Msg, _Timedout) -> - ok. - -do_terminate(Child, SupName) when Child#child.pid =/= undefined -> - ReportError = shutdown_error_reporter(SupName), +do_terminate(Child, SupName) when is_pid(Child#child.pid) -> case shutdown(Child#child.pid, Child#child.shutdown) of ok -> ok; - {error, normal} -> - case Child#child.restart_type of - permanent -> ReportError(normal, Child); - {permanent, _Delay} -> ReportError(normal, Child); - _ -> ok - end; + {error, normal} when not ?is_permanent(Child#child.restart_type) -> + ok; {error, OtherReason} -> - ReportError(OtherReason, Child) + report_error(shutdown_error, OtherReason, Child, SupName) end, Child#child{pid = undefined}; do_terminate(Child, _SupName) -> - Child. + Child#child{pid = undefined}. %%----------------------------------------------------------------- -%% Shutdowns a child. We must check the EXIT value +%% Shutdowns a child. We must check the EXIT value %% of the child, because it might have died with another reason than -%% the wanted. In that case we want to report the error. We put a -%% monitor on the child an check for the 'DOWN' message instead of -%% checking for the 'EXIT' message, because if we check the 'EXIT' -%% message a "naughty" child, who does unlink(Sup), could hang the -%% supervisor. +%% the wanted. In that case we want to report the error. We put a +%% monitor on the child an check for the 'DOWN' message instead of +%% checking for the 'EXIT' message, because if we check the 'EXIT' +%% message a "naughty" child, who does unlink(Sup), could hang the +%% supervisor. %% Returns: ok | {error, OtherReason} (this should be reported) %%----------------------------------------------------------------- shutdown(Pid, brutal_kill) -> - case monitor_child(Pid) of ok -> exit(Pid, kill), @@ -908,16 +1060,14 @@ shutdown(Pid, brutal_kill) -> {'DOWN', _MRef, process, Pid, OtherReason} -> {error, OtherReason} end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end; - shutdown(Pid, Time) -> - case monitor_child(Pid) of ok -> exit(Pid, shutdown), %% Try to shutdown gracefully - receive + receive {'DOWN', _MRef, process, Pid, shutdown} -> ok; {'DOWN', _MRef, process, Pid, OtherReason} -> @@ -929,14 +1079,14 @@ shutdown(Pid, Time) -> {error, OtherReason} end end; - {error, Reason} -> + {error, Reason} -> {error, Reason} end. %% Help function to shutdown/2 switches from link to monitor approach monitor_child(Pid) -> - - %% Do the monitor operation first so that if the child dies + + %% Do the monitor operation first so that if the child dies %% before the monitoring is done causing a 'DOWN'-message with %% reason noproc, we will get the real reason in the 'EXIT'-message %% unless a naughty child has already done unlink... @@ -946,34 +1096,177 @@ monitor_child(Pid) -> receive %% If the child dies before the unlik we must empty %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. - {'EXIT', Pid, Reason} -> - receive + {'EXIT', Pid, Reason} -> + receive {'DOWN', _, process, Pid, _} -> {error, Reason} end - after 0 -> + after 0 -> %% If a naughty child did unlink and the child dies before - %% monitor the result will be that shutdown/2 receives a + %% monitor the result will be that shutdown/2 receives a %% 'DOWN'-message with reason noproc. %% If the child should die after the unlink there %% will be a 'DOWN'-message with a correct reason - %% that will be handled in shutdown/2. - ok + %% that will be handled in shutdown/2. + ok end. %%----------------------------------------------------------------- +%% Func: terminate_dynamic_children/3 +%% Args: Child = child_rec() +%% Dynamics = ?DICT() | ?SET() +%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Returns: ok +%% +%% +%% Shutdown all dynamic children. This happens when the supervisor is +%% stopped. Because the supervisor can have millions of dynamic children, we +%% can have an significative overhead here. +%%----------------------------------------------------------------- +terminate_dynamic_children(Child, Dynamics, SupName) -> + {Pids, EStack0} = monitor_dynamic_children(Child, Dynamics), + Sz = ?SETS:size(Pids), + EStack = case Child#child.shutdown of + brutal_kill -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + infinity -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz, undefined, EStack0); + Time -> + ?SETS:fold(fun(P, _) -> exit(P, shutdown) end, ok, Pids), + TRef = erlang:start_timer(Time, self(), kill), + wait_dynamic_children(Child, Pids, Sz, TRef, EStack0) + end, + %% Unroll stacked errors and report them + ?DICT:fold(fun(Reason, Ls, _) -> + report_error(shutdown_error, Reason, + Child#child{pid=Ls}, SupName) + end, ok, EStack). + + +monitor_dynamic_children(#child{restart_type=temporary}, Dynamics) -> + ?SETS:fold(fun(P, {Pids, EStack}) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end + end, {?SETS:new(), ?DICT:new()}, Dynamics); +monitor_dynamic_children(#child{restart_type=RType}, Dynamics) -> + ?DICT:fold(fun(P, _, {Pids, EStack}) when is_pid(P) -> + case monitor_child(P) of + ok -> + {?SETS:add_element(P, Pids), EStack}; + {error, normal} when not ?is_permanent(RType) -> + {Pids, EStack}; + {error, Reason} -> + {Pids, ?DICT:append(Reason, P, EStack)} + end; + (?restarting(_), _, {Pids, EStack}) -> + {Pids, EStack} + end, {?SETS:new(), ?DICT:new()}, Dynamics). + +wait_dynamic_children(_Child, _Pids, 0, undefined, EStack) -> + EStack; +wait_dynamic_children(_Child, _Pids, 0, TRef, EStack) -> + %% If the timer has expired before its cancellation, we must empty the + %% mail-box of the 'timeout'-message. + erlang:cancel_timer(TRef), + receive + {timeout, TRef, kill} -> + EStack + after 0 -> + EStack + end; +wait_dynamic_children(#child{shutdown=brutal_kill} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, killed} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)) + end; +wait_dynamic_children(#child{restart_type=RType} = Child, Pids, Sz, + TRef, EStack) -> + receive + {'DOWN', _MRef, process, Pid, shutdown} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, normal} when not ?is_permanent(RType) -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, EStack); + + {'DOWN', _MRef, process, Pid, Reason} -> + wait_dynamic_children(Child, ?SETS:del_element(Pid, Pids), Sz-1, + TRef, ?DICT:append(Reason, Pid, EStack)); + + {timeout, TRef, kill} -> + ?SETS:fold(fun(P, _) -> exit(P, kill) end, ok, Pids), + wait_dynamic_children(Child, Pids, Sz-1, undefined, EStack) + end. + +%%----------------------------------------------------------------- %% Child/State manipulating functions. %%----------------------------------------------------------------- -state_del_child(#child{pid = Pid}, State) when ?is_simple(State) -> - NDynamics = ?DICT:erase(Pid, State#state.dynamics), + +%% Note we do not want to save the parameter list for temporary processes as +%% they will not be restarted, and hence we do not need this information. +%% Especially for dynamic children to simple_one_for_one supervisors +%% it could become very costly as it is not uncommon to spawn +%% very many such processes. +save_child(#child{restart_type = temporary, + mfargs = {M, F, _}} = Child, #state{children = Children} = State) -> + State#state{children = [Child#child{mfargs = {M, F, undefined}} |Children]}; +save_child(Child, #state{children = Children} = State) -> + State#state{children = [Child |Children]}. + +save_dynamic_child(temporary, Pid, _, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?SETS:add_element(Pid, dynamics_db(temporary, Dynamics))}; +save_dynamic_child(RestartType, Pid, Args, #state{dynamics = Dynamics} = State) -> + State#state{dynamics = ?DICT:store(Pid, Args, dynamics_db(RestartType, Dynamics))}. + +dynamics_db(temporary, undefined) -> + ?SETS:new(); +dynamics_db(_, undefined) -> + ?DICT:new(); +dynamics_db(_,Dynamics) -> + Dynamics. + +dynamic_child_args(Pid, Dynamics) -> + case ?SETS:is_set(Dynamics) of + true -> + {ok, undefined}; + false -> + ?DICT:find(Pid, Dynamics) + end. + +state_del_child(#child{pid = Pid, restart_type = temporary}, State) when ?is_simple(State) -> + NDynamics = ?SETS:del_element(Pid, dynamics_db(temporary, State#state.dynamics)), + State#state{dynamics = NDynamics}; +state_del_child(#child{pid = Pid, restart_type = RType}, State) when ?is_simple(State) -> + NDynamics = ?DICT:erase(Pid, dynamics_db(RType, State#state.dynamics)), State#state{dynamics = NDynamics}; state_del_child(Child, State) -> NChildren = del_child(Child#child.name, State#state.children), State#state{children = NChildren}. +del_child(Name, [Ch=#child{pid = ?restarting(_)}|_]=Chs) when Ch#child.name =:= Name -> + Chs; +del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name, Ch#child.restart_type =:= temporary -> + Chs; del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name -> [Ch#child{pid = undefined} | Chs]; +del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid, Ch#child.restart_type =:= temporary -> + Chs; del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid -> [Ch#child{pid = undefined} | Chs]; del_child(Name, [Ch|Chs]) -> @@ -996,7 +1289,38 @@ split_child(_, [], After) -> {lists:reverse(After), []}. get_child(Name, State) -> + get_child(Name, State, false). +get_child(Pid, State, AllowPid) when AllowPid, is_pid(Pid) -> + get_dynamic_child(Pid, State); +get_child(Name, State, _) -> lists:keysearch(Name, #child.name, State#state.children). + +get_dynamic_child(Pid, #state{children=[Child], dynamics=Dynamics}) -> + DynamicsDb = dynamics_db(Child#child.restart_type, Dynamics), + case is_dynamic_pid(Pid, DynamicsDb) of + true -> + {value, Child#child{pid=Pid}}; + false -> + RPid = restarting(Pid), + case is_dynamic_pid(RPid, DynamicsDb) of + true -> + {value, Child#child{pid=RPid}}; + false -> + case erlang:is_process_alive(Pid) of + true -> false; + false -> {value, Child} + end + end + end. + +is_dynamic_pid(Pid, Dynamics) -> + case ?SETS:is_set(Dynamics) of + true -> + ?SETS:is_element(Pid, Dynamics); + false -> + ?DICT:is_key(Pid, Dynamics) + end. + replace_child(Child, State) -> Chs = do_replace_child(Child, State#state.children), State#state{children = Chs}. @@ -1016,12 +1340,12 @@ remove_child(Child, State) -> %% Type = {Strategy, MaxIntensity, Period} %% Strategy = one_for_one | one_for_all | simple_one_for_one | %% rest_for_one -%% MaxIntensity = integer() -%% Period = integer() +%% MaxIntensity = integer() >= 0 +%% Period = integer() > 0 %% Mod :== atom() -%% Arsg :== term() +%% Args :== term() %% Purpose: Check that Type is of correct type (!) -%% Returns: {ok, #state} | Error +%% Returns: {ok, state()} | Error %%----------------------------------------------------------------- init_state(SupName, Type, Mod, Args) -> case catch init_state1(SupName, Type, Mod, Args) of @@ -1036,46 +1360,45 @@ init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) -> validIntensity(MaxIntensity), validPeriod(Period), {ok, #state{name = supname(SupName,Mod), - strategy = Strategy, - intensity = MaxIntensity, - period = Period, - module = Mod, - args = Args}}; + strategy = Strategy, + intensity = MaxIntensity, + period = Period, + module = Mod, + args = Args}}; init_state1(_SupName, Type, _, _) -> {invalid_type, Type}. -validStrategy(simple_one_for_one_terminate) -> true; -validStrategy(simple_one_for_one) -> true; -validStrategy(one_for_one) -> true; -validStrategy(one_for_all) -> true; -validStrategy(rest_for_one) -> true; -validStrategy(What) -> throw({invalid_strategy, What}). +validStrategy(simple_one_for_one) -> true; +validStrategy(one_for_one) -> true; +validStrategy(one_for_all) -> true; +validStrategy(rest_for_one) -> true; +validStrategy(What) -> throw({invalid_strategy, What}). validIntensity(Max) when is_integer(Max), Max >= 0 -> true; -validIntensity(What) -> throw({invalid_intensity, What}). +validIntensity(What) -> throw({invalid_intensity, What}). validPeriod(Period) when is_integer(Period), Period > 0 -> true; validPeriod(What) -> throw({invalid_period, What}). -supname(self,Mod) -> {self(),Mod}; -supname(N,_) -> N. +supname(self, Mod) -> {self(), Mod}; +supname(N, _) -> N. %%% ------------------------------------------------------ %%% Check that the children start specification is valid. %%% Shall be a six (6) tuple %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom -%%% Func is {Mod, Fun, Args} == {atom, atom, list} +%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()} %%% RestartType is permanent | temporary | transient | %%% intrinsic | {permanent, Delay} | %%% {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 -%%% Shutdown = integer() | infinity | brutal_kill +%%% Shutdown = integer() > 0 | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic -%%% Returns: {ok, [#child]} | Error +%%% Returns: {ok, [child_rec()]} | Error %%% ------------------------------------------------------ check_startspec(Children) -> check_startspec(Children, []). @@ -1103,7 +1426,7 @@ check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) -> validChildType(ChildType), validShutdown(Shutdown, ChildType), validMods(Mods), - {ok, #child{name = Name, mfa = Func, restart_type = RestartType, + {ok, #child{name = Name, mfargs = Func, restart_type = RestartType, shutdown = Shutdown, child_type = ChildType, modules = Mods}}. validChildType(supervisor) -> true; @@ -1112,8 +1435,8 @@ validChildType(What) -> throw({invalid_child_type, What}). validName(_Name) -> true. -validFunc({M, F, A}) when is_atom(M), - is_atom(F), +validFunc({M, F, A}) when is_atom(M), + is_atom(F), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). @@ -1131,15 +1454,15 @@ validDelay(Delay) when is_number(Delay), Delay >= 0 -> true; validDelay(What) -> throw({invalid_delay, What}). -validShutdown(Shutdown, _) +validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; -validShutdown(infinity, supervisor) -> true; +validShutdown(infinity, _) -> true; validShutdown(brutal_kill, _) -> true; validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). validMods(dynamic) -> true; validMods(Mods) when is_list(Mods) -> - lists:foreach(fun (Mod) -> + lists:foreach(fun(Mod) -> if is_atom(Mod) -> ok; true -> throw({invalid_module, Mod}) @@ -1157,7 +1480,7 @@ validMods(Mods) -> throw({invalid_modules, Mods}). %%% Returns: {ok, State'} | {terminate, State'} %%% ------------------------------------------------------ -add_restart(State) -> +add_restart(State) -> I = State#state.intensity, P = State#state.period, R = State#state.restarts, @@ -1213,15 +1536,18 @@ report_error(Error, Reason, Child, SupName) -> {offender, extract_child(Child)}], error_logger:error_report(supervisor_report, ErrorMsg). -shutdown_error_reporter(SupName) -> - fun(Reason, Child) -> - report_error(shutdown_error, Reason, Child, SupName) - end. +extract_child(Child) when is_list(Child#child.pid) -> + [{nb_children, length(Child#child.pid)}, + {name, Child#child.name}, + {mfargs, Child#child.mfargs}, + {restart_type, Child#child.restart_type}, + {shutdown, Child#child.shutdown}, + {child_type, Child#child.child_type}]; extract_child(Child) -> [{pid, Child#child.pid}, {name, Child#child.name}, - {mfa, Child#child.mfa}, + {mfargs, Child#child.mfargs}, {restart_type, Child#child.restart_type}, {shutdown, Child#child.shutdown}, {child_type, Child#child.child_type}]. diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl index a841b1f0..5a47e309 100644 --- a/src/supervisor2_tests.erl +++ b/src/supervisor2_tests.erl @@ -65,6 +65,6 @@ init([Timeout]) -> [{local, ?MODULE}, ?MODULE, []]}, transient, Timeout, supervisor, [?MODULE]}]}}; init([]) -> - {ok, {{simple_one_for_one_terminate, 0, 1}, + {ok, {{simple_one_for_one, 0, 1}, [{test_worker, {?MODULE, start_link, []}, temporary, 1000, worker, [?MODULE]}]}}. diff --git a/src/test_sup.erl b/src/test_sup.erl index 51ff7b4e..da325f1e 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -34,7 +34,7 @@ %%---------------------------------------------------------------------------- test_supervisor_delayed_restart() -> - passed = with_sup(simple_one_for_one_terminate, + passed = with_sup(simple_one_for_one, fun (SupPid) -> {ok, _ChildPid} = supervisor2:start_child(SupPid, []), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index d60b7fec..a07f6c65 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -263,29 +263,11 @@ get_total_memory({unix,openbsd}) -> sysctl("hw.usermem"); get_total_memory({win32,_OSname}) -> - %% Due to the Erlang print format bug, on Windows boxes the memory - %% size is broken. For example Windows 7 64 bit with 4Gigs of RAM - %% we get negative memory size: - %% > os_mon_sysinfo:get_mem_info(). - %% ["76 -1658880 1016913920 -1 -1021628416 2147352576 2134794240\n"] - %% Due to this bug, we don't actually know anything. Even if the - %% number is postive we can't be sure if it's correct. This only - %% affects us on os_mon versions prior to 2.2.1. - case application:get_key(os_mon, vsn) of - undefined -> - unknown; - {ok, Version} -> - case rabbit_misc:version_compare(Version, "2.2.1", lt) of - true -> %% os_mon is < 2.2.1, so we know nothing - unknown; - false -> - [Result|_] = os_mon_sysinfo:get_mem_info(), - {ok, [_MemLoad, TotPhys, _AvailPhys, - _TotPage, _AvailPage, _TotV, _AvailV], _RestStr} = - io_lib:fread("~d~d~d~d~d~d~d", Result), - TotPhys - end - end; + [Result|_] = os_mon_sysinfo:get_mem_info(), + {ok, [_MemLoad, TotPhys, _AvailPhys, _TotPage, _AvailPage, _TotV, _AvailV], + _RestStr} = + io_lib:fread("~d~d~d~d~d~d~d", Result), + TotPhys; get_total_memory({unix, linux}) -> File = read_proc_file("/proc/meminfo"), |