diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 13:09:02 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-29 13:09:02 +0100 |
commit | 0fa2b4f6be0580f2fa76eb80d272ae3b37850ead (patch) | |
tree | da3b148de307a2ed9bcaf4d10c59ab21b0ea7f56 | |
parent | a7638709c0fd64df1be6bb53a624487feb82289c (diff) | |
download | rabbitmq-server-0fa2b4f6be0580f2fa76eb80d272ae3b37850ead.tar.gz |
Make some of the more interesting bits of the VQ backing_queue_status an official part of the BQ API.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
4 files changed, 39 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4082c53d..63b18655 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -103,7 +103,8 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). -info_keys() -> ?INFO_KEYS. +info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). +statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- @@ -821,17 +822,15 @@ i(down_slave_nodes, #q{q = #amqqueue{name = Name, end; i(state, #q{status = running}) -> credit_flow:state(); i(state, #q{status = State}) -> State; -i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> - BQ:status(BQS); -i(Item, _) -> - throw({bad_argument, Item}). +i(Item, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:info(Item, BQS). emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> ExtraKs = [K || {K, _} <- Extra], - Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State), + Infos = [{K, V} || {K, V} <- infos(statistics_keys(), State), not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). @@ -931,7 +930,7 @@ handle_call({init, Recover}, From, end; handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); + reply(infos(info_keys(), State), State); handle_call({info, Items}, _From, State) -> try diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 8f37bf60..9e5f0813 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,6 +16,12 @@ -module(rabbit_backing_queue). +-export([info_keys/0]). + +-define(INFO_KEYS, [messages_ram, messages_ready_ram, + messages_unacknowledged_ram, messages_persistent, + backing_queue_status]). + -ifdef(use_specs). %% We can't specify a per-queue ack/state with callback signatures @@ -37,6 +43,8 @@ -type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency @@ -216,9 +224,7 @@ %% inbound messages and outbound messages at the moment. -callback msg_rates(state()) -> {float(), float()}. -%% Exists for debugging purposes, to be able to expose state via -%% rabbitmqctl list_queues backing_queue_status --callback status(state()) -> [{atom(), any()}]. +-callback info(atom(), state()) -> any(). %% Passed a function to be invoked with the relevant backing queue's %% state. Useful for when the backing queue or other components need @@ -243,9 +249,11 @@ behaviour_info(callbacks) -> {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {status, 1}, - {invoke, 3}, {is_duplicate, 2}] ; + {handle_pre_hibernate, 1}, {resume, 1}, {msg_rates, 1}, {info_keys, 0}, + {infos, 2}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. -endif. + +info_keys() -> ?INFO_KEYS. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 24b22d4c..9bccf5dd 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, status/1, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -374,10 +374,13 @@ resume(State = #state { backing_queue = BQ, msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:msg_rates(BQS). -status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS) ++ +info(backing_queue_status, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(backing_queue_status, BQS) ++ [ {mirror_seen, dict:size(State #state.seen_status)}, - {mirror_senders, sets:size(State #state.known_senders)} ]. + {mirror_senders, sets:size(State #state.known_senders)} ]; +info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:info(Item, BQS). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ede69748..dc41b589 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -821,15 +821,18 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, out = AvgEgressRate } }) -> {AvgIngressRate, AvgEgressRate}. -status(#vqstate { +info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> + RamMsgCount; +info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) -> + gb_trees:size(RPA); +info(messages_ram, State) -> + info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); +info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> + PersistentCount; +info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - ram_pending_ack = RPA, - disk_pending_ack = DPA, target_ram_count = TargetRamCount, - ram_msg_count = RamMsgCount, next_seq_id = NextSeqId, - persistent_count = PersistentCount, rates = #rates { in = AvgIngressRate, out = AvgEgressRate, ack_in = AvgAckIngressRate, @@ -840,17 +843,14 @@ status(#vqstate { {delta , Delta}, {q3 , ?QUEUE:len(Q3)}, {q4 , ?QUEUE:len(Q4)}, - {len , Len}, - {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)}, {target_ram_count , TargetRamCount}, - {ram_msg_count , RamMsgCount}, - {ram_ack_count , gb_trees:size(RPA)}, {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]. + {avg_ack_egress_rate , AvgAckEgressRate} ]; +info(Item, _) -> + throw({bad_argument, Item}). invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); invoke( _, _, State) -> State. |