summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/src/mqtt_machine.erl
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-06-04 12:25:09 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-06-04 12:25:09 +0300
commitf4f65867ea537f951fe75e3a39cb35d863ac5d9e (patch)
tree8aaf4134a4baf222f90ab7873756f949d9a9aa48 /deps/rabbitmq_mqtt/src/mqtt_machine.erl
parentc5ccd7b4acf19c6cc2c1332a5b4006040369b13c (diff)
downloadrabbitmq-server-git-f4f65867ea537f951fe75e3a39cb35d863ac5d9e.tar.gz
Client tracking: notify existing connections in a one-off process
To avoid blocking the leader even a little bit. Per suggestion from @kjnilsson.
Diffstat (limited to 'deps/rabbitmq_mqtt/src/mqtt_machine.erl')
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_machine.erl11
1 files changed, 8 insertions, 3 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
index fdc3f20213..0ee4080303 100644
--- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl
+++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
@@ -20,7 +20,8 @@
-export([init/1,
apply/3,
- state_enter/2]).
+ state_enter/2,
+ notify_connection/2]).
-record(state, {client_ids = #{}}).
@@ -47,7 +48,7 @@ apply(Meta, {register, ClientId, Pid}, #state{client_ids = Ids} = State0) ->
{ok, OldPid} when Pid =/= OldPid ->
Effects0 = [{demonitor, process, OldPid},
{monitor, process, Pid},
- {mod_call, gen_server2, cast, [OldPid, duplicate_id]}],
+ {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}],
{Effects0, maps:remove(ClientId, Ids)};
error ->
Effects0 = [{monitor, process, Pid}],
@@ -87,7 +88,7 @@ apply(Meta, {leave, Node}, #state{client_ids = Ids} = State0) ->
Pid = maps:get(ClientId, Ids),
[
{demonitor, process, Pid},
- {mod_call, gen_server2, cast, [Pid, decommission_node]},
+ {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]},
{mod_call, rabbit_log, debug,
["MQTT will remove client ID '~s' from known "
"as its node has been decommissioned", [ClientId]]}
@@ -113,6 +114,10 @@ state_enter(_, _) ->
%% ==========================
+%% Avoids blocking the Raft leader.
+notify_connection(Pid, Reason) ->
+ spawn(fun() -> gen_server2:cast(Pid, Reason) end).
+
-spec snapshot_effects(map(), state()) -> ra_machine:effects().
snapshot_effects(#{index := RaftIdx}, State) ->
[{release_cursor, RaftIdx, State}].