summaryrefslogtreecommitdiff
path: root/src/rabbit_confirms.erl
blob: 2fe032d1f1a6d487ace6d11ab74a8116e8ac50ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
-module(rabbit_confirms).

-compile({no_auto_import, [size/1]}).

-include_lib("rabbit_common/include/rabbit.hrl").

-export([init/0,
         insert/4,
         confirm/3,
         reject/2,

         remove_queue/2,

         smallest/1,
         size/1,
         is_empty/1]).

%% Sequence number under which an unconfirmed entry is tracked.
-type seq_no() :: non_neg_integer().
%% Name of a queue that still has to confirm an entry.
-type queue_name() :: rabbit_amqqueue:name().
%% Name of the exchange recorded alongside each entry by insert/4.
-type exchange_name() :: rabbit_exchange:name().

%% Module state.
%%   smallest    - `undefined' while nothing is tracked; otherwise the sequence
%%                 number that insert/4, confirm/3 and reject/2 maintain as
%%                 the lowest one still present in `unconfirmed'.
%%   unconfirmed - for every tracked sequence number, the exchange name passed
%%                 to insert/4 and the set of queues (a map whose values are
%%                 all `ok') that have not confirmed it yet.
-record(?MODULE, {smallest  :: undefined | seq_no(),
                  unconfirmed = #{} :: #{seq_no() =>
                                         {exchange_name(),
                                          #{queue_name() => ok}}}
                  }).

%% A sequence number paired with its exchange name; this is the element type
%% of the lists returned by confirm/3 and remove_queue/2 and the value
%% returned by reject/2.
-type mx() :: {seq_no(), exchange_name()}.

%% Opaque to callers: the record layout is private to this module.
-opaque state() :: #?MODULE{}.

-export_type([
              state/0
              ]).

%% Returns a fresh state: the `unconfirmed' map is empty and `smallest' is
%% `undefined' (the record defaults).
-spec init() -> state().
init() -> #?MODULE{}.

%% Starts tracking SeqNo as unconfirmed, recording the exchange XName and the
%% queues in QNames that still have to confirm it.
%%
%% Fails with function_clause when SeqNo is already tracked, when XName is
%% not an exchange resource, or when SeqNo / QNames have the wrong type.
%%
%% `smallest' is kept equal to the lowest tracked sequence number even when
%% sequence numbers arrive out of order. next_smallest/2 scans upwards from
%% that value, so a tracked key below it would make smallest/1 report a value
%% that is too high and could leave the scan without a key to stop at.
-spec insert(seq_no(), [queue_name()], exchange_name(), state()) ->
    state().
insert(SeqNo, QNames, #resource{kind = exchange} = XName,
       #?MODULE{smallest = S0,
                unconfirmed = U0} = State)
  when is_integer(SeqNo)
       andalso is_list(QNames)
       andalso not is_map_key(SeqNo, U0) ->
    %% the queue set is represented as a map with `ok' values
    U = U0#{SeqNo => {XName, maps:from_list([{Q, ok} || Q <- QNames])}},
    S = case S0 of
            undefined -> SeqNo;
            _ when SeqNo < S0 -> SeqNo;
            _ -> S0
        end,
    State#?MODULE{smallest = S,
                  unconfirmed = U}.

%% Records that queue QName has confirmed each sequence number in SeqNos.
%%
%% Returns the {SeqNo, ExchangeName} pairs for which QName was the last
%% outstanding queue (in the same relative order as SeqNos) together with the
%% updated state. Sequence numbers that are not tracked are ignored.
-spec confirm([seq_no()], queue_name(), state()) ->
    {[mx()], state()}.
confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
                                unconfirmed = U0} = State)
  when is_list(SeqNos) ->
    ConfirmOne = fun (SeqNo, Acc) -> confirm_one(SeqNo, QName, Acc) end,
    %% foldr keeps the confirmed list in the order of SeqNos
    {Confirmed, U} = lists:foldr(ConfirmOne, {[], U0}, SeqNos),
    %% The smallest only needs recomputing if it was among the confirmed.
    %% TODO: this check could be folded into the preceding foldr
    Smallest = case lists:keymember(Smallest0, 1, Confirmed) of
                   true -> next_smallest(Smallest0, U);
                   false -> Smallest0
               end,
    {Confirmed, State#?MODULE{smallest = Smallest,
                              unconfirmed = U}}.

%% Stops tracking SeqNo regardless of which queues are still outstanding.
%%
%% Returns {ok, {SeqNo, ExchangeName}, NewState} when SeqNo was tracked and
%% {error, not_found} otherwise.
-spec reject(seq_no(), state()) ->
    {ok, mx(), state()} | {error, not_found}.
reject(SeqNo, #?MODULE{smallest = Smallest0,
                       unconfirmed = U0} = State)
  when is_integer(SeqNo) ->
    case maps:take(SeqNo, U0) of
        error ->
            {error, not_found};
        {{XName, _Queues}, U} when SeqNo =:= Smallest0 ->
            %% the smallest entry was removed, so find its successor
            Next = next_smallest(Smallest0, U),
            {ok, {SeqNo, XName}, State#?MODULE{unconfirmed = U,
                                               smallest = Next}};
        {{XName, _Queues}, U} ->
            {ok, {SeqNo, XName}, State#?MODULE{unconfirmed = U}}
    end.

%% Treats QName as having confirmed every sequence number it is still
%% outstanding for, and returns the result of confirm/3 for those sequence
%% numbers (passed in ascending order).
%% idempotent: once done, QName is no longer outstanding for any entry, so a
%% repeated call selects no sequence numbers.
-spec remove_queue(queue_name(), state()) ->
    {[mx()], state()}.
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
    WaitingOnQueue = maps:filter(
                       fun (_SeqNo, {_XName, QS}) ->
                               maps:is_key(QName, QS)
                       end, U),
    confirm(lists:sort(maps:keys(WaitingOnQueue)), QName, State).

%% Returns the `smallest' field of the state: the lowest sequence number
%% still tracked, or `undefined' when none is recorded.
-spec smallest(state()) -> seq_no() | undefined.
smallest(#?MODULE{smallest = SeqNo}) -> SeqNo.

%% Returns the number of sequence numbers that are still unconfirmed.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{unconfirmed = Unconfirmed}) -> map_size(Unconfirmed).

%% Returns true when no sequence number is being tracked, i.e. size/1 is 0.
-spec is_empty(state()) -> boolean().
is_empty(State) ->
    case size(State) of
        0 -> true;
        _ -> false
    end.

%% INTERNAL

%% Fold step used by confirm/3: applies queue QName's confirm for SeqNo to
%% the accumulator {ConfirmedSoFar, UnconfirmedMap}.
%%   - SeqNo not tracked: the accumulator is returned unchanged.
%%   - QName is the only outstanding queue: the entry is dropped and
%%     {SeqNo, XName} is prepended to the confirmed list.
%%   - otherwise: QName is removed from the entry's queue set (a no-op when
%%     it was not a member) and the entry stays tracked.
confirm_one(SeqNo, QName, {Confirmed, Unconfirmed}) ->
    case maps:find(SeqNo, Unconfirmed) of
        error ->
            {Confirmed, Unconfirmed};
        {ok, {XName, Queues}}
          when is_map_key(QName, Queues)
               andalso map_size(Queues) == 1 ->
            %% last queue confirm
            {[{SeqNo, XName} | Confirmed], maps:remove(SeqNo, Unconfirmed)};
        {ok, {XName, Queues}} ->
            Remaining = maps:remove(QName, Queues),
            {Confirmed, Unconfirmed#{SeqNo := {XName, Remaining}}}
    end.

%% Returns the lowest key of U, or `undefined' when U is empty. S is the
%% previous smallest sequence number and is used as the starting point of an
%% upward probe, which is cheap when the next key is close to S.
%%
%% The probe is limited to map_size(U) steps. If it finds nothing (a large
%% gap, or a call where every key is below S) the minimum key is computed
%% directly, so the function always terminates instead of counting upwards
%% forever.
next_smallest(_S, U) when map_size(U) == 0 ->
    undefined;
next_smallest(S, U) ->
    next_smallest(S, U, map_size(U)).

%% Bounded upward probe; StepsLeft is the number of increments still allowed.
next_smallest(S, U, _StepsLeft) when is_map_key(S, U) ->
    S;
next_smallest(_S, U, 0) ->
    %% probe budget exhausted: U is non-empty here, so lists:min/1 is safe
    lists:min(maps:keys(U));
next_smallest(S, U, StepsLeft) ->
    next_smallest(S + 1, U, StepsLeft - 1).



-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.