summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/src/mqtt_machine.erl
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-06-04 17:19:32 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-06-04 17:19:32 +0300
commit5725a2101537613b681646d636f4aab54c395ade (patch)
tree38cc1fe279e9603a8c28d9a45998b6525eb87f22 /deps/rabbitmq_mqtt/src/mqtt_machine.erl
parent2107daf93fefa2f19ecf346478304104dd766371 (diff)
downloadrabbitmq-server-git-5725a2101537613b681646d636f4aab54c395ade.tar.gz
Remonitor connections on short disconnects and after recovery
Diffstat (limited to 'deps/rabbitmq_mqtt/src/mqtt_machine.erl')
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_machine.erl16
1 files changed, 16 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
index 226300b17c..79d93d743c 100644
--- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl
+++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
@@ -72,6 +72,13 @@ apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = Stat
end,
{State, ok, Effects};
+apply(_Meta, {down, DownPid, noconnection}, State) ->
+ %% Monitor the node the pid is on (see {nodeup, Node} below)
+ %% so that we can detect when the node is re-connected and discover the
+ %% actual fate of the connection processes on it
+ Effect = {monitor, node, node(DownPid)},
+ {State, ok, Effect};
+
apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) ->
Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid ->
false;
@@ -85,6 +92,15 @@ apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) ->
["MQTT connection with client id '~s' failed", [Id]]}] end, Delta),
{State, ok, Effects ++ snapshot_effects(Meta, State)};
+apply(_Meta, {nodeup, Node}, State) ->
+ %% Work out if any pids that were disconnected are still
+ %% alive.
+ %% Re-request the monitor for the pids on the now-back node.
+ Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node],
+ {State, ok, Effects};
+apply(_Meta, {nodedown, _Node}, State) ->
+ {State, ok}.
+
apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) ->
Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids),
Delta = maps:keys(Ids) -- maps:keys(Ids1),