path: root/src/rabbit_limiter.erl
diff options
authorSimon MacMullen <>2013-01-10 18:01:16 +0000
committerSimon MacMullen <>2013-01-10 18:01:16 +0000
commitd65b067aa9759edd4faab55aad4aaca53690b075 (patch)
treec4a0dfc8fa3af81e6f080fc0755643fa3faf5192 /src/rabbit_limiter.erl
parenta09fb4cb5e720029bd1be47bae67a6103d885929 (diff)
Second attempt at moving credit into the queue. This time we pretend the limiter is still doing the work.
While testing this I note that the credit calculation is crazy when testing with Proton. But it was just as bad before, so let's commit.
Diffstat (limited to 'src/rabbit_limiter.erl')
1 files changed, 114 insertions, 113 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 5f1bc07c..6e3a228b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,15 +24,15 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
--export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/6, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
--record(token, {pid, enabled}).
+-record(token, {pid, enabled, q_state}).
@@ -47,8 +47,9 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/5 :: (token(), pid(), boolean(),
- rabbit_types:ctag(), non_neg_integer()) -> boolean()).
+%% TODO
+%% -spec(can_send/5 :: (token(), pid(), boolean(),
+%% rabbit_types:ctag(), non_neg_integer()) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -56,10 +57,10 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
--spec(set_credit/5 :: (token(), rabbit_types:ctag(),
- non_neg_integer(),
- non_neg_integer(), boolean()) -> 'ok').
+%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(),
+%% non_neg_integer(),
+%% non_neg_integer(), boolean()) -> 'ok').
+-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
@@ -67,7 +68,6 @@
-record(lim, {prefetch_count = 0,
blocked = false,
- credits = dict:new(),
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
@@ -80,7 +80,8 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+make_token(Pid) -> #token{pid = Pid, enabled = false,
+ q_state = dict:new()}.
is_enabled(#token{enabled = Enabled}) -> Enabled.
@@ -97,19 +98,22 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) ->
+can_send(#token{pid = Pid, enabled = true, q_state = QState} = Token,
+ ChPid, QPid, AckRequired, CTag, Len) ->
- fun () -> true end,
+ fun () -> {true, Token} end,
fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len},
- infinity)
+ CanLim = gen_server2:call(Pid, {can_send, QPid, AckRequired},
+ infinity),
+ {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState),
+ {CanLim andalso CanQ, Token#token{q_state = NewQState}}
-can_send(_, _, _, _, _) ->
- true.
+can_send(Token, _, _, _, _, _) ->
+ {true, Token}.
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}).
+ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
@@ -126,12 +130,82 @@ block(Limiter) ->
unblock(Limiter) ->
maybe_call(Limiter, {unblock, Limiter}, ok).
-set_credit(Limiter, CTag, Credit, Count, Drain) ->
- maybe_call(Limiter, {set_credit, CTag, Credit, Count, Drain, Limiter}, ok).
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
+inform(Limiter = #token{q_state = Credits},
+ ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) ->
+ Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ Limiter#token{q_state = Credits2}.
+%% Queue-local code
+%% We want to do all the AMQP 1.0-ish link level credit calculations in the
+%% queue (to do them elsewhere introduces a ton of races). However, it's a big
+%% chunk of code that is conceptually very linked to the limiter concept. So
+%% we get the queue to hold a bit of state for us (#token.q_state), and
+%% maintain a fiction that the limiter is making the decisions...
+can_send_q(CTag, Len, ChPid, Credits) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{credit = 0}} -> exit(bang), {false, Credits};
+ {ok, Cred} -> Credits2 =
+ decr_credit(
+ CTag, Len, ChPid, Cred, Credits),
+ {true, Credits2};
+ _ -> {true, Credits}
+ end.
+decr_credit(CTag, Len, ChPid, Cred, Credits) ->
+ #credit{credit = Credit, count = Count, drain = Drain} = Cred,
+ {NewCredit, NewCount} =
+ case {Credit, Len, Drain} of
+ {1, _, _} -> {0, serial_add(Count, 1)};
+ {_, 1, true} -> %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
+ end,
+ update_credit(CTag, NewCredit, NewCount, Drain, Credits).
+send_drained(ChPid, CTag, Count) ->
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_state'{consumer_tag = CTag,
+ credit = 0,
+ count = Count,
+ available = 0,
+ drain = true}).
+%% Assert the credit state. The count may not match ours, in which
+%% case we must rebase the credit.
+%% TODO Edge case: if the queue has nothing in it, and drain is set,
+%% we want to send a back.
+reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) ->
+ Count =
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ count = LocalCount }} ->
+ LocalCount;
+ _ -> Count0
+ end,
+ %% Our credit may have been reduced while messages are in flight,
+ %% so we bottom out at 0.
+ Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_ok'{available = Len}),
+ update_credit(CTag, Credit, Count, Drain, Credits).
+%% Store the credit
+update_credit(CTag, -1, _Count, _Drain, Credits) ->
+ dict:erase(CTag, Credits);
+update_credit(CTag, Credit, Count, Drain, Credits) ->
+ dict:store(CTag, #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits).
%% gen_server callbacks
@@ -142,26 +216,23 @@ init([]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From,
+handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
-handle_call({can_send, QPid, AckRequired, CTag, Len}, _From,
+handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(CTag, State) of
+ case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true,
- decr_credit(CTag, Len,
- State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end})}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(irrelevant,
- State, State#lim{prefetch_count = PrefetchCount}) of
+ case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
@@ -171,17 +242,8 @@ handle_call({limit, PrefetchCount, Token}, _From, State) ->
handle_call(block, _From, State) ->
{reply, ok, State#lim{blocked = true}};
-handle_call({set_credit, CTag, Credit, Count, Drain, Token}, _From, State) ->
- case maybe_notify(CTag, State,
- reset_credit(CTag, Credit, Count, Drain, State)) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(irrelevant, State, State#lim{blocked = false}) of
+ case maybe_notify(State, State#lim{blocked = false}) of
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
@@ -197,11 +259,11 @@ handle_call({enable, Token, Channel, Volume}, _From, State) ->
handle_call({disable, Token}, _From, State) ->
{reply, Token#token{enabled = false}, State}.
-handle_cast({ack, CTag}, State = #lim{volume = Volume}) ->
+handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
- true -> Volume - 1
+ true -> Volume - Count
- {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}),
+ {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
{noreply, State1};
handle_cast({register, QPid}, State) ->
@@ -223,14 +285,13 @@ code_change(_, State, _) ->
%% Internal plumbing
-maybe_notify(CTag, OldState, NewState) ->
- case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso
- not (limit_reached(CTag, NewState) orelse blocked(NewState)) of
+maybe_notify(OldState, NewState) ->
+ case (limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
- {case {NewState1#lim.prefetch_count,
- dict:size(NewState1#lim.credits)} of
- {0, 0} -> stop;
- _ -> cont
+ {case NewState1#lim.prefetch_count of
+ 0 -> stop;
+ _ -> cont
end, NewState1};
false -> {cont, NewState}
@@ -245,67 +306,8 @@ maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
maybe_cast(_, _Call) ->
-limit_reached(irrelevant, _) ->
- false;
-limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
- credits = Credits}) ->
- case dict:find(CTag, Credits) of
- {ok, #credit{ credit = 0 }} -> true;
- _ -> false
- end orelse (Limit =/= 0 andalso Volume >= Limit).
-decr_credit(CTag, Len, State = #lim{ credits = Credits,
- ch_pid = ChPid } ) ->
- case dict:find(CTag, Credits) of
- {ok, #credit{ credit = Credit, count = Count, drain = Drain }} ->
- {NewCredit, NewCount} =
- case {Credit, Len, Drain} of
- {1, _, _} -> {0, serial_add(Count, 1)};
- {_, 1, true} ->
- %% Drain, so advance til credit = 0
- NewCount0 = serial_add(Count, (Credit - 1)),
- send_drained(ChPid, CTag, NewCount0),
- {0, NewCount0}; %% Magic reduction to 0
- {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
- end,
- update_credit(CTag, NewCredit, NewCount, Drain, State);
- error ->
- State
- end.
-send_drained(ChPid, CTag, Count) ->
- rabbit_channel:send_command(ChPid,
- #'basic.credit_state'{consumer_tag = CTag,
- credit = 0,
- count = Count,
- available = 0,
- drain = true}).
-%% Assert the credit state. The count may not match ours, in which
-%% case we must rebase the credit.
-%% TODO Edge case: if the queue has nothing in it, and drain is set,
-%% we want to send a back.
-reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) ->
- Count =
- case dict:find(CTag, Credits) of
- {ok, #credit{ count = LocalCount }} ->
- LocalCount;
- _ -> Count0
- end,
- %% Our credit may have been reduced while messages are in flight,
- %% so we bottom out at 0.
- Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
- update_credit(CTag, Credit, Count, Drain, State).
-%% Store the credit
-update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) ->
- State#lim{credits = dict:erase(CTag, Credits)};
-update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) ->
- State#lim{credits = dict:store(CTag,
- #credit{credit = Credit,
- count = Count,
- drain = Drain}, Credits)}.
+limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+ Limit =/= 0 andalso Volume >= Limit.
blocked(#lim{blocked = Blocked}) -> Blocked.
@@ -347,4 +349,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
State#lim{queues = NewQueues}.