summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-12 13:53:10 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-12 13:53:10 +0100
commite36ee71399670a8fd83941c6d0e1fd78cb1072a0 (patch)
tree2e3dad700a891682db1ac6831ed8a4efb60caa81
parent39490ea6d727429572f56a6cd8cf4858ca857e1d (diff)
parent1a2e46cbc786aea76d77a05dc52a33d163d9b426 (diff)
downloadrabbitmq-server-e36ee71399670a8fd83941c6d0e1fd78cb1072a0.tar.gz
Merge in default
-rw-r--r--src/gatherer.erl51
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl25
-rw-r--r--src/rabbit_mirror_queue_misc.erl81
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_vm.erl129
10 files changed, 230 insertions, 91 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 98b36038..29d2d713 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -32,6 +32,7 @@
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(sync_in/2 :: (pid(), any()) -> 'ok').
-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -62,6 +63,9 @@ finish(Pid) ->
in(Pid, Value) ->
gen_server2:cast(Pid, {in, Value}).
+sync_in(Pid, Value) ->
+ gen_server2:call(Pid, {in, Value}, infinity).
+
out(Pid) ->
gen_server2:call(Pid, out, infinity).
@@ -78,19 +82,22 @@ handle_call(stop, _From, State) ->
handle_call(fork, _From, State = #gstate { forks = Forks }) ->
{reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
+handle_call({in, Value}, From, State) ->
+ {noreply, in(Value, From, State), hibernate};
+
handle_call(out, From, State = #gstate { forks = Forks,
values = Values,
blocked = Blocked }) ->
case queue:out(Values) of
+ {empty, _} when Forks == 0 ->
+ {reply, empty, State, hibernate};
{empty, _} ->
- case Forks of
- 0 -> {reply, empty, State, hibernate};
- _ -> {noreply,
- State #gstate { blocked = queue:in(From, Blocked) },
- hibernate}
- end;
- {{value, _Value} = V, NewValues} ->
- {reply, V, State #gstate { values = NewValues }, hibernate}
+ {noreply, State #gstate { blocked = queue:in(From, Blocked) },
+ hibernate};
+ {{value, {PendingIn, Value}}, NewValues} ->
+ reply(PendingIn, ok),
+ {reply, {value, Value}, State #gstate { values = NewValues },
+ hibernate}
end;
handle_call(Msg, _From, State) ->
@@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
{noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
hibernate};
-handle_cast({in, Value}, State = #gstate { values = Values,
- blocked = Blocked }) ->
- {noreply, case queue:out(Blocked) of
- {empty, _} ->
- State #gstate { values = queue:in(Value, Values) };
- {{value, From}, NewBlocked} ->
- gen_server2:reply(From, {value, Value}),
- State #gstate { blocked = NewBlocked }
- end, hibernate};
+handle_cast({in, Value}, State) ->
+ {noreply, in(Value, undefined, State), hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+%%----------------------------------------------------------------------------
+
+in(Value, From, State = #gstate { values = Values, blocked = Blocked }) ->
+ case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in({From, Value}, Values) };
+ {{value, PendingOut}, NewBlocked} ->
+ reply(From, ok),
+ gen_server2:reply(PendingOut, {value, Value}),
+ State #gstate { blocked = NewBlocked }
+ end.
+
+reply(undefined, _Reply) -> ok;
+reply(From, Reply) -> gen_server2:reply(From, Reply).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7b417b00..93808f84 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -364,7 +364,7 @@ status() ->
{running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}],
+ {memory, rabbit_vm:memory()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 30df2b5c..9706efbf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -543,16 +543,10 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
{{Message, Props#message_properties.delivered, AckTag},
true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
- {Duplicate, BQS1} ->
- %% if the message has previously been seen by the BQ then
- %% it must have been seen under the same circumstances as
- %% now: i.e. if it is now a deliver_immediately then it
- %% must have been before.
- {case Duplicate of
- published -> true;
- discarded -> false
- end,
- State#q{backing_queue_state = BQS1}}
+ {published, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}};
+ {discarded, BQS1} ->
+ {false, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d69a6c3b..c6d17785 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -24,6 +24,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
@@ -117,7 +118,7 @@
%% first time the message id appears in the result of
%% drain_confirmed. All subsequent appearances of that message id will
%% be ignored.
--callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+-callback drain_confirmed(state()) -> {msg_ids(), state()}.
%% Drop messages from the head of the queue while the supplied predicate returns
%% true. Also accepts a boolean parameter that determines whether the messages
@@ -136,7 +137,7 @@
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
--callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback ack([ack()], state()) -> {msg_ids(), state()}.
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
@@ -144,7 +145,7 @@
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
--callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback requeue([ack()], state()) -> {msg_ids(), state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 40359da3..676cb519 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -344,10 +344,9 @@ handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids, ExtraNodes} ->
+ {ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 15ab9424..d865d675 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -88,18 +88,19 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(Q, Recover, AsyncCallback) ->
+init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- init_with_existing_bq(Q, BQ, BQS).
+ State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+ State.
-init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) ->
+init_with_existing_bq(Q, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), depth_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -148,7 +149,17 @@ stop_all_slaves(Reason, #state{gm = GM}) ->
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
- ok = gm:forget_group(proplists:get_value(group_name, Info)).
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ QName = proplists:get_value(group_name, Info),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { slave_pids = [] })
+ end),
+ ok = gm:forget_group(QName).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9d701422..b6c229aa 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -31,11 +31,12 @@
-spec(remove_from_queue/2 ::
(rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+ (rabbit_amqqueue:name(), node()) ->
+ {'ok', atom()} | rabbit_types:error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -59,7 +60,6 @@
remove_from_queue(QueueName, DeadGMPids) ->
DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -73,49 +73,52 @@ remove_from_queue(QueueName, DeadGMPids) ->
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, [], []};
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- %% Sometimes a slave dying means we need
- %% to start more on other nodes -
- %% "exactly" mode can cause this to
- %% happen.
- {_, OldNodes} = actual_queue_nodes(Q1),
- {_, NewNodes} = suggested_queue_nodes(
- Q1, ClusterNodes),
- {ok, QPid1, [QPid | SPids] -- Alive,
- NewNodes -- OldNodes};
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, [], []}
+ {ok, QPid1, []}
end
end
end).
on_node_up() ->
- ClusterNodes = rabbit_mnesia:cluster_nodes(running),
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName}, QNames0) ->
+ fun (Q = #amqqueue{name = QName,
+ pid = Pid,
+ slave_pids = SPids}, QNames0) ->
+ %% We don't want to pass in the whole
+ %% cluster - we don't want a situation
+ %% where starting one node causes us to
+ %% decide to start a mirror on another
+ PossibleNodes0 = [node(P) || P <- [Pid | SPids]],
+ PossibleNodes =
+ case lists:member(node(), PossibleNodes0) of
+ true -> PossibleNodes0;
+ false -> [node() | PossibleNodes0]
+ end,
{_MNode, SNodes} = suggested_queue_nodes(
- Q, ClusterNodes),
+ Q, PossibleNodes),
case lists:member(node(), SNodes) of
true -> [QName | QNames0];
false -> QNames0
end
end, [], rabbit_queue)
end),
- [ok = add_mirror(QName, node()) || QName <- QNames],
+ [{ok, _} = add_mirror(QName, node()) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
@@ -141,7 +144,7 @@ drop_mirror(QName, MirrorNode) ->
end).
add_mirrors(QName, Nodes) ->
- [ok = add_mirror(QName, Node) || Node <- Nodes],
+ [{ok, _} = add_mirror(QName, Node) || Node <- Nodes],
ok.
add_mirror(QName, MirrorNode) ->
@@ -154,8 +157,7 @@ add_mirror(QName, MirrorNode) ->
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
true ->
- {error,{queue_already_mirrored_on_node,
- MirrorNode}};
+ {ok, already_mirrored};
false ->
start_child(Name, MirrorNode, Q)
end
@@ -171,20 +173,20 @@ start_child(Name, MirrorNode, Q) ->
{ok, undefined} ->
%% this means the mirror process was
%% already running on the given node.
- ok;
+ {ok, already_mirrored};
{ok, down} ->
%% Node went down between us deciding to start a mirror
%% and actually starting it. Which is fine.
- ok;
+ {ok, node_down};
{ok, SPid} ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
+ {ok, started};
{error, {{stale_master_pid, StalePid}, _}} ->
rabbit_log:warning("Detected stale HA master while adding "
"mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, StalePid]),
- ok;
+ {ok, stale_master};
{error, {{duplicate_live_master, _}=Err, _}} ->
Err;
Other ->
@@ -235,14 +237,14 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, ClusterNodes) ->
+suggested_queue_nodes(Q, PossibleNodes) ->
{MNode0, SNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, ClusterNodes).
+ {MNode, SNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -250,11 +252,11 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
- {MNode, All -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
+ {MNode, Possible -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- Unavailable = Nodes -- All,
+ Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -269,10 +271,10 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
%% crude form of load-balancing. TODO it would also be nice to
%% randomise the list of ones to remove when we have too many - but
%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) ->
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((All -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
@@ -320,13 +322,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
All = fun ({A,B}) -> [A|B] end,
OldNodes = All(actual_queue_nodes(OldQ)),
NewNodes = All(suggested_queue_nodes(NewQ)),
- %% When a mirror dies, remove_from_queue/2 might have to add new
- %% slaves (in "exactly" mode). It will check mnesia to see which
- %% slaves there currently are. If drop_mirror/2 is invoked first
- %% then when we end up in remove_from_queue/2 it will not see the
- %% slaves that add_mirror/2 will add, and also want to add them
- %% (even though we are not responding to the death of a
- %% mirror). Breakage ensues.
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 307f2b4f..61423202 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -181,25 +181,20 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids, ExtraNodes} ->
+ {ok, Pid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
gen_server2:reply(From, ok),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
node(Pid) =:= node() ->
%% we've become master
QueueState = promote_me(From, State),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
{become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
- %% assertion, we don't need to add_mirrors/2 in this
- %% branch, see last clause in remove_from_queue/2
- [] = ExtraNodes,
erlang:monitor(process, Pid),
%% GM is lazy. So we know of the death of the
%% slave since it is a neighbour of ours, but
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6d6c648a..21f58154 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
+ gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
new file mode 100644
index 00000000..53f3df18
--- /dev/null
+++ b/src/rabbit_vm.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_vm).
+
+-export([memory/0]).
+
+-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
+ "rfc4627_jsonrpc"]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(memory/0 :: () -> rabbit_types:infos()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Like erlang:memory(), but with awareness of rabbit-y things
+memory() ->
+ Conns = (sup_memory(rabbit_tcp_client_sup) +
+ sup_memory(ssl_connection_sup) +
+ sup_memory(amqp_sup)),
+ Qs = (sup_memory(rabbit_amqqueue_sup) +
+ sup_memory(rabbit_mirror_queue_slave_sup)),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
+ MsgIndexProc = (pid_memory(msg_store_transient) +
+ pid_memory(msg_store_persistent)),
+ MgmtDbETS = ets_memory(rabbit_mgmt_db),
+ MgmtDbProc = sup_memory(rabbit_mgmt_sup),
+ Plugins = plugin_memory() - MgmtDbProc,
+
+ [{total, Total},
+ {processes, Processes},
+ {ets, ETS},
+ {atom, Atom},
+ {binary, Bin},
+ {code, Code},
+ {system, System}] =
+ erlang:memory([total, processes, ets, atom, binary, code, system]),
+
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+
+ [{total, Total},
+ {connection_procs, Conns},
+ {queue_procs, Qs},
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {mnesia, Mnesia},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+ {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+ {binary, Bin},
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Atom - Bin - Code}].
+
+%% [1] - erlang:memory(processes) can be less than the sum of its
+%% parts. Rather than display something nonsensical, just silence any
+%% claims about negative memory. See
+%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+
+%%----------------------------------------------------------------------------
+
+sup_memory(Sup) ->
+ lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
+ pid_memory(Sup).
+
+sup_children(Sup) ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+
+pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
+ {memory, M} -> M;
+ _ -> 0
+ end;
+pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
+ P when is_pid(P) -> pid_memory(P);
+ _ -> 0
+ end.
+
+child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
+child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
+child_memory(_, _) -> 0.
+
+mnesia_memory() ->
+ case mnesia:system_info(is_running) of
+ yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
+ Tab <- mnesia:system_info(tables)]);
+ no -> 0
+ end.
+
+ets_memory(Name) ->
+ lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(),
+ N <- [ets:info(T, name)],
+ N =:= Name]).
+
+bytes(Words) -> Words * erlang:system_info(wordsize).
+
+plugin_memory() ->
+ lists:sum([plugin_memory(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
+
+plugin_memory(App) ->
+ case catch application_master:get_child(
+ application_controller:get_master(App)) of
+ {Pid, _} -> sup_memory(Pid);
+ _ -> 0
+ end.
+
+is_plugin("rabbitmq_" ++ _) -> true;
+is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).