From 5ab8473d3918b5cdb720889f8a4da08afc1d86e3 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Mon, 25 Jul 2011 15:01:16 +0100 Subject: start limiter before channel Also use an opaque type, a token, to invoke limiter functions. --- src/rabbit_amqqueue_process.erl | 44 +++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 21 deletions(-) (limited to 'src/rabbit_amqqueue_process.erl') diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fcd6cc24..e9949913 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, - limiter_pid, + limiter_token, monitor_ref, acktags, is_limit_active, @@ -346,10 +346,10 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, true end. -erase_ch_record(#cr{ch_pid = ChPid, - limiter_pid = LimiterPid, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(LimiterPid, self()), +erase_ch_record(#cr{ch_pid = ChPid, + limiter_token = LimiterToken, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(LimiterToken, self()), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. @@ -374,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> - C = #cr{limiter_pid = LimiterPid, + C = #cr{limiter_token = LimiterToken, unsent_message_count = Count, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send( LimiterToken, self(), AckRequired )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -904,7 +904,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, LimiterToken, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, @@ -915,10 +915,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record( + C#cr{consumer_count = ConsumerCount +1, + limiter_token = LimiterToken}), ok = case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterPid, self()); + 0 -> rabbit_limiter:register(LimiterToken, self()); _ -> ok end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -951,12 +952,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid} -> + limiter_token = LimiterToken} -> C1 = C#cr{consumer_count = ConsumerCount -1}, maybe_store_ch_record( case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), - C1#cr{limiter_pid = undefined}; + 1 -> ok = rabbit_limiter:unregister(LimiterToken, self()), + C1#cr{limiter_token = undefined}; _ -> C1 end), emit_consumer_deleted(ChPid, ConsumerTag), @@ -1065,20 +1066,21 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({limit, ChPid, LimiterPid}, State) -> +handle_cast({limit, ChPid, LimiterToken}, State) -> noreply( possibly_unblock( State, ChPid, fun (C = #cr{consumer_count = ConsumerCount, - limiter_pid = OldLimiterPid, + limiter_token = OldLimiterToken, is_limit_active = Limited}) -> - if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> + if ConsumerCount =/= 0 -> + ok = rabbit_limiter:register(LimiterToken, self()); + true -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, - C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + NewLimited = Limited, + C#cr{limiter_token = LimiterToken, + is_limit_active = NewLimited} end)); handle_cast({flush, ChPid}, State) -> -- cgit v1.2.1