diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 19:45:01 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-20 19:45:01 +0000 |
commit | cba68ce1a820cd6f57dc86482049714734d309cd (patch) | |
tree | edc25e8b29af9229cabe5a3a28f156e08332c12f | |
parent | d011068eabb9861e49e3c1323a9bf1a8465a2b30 (diff) | |
download | rabbitmq-server-cba68ce1a820cd6f57dc86482049714734d309cd.tar.gz |
get rid of maybe_send_drained_cons
and optimise handle_cast/credit along the way
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0530d650..c0e74129 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -408,12 +408,6 @@ maybe_send_drained(WasEmpty, State) -> false -> ok end. -maybe_send_drained_cons(C, State) -> - case is_empty(State) of - true -> send_drained(C); - false -> ok - end. - send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of {[], Limiter} -> ok; @@ -1166,7 +1160,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, AC1 = queue:in(E, State1#q.active_consumers), run_message_queue(State1#q{active_consumers = AC1}) end, - maybe_send_drained_cons(C1, State2), + case is_empty(State2) of + true -> send_drained(lookup_ch(ChPid)); + false -> ok + end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State2)), reply(ok, State2) @@ -1376,13 +1373,16 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - #cr{limiter = Lim} = ch_record(ChPid), - Lim2 = rabbit_limiter:credit(Lim, CTag, Credit, Drain), - rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), - State1 = possibly_unblock( - State, ChPid, fun(C) -> C#cr{limiter = Lim2} end), - maybe_send_drained_cons(lookup_ch(ChPid), State1), - noreply(State1); + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Lim} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> possibly_unblock(State, C1) + end); handle_cast(wake_up, State) -> noreply(State). |