summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-26 13:13:53 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-07-26 13:13:53 +0100
commit0b7b63210106ce366c8f422d581087b9a75d835a (patch)
tree3e0e2a5bc4f5253dc0fcedb8c3c81300e1eaedec
parenta900a7b383a93ad2bdd14b76101d70d73751ecca (diff)
downloadrabbitmq-server-0b7b63210106ce366c8f422d581087b9a75d835a.tar.gz
refactor
Most of the changes in channel are due to it being called LimiterPid. So, we're going to call it Limiter now.
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_channel.erl97
-rw-r--r--src/rabbit_limiter.erl47
4 files changed, 90 insertions, 95 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 01721582..0821b121 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -119,14 +119,13 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:limiter_token()) ->
+-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
(rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:limiter_token(),
- rabbit_types:ctag(), boolean(), any())
+ rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9949913..aa2fb0f4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -58,7 +58,7 @@
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
- limiter_token,
+ limiter,
monitor_ref,
acktags,
is_limit_active,
@@ -346,10 +346,10 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
true
end.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter_token = LimiterToken,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(LimiterToken, self()),
+erase_ch_record(#cr{ch_pid = ChPid,
+ limiter = Limiter,
+ monitor_ref = MonitorRef}) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
@@ -374,12 +374,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
ActiveConsumersTail} ->
- C = #cr{limiter_token = LimiterToken,
+ C = #cr{limiter = Limiter,
unsent_message_count = Count,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send( LimiterToken, self(), AckRequired )) of
+ rabbit_limiter:can_send( Limiter, self(), AckRequired )) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
@@ -904,7 +904,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, Remaining, Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, LimiterToken,
+handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{exclusive_consumer = ExistingHolder}) ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume,
@@ -917,9 +917,9 @@ handle_call({basic_consume, NoAck, ChPid, LimiterToken,
ack_required = not NoAck},
true = maybe_store_ch_record(
C#cr{consumer_count = ConsumerCount +1,
- limiter_token = LimiterToken}),
+ limiter = Limiter}),
ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(LimiterToken, self());
+ 0 -> rabbit_limiter:register(Limiter, self());
_ -> ok
end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -952,12 +952,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
C = #cr{consumer_count = ConsumerCount,
- limiter_token = LimiterToken} ->
+ limiter = Limiter} ->
C1 = C#cr{consumer_count = ConsumerCount -1},
maybe_store_ch_record(
case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(LimiterToken, self()),
- C1#cr{limiter_token = undefined};
+ 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
+ C1#cr{limiter = undefined};
_ -> C1
end),
emit_consumer_deleted(ChPid, ConsumerTag),
@@ -1066,20 +1066,20 @@ handle_cast({notify_sent, ChPid}, State) ->
C#cr{unsent_message_count = Count - 1}
end));
-handle_cast({limit, ChPid, LimiterToken}, State) ->
+handle_cast({limit, ChPid, Limiter}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter_token = OldLimiterToken,
+ fun (C = #cr{consumer_count = ConsumerCount,
+ limiter = OldLimiter,
is_limit_active = Limited}) ->
if ConsumerCount =/= 0 ->
- ok = rabbit_limiter:register(LimiterToken, self());
+ ok = rabbit_limiter:register(Limiter, self());
true ->
ok
end,
NewLimited = Limited,
- C#cr{limiter_token = LimiterToken,
+ C#cr{limiter = Limiter,
is_limit_active = NewLimited}
end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ae747332..629b1bb9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -30,7 +30,7 @@
prioritise_cast/2]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_token, tx_status, next_tag,
+ limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
@@ -71,8 +71,7 @@
-spec(start_link/10 ::
(channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), rabbit_limiter:limiter_token()) ->
- rabbit_types:ok_pid_or_error()).
+ pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -99,10 +98,10 @@
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterToken) ->
+ Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
- VHost, Capabilities, CollectorPid, LimiterToken], []).
+ VHost, Capabilities, CollectorPid, Limiter], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -162,7 +161,7 @@ ready_for_close(Pid) ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterToken]) ->
+ Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
@@ -172,7 +171,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
reader_pid = ReaderPid,
writer_pid = WriterPid,
conn_pid = ConnPid,
- limiter_token = LimiterToken,
+ limiter = Limiter,
tx_status = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -704,7 +703,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid,
- limiter_token = LimiterToken,
+ limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -723,7 +722,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterToken,
+ Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -798,25 +797,25 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
- State = #ch{limiter_token = LimiterToken}) ->
- LimiterToken1 =
- case {rabbit_limiter:is_enabled(LimiterToken), PrefetchCount} of
- {false, 0} -> LimiterToken;
+ State = #ch{limiter = Limiter}) ->
+ Limiter1 =
+ case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
+ {false, 0} -> Limiter;
{false, _} -> enable_limiter(State);
- {_, _} -> LimiterToken
+ {_, _} -> Limiter
end,
- LimiterToken3 = case rabbit_limiter:limit(LimiterToken1, PrefetchCount) of
- ok ->
- LimiterToken1;
- {disabled, LimiterToken2} ->
- ok = limit_queues(LimiterToken2, State),
- LimiterToken2
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter_token = LimiterToken3}};
+ Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
+ ok ->
+ Limiter1;
+ {disabled, Limiter2} ->
+ ok = limit_queues(Limiter2, State),
+ Limiter2
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
- limiter_token = LimiterToken}) ->
+ limiter = Limiter}) ->
OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -830,7 +829,7 @@ handle_method(#'basic.recover_async'{requeue = true},
QPid, lists:reverse(MsgIds), self())
end)
end, ok, UAMQ),
- ok = notify_limiter(LimiterToken, UAMQ),
+ ok = notify_limiter(Limiter, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1077,22 +1076,22 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
NoWait, #'confirm.select_ok'{});
handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter_token = LimiterToken}) ->
- LimiterToken2 = case rabbit_limiter:unblock(LimiterToken) of
- ok ->
- LimiterToken;
- {disabled, LimiterToken1} ->
- ok = limit_queues(LimiterToken1, State),
- LimiterToken1
- end,
+ State = #ch{limiter = Limiter}) ->
+ Limiter2 = case rabbit_limiter:unblock(Limiter) of
+ ok ->
+ Limiter;
+ {disabled, Limiter1} ->
+ ok = limit_queues(Limiter1, State),
+ Limiter1
+ end,
{reply, #'channel.flow_ok'{active = true},
- State#ch{limiter_token = LimiterToken2}};
+ State#ch{limiter = Limiter2}};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{consumer_mapping = Consumers}) ->
- LimiterToken1 = enable_limiter(State),
- State1 = State#ch{limiter_token = LimiterToken1},
- ok = rabbit_limiter:block(LimiterToken1),
+ Limiter1 = enable_limiter(State),
+ State1 = State#ch{limiter = Limiter1},
+ ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
@@ -1222,7 +1221,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_token, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked),
{noreply, State#ch{unacked_message_q = Remaining}}.
ack_record(DeliveryTag, ConsumerTag,
@@ -1259,7 +1258,7 @@ ack(Acked, State) ->
[{QPid, length(MsgIds)} | L]
end, [], Acked),
maybe_incr_stats(QIncs, ack, State),
- ok = notify_limiter(State#ch.limiter_token, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked),
State.
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
@@ -1284,10 +1283,10 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
- limiter_token = LimiterToken}) ->
- LimiterToken1 = rabbit_limiter:enable(LimiterToken, queue:len(UAMQ)),
- ok = limit_queues(LimiterToken1, State),
- LimiterToken1.
+ limiter = Limiter}) ->
+ Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
+ ok = limit_queues(Limiter1, State),
+ Limiter1.
limit_queues(Token, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Token).
@@ -1301,15 +1300,15 @@ consumer_queues(Consumers) ->
%% for messages delivered to subscribed consumers, but not acks for
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
-notify_limiter(LimiterToken, Acked) ->
- case rabbit_limiter:is_enabled(LimiterToken) of
+notify_limiter(Limiter, Acked) ->
+ case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
true ->
case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
end, 0, Acked) of
0 -> ok;
- Count -> rabbit_limiter:ack(LimiterToken, Count)
+ Count -> rabbit_limiter:ack(Limiter, Count)
end
end.
@@ -1448,10 +1447,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
queue:len(TAQ);
-i(prefetch_count, #ch{limiter_token = LimiterToken}) ->
- rabbit_limiter:get_limit(LimiterToken);
-i(client_flow_blocked, #ch{limiter_token = LimiterToken}) ->
- rabbit_limiter:is_blocked(LimiterToken);
+i(prefetch_count, #ch{limiter = Limiter}) ->
+ rabbit_limiter:get_limit(Limiter);
+i(client_flow_blocked, #ch{limiter = Limiter}) ->
+ rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 7c01ba96..315784ab 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -21,24 +21,24 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
-export([start_link/0, make_new_token/1, is_enabled/1, enable/2, disable/1]).
--export_type([limiter_token/0]).
+-export_type([token/0]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
--record(limiter_token, {pid, enabled = false}).
+-record(token, {pid, enabled = false}).
-ifdef(use_specs).
--type(maybe_token() :: limiter_token() | 'undefined').
--opaque(limiter_token() :: #'limiter_token'{}).
+-type(maybe_token() :: token() | 'undefined').
+-opaque(token() :: #token{}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(make_new_token/1 :: (pid()) -> limiter_token()).
--spec(is_enabled/1 :: (limiter_token()) -> boolean()).
--spec(enable/2 :: (limiter_token(), non_neg_integer()) -> limiter_token()).
--spec(disable/1 :: (limiter_token()) -> limiter_token()).
+-spec(make_new_token/1 :: (pid()) -> token()).
+-spec(is_enabled/1 :: (token()) -> boolean()).
+-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
+-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (maybe_token(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_token(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_token(), non_neg_integer()) -> 'ok').
@@ -66,19 +66,16 @@
%% API
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link(?MODULE, [], []).
+start_link() -> gen_server2:start_link(?MODULE, [], []).
-make_new_token(Pid) ->
- #limiter_token{pid = Pid}.
+make_new_token(Pid) -> #token{pid = Pid}.
-is_enabled(#limiter_token{enabled = Enabled}) ->
- Enabled.
+is_enabled(#token{enabled = Enabled}) -> Enabled.
-enable(#limiter_token{pid = Pid} = Token, Volume) ->
+enable(#token{pid = Pid} = Token, Volume) ->
gen_server2:call(Pid, {enable, Token, self(), Volume}).
-disable(#limiter_token{pid = Pid} = Token) ->
+disable(#token{pid = Pid} = Token) ->
gen_server2:call(Pid, {disable, Token}).
limit(LimiterToken, PrefetchCount) ->
@@ -88,7 +85,7 @@ limit(LimiterToken, PrefetchCount) ->
%% breaching a limit
can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(#limiter_token{enabled = false}, _QPid, _AckRequired) ->
+can_send(#token{enabled = false}, _QPid, _AckRequired) ->
true;
can_send(LimiterToken, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
@@ -150,7 +147,7 @@ handle_call({limit, PrefetchCount, Token}, _From, State) ->
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
- {reply, {disabled, Token#limiter_token{enabled = false}}, State1}
+ {reply, {disabled, Token#token{enabled = false}}, State1}
end;
handle_call(block, _From, State) ->
@@ -161,17 +158,17 @@ handle_call({unblock, Token}, _From, State) ->
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
- {reply, {disabled, Token#limiter_token{enabled = false}}, State1}
+ {reply, {disabled, Token#token{enabled = false}}, State1}
end;
handle_call(is_blocked, _From, State) ->
{reply, blocked(State), State};
handle_call({enable, Token, Channel, Volume}, _From, State) ->
- {reply, Token#limiter_token{enabled = true},
+ {reply, Token#token{enabled = true},
State#lim{ch_pid = Channel, volume = Volume}};
handle_call({disable, Token}, _From, State) ->
- {reply, Token#limiter_token{enabled = false}, State}.
+ {reply, Token#token{enabled = false}, State}.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
@@ -212,16 +209,16 @@ maybe_notify(OldState, NewState) ->
maybe_call(undefined, _Call, Default) ->
Default;
-maybe_call(#limiter_token{enabled = false}, _Call, Default) ->
+maybe_call(#token{enabled = false}, _Call, Default) ->
Default;
-maybe_call(#limiter_token{pid = Pid}, Call, _Default) ->
+maybe_call(#token{pid = Pid}, Call, _Default) ->
gen_server2:call(Pid, Call, infinity).
maybe_cast(undefined, _Call) ->
ok;
-maybe_cast(#limiter_token{enabled = false}, _Cast) ->
+maybe_cast(#token{enabled = false}, _Cast) ->
ok;
-maybe_cast(#limiter_token{pid = Pid}, Cast) ->
+maybe_cast(#token{pid = Pid}, Cast) ->
gen_server2:cast(Pid, Cast).
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->