diff options
Diffstat (limited to 'src/credit_flow.erl')
-rw-r--r-- | src/credit_flow.erl | 75 |
1 files changed, 44 insertions, 31 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 9ffaf247..106179fd 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -52,6 +52,22 @@ %%---------------------------------------------------------------------------- +%% process dict update macro - eliminates the performance-hurting +%% closure creation a HOF would introduce +-define(UPDATE(Key, Default, Var, Expr), + begin + %% We deliberately allow Var to escape from the case here + %% to be used in Expr. Any temporary var we introduced + %% would also escape, and might conflict. + case get(Key) of + undefined -> Var = Default; + Var -> ok + end, + put(Key, Expr) + end). + +%%---------------------------------------------------------------------------- + %% There are two "flows" here; of messages and of credit, going in %% opposite directions. The variable names "From" and "To" refer to %% the flow of credit, but the function names refer to the flow of @@ -66,29 +82,33 @@ send(From) -> send(From, ?DEFAULT_CREDIT). send(From, {InitialCredit, _MoreCreditAfter}) -> - update({credit_from, From}, InitialCredit, - fun (1) -> block(From), - 0; - (C) -> C - 1 - end). + ?UPDATE({credit_from, From}, InitialCredit, C, + if C == 1 -> block(From), + 0; + true -> C - 1 + end). ack(To) -> ack(To, ?DEFAULT_CREDIT). ack(To, {_InitialCredit, MoreCreditAfter}) -> - update({credit_to, To}, MoreCreditAfter, - fun (1) -> grant(To, MoreCreditAfter), - MoreCreditAfter; - (C) -> C - 1 - end). + ?UPDATE({credit_to, To}, MoreCreditAfter, C, + if C == 1 -> grant(To, MoreCreditAfter), + MoreCreditAfter; + true -> C - 1 + end). handle_bump_msg({From, MoreCredit}) -> - update({credit_from, From}, 0, - fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From), - C + MoreCredit; - (C) -> C + MoreCredit - end). - -blocked() -> get(credit_blocked, []) =/= []. + ?UPDATE({credit_from, From}, 0, C, + if C =< 0 andalso C + MoreCredit > 0 -> unblock(From), + C + MoreCredit; + true -> C + MoreCredit + end). + +blocked() -> case get(credit_blocked) of + undefined -> false; + [] -> false; + _ -> true + end. peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it @@ -105,24 +125,17 @@ grant(To, Quantity) -> Msg = {bump_credit, {self(), Quantity}}, case blocked() of false -> To ! Msg; - true -> update(credit_deferred, [], - fun (Deferred) -> [{To, Msg} | Deferred] end) + true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end). +block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> - update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end), + ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), case blocked() of - false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], - erase(credit_deferred); + false -> case erase(credit_deferred) of + undefined -> ok; + Credits -> [To ! Msg || {To, Msg} <- Credits] + end; true -> ok end. - -get(Key, Default) -> - case get(Key) of - undefined -> Default; - Value -> Value - end. - -update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok. |