summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 82ed2ec8..b66109e3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -354,7 +354,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount,
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
{0, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
- end.
+ end,
+ C.
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
@@ -368,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
@@ -405,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
end,
- NewC = C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1},
- update_ch_record(NewC),
+ NewC = update_ch_record(
+ C#cr{unsent_message_count = Count + 1,
+ acktags = ChAckTags1}),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -607,8 +618,7 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- NewC = Update(C),
- update_ch_record(NewC),
+ NewC = update_ch_record(Update(C)),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -962,15 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- update_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
@@ -978,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
State2 =
- case is_ch_blocked(C) of
+ case is_ch_blocked(C1) of
true -> State1#q{
blocked_consumers =
add_consumer(ChPid, Consumer,
@@ -1000,15 +1005,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- update_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C ->
+ update_consumer_count(C, -1),
emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =