summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-14 12:03:10 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-14 12:03:10 +0000
commit8444163da6f023914044d6532cab70bff735273e (patch)
tree7cc5f7892f8d784a2e47d695c4c046042c3b2829
parent52b6f753a82d6d8b6e5d10714a296fe185563c14 (diff)
downloadrabbitmq-server-8444163da6f023914044d6532cab70bff735273e.tar.gz
consumer_bound info item.
-rw-r--r--docs/rabbitmqctl.1.xml4
-rw-r--r--src/rabbit_amqqueue_process.erl23
2 files changed, 18 insertions, 9 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6ec7ee07..5e5a5544 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,6 +1135,10 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>consumer_bound</term>
+ <listitem><para>True if the queue could deliver messages faster if there were more consumers, the consumers were faster, or the consumers had a greater prefetch count.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4ff30ce0..d4cba944 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ active_consumers_last_empty,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,6 +96,7 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_bound,
memory,
slave_pids,
synchronised_slave_pids,
@@ -145,14 +147,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.
init_state(Q) ->
- State = #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- active_consumers = priority_queue:new(),
- senders = pmon:new(delegate),
- msg_id_to_channel = gb_trees:empty(),
- status = running,
- args_policy_version = 0},
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ active_consumers = priority_queue:new(),
+ active_consumers_last_empty = erlang:now(),
+ senders = pmon:new(delegate),
+ msg_id_to_channel = gb_trees:empty(),
+ status = running,
+ args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -485,7 +488,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false, State#q{active_consumers_last_empty = erlang:now()}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -1037,6 +1040,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_bound, #q{active_consumers_last_empty = Last}) ->
+ timer:now_diff(erlang:now(), Last) < 1000000;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;