summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/src/mqtt_node.erl
blob: 84dcd9b3a4fcb5445ca75878f39c36699c03f186 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%
%% Manages the local Ra server used by the MQTT plugin for client ID
%% tracking: starting it, joining it to peers, triggering elections and
%% removing members.
-module(mqtt_node).

-export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0]).

%% Used as the first element of every server ID, as the Ra cluster name and
%% as the friendly name of the local server.
-define(ID_NAME, mqtt_node).
%% Timeout passed to ra:members/2 when contacting peers in join_peers/3.
-define(START_TIMEOUT, 100000).
%% Pause, in milliseconds (passed to timer:sleep/1), between join retries.
-define(RETRY_INTERVAL, 5000).
%% Timeout passed to ra:trigger_election/2 after joining discovered peers.
-define(RA_OPERATION_TIMEOUT, 60000).

%% Returns the server ID of the client ID tracking server on the local node,
%% i.e. the {?ID_NAME, node()} pair.
node_id() ->
    LocalNode = node(),
    server_id(LocalNode).

%% Returns the server ID for the node this code is running on.
server_id() ->
    ThisNode = node(),
    server_id(ThisNode).

%% Builds the server ID for the given Erlang node: a {?ID_NAME, Node} tuple.
server_id(ErlangNode) ->
    {?ID_NAME, ErlangNode}.

%% Returns the server IDs of every node reported by
%% rabbit_mnesia:cluster_nodes(all) that passes
%% can_participate_in_clientid_tracking/1. Node order is preserved.
all_node_ids() ->
    ClusterNodes = rabbit_mnesia:cluster_nodes(all),
    Eligible = lists:filter(fun can_participate_in_clientid_tracking/1, ClusterNodes),
    lists:map(fun server_id/1, Eligible).

%% Entry point: starts the local server, polling for peers first.
%%
%% rand:uniform(10) yields 1..10, so between 11 and 20 attempts are allowed,
%% 300 ms apart: at most roughly 3.3s to 6s of waiting, randomized so that
%% nodes booting together do not all give up at the same moment.
start() ->
    PollIntervalMs = 300,
    MaxAttempts = 10 + rand:uniform(10),
    start(PollIntervalMs, MaxAttempts).

%% start(Delay, AttemptsLeft)
%%
%% Delay is the pause in milliseconds between peer discovery attempts and
%% AttemptsLeft the number of attempts still allowed.
%%
%% Three situations are distinguished on each attempt:
%%   * the local server has no registered UID and no peers are visible:
%%     sleep for Delay and try again with one attempt fewer;
%%   * the local server has no registered UID but peers are visible:
%%     start the server, join the peers and trigger an election;
%%   * the local server already has a UID: join the peers, restart the
%%     server and trigger an election.
%% Once the attempts are used up the server is started anyway and an
%% election is triggered.
start(_Delay, AttemptsLeft) when AttemptsLeft =< 0 ->
    start_server(),
    trigger_election();
start(Delay, AttemptsLeft) ->
    Self = server_id(),
    Others = compatible_peer_servers(),
    case {ra_directory:uid_of(?ID_NAME), Others} of
        {undefined, []} ->
            %% Cluster members are not known ahead of time and nodes may be
            %% booting in parallel (e.g. started together by a deployment
            %% tool), so we poll a few times, for up to a few seconds, hoping
            %% to discover a peer to join before forming a cluster ourselves.
            %% This lowers the odds of N nodes forming N independent
            %% clusters.
            %%
            %% It is a best effort workaround and does not guarantee that a
            %% single cluster is formed: that cannot be ensured without
            %% agreeing on the expected set of members up front.
            rabbit_log:info("MQTT: will wait for ~p more ms for cluster members to join before triggering a Raft leader election", [Delay]),
            timer:sleep(Delay),
            start(Delay, AttemptsLeft - 1);
        {undefined, Peers} ->
            %% First start of this node with peers available: start the
            %% server, join the peers and trigger an election, allowing up
            %% to ?RA_OPERATION_TIMEOUT for the election request.
            rabbit_log:info("MQTT: discovered ~p cluster peers that support client ID tracking", [length(Peers)]),
            start_server(),
            join_peers(Self, Peers),
            ra:trigger_election(Self, ?RA_OPERATION_TIMEOUT);
        {_ExistingUId, _} ->
            %% The server already has a UID registered: rejoin and restart it.
            join_peers(Self, Others),
            ra:restart_server(Self),
            ra:trigger_election(Self)
    end,
    ok.

%% Returns the server IDs from all_node_ids/0 with the local server's own ID
%% removed.
compatible_peer_servers() ->
    Everyone = all_node_ids(),
    Local = node_id(),
    lists:delete(Local, Everyone).

%% Starts the local server of the ?ID_NAME cluster.
%%
%% The configuration uses a UID obtained from ra:new_uid/1, the currently
%% visible compatible peers as initial members, and mqtt_machine as the state
%% machine module. Returns whatever ra:start_server/1 returns.
start_server() ->
    Self = node_id(),
    InitialMembers = compatible_peer_servers(),
    UId = ra:new_uid(ra_lib:to_binary(?ID_NAME)),
    %% NOTE(review): kernel's net_ticktime is documented in seconds, and the
    %% value is passed on unchanged as tick_timeout - confirm which unit ra
    %% expects there.
    TickTimeout = application:get_env(kernel, net_ticktime, 60) + 5,
    ra:start_server(#{id => Self,
                      uid => UId,
                      cluster_name => ?ID_NAME,
                      friendly_name => ?ID_NAME,
                      initial_members => InitialMembers,
                      log_init_args => #{uid => UId},
                      tick_timeout => TickTimeout,
                      machine => {module, mqtt_machine, #{}}}).

%% Calls ra:trigger_election/1 for the local server ID and returns its result.
trigger_election() ->
    LocalServer = server_id(),
    ra:trigger_election(LocalServer).

%% Joins NodeId to the cluster formed by Nodes, allowing up to 100 attempts.
%% An empty peer list needs no special clause here: join_peers/3 returns ok
%% for it straight away.
join_peers(NodeId, Nodes) ->
    join_peers(NodeId, Nodes, 100).
%% join_peers(NodeId, Nodes, RetriesLeft)
%%
%% Makes NodeId a member of the cluster that Nodes belong to.
%%
%% The current membership is requested from Nodes with ra:members/2. If
%% NodeId is already listed nothing is done; otherwise it is added with
%% ra:add_member/2. When the membership query times out, the call sleeps for
%% ?RETRY_INTERVAL ms and tries again with one retry fewer.
%%
%% Returns:
%%   * ok when Nodes is empty or NodeId is already a member;
%%   * the result of ra:add_member/2 when NodeId had to be added;
%%   * the result of rabbit_log:error/1 once the retries are exhausted;
%%   * any other reply from ra:members/2 unchanged.
join_peers(_NodeId, [], _RetriesLeft) ->
    ok;
join_peers(_NodeId, _Nodes, 0) ->
    rabbit_log:error("MQTT: exhausted all attempts while trying to rejoin cluster peers");
join_peers(NodeId, Nodes, RetriesLeft) ->
    case ra:members(Nodes, ?START_TIMEOUT) of
        {ok, Members, _Leader} ->
            case lists:member(NodeId, Members) of
                true  -> ok;
                false -> ra:add_member(Members, NodeId)
            end;
        {timeout, _} ->
            %% Erlang format strings use ~ control sequences; a C-style %s
            %% consumes no argument, which makes formatting fail when an
            %% argument is supplied.
            rabbit_log:debug("MQTT: timed out contacting cluster peers, ~p retries left", [RetriesLeft]),
            timer:sleep(?RETRY_INTERVAL),
            join_peers(NodeId, Nodes, RetriesLeft - 1);
        Err ->
            Err
    end.

%% Removes the server on Node from the cluster, going through the local
%% server, by calling ra:leave_and_delete_server/2.
%%
%% Returns the result of that call, or nodedown when the call exits with a
%% nodedown reason naming Node itself (Node is already bound in the catch
%% pattern, so exits about other nodes are not caught).
-spec leave(node()) -> 'ok' | 'timeout' | 'nodedown'.
leave(Node) ->
    Local = server_id(),
    Departing = server_id(Node),
    try ra:leave_and_delete_server(Local, Departing) of
        Outcome -> Outcome
    catch
        exit:{{nodedown, Node}, _} -> nodedown
    end.

%% Tells whether Node can take part in client ID tracking.
%%
%% Probes the node by calling mqtt_machine:module_info/0 on it over rpc.
%% Returns false when the call yields {badrpc, _} and true for any other
%% reply.
can_participate_in_clientid_tracking(Node) ->
    Probe = rpc:call(Node, mqtt_machine, module_info, []),
    case Probe of
        {badrpc, _Reason} -> false;
        _ModuleInfo       -> true
    end.