diff options
Diffstat (limited to 'deps/rabbitmq_mqtt/test')
6 files changed, 153 insertions, 2 deletions
diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl new file mode 100644 index 0000000000..593b7e69db --- /dev/null +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -0,0 +1,130 @@ +-module(cluster_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + nodedown, + decommission_node + ]} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, basic}, + {collect_statistics_interval, 100} + ]}). + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase}, + {rmq_extra_tcp_ports, [tcp_port_mqtt_extra, + tcp_port_mqtt_tls_extra]}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 3} + ]), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testsuite cases +%% ------------------------------------------------------------------- + +nodedown(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + unlink(C), + MRef = erlang:monitor(process, C), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + 30000 -> + exit(missing_down_message) + end, + [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []). + +decommission_node(Config) -> + P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {ok, C} = emqttc:start_link([{host, "localhost"}, + {port, P}, + {client_id, <<"simpleClient">>}, + {proto_ver, 3}, + {logger, info}, + {puback_timeout, 1}]), + unlink(C), + MRef = erlang:monitor(process, C), + emqttc:subscribe(C, <<"TopicA">>, qos0), + emqttc:publish(C, <<"TopicA">>, <<"Payload">>), + expect_publishes(<<"TopicA">>, [<<"Payload">>]), + + [_] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []), + {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]), + receive + {'DOWN', MRef, _, _, _} -> + ok + after + 30000 -> + exit(missing_down_message) + end, + [] = rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_mqtt_collector, list, []). + +expect_publishes(_Topic, []) -> ok; +expect_publishes(Topic, [Payload|Rest]) -> + receive + {publish, Topic, Payload} -> expect_publishes(Topic, Rest) + after 5000 -> + throw({publish_not_delivered, Payload}) + end. diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl index 5fdf9b88fd..f924a8998a 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -58,7 +58,9 @@ init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config(Config, [ {rmq_nodename_suffix, ?MODULE}, - {rmq_certspwd, "bunnychow"} + {rmq_certspwd, "bunnychow"}, + {rmq_nodes_clustered, true}, + {rmq_nodes_count, 2} ]), rabbit_ct_helpers:run_setup_steps(Config1, [ fun merge_app_env/1 ] ++ @@ -105,11 +107,13 @@ end_per_testcase(Testcase, Config) -> java(Config) -> CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir), MqttPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt), + MqttPort2 = rabbit_ct_broker_helpers:get_node_config(Config, 1, tcp_port_mqtt), MqttSslPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt_tls), AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), os:putenv("SSL_CERTS_DIR", CertsDir), os:putenv("MQTT_SSL_PORT", erlang:integer_to_list(MqttSslPort)), os:putenv("MQTT_PORT", erlang:integer_to_list(MqttPort)), + os:putenv("MQTT_PORT_2", erlang:integer_to_list(MqttPort2)), os:putenv("AMQP_PORT", erlang:integer_to_list(AmqpPort)), DataDir = rabbit_ct_helpers:get_config(Config, data_dir), MakeResult = rabbit_ct_helpers:make(Config, DataDir, ["tests"]), diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile index aaf9b25a0f..2ce61cd08f 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/Makefile @@ -5,6 +5,7 @@ MVN_FLAGS += -Ddeps.dir="$(abspath $(DEPS_DIR))" \ -Dcerts.dir=$(SSL_CERTS_DIR) \ -Dmqtt.ssl.port=$(MQTT_SSL_PORT) \ -Dmqtt.port=$(MQTT_PORT) \ + -Dmqtt.port.2=$(MQTT_PORT_2) \ -Damqp.port=$(AMQP_PORT) .PHONY: deps tests clean distclean diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml index 49c9b2fac8..d75adcdeb2 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/pom.xml @@ -57,6 +57,7 @@ <certs.dir>${certs.dir}</certs.dir> <mqtt.ssl.port>${mqtt.ssl.port}</mqtt.ssl.port> <mqtt.port>${mqtt.port}</mqtt.port> + <mqtt.port.2>${mqtt.port.2}</mqtt.port.2> <amqp.port>${amqp.port}</amqp.port> <test-keystore.ca>${test-keystore.ca}</test-keystore.ca> diff --git a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java index 3c06905622..075b2f5016 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java +++ b/deps/rabbitmq_mqtt/test/java_SUITE_data/src/test/java/com/rabbitmq/mqtt/test/MqttTest.java @@ -52,6 +52,7 @@ public class MqttTest implements MqttCallback { private final String host = "localhost"; private final String brokerUrl = "tcp://" + host + ":" + getPort(); + private final String brokerTwoUrl = "tcp://" + host + ":" + getSecondPort(); private String clientId; private String clientId2; private MqttClient client; @@ -78,6 +79,12 @@ public class MqttTest implements MqttCallback { return Integer.parseInt(port.toString()); } + private static int getSecondPort() { + Object port = System.getProperty("mqtt.port.2", "1883"); + Assert.assertNotNull(port); + return Integer.parseInt(port.toString()); + } + private static int getAmqpPort() { Object port = System.getProperty("amqp.port", "5672"); assertNotNull(port); @@ -514,6 +521,14 @@ public class MqttTest implements MqttCallback { client2.disconnect(); } + @Test public void multipleClusterClientIds() throws MqttException, InterruptedException { + client.connect(conOpt); + client2 = new MqttClient(brokerTwoUrl, clientId, null); + client2.connect(conOpt); + waitAtMost(timeout).until(isClientConnected(),equalTo(false)); + client2.disconnect(); + } + @Test public void ping() throws MqttException, InterruptedException { conOpt.setKeepAliveInterval(1); client.connect(conOpt); diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index afabbf064c..574924dda5 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -138,7 +138,7 @@ stats(Config) -> emqttc:unsubscribe(C, [<<"TopicA">>]), timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms %% Retrieve the connection Pid - [{_, {Reader, _}}] = rpc(Config, rabbit_mqtt_collector, list, []), + [{_, Reader}] = rpc(Config, rabbit_mqtt_collector, list, []), [{_, Pid}] = rpc(Config, rabbit_mqtt_reader, info, [Reader, [connection]]), %% Verify the content of the metrics, garbage_collection must be present [{Pid, Props}] = rpc(Config, ets, lookup, [connection_metrics, Pid]), |