diff options
1 files changed, 136 insertions, 48 deletions
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index b3e9ec66..c28b0cd5 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -33,18 +33,23 @@
%% Like erlang:memory(), but with awareness of rabbit-y things
memory() ->
- Conns = (sup_memory(rabbit_tcp_client_sup) +
- sup_memory(ssl_connection_sup) +
- sup_memory(amqp_sup)),
- Qs = (sup_memory(rabbit_amqqueue_sup) +
- sup_memory(rabbit_mirror_queue_slave_sup)),
+ ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
+ QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
+ MsgIndexProcs = [msg_store_transient, msg_store_persistent],
+ MgmtDbProcs = [rabbit_mgmt_sup],
+ PluginProcs = plugin_sups(),
+ All = [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs],
+ {Sums, _Other} = sum_processes(lists:append(All), [memory]),
+ [Conns, Qs, MsgIndexProc, MgmtDbProc, AllPlugins] =
+ [aggregate_memory(Names, Sums) || Names <- All],
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
- MsgIndexProc = (pid_memory(msg_store_transient) +
- pid_memory(msg_store_persistent)),
MgmtDbETS = ets_memory(rabbit_mgmt_db),
- MgmtDbProc = sup_memory(rabbit_mgmt_sup),
- Plugins = plugin_memory() - MgmtDbProc,
+ Plugins = AllPlugins - MgmtDbProc,
[{total, Total},
{processes, Processes},
@@ -55,7 +60,7 @@ memory() ->
{system, System}] =
erlang:memory([total, processes, ets, atom, binary, code, system]),
- OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - AllPlugins,
[{total, Total},
{connection_procs, Conns},
@@ -78,35 +83,6 @@ memory() ->
-sup_memory(Sup) ->
- lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
- pid_memory(Sup).
-sup_children(Sup) ->
- rabbit_misc:with_exit_handler(
- rabbit_misc:const([]),
- fun () ->
- %% Just in case we end up talking to something that is
- %% not a supervisor by mistake.
- case supervisor:which_children(Sup) of
- L when is_list(L) -> L;
- _ -> []
- end
- end).
-pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
- {memory, M} -> M;
- _ -> 0
- end;
-pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
- P when is_pid(P) -> pid_memory(P);
- _ -> 0
- end.
-child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
-child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
-child_memory(_, _) -> 0.
mnesia_memory() ->
case mnesia:system_info(is_running) of
yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
@@ -121,20 +97,132 @@ ets_memory(Name) ->
bytes(Words) -> Words * erlang:system_info(wordsize).
-plugin_memory() ->
- lists:sum([plugin_memory(App) ||
- {App, _, _} <- application:which_applications(),
- is_plugin(atom_to_list(App))]).
+plugin_sups() ->
+ lists:append([plugin_sup(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
-plugin_memory(App) ->
+plugin_sup(App) ->
case application_controller:get_master(App) of
- undefined -> 0;
+ undefined -> [];
Master -> case application_master:get_child(Master) of
- {Pid, _} when is_pid(Pid) -> sup_memory(Pid);
- Pid when is_pid(Pid) -> sup_memory(Pid);
- _ -> 0
+ {Pid, _} when is_pid(Pid) -> [process_name(Pid)];
+ Pid when is_pid(Pid) -> [process_name(Pid)];
+ _ -> []
+process_name(Pid) ->
+ case process_info(Pid, registered_name) of
+ {registered_name, Name} -> Name;
+ _ -> Pid
+ end.
is_plugin("rabbitmq_" ++ _) -> true;
is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).
+aggregate_memory(Names, Sums) ->
+ lists:sum([extract_memory(Name, Sums) || Name <- Names]).
+extract_memory(Name, Sums) ->
+ {value, {_, Accs}} = lists:keysearch(Name, 1, Sums),
+ {value, {memory, V}} = lists:keysearch(memory, 1, Accs),
+ V.
+%% NB: this code is non-rabbit specific.
+-type(process() :: pid() | atom()).
+-type(info_key() :: atom()).
+-type(info_value() :: any()).
+-type(info_item() :: {info_key(), info_value()}).
+-type(accumulate() :: fun ((info_key(), info_value(), info_value()) ->
+ info_value())).
+-spec(sum_processes/2 :: ([process()], [info_key()]) ->
+ {[{process(), [info_item()]}], [info_item()]}).
+-spec(sum_processes/3 :: ([process()], accumulate(), [info_item()]) ->
+ {[{process(), [info_item()]}], [info_item()]}).
+sum_processes(Names, Items) ->
+ sum_processes(Names, fun (_, X, Y) -> X + Y end,
+ [{Item, 0} || Item <- Items]).
+%% summarize the process_info of all processes based on their
+%% '$ancestor' hierarchy, recorded in their process dictionary.
+%% The function takes
+%% 1) a list of names/pids of processes that are accumulation points
+%% in the hierarchy.
+%% 2) a function that aggregates individual info items -taking the
+%% info item key, value and accumulated value as the input and
+%% producing a new accumulated value.
+%% 3) a list of info item key / initial accumulator value pairs.
+%% The process_info of a process is accumulated at the nearest of its
+%% ancestors that is mentioned in the first argument, or, if no such
+%% ancestor exists or the ancestor information is absent, in a special
+%% 'other' bucket.
+%% The result is a pair consisting of
+%% 1) a k/v list, containing for each of the accumulation names/pids a
+%% list of info items, containing the accumulated data, and
+%% 2) the 'other' bucket - a list of info items containing the
+%% accumulated data of all processes with no matching ancestors
+%% Note that this function operates on names as well as pids, but
+%% these must match whatever is contained in the '$ancestor' process
+%% dictionary entry. Generally that means for all registered processes
+%% the name should be used.
+sum_processes(Names, Fun, Acc0) ->
+ Items = [Item || {Item, _Val0} <- Acc0],
+ Acc0Dict = orddict:from_list(Acc0),
+ NameAccs0 = orddict:from_list([{Name, Acc0Dict} || Name <- Names]),
+ {NameAccs, OtherAcc} =
+ lists:foldl(
+ fun (Pid, Acc) ->
+ InfoItems = [registered_name, dictionary | Items],
+ case process_info(Pid, InfoItems) of
+ undefined ->
+ Acc;
+ [{registered_name, RegName}, {dictionary, D} | Vals] ->
+ %% see docs for process_info/2 for the
+ %% special handling of 'registered_name'
+ %% info items
+ Extra = case RegName of
+ [] -> [];
+ N -> [N]
+ end,
+ accumulate(find_ancestor(Extra, D, Names), Fun,
+ orddict:from_list(Vals), Acc)
+ end
+ end, {NameAccs0, Acc0Dict}, processes()),
+ %% these conversions aren't strictly necessary; we do them simply
+ %% for the sake of encapsulating the representation.
+ {[{Name, orddict:to_list(Accs)} ||
+ {Name, Accs} <- orddict:to_list(NameAccs)],
+ orddict:to_list(OtherAcc)}.
+find_ancestor(Extra, D, Names) ->
+ Ancestors = case lists:keysearch('$ancestors', 1, D) of
+ {value, {_, Ancs}} -> Ancs;
+ false -> []
+ end,
+ case lists:splitwith(fun (A) -> not lists:member(A, Names) end,
+ Extra ++ Ancestors) of
+ {_, []} -> undefined;
+ {_, [Name | _]} -> Name
+ end.
+accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}) ->
+ {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)};
+accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}) ->
+ F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end,
+ {orddict:update(Name, F, NameAccs), OtherAcc}.