diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-09 13:02:46 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-09 13:02:46 +0000 |
commit | 5afcb260bcf40622f1c3ea7b395a23dc8de5220a (patch) | |
tree | ea9adca6bdc4ec14f00d2feb40a0438e494f504c | |
parent | e68984c2f411371d8d0512557639c30b48ffa8da (diff) | |
download | rabbitmq-server-5afcb260bcf40622f1c3ea7b395a23dc8de5220a.tar.gz |
fix credit handling
This cannot go via possibly_unblock since that deals with *channels*
becoming unblocked only, whereas here we care about possibly
unblocking a consumer on an otherwise unblocked channel.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 35 |
2 files changed, 32 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d916dccb..f1cbf274 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1211,13 +1211,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS1}); handle_cast({credit, ChPid, CTag, Credit, Drain}, - State = #q{backing_queue = BQ, + State = #q{consumers = Consumers, + backing_queue = BQ, backing_queue_state = BQS}) -> Len = BQ:len(BQS), rabbit_channel:send_credit_reply(ChPid, Len), - noreply(possibly_unblock(rabbit_queue_consumers:credit_fun( - Len == 0, Credit, Drain, CTag), - ChPid, State)); + noreply( + case rabbit_queue_consumers:credit(Len == 0, Credit, Drain, ChPid, CTag, + Consumers) of + unchanged -> State; + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, State1) + end); handle_cast(notify_decorators, State) -> notify_decorators(State), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f06423f7..ab235755 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -20,8 +20,8 @@ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, - utilisation/1]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, + credit/6, utilisation/1]). %%---------------------------------------------------------------------------- @@ -84,11 +84,11 @@ -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. --spec resume_fun() -> cr_fun(). --spec notify_sent_fun(non_neg_integer()) -> cr_fun(). --spec activate_limit_fun() -> cr_fun(). --spec credit_fun(boolean(), non_neg_integer(), boolean(), - rabbit_types:ctag()) -> cr_fun(). +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), + state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). -endif. @@ -306,13 +306,24 @@ activate_limit_fun() -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. -credit_fun(IsEmpty, Credit, Drain, CTag) -> - fun (C = #cr{limiter = Limiter}) -> +credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> + case lookup_ch(ChPid) of + not_found -> + unchanged; + #cr{limiter = Limiter} = C -> C1 = C#cr{limiter = rabbit_limiter:credit( Limiter, CTag, Credit, IsEmpty, Drain)}, - case Drain andalso IsEmpty of - true -> send_drained(C1); - false -> C1 + C2 = #cr{limiter = Limiter1} = + case Drain andalso IsEmpty of + true -> send_drained(C1); + false -> C1 + end, + case is_ch_blocked(C2) orelse + (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse + rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of + true -> update_ch_record(C2), + unchanged; + false -> unblock(C2, State) end end. |