summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-09 13:02:46 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-09 13:02:46 +0000
commit5afcb260bcf40622f1c3ea7b395a23dc8de5220a (patch)
treeea9adca6bdc4ec14f00d2feb40a0438e494f504c
parente68984c2f411371d8d0512557639c30b48ffa8da (diff)
downloadrabbitmq-server-5afcb260bcf40622f1c3ea7b395a23dc8de5220a.tar.gz
fix credit handling
This cannot go via possibly_unblock since that deals with *channels* becoming unblocked only, whereas here we care about possibly unblocking a consumer on an otherwise unblocked channel.
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_queue_consumers.erl35
2 files changed, 32 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d916dccb..f1cbf274 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1211,13 +1211,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS1});
handle_cast({credit, ChPid, CTag, Credit, Drain},
- State = #q{backing_queue = BQ,
+ State = #q{consumers = Consumers,
+ backing_queue = BQ,
backing_queue_state = BQS}) ->
Len = BQ:len(BQS),
rabbit_channel:send_credit_reply(ChPid, Len),
- noreply(possibly_unblock(rabbit_queue_consumers:credit_fun(
- Len == 0, Credit, Drain, CTag),
- ChPid, State));
+ noreply(
+ case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag,
+ Consumers) of
+ unchanged -> State;
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, State1)
+ end);
handle_cast(notify_decorators, State) ->
notify_decorators(State),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f06423f7..ab235755 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -20,8 +20,8 @@
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
- utilisation/1]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
+ credit/6, utilisation/1]).
%%----------------------------------------------------------------------------
@@ -84,11 +84,11 @@
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
--spec resume_fun() -> cr_fun().
--spec notify_sent_fun(non_neg_integer()) -> cr_fun().
--spec activate_limit_fun() -> cr_fun().
--spec credit_fun(boolean(), non_neg_integer(), boolean(),
- rabbit_types:ctag()) -> cr_fun().
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
-endif.
@@ -306,13 +306,24 @@ activate_limit_fun() ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
-credit_fun(IsEmpty, Credit, Drain, CTag) ->
- fun (C = #cr{limiter = Limiter}) ->
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ unchanged;
+ #cr{limiter = Limiter} = C ->
C1 = C#cr{limiter = rabbit_limiter:credit(
Limiter, CTag, Credit, IsEmpty, Drain)},
- case Drain andalso IsEmpty of
- true -> send_drained(C1);
- false -> C1
+ C2 = #cr{limiter = Limiter1} =
+ case Drain andalso IsEmpty of
+ true -> send_drained(C1);
+ false -> C1
+ end,
+ case is_ch_blocked(C2) orelse
+ (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
+ true -> update_ch_record(C2),
+ unchanged;
+ false -> unblock(C2, State)
end
end.