summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-21 12:43:50 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-21 12:43:50 +0000
commitd1562e9de47255303213793205f648c64aa542d1 (patch)
tree96e105735cf0b7dcdb0b54f8553273250bc27be1
parent43ec388e97d0effb26b6ef4aef8d1a676acd9c94 (diff)
downloadrabbitmq-server-d1562e9de47255303213793205f648c64aa542d1.tar.gz
simplify queue's basic_consume handler
backported from bug23749 branch - the call to update_ch_record in the is_ch_blocked(C1) == false branch was superfluos since the preceding update_consumer_count calls update_ch_record - all the checking whether the channel is blocked, and associated branching was just an optimisation. And not a particularly important one, since a) the "a new consumer comes along while its channel is blocked" case is hardly on the critical path, and b) exactly the same check is performed as part of run_message_queue (in deliver_msg_to_consumer/3). So get rid of it.
-rw-r--r--src/rabbit_amqqueue_process.erl16
1 files changed, 4 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ef48bb5d..fba58d38 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1097,7 +1097,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1106,18 +1106,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,