diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2019-06-04 17:19:32 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2019-06-04 17:19:32 +0300 |
commit | 5725a2101537613b681646d636f4aab54c395ade (patch) | |
tree | 38cc1fe279e9603a8c28d9a45998b6525eb87f22 /deps/rabbitmq_mqtt/src/mqtt_machine.erl | |
parent | 2107daf93fefa2f19ecf346478304104dd766371 (diff) | |
download | rabbitmq-server-git-5725a2101537613b681646d636f4aab54c395ade.tar.gz |
Remonitor connections on short disconnects and after recovery
Diffstat (limited to 'deps/rabbitmq_mqtt/src/mqtt_machine.erl')
-rw-r--r-- | deps/rabbitmq_mqtt/src/mqtt_machine.erl | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl index 226300b17c..79d93d743c 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -72,6 +72,13 @@ apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = Stat end, {State, ok, Effects}; +apply(_Meta, {down, DownPid, noconnection}, State) -> + %% Monitor the node the pid is on (see {nodeup, Node} below) + %% so that we can detect when the node is re-connected and discover the + %% actual fate of the connection processes on it + Effect = {monitor, node, node(DownPid)}, + {State, ok, Effect}; + apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) -> Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid -> false; @@ -85,6 +92,15 @@ apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) -> ["MQTT connection with client id '~s' failed", [Id]]}] end, Delta), {State, ok, Effects ++ snapshot_effects(Meta, State)}; +apply(_Meta, {nodeup, Node}, State) -> + %% Work out if any pids that were disconnected are still + %% alive. + %% Re-request the monitor for the pids on the now-back node. + Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node], + {State, ok, Effects}; +apply(_Meta, {nodedown, _Node}, State) -> + {State, ok}. + apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) -> Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids), Delta = maps:keys(Ids) -- maps:keys(Ids1), |