diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 82ed2ec8..b66109e3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -354,7 +354,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount, case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of {0, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) - end. + end, + C. store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), @@ -368,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> + ok = rabbit_limiter:register(Limiter, self()), + update_ch_record(C#cr{consumer_count = 1}); +update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> + ok = rabbit_limiter:unregister(Limiter, self()), + update_ch_record(C#cr{consumer_count = 0, + limiter = rabbit_limiter:make_token()}); +update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> + update_ch_record(C#cr{consumer_count = Count + Delta}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> @@ -405,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, true -> sets:add_element(AckTag, ChAckTags); false -> ChAckTags end, - NewC = C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}, - update_ch_record(NewC), + NewC = update_ch_record( + C#cr{unsent_message_count = Count + 1, + acktags = ChAckTags1}), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -607,8 +618,7 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - NewC = Update(C), - update_ch_record(NewC), + NewC = update_ch_record(Update(C)), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -962,15 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + C = ch_record(ChPid), + C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - update_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}), - ok = case ConsumerCount of - 0 -> rabbit_limiter:register(Limiter, self()); - _ -> ok - end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> ExistingHolder end, @@ -978,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), State2 = - case is_ch_blocked(C) of + case is_ch_blocked(C1) of true -> State1#q{ blocked_consumers = add_consumer(ChPid, Consumer, @@ -1000,15 +1005,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, - limiter = Limiter} -> - C1 = C#cr{consumer_count = ConsumerCount -1}, - update_ch_record( - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(Limiter, self()), - C1#cr{limiter = rabbit_limiter:make_token()}; - _ -> C1 - end), + C -> + update_consumer_count(C, -1), emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = |