diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-10 18:01:16 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-10 18:01:16 +0000 |
commit | d65b067aa9759edd4faab55aad4aaca53690b075 (patch) | |
tree | c4a0dfc8fa3af81e6f080fc0755643fa3faf5192 /src/rabbit_limiter.erl | |
parent | a09fb4cb5e720029bd1be47bae67a6103d885929 (diff) | |
download | rabbitmq-server-d65b067aa9759edd4faab55aad4aaca53690b075.tar.gz |
Second attempt at moving credit into the queue. This time we pretend the limiter is still doing the work.
While testing this I note that the credit calculation is crazy when testing with Proton. But it was just as bad before, so let's commit.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 227 |
1 files changed, 114 insertions, 113 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5f1bc07c..6e3a228b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,15 +24,15 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/5, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/6, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([set_credit/5]). +-export([inform/4]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(token, {pid, enabled, q_state}). -ifdef(use_specs). @@ -47,8 +47,9 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/5 :: (token(), pid(), boolean(), - rabbit_types:ctag(), non_neg_integer()) -> boolean()). +%% TODO +%% -spec(can_send/5 :: (token(), pid(), boolean(), +%% rabbit_types:ctag(), non_neg_integer()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -56,10 +57,10 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). --spec(set_credit/5 :: (token(), rabbit_types:ctag(), - non_neg_integer(), - non_neg_integer(), boolean()) -> 'ok'). - +%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(), +%% non_neg_integer(), +%% non_neg_integer(), boolean()) -> 'ok'). +-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). -endif. %%---------------------------------------------------------------------------- @@ -67,7 +68,6 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, - credits = dict:new(), queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). @@ -80,7 +80,8 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +make_token(Pid) -> #token{pid = Pid, enabled = false, + q_state = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -97,19 +98,22 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) -> +can_send(#token{pid = Pid, enabled = true, q_state = QState} = Token, + ChPid, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( - fun () -> true end, + fun () -> {true, Token} end, fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len}, - infinity) + CanLim = gen_server2:call(Pid, {can_send, QPid, AckRequired}, + infinity), + {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), + {CanLim andalso CanQ, Token#token{q_state = NewQState}} end); -can_send(_, _, _, _, _) -> - true. +can_send(Token, _, _, _, _, _) -> + {true, Token}. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}). +ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). @@ -126,12 +130,82 @@ block(Limiter) -> unblock(Limiter) -> maybe_call(Limiter, {unblock, Limiter}, ok). -set_credit(Limiter, CTag, Credit, Count, Drain) -> - maybe_call(Limiter, {set_credit, CTag, Credit, Count, Drain, Limiter}, ok). - is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +inform(Limiter = #token{q_state = Credits}, + ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> + Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + Limiter#token{q_state = Credits2}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations in the +%% queue (to do them elsewhere introduces a ton of races). However, it's a big +%% chunk of code that is conceptually very linked to the limiter concept. So +%% we get the queue to hold a bit of state for us (#token.q_state), and +%% maintain a fiction that the limiter is making the decisions... + +can_send_q(CTag, Len, ChPid, Credits) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = 0}} -> exit(bang), {false, Credits}; + {ok, Cred} -> Credits2 = + decr_credit( + CTag, Len, ChPid, Cred, Credits), + {true, Credits2}; + _ -> {true, Credits} + end. + +decr_credit(CTag, Len, ChPid, Cred, Credits) -> + #credit{credit = Credit, count = Count, drain = Drain} = Cred, + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, serial_add(Count, 1)}; + {_, 1, true} -> %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + end, + update_credit(CTag, NewCredit, NewCount, Drain, Credits). + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), + rabbit_channel:send_command(ChPid, + #'basic.credit_ok'{available = Len}), + update_credit(CTag, Credit, Count, Drain, Credits). + +%% Store the credit +update_credit(CTag, -1, _Count, _Drain, Credits) -> + dict:erase(CTag, Credits); + +update_credit(CTag, Credit, Count, Drain, Credits) -> + dict:store(CTag, #credit{credit = Credit, + count = Count, + drain = Drain}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -142,26 +216,23 @@ init([]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From, +handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; -handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, +handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(CTag, State) of + case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, - decr_credit(CTag, Len, - State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end})} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(irrelevant, - State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -171,17 +242,8 @@ handle_call({limit, PrefetchCount, Token}, _From, State) -> handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call({set_credit, CTag, Credit, Count, Drain, Token}, _From, State) -> - case maybe_notify(CTag, State, - reset_credit(CTag, Credit, Count, Drain, State)) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - handle_call({unblock, Token}, _From, State) -> - case maybe_notify(irrelevant, State, State#lim{blocked = false}) of + case maybe_notify(State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -197,11 +259,11 @@ handle_call({enable, Token, Channel, Volume}, _From, State) -> handle_call({disable, Token}, _From, State) -> {reply, Token#token{enabled = false}, State}. -handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - 1 + true -> Volume - Count end, - {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -223,14 +285,13 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(CTag, OldState, NewState) -> - case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso - not (limit_reached(CTag, NewState) orelse blocked(NewState)) of +maybe_notify(OldState, NewState) -> + case (limit_reached(OldState) orelse blocked(OldState)) andalso + not (limit_reached(NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case {NewState1#lim.prefetch_count, - dict:size(NewState1#lim.credits)} of - {0, 0} -> stop; - _ -> cont + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. @@ -245,67 +306,8 @@ maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> maybe_cast(_, _Call) -> ok. -limit_reached(irrelevant, _) -> - false; -limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, - credits = Credits}) -> - case dict:find(CTag, Credits) of - {ok, #credit{ credit = 0 }} -> true; - _ -> false - end orelse (Limit =/= 0 andalso Volume >= Limit). - -decr_credit(CTag, Len, State = #lim{ credits = Credits, - ch_pid = ChPid } ) -> - case dict:find(CTag, Credits) of - {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> - {NewCredit, NewCount} = - case {Credit, Len, Drain} of - {1, _, _} -> {0, serial_add(Count, 1)}; - {_, 1, true} -> - %% Drain, so advance til credit = 0 - NewCount0 = serial_add(Count, (Credit - 1)), - send_drained(ChPid, CTag, NewCount0), - {0, NewCount0}; %% Magic reduction to 0 - {_, _, _} -> {Credit - 1, serial_add(Count, 1)} - end, - update_credit(CTag, NewCredit, NewCount, Drain, State); - error -> - State - end. - -send_drained(ChPid, CTag, Count) -> - rabbit_channel:send_command(ChPid, - #'basic.credit_state'{consumer_tag = CTag, - credit = 0, - count = Count, - available = 0, - drain = true}). - -%% Assert the credit state. The count may not match ours, in which -%% case we must rebase the credit. -%% TODO Edge case: if the queue has nothing in it, and drain is set, -%% we want to send a basic.credit back. -reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> - Count = - case dict:find(CTag, Credits) of - {ok, #credit{ count = LocalCount }} -> - LocalCount; - _ -> Count0 - end, - %% Our credit may have been reduced while messages are in flight, - %% so we bottom out at 0. - Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), - update_credit(CTag, Credit, Count, Drain, State). - -%% Store the credit -update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> - State#lim{credits = dict:erase(CTag, Credits)}; - -update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> - State#lim{credits = dict:store(CTag, - #credit{credit = Credit, - count = Count, - drain = Drain}, Credits)}. +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -347,4 +349,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - |