diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 14:38:48 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 14:38:48 +0000 |
commit | 065c6794c4f6279487d5db45ebbf610d57e70335 (patch) | |
tree | e52211064e8b58ba54e82533083fa38427b1b5de /src/rabbit_queue_consumers.erl | |
parent | af0265053ebae4ae006fd4cca2d66bf499dd26ef (diff) | |
download | rabbitmq-server-065c6794c4f6279487d5db45ebbf610d57e70335.tar.gz |
Upon unblocking, unblock, and run message queue.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 9dafa71a..3ae29d30 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/3, + send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -244,16 +244,20 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, CTag, AckTags) -> +subtract_acks(ChPid, CTag, AckTags, State) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> - Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), + {Lim2, Unblocked} = + rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), AckTags2 = subtract_acks0(AckTags, [], ChAckTags), - update_ch_record(C#cr{acktags = AckTags2, - limiter = Lim2}), - ok + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. subtract_acks0([], [], AckQ) -> |