summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/test
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_mqtt/test')
-rw-r--r--deps/rabbitmq_mqtt/test/cluster_SUITE.erl130
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE.erl6
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile1
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml1
-rw-r--r--deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java15
-rw-r--r--deps/rabbitmq_mqtt/test/reader_SUITE.erl2
6 files changed, 153 insertions, 2 deletions
diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
new file mode 100644
index 0000000000..593b7e69db
--- /dev/null
+++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl
@@ -0,0 +1,130 @@
+-module(cluster_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ nodedown,
+ decommission_node
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, basic},
+ {collect_statistics_interval, 100}
+ ]}).
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase},
+ {rmq_extra_tcp_ports, [tcp_port_mqtt_extra,
+ tcp_port_mqtt_tls_extra]},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 3}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testsuite cases
+%% -------------------------------------------------------------------
+
+nodedown(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ unlink(C),
+ MRef = erlang:monitor(process, C),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ ok
+ after
+ 30000 ->
+ exit(missing_down_message)
+ end,
+ [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []).
+
+decommission_node(Config) ->
+ P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {ok, C} = emqttc:start_link([{host, "localhost"},
+ {port, P},
+ {client_id, <<"simpleClient">>},
+ {proto_ver, 3},
+ {logger, info},
+ {puback_timeout, 1}]),
+ unlink(C),
+ MRef = erlang:monitor(process, C),
+ emqttc:subscribe(C, <<"TopicA">>, qos0),
+ emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
+ expect_publishes(<<"TopicA">>, [<<"Payload">>]),
+
+ [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []),
+ {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ ok
+ after
+ 30000 ->
+ exit(missing_down_message)
+ end,
+ [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []).
+
+expect_publishes(_Topic, []) -> ok;
+expect_publishes(Topic, [Payload|Rest]) ->
+ receive
+ {publish, Topic, Payload} -> expect_publishes(Topic, Rest)
+ after 5000 ->
+ throw({publish_not_delivered, Payload})
+ end.
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl
index 5fdf9b88fd..f924a8998a 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl
@@ -58,7 +58,9 @@ init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE},
- {rmq_certspwd, "bunnychow"}
+ {rmq_certspwd, "bunnychow"},
+ {rmq_nodes_clustered, true},
+ {rmq_nodes_count, 2}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
[ fun merge_app_env/1 ] ++
@@ -105,11 +107,13 @@ end_per_testcase(Testcase, Config) ->
java(Config) ->
CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir),
MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
+ MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt),
MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls),
AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
os:putenv("SSL_CERTS_DIR", CertsDir),
os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)),
os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)),
+ os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)),
os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)),
DataDir = rabbit_ct_helpers:get_config(Config, data_dir),
MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]),
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
index aaf9b25a0f..2ce61cd08f 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile
@@ -5,6 +5,7 @@ MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \
-Dcerts.dir=$(SSL_CERTS_DIR) \
-Dmqtt.ssl.port=$(MQTT_SSL_PORT) \
-Dmqtt.port=$(MQTT_PORT) \
+ -Dmqtt.port.2=$(MQTT_PORT_2) \
-Damqp.port=$(AMQP_PORT)
.PHONY: deps tests clean distclean
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
index 49c9b2fac8..d75adcdeb2 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml
@@ -57,6 +57,7 @@
<certs.dir>${certs.dir}</certs.dir>
<mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port>
<mqtt.port>${mqtt.port}</mqtt.port>
+ <mqtt.port.2>${mqtt.port.2}</mqtt.port.2>
<amqp.port>${amqp.port}</amqp.port>
<test-keystore.ca>${test-keystore.ca}</test-keystore.ca>
diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
index 3c06905622..075b2f5016 100644
--- a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
+++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java
@@ -52,6 +52,7 @@ public class MqttTest implements MqttCallback {
private final String host = "localhost";
private final String brokerUrl = "tcp://" + host + ":" + getPort();
+ private final String brokerTwoUrl = "tcp://" + host + ":" + getSecondPort();
private String clientId;
private String clientId2;
private MqttClient client;
@@ -78,6 +79,12 @@ public class MqttTest implements MqttCallback {
return Integer.parseInt(port.toString());
}
+ private static int getSecondPort() {
+ Object port = System.getProperty("mqtt.port.2", "1883");
+ Assert.assertNotNull(port);
+ return Integer.parseInt(port.toString());
+ }
+
private static int getAmqpPort() {
Object port = System.getProperty("amqp.port", "5672");
assertNotNull(port);
@@ -514,6 +521,14 @@ public class MqttTest implements MqttCallback {
client2.disconnect();
}
+ @Test public void multipleClusterClientIds() throws MqttException, InterruptedException {
+ client.connect(conOpt);
+ client2 = new MqttClient(brokerTwoUrl, clientId, null);
+ client2.connect(conOpt);
+ waitAtMost(timeout).until(isClientConnected(),equalTo(false));
+ client2.disconnect();
+ }
+
@Test public void ping() throws MqttException, InterruptedException {
conOpt.setKeepAliveInterval(1);
client.connect(conOpt);
diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
index afabbf064c..574924dda5 100644
--- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl
@@ -138,7 +138,7 @@ stats(Config) ->
emqttc:unsubscribe(C, [<<"TopicA">>]),
timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
%% Retrieve the connection Pid
- [{_, {Reader, _}}] = rpc(Config, rabbit_mqtt_collector, list, []),
+ [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []),
[{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]),
%% Verify the content of the metrics, garbage_collection must be present
[{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]),