diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-21 11:38:53 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-21 11:38:53 +0000 |
commit | 82cb8cff4c2a73b62ca014d21769729b6e14e2a5 (patch) | |
tree | a7985a90e2dd6d433b8732cffb83193d4014d39f /src | |
parent | 7ab604b801537e6e3812bd5c5738b3bd6822c795 (diff) | |
download | rabbitmq-server-82cb8cff4c2a73b62ca014d21769729b6e14e2a5.tar.gz |
simplify queue's basic_consume handler
- the call to update_ch_record in the is_ch_blocked(C1) == false
branch was superfluos since the preceding update_consumer_count
calls update_ch_record
- all the checking whether the channel is blocked, and associated
branching was just an optimisation. And not a particularly important
one, since a) the "a new consumer comes along while its
channel is blocked" case is hardly on the critical path, and b)
exactly the same check is performed as part of run_message_queue (in
deliver_msg_to_consumer/3). So get rid of it.
- the is_empty & send_drained logic can be invoked earlier, which
allows us to use the #cr we have rather than looking it up again. We
can do this since the only case we need to catch here is that of a
consumer coming along while the queue is empty already. If it
becomes empty as part of run_message_queue then send_drained will be
invoked in 'fetch'.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 |
1 files changed, 7 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a43dbdcc..c02fd6b5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1142,6 +1142,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, Limiter, ConsumerTag, Credit, Drain) end, C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1150,22 +1154,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - E = {ChPid, Consumer}, - State2 = - case is_ch_blocked(C1) of - true -> block_consumer(C1, E), - State1; - false -> update_ch_record(C1), - AC1 = queue:in(E, State1#q.active_consumers), - run_message_queue(State1#q{active_consumers = AC1}) - end, - case is_empty(State2) of - true -> send_drained(lookup_ch(ChPid)); - false -> ok - end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State2)), - reply(ok, State2) + not NoAck, qname(State1)), + AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, |