summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-17 12:01:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-17 12:01:00 +0100
commit18555077d5e14138c9568691367f0277b5ab0f79 (patch)
tree938fb8d4184e09714f1cd2acb8f0193604fa1bba
parentb25c1f560c34ea3595b017499e7759c0ab7decb3 (diff)
downloadrabbitmq-server-18555077d5e14138c9568691367f0277b5ab0f79.tar.gz
extract correlation between consumer_count and limiter registration
...and fix a bug in the process: during consumer registration we were using the *old* cr, with the old (and usually undefined) limiter, to check whether the channel is blocked. Thus we would end up running through the message queue unnecessarily. No big deal, but certainly not what was intended. Also, make the update_ch_record return the record, which makes the function more composable.
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 82ed2ec8..b66109e3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -354,7 +354,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount,
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
{0, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
- end.
+ end,
+ C.
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
@@ -368,6 +369,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
@@ -405,9 +416,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
true -> sets:add_element(AckTag, ChAckTags);
false -> ChAckTags
end,
- NewC = C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1},
- update_ch_record(NewC),
+ NewC = update_ch_record(
+ C#cr{unsent_message_count = Count + 1,
+ acktags = ChAckTags1}),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -607,8 +618,7 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- NewC = Update(C),
- update_ch_record(NewC),
+ NewC = update_ch_record(Update(C)),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -962,15 +972,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- update_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
@@ -978,7 +983,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
State2 =
- case is_ch_blocked(C) of
+ case is_ch_blocked(C1) of
true -> State1#q{
blocked_consumers =
add_consumer(ChPid, Consumer,
@@ -1000,15 +1005,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- update_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C ->
+ update_consumer_count(C, -1),
emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =