summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_maintenance.erl
blob: 3b735126480ac97b5a1fc473074ee11db7b917e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_maintenance).

-include("rabbit.hrl").

-export([
    is_enabled/0,
    drain/0,
    revive/0,
    mark_as_being_drained/0,
    unmark_as_being_drained/0,
    is_being_drained_local_read/1,
    is_being_drained_consistent_read/1,
    status_local_read/1,
    status_consistent_read/1,
    filter_out_drained_nodes_local_read/1,
    filter_out_drained_nodes_consistent_read/1,
    suspend_all_client_listeners/0,
    resume_all_client_listeners/0,
    close_all_client_connections/0,
    primary_replica_transfer_candidate_nodes/0,
    random_primary_replica_transfer_candidate_node/1,
    transfer_leadership_of_quorum_queues/1,
    transfer_leadership_of_classic_mirrored_queues/1,
    status_table_name/0,
    status_table_definition/0
]).

%% Name of the Mnesia table this module reads and writes node maintenance
%% state rows in (see status_table_name/0).
-define(TABLE, rabbit_node_maintenance_states).
%% Feature flag checked by is_enabled/0; drain/0 and revive/0 are no-ops
%% while it is disabled.
-define(FEATURE_FLAG, maintenance_mode_status).
%% Status written by unmark_as_being_drained/0 and reported by the read
%% functions when no matching row can be read.
-define(DEFAULT_STATUS,  regular).
%% Status written by mark_as_being_drained/0.
-define(DRAINING_STATUS, draining).

%% The statuses this module writes for a node.
-type maintenance_status() :: ?DEFAULT_STATUS | ?DRAINING_STATUS.
%% Return type of status_table_name/0.
-type mnesia_table() :: atom().

-export_type([
    maintenance_status/0
]).

%%
%% API
%%

-spec status_table_name() -> mnesia_table().
%% Returns the name of the table used by this module to store
%% per-node maintenance status rows.
status_table_name() ->
    ?TABLE.

-spec status_table_definition() -> list().
%% Returns the table options (record name and attribute list) describing
%% the node_maintenance_state record, as a list of {Key, Value} pairs.
status_table_definition() ->
    Fields = record_info(fields, node_maintenance_state),
    [{attributes, Fields},
     {record_name, node_maintenance_state}].

-spec is_enabled() -> boolean().
%% Reports whether the feature flag that gates drain/0 and revive/0
%% is enabled.
is_enabled() ->
    rabbit_feature_flags:is_enabled(?FEATURE_FLAG).

-spec drain() -> ok.
%% Entry point for putting this node into maintenance mode.
%% Only logs a warning when the feature flag is not enabled.
drain() ->
    FlagEnabled = is_enabled(),
    case FlagEnabled of
        false ->
            rabbit_log:warning("Feature flag `~s` is not enabled, draining is a no-op",
                               [?FEATURE_FLAG]);
        true ->
            do_drain()
    end.

-spec do_drain() -> ok.
%% Performs the drain sequence, in this order: marks the node as being
%% drained, suspends client listeners, closes local client connections,
%% moves queue leadership to the candidate peers, stops local quorum queue
%% followers and finally emits the maintenance_draining event.
do_drain() ->
    %% both events emitted below carry the same payload
    EventProps = #{reason => <<"node is being put into maintenance">>},

    rabbit_log:alert("This node is being put into maintenance (drain) mode"),
    mark_as_being_drained(),
    rabbit_log:info("Marked this node as undergoing maintenance"),
    suspend_all_client_listeners(),
    rabbit_log:alert("Suspended all listeners and will no longer accept client connections"),
    {ok, ClosedCount} = close_all_client_connections(),
    %% allow plugins to react e.g. by closing their protocol connections
    rabbit_event:notify(maintenance_connections_closed, EventProps),
    rabbit_log:alert("Closed ~b local client connections", [ClosedCount]),

    Peers = primary_replica_transfer_candidate_nodes(),
    PeerCount = length(Peers),
    PeerNames = readable_candidate_list(Peers),
    rabbit_log:info("Node will transfer primary replicas of its queues to ~b peers: ~s",
                    [PeerCount, PeerNames]),
    transfer_leadership_of_classic_mirrored_queues(Peers),
    transfer_leadership_of_quorum_queues(Peers),
    stop_local_quorum_queue_followers(),

    %% allow plugins to react
    rabbit_event:notify(maintenance_draining, EventProps),
    rabbit_log:alert("Node is ready to be shut down for maintenance or upgrade"),

    ok.

-spec revive() -> ok.
%% Entry point for taking this node out of maintenance mode.
%% Only logs a warning when the feature flag is not enabled.
revive() ->
    FlagEnabled = is_enabled(),
    case FlagEnabled of
        false ->
            rabbit_log:warning("Feature flag `~s` is not enabled, reviving is a no-op",
                               [?FEATURE_FLAG]);
        true ->
            do_revive()
    end.

-spec do_revive() -> ok.
%% Performs the revive sequence, in this order: restarts local quorum queue
%% replicas, resumes client listeners, clears the draining status of this
%% node and finally emits the maintenance_revived event.
do_revive() ->
    rabbit_log:alert("This node is being revived from maintenance (drain) mode"),
    revive_local_quorum_queue_replicas(),
    %% the "resumed" message is logged only once, and only after the
    %% listeners have actually been asked to resume
    resume_all_client_listeners(),
    rabbit_log:alert("Resumed all listeners and will accept client connections again"),
    unmark_as_being_drained(),
    rabbit_log:info("Marked this node as back from maintenance and ready to serve clients"),

    %% allow plugins to react
    rabbit_event:notify(maintenance_revived, #{}),

    ok.
 
-spec mark_as_being_drained() -> boolean().
%% Records the draining status for this node.
%% Returns true if the status was written, false otherwise.
mark_as_being_drained() ->
    rabbit_log:debug("Marking the node as undergoing maintenance"),
    set_maintenance_status_status(?DRAINING_STATUS).
 
-spec unmark_as_being_drained() -> boolean().
%% Records the default (regular) status for this node.
%% Returns true if the status was written, false otherwise.
unmark_as_being_drained() ->
    rabbit_log:debug("Unmarking the node as undergoing maintenance"),
    set_maintenance_status_status(?DEFAULT_STATUS).

%% Writes Status into this node's row of the maintenance status table inside
%% a Mnesia transaction. An existing row is updated in place (its other
%% fields are kept), otherwise a fresh row is created.
%% Returns true when the transaction commits with ok, false otherwise.
set_maintenance_status_status(Status) ->
    Upsert =
        fun () ->
            Current =
                case mnesia:wread({?TABLE, node()}) of
                    []      -> #node_maintenance_state{};
                    [Found] -> Found
                end,
            Updated = Current#node_maintenance_state{
                        node   = node(),
                        status = Status
                      },
            mnesia:write(?TABLE, Updated, write)
        end,
    case mnesia:transaction(Upsert) of
        {atomic, ok} -> true;
        _Failure     -> false
    end.
 
 
-spec is_being_drained_local_read(node()) -> boolean().
%% True when the status obtained via status_local_read/1 for Node
%% is the draining status.
is_being_drained_local_read(Node) ->
    ?DRAINING_STATUS =:= status_local_read(Node).

-spec is_being_drained_consistent_read(node()) -> boolean().
%% True when the status obtained via status_consistent_read/1 for Node
%% is the draining status.
is_being_drained_consistent_read(Node) ->
    ?DRAINING_STATUS =:= status_consistent_read(Node).

-spec status_local_read(node()) -> maintenance_status().
%% Reads the maintenance status of Node with a dirty (non-transactional)
%% Mnesia read. Falls back to the default status when there is no single
%% matching row or when the read itself fails.
status_local_read(Node) ->
    try mnesia:dirty_read(?TABLE, Node) of
        [#node_maintenance_state{node = Node, status = Status}] ->
            Status;
        _ ->
            ?DEFAULT_STATUS
    catch
        %% Deliberately best effort: a failed read (e.g. the table is not
        %% available) is reported as the default status instead of crashing
        %% the caller. Unlike an old-style `catch`, an exception can never
        %% be mistaken for a read result here.
        _:_ ->
            ?DEFAULT_STATUS
    end.
 
-spec status_consistent_read(node()) -> maintenance_status().
%% Reads the maintenance status of Node inside a Mnesia transaction.
%% Falls back to the default status when there is no single matching row
%% or when the transaction is aborted.
status_consistent_read(Node) ->
    ReadRow = fun() -> mnesia:read(?TABLE, Node) end,
    case mnesia:transaction(ReadRow) of
        {atomic, [#node_maintenance_state{node = Node, status = Status}]} ->
            Status;
        {atomic, _NoSingleMatch} ->
            ?DEFAULT_STATUS;
        {aborted, _Reason} ->
            ?DEFAULT_STATUS
    end.
 
 -spec filter_out_drained_nodes_local_read([node()]) -> [node()].
%% Keeps only the nodes that is_being_drained_local_read/1 does not
%% report as being drained. Input order is preserved.
filter_out_drained_nodes_local_read(Nodes) ->
    [Node || Node <- Nodes, not is_being_drained_local_read(Node)].
 
-spec filter_out_drained_nodes_consistent_read([node()]) -> [node()].
%% Keeps only the nodes that is_being_drained_consistent_read/1 does not
%% report as being drained. Input order is preserved.
filter_out_drained_nodes_consistent_read(Nodes) ->
    [Node || Node <- Nodes, not is_being_drained_consistent_read(Node)].
 
-spec suspend_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Pauses all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
%% A suspended listener will not accept any new client connections
%% but previously established connections won't be interrupted.
%% Returns ok when every listener was suspended, otherwise an error
%% reported by one of the suspend calls.
suspend_all_client_listeners() ->
    ClientListeners = rabbit_networking:node_client_listeners(node()),
    rabbit_log:info("Asked to suspend ~b client connection listeners. "
                    "No new client connections will be accepted until these listeners are resumed!",
                    [length(ClientListeners)]),
    SuspendLocal = local_listener_fold_fun(fun ranch:suspend_listener/1),
    Outcomes = lists:foldl(SuspendLocal, [], ClientListeners),
    lists:foldl(fun ok_or_first_error/2, ok, Outcomes).

 -spec resume_all_client_listeners() -> rabbit_types:ok_or_error(any()).
%% Resumes all listeners on the current node except for
%% Erlang distribution (clustering and CLI tools).
%% A resumed listener will accept new client connections.
%% Returns ok when every listener was resumed, otherwise an error
%% reported by one of the resume calls.
resume_all_client_listeners() ->
    ClientListeners = rabbit_networking:node_client_listeners(node()),
    rabbit_log:info("Asked to resume ~b client connection listeners. "
                    "New client connections will be accepted from now on",
                    [length(ClientListeners)]),
    ResumeLocal = local_listener_fold_fun(fun ranch:resume_listener/1),
    Outcomes = lists:foldl(ResumeLocal, [], ClientListeners),
    lists:foldl(fun ok_or_first_error/2, ok, Outcomes).

 -spec close_all_client_connections() -> {'ok', non_neg_integer()}.
%% Asks rabbit_networking to close every local connection and returns
%% how many connections that request covered.
close_all_client_connections() ->
    LocalConnections = rabbit_networking:local_connections(),
    CloseReason = "Node was put into maintenance mode",
    rabbit_networking:close_connections(LocalConnections, CloseReason),
    ClosedCount = length(LocalConnections),
    {ok, ClosedCount}.

-spec transfer_leadership_of_quorum_queues([node()]) -> ok.
%% Initiates leadership transfer for every quorum queue whose leader is on
%% this node by stopping the local Ra server of each such queue.
%% With an empty candidate list nothing is stopped and a warning is logged.
%% The candidate list is only checked for emptiness: no specific target
%% node is chosen here.
transfer_leadership_of_quorum_queues([]) ->
    rabbit_log:warning("Skipping leadership transfer of quorum queues: no candidate "
                       "(online, not under maintenance) nodes to transfer to!");
transfer_leadership_of_quorum_queues(_TransferCandidates) ->
    %% we only transfer leadership for QQs that have local leaders
    Queues = rabbit_amqqueue:list_local_leaders(),
    rabbit_log:info("Will transfer leadership of ~b quorum queues with current leader on this node",
                    [length(Queues)]),
    [begin
        Name = amqqueue:get_name(Q),
        rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
                         [rabbit_misc:rs(Name)]),
        %% we trigger an election and exclude this node from the list of candidates
        %% by simply shutting its local QQ replica (Ra server)
        RaLeader = amqqueue:get_pid(Q),
        rabbit_log:debug("Will stop Ra server ~p", [RaLeader]),
        case ra:stop_server(RaLeader) of
            ok     ->
                rabbit_log:debug("Successfully stopped Ra server ~p", [RaLeader]);
            {error, nodedown} ->
                %% the ~p directive needs its argument, otherwise the
                %% message cannot be formatted
                rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down",
                                 [RaLeader])
        end
     end || Q <- Queues],
    rabbit_log:info("Leadership transfer for quorum queues hosted on this node has been initiated").

-spec transfer_leadership_of_classic_mirrored_queues([node()]) -> ok.
%% Transfers leadership of every local classic mirrored queue to a node
%% picked from TransferCandidates (one pick per queue).
%% With an empty candidate list nothing is transferred and a warning is
%% logged. A failed transfer of an individual queue is logged and does not
%% stop the remaining queues from being processed.
transfer_leadership_of_classic_mirrored_queues([]) ->
    rabbit_log:warning("Skipping leadership transfer of classic mirrored queues: no candidate "
                       "(online, not under maintenance) nodes to transfer to!");
transfer_leadership_of_classic_mirrored_queues(TransferCandidates) ->
    Queues = rabbit_amqqueue:list_local_mirrored_classic_queues(),
    ReadableCandidates = readable_candidate_list(TransferCandidates),
    rabbit_log:info("Will transfer leadership of ~b classic mirrored queues hosted on this node to these peer nodes: ~s",
                    [length(Queues), ReadableCandidates]),

    [begin
         Name = amqqueue:get_name(Q),
         case random_primary_replica_transfer_candidate_node(TransferCandidates) of
             {ok, Pick} ->
                 rabbit_log:debug("Will transfer leadership of local queue ~s to node ~s",
                          [rabbit_misc:rs(Name), Pick]),
                 case rabbit_mirror_queue_misc:transfer_leadership(Q, Pick) of
                     {migrated, _} ->
                         rabbit_log:debug("Successfully transferred leadership of queue ~s to node ~s",
                                          [rabbit_misc:rs(Name), Pick]);
                     Other ->
                         rabbit_log:warning("Could not transfer leadership of queue ~s to node ~s: ~p",
                                            [rabbit_misc:rs(Name), Pick, Other])
                 end;
             undefined ->
                 %% format the queue name the same way as every other
                 %% message in this function: ~s is given the result of
                 %% rabbit_misc:rs/1, not the raw name
                 rabbit_log:warning("Could not transfer leadership of queue ~s: no suitable candidates?",
                                    [rabbit_misc:rs(Name)])
         end
     end || Q <- Queues],
    rabbit_log:info("Leadership transfer for local classic mirrored queues is complete").

-spec stop_local_quorum_queue_followers() -> ok.
%% Stops the local Ra server of every quorum queue that has a follower
%% replica on this node.
stop_local_quorum_queue_followers() ->
    Queues = rabbit_amqqueue:list_local_followers(),
    rabbit_log:info("Will stop local follower replicas of ~b quorum queues on this node",
                    [length(Queues)]),
    [begin
        Name = amqqueue:get_name(Q),
        rabbit_log:debug("Will stop a local follower replica of quorum queue ~s",
                         [rabbit_misc:rs(Name)]),
        %% shut down Ra nodes so that they are not considered for leader election
        %% (the queue's pid names the leader's node, so the local server id is
        %% built from the registered name and node())
        {RegisteredName, _LeaderNode} = amqqueue:get_pid(Q),
        RaNode = {RegisteredName, node()},
        rabbit_log:debug("Will stop Ra server ~p", [RaNode]),
        case ra:stop_server(RaNode) of
            ok     ->
                rabbit_log:debug("Successfully stopped Ra server ~p", [RaNode]);
            {error, nodedown} ->
                %% the ~p directive needs its argument, otherwise the
                %% message cannot be formatted
                rabbit_log:error("Failed to stop Ra server ~p: target node was reported as down",
                                 [RaNode])
        end
     end || Q <- Queues],
    rabbit_log:info("Stopped all local replicas of quorum queues hosted on this node").

 -spec primary_replica_transfer_candidate_nodes() -> [node()].
%% Running nodes other than this one, minus those that a consistent read
%% reports as being drained.
primary_replica_transfer_candidate_nodes() ->
    RunningPeers = rabbit_nodes:all_running() -- [node()],
    filter_out_drained_nodes_consistent_read(RunningPeers).

-spec random_primary_replica_transfer_candidate_node([node()]) -> {ok, node()} | undefined.
%% Picks one element of Candidates, using a hash of the current monotonic
%% time as the (zero-based) index. Returns undefined for an empty list.
random_primary_replica_transfer_candidate_node([]) ->
    undefined;
random_primary_replica_transfer_candidate_node(Candidates) ->
    Size = length(Candidates),
    %% phash2/2 yields a value in 0..Size-1 while lists:nth/2 is one-based
    Position = 1 + erlang:phash2(erlang:monotonic_time(), Size),
    {ok, lists:nth(Position, Candidates)}.

-spec revive_local_quorum_queue_replicas() -> ok.
%% Restarts the local Ra server of every quorum queue that has a follower
%% replica on this node. A server that is already running is left alone.
revive_local_quorum_queue_replicas() ->
    Queues = rabbit_amqqueue:list_local_followers(),
    [begin
        Name = amqqueue:get_name(Q),
        %% NOTE(review): this message looks copied from the leadership
        %% transfer code; what follows restarts a replica - confirm wording
        rabbit_log:debug("Will trigger a leader election for local quorum queue ~s",
                         [rabbit_misc:rs(Name)]),
        %% start local QQ replica (Ra server) of this queue
        {Prefix, _Node} = amqqueue:get_pid(Q),
        RaServer = {Prefix, node()},
        rabbit_log:debug("Will start Ra server ~p", [RaServer]),
        case ra:restart_server(RaServer) of
            ok     ->
                rabbit_log:debug("Successfully restarted Ra server ~p", [RaServer]);
            {error, {already_started, _Pid}} ->
                rabbit_log:debug("Ra server ~p is already running", [RaServer]);
            {error, nodedown} ->
                %% the ~p directive needs its argument, otherwise the
                %% message cannot be formatted
                rabbit_log:error("Failed to restart Ra server ~p: target node was reported as down",
                                 [RaServer])
        end
     end || Q <- Queues],
    rabbit_log:info("Restart of local quorum queue replicas is complete").
 
%%
%% Implementation
%%

%% Builds a fold function for lists:foldl/3 over listener records.
%% For a listener that belongs to this node, Fun is applied to the
%% listener's Ranch reference and the result is prepended to the
%% accumulator; any other element leaves the accumulator untouched.
local_listener_fold_fun(Fun) ->
    fun(Listener, Acc) ->
        case Listener of
            #listener{node = Node, ip_address = Addr, port = Port} when Node =:= node() ->
                Ref = rabbit_networking:ranch_ref(Addr, Port),
                [Fun(Ref) | Acc];
            _ ->
                Acc
        end
    end.
 
%% Fold step over a list of ok | {error, _} results: an error replaces
%% whatever has been accumulated so far, ok keeps the accumulator as is.
ok_or_first_error({error, _Reason} = Failure, _Previous) ->
    Failure;
ok_or_first_error(ok, Previous) ->
    Previous.
 
%% Renders a list of nodes as a single ", "-separated string for logging.
readable_candidate_list(Nodes) ->
    NodeStrings = [rabbit_data_coercion:to_list(Node) || Node <- Nodes],
    string:join(NodeStrings, ", ").