diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 12:01:00 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-17 12:01:00 +0100 |
commit | 18555077d5e14138c9568691367f0277b5ab0f79 (patch) | |
tree | 938fb8d4184e09714f1cd2acb8f0193604fa1bba | |
parent | b25c1f560c34ea3595b017499e7759c0ab7decb3 (diff) | |
download | rabbitmq-server-18555077d5e14138c9568691367f0277b5ab0f79.tar.gz |
extract correlation between consumer_count and limiter registration
...and fix a bug in the process: during consumer registration we were
using the *old* cr, with the old (and usually undefined) limiter, to
check whether the channel is blocked. Thus we would end up running
through the message queue unnecessarily. No big deal, but certainly
not what was intended.
Also, make the update_ch_record return the record, which makes the
function more composable.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 82ed2ec8..b66109e3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -354,7 +354,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount, case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of {0, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) - end. + end, + C. store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), @@ -368,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> + ok = rabbit_limiter:register(Limiter, self()), + update_ch_record(C#cr{consumer_count = 1}); +update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> + ok = rabbit_limiter:unregister(Limiter, self()), + update_ch_record(C#cr{consumer_count = 0, + limiter = rabbit_limiter:make_token()}); +update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> + update_ch_record(C#cr{consumer_count = Count + Delta}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> @@ -405,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, true -> sets:add_element(AckTag, ChAckTags); false -> ChAckTags end, - NewC = C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}, - update_ch_record(NewC), + NewC = update_ch_record( + C#cr{unsent_message_count = Count + 1, + acktags = ChAckTags1}), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -607,8 +618,7 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - NewC = Update(C), - update_ch_record(NewC), + NewC = update_ch_record(Update(C)), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -962,15 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + C = ch_record(ChPid), + C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - update_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), - ok = case ConsumerCount of - 0 -> rabbit_limiter:register(Limiter, self()); - _ -> ok - end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> ExistingHolder end, @@ -978,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), State2 = - case is_ch_blocked(C) of + case is_ch_blocked(C1) of true -> State1#q{ blocked_consumers = add_consumer(ChPid, Consumer, @@ -1000,15 +1005,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, - limiter = Limiter} -> - C1 = C#cr{consumer_count = ConsumerCount -1}, - update_ch_record( - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(Limiter, self()), - C1#cr{limiter = rabbit_limiter:make_token()}; - _ -> C1 - end), + C -> + update_consumer_count(C, -1), emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = |