diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 13:21:18 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 13:21:18 +0100 |
commit | 08aff2a628cb92a891e7e600d5420e207595be8e (patch) | |
tree | b476a3655723cf4d5e321160eb409ed296e221d8 | |
parent | 0b7b63210106ce366c8f422d581087b9a75d835a (diff) | |
download | rabbitmq-server-08aff2a628cb92a891e7e600d5420e207595be8e.tar.gz |
finish refactoring
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 8 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 31 |
3 files changed, 21 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0821b121..f44f5fec 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -440,10 +440,10 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). -limit_all(QPids, ChPid, LimiterToken) -> +limit_all(QPids, ChPid, Limiter) -> delegate:invoke_no_result( QPids, fun (QPid) -> - gen_server2:cast(QPid, {limit, ChPid, LimiterToken}) + gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9868cfe0..27e0b743 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -54,27 +54,27 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax, Protocol, ReaderPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), - LimiterToken = start_limiter(SupPid), + Limiter = start_limiter(SupPid), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, - User, VHost, Capabilities, Collector, LimiterToken]}, + User, VHost, Capabilities, Collector, Limiter]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), - LimiterToken = start_limiter(SupPid), + Limiter = start_limiter(SupPid), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector, - LimiterToken]}, + Limiter]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 315784ab..bece8090 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -78,8 +78,8 @@ enable(#token{pid = Pid} = Token, Volume) -> disable(#token{pid = Pid} = Token) -> gen_server2:call(Pid, {disable, Token}). -limit(LimiterToken, PrefetchCount) -> - maybe_call(LimiterToken, {limit, PrefetchCount, LimiterToken}, ok). +limit(Limiter, PrefetchCount) -> + maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -87,35 +87,34 @@ can_send(undefined, _QPid, _AckRequired) -> true; can_send(#token{enabled = false}, _QPid, _AckRequired) -> true; -can_send(LimiterToken, QPid, AckRequired) -> +can_send(Limiter, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> - maybe_call(LimiterToken, {can_send, QPid, AckRequired}, ok) + maybe_call(Limiter, {can_send, QPid, AckRequired}, ok) end). %% Let the limiter know that the channel has received some acks from a %% consumer -ack(LimiterToken, Count) -> maybe_cast(LimiterToken, {ack, Count}). +ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). -register(LimiterToken, QPid) -> maybe_cast(LimiterToken, {register, QPid}). +register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). -unregister(LimiterToken, QPid) -> maybe_cast(LimiterToken, - {unregister, QPid}). +unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). -get_limit(LimiterToken) -> +get_limit(Limiter) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> maybe_call(LimiterToken, get_limit, ok) end). + fun () -> maybe_call(Limiter, get_limit, ok) end). -block(LimiterToken) -> - maybe_call(LimiterToken, block, ok). +block(Limiter) -> + maybe_call(Limiter, block, ok). -unblock(LimiterToken) -> - maybe_call(LimiterToken, {unblock, LimiterToken}, ok). +unblock(Limiter) -> + maybe_call(Limiter, {unblock, Limiter}, ok). -is_blocked(LimiterToken) -> - maybe_call(LimiterToken, is_blocked, false). +is_blocked(Limiter) -> + maybe_call(Limiter, is_blocked, false). %%---------------------------------------------------------------------------- %% gen_server callbacks |