summaryrefslogtreecommitdiff
path: root/src/credit_flow.erl
blob: ba99811f70e0154e1da99c54190dd01092a4cfbf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
%%

-module(credit_flow).

%% Credit flow is controlled by a credit specification - a
%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
%% credit starts at InitialCredit and is decremented with every
%% message sent. The message receiver grants more credit to the sender
%% by sending it a {bump_credit, ...} control message after receiving
%% MoreCreditAfter messages. The sender should pass this message in to
%% handle_bump_msg/1. The sender should block when it goes below 0
%% (check by invoking blocked/0). If a process is both a sender and a
%% receiver it will not grant any more credit to its senders when it
%% is itself blocked - thus the only processes that need to check
%% blocked/0 are ones that read from network sockets.

-define(DEFAULT_CREDIT, {200, 50}).

-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
-export([peer_down/1]).

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-opaque(bump_msg() :: {pid(), non_neg_integer()}).
-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).

-spec(send/1 :: (pid()) -> 'ok').
-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(ack/1 :: (pid()) -> 'ok').
-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
-spec(peer_down/1 :: (pid()) -> 'ok').

-endif.

%%----------------------------------------------------------------------------

%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
%% messages. This is the clearest I can make it (since the function
%% names form the API and want to make sense externally, while the
%% variable names are used in credit bookkeeping and want to make
%% sense internally).

%% For any given pair of processes, ack/2 and send/2 must always be
%% called with the same credit_spec().

send(From) -> send(From, ?DEFAULT_CREDIT).

send(From, {InitialCredit, _MoreCreditAfter}) ->
    update({credit_from, From}, InitialCredit,
           fun (1) -> block(From),
                      0;
               (C) -> C - 1
           end).

ack(To) -> ack(To, ?DEFAULT_CREDIT).

ack(To, {_InitialCredit, MoreCreditAfter}) ->
    update({credit_to, To}, MoreCreditAfter,
           fun (1) -> grant(To, MoreCreditAfter),
                      MoreCreditAfter;
               (C) -> C - 1
           end).

handle_bump_msg({From, MoreCredit}) ->
    update({credit_from, From}, 0,
           fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                             C + MoreCredit;
               (C)                                        -> C + MoreCredit
           end).

blocked() -> get(credit_blocked, []) =/= [].

peer_down(Peer) ->
    %% In theory we could also remove it from credit_deferred here, but it
    %% doesn't really matter; at some point later we will drain
    %% credit_deferred and thus send messages into the void...
    unblock(Peer),
    erase({credit_from, Peer}),
    erase({credit_to, Peer}),
    ok.

%% --------------------------------------------------------------------------

grant(To, Quantity) ->
    Msg = {bump_credit, {self(), Quantity}},
    case blocked() of
        false -> To ! Msg;
        true  -> update(credit_deferred, [],
                        fun (Deferred) -> [{To, Msg} | Deferred] end)
    end.

block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).

unblock(From) ->
    update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
    case blocked() of
        false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
                 erase(credit_deferred);
        true  -> ok
    end.

get(Key, Default) ->
    case get(Key) of
        undefined -> Default;
        Value     -> Value
    end.

update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.