summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-25 11:57:00 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-25 11:57:00 +0000
commit4cdc47a48e9a757e216d1275f66bc7dde80162e7 (patch)
treee7ed7e0741c44c232a2999f72c8cd7246e6aa8a8 /src/rabbit_limiter.erl
parent8e940c2e95208924a55a6f437c792b7b1e2b1614 (diff)
parent423aca76adbca197fe4bbcb8ee720122bc75ece4 (diff)
downloadrabbitmq-server-4cdc47a48e9a757e216d1275f66bc7dde80162e7.tar.gz
merge default into bug23749
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl107
1 files changed, 101 insertions, 6 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a7d14fe..b97d1073 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -15,19 +15,25 @@
%%
-module(rabbit_limiter).
+-include("rabbit_framing.hrl").
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
+
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_ch_send/3, can_cons_send/2, record_cons_send/4,
+ ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([inform/4, forget_consumer/2, copy_queue_state/2]).
+
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(token, {pid, enabled, q_state}).
-ifdef(use_specs).
@@ -42,7 +48,8 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(can_cons_send/2 :: (token(), rabbit_types:ctag()) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -50,6 +57,10 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) ->
+ {[rabbit_types:ctag()], token()}).
+-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()).
+-spec(copy_queue_state/2 :: (token(), token()) -> token()).
-endif.
@@ -64,6 +75,8 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
+-record(credit, {count = 0, credit = 0, drain = false}).
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
@@ -71,7 +84,8 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+make_token(Pid) -> #token{pid = Pid, enabled = false,
+ q_state = dict:new()}.
is_enabled(#token{enabled = Enabled}) -> Enabled.
@@ -88,15 +102,25 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
+can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
fun () ->
gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
end);
-can_send(_, _, _) ->
+can_ch_send(_, _, _) ->
true.
+can_cons_send(#token{q_state = Credits}, CTag) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{credit = C}} when C > 0 -> true;
+ {ok, #credit{}} -> false;
+ error -> true
+ end.
+
+record_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
+ Token#token{q_state = record_send_q(CTag, Len, ChPid, QState)}.
+
%% Let the limiter know that the channel has received some acks from a
%% consumer
ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
@@ -119,6 +143,77 @@ unblock(Limiter) ->
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
+inform(Limiter = #token{q_state = Credits},
+ ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) ->
+ {Unblock, Credits2} = update_credit(
+ CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ case Reply of
+ true -> rabbit_channel:send_credit_reply(ChPid, Len);
+ false -> ok
+ end,
+ {Unblock, Limiter#token{q_state = Credits2}}.
+
+forget_consumer(Limiter = #token{q_state = Credits}, CTag) ->
+ Limiter#token{q_state = dict:erase(CTag, Credits)}.
+
+copy_queue_state(#token{q_state = Credits}, Token) ->
+ Token#token{q_state = Credits}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link level credit calculations in the
+%% queue (to do them elsewhere introduces a ton of races). However, it's a big
+%% chunk of code that is conceptually very linked to the limiter concept. So
+%% we get the queue to hold a bit of state for us (#token.q_state), and
+%% maintain a fiction that the limiter is making the decisions...
+
+record_send_q(CTag, Len, ChPid, Credits) ->
+ case dict:find(CTag, Credits) of
+ {ok, Cred} ->
+ decr_credit(CTag, Len, ChPid, Cred, Credits);
+ error ->
+ Credits
+ end.
+
+decr_credit(CTag, Len, ChPid, Cred, Credits) ->
+ #credit{credit = Credit, count = Count, drain = Drain} = Cred,
+ {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid,
+ Credit - 1, serial_add(Count, 1)),
+ write_credit(CTag, NewCredit, NewCount, Drain, Credits).
+
+maybe_drain(0, true, CTag, ChPid, Credit, Count) ->
+ %% Drain, so advance til credit = 0
+ NewCount = serial_add(Count, Credit - 2),
+ send_drained(ChPid, CTag, NewCount),
+ {0, NewCount}; %% Magic reduction to 0
+
+maybe_drain(_, _, _, _, Credit, Count) ->
+ {Credit, Count}.
+
+send_drained(ChPid, CTag, Count) ->
+ rabbit_channel:send_drained(ChPid, CTag, Count).
+
+update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
+ Count = case dict:find(CTag, Credits) of
+ %% Use our count if we can, more accurate
+ {ok, #credit{ count = LocalCount }} -> LocalCount;
+ %% But if this is new, take it from the adapter
+ _ -> Count0
+ end,
+ {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count),
+ NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits),
+ case NewCredit > 0 of
+ true -> {[CTag], NewCredits};
+ false -> {[], NewCredits}
+ end.
+
+write_credit(CTag, Credit, Count, Drain, Credits) ->
+ dict:store(CTag, #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------