diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-06 15:02:10 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-06 15:02:10 +0000 |
commit | bf7e4614aea1a3448cb8176b97e1990218bebc0b (patch) | |
tree | c9bd8853d1915a19119df1d608c53e7957e25136 | |
parent | e69b6307256e18678f688ed655ea4f1b25840388 (diff) | |
download | rabbitmq-server-bf7e4614aea1a3448cb8176b97e1990218bebc0b.tar.gz |
tweak credit_flow logic
- make credit specs contain the InitialCredit and
MoreCreditAfter. This is more intuitive and means the message sender
only needs to look at the former and the receiver at the latter.
- narrow the condition on which handle_bump_msg invokes unblock - it
only needs to do that on *transition* from =< 0 to > 0.
- refactor: extract process dict update
- cosmetic: more logical order of exported functions
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/credit_flow.erl | 95 |
2 files changed, 48 insertions, 49 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 735b4720..faf3059a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -95,7 +95,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(CREDIT_DISC_BOUND, {2000, 1500}). +-define(CREDIT_DISC_BOUND, {2000, 500}). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/src/credit_flow.erl b/src/credit_flow.erl index edbf4fb5..bdcd892a 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -16,18 +16,21 @@ -module(credit_flow). -%% Credit starts at MaxCredit and goes down. Both sides keep -%% track. When the receiver goes below MoreCreditAt it issues more -%% credit by sending a message to the sender. The sender should pass -%% this message in to handle_bump_msg/1. The sender should block when -%% it goes below 0 (check by invoking blocked/0). If a process is both -%% a sender and a receiver it will not grant any more credit to its -%% senders when it is itself blocked - thus the only processes that -%% need to check blocked/0 are ones that read from network sockets. - --define(DEFAULT_CREDIT, {200, 150}). - --export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]). +%% Credit flow is controlled by a credit specification - a +%% {InitialCredit, MoreCreditAfter} tuple. For the message sender, +%% credit starts at InitialCredit and is decremented with every +%% message sent. The message receiver grants more credit to the sender +%% by sending it a {bump_credit, ...} control message after receiving +%% MoreCreditAfter messages. The sender should pass this message in to +%% handle_bump_msg/1. The sender should block when it goes below 0 +%% (check by invoking blocked/0). If a process is both a sender and a +%% receiver it will not grant any more credit to its senders when it +%% is itself blocked - thus the only processes that need to check +%% blocked/0 are ones that read from network sockets. + +-define(DEFAULT_CREDIT, {200, 50}). + +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -37,12 +40,12 @@ -opaque(bump_msg() :: {pid(), non_neg_integer()}). -opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(ack/1 :: (pid()) -> 'ok'). -spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). -spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). -spec(blocked/0 :: () -> boolean()). --spec(send/1 :: (pid()) -> 'ok'). --spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(peer_down/1 :: (pid()) -> 'ok'). -endif. @@ -60,38 +63,33 @@ %% For any given pair of processes, ack/2 and send/2 must always be %% called with the same credit_spec(). +send(From) -> send(From, ?DEFAULT_CREDIT). + +send(From, {InitialCredit, _MoreCreditAfter}) -> + update({credit_from, From}, InitialCredit, + fun (1) -> block(From), + 0; + (C) -> C - 1 + end). + ack(To) -> ack(To, ?DEFAULT_CREDIT). -ack(To, {MaxCredit, MoreCreditAt}) -> - MoreCreditAt1 = MoreCreditAt + 1, - Credit = case get({credit_to, To}, MaxCredit) of - MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt), - MaxCredit; - C -> C - 1 - end, - put({credit_to, To}, Credit). +ack(To, {_InitialCredit, MoreCreditAfter}) -> + update({credit_to, To}, MoreCreditAfter, + fun (1) -> grant(To, MoreCreditAfter), + MoreCreditAfter; + (C) -> C - 1 + end). handle_bump_msg({From, MoreCredit}) -> - Credit = get({credit_from, From}, 0) + MoreCredit, - put({credit_from, From}, Credit), - case Credit > 0 of - true -> unblock(From), - ok; - false -> ok - end. + update({credit_from, From}, 0, + fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From), + C + MoreCredit; + (C) -> C + MoreCredit + end). blocked() -> get(credit_blocked, []) =/= []. -send(From) -> send(From, ?DEFAULT_CREDIT). - -send(From, {MaxCredit, _MoreCreditAt}) -> - Credit = get({credit_from, From}, MaxCredit) - 1, - case Credit of - 0 -> block(From); - _ -> ok - end, - put({credit_from, From}, Credit). - peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -106,19 +104,18 @@ grant(To, Quantity) -> Msg = {bump_credit, {self(), Quantity}}, case blocked() of false -> To ! Msg; - true -> Deferred = get(credit_deferred, []), - put(credit_deferred, [{To, Msg} | Deferred]) + true -> update(credit_deferred, [], + fun (Deferred) -> [{To, Msg} | Deferred] end) end. -block(From) -> put(credit_blocked, [From | get(credit_blocked, [])]). +block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end). unblock(From) -> - NewBlocks = get(credit_blocked, []) -- [From], - put(credit_blocked, NewBlocks), - case NewBlocks of - [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], - erase(credit_deferred); - _ -> ok + update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end), + case blocked() of + false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + true -> ok end. get(Key, Default) -> @@ -126,3 +123,5 @@ get(Key, Default) -> undefined -> Default; Value -> Value end. + +update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))). |