diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 12:03:10 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-14 12:03:10 +0000 |
commit | 8444163da6f023914044d6532cab70bff735273e (patch) | |
tree | 7cc5f7892f8d784a2e47d695c4c046042c3b2829 | |
parent | 52b6f753a82d6d8b6e5d10714a296fe185563c14 (diff) | |
download | rabbitmq-server-8444163da6f023914044d6532cab70bff735273e.tar.gz |
consumer_bound info item.
-rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
2 files changed, 18 insertions, 9 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6ec7ee07..5e5a5544 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,6 +1135,10 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> + <term>consumer_bound</term> + <listitem><para>True if the queue could deliver messages faster if there were more consumers, the consumers were faster, or the consumers had a greater prefetch count.</para></listitem> + </varlistentry> + <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4ff30ce0..d4cba944 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,7 @@ backing_queue, backing_queue_state, active_consumers, + active_consumers_last_empty, expires, sync_timer_ref, rate_timer_ref, @@ -95,6 +96,7 @@ messages_unacknowledged, messages, consumers, + consumer_bound, memory, slave_pids, synchronised_slave_pids, @@ -145,14 +147,15 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3. init_state(Q) -> - State = #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - active_consumers = priority_queue:new(), - senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), - status = running, - args_policy_version = 0}, + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + active_consumers = priority_queue:new(), + active_consumers_last_empty = erlang:now(), + senders = pmon:new(delegate), + msg_id_to_channel = gb_trees:empty(), + status = running, + args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -485,7 +488,7 @@ deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State}; + {false, State#q{active_consumers_last_empty = erlang:now()}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -1037,6 +1040,8 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(consumer_bound, #q{active_consumers_last_empty = Last}) -> + timer:now_diff(erlang:now(), Last) < 1000000; i(memory, _) -> {memory, M} = process_info(self(), memory), M; |