diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-10 18:01:16 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-10 18:01:16 +0000 |
commit | d65b067aa9759edd4faab55aad4aaca53690b075 (patch) | |
tree | c4a0dfc8fa3af81e6f080fc0755643fa3faf5192 /src | |
parent | a09fb4cb5e720029bd1be47bae67a6103d885929 (diff) | |
download | rabbitmq-server-d65b067aa9759edd4faab55aad4aaca53690b075.tar.gz |
Second attempt at moving credit into the queue. This time we pretend the limiter is still doing the work.
While testing this I note that the credit calculation is crazy when testing with Proton. But it was just as bad before, so let's commit.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 42 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 227 |
4 files changed, 155 insertions, 149 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94150f1c..a337c722 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,6 +32,7 @@ -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). +-export([inform_limiter/3]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +176,7 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). -endif. @@ -604,6 +606,9 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +inform_limiter(ChPid, QPid, Msg) -> + delegate:cast(QPid, {inform_limiter, ChPid, Msg}). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 87b93d17..2ec54c7b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -430,19 +430,25 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required, - Consumer#consumer.tag, - BQ:len(BQS)) of - false -> block_consumer(C#cr{is_limit_active = true}, E), + false -> #cr{limiter = Limiter, ch_pid = ChPid} = C, + {CanSend, Lim2} = + rabbit_limiter:can_send( + Limiter, ChPid, self(), Consumer#consumer.ack_required, + Consumer#consumer.tag, BQ:len(BQS)), + case CanSend of + false -> block_consumer(C#cr{is_limit_active = true, + limiter = Lim2}, E), {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), + true -> update_ch_record(C#cr{limiter = Lim2}), %%[0] + AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end. +%% [0] TODO is this a hotspot in the case where the limiter has not changed? + deliver_msg_to_consumer(DeliverFun, #consumer{tag = ConsumerTag, ack_required = AckRequired}, @@ -1250,7 +1256,9 @@ handle_cast({limit, ChPid, Limiter}, State) -> false -> ok end, Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} + C#cr{limiter = rabbit_limiter:copy_queue_state( + OldLimiter, Limiter), + is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> @@ -1308,6 +1316,14 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({inform_limiter, ChPid, Msg}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + C = #cr{limiter = Limiter} = ch_record(ChPid), + Limiter2 = rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), + update_ch_record(C#cr{limiter = Limiter2}), + noreply(State); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a8df19ff..6d00fdb2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1080,35 +1080,19 @@ handle_method(#'channel.flow'{active = false}, _, end; handle_method(#'basic.credit'{consumer_tag = CTag, - credit = Credit, - count = Count, - drain = Drain}, _, - State = #ch{limiter = Limiter, - consumer_mapping = Consumers}) -> - %% We get Available first because it's likely that as soon as we set - %% the credit msgs will get consumed and it'll be out of date. Why do we - %% want that? Because at least then it's consistent with the credit value - %% we return. And Available is always going to be racy. - Available = case dict:find(CTag, Consumers) of - {ok, Q} -> case rabbit_amqqueue:stat(Q) of - {ok, Len, _} -> Len; - _ -> -1 - end; - error -> -1 %% TODO these -1s smell very iffy! - end, - Limiter1 = case rabbit_limiter:is_enabled(Limiter) of - true -> Limiter; - false -> enable_limiter(State) - end, - Limiter3 = - case rabbit_limiter:set_credit( - Limiter1, CTag, Credit, Count, Drain) of - ok -> Limiter1; - {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), - Limiter2 - end, - State1 = State#ch{limiter = Limiter3}, - return_ok(State1, false, #'basic.credit_ok'{available = Available}); + credit = Credit, + count = Count, + drain = Drain} = M, _, + State = #ch{consumer_mapping = Consumers}) -> + %%io:format(" ~p~n", [M]), + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( + self(), Q#amqqueue.pid, + {basic_credit, CTag, Credit, Count, Drain}), + {noreply, State}; + error -> rabbit_misc:protocol_error( + not_allowed, "unknown consumer tag '~s'", [CTag]) + end; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5f1bc07c..6e3a228b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,15 +24,15 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/5, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/6, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). --export([set_credit/5]). +-export([inform/4]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(token, {pid, enabled, q_state}). -ifdef(use_specs). @@ -47,8 +47,9 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/5 :: (token(), pid(), boolean(), - rabbit_types:ctag(), non_neg_integer()) -> boolean()). +%% TODO +%% -spec(can_send/5 :: (token(), pid(), boolean(), +%% rabbit_types:ctag(), non_neg_integer()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -56,10 +57,10 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). --spec(set_credit/5 :: (token(), rabbit_types:ctag(), - non_neg_integer(), - non_neg_integer(), boolean()) -> 'ok'). - +%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(), +%% non_neg_integer(), +%% non_neg_integer(), boolean()) -> 'ok'). +-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). -endif. %%---------------------------------------------------------------------------- @@ -67,7 +68,6 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, - credits = dict:new(), queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). @@ -80,7 +80,8 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +make_token(Pid) -> #token{pid = Pid, enabled = false, + q_state = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -97,19 +98,22 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) -> +can_send(#token{pid = Pid, enabled = true, q_state = QState} = Token, + ChPid, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( - fun () -> true end, + fun () -> {true, Token} end, fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len}, - infinity) + CanLim = gen_server2:call(Pid, {can_send, QPid, AckRequired}, + infinity), + {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), + {CanLim andalso CanQ, Token#token{q_state = NewQState}} end); -can_send(_, _, _, _, _) -> - true. +can_send(Token, _, _, _, _, _) -> + {true, Token}. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}). +ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). @@ -126,12 +130,82 @@ block(Limiter) -> unblock(Limiter) -> maybe_call(Limiter, {unblock, Limiter}, ok). -set_credit(Limiter, CTag, Credit, Count, Drain) -> - maybe_call(Limiter, {set_credit, CTag, Credit, Count, Drain, Limiter}, ok). - is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +inform(Limiter = #token{q_state = Credits}, + ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> + Credits2 = reset_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + Limiter#token{q_state = Credits2}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations in the +%% queue (to do them elsewhere introduces a ton of races). However, it's a big +%% chunk of code that is conceptually very linked to the limiter concept. So +%% we get the queue to hold a bit of state for us (#token.q_state), and +%% maintain a fiction that the limiter is making the decisions... + +can_send_q(CTag, Len, ChPid, Credits) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = 0}} -> exit(bang), {false, Credits}; + {ok, Cred} -> Credits2 = + decr_credit( + CTag, Len, ChPid, Cred, Credits), + {true, Credits2}; + _ -> {true, Credits} + end. + +decr_credit(CTag, Len, ChPid, Cred, Credits) -> + #credit{credit = Credit, count = Count, drain = Drain} = Cred, + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, serial_add(Count, 1)}; + {_, 1, true} -> %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + end, + update_credit(CTag, NewCredit, NewCount, Drain, Credits). + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Len, ChPid, Credit0, Count0, Drain, Credits) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), + rabbit_channel:send_command(ChPid, + #'basic.credit_ok'{available = Len}), + update_credit(CTag, Credit, Count, Drain, Credits). + +%% Store the credit +update_credit(CTag, -1, _Count, _Drain, Credits) -> + dict:erase(CTag, Credits); + +update_credit(CTag, Credit, Count, Drain, Credits) -> + dict:store(CTag, #credit{credit = Credit, + count = Count, + drain = Drain}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -142,26 +216,23 @@ init([]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From, +handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; -handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, +handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(CTag, State) of + case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, - decr_credit(CTag, Len, - State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end})} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(irrelevant, - State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -171,17 +242,8 @@ handle_call({limit, PrefetchCount, Token}, _From, State) -> handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call({set_credit, CTag, Credit, Count, Drain, Token}, _From, State) -> - case maybe_notify(CTag, State, - reset_credit(CTag, Credit, Count, Drain, State)) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - handle_call({unblock, Token}, _From, State) -> - case maybe_notify(irrelevant, State, State#lim{blocked = false}) of + case maybe_notify(State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -197,11 +259,11 @@ handle_call({enable, Token, Channel, Volume}, _From, State) -> handle_call({disable, Token}, _From, State) -> {reply, Token#token{enabled = false}, State}. -handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> +handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - 1 + true -> Volume - Count end, - {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -223,14 +285,13 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(CTag, OldState, NewState) -> - case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso - not (limit_reached(CTag, NewState) orelse blocked(NewState)) of +maybe_notify(OldState, NewState) -> + case (limit_reached(OldState) orelse blocked(OldState)) andalso + not (limit_reached(NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case {NewState1#lim.prefetch_count, - dict:size(NewState1#lim.credits)} of - {0, 0} -> stop; - _ -> cont + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. @@ -245,67 +306,8 @@ maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> maybe_cast(_, _Call) -> ok. -limit_reached(irrelevant, _) -> - false; -limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, - credits = Credits}) -> - case dict:find(CTag, Credits) of - {ok, #credit{ credit = 0 }} -> true; - _ -> false - end orelse (Limit =/= 0 andalso Volume >= Limit). - -decr_credit(CTag, Len, State = #lim{ credits = Credits, - ch_pid = ChPid } ) -> - case dict:find(CTag, Credits) of - {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> - {NewCredit, NewCount} = - case {Credit, Len, Drain} of - {1, _, _} -> {0, serial_add(Count, 1)}; - {_, 1, true} -> - %% Drain, so advance til credit = 0 - NewCount0 = serial_add(Count, (Credit - 1)), - send_drained(ChPid, CTag, NewCount0), - {0, NewCount0}; %% Magic reduction to 0 - {_, _, _} -> {Credit - 1, serial_add(Count, 1)} - end, - update_credit(CTag, NewCredit, NewCount, Drain, State); - error -> - State - end. - -send_drained(ChPid, CTag, Count) -> - rabbit_channel:send_command(ChPid, - #'basic.credit_state'{consumer_tag = CTag, - credit = 0, - count = Count, - available = 0, - drain = true}). - -%% Assert the credit state. The count may not match ours, in which -%% case we must rebase the credit. -%% TODO Edge case: if the queue has nothing in it, and drain is set, -%% we want to send a basic.credit back. -reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> - Count = - case dict:find(CTag, Credits) of - {ok, #credit{ count = LocalCount }} -> - LocalCount; - _ -> Count0 - end, - %% Our credit may have been reduced while messages are in flight, - %% so we bottom out at 0. - Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), - update_credit(CTag, Credit, Count, Drain, State). - -%% Store the credit -update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> - State#lim{credits = dict:erase(CTag, Credits)}; - -update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> - State#lim{credits = dict:store(CTag, - #credit{credit = Credit, - count = Count, - drain = Drain}, Credits)}. +limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> + Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -347,4 +349,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - |