diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-19 00:10:52 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-19 00:10:52 +0100 |
commit | e1ba0047eea4ec6e0582cdac90871a8cb3ced81f (patch) | |
tree | de48052f1bba728503c1d35de47beb2a2ad36b4b | |
parent | aaa37d459dfdc18e394dfef01e204072cb5ce716 (diff) | |
download | rabbitmq-server-e1ba0047eea4ec6e0582cdac90871a8cb3ced81f.tar.gz |
count queue consumers as required by the spec
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 47a9865e..b9fb2f85 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -648,8 +648,13 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -consumer_count() -> - lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). +consumer_count() -> consumer_count(fun (_) -> false end). + +active_consumer_count() -> consumer_count(fun is_ch_blocked/1). + +consumer_count(Exclude) -> + lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), + not Exclude(C)]). is_unused(_State) -> consumer_count() == 0. @@ -1008,15 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS, - active_consumers = ActiveConsumers} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(ensure_expiry_timer(State)), - %% TODO: According to the spec the returned consumer count should - %% not include blocked consumers. Since we remove those from - %% ActiveConsumers lazily - when come across them when attempting - %% to deliver messages - the count we are returning here may - %% contain some blocked consumers. - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); + reply({ok, BQ:len(BQS), active_consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> |