summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-05-16 17:50:06 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-05-16 17:50:06 +0100
commit7bfe632cb60cc954189bbecbc4f02724d8dd2599 (patch)
tree8ec5070c19d38f7c4dfcf617fd4dcaeb4711c08f
parent3e17e1a80d45780230e80b4b369ce7ae512249db (diff)
downloadrabbitmq-server-git-7bfe632cb60cc954189bbecbc4f02724d8dd2599.tar.gz
Cluster-wide MQTT client id tracking
Uses a ra cluster to keep the client id tracking information - in the state of the ra machine. If nodes are decommissioned from the RMQ cluster, the command decommission_mqtt_node must be invoked first to disconnect the clients on that node and remove the node from the ra cluster. [#135330629]
-rw-r--r--deps/rabbitmq_mqtt/Makefile2
-rw-r--r--deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl77
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt.erl5
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl92
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl8
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl5
-rw-r--r--deps/rabbitmq_mqtt/test/cluster_SUITE.erl130
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE.erl6
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile1
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml1
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java15
-rw-r--r--deps/rabbitmq_mqtt/test/reader_SUITE.erl2
12 files changed, 260 insertions, 84 deletions
diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile
index 53d3857db4..382712c74f 100644
--- a/deps/rabbitmq_mqtt/Makefile
+++ b/deps/rabbitmq_mqtt/Makefile
@@ -30,7 +30,7 @@ define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef
-DEPS = ranch rabbit_common rabbit amqp_client
+DEPS = ranch rabbit_common rabbit amqp_client ra
TEST_DEPS = emqttc ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers
dep_ct_helper = git https://github.com/extend/ct_helper.git master
diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl
new file mode 100644
index 0000000000..7dd9cd4874
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand').
+
+-include("rabbit_mqtt.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([scopes/0,
+ switches/0,
+ aliases/0,
+ usage/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+scopes() -> [ctl, diagnostics].
+switches() -> [].
+aliases() -> [].
+
+description() -> <<"Decommissions MQTT cluster member used to track client ids">>.
+
+help_section() ->
+ {plugin, mqtt}.
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_, _ | _], _Opts) ->
+ {validation_failure, too_many_args};
+validate([_], _) ->
+ ok.
+
+merge_defaults(Args, Opts) ->
+ {Args, Opts}.
+
+usage() ->
+ <<"decomission_mqtt_node <node>">>.
+
+usage_doc_guides() ->
+ [?MQTT_GUIDE_URL].
+
+run([Node], #{node := NodeName,
+ timeout := Timeout}) ->
+ case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of
+ {badrpc, _} = Error ->
+ Error;
+ nodedown ->
+ list_to_binary(io_lib:format("Node ~p is down but has been successfully removed"
+ " from the cluster", [Node]));
+ Result ->
+ %% 'ok' or 'timeout'
+ %% TODO: Ra will timeout if the node is not a cluster member - should this be fixed??
+ Result
+ end.
+
+banner(_, _) -> <<"Decomissioning MQTT node used for client id tracking ...">>.
+
+output(Result, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
index be10d2277d..528a680774 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
@@ -26,6 +26,7 @@ start(normal, []) ->
{ok, Listeners} = application:get_env(tcp_listeners),
{ok, SslListeners} = application:get_env(ssl_listeners),
Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []),
+ ok = mqtt_node:start(),
EMPid = case rabbit_event:start_link() of
{ok, Pid} -> Pid;
{error, {already_started, Pid}} -> Pid
@@ -45,7 +46,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
- AggregatorPid, Ref, fun({_, {Pid, _}}) ->
+ AggregatorPid, Ref, fun({_, Pid}) ->
rabbit_mqtt_reader:info(Pid, Items)
end,
rabbit_mqtt_collector:list()).
@@ -53,4 +54,4 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
connection_info_local(Items) ->
Connections = rabbit_mqtt_collector:list(),
[rabbit_mqtt_reader:info(Pid, Items)
- || {_, {Pid, _}} <- Connections].
+ || {_, Pid} <- Connections].
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
index c025339306..b13d9ec108 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
@@ -11,91 +11,33 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_mqtt_collector).
--behaviour(gen_server).
-
--export([start_link/0, register/2, unregister/2, list/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--record(state, {client_ids}).
-
--define(SERVER, ?MODULE).
+-export([register/2, unregister/2, list/0, leave/1]).
%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
register(ClientId, Pid) ->
- gen_server:call(rabbit_mqtt_collector, {register, ClientId, Pid}, infinity).
+ run_ra_command({register, ClientId, Pid}).
unregister(ClientId, Pid) ->
- gen_server:call(rabbit_mqtt_collector, {unregister, ClientId, Pid}, infinity).
+ run_ra_command({unregister, ClientId, Pid}).
list() ->
- gen_server:call(rabbit_mqtt_collector, list).
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- {ok, #state{client_ids = #{}}}. % clientid -> {pid, monitor}
+ run_ra_command(list).
-%%--------------------------------------------------------------------------
+leave(NodeBin) ->
+ Node = binary_to_atom(NodeBin, utf8),
+ run_ra_command({leave, Node}),
+ mqtt_node:leave(Node).
-handle_call({register, ClientId, Pid}, _From,
- State = #state{client_ids = Ids}) ->
- Ids1 = case maps:find(ClientId, Ids) of
- {ok, {OldPid, MRef}} when Pid =/= OldPid ->
- catch gen_server2:cast(OldPid, duplicate_id),
- erlang:demonitor(MRef),
- maps:remove(ClientId, Ids);
- error ->
- Ids
- end,
- Ids2 = maps:put(ClientId, {Pid, erlang:monitor(process, Pid)}, Ids1),
- {reply, ok, State#state{client_ids = Ids2}};
-
-handle_call({unregister, ClientId, Pid}, _From, State = #state{client_ids = Ids}) ->
- {Reply, Ids1} = case maps:find(ClientId, Ids) of
- {ok, {Pid, MRef}} -> erlang:demonitor(MRef),
- {ok, maps:remove(ClientId, Ids)};
- _ -> {ok, Ids}
- end,
- {reply, Reply, State#state{ client_ids = Ids1 }};
-
-handle_call(list, _From, State = #state{client_ids = Ids}) ->
- {reply, maps:to_list(Ids), State};
-
-handle_call(Msg, _From, State) ->
- {stop, {unhandled_call, Msg}, State}.
-
-handle_cast(Msg, State) ->
- {stop, {unhandled_cast, Msg}, State}.
-
-handle_info({'EXIT', _, {shutdown, closed}}, State) ->
- {stop, {shutdown, closed}, State};
-
-handle_info({'DOWN', MRef, process, DownPid, _Reason},
- State = #state{client_ids = Ids}) ->
- Ids1 = maps:filter(fun (ClientId, {Pid, M})
- when Pid =:= DownPid, MRef =:= M ->
- rabbit_log_connection:warning(
- "MQTT disconnect from ~p~n",
- [ClientId]),
- false;
- (_, _) ->
- true
- end, Ids),
- {noreply, State #state{ client_ids = Ids1 }}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+%%----------------------------------------------------------------------------
+-spec run_ra_command(term()) -> term() | {error, term()}.
+run_ra_command(RaCommand) ->
+ NodeId = mqtt_node:node_id(),
+ case ra:process_command(NodeId, RaCommand) of
+ {ok, Result, _} -> Result;
+ _ = Error -> Error
+ end.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
index 1352f9e6c4..143c8bf9f6 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
@@ -111,6 +111,14 @@ handle_cast(duplicate_id,
[rabbit_mqtt_processor:info(client_id, PState), ConnName]),
{stop, {shutdown, duplicate_id}, State};
+handle_cast(decommission_node,
+ State = #state{ proc_state = PState,
+ conn_name = ConnName }) ->
+ rabbit_log_connection:warning("MQTT disconnecting client id ~p (~p) as node is about"
+ " to be decommissioned~n",
+ [rabbit_mqtt_processor:info(client_id, PState), ConnName]),
+ {stop, {shutdown, decommission_node}, State};
+
handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
index 4783149cd5..34d5ccd481 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
@@ -38,10 +38,7 @@ init([{Listeners, SslListeners0}]) ->
end}
end,
{ok, {{one_for_all, 10, 10},
- [{collector,
- {rabbit_mqtt_collector, start_link, []},
- transient, ?WORKER_WAIT, worker, [rabbit_mqtt_collector]},
- {rabbit_mqtt_retainer_sup,
+ [{rabbit_mqtt_retainer_sup,
{rabbit_mqtt_retainer_sup, start_link, [{local, rabbit_mqtt_retainer_sup}]},
transient, ?SUPERVISOR_WAIT, supervisor, [rabbit_mqtt_retainer_sup]} |
listener_specs(fun tcp_listener_spec/1,
diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
new file mode 100644
index 0000000000..593b7e69db
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
@@ -0,0 +1,130 @@
+-module(cluster_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ nodedown,
+ decommission_node
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, basic},
+ {collect_statistics_interval, 100}
+ ]}).
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 3}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testsuite cases
+%% -------------------------------------------------------------------
+
+nodedown(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ unlink(C),
+ MRef = erlang:monitor(process, C),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ ok
+ after
+ 30000 ->
+ exit(missing_down_message)
+ end,
+ [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []).
+
+decommission_node(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ unlink(C),
+ MRef = erlang:monitor(process, C),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ ok
+ after
+ 30000 ->
+ exit(missing_down_message)
+ end,
+ [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []).
+
+expect_publishes(_Topic, []) -> ok;
+expect_publishes(Topic, [Payload|Rest]) ->
+ receive
+ {publish, Topic, Payload} -> expect_publishes(Topic, Rest)
+ after 5000 ->
+ throw({publish_not_delivered, Payload})
+ end.
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl
index 5fdf9b88fd..f924a8998a 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl
@@ -58,7 +58,9 @@ init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
- {rmq_certspwd, "bunnychow"}
+ {rmq_certspwd, "bunnychow"},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 2}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
[ fun merge_app_env/1 ] ++
@@ -105,11 +107,13 @@ end_per_testcase(Testcase, Config) ->
java(Config) ->
CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir),
MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt),
MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
os:putenv("SSL_CERTS_DIR", CertsDir),
os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)),
os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)),
+ os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)),
os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)),
DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]),
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
index aaf9b25a0f..2ce61cd08f 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
@@ -5,6 +5,7 @@ MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \
-Dcerts.dir=$(SSL_CERTS_DIR) \
-Dmqtt.ssl.port=$(MQTT_SSL_PORT) \
-Dmqtt.port=$(MQTT_PORT) \
+ -Dmqtt.port.2=$(MQTT_PORT_2) \
-Damqp.port=$(AMQP_PORT)
.PHONY: deps tests clean distclean
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
index 49c9b2fac8..d75adcdeb2 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
@@ -57,6 +57,7 @@
<certs.dir>${certs.dir}</certs.dir>
<mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port>
<mqtt.port>${mqtt.port}</mqtt.port>
+ <mqtt.port.2>${mqtt.port.2}</mqtt.port.2>
<amqp.port>${amqp.port}</amqp.port>
<test-keystore.ca>${test-keystore.ca}</test-keystore.ca>
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
index 3c06905622..075b2f5016 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
@@ -52,6 +52,7 @@ public class MqttTest implements MqttCallback {
private final String host = "localhost";
private final String brokerUrl = "tcp://" + host + ":" + getPort();
+ private final String brokerTwoUrl = "tcp://" + host + ":" + getSecondPort();
private String clientId;
private String clientId2;
private MqttClient client;
@@ -78,6 +79,12 @@ public class MqttTest implements MqttCallback {
return Integer.parseInt(port.toString());
}
+ private static int getSecondPort() {
+ Object port = System.getProperty("mqtt.port.2", "1883");
+ Assert.assertNotNull(port);
+ return Integer.parseInt(port.toString());
+ }
+
private static int getAmqpPort() {
Object port = System.getProperty("amqp.port", "5672");
assertNotNull(port);
@@ -514,6 +521,14 @@ public class MqttTest implements MqttCallback {
client2.disconnect();
}
+ @Test public void multipleClusterClientIds() throws MqttException, InterruptedException {
+ client.connect(conOpt);
+ client2 = new MqttClient(brokerTwoUrl, clientId, null);
+ client2.connect(conOpt);
+ waitAtMost(timeout).until(isClientConnected(),equalTo(false));
+ client2.disconnect();
+ }
+
@Test public void ping() throws MqttException, InterruptedException {
conOpt.setKeepAliveInterval(1);
client.connect(conOpt);
diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
index afabbf064c..574924dda5 100644
--- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
@@ -138,7 +138,7 @@ stats(Config) ->
emqttc:unsubscribe(C, [<<"TopicA">>]),
timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
%% Retrieve the connection Pid
- [{_, {Reader, _}}] = rpc(Config, rabbit_mqtt_collector, list, []),
+ [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []),
[{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]),
%% Verify the content of the metrics, garbage_collection must be present
[{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]),