summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-03 12:55:34 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-03 12:55:34 +0100
commit1d3fa1819b5471fb8db5804f78d0c1bbe957b41e (patch)
treefec604920623168d29c8347afc3353fb1320ac6a
parent9207ac7614247cc09b9d6785307c9ead042fb3bc (diff)
parenta091bbd4bd494cf7848117ce83eee439cb62f269 (diff)
downloadrabbitmq-server-1d3fa1819b5471fb8db5804f78d0c1bbe957b41e.tar.gz
Merge bug25200
-rw-r--r--docs/rabbitmqctl.1.xml21
-rw-r--r--src/rabbit_amqqueue_process.erl31
2 files changed, 37 insertions, 15 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 11d85e9e..73347cea 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -986,6 +986,27 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>active_consumers</term>
+ <listitem>
+ <para>
+ Number of active consumers. An active consumer is
+ one which could immediately receive any messages
+ sent to the queue - i.e. it is not limited by its
+ prefetch count, TCP congestion, flow control, or
+ because it has issued channel.flow. At least one
+ of messages_ready and active_consumers must always
+ be zero.
+ </para>
+ <para>
+ Note that this value is an instantaneous snapshot
+ - when consumers are restricted by their prefetch
+ count they may only appear to be active for small
+ fractions of a second until more messages are sent
+ out.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a22e32b0..30df2b5c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,6 +92,7 @@
messages_unacknowledged,
messages,
consumers,
+ active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
@@ -574,14 +575,13 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
- BQS1
- end, State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ run_message_queue(State#q{backing_queue_state = BQS1}).
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+fetch(AckRequired, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
@@ -675,12 +675,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-backing_queue_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-
-run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
+backing_queue_timeout(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
case lookup_ch(ChPid) of
@@ -910,6 +907,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(active_consumers, _) ->
+ active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1190,8 +1189,10 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
- noreply(run_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ noreply(run_message_queue(
+ State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->