summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-19 00:10:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-19 00:10:52 +0100
commite1ba0047eea4ec6e0582cdac90871a8cb3ced81f (patch)
treede48052f1bba728503c1d35de47beb2a2ad36b4b
parentaaa37d459dfdc18e394dfef01e204072cb5ce716 (diff)
downloadrabbitmq-server-e1ba0047eea4ec6e0582cdac90871a8cb3ced81f.tar.gz
count queue consumers as required by the spec
-rw-r--r--src/rabbit_amqqueue_process.erl19
1 files changed, 9 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 47a9865e..b9fb2f85 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -648,8 +648,13 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() ->
- lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+consumer_count() -> consumer_count(fun (_) -> false end).
+
+active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
+
+consumer_count(Exclude) ->
+ lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
+ not Exclude(C)]).
is_unused(_State) -> consumer_count() == 0.
@@ -1008,15 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
- active_consumers = ActiveConsumers} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(ensure_expiry_timer(State)),
- %% TODO: According to the spec the returned consumer count should
- %% not include blocked consumers. Since we remove those from
- %% ActiveConsumers lazily - when come across them when attempting
- %% to deliver messages - the count we are returning here may
- %% contain some blocked consumers.
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
+ reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->