summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml8
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl23
4 files changed, 54 insertions, 10 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 71764522..8ad476b2 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -860,6 +860,14 @@
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>mirror_nodes</term>
+ <listitem><para>If the queue is mirrored, the nodes upon which mirrors will be present if the nodes are part of the current cluster.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>slaves</term>
+ <listitem><para>If the queue is mirrored, this gives the status of slaves.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e388ccf2..cb8a485e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -74,8 +74,8 @@
messages,
consumers,
memory,
- backing_queue_status,
- slave_pids
+ slaves,
+ backing_queue_status
]).
-define(CREATION_EVENT_KEYS,
@@ -802,14 +802,26 @@ i(consumers, State) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- SPids;
i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name),
- MNodes;
+ case MNodes of
+ undefined -> '';
+ _ -> MNodes
+ end;
+i(slaves, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ '';
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ [Result || {_Pid, Result} <- Results]
+ end;
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6eb1aaba..548ad7fa 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -419,6 +419,11 @@ format_info_item([T | _] = Value)
"[" ++
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E) || E <- Value])) ++ "]";
+format_info_item(Value) when is_tuple(Value) ->
+ List = tuple_to_list(Value),
+ "{" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- List])) ++ "}";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a4a40a8c..bae68cbe 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -33,7 +33,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2]).
+-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,13 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(STATISTICS_KEYS,
+ [pid,
+ is_synchronised
+ ]).
+
+-define(INFO_KEYS, ?STATISTICS_KEYS).
+
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -75,6 +82,9 @@ start_link(Q) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+info(QPid) ->
+ gen_server2:call(QPid, info, infinity).
+
init([#amqqueue { name = QueueName } = Q]) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -181,7 +191,10 @@ handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
handle_call({commit, _Txn, _ChPid}, _From, State) ->
%% We don't support transactions in mirror queues
- reply(ok, State).
+ reply(ok, State);
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -337,6 +350,12 @@ inform_deaths(SPid, Deaths) ->
%% Others
%% ---------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _State) -> self();
+i(is_synchronised, State) -> State #state.synchronised;
+i(Item, _State) -> throw({bad_argument, Item}).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,