diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-25 11:57:00 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-25 11:57:00 +0000 |
commit | 4cdc47a48e9a757e216d1275f66bc7dde80162e7 (patch) | |
tree | e7ed7e0741c44c232a2999f72c8cd7246e6aa8a8 /src/rabbit_limiter.erl | |
parent | 8e940c2e95208924a55a6f437c792b7b1e2b1614 (diff) | |
parent | 423aca76adbca197fe4bbcb8ee720122bc75ece4 (diff) | |
download | rabbitmq-server-4cdc47a48e9a757e216d1275f66bc7dde80162e7.tar.gz |
merge default into bug23749
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 107 |
1 files changed, 101 insertions, 6 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a7d14fe..b97d1073 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,19 +15,25 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). + -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4, + ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([inform/4, forget_consumer/2, copy_queue_state/2]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(token, {pid, enabled, q_state}). -ifdef(use_specs). @@ -42,7 +48,8 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -50,6 +57,10 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). +-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> + {[rabbit_types:ctag()], token()}). +-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). +-spec(copy_queue_state/2 :: (token(), token()) -> token()). -endif. @@ -64,6 +75,8 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {count = 0, credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- @@ -71,7 +84,8 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +make_token(Pid) -> #token{pid = Pid, enabled = false, + q_state = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -88,15 +102,25 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) end); -can_send(_, _, _) -> +can_ch_send(_, _, _) -> true. +can_cons_send(#token{q_state = Credits}, CTag) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = C}} when C > 0 -> true; + {ok, #credit{}} -> false; + error -> true + end. + +record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> + Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}. + %% Let the limiter know that the channel has received some acks from a %% consumer ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). @@ -119,6 +143,77 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +inform(Limiter = #token{q_state = Credits}, + ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) -> + {Unblock, Credits2} = update_credit( + CTag, Len, ChPid, Credit, Count, Drain, Credits), + case Reply of + true -> rabbit_channel:send_credit_reply(ChPid, Len); + false -> ok + end, + {Unblock, Limiter#token{q_state = Credits2}}. + +forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> + Limiter#token{q_state = dict:erase(CTag, Credits)}. + +copy_queue_state(#token{q_state = Credits}, Token) -> + Token#token{q_state = Credits}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations in the +%% queue (to do them elsewhere introduces a ton of races). However, it's a big +%% chunk of code that is conceptually very linked to the limiter concept. So +%% we get the queue to hold a bit of state for us (#token.q_state), and +%% maintain a fiction that the limiter is making the decisions... + +record_send_q(CTag, Len, ChPid, Credits) -> + case dict:find(CTag, Credits) of + {ok, Cred} -> + decr_credit(CTag, Len, ChPid, Cred, Credits); + error -> + Credits + end. + +decr_credit(CTag, Len, ChPid, Cred, Credits) -> + #credit{credit = Credit, count = Count, drain = Drain} = Cred, + {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid, + Credit - 1, serial_add(Count, 1)), + write_credit(CTag, NewCredit, NewCount, Drain, Credits). + +maybe_drain(0, true, CTag, ChPid, Credit, Count) -> + %% Drain, so advance til credit = 0 + NewCount = serial_add(Count, Credit - 2), + send_drained(ChPid, CTag, NewCount), + {0, NewCount}; %% Magic reduction to 0 + +maybe_drain(_, _, _, _, Credit, Count) -> + {Credit, Count}. + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_drained(ChPid, CTag, Count). + +update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> + Count = case dict:find(CTag, Credits) of + %% Use our count if we can, more accurate + {ok, #credit{ count = LocalCount }} -> LocalCount; + %% But if this is new, take it from the adapter + _ -> Count0 + end, + {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count), + NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits), + case NewCredit > 0 of + true -> {[CTag], NewCredits}; + false -> {[], NewCredits} + end. + +write_credit(CTag, Credit, Count, Drain, Credits) -> + dict:store(CTag, #credit{credit = Credit, + count = Count, + drain = Drain}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- |