summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-09 14:38:48 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-09 14:38:48 +0000
commit065c6794c4f6279487d5db45ebbf610d57e70335 (patch)
treee52211064e8b58ba54e82533083fa38427b1b5de /src/rabbit_queue_consumers.erl
parentaf0265053ebae4ae006fd4cca2d66bf499dd26ef (diff)
downloadrabbitmq-server-065c6794c4f6279487d5db45ebbf610d57e70335.tar.gz
Upon unblocking, unblock, and run message queue.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl16
1 files changed, 10 insertions, 6 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 9dafa71a..3ae29d30 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -244,16 +244,20 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, CTag, AckTags) ->
+subtract_acks(ChPid, CTag, AckTags, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{acktags = ChAckTags, limiter = Lim} ->
- Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
+ {Lim2, Unblocked} =
+ rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
AckTags2 = subtract_acks0(AckTags, [], ChAckTags),
- update_ch_record(C#cr{acktags = AckTags2,
- limiter = Lim2}),
- ok
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
end.
subtract_acks0([], [], AckQ) ->