summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-25 15:01:16 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-25 15:01:16 +0100
commit5ab8473d3918b5cdb720889f8a4da08afc1d86e3 (patch)
treef8a521d2bb4aeec17688b49a211e2d4a7aa555b4 /src/rabbit_amqqueue_process.erl
parentf37aed126d810e623b41f6a0eade002eec4c3b16 (diff)
downloadrabbitmq-server-5ab8473d3918b5cdb720889f8a4da08afc1d86e3.tar.gz
start limiter before channel
Also use an opaque type, a token, to invoke limiter functions.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 23 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fcd6cc24..e9949913 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -58,7 +58,7 @@
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
- limiter_pid,
+ limiter_token,
monitor_ref,
acktags,
is_limit_active,
@@ -346,10 +346,10 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
true
end.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter_pid = LimiterPid,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(LimiterPid, self()),
+erase_ch_record(#cr{ch_pid = ChPid,
+ limiter_token = LimiterToken,
+ monitor_ref = MonitorRef}) ->
+ ok = rabbit_limiter:unregister(LimiterToken, self()),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
@@ -374,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
ActiveConsumersTail} ->
- C = #cr{limiter_pid = LimiterPid,
+ C = #cr{limiter_token = LimiterToken,
unsent_message_count = Count,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
+ rabbit_limiter:can_send( LimiterToken, self(), AckRequired )) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
@@ -904,7 +904,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterToken,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{exclusive_consumer = ExistingHolder}) ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume,
@@ -915,10 +915,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ true = maybe_store_ch_record(
+ C#cr{consumer_count = ConsumerCount +1,
+ limiter_token = LimiterToken}),
ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(LimiterPid, self());
+ 0 -> rabbit_limiter:register(LimiterToken, self());
_ -> ok
end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -951,12 +952,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
C = #cr{consumer_count = ConsumerCount,
- limiter_pid = LimiterPid} ->
+ limiter_token = LimiterToken} ->
C1 = C#cr{consumer_count = ConsumerCount -1},
maybe_store_ch_record(
case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()),
- C1#cr{limiter_pid = undefined};
+ 1 -> ok = rabbit_limiter:unregister(LimiterToken, self()),
+ C1#cr{limiter_token = undefined};
_ -> C1
end),
emit_consumer_deleted(ChPid, ConsumerTag),
@@ -1065,20 +1066,21 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({limit, ChPid, LimiterPid}, State) ->
+handle_cast({limit, ChPid, LimiterToken}, State) ->
noreply(
possibly_unblock(
State, ChPid,
fun (C = #cr{consumer_count = ConsumerCount,
- limiter_pid = OldLimiterPid,
+ limiter_token = OldLimiterToken,
is_limit_active = Limited}) ->
- if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
- ok = rabbit_limiter:register(LimiterPid, self());
- true ->
+ if ConsumerCount =/= 0 ->
+ ok = rabbit_limiter:register(LimiterToken, self());
+ true ->
ok
end,
- NewLimited = Limited andalso LimiterPid =/= undefined,
- C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ NewLimited = Limited,
+ C#cr{limiter_token = LimiterToken,
+ is_limit_active = NewLimited}
end));
handle_cast({flush, ChPid}, State) ->