summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:45:01 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 19:45:01 +0000
commitcba68ce1a820cd6f57dc86482049714734d309cd (patch)
treeedc25e8b29af9229cabe5a3a28f156e08332c12f
parentd011068eabb9861e49e3c1323a9bf1a8465a2b30 (diff)
downloadrabbitmq-server-cba68ce1a820cd6f57dc86482049714734d309cd.tar.gz
get rid of maybe_send_drained_cons
and optimise handle_cast/credit along the way
-rw-r--r--src/rabbit_amqqueue_process.erl28
1 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0530d650..c0e74129 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -408,12 +408,6 @@ maybe_send_drained(WasEmpty, State) ->
false -> ok
end.
-maybe_send_drained_cons(C, State) ->
- case is_empty(State) of
- true -> send_drained(C);
- false -> ok
- end.
-
send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
{[], Limiter} -> ok;
@@ -1166,7 +1160,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
AC1 = queue:in(E, State1#q.active_consumers),
run_message_queue(State1#q{active_consumers = AC1})
end,
- maybe_send_drained_cons(C1, State2),
+ case is_empty(State2) of
+ true -> send_drained(lookup_ch(ChPid));
+ false -> ok
+ end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State2)),
reply(ok, State2)
@@ -1376,13 +1373,16 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- #cr{limiter = Lim} = ch_record(ChPid),
- Lim2 = rabbit_limiter:credit(Lim, CTag, Credit, Drain),
- rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)),
- State1 = possibly_unblock(
- State, ChPid, fun(C) -> C#cr{limiter = Lim2} end),
- maybe_send_drained_cons(lookup_ch(ChPid), State1),
- noreply(State1);
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Lim} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> possibly_unblock(State, C1)
+ end);
handle_cast(wake_up, State) ->
noreply(State).