diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 12:55:34 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-03 12:55:34 +0100 |
commit | 1d3fa1819b5471fb8db5804f78d0c1bbe957b41e (patch) | |
tree | fec604920623168d29c8347afc3353fb1320ac6a | |
parent | 9207ac7614247cc09b9d6785307c9ead042fb3bc (diff) | |
parent | a091bbd4bd494cf7848117ce83eee439cb62f269 (diff) | |
download | rabbitmq-server-1d3fa1819b5471fb8db5804f78d0c1bbe957b41e.tar.gz |
Merge bug25200
-rw-r--r-- | docs/rabbitmqctl.1.xml | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
2 files changed, 37 insertions, 15 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 11d85e9e..73347cea 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -986,6 +986,27 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> + <term>active_consumers</term> + <listitem> + <para> + Number of active consumers. An active consumer is + one which could immediately receive any messages + sent to the queue - i.e. it is not limited by its + prefetch count, TCP congestion, flow control, or + because it has issued channel.flow. At least one + of messages_ready and active_consumers must always + be zero. + </para> + <para> + Note that this value is an instantaneous snapshot + - when consumers are restricted by their prefetch + count they may only appear to be active for small + fractions of a second until more messages are sent + out. + </para> + </listitem> + </varlistentry> + <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a22e32b0..30df2b5c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -92,6 +92,7 @@ messages_unacknowledged, messages, consumers, + active_consumers, memory, slave_pids, synchronised_slave_pids, @@ -574,14 +575,13 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, State2#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = M:requeue(AckTags, BQS), - BQS1 - end, State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + run_message_queue(State#q{backing_queue_state = BQS1}). -fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +fetch(AckRequired, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. @@ -675,12 +675,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -backing_queue_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). - -run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). +backing_queue_timeout(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> case lookup_ch(ChPid) of @@ -910,6 +907,8 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(active_consumers, _) -> + active_consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1190,8 +1189,10 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); -handle_cast({run_backing_queue, Mod, Fun}, State) -> - noreply(run_backing_queue(Mod, Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + noreply(run_message_queue( + State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> |