diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 13:13:53 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 13:13:53 +0100 |
commit | 0b7b63210106ce366c8f422d581087b9a75d835a (patch) | |
tree | 3e0e2a5bc4f5253dc0fcedb8c3c81300e1eaedec | |
parent | a900a7b383a93ad2bdd14b76101d70d73751ecca (diff) | |
download | rabbitmq-server-0b7b63210106ce366c8f422d581087b9a75d835a.tar.gz |
refactor
Most of the changes in channel are due to it being called LimiterPid.
So, we're going to call it Limiter now.
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 97 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 47 |
4 files changed, 90 insertions, 95 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 01721582..0821b121 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -119,14 +119,13 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:limiter_token()) -> +-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:limiter_token(), - rabbit_types:ctag(), boolean(), any()) + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9949913..aa2fb0f4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, - limiter_token, + limiter, monitor_ref, acktags, is_limit_active, @@ -346,10 +346,10 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, true end. -erase_ch_record(#cr{ch_pid = ChPid, - limiter_token = LimiterToken, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(LimiterToken, self()), +erase_ch_record(#cr{ch_pid = ChPid, + limiter = Limiter, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(Limiter, self()), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. @@ -374,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> - C = #cr{limiter_token = LimiterToken, + C = #cr{limiter = Limiter, unsent_message_count = Count, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterToken, self(), AckRequired )) of + rabbit_limiter:can_send( Limiter, self(), AckRequired )) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -904,7 +904,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, LimiterToken, +handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, @@ -917,9 +917,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterToken, ack_required = not NoAck}, true = maybe_store_ch_record( C#cr{consumer_count = ConsumerCount +1, - limiter_token = LimiterToken}), + limiter = Limiter}), ok = case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterToken, self()); + 0 -> rabbit_limiter:register(Limiter, self()); _ -> ok end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -952,12 +952,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); C = #cr{consumer_count = ConsumerCount, - limiter_token = LimiterToken} -> + limiter = Limiter} -> C1 = C#cr{consumer_count = ConsumerCount -1}, maybe_store_ch_record( case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterToken, self()), - C1#cr{limiter_token = undefined}; + 1 -> ok = rabbit_limiter:unregister(Limiter, self()), + C1#cr{limiter = undefined}; _ -> C1 end), emit_consumer_deleted(ChPid, ConsumerTag), @@ -1066,20 +1066,20 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({limit, ChPid, LimiterToken}, State) -> +handle_cast({limit, ChPid, Limiter}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter_token = OldLimiterToken, + fun (C = #cr{consumer_count = ConsumerCount, + limiter = OldLimiter, is_limit_active = Limited}) -> if ConsumerCount =/= 0 -> - ok = rabbit_limiter:register(LimiterToken, self()); + ok = rabbit_limiter:register(Limiter, self()); true -> ok end, NewLimited = Limited, - C#cr{limiter_token = LimiterToken, + C#cr{limiter = Limiter, is_limit_active = NewLimited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ae747332..629b1bb9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -30,7 +30,7 @@ prioritise_cast/2]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_token, tx_status, next_tag, + limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -71,8 +71,7 @@ -spec(start_link/10 :: (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), rabbit_limiter:limiter_token()) -> - rabbit_types:ok_pid_or_error()). + pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -99,10 +98,10 @@ %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, LimiterToken) -> + Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, - VHost, Capabilities, CollectorPid, LimiterToken], []). + VHost, Capabilities, CollectorPid, Limiter], []). do(Pid, Method) -> do(Pid, Method, none). @@ -162,7 +161,7 @@ ready_for_close(Pid) -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, LimiterToken]) -> + Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -172,7 +171,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, - limiter_token = LimiterToken, + limiter = Limiter, tx_status = none, next_tag = 1, unacked_message_q = queue:new(), @@ -704,7 +703,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid, - limiter_token = LimiterToken, + limiter = Limiter, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -723,7 +722,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterToken, + Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -798,25 +797,25 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, - State = #ch{limiter_token = LimiterToken}) -> - LimiterToken1 = - case {rabbit_limiter:is_enabled(LimiterToken), PrefetchCount} of - {false, 0} -> LimiterToken; + State = #ch{limiter = Limiter}) -> + Limiter1 = + case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of + {false, 0} -> Limiter; {false, _} -> enable_limiter(State); - {_, _} -> LimiterToken + {_, _} -> Limiter end, - LimiterToken3 = case rabbit_limiter:limit(LimiterToken1, PrefetchCount) of - ok -> - LimiterToken1; - {disabled, LimiterToken2} -> - ok = limit_queues(LimiterToken2, State), - LimiterToken2 - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter_token = LimiterToken3}}; + Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of + ok -> + Limiter1; + {disabled, Limiter2} -> + ok = limit_queues(Limiter2, State), + Limiter2 + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, - limiter_token = LimiterToken}) -> + limiter = Limiter}) -> OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -830,7 +829,7 @@ handle_method(#'basic.recover_async'{requeue = true}, QPid, lists:reverse(MsgIds), self()) end) end, ok, UAMQ), - ok = notify_limiter(LimiterToken, UAMQ), + ok = notify_limiter(Limiter, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1077,22 +1076,22 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> NoWait, #'confirm.select_ok'{}); handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter_token = LimiterToken}) -> - LimiterToken2 = case rabbit_limiter:unblock(LimiterToken) of - ok -> - LimiterToken; - {disabled, LimiterToken1} -> - ok = limit_queues(LimiterToken1, State), - LimiterToken1 - end, + State = #ch{limiter = Limiter}) -> + Limiter2 = case rabbit_limiter:unblock(Limiter) of + ok -> + Limiter; + {disabled, Limiter1} -> + ok = limit_queues(Limiter1, State), + Limiter1 + end, {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter_token = LimiterToken2}}; + State#ch{limiter = Limiter2}}; handle_method(#'channel.flow'{active = false}, _, State = #ch{consumer_mapping = Consumers}) -> - LimiterToken1 = enable_limiter(State), - State1 = State#ch{limiter_token = LimiterToken1}, - ok = rabbit_limiter:block(LimiterToken1), + Limiter1 = enable_limiter(State), + State1 = State#ch{limiter = Limiter1}, + ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || @@ -1222,7 +1221,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter_token, Acked), + ok = notify_limiter(State#ch.limiter, Acked), {noreply, State#ch{unacked_message_q = Remaining}}. ack_record(DeliveryTag, ConsumerTag, @@ -1259,7 +1258,7 @@ ack(Acked, State) -> [{QPid, length(MsgIds)} | L] end, [], Acked), maybe_incr_stats(QIncs, ack, State), - ok = notify_limiter(State#ch.limiter_token, Acked), + ok = notify_limiter(State#ch.limiter, Acked), State. new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), @@ -1284,10 +1283,10 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). enable_limiter(State = #ch{unacked_message_q = UAMQ, - limiter_token = LimiterToken}) -> - LimiterToken1 = rabbit_limiter:enable(LimiterToken, queue:len(UAMQ)), - ok = limit_queues(LimiterToken1, State), - LimiterToken1. + limiter = Limiter}) -> + Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), + ok = limit_queues(Limiter1, State), + Limiter1. limit_queues(Token, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Token). @@ -1301,15 +1300,15 @@ consumer_queues(Consumers) -> %% for messages delivered to subscribed consumers, but not acks for %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) -notify_limiter(LimiterToken, Acked) -> - case rabbit_limiter:is_enabled(LimiterToken) of +notify_limiter(Limiter, Acked) -> + case rabbit_limiter:is_enabled(Limiter) of false -> ok; true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 end, 0, Acked) of 0 -> ok; - Count -> rabbit_limiter:ack(LimiterToken, Count) + Count -> rabbit_limiter:ack(Limiter, Count) end end. @@ -1448,10 +1447,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> queue:len(TAQ); -i(prefetch_count, #ch{limiter_token = LimiterToken}) -> - rabbit_limiter:get_limit(LimiterToken); -i(client_flow_blocked, #ch{limiter_token = LimiterToken}) -> - rabbit_limiter:is_blocked(LimiterToken); +i(prefetch_count, #ch{limiter = Limiter}) -> + rabbit_limiter:get_limit(Limiter); +i(client_flow_blocked, #ch{limiter = Limiter}) -> + rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7c01ba96..315784ab 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -21,24 +21,24 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). -export([start_link/0, make_new_token/1, is_enabled/1, enable/2, disable/1]). --export_type([limiter_token/0]). +-export_type([token/0]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --record(limiter_token, {pid, enabled = false}). +-record(token, {pid, enabled = false}). -ifdef(use_specs). --type(maybe_token() :: limiter_token() | 'undefined'). --opaque(limiter_token() :: #'limiter_token'{}). +-type(maybe_token() :: token() | 'undefined'). +-opaque(token() :: #token{}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(make_new_token/1 :: (pid()) -> limiter_token()). --spec(is_enabled/1 :: (limiter_token()) -> boolean()). --spec(enable/2 :: (limiter_token(), non_neg_integer()) -> limiter_token()). --spec(disable/1 :: (limiter_token()) -> limiter_token()). +-spec(make_new_token/1 :: (pid()) -> token()). +-spec(is_enabled/1 :: (token()) -> boolean()). +-spec(enable/2 :: (token(), non_neg_integer()) -> token()). +-spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (maybe_token(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_token(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_token(), non_neg_integer()) -> 'ok'). @@ -66,19 +66,16 @@ %% API %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link(?MODULE, [], []). +start_link() -> gen_server2:start_link(?MODULE, [], []). -make_new_token(Pid) -> - #limiter_token{pid = Pid}. +make_new_token(Pid) -> #token{pid = Pid}. -is_enabled(#limiter_token{enabled = Enabled}) -> - Enabled. +is_enabled(#token{enabled = Enabled}) -> Enabled. -enable(#limiter_token{pid = Pid} = Token, Volume) -> +enable(#token{pid = Pid} = Token, Volume) -> gen_server2:call(Pid, {enable, Token, self(), Volume}). -disable(#limiter_token{pid = Pid} = Token) -> +disable(#token{pid = Pid} = Token) -> gen_server2:call(Pid, {disable, Token}). limit(LimiterToken, PrefetchCount) -> @@ -88,7 +85,7 @@ limit(LimiterToken, PrefetchCount) -> %% breaching a limit can_send(undefined, _QPid, _AckRequired) -> true; -can_send(#limiter_token{enabled = false}, _QPid, _AckRequired) -> +can_send(#token{enabled = false}, _QPid, _AckRequired) -> true; can_send(LimiterToken, QPid, AckRequired) -> rabbit_misc:with_exit_handler( @@ -150,7 +147,7 @@ handle_call({limit, PrefetchCount, Token}, _From, State) -> {cont, State1} -> {reply, ok, State1}; {stop, State1} -> - {reply, {disabled, Token#limiter_token{enabled = false}}, State1} + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(block, _From, State) -> @@ -161,17 +158,17 @@ handle_call({unblock, Token}, _From, State) -> {cont, State1} -> {reply, ok, State1}; {stop, State1} -> - {reply, {disabled, Token#limiter_token{enabled = false}}, State1} + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}; handle_call({enable, Token, Channel, Volume}, _From, State) -> - {reply, Token#limiter_token{enabled = true}, + {reply, Token#token{enabled = true}, State#lim{ch_pid = Channel, volume = Volume}}; handle_call({disable, Token}, _From, State) -> - {reply, Token#limiter_token{enabled = false}, State}. + {reply, Token#token{enabled = false}, State}. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -212,16 +209,16 @@ maybe_notify(OldState, NewState) -> maybe_call(undefined, _Call, Default) -> Default; -maybe_call(#limiter_token{enabled = false}, _Call, Default) -> +maybe_call(#token{enabled = false}, _Call, Default) -> Default; -maybe_call(#limiter_token{pid = Pid}, Call, _Default) -> +maybe_call(#token{pid = Pid}, Call, _Default) -> gen_server2:call(Pid, Call, infinity). maybe_cast(undefined, _Call) -> ok; -maybe_cast(#limiter_token{enabled = false}, _Cast) -> +maybe_cast(#token{enabled = false}, _Cast) -> ok; -maybe_cast(#limiter_token{pid = Pid}, Cast) -> +maybe_cast(#token{pid = Pid}, Cast) -> gen_server2:cast(Pid, Cast). limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> |