summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-21 11:38:53 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-21 11:38:53 +0000
commit82cb8cff4c2a73b62ca014d21769729b6e14e2a5 (patch)
treea7985a90e2dd6d433b8732cffb83193d4014d39f /src
parent7ab604b801537e6e3812bd5c5738b3bd6822c795 (diff)
downloadrabbitmq-server-82cb8cff4c2a73b62ca014d21769729b6e14e2a5.tar.gz
simplify queue's basic_consume handler
- the call to update_ch_record in the is_ch_blocked(C1) == false branch was superfluos since the preceding update_consumer_count calls update_ch_record - all the checking whether the channel is blocked, and associated branching was just an optimisation. And not a particularly important one, since a) the "a new consumer comes along while its channel is blocked" case is hardly on the critical path, and b) exactly the same check is performed as part of run_message_queue (in deliver_msg_to_consumer/3). So get rid of it. - the is_empty & send_drained logic can be invoked earlier, which allows us to use the #cr we have rather than looking it up again. We can do this since the only case we need to catch here is that of a consumer coming along while the queue is empty already. If it becomes empty as part of run_message_queue then send_drained will be invoked in 'fetch'.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl22
1 files changed, 7 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a43dbdcc..c02fd6b5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1142,6 +1142,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, Credit, Drain)
end,
C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1150,22 +1154,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
- case is_empty(State2) of
- true -> send_drained(lookup_ch(ChPid));
- false -> ok
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,