diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2019-06-04 12:25:09 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2019-06-04 12:25:09 +0300 |
commit | f4f65867ea537f951fe75e3a39cb35d863ac5d9e (patch) | |
tree | 8aaf4134a4baf222f90ab7873756f949d9a9aa48 /deps/rabbitmq_mqtt/src/mqtt_machine.erl | |
parent | c5ccd7b4acf19c6cc2c1332a5b4006040369b13c (diff) | |
download | rabbitmq-server-git-f4f65867ea537f951fe75e3a39cb35d863ac5d9e.tar.gz |
Client tracking: notify existing connections in a one-off process
To avoid blocking the leader even a little bit.
Per suggestion from @kjnilsson.
Diffstat (limited to 'deps/rabbitmq_mqtt/src/mqtt_machine.erl')
-rw-r--r-- | deps/rabbitmq_mqtt/src/mqtt_machine.erl | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl index fdc3f20213..0ee4080303 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -20,7 +20,8 @@ -export([init/1, apply/3, - state_enter/2]). + state_enter/2, + notify_connection/2]). -record(state, {client_ids = #{}}). @@ -47,7 +48,7 @@ apply(Meta, {register, ClientId, Pid}, #state{client_ids = Ids} = State0) -> {ok, OldPid} when Pid =/= OldPid -> Effects0 = [{demonitor, process, OldPid}, {monitor, process, Pid}, - {mod_call, gen_server2, cast, [OldPid, duplicate_id]}], + {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}], {Effects0, maps:remove(ClientId, Ids)}; error -> Effects0 = [{monitor, process, Pid}], @@ -87,7 +88,7 @@ apply(Meta, {leave, Node}, #state{client_ids = Ids} = State0) -> Pid = maps:get(ClientId, Ids), [ {demonitor, process, Pid}, - {mod_call, gen_server2, cast, [Pid, decommission_node]}, + {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]}, {mod_call, rabbit_log, debug, ["MQTT will remove client ID '~s' from known " "as its node has been decommissioned", [ClientId]]} @@ -113,6 +114,10 @@ state_enter(_, _) -> %% ========================== +%% Avoids blocking the Raft leader. +notify_connection(Pid, Reason) -> + spawn(fun() -> gen_server2:cast(Pid, Reason) end). + -spec snapshot_effects(map(), state()) -> ra_machine:effects(). snapshot_effects(#{index := RaftIdx}, State) -> [{release_cursor, RaftIdx, State}]. |