diff options
-rw-r--r-- | src/rabbit_vm.erl | 184 |
1 files changed, 136 insertions, 48 deletions
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index b3e9ec66..c28b0cd5 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -33,18 +33,23 @@ %% Like erlang:memory(), but with awareness of rabbit-y things memory() -> - Conns = (sup_memory(rabbit_tcp_client_sup) + - sup_memory(ssl_connection_sup) + - sup_memory(amqp_sup)), - Qs = (sup_memory(rabbit_amqqueue_sup) + - sup_memory(rabbit_mirror_queue_slave_sup)), + ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], + QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup], + MsgIndexProcs = [msg_store_transient, msg_store_persistent], + MgmtDbProcs = [rabbit_mgmt_sup], + PluginProcs = plugin_sups(), + + All = [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs], + + {Sums, _Other} = sum_processes(lists:append(All), [memory]), + + [Conns, Qs, MsgIndexProc, MgmtDbProc, AllPlugins] = + [aggregate_memory(Names, Sums) || Names <- All], + Mnesia = mnesia_memory(), MsgIndexETS = ets_memory(rabbit_msg_store_ets_index), - MsgIndexProc = (pid_memory(msg_store_transient) + - pid_memory(msg_store_persistent)), MgmtDbETS = ets_memory(rabbit_mgmt_db), - MgmtDbProc = sup_memory(rabbit_mgmt_sup), - Plugins = plugin_memory() - MgmtDbProc, + Plugins = AllPlugins - MgmtDbProc, [{total, Total}, {processes, Processes}, @@ -55,7 +60,7 @@ memory() -> {system, System}] = erlang:memory([total, processes, ets, atom, binary, code, system]), - OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins, + OtherProc = Processes - Conns - Qs - MsgIndexProc - AllPlugins, [{total, Total}, {connection_procs, Conns}, @@ -78,35 +83,6 @@ memory() -> %%---------------------------------------------------------------------------- -sup_memory(Sup) -> - lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) + - pid_memory(Sup). - -sup_children(Sup) -> - rabbit_misc:with_exit_handler( - rabbit_misc:const([]), - fun () -> - %% Just in case we end up talking to something that is - %% not a supervisor by mistake. - case supervisor:which_children(Sup) of - L when is_list(L) -> L; - _ -> [] - end - end). - -pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of - {memory, M} -> M; - _ -> 0 - end; -pid_memory(Name) when is_atom(Name) -> case whereis(Name) of - P when is_pid(P) -> pid_memory(P); - _ -> 0 - end. - -child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid); -child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid); -child_memory(_, _) -> 0. - mnesia_memory() -> case mnesia:system_info(is_running) of yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) || @@ -121,20 +97,132 @@ ets_memory(Name) -> bytes(Words) -> Words * erlang:system_info(wordsize). -plugin_memory() -> - lists:sum([plugin_memory(App) || - {App, _, _} <- application:which_applications(), - is_plugin(atom_to_list(App))]). +plugin_sups() -> + lists:append([plugin_sup(App) || + {App, _, _} <- application:which_applications(), + is_plugin(atom_to_list(App))]). -plugin_memory(App) -> +plugin_sup(App) -> case application_controller:get_master(App) of - undefined -> 0; + undefined -> []; Master -> case application_master:get_child(Master) of - {Pid, _} when is_pid(Pid) -> sup_memory(Pid); - Pid when is_pid(Pid) -> sup_memory(Pid); - _ -> 0 + {Pid, _} when is_pid(Pid) -> [process_name(Pid)]; + Pid when is_pid(Pid) -> [process_name(Pid)]; + _ -> [] end end. +process_name(Pid) -> + case process_info(Pid, registered_name) of + {registered_name, Name} -> Name; + _ -> Pid + end. + is_plugin("rabbitmq_" ++ _) -> true; is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS). + +aggregate_memory(Names, Sums) -> + lists:sum([extract_memory(Name, Sums) || Name <- Names]). + +extract_memory(Name, Sums) -> + {value, {_, Accs}} = lists:keysearch(Name, 1, Sums), + {value, {memory, V}} = lists:keysearch(memory, 1, Accs), + V. + +%%---------------------------------------------------------------------------- + +%% NB: this code is non-rabbit specific. + +-ifdef(use_specs). +-type(process() :: pid() | atom()). +-type(info_key() :: atom()). +-type(info_value() :: any()). +-type(info_item() :: {info_key(), info_value()}). +-type(accumulate() :: fun ((info_key(), info_value(), info_value()) -> + info_value())). +-spec(sum_processes/2 :: ([process()], [info_key()]) -> + {[{process(), [info_item()]}], [info_item()]}). +-spec(sum_processes/3 :: ([process()], accumulate(), [info_item()]) -> + {[{process(), [info_item()]}], [info_item()]}). +-endif. + +sum_processes(Names, Items) -> + sum_processes(Names, fun (_, X, Y) -> X + Y end, + [{Item, 0} || Item <- Items]). + +%% summarize the process_info of all processes based on their +%% '$ancestor' hierarchy, recorded in their process dictionary. +%% +%% The function takes +%% +%% 1) a list of names/pids of processes that are accumulation points +%% in the hierarchy. +%% +%% 2) a function that aggregates individual info items -taking the +%% info item key, value and accumulated value as the input and +%% producing a new accumulated value. +%% +%% 3) a list of info item key / initial accumulator value pairs. +%% +%% The process_info of a process is accumulated at the nearest of its +%% ancestors that is mentioned in the first argument, or, if no such +%% ancestor exists or the ancestor information is absent, in a special +%% 'other' bucket. +%% +%% The result is a pair consisting of +%% +%% 1) a k/v list, containing for each of the accumulation names/pids a +%% list of info items, containing the accumulated data, and +%% +%% 2) the 'other' bucket - a list of info items containing the +%% accumulated data of all processes with no matching ancestors +%% +%% Note that this function operates on names as well as pids, but +%% these must match whatever is contained in the '$ancestor' process +%% dictionary entry. Generally that means for all registered processes +%% the name should be used. +sum_processes(Names, Fun, Acc0) -> + Items = [Item || {Item, _Val0} <- Acc0], + Acc0Dict = orddict:from_list(Acc0), + NameAccs0 = orddict:from_list([{Name, Acc0Dict} || Name <- Names]), + {NameAccs, OtherAcc} = + lists:foldl( + fun (Pid, Acc) -> + InfoItems = [registered_name, dictionary | Items], + case process_info(Pid, InfoItems) of + undefined -> + Acc; + [{registered_name, RegName}, {dictionary, D} | Vals] -> + %% see docs for process_info/2 for the + %% special handling of 'registered_name' + %% info items + Extra = case RegName of + [] -> []; + N -> [N] + end, + accumulate(find_ancestor(Extra, D, Names), Fun, + orddict:from_list(Vals), Acc) + end + end, {NameAccs0, Acc0Dict}, processes()), + %% these conversions aren't strictly necessary; we do them simply + %% for the sake of encapsulating the representation. + {[{Name, orddict:to_list(Accs)} || + {Name, Accs} <- orddict:to_list(NameAccs)], + orddict:to_list(OtherAcc)}. + +find_ancestor(Extra, D, Names) -> + Ancestors = case lists:keysearch('$ancestors', 1, D) of + {value, {_, Ancs}} -> Ancs; + false -> [] + end, + case lists:splitwith(fun (A) -> not lists:member(A, Names) end, + Extra ++ Ancestors) of + {_, []} -> undefined; + {_, [Name | _]} -> Name + end. + +accumulate(undefined, Fun, ValsDict, {NameAccs, OtherAcc}) -> + {NameAccs, orddict:merge(Fun, ValsDict, OtherAcc)}; +accumulate(Name, Fun, ValsDict, {NameAccs, OtherAcc}) -> + F = fun (NameAcc) -> orddict:merge(Fun, ValsDict, NameAcc) end, + {orddict:update(Name, F, NameAccs), OtherAcc}. |