diff options
author | Diana Corbacho <diana@rabbitmq.com> | 2019-05-16 17:50:06 +0100 |
---|---|---|
committer | Diana Corbacho <diana@rabbitmq.com> | 2019-05-16 17:50:06 +0100 |
commit | 7bfe632cb60cc954189bbecbc4f02724d8dd2599 (patch) | |
tree | 8ec5070c19d38f7c4dfcf617fd4dcaeb4711c08f | |
parent | 3e17e1a80d45780230e80b4b369ce7ae512249db (diff) | |
download | rabbitmq-server-git-7bfe632cb60cc954189bbecbc4f02724d8dd2599.tar.gz |
Cluster-wide MQTT client id tracking
Uses a ra cluster to keep the client id tracking information - in
the state of the ra machine.
If nodes are decommissioned from the RMQ cluster, the command
decommission_mqtt_node must be invoked first to disconnect the clients
on that node and remove the node from the ra cluster.
[#135330629]
-rw-r--r-- | deps/rabbitmq_mqtt/Makefile | 2 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl | 77 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 5 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl | 92 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 8 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl | 5 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/cluster_SUITE.erl | 130 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/java_SUITE.erl | 6 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile | 1 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml | 1 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java | 15 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/reader_SUITE.erl | 2 |
12 files changed, 260 insertions, 84 deletions
diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index 53d3857db4..382712c74f 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -30,7 +30,7 @@ define PROJECT_APP_EXTRA_KEYS {broker_version_requirements, []} endef -DEPS = ranch rabbit_common rabbit amqp_client +DEPS = ranch rabbit_common rabbit amqp_client ra TEST_DEPS = emqttc ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers dep_ct_helper = git https://github.com/extend/ct_helper.git master diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl new file mode 100644 index 0000000000..7dd9cd4874 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand'). + +-include("rabbit_mqtt.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([scopes/0, + switches/0, + aliases/0, + usage/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +scopes() -> [ctl, diagnostics]. +switches() -> []. +aliases() -> []. + +description() -> <<"Decommissions MQTT cluster member used to track client ids">>. + +help_section() -> + {plugin, mqtt}. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_, _ | _], _Opts) -> + {validation_failure, too_many_args}; +validate([_], _) -> + ok. + +merge_defaults(Args, Opts) -> + {Args, Opts}. + +usage() -> + <<"decomission_mqtt_node <node>">>. + +usage_doc_guides() -> + [?MQTT_GUIDE_URL]. + +run([Node], #{node := NodeName, + timeout := Timeout}) -> + case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of + {badrpc, _} = Error -> + Error; + nodedown -> + list_to_binary(io_lib:format("Node ~p is down but has been successfully removed" + " from the cluster", [Node])); + Result -> + %% 'ok' or 'timeout' + %% TODO: Ra will timeout if the node is not a cluster member - should this be fixed?? + Result + end. + +banner(_, _) -> <<"Decomissioning MQTT node used for client id tracking ...">>. + +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index be10d2277d..528a680774 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -26,6 +26,7 @@ start(normal, []) -> {ok, Listeners} = application:get_env(tcp_listeners), {ok, SslListeners} = application:get_env(ssl_listeners), Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []), + ok = mqtt_node:start(), EMPid = case rabbit_event:start_link() of {ok, Pid} -> Pid; {error, {already_started, Pid}} -> Pid @@ -45,7 +46,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> emit_connection_info_local(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( - AggregatorPid, Ref, fun({_, {Pid, _}}) -> + AggregatorPid, Ref, fun({_, Pid}) -> rabbit_mqtt_reader:info(Pid, Items) end, rabbit_mqtt_collector:list()). @@ -53,4 +54,4 @@ emit_connection_info_local(Items, Ref, AggregatorPid) -> connection_info_local(Items) -> Connections = rabbit_mqtt_collector:list(), [rabbit_mqtt_reader:info(Pid, Items) - || {_, {Pid, _}} <- Connections]. + || {_, Pid} <- Connections]. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl index c025339306..b13d9ec108 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl @@ -11,91 +11,33 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. %% -module(rabbit_mqtt_collector). --behaviour(gen_server). - --export([start_link/0, register/2, unregister/2, list/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {client_ids}). - --define(SERVER, ?MODULE). +-export([register/2, unregister/2, list/0, leave/1]). %%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - register(ClientId, Pid) -> - gen_server:call(rabbit_mqtt_collector, {register, ClientId, Pid}, infinity). + run_ra_command({register, ClientId, Pid}). unregister(ClientId, Pid) -> - gen_server:call(rabbit_mqtt_collector, {unregister, ClientId, Pid}, infinity). + run_ra_command({unregister, ClientId, Pid}). list() -> - gen_server:call(rabbit_mqtt_collector, list). - -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, #state{client_ids = #{}}}. % clientid -> {pid, monitor} + run_ra_command(list). -%%-------------------------------------------------------------------------- +leave(NodeBin) -> + Node = binary_to_atom(NodeBin, utf8), + run_ra_command({leave, Node}), + mqtt_node:leave(Node). -handle_call({register, ClientId, Pid}, _From, - State = #state{client_ids = Ids}) -> - Ids1 = case maps:find(ClientId, Ids) of - {ok, {OldPid, MRef}} when Pid =/= OldPid -> - catch gen_server2:cast(OldPid, duplicate_id), - erlang:demonitor(MRef), - maps:remove(ClientId, Ids); - error -> - Ids - end, - Ids2 = maps:put(ClientId, {Pid, erlang:monitor(process, Pid)}, Ids1), - {reply, ok, State#state{client_ids = Ids2}}; - -handle_call({unregister, ClientId, Pid}, _From, State = #state{client_ids = Ids}) -> - {Reply, Ids1} = case maps:find(ClientId, Ids) of - {ok, {Pid, MRef}} -> erlang:demonitor(MRef), - {ok, maps:remove(ClientId, Ids)}; - _ -> {ok, Ids} - end, - {reply, Reply, State#state{ client_ids = Ids1 }}; - -handle_call(list, _From, State = #state{client_ids = Ids}) -> - {reply, maps:to_list(Ids), State}; - -handle_call(Msg, _From, State) -> - {stop, {unhandled_call, Msg}, State}. - -handle_cast(Msg, State) -> - {stop, {unhandled_cast, Msg}, State}. - -handle_info({'EXIT', _, {shutdown, closed}}, State) -> - {stop, {shutdown, closed}, State}; - -handle_info({'DOWN', MRef, process, DownPid, _Reason}, - State = #state{client_ids = Ids}) -> - Ids1 = maps:filter(fun (ClientId, {Pid, M}) - when Pid =:= DownPid, MRef =:= M -> - rabbit_log_connection:warning( - "MQTT disconnect from ~p~n", - [ClientId]), - false; - (_, _) -> - true - end, Ids), - {noreply, State #state{ client_ids = Ids1 }}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%%---------------------------------------------------------------------------- +-spec run_ra_command(term()) -> term() | {error, term()}. +run_ra_command(RaCommand) -> + NodeId = mqtt_node:node_id(), + case ra:process_command(NodeId, RaCommand) of + {ok, Result, _} -> Result; + _ = Error -> Error + end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 1352f9e6c4..143c8bf9f6 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -111,6 +111,14 @@ handle_cast(duplicate_id, [rabbit_mqtt_processor:info(client_id, PState), ConnName]), {stop, {shutdown, duplicate_id}, State}; +handle_cast(decommission_node, + State = #state{ proc_state = PState, + conn_name = ConnName }) -> + rabbit_log_connection:warning("MQTT disconnecting client id ~p (~p) as node is about" + " to be decommissioned~n", + [rabbit_mqtt_processor:info(client_id, PState), ConnName]), + {stop, {shutdown, decommission_node}, State}; + handle_cast(Msg, State) -> {stop, {mqtt_unexpected_cast, Msg}, State}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl index 4783149cd5..34d5ccd481 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl @@ -38,10 +38,7 @@ init([{Listeners, SslListeners0}]) -> end} end, {ok, {{one_for_all, 10, 10}, - [{collector, - {rabbit_mqtt_collector, start_link, []}, - transient, ?WORKER_WAIT, worker, [rabbit_mqtt_collector]}, - {rabbit_mqtt_retainer_sup, + [{rabbit_mqtt_retainer_sup, {rabbit_mqtt_retainer_sup, start_link, [{local, rabbit_mqtt_retainer_sup}]}, transient, ?SUPERVISOR_WAIT, supervisor, [rabbit_mqtt_retainer_sup]} | listener_specs(fun tcp_listener_spec/1, diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl new file mode 100644 index 0000000000..593b7e69db --- /dev/null +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -0,0 +1,130 @@ +-module(cluster_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + nodedown, + decommission_node + ]} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, basic}, + {collect_statistics_interval, 100} + ]}). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 3} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +nodedown(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + unlink(C), + MRef = erlang:monitor(process, C), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + 30000 -> + exit(missing_down_message) + end, + [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []). + +decommission_node(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + unlink(C), + MRef = erlang:monitor(process, C), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + 30000 -> + exit(missing_down_message) + end, + [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []). + +expect_publishes(_Topic, []) -> ok; +expect_publishes(Topic, [Payload|Rest]) -> + receive + {publish, Topic, Payload} -> expect_publishes(Topic, Rest) + after 5000 -> + throw({publish_not_delivered, Payload}) + end. diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl index 5fdf9b88fd..f924a8998a 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -58,7 +58,9 @@ init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, ?MODULE}, - {rmq_certspwd, "bunnychow"} + {rmq_certspwd, "bunnychow"}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 2} ]), rabbit_ct_helpers:run_setup_steps(Config1, [ fun merge_app_env/1 ] ++ @@ -105,11 +107,13 @@ end_per_testcase(Testcase, Config) -> java(Config) -> CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir), MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt), MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), os:putenv("SSL_CERTS_DIR", CertsDir), os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)), os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)), + os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)), os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)), DataDir = rabbit_ct_helpers:get_config(Config, data_dir), MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]), diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile index aaf9b25a0f..2ce61cd08f 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile @@ -5,6 +5,7 @@ MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \ -Dcerts.dir=$(SSL_CERTS_DIR) \ -Dmqtt.ssl.port=$(MQTT_SSL_PORT) \ -Dmqtt.port=$(MQTT_PORT) \ + -Dmqtt.port.2=$(MQTT_PORT_2) \ -Damqp.port=$(AMQP_PORT) .PHONY: deps tests clean distclean diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml index 49c9b2fac8..d75adcdeb2 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml @@ -57,6 +57,7 @@ <certs.dir>${certs.dir}</certs.dir> <mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port> <mqtt.port>${mqtt.port}</mqtt.port> + <mqtt.port.2>${mqtt.port.2}</mqtt.port.2> <amqp.port>${amqp.port}</amqp.port> <test-keystore.ca>${test-keystore.ca}</test-keystore.ca> diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java index 3c06905622..075b2f5016 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java @@ -52,6 +52,7 @@ public class MqttTest implements MqttCallback { private final String host = "localhost"; private final String brokerUrl = "tcp://" + host + ":" + getPort(); + private final String brokerTwoUrl = "tcp://" + host + ":" + getSecondPort(); private String clientId; private String clientId2; private MqttClient client; @@ -78,6 +79,12 @@ public class MqttTest implements MqttCallback { return Integer.parseInt(port.toString()); } + private static int getSecondPort() { + Object port = System.getProperty("mqtt.port.2", "1883"); + Assert.assertNotNull(port); + return Integer.parseInt(port.toString()); + } + private static int getAmqpPort() { Object port = System.getProperty("amqp.port", "5672"); assertNotNull(port); @@ -514,6 +521,14 @@ public class MqttTest implements MqttCallback { client2.disconnect(); } + @Test public void multipleClusterClientIds() throws MqttException, InterruptedException { + client.connect(conOpt); + client2 = new MqttClient(brokerTwoUrl, clientId, null); + client2.connect(conOpt); + waitAtMost(timeout).until(isClientConnected(),equalTo(false)); + client2.disconnect(); + } + @Test public void ping() throws MqttException, InterruptedException { conOpt.setKeepAliveInterval(1); client.connect(conOpt); diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index afabbf064c..574924dda5 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -138,7 +138,7 @@ stats(Config) -> emqttc:unsubscribe(C, [<<"TopicA">>]), timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms %% Retrieve the connection Pid - [{_, {Reader, _}}] = rpc(Config, rabbit_mqtt_collector, list, []), + [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []), [{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]), %% Verify the content of the metrics, garbage_collection must be present [{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]), |