summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-07-29 13:09:02 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-07-29 13:09:02 +0100
commit0fa2b4f6be0580f2fa76eb80d272ae3b37850ead (patch)
treeda3b148de307a2ed9bcaf4d10c59ab21b0ea7f56
parenta7638709c0fd64df1be6bb53a624487feb82289c (diff)
downloadrabbitmq-server-0fa2b4f6be0580f2fa76eb80d272ae3b37850ead.tar.gz
Make some of the more interesting bits of the VQ backing_queue_status an official part of the BQ API.
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_backing_queue.erl18
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_variable_queue.erl26
4 files changed, 39 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4082c53d..63b18655 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -103,7 +103,8 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
-info_keys() -> ?INFO_KEYS.
+info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
+statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
@@ -821,17 +822,15 @@ i(down_slave_nodes, #q{q = #amqqueue{name = Name,
end;
i(state, #q{status = running}) -> credit_flow:state();
i(state, #q{status = State}) -> State;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(Item, _) ->
- throw({bad_argument, Item}).
+i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:info(Item, BQS).
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
ExtraKs = [K || {K, _} <- Extra],
- Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State),
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
@@ -931,7 +930,7 @@ handle_call({init, Recover}, From,
end;
handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
+ reply(infos(info_keys(), State), State);
handle_call({info, Items}, _From, State) ->
try
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 8f37bf60..9e5f0813 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,6 +16,12 @@
-module(rabbit_backing_queue).
+-export([info_keys/0]).
+
+-define(INFO_KEYS, [messages_ram, messages_ready_ram,
+ messages_unacknowledged_ram, messages_persistent,
+ backing_queue_status]).
+
-ifdef(use_specs).
%% We can't specify a per-queue ack/state with callback signatures
@@ -37,6 +43,8 @@
-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
@@ -216,9 +224,7 @@
%% inbound messages and outbound messages at the moment.
-callback msg_rates(state()) -> {float(), float()}.
-%% Exists for debugging purposes, to be able to expose state via
-%% rabbitmqctl list_queues backing_queue_status
--callback status(state()) -> [{atom(), any()}].
+-callback info(atom(), state()) -> any().
%% Passed a function to be invoked with the relevant backing queue's
%% state. Useful for when the backing queue or other components need
@@ -243,9 +249,11 @@ behaviour_info(callbacks) ->
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1},
- {invoke, 3}, {is_duplicate, 2}] ;
+ {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0},
+ {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
-endif.
+
+info_keys() -> ?INFO_KEYS.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 24b22d4c..9bccf5dd 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, status/1, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
@@ -374,10 +374,13 @@ resume(State = #state { backing_queue = BQ,
msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:msg_rates(BQS).
-status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS) ++
+info(backing_queue_status,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(backing_queue_status, BQS) ++
[ {mirror_seen, dict:size(State #state.seen_status)},
- {mirror_senders, sets:size(State #state.known_senders)} ].
+ {mirror_senders, sets:size(State #state.known_senders)} ];
+info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:info(Item, BQS).
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ede69748..dc41b589 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -821,15 +821,18 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
out = AvgEgressRate } }) ->
{AvgIngressRate, AvgEgressRate}.
-status(#vqstate {
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
+ gb_trees:size(RPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
- ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
rates = #rates { in = AvgIngressRate,
out = AvgEgressRate,
ack_in = AvgAckIngressRate,
@@ -840,17 +843,14 @@ status(#vqstate {
{delta , Delta},
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
- {len , Len},
- {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(Item, _) ->
+ throw({bad_argument, Item}).
invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
invoke( _, _, State) -> State.