summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-02-06 15:02:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-02-06 15:02:10 +0000
commitbf7e4614aea1a3448cb8176b97e1990218bebc0b (patch)
treec9bd8853d1915a19119df1d608c53e7957e25136
parente69b6307256e18678f688ed655ea4f1b25840388 (diff)
downloadrabbitmq-server-bf7e4614aea1a3448cb8176b97e1990218bebc0b.tar.gz
tweak credit_flow logic
- make credit specs contain the InitialCredit and MoreCreditAfter. This is more intuitive and means the message sender only needs to look at the former and the receiver at the latter. - narrow the condition on which handle_bump_msg invokes unblock - it only needs to do that on *transition* from =< 0 to > 0. - refactor: extract process dict update - cosmetic: more logical order of exported functions
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/credit_flow.erl95
2 files changed, 48 insertions, 49 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 735b4720..faf3059a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -95,7 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(CREDIT_DISC_BOUND, {2000, 1500}).
+-define(CREDIT_DISC_BOUND, {2000, 500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index edbf4fb5..bdcd892a 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -16,18 +16,21 @@
-module(credit_flow).
-%% Credit starts at MaxCredit and goes down. Both sides keep
-%% track. When the receiver goes below MoreCreditAt it issues more
-%% credit by sending a message to the sender. The sender should pass
-%% this message in to handle_bump_msg/1. The sender should block when
-%% it goes below 0 (check by invoking blocked/0). If a process is both
-%% a sender and a receiver it will not grant any more credit to its
-%% senders when it is itself blocked - thus the only processes that
-%% need to check blocked/0 are ones that read from network sockets.
-
--define(DEFAULT_CREDIT, {200, 150}).
-
--export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]).
+%% Credit flow is controlled by a credit specification - a
+%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
+%% credit starts at InitialCredit and is decremented with every
+%% message sent. The message receiver grants more credit to the sender
+%% by sending it a {bump_credit, ...} control message after receiving
+%% MoreCreditAfter messages. The sender should pass this message in to
+%% handle_bump_msg/1. The sender should block when it goes below 0
+%% (check by invoking blocked/0). If a process is both a sender and a
+%% receiver it will not grant any more credit to its senders when it
+%% is itself blocked - thus the only processes that need to check
+%% blocked/0 are ones that read from network sockets.
+
+-define(DEFAULT_CREDIT, {200, 50}).
+
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -37,12 +40,12 @@
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
-opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(ack/1 :: (pid()) -> 'ok').
-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
--spec(send/1 :: (pid()) -> 'ok').
--spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(peer_down/1 :: (pid()) -> 'ok').
-endif.
@@ -60,38 +63,33 @@
%% For any given pair of processes, ack/2 and send/2 must always be
%% called with the same credit_spec().
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
+send(From, {InitialCredit, _MoreCreditAfter}) ->
+ update({credit_from, From}, InitialCredit,
+ fun (1) -> block(From),
+ 0;
+ (C) -> C - 1
+ end).
+
ack(To) -> ack(To, ?DEFAULT_CREDIT).
-ack(To, {MaxCredit, MoreCreditAt}) ->
- MoreCreditAt1 = MoreCreditAt + 1,
- Credit = case get({credit_to, To}, MaxCredit) of
- MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt),
- MaxCredit;
- C -> C - 1
- end,
- put({credit_to, To}, Credit).
+ack(To, {_InitialCredit, MoreCreditAfter}) ->
+ update({credit_to, To}, MoreCreditAfter,
+ fun (1) -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ (C) -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- Credit = get({credit_from, From}, 0) + MoreCredit,
- put({credit_from, From}, Credit),
- case Credit > 0 of
- true -> unblock(From),
- ok;
- false -> ok
- end.
+ update({credit_from, From}, 0,
+ fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ (C) -> C + MoreCredit
+ end).
blocked() -> get(credit_blocked, []) =/= [].
-send(From) -> send(From, ?DEFAULT_CREDIT).
-
-send(From, {MaxCredit, _MoreCreditAt}) ->
- Credit = get({credit_from, From}, MaxCredit) - 1,
- case Credit of
- 0 -> block(From);
- _ -> ok
- end,
- put({credit_from, From}, Credit).
-
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -106,19 +104,18 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> Deferred = get(credit_deferred, []),
- put(credit_deferred, [{To, Msg} | Deferred])
+ true -> update(credit_deferred, [],
+ fun (Deferred) -> [{To, Msg} | Deferred] end)
end.
-block(From) -> put(credit_blocked, [From | get(credit_blocked, [])]).
+block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
unblock(From) ->
- NewBlocks = get(credit_blocked, []) -- [From],
- put(credit_blocked, NewBlocks),
- case NewBlocks of
- [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
- _ -> ok
+ update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ case blocked() of
+ false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ true -> ok
end.
get(Key, Default) ->
@@ -126,3 +123,5 @@ get(Key, Default) ->
undefined -> Default;
Value -> Value
end.
+
+update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))).