diff options
Diffstat (limited to 'deps/rabbitmq_stomp/test')
39 files changed, 4147 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl b/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl new file mode 100644 index 0000000000..0474fd67d6 --- /dev/null +++ b/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl @@ -0,0 +1,319 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(amqqueue_SUITE). + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp.hrl"). +-include("rabbit_stomp_frame.hrl"). +-include("rabbit_stomp_headers.hrl"). + +-define(QUEUE, <<"TestQueue">>). +-define(DESTINATION, "/amq/queue/TestQueue"). + +all() -> + [{group, version_to_group_name(V)} || V <- ?SUPPORTED_VERSIONS]. + +groups() -> + Tests = [ + publish_no_dest_error, + publish_unauthorized_error, + subscribe_error, + subscribe, + unsubscribe_ack, + subscribe_ack, + send, + delete_queue_subscribe, + temp_destination_queue, + temp_destination_in_send, + blank_destination_in_send + ], + + [{version_to_group_name(V), [sequence], Tests} + || V <- ?SUPPORTED_VERSIONS]. + +version_to_group_name(V) -> + list_to_atom(re:replace("version_" ++ V, + "\\.", + "_", + [global, {return, list}])). + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(Group, Config) -> + Suffix = string:sub_string(atom_to_list(Group), 9), + Version = re:replace(Suffix, "_", ".", [global, {return, list}]), + rabbit_ct_helpers:set_config(Config, [{version, Version}]). + +end_per_group(_Group, Config) -> Config. + +init_per_testcase(TestCase, Config) -> + Version = ?config(version, Config), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + {ok, Connection} = amqp_connection:start(#amqp_params_direct{ + node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename) + }), + {ok, Channel} = amqp_connection:open_channel(Connection), + {ok, Client} = rabbit_stomp_client:connect(Version, StompPort), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {amqp_connection, Connection}, + {amqp_channel, Channel}, + {stomp_client, Client} + ]), + init_per_testcase0(TestCase, Config1). + +end_per_testcase(TestCase, Config) -> + Connection = ?config(amqp_connection, Config), + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + rabbit_stomp_client:disconnect(Client), + amqp_channel:close(Channel), + amqp_connection:close(Connection), + end_per_testcase0(TestCase, Config). + +init_per_testcase0(publish_unauthorized_error, Config) -> + Channel = ?config(amqp_channel, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = <<"RestrictedQueue">>, + auto_delete = true}), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, add_user, + [<<"user">>, <<"pass">>, <<"acting-user">>]), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_permissions, [ + <<"user">>, <<"/">>, <<"nothing">>, <<"nothing">>, <<"nothing">>, <<"acting-user">>]), + Version = ?config(version, Config), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + {ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort), + rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]); +init_per_testcase0(_, Config) -> + Config. + +end_per_testcase0(publish_unauthorized_error, Config) -> + ClientFoo = ?config(client_foo, Config), + rabbit_stomp_client:disconnect(ClientFoo), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, delete_user, + [<<"user">>, <<"acting-user">>]), + Config; +end_per_testcase0(_, Config) -> + Config. + +publish_no_dest_error(Config) -> + Client = ?config(stomp_client, Config), + rabbit_stomp_client:send( + Client, "SEND", [{"destination", "/exchange/non-existent"}], ["hello"]), + {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"), + "not_found" = proplists:get_value("message", Hdrs), + ok. + +publish_unauthorized_error(Config) -> + ClientFoo = ?config(client_foo, Config), + rabbit_stomp_client:send( + ClientFoo, "SEND", [{"destination", "/amq/queue/RestrictedQueue"}], ["hello"]), + {ok, _Client1, Hdrs, _} = stomp_receive(ClientFoo, "ERROR"), + "access_refused" = proplists:get_value("message", Hdrs), + ok. + +subscribe_error(Config) -> + Client = ?config(stomp_client, Config), + %% SUBSCRIBE to missing queue + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}]), + {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"), + "not_found" = proplists:get_value("message", Hdrs), + ok. + +subscribe(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "foo"}]), + {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"), + + %% send from amqp + Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE}, + + amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{}, + payload = <<"hello">>}), + + {ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"), + ok. + +unsubscribe_ack(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + Version = ?config(version, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + %% subscribe and wait for receipt + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, + {"receipt", "rcpt1"}, + {"ack", "client"}, + {"id", "subscription-id"}]), + {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"), + + %% send from amqp + Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE}, + + amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{}, + payload = <<"hello">>}), + + {ok, Client2, Hdrs1, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"), + + rabbit_stomp_client:send( + Client2, "UNSUBSCRIBE", [{"destination", ?DESTINATION}, + {"id", "subscription-id"}]), + + rabbit_stomp_client:send( + Client2, "ACK", [{rabbit_stomp_util:ack_header_name(Version), + proplists:get_value( + rabbit_stomp_util:msg_header_name(Version), Hdrs1)}, + {"receipt", "rcpt2"}]), + + {ok, _Client3, Hdrs2, _Body2} = stomp_receive(Client2, "ERROR"), + ?assertEqual("Subscription not found", + proplists:get_value("message", Hdrs2)), + ok. + +subscribe_ack(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + Version = ?config(version, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, + {"receipt", "foo"}, + {"ack", "client"}]), + {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"), + + %% send from amqp + Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE}, + + amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{}, + payload = <<"hello">>}), + + {ok, _Client2, Headers, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"), + false = (Version == "1.2") xor proplists:is_defined(?HEADER_ACK, Headers), + + MsgHeader = rabbit_stomp_util:msg_header_name(Version), + AckValue = proplists:get_value(MsgHeader, Headers), + AckHeader = rabbit_stomp_util:ack_header_name(Version), + + rabbit_stomp_client:send(Client, "ACK", [{AckHeader, AckValue}]), + #'basic.get_empty'{} = + amqp_channel:call(Channel, #'basic.get'{queue = ?QUEUE}), + ok. + +send(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "foo"}]), + {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"), + + %% send from stomp + rabbit_stomp_client:send( + Client1, "SEND", [{"destination", ?DESTINATION}], ["hello"]), + + {ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"), + ok. + +delete_queue_subscribe(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + + %% subscribe and wait for receipt + rabbit_stomp_client:send( + Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "bah"}]), + {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"), + + %% delete queue while subscribed + #'queue.delete_ok'{} = + amqp_channel:call(Channel, #'queue.delete'{queue = ?QUEUE}), + + {ok, _Client2, Headers, _} = stomp_receive(Client1, "ERROR"), + + ?DESTINATION = proplists:get_value("subscription", Headers), + + % server closes connection + ok. + +temp_destination_queue(Config) -> + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE, + auto_delete = true}), + rabbit_stomp_client:send( Client, "SEND", [{"destination", ?DESTINATION}, + {"reply-to", "/temp-queue/foo"}], + ["ping"]), + amqp_channel:call(Channel,#'basic.consume'{queue = ?QUEUE, no_ack = true}), + receive #'basic.consume_ok'{consumer_tag = _Tag} -> ok end, + ReplyTo = receive {#'basic.deliver'{delivery_tag = _DTag}, + #'amqp_msg'{payload = <<"ping">>, + props = #'P_basic'{reply_to = RT}}} -> RT + end, + ok = amqp_channel:call(Channel, + #'basic.publish'{routing_key = ReplyTo}, + #amqp_msg{payload = <<"pong">>}), + {ok, _Client1, _, [<<"pong">>]} = stomp_receive(Client, "MESSAGE"), + ok. + +temp_destination_in_send(Config) -> + Client = ?config(stomp_client, Config), + rabbit_stomp_client:send( Client, "SEND", [{"destination", "/temp-queue/foo"}], + ["poing"]), + {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"), + "Invalid destination" = proplists:get_value("message", Hdrs), + ok. + +blank_destination_in_send(Config) -> + Client = ?config(stomp_client, Config), + rabbit_stomp_client:send( Client, "SEND", [{"destination", ""}], + ["poing"]), + {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"), + "Invalid destination" = proplists:get_value("message", Hdrs), + ok. + +stomp_receive(Client, Command) -> + {#stomp_frame{command = Command, + headers = Hdrs, + body_iolist = Body}, Client1} = + rabbit_stomp_client:recv(Client), + {ok, Client1, Hdrs, Body}. + diff --git a/deps/rabbitmq_stomp/test/command_SUITE.erl b/deps/rabbitmq_stomp/test/command_SUITE.erl new file mode 100644 index 0000000000..8fe9fa0d0f --- /dev/null +++ b/deps/rabbitmq_stomp/test/command_SUITE.erl @@ -0,0 +1,127 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(command_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp.hrl"). + + +-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStompConnectionsCommand'). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + merge_defaults, + run + ]} + ]. + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +merge_defaults(_Config) -> + {[<<"session_id">>, <<"conn_name">>], #{verbose := false}} = + ?COMMAND:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}), + + {[<<"other_key">>], #{verbose := false}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}). + + +run(Config) -> + + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = #{node => Node, timeout => 10000, verbose => false}, + + %% No connections + [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)), + + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + + {ok, _Client} = rabbit_stomp_client:connect(StompPort), + ct:sleep(100), + + [[{session_id, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)), + + + {ok, _Client2} = rabbit_stomp_client:connect(StompPort), + ct:sleep(100), + + [[{session_id, _}], [{session_id, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)), + + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + start_amqp_connection(network, Node, Port), + + %% There are still just two connections + [[{session_id, _}], [{session_id, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)), + + start_amqp_connection(direct, Node, Port), + + %% Still two MQTT connections, one direct AMQP 0-9-1 connection + [[{session_id, _}], [{session_id, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)), + + %% Verbose returns all keys + Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})), + + %% There are two connections + [First, _Second] = AllKeys, + + %% Keys are INFO_ITEMS + KeysCount = length(?INFO_ITEMS), + KeysCount = length(First), + + {Keys, _} = lists:unzip(First), + + [] = Keys -- ?INFO_ITEMS, + [] = ?INFO_ITEMS -- Keys. + + +start_amqp_connection(Type, Node, Port) -> + Params = amqp_params(Type, Node, Port), + {ok, _Connection} = amqp_connection:start(Params). + +amqp_params(network, _, Port) -> + #amqp_params_network{port = Port}; +amqp_params(direct, Node, _) -> + #amqp_params_direct{node = Node}. diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE.erl b/deps/rabbitmq_stomp/test/config_schema_SUITE.erl new file mode 100644 index 0000000000..8d340810f7 --- /dev/null +++ b/deps/rabbitmq_stomp/test/config_schema_SUITE.erl @@ -0,0 +1,55 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(config_schema_SUITE). + +-compile(export_all). + +all() -> + [ + run_snippets + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Config1 = rabbit_ct_helpers:run_setup_steps(Config), + rabbit_ct_config_schema:init_schemas(rabbitmq_stomp, Config1). + + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +run_snippets(Config) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, run_snippets1, [Config]). + +run_snippets1(Config) -> + rabbit_ct_config_schema:run_snippets(Config). + diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem new file mode 100644 index 0000000000..eaf6b67806 --- /dev/null +++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem @@ -0,0 +1 @@ +I'm not a certificate diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets new file mode 100644 index 0000000000..6081240c68 --- /dev/null +++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets @@ -0,0 +1,97 @@ +[{listener_port, + "stomp.listeners.tcp.1 = 12345", + [{rabbitmq_stomp,[{tcp_listeners,[12345]}]}], + [rabbitmq_stomp]}, + {listeners_ip, + "stomp.listeners.tcp.1 = 127.0.0.1:61613 + stomp.listeners.tcp.2 = ::1:61613", + [{rabbitmq_stomp,[{tcp_listeners,[{"127.0.0.1",61613},{"::1",61613}]}]}], + [rabbitmq_stomp]}, + + {listener_tcp_options, + "stomp.listeners.tcp.1 = 127.0.0.1:61613 + stomp.listeners.tcp.2 = ::1:61613 + + stomp.tcp_listen_options.backlog = 2048 + stomp.tcp_listen_options.recbuf = 8192 + stomp.tcp_listen_options.sndbuf = 8192 + + stomp.tcp_listen_options.keepalive = true + stomp.tcp_listen_options.nodelay = true + + stomp.tcp_listen_options.exit_on_close = true + + stomp.tcp_listen_options.send_timeout = 120 +", + [{rabbitmq_stomp,[ + {tcp_listeners,[ + {"127.0.0.1",61613}, + {"::1",61613} + ]} + , {tcp_listen_options, [ + {backlog, 2048}, + {exit_on_close, true}, + + {recbuf, 8192}, + {sndbuf, 8192}, + + {send_timeout, 120}, + + {keepalive, true}, + {nodelay, true} + ]} + ]}], + [rabbitmq_stomp]}, + + {ssl, + "ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem + ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem + ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem + ssl_options.verify = verify_peer + ssl_options.fail_if_no_peer_cert = true + + stomp.listeners.tcp.1 = 61613 + stomp.listeners.ssl.1 = 61614", + [{rabbit, + [{ssl_options, + [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, + {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, + {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, + {verify,verify_peer}, + {fail_if_no_peer_cert,true}]}]}, + {rabbitmq_stomp,[{tcp_listeners,[61613]},{ssl_listeners,[61614]}]}], + [rabbitmq_stomp]}, + {defaults, + "stomp.default_user = guest + stomp.default_pass = guest + stomp.proxy_protocol = false + stomp.hide_server_info = false", + [{rabbitmq_stomp,[{default_user,[{login,"guest"},{passcode,"guest"}]}, + {proxy_protocol,false},{hide_server_info,false}]}], + [rabbitmq_stomp]}, + {ssl_cert_login, + "stomp.ssl_cert_login = true", + [{rabbitmq_stomp,[{ssl_cert_login,true}]}], + [rabbitmq_stomp]}, + {proxy_protocol, + "stomp.default_user = guest + stomp.default_pass = guest + stomp.implicit_connect = true + stomp.proxy_protocol = true", + [{rabbitmq_stomp,[{default_user,[{login,"guest"},{passcode,"guest"}]}, + {implicit_connect,true}, + {proxy_protocol,true}]}], + [rabbitmq_stomp]}, + {default_vhost, + "stomp.default_vhost = /", + [{rabbitmq_stomp,[{default_vhost,<<"/">>}]}], + [rabbitmq_stomp]}, + {default_topic_exchange, + "stomp.default_topic_exchange = my.fancy.topic", + [{rabbitmq_stomp,[{default_topic_exchange,<<"my.fancy.topic">>}]}], + [rabbitmq_stomp]}, + {hide_server_info, + "stomp.hide_server_info = true", + [{rabbitmq_stomp,[{hide_server_info,true}]}], + [rabbitmq_stomp]} +]. diff --git a/deps/rabbitmq_stomp/test/connections_SUITE.erl b/deps/rabbitmq_stomp/test/connections_SUITE.erl new file mode 100644 index 0000000000..4f9b027bb9 --- /dev/null +++ b/deps/rabbitmq_stomp/test/connections_SUITE.erl @@ -0,0 +1,160 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(connections_SUITE). +-compile(export_all). + +-import(rabbit_misc, [pget/2]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp_frame.hrl"). +-define(DESTINATION, "/queue/bulk-test"). + +all() -> + [ + messages_not_dropped_on_disconnect, + direct_client_connections_are_not_leaked, + stats_are_not_leaked, + stats, + heartbeat + ]. + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [ + {collect_statistics, basic}, + {collect_statistics_interval, 100} + ]}). + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()). + + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +-define(GARBAGE, <<"bdaf63dda9d78b075c748b740e7c3510ad203b07\nbdaf63dd">>). + +count_connections(Config) -> + StompPort = get_stomp_port(Config), + %% The default port is 61613 but it's in the middle of the ephemeral + %% ports range on many operating systems. Therefore, there is a + %% chance this port is already in use. Let's use a port close to the + %% AMQP default port. + IPv4Count = try + %% Count IPv4 connections. On some platforms, the IPv6 listener + %% implicitely listens to IPv4 connections too so the IPv4 + %% listener doesn't exist. Thus this try/catch. This is the case + %% with Linux where net.ipv6.bindv6only is disabled (default in + %% most cases). + rpc_count_connections(Config, {acceptor, {0,0,0,0}, StompPort}) + catch + _:{badarg, _} -> 0; + _:Other -> exit({foo, Other}) + end, + IPv6Count = try + %% Count IPv6 connections. We also use a try/catch block in case + %% the host is not configured for IPv6. + rpc_count_connections(Config, {acceptor, {0,0,0,0,0,0,0,0}, StompPort}) + catch + _:{badarg, _} -> 0; + _:Other1 -> exit({foo, Other1}) + end, + IPv4Count + IPv6Count. + +rpc_count_connections(Config, ConnSpec) -> + rabbit_ct_broker_helpers:rpc(Config, 0, + ranch_server, count_connections, [ConnSpec]). + +direct_client_connections_are_not_leaked(Config) -> + StompPort = get_stomp_port(Config), + N = count_connections(Config), + lists:foreach(fun (_) -> + {ok, Client = {Socket, _}} = rabbit_stomp_client:connect(StompPort), + %% send garbage which trips up the parser + gen_tcp:send(Socket, ?GARBAGE), + rabbit_stomp_client:send( + Client, "LOL", [{"", ""}]) + end, + lists:seq(1, 100)), + timer:sleep(5000), + N = count_connections(Config), + ok. + +messages_not_dropped_on_disconnect(Config) -> + StompPort = get_stomp_port(Config), + N = count_connections(Config), + {ok, Client} = rabbit_stomp_client:connect(StompPort), + N1 = N + 1, + N1 = count_connections(Config), + [rabbit_stomp_client:send( + Client, "SEND", [{"destination", ?DESTINATION}], + [integer_to_list(Count)]) || Count <- lists:seq(1, 1000)], + rabbit_stomp_client:disconnect(Client), + QName = rabbit_misc:r(<<"/">>, queue, <<"bulk-test">>), + timer:sleep(3000), + N = count_connections(Config), + {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName]), + Messages = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info, [Q, [messages]]), + 1000 = pget(messages, Messages), + ok. + +get_stomp_port(Config) -> + rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp). + +stats_are_not_leaked(Config) -> + StompPort = get_stomp_port(Config), + N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]), + {ok, C} = gen_tcp:connect("localhost", StompPort, []), + Bin = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*\n\n">>, + gen_tcp:send(C, Bin), + gen_tcp:close(C), + timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms + N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]), + ok. + +stats(Config) -> + StompPort = get_stomp_port(Config), + {ok, Client} = rabbit_stomp_client:connect(StompPort), + timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms + %% Retrieve the connection Pid + [Reader] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp, list, []), + [{_, Pid}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp_reader, + info, [Reader, [connection]]), + %% Verify the content of the metrics, garbage_collection must be present + [{Pid, Props}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup, + [connection_metrics, Pid]), + true = proplists:is_defined(garbage_collection, Props), + 0 = proplists:get_value(timeout, Props), + %% If the coarse entry is present, stats were successfully emitted + [{Pid, _, _, _, _}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup, + [connection_coarse_metrics, Pid]), + rabbit_stomp_client:disconnect(Client), + ok. + +heartbeat(Config) -> + StompPort = get_stomp_port(Config), + {ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort, + [{"heart-beat", "5000,7000"}]), + timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms + %% Retrieve the connection Pid + [Reader] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp, list, []), + [{_, Pid}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp_reader, + info, [Reader, [connection]]), + %% Verify the content of the heartbeat timeout + [{Pid, Props}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup, + [connection_metrics, Pid]), + 5 = proplists:get_value(timeout, Props), + rabbit_stomp_client:disconnect(Client), + ok. diff --git a/deps/rabbitmq_stomp/test/frame_SUITE.erl b/deps/rabbitmq_stomp/test/frame_SUITE.erl new file mode 100644 index 0000000000..da191ac12a --- /dev/null +++ b/deps/rabbitmq_stomp/test/frame_SUITE.erl @@ -0,0 +1,191 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(frame_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp_frame.hrl"). +-include("rabbit_stomp_headers.hrl"). +-compile(export_all). + +all() -> + [ + parse_simple_frame, + parse_simple_frame_crlf, + parse_command_only, + parse_command_prefixed_with_newline, + parse_ignore_empty_frames, + parse_heartbeat_interframe, + parse_crlf_interframe, + parse_carriage_return_not_ignored_interframe, + parse_carriage_return_mid_command, + parse_carriage_return_end_command, + parse_resume_mid_command, + parse_resume_mid_header_key, + parse_resume_mid_header_val, + parse_resume_mid_body, + parse_no_header_stripping, + parse_multiple_headers, + header_no_colon, + no_nested_escapes, + header_name_with_cr, + header_value_with_cr, + header_value_with_colon, + headers_escaping_roundtrip, + headers_escaping_roundtrip_without_trailing_lf + ]. + +parse_simple_frame(_) -> + parse_simple_frame_gen("\n"). + +parse_simple_frame_crlf(_) -> + parse_simple_frame_gen("\r\n"). + +parse_simple_frame_gen(Term) -> + Headers = [{"header1", "value1"}, {"header2", "value2"}], + Content = frame_string("COMMAND", + Headers, + "Body Content", + Term), + {"COMMAND", Frame, _State} = parse_complete(Content), + [?assertEqual({ok, Value}, + rabbit_stomp_frame:header(Frame, Key)) || + {Key, Value} <- Headers], + #stomp_frame{body_iolist = Body} = Frame, + ?assertEqual(<<"Body Content">>, iolist_to_binary(Body)). + +parse_command_only(_) -> + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("COMMAND\n\n\0"). + +parse_command_prefixed_with_newline(_) -> + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\nCOMMAND\n\n\0"). + +parse_ignore_empty_frames(_) -> + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\0\0COMMAND\n\n\0"). + +parse_heartbeat_interframe(_) -> + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\nCOMMAND\n\n\0"). + +parse_crlf_interframe(_) -> + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\r\nCOMMAND\n\n\0"). + +parse_carriage_return_not_ignored_interframe(_) -> + {error, {unexpected_chars_between_frames, "\rC"}} = parse("\rCOMMAND\n\n\0"). + +parse_carriage_return_mid_command(_) -> + {error, {unexpected_chars_in_command, "\rA"}} = parse("COMM\rAND\n\n\0"). + +parse_carriage_return_end_command(_) -> + {error, {unexpected_chars_in_command, "\r\r"}} = parse("COMMAND\r\r\n\n\0"). + +parse_resume_mid_command(_) -> + First = "COMM", + Second = "AND\n\n\0", + {more, Resume} = parse(First), + {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse(Second, Resume). + +parse_resume_mid_header_key(_) -> + First = "COMMAND\nheade", + Second = "r1:value1\n\n\0", + {more, Resume} = parse(First), + {ok, Frame = #stomp_frame{command = "COMMAND"}, _Rest} = + parse(Second, Resume), + ?assertEqual({ok, "value1"}, + rabbit_stomp_frame:header(Frame, "header1")). + +parse_resume_mid_header_val(_) -> + First = "COMMAND\nheader1:val", + Second = "ue1\n\n\0", + {more, Resume} = parse(First), + {ok, Frame = #stomp_frame{command = "COMMAND"}, _Rest} = + parse(Second, Resume), + ?assertEqual({ok, "value1"}, + rabbit_stomp_frame:header(Frame, "header1")). + +parse_resume_mid_body(_) -> + First = "COMMAND\n\nABC", + Second = "DEF\0", + {more, Resume} = parse(First), + {ok, #stomp_frame{command = "COMMAND", body_iolist = Body}, _Rest} = + parse(Second, Resume), + ?assertEqual([<<"ABC">>, <<"DEF">>], Body). + +parse_no_header_stripping(_) -> + Content = "COMMAND\nheader: foo \n\n\0", + {ok, Frame, _} = parse(Content), + {ok, Val} = rabbit_stomp_frame:header(Frame, "header"), + ?assertEqual(" foo ", Val). + +parse_multiple_headers(_) -> + Content = "COMMAND\nheader:correct\nheader:incorrect\n\n\0", + {ok, Frame, _} = parse(Content), + {ok, Val} = rabbit_stomp_frame:header(Frame, "header"), + ?assertEqual("correct", Val). + +header_no_colon(_) -> + Content = "COMMAND\n" + "hdr1:val1\n" + "hdrerror\n" + "hdr2:val2\n" + "\n\0", + ?assertEqual(parse(Content), {error, {header_no_value, "hdrerror"}}). + +no_nested_escapes(_) -> + Content = "COM\\\\rAND\n" % no escapes + "hdr\\\\rname:" % one escape + "hdr\\\\rval\n\n\0", % one escape + {ok, Frame, _} = parse(Content), + ?assertEqual(Frame, + #stomp_frame{command = "COM\\\\rAND", + headers = [{"hdr\\rname", "hdr\\rval"}], + body_iolist = []}). + +header_name_with_cr(_) -> + Content = "COMMAND\nhead\rer:val\n\n\0", + {error, {unexpected_chars_in_header, "\re"}} = parse(Content). + +header_value_with_cr(_) -> + Content = "COMMAND\nheader:val\rue\n\n\0", + {error, {unexpected_chars_in_header, "\ru"}} = parse(Content). + +header_value_with_colon(_) -> + Content = "COMMAND\nheader:val:ue\n\n\0", + {ok, Frame, _} = parse(Content), + ?assertEqual(Frame, + #stomp_frame{ command = "COMMAND", + headers = [{"header", "val:ue"}], + body_iolist = []}). + +test_frame_serialization(Expected, TrailingLF) -> + {ok, Frame, _} = parse(Expected), + {ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"), + ?assertEqual(":\n\r\\", Val), + Serialized = lists:flatten(rabbit_stomp_frame:serialize(Frame, TrailingLF)), + ?assertEqual(Expected, rabbit_misc:format("~s", [Serialized])). + +headers_escaping_roundtrip(_) -> + test_frame_serialization("COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0\n", true). + +headers_escaping_roundtrip_without_trailing_lf(_) -> + test_frame_serialization("COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0", false). + +parse(Content) -> + parse(Content, rabbit_stomp_frame:initial_state()). +parse(Content, State) -> + rabbit_stomp_frame:parse(list_to_binary(Content), State). + +parse_complete(Content) -> + {ok, Frame = #stomp_frame{command = Command}, State} = parse(Content), + {Command, Frame, State}. + +frame_string(Command, Headers, BodyContent, Term) -> + HeaderString = + lists:flatten([Key ++ ":" ++ Value ++ Term || {Key, Value} <- Headers]), + Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0" ++ "\n". + diff --git a/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl new file mode 100644 index 0000000000..46c1c6c743 --- /dev/null +++ b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl @@ -0,0 +1,104 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(proxy_protocol_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(TIMEOUT, 5000). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + proxy_protocol, + proxy_protocol_tls + ]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Suffix}, + {rmq_certspwd, "bunnychow"}, + {rabbitmq_ct_tls_verify, verify_none} + ]), + MqttConfig = stomp_config(), + rabbit_ct_helpers:run_setup_steps(Config1, + [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++ + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +stomp_config() -> + {rabbitmq_stomp, [ + {proxy_protocol, true} + ]}. + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +proxy_protocol(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port, + [binary, {active, false}, {packet, raw}]), + ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"), + ok = inet:send(Socket, stomp_connect_frame()), + {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), + ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, connection_name, []), + match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]), + gen_tcp:close(Socket), + ok. + +proxy_protocol_tls(Config) -> + app_utils:start_applications([asn1, crypto, public_key, ssl]), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls), + {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port, + [binary, {active, false}, {packet, raw}]), + ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"), + {ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT), + ok = ssl:send(SslSocket, stomp_connect_frame()), + {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT), + ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, connection_name, []), + match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]), + gen_tcp:close(Socket), + ok. + +connection_name() -> + Connections = ets:tab2list(connection_created), + {_Key, Values} = lists:nth(1, Connections), + {_, Name} = lists:keyfind(name, 1, Values), + Name. + +merge_app_env(MqttConfig, Config) -> + rabbit_ct_helpers:merge_app_env(Config, MqttConfig). + +stomp_connect_frame() -> + <<"CONNECT\n", + "login:guest\n", + "passcode:guest\n", + "\n", + 0>>.
\ No newline at end of file diff --git a/deps/rabbitmq_stomp/test/python_SUITE.erl b/deps/rabbitmq_stomp/test/python_SUITE.erl new file mode 100644 index 0000000000..9613b25032 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE.erl @@ -0,0 +1,72 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(python_SUITE). +-compile(export_all). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [ + common, + ssl, + connect_options + ]. + +init_per_testcase(TestCase, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, TestCase, "-"), + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_certspwd, "bunnychow"}, + {rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:log_environment(), + Config2 = rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps()), + DataDir = ?config(data_dir, Config2), + PikaDir = filename:join([DataDir, "deps", "pika"]), + StomppyDir = filename:join([DataDir, "deps", "stomppy"]), + rabbit_ct_helpers:make(Config2, PikaDir, []), + rabbit_ct_helpers:make(Config2, StomppyDir, []), + Config2. + +end_per_testcase(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + + +common(Config) -> + run(Config, filename:join("src", "test.py")). + +connect_options(Config) -> + run(Config, filename:join("src", "test_connect_options.py")). + +ssl(Config) -> + run(Config, filename:join("src", "test_ssl.py")). + +run(Config, Test) -> + DataDir = ?config(data_dir, Config), + CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + StompPortTls = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls), + AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + PythonPath = os:getenv("PYTHONPATH"), + os:putenv("PYTHONPATH", filename:join([DataDir, "deps", "pika","pika"]) + ++":"++ + filename:join([DataDir, "deps", "stomppy", "stomppy"]) + ++ ":" ++ + PythonPath), + os:putenv("AMQP_PORT", integer_to_list(AmqpPort)), + os:putenv("STOMP_PORT", integer_to_list(StompPort)), + os:putenv("STOMP_PORT_TLS", integer_to_list(StompPortTls)), + os:putenv("RABBITMQ_NODENAME", atom_to_list(NodeName)), + os:putenv("SSL_CERTS_PATH", CertsDir), + {ok, _} = rabbit_ct_helpers:exec([filename:join(DataDir, Test)]). + + +cur_dir() -> + {ok, Src} = filelib:find_source(?MODULE), + filename:dirname(Src). diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile new file mode 100644 index 0000000000..10aa6f0212 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile @@ -0,0 +1,27 @@ +UPSTREAM_GIT=https://github.com/pika/pika.git +REVISION=1.1.0 + +LIB_DIR=pika +CHECKOUT_DIR=pika-$(REVISION) + +TARGETS=$(LIB_DIR) + +all: $(TARGETS) + +clean: + rm -rf $(LIB_DIR) + +distclean: clean + rm -rf $(CHECKOUT_DIR) + +$(LIB_DIR) : $(CHECKOUT_DIR) + rm -rf $@ + cp -R $< $@ + +$(CHECKOUT_DIR): + git clone --depth 1 --branch $(REVISION) $(UPSTREAM_GIT) $@ || \ + (rm -rf $@; exit 1) + +echo-revision: + @echo $(REVISION) + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile new file mode 100644 index 0000000000..40f5bd1db7 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile @@ -0,0 +1,27 @@ +UPSTREAM_GIT=https://github.com/jasonrbriggs/stomp.py.git +REVISION=v4.0.16 + +LIB_DIR=stomppy +CHECKOUT_DIR=stomppy-git + +TARGETS=$(LIB_DIR) + +all: $(TARGETS) + +clean: + rm -rf $(LIB_DIR) + +distclean: clean + rm -rf $(CHECKOUT_DIR) + +$(LIB_DIR) : $(CHECKOUT_DIR) + rm -rf $@ + cp -R $< $@ + +$(CHECKOUT_DIR): + git clone $(UPSTREAM_GIT) $@ + (cd $@ && git checkout $(REVISION)) || rm -rf $@ + +echo-revision: + @echo $(REVISION) + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py new file mode 100644 index 0000000000..9103bc76ea --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py @@ -0,0 +1,252 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time +import os + +class TestAck(base.BaseTest): + + def test_ack_client(self): + destination = "/queue/ack-test" + + # subscribe and send message + self.listener.reset(2) ## expecting 2 messages + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers={'prefetch-count': '10'}) + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + self.assertTrue(self.listener.wait(4), "initial message not received") + self.assertEquals(2, len(self.listener.messages)) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(2) + conn2.set_listener('', listener2) + self.subscribe_dest(conn2, destination, None, + ack='client', + headers={'prefetch-count': '10'}) + self.assertTrue(listener2.wait(), "message not received again") + self.assertEquals(2, len(listener2.messages)) + + # now ack only the last message - expecting cumulative behaviour + mid = listener2.messages[1]['headers'][self.ack_id_source_header] + self.ack_message(conn2, mid, None) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(3), + "unexpected message. ACK not working?") + finally: + conn3.disconnect() + + def test_ack_client_individual(self): + destination = "/queue/ack-test-individual" + + # subscribe and send message + self.listener.reset(2) ## expecting 2 messages + self.subscribe_dest(self.conn, destination, None, + ack='client-individual', + headers={'prefetch-count': '10'}) + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + self.assertTrue(self.listener.wait(4), "Both initial messages not received") + self.assertEquals(2, len(self.listener.messages)) + + # disconnect without acks + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(2) ## expect 2 messages + conn2.set_listener('', listener2) + self.subscribe_dest(conn2, destination, None, + ack='client-individual', + headers={'prefetch-count': '10'}) + self.assertTrue(listener2.wait(2.5), "Did not receive 2 messages") + self.assertEquals(2, len(listener2.messages), "Not exactly 2 messages received") + + # now ack only the 'test2' message - expecting individual behaviour + nummsgs = len(listener2.messages) + mid = None + for ind in range(nummsgs): + if listener2.messages[ind]['message']=="test2": + mid = listener2.messages[ind]['headers'][self.ack_id_source_header] + self.assertEquals(1, ind, 'Expecting test2 to be second message') + break + self.assertTrue(mid, "Did not find test2 message id.") + self.ack_message(conn2, mid, None) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + listener3.reset(2) ## expecting a single message, but wait for two + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(2.5), + "Expected to see only one message. ACK not working?") + self.assertEquals(1, len(listener3.messages), "Expecting exactly one message") + self.assertEquals("test1", listener3.messages[0]['message'], "Unexpected message remains") + finally: + conn3.disconnect() + + def test_ack_client_tx(self): + destination = "/queue/ack-test-tx" + + # subscribe and send message + self.listener.reset() + self.subscribe_dest(self.conn, destination, None, ack='client') + self.conn.send(destination, "test") + self.assertTrue(self.listener.wait(3), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + tx = "abc" + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + conn2.begin(transaction=tx) + self.subscribe_dest(conn2, destination, None, ack='client') + self.assertTrue(listener2.wait(), "message not received again") + self.assertEquals(1, len(listener2.messages)) + + # now ack + mid = listener2.messages[0]['headers'][self.ack_id_source_header] + self.ack_message(conn2, mid, None, transaction=tx) + + #now commit + conn2.commit(transaction=tx) + finally: + conn2.disconnect() + + # now reconnect again, shouldn't see the message + conn3 = self.create_connection() + try: + listener3 = base.WaitableListener() + conn3.set_listener('', listener3) + self.subscribe_dest(conn3, destination, None) + self.assertFalse(listener3.wait(3), + "unexpected message. TX ACK not working?") + finally: + conn3.disconnect() + + def test_topic_prefetch(self): + destination = "/topic/prefetch-test" + + # subscribe and send message + self.listener.reset(6) ## expect 6 messages + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers={'prefetch-count': '5'}) + + for x in range(10): + self.conn.send(destination, "test" + str(x)) + + self.assertFalse(self.listener.wait(3), + "Should not have been able to see 6 messages") + self.assertEquals(5, len(self.listener.messages)) + + def test_nack(self): + destination = "/queue/nack-test" + + #subscribe and send + self.subscribe_dest(self.conn, destination, None, + ack='client-individual') + self.conn.send(destination, "nack-test") + + self.assertTrue(self.listener.wait(), "Not received message") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.listener.reset() + + self.nack_message(self.conn, message_id, None) + self.assertTrue(self.listener.wait(), "Not received message after NACK") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.ack_message(self.conn, message_id, None) + + def test_nack_multi(self): + destination = "/queue/nack-multi" + + self.listener.reset(2) + + #subscribe and send + self.subscribe_dest(self.conn, destination, None, + ack='client', + headers = {'prefetch-count' : '10'}) + self.conn.send(destination, "nack-test1") + self.conn.send(destination, "nack-test2") + + self.assertTrue(self.listener.wait(), "Not received messages") + message_id = self.listener.messages[1]['headers'][self.ack_id_source_header] + self.listener.reset(2) + + self.nack_message(self.conn, message_id, None) + self.assertTrue(self.listener.wait(), "Not received message again") + message_id = self.listener.messages[1]['headers'][self.ack_id_source_header] + self.ack_message(self.conn, message_id, None) + + def test_nack_without_requeueing(self): + destination = "/queue/nack-test-no-requeue" + + self.subscribe_dest(self.conn, destination, None, + ack='client-individual') + self.conn.send(destination, "nack-test") + + self.assertTrue(self.listener.wait(), "Not received message") + message_id = self.listener.messages[0]['headers'][self.ack_id_source_header] + self.listener.reset() + + self.conn.send_frame("NACK", {self.ack_id_header: message_id, "requeue": False}) + self.assertFalse(self.listener.wait(4), "Received message after NACK with requeue = False") + +class TestAck11(TestAck): + + def create_connection_obj(self, version='1.1', vhost='/', heartbeats=(0, 0)): + conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'message-id' + self.ack_id_header = 'message-id' + return conn + + def test_version(self): + self.assertEquals('1.1', self.conn.version) + +class TestAck12(TestAck): + + def create_connection_obj(self, version='1.2', vhost='/', heartbeats=(0, 0)): + conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'ack' + self.ack_id_header = 'id' + return conn + + def test_version(self): + self.assertEquals('1.2', self.conn.version) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py new file mode 100644 index 0000000000..2c5ee45a8e --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py @@ -0,0 +1,42 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import pika +import base +import os + +class TestAmqpHeaders(base.BaseTest): + def test_headers_to_stomp(self): + self.listener.reset(1) + queueName='test-amqp-headers-to-stomp' + + # Set up STOMP subscription + self.subscribe_dest(self.conn, '/topic/test', None, headers={'x-queue-name': queueName}) + + # Set up AMQP connection + amqp_params = pika.ConnectionParameters(host='localhost', port=int(os.environ["AMQP_PORT"])) + amqp_conn = pika.BlockingConnection(amqp_params) + amqp_chan = amqp_conn.channel() + + # publish a message with headers to the named AMQP queue + amqp_headers = { 'x-custom-hdr-1': 'value1', + 'x-custom-hdr-2': 'value2', + 'custom-hdr-3': 'value3' } + amqp_props = pika.BasicProperties(headers=amqp_headers) + amqp_chan.basic_publish(exchange='', routing_key=queueName, body='Hello World!', properties=amqp_props) + + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(2), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + msg = self.listener.messages[0] + self.assertEquals('Hello World!', msg['message']) + self.assertEquals('value1', msg['headers']['x-custom-hdr-1']) + self.assertEquals('value2', msg['headers']['x-custom-hdr-2']) + self.assertEquals('value3', msg['headers']['custom-hdr-3']) + + self.conn.disconnect() + amqp_conn.close() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py new file mode 100644 index 0000000000..a8f7ef59b9 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py @@ -0,0 +1,259 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import sys +import threading +import os + + +class BaseTest(unittest.TestCase): + + def create_connection_obj(self, version='1.0', vhost='/', heartbeats=(0, 0)): + if version == '1.0': + conn = stomp.StompConnection10(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))]) + self.ack_id_source_header = 'message-id' + self.ack_id_header = 'message-id' + elif version == '1.1': + conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'message-id' + self.ack_id_header = 'message-id' + elif version == '1.2': + conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + self.ack_id_source_header = 'ack' + self.ack_id_header = 'id' + else: + conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))], + vhost=vhost, + heartbeats=heartbeats) + conn.version = version + return conn + + def create_connection(self, user='guest', passcode='guest', wait=True, **kwargs): + conn = self.create_connection_obj(**kwargs) + conn.start() + conn.connect(user, passcode, wait=wait) + return conn + + def subscribe_dest(self, conn, destination, sub_id, **kwargs): + if type(conn) is stomp.StompConnection10: + # 'id' is optional in STOMP 1.0. + if sub_id != None: + kwargs['id'] = sub_id + conn.subscribe(destination, **kwargs) + else: + # 'id' is required in STOMP 1.1+. + if sub_id == None: + sub_id = 'ctag' + conn.subscribe(destination, sub_id, **kwargs) + + def unsubscribe_dest(self, conn, destination, sub_id, **kwargs): + if type(conn) is stomp.StompConnection10: + # 'id' is optional in STOMP 1.0. + if sub_id != None: + conn.unsubscribe(id=sub_id, **kwargs) + else: + conn.unsubscribe(destination=destination, **kwargs) + else: + # 'id' is required in STOMP 1.1+. + if sub_id == None: + sub_id = 'ctag' + conn.unsubscribe(sub_id, **kwargs) + + def ack_message(self, conn, msg_id, sub_id, **kwargs): + if type(conn) is stomp.StompConnection10: + conn.ack(msg_id, **kwargs) + elif type(conn) is stomp.StompConnection11: + if sub_id == None: + sub_id = 'ctag' + conn.ack(msg_id, sub_id, **kwargs) + elif type(conn) is stomp.StompConnection12: + conn.ack(msg_id, **kwargs) + + def nack_message(self, conn, msg_id, sub_id, **kwargs): + if type(conn) is stomp.StompConnection10: + # Normally unsupported by STOMP 1.0. + conn.send_frame("NACK", {"message-id": msg_id}) + elif type(conn) is stomp.StompConnection11: + if sub_id == None: + sub_id = 'ctag' + conn.nack(msg_id, sub_id, **kwargs) + elif type(conn) is stomp.StompConnection12: + conn.nack(msg_id, **kwargs) + + def create_subscriber_connection(self, dest): + conn = self.create_connection() + listener = WaitableListener() + conn.set_listener('', listener) + self.subscribe_dest(conn, dest, None, receipt="sub.receipt") + listener.wait() + self.assertEquals(1, len(listener.receipts)) + listener.reset() + return conn, listener + + def setUp(self): + # Note: useful for debugging + # import stomp.listener + self.conn = self.create_connection() + self.listener = WaitableListener() + self.conn.set_listener('waitable', self.listener) + # Note: useful for debugging + # self.printing_listener = stomp.listener.PrintingListener() + # self.conn.set_listener('printing', self.printing_listener) + + def tearDown(self): + if self.conn.is_connected(): + self.conn.disconnect() + self.conn.stop() + + def simple_test_send_rec(self, dest, headers={}): + self.listener.reset() + + self.subscribe_dest(self.conn, dest, None) + self.conn.send(dest, "foo", headers=headers) + + self.assertTrue(self.listener.wait(), "Timeout, no message received") + + # assert no errors + if len(self.listener.errors) > 0: + self.fail(self.listener.errors[0]['message']) + + # check header content + msg = self.listener.messages[0] + self.assertEquals("foo", msg['message']) + self.assertEquals(dest, msg['headers']['destination']) + return msg['headers'] + + def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=10): + if numMsgs + numErrs + numRcts > 0: + self._assertTrue(self.listener.wait(timeout), errMsg + " (#awaiting)") + else: + self._assertFalse(self.listener.wait(timeout), errMsg + " (#awaiting)") + self._assertEquals(numMsgs, len(self.listener.messages), errMsg + " (#messages)") + self._assertEquals(numErrs, len(self.listener.errors), errMsg + " (#errors)") + self._assertEquals(numRcts, len(self.listener.receipts), errMsg + " (#receipts)") + + def _assertTrue(self, bool, msg): + if not bool: + self.listener.print_state(msg, True) + self.assertTrue(bool, msg) + + def _assertFalse(self, bool, msg): + if bool: + self.listener.print_state(msg, True) + self.assertFalse(bool, msg) + + def _assertEquals(self, expected, actual, msg): + if expected != actual: + self.listener.print_state(msg, True) + self.assertEquals(expected, actual, msg) + + def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=5): + num = numMsgs + numErrs + numRcts + self.listener.reset(num if num>0 else 1) + verb() + self.assertListener(errMsg=errMsg, numMsgs=numMsgs, numErrs=numErrs, numRcts=numRcts, timeout=timeout) + +class WaitableListener(object): + + def __init__(self): + self.debug = False + if self.debug: + print('(listener) init') + self.messages = [] + self.errors = [] + self.receipts = [] + self.latch = Latch(1) + self.msg_no = 0 + + def _next_msg_no(self): + self.msg_no += 1 + return self.msg_no + + def _append(self, array, msg, hdrs): + mno = self._next_msg_no() + array.append({'message' : msg, 'headers' : hdrs, 'msg_no' : mno}) + self.latch.countdown() + + def on_receipt(self, headers, message): + if self.debug: + print('(on_receipt) message: {}, headers: {}'.format(message, headers)) + self._append(self.receipts, message, headers) + + def on_error(self, headers, message): + if self.debug: + print('(on_error) message: {}, headers: {}'.format(message, headers)) + self._append(self.errors, message, headers) + + def on_message(self, headers, message): + if self.debug: + print('(on_message) message: {}, headers: {}'.format(message, headers)) + self._append(self.messages, message, headers) + + def reset(self, count=1): + if self.debug: + self.print_state('(reset listener--old state)') + self.messages = [] + self.errors = [] + self.receipts = [] + self.latch = Latch(count) + self.msg_no = 0 + if self.debug: + self.print_state('(reset listener--new state)') + + def wait(self, timeout=10): + return self.latch.wait(timeout) + + def print_state(self, hdr="", full=False): + print(hdr) + print('#messages: {}'.format(len(self.messages))) + print('#errors: {}', len(self.errors)) + print('#receipts: {}'.format(len(self.receipts))) + print('Remaining count: {}'.format(self.latch.get_count())) + if full: + if len(self.messages) != 0: print('Messages: {}'.format(self.messages)) + if len(self.errors) != 0: print('Messages: {}'.format(self.errors)) + if len(self.receipts) != 0: print('Messages: {}'.format(self.receipts)) + +class Latch(object): + + def __init__(self, count=1): + self.cond = threading.Condition() + self.cond.acquire() + self.count = count + self.cond.release() + + def countdown(self): + self.cond.acquire() + if self.count > 0: + self.count -= 1 + if self.count == 0: + self.cond.notify_all() + self.cond.release() + + def wait(self, timeout=None): + try: + self.cond.acquire() + if self.count == 0: + return True + else: + self.cond.wait(timeout) + return self.count == 0 + finally: + self.cond.release() + + def get_count(self): + try: + self.cond.acquire() + return self.count + finally: + self.cond.release() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py new file mode 100644 index 0000000000..f71c4acf70 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py @@ -0,0 +1,51 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import test_util +import os + +class TestConnectOptions(base.BaseTest): + + def test_implicit_connect(self): + ''' Implicit connect with receipt on first command ''' + self.conn.disconnect() + test_util.enable_implicit_connect() + listener = base.WaitableListener() + new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))]) + new_conn.set_listener('', listener) + + new_conn.start() # not going to issue connect + self.subscribe_dest(new_conn, "/topic/implicit", 'sub_implicit', + receipt='implicit') + + try: + self.assertTrue(listener.wait(5)) + self.assertEquals(1, len(listener.receipts), + 'Missing receipt. Likely not connected') + self.assertEquals('implicit', listener.receipts[0]['headers']['receipt-id']) + finally: + new_conn.disconnect() + test_util.disable_implicit_connect() + + def test_default_user(self): + ''' Default user connection ''' + self.conn.disconnect() + test_util.enable_default_user() + listener = base.WaitableListener() + new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))]) + new_conn.set_listener('', listener) + new_conn.start() + new_conn.connect() + try: + self.assertFalse(listener.wait(3)) # no error back + self.assertTrue(new_conn.is_connected()) + finally: + new_conn.disconnect() + test_util.disable_default_user() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py new file mode 100644 index 0000000000..76e5402686 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py @@ -0,0 +1,536 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestExchange(base.BaseTest): + + + def test_amq_direct(self): + ''' Test basic send/receive for /exchange/amq.direct ''' + self.__test_exchange_send_rec("amq.direct", "route") + + def test_amq_topic(self): + ''' Test basic send/receive for /exchange/amq.topic ''' + self.__test_exchange_send_rec("amq.topic", "route") + + def test_amq_fanout(self): + ''' Test basic send/receive for /exchange/amq.fanout ''' + self.__test_exchange_send_rec("amq.fanout", "route") + + def test_amq_fanout_no_route(self): + ''' Test basic send/receive, /exchange/amq.direct, no routing key''' + self.__test_exchange_send_rec("amq.fanout") + + def test_invalid_exchange(self): + ''' Test invalid exchange error ''' + self.listener.reset(1) + self.subscribe_dest(self.conn, "/exchange/does.not.exist", None, + ack="auto") + self.assertListener("Expecting an error", numErrs=1) + err = self.listener.errors[0] + self.assertEquals("not_found", err['headers']['message']) + self.assertEquals( + "NOT_FOUND - no exchange 'does.not.exist' in vhost '/'\n", + err['message']) + time.sleep(1) + self.assertFalse(self.conn.is_connected()) + + def __test_exchange_send_rec(self, exchange, route = None): + if exchange != "amq.topic": + dest = "/exchange/" + exchange + else: + dest = "/topic" + if route != None: + dest += "/" + route + + self.simple_test_send_rec(dest) + +class TestQueue(base.BaseTest): + + def test_send_receive(self): + ''' Test basic send/receive for /queue ''' + destination = '/queue/test' + self.simple_test_send_rec(destination) + + def test_send_receive_in_other_conn(self): + ''' Test send in one connection, receive in another ''' + destination = '/queue/test2' + + # send + self.conn.send(destination, "hello") + + # now receive + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + self.subscribe_dest(conn2, destination, None, ack="auto") + self.assertTrue(listener2.wait(10), "no receive") + finally: + conn2.disconnect() + + def test_send_receive_in_other_conn_with_disconnect(self): + ''' Test send, disconnect, receive ''' + destination = '/queue/test3' + + # send + self.conn.send(destination, "hello thar", receipt="foo") + self.listener.wait(3) + self.conn.disconnect() + + # now receive + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + self.subscribe_dest(conn2, destination, None, ack="auto") + self.assertTrue(listener2.wait(10), "no receive") + finally: + conn2.disconnect() + + + def test_multi_subscribers(self): + ''' Test multiple subscribers against a single /queue destination ''' + destination = '/queue/test-multi' + + ## set up two subscribers + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + ## now send + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + + ## expect both consumers to get a message? + self.assertTrue(listener1.wait(2)) + self.assertEquals(1, len(listener1.messages), + "unexpected message count") + self.assertTrue(listener2.wait(2)) + self.assertEquals(1, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + + def test_send_with_receipt(self): + destination = '/queue/test-receipt' + def noop(): pass + self.__test_send_receipt(destination, noop, noop) + + def test_send_with_receipt_tx(self): + destination = '/queue/test-receipt-tx' + tx = 'receipt.tx' + + def before(): + self.conn.begin(transaction=tx) + + def after(): + self.assertFalse(self.listener.wait(1)) + self.conn.commit(transaction=tx) + + self.__test_send_receipt(destination, before, after, {'transaction': tx}) + + def test_interleaved_receipt_no_receipt(self): + ''' Test i-leaved receipt/no receipt, no-r bracketed by rs ''' + + destination = '/queue/ir' + + self.listener.reset(5) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.send(destination, 'first', receipt='a') + self.conn.send(destination, 'second') + self.conn.send(destination, 'third', receipt='b') + + self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=3) + + self.assertEquals(set(['a','b']), self.__gather_receipts()) + + def test_interleaved_receipt_no_receipt_tx(self): + ''' Test i-leaved receipt/no receipt, no-r bracketed by r+xactions ''' + + destination = '/queue/ir' + tx = 'tx.ir' + + # three messages and two receipts + self.listener.reset(5) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.begin(transaction=tx) + + self.conn.send(destination, 'first', receipt='a', transaction=tx) + self.conn.send(destination, 'second', transaction=tx) + self.conn.send(destination, 'third', receipt='b', transaction=tx) + self.conn.commit(transaction=tx) + + self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=40) + + expected = set(['a', 'b']) + missing = expected.difference(self.__gather_receipts()) + + self.assertEquals(set(), missing, "Missing receipts: " + str(missing)) + + def test_interleaved_receipt_no_receipt_inverse(self): + ''' Test i-leaved receipt/no receipt, r bracketed by no-rs ''' + + destination = '/queue/ir' + + self.listener.reset(4) + + self.subscribe_dest(self.conn, destination, None, ack="auto") + self.conn.send(destination, 'first') + self.conn.send(destination, 'second', receipt='a') + self.conn.send(destination, 'third') + + self.assertListener("Missing messages/receipt", numMsgs=3, numRcts=1, timeout=3) + + self.assertEquals(set(['a']), self.__gather_receipts()) + + def __test_send_receipt(self, destination, before, after, headers = {}): + count = 50 + self.listener.reset(count) + + before() + expected_receipts = set() + + for x in range(0, count): + receipt = "test" + str(x) + expected_receipts.add(receipt) + self.conn.send(destination, "test receipt", + receipt=receipt, headers=headers) + after() + + self.assertTrue(self.listener.wait(5)) + + missing_receipts = expected_receipts.difference( + self.__gather_receipts()) + + self.assertEquals(set(), missing_receipts, + "missing receipts: " + str(missing_receipts)) + + def __gather_receipts(self): + result = set() + for r in self.listener.receipts: + result.add(r['headers']['receipt-id']) + return result + +class TestTopic(base.BaseTest): + + def test_send_receive(self): + ''' Test basic send/receive for /topic ''' + destination = '/topic/test' + self.simple_test_send_rec(destination) + + def test_send_multiple(self): + ''' Test /topic with multiple consumers ''' + destination = '/topic/multiple' + + ## set up two subscribers + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + ## listeners are expecting 2 messages + listener1.reset(2) + listener2.reset(2) + + ## now send + self.conn.send(destination, "test1") + self.conn.send(destination, "test2") + + ## expect both consumers to get both messages + self.assertTrue(listener1.wait(5)) + self.assertEquals(2, len(listener1.messages), + "unexpected message count") + self.assertTrue(listener2.wait(5)) + self.assertEquals(2, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + + def test_send_multiple_with_a_large_message(self): + ''' Test /topic with multiple consumers ''' + destination = '/topic/16mb' + # payload size + s = 1024 * 1024 * 16 + message = 'x' * s + + conn1, listener1 = self.create_subscriber_connection(destination) + conn2, listener2 = self.create_subscriber_connection(destination) + + try: + listener1.reset(2) + listener2.reset(2) + + self.conn.send(destination, message) + self.conn.send(destination, message) + + self.assertTrue(listener1.wait(10)) + self.assertEquals(2, len(listener1.messages), + "unexpected message count") + self.assertTrue(len(listener2.messages[0]['message']) == s, + "unexpected message size") + + self.assertTrue(listener2.wait(10)) + self.assertEquals(2, len(listener2.messages), + "unexpected message count") + finally: + conn1.disconnect() + conn2.disconnect() + +class TestReplyQueue(base.BaseTest): + + def test_reply_queue(self): + ''' Test with two separate clients. Client 1 sends + message to a known destination with a defined reply + queue. Client 2 receives on known destination and replies + on the reply destination. Client 1 gets the reply message''' + + known = '/queue/known' + reply = '/temp-queue/0' + + ## Client 1 uses pre-supplied connection and listener + ## Set up client 2 + conn2, listener2 = self.create_subscriber_connection(known) + + try: + self.conn.send(known, "test", + headers = {"reply-to": reply}) + + self.assertTrue(listener2.wait(5)) + self.assertEquals(1, len(listener2.messages)) + + reply_to = listener2.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to.startswith('/reply-queue/')) + + conn2.send(reply_to, "reply") + self.assertTrue(self.listener.wait(5)) + self.assertEquals("reply", self.listener.messages[0]['message']) + finally: + conn2.disconnect() + + def test_reuse_reply_queue(self): + ''' Test re-use of reply-to queue ''' + + known2 = '/queue/known2' + known3 = '/queue/known3' + reply = '/temp-queue/foo' + + def respond(cntn, listna): + self.assertTrue(listna.wait(5)) + self.assertEquals(1, len(listna.messages)) + reply_to = listna.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to.startswith('/reply-queue/')) + cntn.send(reply_to, "reply") + + ## Client 1 uses pre-supplied connection and listener + ## Set up clients 2 and 3 + conn2, listener2 = self.create_subscriber_connection(known2) + conn3, listener3 = self.create_subscriber_connection(known3) + try: + self.listener.reset(2) + self.conn.send(known2, "test2", + headers = {"reply-to": reply}) + self.conn.send(known3, "test3", + headers = {"reply-to": reply}) + respond(conn2, listener2) + respond(conn3, listener3) + + self.assertTrue(self.listener.wait(5)) + self.assertEquals(2, len(self.listener.messages)) + self.assertEquals("reply", self.listener.messages[0]['message']) + self.assertEquals("reply", self.listener.messages[1]['message']) + finally: + conn2.disconnect() + conn3.disconnect() + + def test_perm_reply_queue(self): + '''As test_reply_queue, but with a non-temp reply queue''' + + known = '/queue/known' + reply = '/queue/reply' + + ## Client 1 uses pre-supplied connection and listener + ## Set up client 2 + conn1, listener1 = self.create_subscriber_connection(reply) + conn2, listener2 = self.create_subscriber_connection(known) + + try: + conn1.send(known, "test", + headers = {"reply-to": reply}) + + self.assertTrue(listener2.wait(5)) + self.assertEquals(1, len(listener2.messages)) + + reply_to = listener2.messages[0]['headers']['reply-to'] + self.assertTrue(reply_to == reply) + + conn2.send(reply_to, "reply") + self.assertTrue(listener1.wait(5)) + self.assertEquals("reply", listener1.messages[0]['message']) + finally: + conn1.disconnect() + conn2.disconnect() + +class TestDurableSubscription(base.BaseTest): + + ID = 'test.subscription' + + def __subscribe(self, dest, conn=None, id=None): + if not conn: + conn = self.conn + if not id: + id = TestDurableSubscription.ID + + self.subscribe_dest(conn, dest, id, ack="auto", + headers = {'durable': 'true', + 'receipt': 1, + 'auto-delete': False}) + + def __assert_receipt(self, listener=None, pos=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.wait(5)) + self.assertEquals(1, len(self.listener.receipts)) + if pos is not None: + self.assertEquals(pos, self.listener.receipts[0]['msg_no']) + + def __assert_message(self, msg, listener=None, pos=None): + if not listener: + listener = self.listener + + self.assertTrue(listener.wait(5)) + self.assertEquals(1, len(listener.messages)) + self.assertEquals(msg, listener.messages[0]['message']) + if pos is not None: + self.assertEquals(pos, self.listener.messages[0]['msg_no']) + + def do_test_durable_subscription(self, durability_header): + destination = '/topic/durable' + + self.__subscribe(destination) + self.__assert_receipt() + + # send first message without unsubscribing + self.listener.reset(1) + self.conn.send(destination, "first") + self.__assert_message("first") + + # now unsubscribe (disconnect only) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID) + + # send again + self.listener.reset(2) + self.conn.send(destination, "second") + + # resubscribe and expect receipt + self.__subscribe(destination) + self.__assert_receipt(pos=1) + # and message + self.__assert_message("second", pos=2) + + # now unsubscribe (cancel) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID, + headers={durability_header: 'true'}) + + # send again + self.listener.reset(1) + self.conn.send(destination, "third") + + # resubscribe and expect no message + self.__subscribe(destination) + self.assertTrue(self.listener.wait(3)) + self.assertEquals(0, len(self.listener.messages)) + self.assertEquals(1, len(self.listener.receipts)) + + def test_durable_subscription(self): + self.do_test_durable_subscription('durable') + + def test_durable_subscription_and_legacy_header(self): + self.do_test_durable_subscription('persistent') + + def test_share_subscription(self): + destination = '/topic/durable-shared' + + conn2 = self.create_connection() + conn2.set_listener('', self.listener) + + try: + self.__subscribe(destination) + self.__assert_receipt() + self.listener.reset(1) + self.__subscribe(destination, conn2) + self.__assert_receipt() + + self.listener.reset(100) + + # send 100 messages + for x in range(0, 100): + self.conn.send(destination, "msg" + str(x)) + + self.assertTrue(self.listener.wait(5)) + self.assertEquals(100, len(self.listener.messages)) + finally: + conn2.disconnect() + + def test_separate_ids(self): + destination = '/topic/durable-separate' + + conn2 = self.create_connection() + listener2 = base.WaitableListener() + conn2.set_listener('', listener2) + + try: + # ensure durable subscription exists for each ID + self.__subscribe(destination) + self.__assert_receipt() + self.__subscribe(destination, conn2, "other.id") + self.__assert_receipt(listener2) + self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID) + self.unsubscribe_dest(conn2, destination, "other.id") + + self.listener.reset(101) + listener2.reset(101) ## 100 messages and 1 receipt + + # send 100 messages + for x in range(0, 100): + self.conn.send(destination, "msg" + str(x)) + + self.__subscribe(destination) + self.__subscribe(destination, conn2, "other.id") + + for l in [self.listener, listener2]: + self.assertTrue(l.wait(20)) + self.assertTrue(len(l.messages) >= 90) + self.assertTrue(len(l.messages) <= 100) + + finally: + conn2.disconnect() + + def do_test_durable_subscribe_no_id_and_header(self, header): + destination = '/topic/durable-invalid' + + self.conn.send_frame('SUBSCRIBE', + {'destination': destination, 'ack': 'auto', header: 'true'}) + self.listener.wait(3) + self.assertEquals(1, len(self.listener.errors)) + self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message']) + + def test_durable_subscribe_no_id(self): + self.do_test_durable_subscribe_no_id_and_header('durable') + + def test_durable_subscribe_no_id_and_legacy_header(self): + self.do_test_durable_subscribe_no_id_and_header('persistent') diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py new file mode 100644 index 0000000000..884ada50e8 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py @@ -0,0 +1,101 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestErrorsAndCloseConnection(base.BaseTest): + def __test_duplicate_consumer_tag_with_headers(self, destination, headers): + self.subscribe_dest(self.conn, destination, None, + headers = headers) + + self.subscribe_dest(self.conn, destination, None, + headers = headers) + + self.assertTrue(self.listener.wait()) + + self.assertEquals(1, len(self.listener.errors)) + errorReceived = self.listener.errors[0] + self.assertEquals("Duplicated subscription identifier", errorReceived['headers']['message']) + self.assertEquals("A subscription identified by 'T_1' already exists.", errorReceived['message']) + time.sleep(2) + self.assertFalse(self.conn.is_connected()) + + + def test_duplicate_consumer_tag_with_transient_destination(self): + destination = "/exchange/amq.direct/duplicate-consumer-tag-test1" + self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1}) + + def test_duplicate_consumer_tag_with_durable_destination(self): + destination = "/queue/duplicate-consumer-tag-test2" + self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1, + 'persistent': True}) + + +class TestErrors(base.BaseTest): + + def test_invalid_queue_destination(self): + self.__test_invalid_destination("queue", "/bah/baz") + + def test_invalid_empty_queue_destination(self): + self.__test_invalid_destination("queue", "") + + def test_invalid_topic_destination(self): + self.__test_invalid_destination("topic", "/bah/baz") + + def test_invalid_empty_topic_destination(self): + self.__test_invalid_destination("topic", "") + + def test_invalid_exchange_destination(self): + self.__test_invalid_destination("exchange", "/bah/baz/boo") + + def test_invalid_empty_exchange_destination(self): + self.__test_invalid_destination("exchange", "") + + def test_invalid_default_exchange_destination(self): + self.__test_invalid_destination("exchange", "//foo") + + def test_unknown_destination(self): + self.listener.reset() + self.conn.send("/something/interesting", 'test_unknown_destination') + + self.assertTrue(self.listener.wait()) + self.assertEquals(1, len(self.listener.errors)) + + err = self.listener.errors[0] + self.assertEquals("Unknown destination", err['headers']['message']) + + def test_send_missing_destination(self): + self.__test_missing_destination("SEND") + + def test_send_missing_destination(self): + self.__test_missing_destination("SUBSCRIBE") + + def __test_missing_destination(self, command): + self.listener.reset() + self.conn.send_frame(command) + + self.assertTrue(self.listener.wait()) + self.assertEquals(1, len(self.listener.errors)) + + err = self.listener.errors[0] + self.assertEquals("Missing destination", err['headers']['message']) + + def __test_invalid_destination(self, dtype, content): + self.listener.reset() + self.conn.send("/" + dtype + content, '__test_invalid_destination:' + dtype + content) + + self.assertTrue(self.listener.wait()) + self.assertEquals(1, len(self.listener.errors)) + + err = self.listener.errors[0] + self.assertEquals("Invalid destination", err['headers']['message']) + self.assertEquals("'" + content + "' is not a valid " + + dtype + " destination\n", + err['message']) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py new file mode 100644 index 0000000000..d7b558e7b5 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py @@ -0,0 +1,187 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestLifecycle(base.BaseTest): + + def test_unsubscribe_exchange_destination(self): + ''' Test UNSUBSCRIBE command with exchange''' + d = "/exchange/amq.fanout" + self.unsub_test(d, self.sub_and_send(d)) + + def test_unsubscribe_exchange_destination_with_receipt(self): + ''' Test receipted UNSUBSCRIBE command with exchange''' + d = "/exchange/amq.fanout" + self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1) + + def test_unsubscribe_queue_destination(self): + ''' Test UNSUBSCRIBE command with queue''' + d = "/queue/unsub01" + self.unsub_test(d, self.sub_and_send(d)) + + def test_unsubscribe_queue_destination_with_receipt(self): + ''' Test receipted UNSUBSCRIBE command with queue''' + d = "/queue/unsub02" + self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1) + + def test_unsubscribe_exchange_id(self): + ''' Test UNSUBSCRIBE command with exchange by id''' + d = "/exchange/amq.fanout" + self.unsub_test(d, self.sub_and_send(d, subid="exchid")) + + def test_unsubscribe_exchange_id_with_receipt(self): + ''' Test receipted UNSUBSCRIBE command with exchange by id''' + d = "/exchange/amq.fanout" + self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1) + + def test_unsubscribe_queue_id(self): + ''' Test UNSUBSCRIBE command with queue by id''' + d = "/queue/unsub03" + self.unsub_test(d, self.sub_and_send(d, subid="queid")) + + def test_unsubscribe_queue_id_with_receipt(self): + ''' Test receipted UNSUBSCRIBE command with queue by id''' + d = "/queue/unsub04" + self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1) + + def test_connect_version_1_0(self): + ''' Test CONNECT with version 1.0''' + self.conn.disconnect() + new_conn = self.create_connection(version="1.0") + try: + self.assertTrue(new_conn.is_connected()) + finally: + new_conn.disconnect() + self.assertFalse(new_conn.is_connected()) + + def test_connect_version_1_1(self): + ''' Test CONNECT with version 1.1''' + self.conn.disconnect() + new_conn = self.create_connection(version="1.1") + try: + self.assertTrue(new_conn.is_connected()) + finally: + new_conn.disconnect() + self.assertFalse(new_conn.is_connected()) + + def test_connect_version_1_2(self): + ''' Test CONNECT with version 1.2''' + self.conn.disconnect() + new_conn = self.create_connection(version="1.2") + try: + self.assertTrue(new_conn.is_connected()) + finally: + new_conn.disconnect() + self.assertFalse(new_conn.is_connected()) + + def test_heartbeat_disconnects_client(self): + ''' Test heart-beat disconnection''' + self.conn.disconnect() + new_conn = self.create_connection(version='1.1', heartbeats=(1500, 0)) + try: + self.assertTrue(new_conn.is_connected()) + time.sleep(1) + self.assertTrue(new_conn.is_connected()) + time.sleep(3) + self.assertFalse(new_conn.is_connected()) + finally: + if new_conn.is_connected(): + new_conn.disconnect() + + def test_unsupported_version(self): + ''' Test unsupported version on CONNECT command''' + self.bad_connect("Supported versions are 1.0,1.1,1.2\n", version='100.1') + + def test_bad_username(self): + ''' Test bad username''' + self.bad_connect("Access refused for user 'gust'\n", user='gust') + + def test_bad_password(self): + ''' Test bad password''' + self.bad_connect("Access refused for user 'guest'\n", passcode='gust') + + def test_bad_vhost(self): + ''' Test bad virtual host''' + self.bad_connect("Virtual host '//' access denied", version='1.1', vhost='//') + + def bad_connect(self, expected, user='guest', passcode='guest', **kwargs): + self.conn.disconnect() + new_conn = self.create_connection_obj(**kwargs) + listener = base.WaitableListener() + new_conn.set_listener('', listener) + try: + new_conn.start() + new_conn.connect(user, passcode) + self.assertTrue(listener.wait()) + self.assertEquals(expected, listener.errors[0]['message']) + finally: + if new_conn.is_connected(): + new_conn.disconnect() + + def test_bad_header_on_send(self): + ''' Test disallowed header on SEND ''' + self.listener.reset(1) + self.conn.send_frame("SEND", {"destination":"a", "message-id":"1"}) + self.assertTrue(self.listener.wait()) + self.assertEquals(1, len(self.listener.errors)) + errorReceived = self.listener.errors[0] + self.assertEquals("Invalid header", errorReceived['headers']['message']) + self.assertEquals("'message-id' is not allowed on 'SEND'.\n", errorReceived['message']) + + def test_send_recv_header(self): + ''' Test sending a custom header and receiving it back ''' + dest = '/queue/custom-header' + hdrs = {'x-custom-header-1': 'value1', + 'x-custom-header-2': 'value2', + 'custom-header-3': 'value3'} + self.listener.reset(1) + recv_hdrs = self.simple_test_send_rec(dest, headers=hdrs) + self.assertEquals('value1', recv_hdrs['x-custom-header-1']) + self.assertEquals('value2', recv_hdrs['x-custom-header-2']) + self.assertEquals('value3', recv_hdrs['custom-header-3']) + + def test_disconnect(self): + ''' Test DISCONNECT command''' + self.conn.disconnect() + self.assertFalse(self.conn.is_connected()) + + def test_disconnect_with_receipt(self): + ''' Test the DISCONNECT command with receipts ''' + time.sleep(3) + self.listener.reset(1) + self.conn.send_frame("DISCONNECT", {"receipt": "test"}) + self.assertTrue(self.listener.wait()) + self.assertEquals(1, len(self.listener.receipts)) + receiptReceived = self.listener.receipts[0]['headers']['receipt-id'] + self.assertEquals("test", receiptReceived + , "Wrong receipt received: '" + receiptReceived + "'") + + def unsub_test(self, dest, verbs, numRcts=0): + def afterfun(): + self.conn.send(dest, "after-test") + subverb, unsubverb = verbs + self.assertListenerAfter(subverb, numMsgs=1, + errMsg="FAILED to subscribe and send") + self.assertListenerAfter(unsubverb, numRcts=numRcts, + errMsg="Incorrect responses from UNSUBSCRIBE") + self.assertListenerAfter(afterfun, + errMsg="Still receiving messages") + + def sub_and_send(self, dest, subid=None, receipt=None): + def subfun(): + self.subscribe_dest(self.conn, dest, subid) + self.conn.send(dest, "test") + def unsubfun(): + headers = {} + if receipt != None: + headers['receipt'] = receipt + self.unsubscribe_dest(self.conn, dest, subid, **headers) + return subfun, unsubfun diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py new file mode 100644 index 0000000000..40f908c5d9 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py @@ -0,0 +1,331 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import re +import socket +import functools +import time +import sys +import os + +def connect(cnames): + ''' Decorator that creates stomp connections and issues CONNECT ''' + cmd=('CONNECT\n' + 'login:guest\n' + 'passcode:guest\n' + '\n' + '\n\0') + resp = ('CONNECTED\n' + 'server:RabbitMQ/(.*)\n' + 'session:(.*)\n' + 'heart-beat:0,0\n' + 'version:1.0\n' + '\n\x00') + def w(m): + @functools.wraps(m) + def wrapper(self, *args, **kwargs): + for cname in cnames: + sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sd.settimeout(30000) + sd.connect((self.host, self.port)) + sd.sendall(cmd.encode('utf-8')) + self.match(resp, sd.recv(4096).decode('utf-8')) + setattr(self, cname, sd) + try: + r = m(self, *args, **kwargs) + finally: + for cname in cnames: + try: + getattr(self, cname).close() + except IOError: + pass + return r + return wrapper + return w + + +class TestParsing(unittest.TestCase): + host='127.0.0.1' + # The default port is 61613 but it's in the middle of the ephemeral + # ports range on many operating systems. Therefore, there is a + # chance this port is already in use. Let's use a port close to the + # AMQP default port. + port=int(os.environ["STOMP_PORT"]) + + + def match(self, pattern, data): + ''' helper: try to match a regexp with a string. + Fail test if they do not match. + ''' + matched = re.match(pattern, data) + if matched: + return matched.groups() + self.assertTrue(False, 'No match:\n{}\n\n{}'.format(pattern, data)) + + def recv_atleast(self, bufsize): + recvhead = [] + rl = bufsize + while rl > 0: + buf = self.cd.recv(rl).decode('utf-8') + bl = len(buf) + if bl==0: break + recvhead.append( buf ) + rl -= bl + return ''.join(recvhead) + + + @connect(['cd']) + def test_newline_after_nul(self): + cmd = ('\n' + 'SUBSCRIBE\n' + 'destination:/exchange/amq.fanout\n' + '\n\x00\n' + 'SEND\n' + 'content-type:text/plain\n' + 'destination:/exchange/amq.fanout\n\n' + 'hello\n\x00\n') + self.cd.sendall(cmd.encode('utf-8')) + resp = ('MESSAGE\n' + 'destination:/exchange/amq.fanout\n' + 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n' + 'redelivered:false\n' + 'content-type:text/plain\n' + 'content-length:6\n' + '\n' + 'hello\n\0') + self.match(resp, self.cd.recv(4096).decode('utf-8')) + + @connect(['cd']) + def test_send_without_content_type(self): + cmd = ('\n' + 'SUBSCRIBE\n' + 'destination:/exchange/amq.fanout\n' + '\n\x00\n' + 'SEND\n' + 'destination:/exchange/amq.fanout\n\n' + 'hello\n\x00') + self.cd.sendall(cmd.encode('utf-8')) + resp = ('MESSAGE\n' + 'destination:/exchange/amq.fanout\n' + 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n' + 'redelivered:false\n' + 'content-length:6\n' + '\n' + 'hello\n\0') + self.match(resp, self.cd.recv(4096).decode('utf-8')) + + @connect(['cd']) + def test_send_without_content_type_binary(self): + msg = 'hello' + cmd = ('\n' + 'SUBSCRIBE\n' + 'destination:/exchange/amq.fanout\n' + '\n\x00\n' + 'SEND\n' + 'destination:/exchange/amq.fanout\n' + + 'content-length:{}\n\n'.format(len(msg)) + + '{}\x00'.format(msg)) + self.cd.sendall(cmd.encode('utf-8')) + resp = ('MESSAGE\n' + 'destination:/exchange/amq.fanout\n' + 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n' + 'redelivered:false\n' + + 'content-length:{}\n'.format(len(msg)) + + '\n{}\0'.format(msg)) + self.match(resp, self.cd.recv(4096).decode('utf-8')) + + @connect(['cd']) + def test_newline_after_nul_and_leading_nul(self): + cmd = ('\n' + '\x00SUBSCRIBE\n' + 'destination:/exchange/amq.fanout\n' + '\n\x00\n' + '\x00SEND\n' + 'destination:/exchange/amq.fanout\n' + 'content-type:text/plain\n' + '\nhello\n\x00\n') + self.cd.sendall(cmd.encode('utf-8')) + resp = ('MESSAGE\n' + 'destination:/exchange/amq.fanout\n' + 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n' + 'redelivered:false\n' + 'content-type:text/plain\n' + 'content-length:6\n' + '\n' + 'hello\n\0') + self.match(resp, self.cd.recv(4096).decode('utf-8')) + + @connect(['cd']) + def test_bad_command(self): + ''' Trigger an error message. ''' + cmd = ('WRONGCOMMAND\n' + 'destination:a\n' + 'exchange:amq.fanout\n' + '\n\0') + self.cd.sendall(cmd.encode('utf-8')) + resp = ('ERROR\n' + 'message:Bad command\n' + 'content-type:text/plain\n' + 'version:1.0,1.1,1.2\n' + 'content-length:43\n' + '\n' + 'Could not interpret command "WRONGCOMMAND"\n' + '\0') + self.match(resp, self.cd.recv(4096).decode('utf-8')) + + @connect(['sd', 'cd1', 'cd2']) + def test_broadcast(self): + ''' Single message should be delivered to two consumers: + amq.topic --routing_key--> first_queue --> first_connection + \--routing_key--> second_queue--> second_connection + ''' + subscribe=( 'SUBSCRIBE\n' + 'id: XsKNhAf\n' + 'destination:/exchange/amq.topic/da9d4779\n' + '\n\0') + for cd in [self.cd1, self.cd2]: + cd.sendall(subscribe.encode('utf-8')) + + time.sleep(0.1) + + cmd = ('SEND\n' + 'content-type:text/plain\n' + 'destination:/exchange/amq.topic/da9d4779\n' + '\n' + 'message' + '\n\0') + self.sd.sendall(cmd.encode('utf-8')) + + resp=('MESSAGE\n' + 'subscription:(.*)\n' + 'destination:/topic/da9d4779\n' + 'message-id:(.*)\n' + 'redelivered:false\n' + 'content-type:text/plain\n' + 'content-length:8\n' + '\n' + 'message' + '\n\x00') + for cd in [self.cd1, self.cd2]: + self.match(resp, cd.recv(4096).decode('utf-8')) + + @connect(['cd']) + def test_message_with_embedded_nulls(self): + ''' Test sending/receiving message with embedded nulls. ''' + dest='destination:/exchange/amq.topic/test_embed_nulls_message\n' + resp_dest='destination:/topic/test_embed_nulls_message\n' + subscribe=( 'SUBSCRIBE\n' + 'id:xxx\n' + +dest+ + '\n\0') + self.cd.sendall(subscribe.encode('utf-8')) + + boilerplate = '0123456789'*1024 # large enough boilerplate + message = '01' + oldi = 2 + for i in [5, 90, 256-1, 384-1, 512, 1024, 1024+256+64+32]: + message = message + '\0' + boilerplate[oldi+1:i] + oldi = i + msg_len = len(message) + + cmd = ('SEND\n' + +dest+ + 'content-type:text/plain\n' + 'content-length:%i\n' + '\n' + '%s' + '\0' % (len(message), message)) + self.cd.sendall(cmd.encode('utf-8')) + + headresp=('MESSAGE\n' # 8 + 'subscription:(.*)\n' # 14 + subscription + +resp_dest+ # 44 + 'message-id:(.*)\n' # 12 + message-id + 'redelivered:false\n' # 18 + 'content-type:text/plain\n' # 24 + 'content-length:%i\n' # 16 + 4==len('1024') + '\n' # 1 + '(.*)$' # prefix of body+null (potentially) + % len(message) ) + headlen = 8 + 24 + 14 + (3) + 44 + 12 + 18 + (48) + 16 + (4) + 1 + (1) + + headbuf = self.recv_atleast(headlen) + self.assertFalse(len(headbuf) == 0) + + (sub, msg_id, bodyprefix) = self.match(headresp, headbuf) + bodyresp=( '%s\0' % message ) + bodylen = len(bodyresp); + + bodybuf = ''.join([bodyprefix, + self.recv_atleast(bodylen - len(bodyprefix))]) + + self.assertEqual(len(bodybuf), msg_len+1, + "body received not the same length as message sent") + self.assertEqual(bodybuf, bodyresp, + " body (...'%s')\nincorrectly returned as (...'%s')" + % (bodyresp[-10:], bodybuf[-10:])) + + @connect(['cd']) + def test_message_in_packets(self): + ''' Test sending/receiving message in packets. ''' + base_dest='topic/test_embed_nulls_message\n' + dest='destination:/exchange/amq.' + base_dest + resp_dest='destination:/'+ base_dest + subscribe=( 'SUBSCRIBE\n' + 'id:xxx\n' + +dest+ + '\n\0') + self.cd.sendall(subscribe.encode('utf-8')) + + boilerplate = '0123456789'*1024 # large enough boilerplate + + message = boilerplate[:1024 + 512 + 256 + 32] + msg_len = len(message) + + msg_to_send = ('SEND\n' + +dest+ + 'content-type:text/plain\n' + '\n' + '%s' + '\0' % (message) ) + packet_size = 191 + part_index = 0 + msg_to_send_len = len(msg_to_send) + while part_index < msg_to_send_len: + part = msg_to_send[part_index:part_index+packet_size] + time.sleep(0.1) + self.cd.sendall(part.encode('utf-8')) + part_index += packet_size + + headresp=('MESSAGE\n' # 8 + 'subscription:(.*)\n' # 14 + subscription + +resp_dest+ # 44 + 'message-id:(.*)\n' # 12 + message-id + 'redelivered:false\n' # 18 + 'content-type:text/plain\n' # 24 + 'content-length:%i\n' # 16 + 4==len('1024') + '\n' # 1 + '(.*)$' # prefix of body+null (potentially) + % len(message) ) + headlen = 8 + 24 + 14 + (3) + 44 + 12 + 18 + (48) + 16 + (4) + 1 + (1) + + headbuf = self.recv_atleast(headlen) + self.assertFalse(len(headbuf) == 0) + + (sub, msg_id, bodyprefix) = self.match(headresp, headbuf) + bodyresp=( '%s\0' % message ) + bodylen = len(bodyresp); + + bodybuf = ''.join([bodyprefix, + self.recv_atleast(bodylen - len(bodyprefix))]) + + self.assertEqual(len(bodybuf), msg_len+1, + "body received not the same length as message sent") + self.assertEqual(bodybuf, bodyresp, + " body ('%s')\nincorrectly returned as ('%s')" + % (bodyresp, bodybuf)) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py new file mode 100644 index 0000000000..3761c92360 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py @@ -0,0 +1,87 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import pika +import base +import time +import os + +class TestQueueProperties(base.BaseTest): + + def test_subscribe(self): + destination = "/queue/queue-properties-subscribe-test" + + # subscribe + self.subscribe_dest(self.conn, destination, None, + headers={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + # now try to declare the queue using pika + # if the properties are the same we should + # not get any error + connection = pika.BlockingConnection(pika.ConnectionParameters( + host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + channel.queue_declare(queue='queue-properties-subscribe-test', + durable=True, + arguments={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + self.conn.disconnect() + connection.close() + + def test_send(self): + destination = "/queue/queue-properties-send-test" + + # send + self.conn.send(destination, "test1", + headers={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + # now try to declare the queue using pika + # if the properties are the same we should + # not get any error + connection = pika.BlockingConnection(pika.ConnectionParameters( + host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + channel.queue_declare(queue='queue-properties-send-test', + durable=True, + arguments={ + 'x-message-ttl': 60000, + 'x-expires': 70000, + 'x-max-length': 10, + 'x-max-length-bytes': 20000, + 'x-dead-letter-exchange': 'dead-letter-exchange', + 'x-dead-letter-routing-key': 'dead-letter-routing-key', + 'x-max-priority': 6, + }) + + self.conn.disconnect() + connection.close() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py new file mode 100644 index 0000000000..3dfdd72cc9 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py @@ -0,0 +1,40 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestRedelivered(base.BaseTest): + + def test_redelivered(self): + destination = "/queue/redelivered-test" + + # subscribe and send message + self.subscribe_dest(self.conn, destination, None, ack='client') + self.conn.send(destination, "test1") + message_receive_timeout = 30 + self.assertTrue(self.listener.wait(message_receive_timeout), "Test message not received within {0} seconds".format(message_receive_timeout)) + self.assertEquals(1, len(self.listener.messages)) + self.assertEquals('false', self.listener.messages[0]['headers']['redelivered']) + + # disconnect with no ack + self.conn.disconnect() + + # now reconnect + conn2 = self.create_connection() + try: + listener2 = base.WaitableListener() + listener2.reset(1) + conn2.set_listener('', listener2) + self.subscribe_dest(conn2, destination, None, ack='client') + self.assertTrue(listener2.wait(), "message not received again") + self.assertEquals(1, len(listener2.messages)) + self.assertEquals('true', listener2.messages[0]['headers']['redelivered']) + finally: + conn2.disconnect() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py new file mode 100644 index 0000000000..6fbcb3d492 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py @@ -0,0 +1,41 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import base +import stomp +import unittest +import time + +class TestReliability(base.BaseTest): + + def test_send_and_disconnect(self): + ''' Test close socket after send does not lose messages ''' + destination = "/queue/reliability" + pub_conn = self.create_connection() + try: + msg = "0" * (128) + + count = 10000 + + listener = base.WaitableListener() + listener.reset(count) + self.conn.set_listener('', listener) + self.subscribe_dest(self.conn, destination, None) + + for x in range(0, count): + pub_conn.send(destination, msg + str(x)) + time.sleep(2.0) + pub_conn.disconnect() + + if listener.wait(30): + self.assertEquals(count, len(listener.messages)) + else: + listener.print_state("Final state of listener:") + self.fail("Did not receive %s messages in time" % count) + finally: + if pub_conn.is_connected(): + pub_conn.disconnect() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py new file mode 100644 index 0000000000..570ad9f5a3 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py @@ -0,0 +1,81 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import os +import os.path +import sys + +import stomp +import base +import ssl + +base_path = os.path.dirname(sys.argv[0]) + +ssl_key_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'key.pem') +ssl_cert_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'cert.pem') +ssl_ca_certs = os.path.join(os.getenv('SSL_CERTS_PATH'), 'testca', 'cacert.pem') + +class TestSslClient(unittest.TestCase): + + def __ssl_connect(self): + conn = stomp.Connection(host_and_ports = [ ('localhost', int(os.environ["STOMP_PORT_TLS"])) ], + use_ssl = True, ssl_key_file = ssl_key_file, + ssl_cert_file = ssl_cert_file, + ssl_ca_certs = ssl_ca_certs) + print("FILE: ".format(ssl_cert_file)) + conn.start() + conn.connect("guest", "guest") + return conn + + def __ssl_auth_connect(self): + conn = stomp.Connection(host_and_ports = [ ('localhost', int(os.environ["STOMP_PORT_TLS"])) ], + use_ssl = True, ssl_key_file = ssl_key_file, + ssl_cert_file = ssl_cert_file, + ssl_ca_certs = ssl_ca_certs) + conn.start() + conn.connect() + return conn + + def test_ssl_connect(self): + conn = self.__ssl_connect() + conn.disconnect() + + def test_ssl_auth_connect(self): + conn = self.__ssl_auth_connect() + conn.disconnect() + + def test_ssl_send_receive(self): + conn = self.__ssl_connect() + self.__test_conn(conn) + + def test_ssl_auth_send_receive(self): + conn = self.__ssl_auth_connect() + self.__test_conn(conn) + + def __test_conn(self, conn): + try: + listener = base.WaitableListener() + + conn.set_listener('', listener) + + d = "/topic/ssl.test" + conn.subscribe(destination=d, ack="auto", id="ctag", receipt="sub") + + self.assertTrue(listener.wait(1)) + + self.assertEquals("sub", + listener.receipts[0]['headers']['receipt-id']) + + listener.reset(1) + conn.send(body="Hello SSL!", destination=d) + + self.assertTrue(listener.wait()) + + self.assertEquals("Hello SSL!", listener.messages[0]['message']) + finally: + conn.disconnect() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py new file mode 100755 index 0000000000..01967465a2 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +import test_runner + +if __name__ == '__main__': + modules = [ + 'parsing', + 'errors', + 'lifecycle', + 'ack', + 'amqp_headers', + 'queue_properties', + 'reliability', + 'transactions', + 'x_queue_name', + 'destinations', + 'redelivered', + 'topic_permissions', + 'x_queue_type_quorum' + ] + test_runner.run_unittests(modules) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py new file mode 100755 index 0000000000..10efa4fbb4 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python + +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import test_runner + +if __name__ == '__main__': + modules = ['connect_options'] + test_runner.run_unittests(modules) + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py new file mode 100644 index 0000000000..9aa5855b02 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import sys +import os + +def run_unittests(modules): + suite = unittest.TestSuite() + for m in modules: + mod = __import__(m) + for name in dir(mod): + obj = getattr(mod, name) + if name.startswith("Test") and issubclass(obj, unittest.TestCase): + suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) + + ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) + if ts.errors or ts.failures: + sys.exit(1) + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py new file mode 100755 index 0000000000..95d2d2baa7 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import test_runner +import test_util + +if __name__ == '__main__': + modules = ['ssl_lifecycle'] + test_util.ensure_ssl_auth_user() + test_runner.run_unittests(modules) + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py new file mode 100644 index 0000000000..911100c54f --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py @@ -0,0 +1,52 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import subprocess +import socket +import sys +import os +import os.path + +def ensure_ssl_auth_user(): + user = 'O=client,CN=%s' % socket.gethostname() + rabbitmqctl(['stop_app']) + rabbitmqctl(['reset']) + rabbitmqctl(['start_app']) + rabbitmqctl(['add_user', user, 'foo']) + rabbitmqctl(['clear_password', user]) + rabbitmqctl(['set_permissions', user, '.*', '.*', '.*']) + +def enable_implicit_connect(): + switch_config(implicit_connect='true', default_user='[{login, "guest"}, {passcode, "guest"}]') + +def disable_implicit_connect(): + switch_config(implicit_connect='false', default_user='[]') + +def enable_default_user(): + switch_config(default_user='[{login, "guest"}, {passcode, "guest"}]') + +def disable_default_user(): + switch_config(default_user='[]') + +def switch_config(implicit_connect='', default_user=''): + cmd = '' + cmd += 'ok = io:format("~n===== Ranch listeners (before stop) =====~n~n~p~n", [ranch:info()]),' + cmd += 'ok = application:stop(rabbitmq_stomp),' + cmd += 'io:format("~n===== Ranch listeners (after stop) =====~n~n~p~n", [ranch:info()]),' + if implicit_connect: + cmd += 'ok = application:set_env(rabbitmq_stomp,implicit_connect,{}),'.format(implicit_connect) + if default_user: + cmd += 'ok = application:set_env(rabbitmq_stomp,default_user,{}),'.format(default_user) + cmd += 'ok = application:start(rabbitmq_stomp),' + cmd += 'io:format("~n===== Ranch listeners (after start) =====~n~n~p~n", [ranch:info()]).' + rabbitmqctl(['eval', cmd]) + +def rabbitmqctl(args): + ctl = os.getenv('RABBITMQCTL') + cmdline = [ctl, '-n', os.getenv('RABBITMQ_NODENAME')] + cmdline.extend(args) + subprocess.check_call(cmdline) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py new file mode 100644 index 0000000000..6272f6d8b5 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py @@ -0,0 +1,52 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import base +import test_util +import sys + + +class TestTopicPermissions(base.BaseTest): + @classmethod + def setUpClass(cls): + test_util.rabbitmqctl(['set_topic_permissions', 'guest', 'amq.topic', '^{username}.Authorised', '^{username}.Authorised']) + cls.authorised_topic = '/topic/guest.AuthorisedTopic' + cls.restricted_topic = '/topic/guest.RestrictedTopic' + + @classmethod + def tearDownClass(cls): + test_util.rabbitmqctl(['clear_topic_permissions', 'guest']) + + def test_publish_authorisation(self): + ''' Test topic permissions via publish ''' + self.listener.reset() + + # send on authorised topic + self.subscribe_dest(self.conn, self.authorised_topic, None) + self.conn.send(self.authorised_topic, "authorised hello") + + self.assertTrue(self.listener.wait(), "Timeout, no message received") + + # assert no errors + if len(self.listener.errors) > 0: + self.fail(self.listener.errors[0]['message']) + + # check msg content + msg = self.listener.messages[0] + self.assertEqual("authorised hello", msg['message']) + self.assertEqual(self.authorised_topic, msg['headers']['destination']) + + self.listener.reset() + + # send on restricted topic + self.conn.send(self.restricted_topic, "hello") + + self.assertTrue(self.listener.wait(), "Timeout, no message received") + + # assert errors + self.assertGreater(len(self.listener.errors), 0) + self.assertIn("ACCESS_REFUSED", self.listener.errors[0]['message']) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py new file mode 100644 index 0000000000..379806bfb8 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py @@ -0,0 +1,61 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import base +import time + +class TestTransactions(base.BaseTest): + + def test_tx_commit(self): + ''' Test TX with a COMMIT and ensure messages are delivered ''' + destination = "/exchange/amq.fanout" + tx = "test.tx" + + self.listener.reset() + self.subscribe_dest(self.conn, destination, None) + self.conn.begin(transaction=tx) + self.conn.send(destination, "hello!", transaction=tx) + self.conn.send(destination, "again!") + + ## should see the second message + self.assertTrue(self.listener.wait(3)) + self.assertEquals(1, len(self.listener.messages)) + self.assertEquals("again!", self.listener.messages[0]['message']) + + ## now look for the first message + self.listener.reset() + self.conn.commit(transaction=tx) + self.assertTrue(self.listener.wait(3)) + self.assertEquals(1, len(self.listener.messages), + "Missing committed message") + self.assertEquals("hello!", self.listener.messages[0]['message']) + + def test_tx_abort(self): + ''' Test TX with an ABORT and ensure messages are discarded ''' + destination = "/exchange/amq.fanout" + tx = "test.tx" + + self.listener.reset() + self.subscribe_dest(self.conn, destination, None) + self.conn.begin(transaction=tx) + self.conn.send(destination, "hello!", transaction=tx) + self.conn.send(destination, "again!") + + ## should see the second message + self.assertTrue(self.listener.wait(3)) + self.assertEquals(1, len(self.listener.messages)) + self.assertEquals("again!", self.listener.messages[0]['message']) + + ## now look for the first message to be discarded + self.listener.reset() + self.conn.abort(transaction=tx) + self.assertFalse(self.listener.wait(3)) + self.assertEquals(0, len(self.listener.messages), + "Unexpected committed message") + diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py new file mode 100644 index 0000000000..f2c90486eb --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py @@ -0,0 +1,71 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import unittest +import stomp +import pika +import base +import time +import os + +class TestUserGeneratedQueueName(base.BaseTest): + + def test_exchange_dest(self): + queueName='my-user-generated-queue-name-exchange' + + # subscribe + self.subscribe_dest( + self.conn, + '/exchange/amq.direct/test', + None, + headers={ 'x-queue-name': queueName } + ) + + connection = pika.BlockingConnection( + pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + + # publish a message to the named queue + channel.basic_publish( + exchange='', + routing_key=queueName, + body='Hello World!') + + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(2), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + + self.conn.disconnect() + connection.close() + + def test_topic_dest(self): + queueName='my-user-generated-queue-name-topic' + + # subscribe + self.subscribe_dest( + self.conn, + '/topic/test', + None, + headers={ 'x-queue-name': queueName } + ) + + connection = pika.BlockingConnection( + pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + + # publish a message to the named queue + channel.basic_publish( + exchange='', + routing_key=queueName, + body='Hello World!') + + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(2), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + + self.conn.disconnect() + connection.close() diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py new file mode 100644 index 0000000000..1018abd0d4 --- /dev/null +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py @@ -0,0 +1,62 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +## + +import pika +import base +import time +import os +import re + + +class TestUserGeneratedQueueName(base.BaseTest): + + def test_quorum_queue(self): + queueName = 'my-quorum-queue' + + # subscribe + self.subscribe_dest( + self.conn, + '/topic/quorum-queue-test', + None, + headers={ + 'x-queue-name': queueName, + 'x-queue-type': 'quorum', + 'durable': True, + 'auto-delete': False, + 'id': 1234 + } + ) + + # let the quorum queue some time to start + time.sleep(5) + + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) + channel = connection.channel() + + # publish a message to the named queue + channel.basic_publish( + exchange='', + routing_key=queueName, + body='Hello World!') + + # could we declare a quorum queue? + quorum_queue_supported = True + if len(self.listener.errors) > 0: + pattern = re.compile(r"feature flag is disabled", re.MULTILINE) + for error in self.listener.errors: + if pattern.search(error['message']) != None: + quorum_queue_supported = False + break + + if quorum_queue_supported: + # check if we receive the message from the STOMP subscription + self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertEquals(1, len(self.listener.messages)) + self.conn.disconnect() + + connection.close() diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl new file mode 100644 index 0000000000..739512e3b3 --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl @@ -0,0 +1,75 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +%% The stupidest client imaginable, just for testing. + +-module(rabbit_stomp_client). + +-export([connect/1, connect/2, connect/4, connect/5, disconnect/1, send/2, send/3, send/4, recv/1]). + +-include("rabbit_stomp_frame.hrl"). + +-define(TIMEOUT, 1000). % milliseconds + +connect(Port) -> connect0([], "guest", "guest", Port, []). +connect(V, Port) -> connect0([{"accept-version", V}], "guest", "guest", Port, []). +connect(V, Login, Pass, Port) -> connect0([{"accept-version", V}], Login, Pass, Port, []). +connect(V, Login, Pass, Port, Headers) -> connect0([{"accept-version", V}], Login, Pass, Port, Headers). + +connect0(Version, Login, Pass, Port, Headers) -> + %% The default port is 61613 but it's in the middle of the ephemeral + %% ports range on many operating systems. Therefore, there is a + %% chance this port is already in use. Let's use a port close to the + %% AMQP default port. + {ok, Sock} = gen_tcp:connect(localhost, Port, [{active, false}, binary]), + Client0 = recv_state(Sock), + send(Client0, "CONNECT", [{"login", Login}, + {"passcode", Pass} | Version] ++ Headers), + {#stomp_frame{command = "CONNECTED"}, Client1} = recv(Client0), + {ok, Client1}. + +disconnect(Client = {Sock, _}) -> + send(Client, "DISCONNECT"), + gen_tcp:close(Sock). + +send(Client, Command) -> + send(Client, Command, []). + +send(Client, Command, Headers) -> + send(Client, Command, Headers, []). + +send({Sock, _}, Command, Headers, Body) -> + Frame = rabbit_stomp_frame:serialize( + #stomp_frame{command = list_to_binary(Command), + headers = Headers, + body_iolist = Body}), + gen_tcp:send(Sock, Frame). + +recv_state(Sock) -> + {Sock, []}. + +recv({_Sock, []} = Client) -> + recv(Client, rabbit_stomp_frame:initial_state(), 0); +recv({Sock, [Frame | Frames]}) -> + {Frame, {Sock, Frames}}. + +recv(Client = {Sock, _}, FrameState, Length) -> + {ok, Payload} = gen_tcp:recv(Sock, Length, ?TIMEOUT), + parse(Payload, Client, FrameState, Length). + +parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) -> + case rabbit_stomp_frame:parse(Payload, FrameState) of + {ok, Frame, <<>>} -> + recv({Sock, lists:reverse([Frame | FramesRev])}); + {ok, Frame, <<"\n">>} -> + recv({Sock, lists:reverse([Frame | FramesRev])}); + {ok, Frame, Rest} -> + parse(Rest, {Sock, [Frame | FramesRev]}, + rabbit_stomp_frame:initial_state(), Length); + {more, NewState} -> + recv(Client, NewState, 0) + end. diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl new file mode 100644 index 0000000000..6b5b9298fa --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl @@ -0,0 +1,80 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stomp_publish_test). + +-export([run/0]). + +-include("rabbit_stomp_frame.hrl"). + +-define(DESTINATION, "/queue/test"). + +-define(MICROS_PER_UPDATE, 5000000). +-define(MICROS_PER_UPDATE_MSG, 100000). +-define(MICROS_PER_SECOND, 1000000). + +%% A very simple publish-and-consume-as-fast-as-you-can test. + +run() -> + [put(K, 0) || K <- [sent, recd, last_sent, last_recd]], + put(last_ts, erlang:monotonic_time()), + {ok, Pub} = rabbit_stomp_client:connect(), + {ok, Recv} = rabbit_stomp_client:connect(), + Self = self(), + spawn(fun() -> publish(Self, Pub, 0, erlang:monotonic_time()) end), + rabbit_stomp_client:send( + Recv, "SUBSCRIBE", [{"destination", ?DESTINATION}]), + spawn(fun() -> recv(Self, Recv, 0, erlang:monotonic_time()) end), + report(). + +report() -> + receive + {sent, C} -> put(sent, C); + {recd, C} -> put(recd, C) + end, + Diff = erlang:convert_time_unit( + erlang:monotonic_time() - get(last_ts), native, microseconds), + case Diff > ?MICROS_PER_UPDATE of + true -> S = get(sent) - get(last_sent), + R = get(recd) - get(last_recd), + put(last_sent, get(sent)), + put(last_recd, get(recd)), + put(last_ts, erlang:monotonic_time()), + io:format("Send ~p msg/s | Recv ~p msg/s~n", + [trunc(S * ?MICROS_PER_SECOND / Diff), + trunc(R * ?MICROS_PER_SECOND / Diff)]); + false -> ok + end, + report(). + +publish(Owner, Client, Count, TS) -> + rabbit_stomp_client:send( + Client, "SEND", [{"destination", ?DESTINATION}], + [integer_to_list(Count)]), + Diff = erlang:convert_time_unit( + erlang:monotonic_time() - TS, native, microseconds), + case Diff > ?MICROS_PER_UPDATE_MSG of + true -> Owner ! {sent, Count + 1}, + publish(Owner, Client, Count + 1, + erlang:monotonic_time()); + false -> publish(Owner, Client, Count + 1, TS) + end. + +recv(Owner, Client0, Count, TS) -> + {#stomp_frame{body_iolist = Body}, Client1} = + rabbit_stomp_client:recv(Client0), + BodyInt = list_to_integer(binary_to_list(iolist_to_binary(Body))), + Count = BodyInt, + Diff = erlang:convert_time_unit( + erlang:monotonic_time() - TS, native, microseconds), + case Diff > ?MICROS_PER_UPDATE_MSG of + true -> Owner ! {recd, Count + 1}, + recv(Owner, Client1, Count + 1, + erlang:monotonic_time()); + false -> recv(Owner, Client1, Count + 1, TS) + end. + diff --git a/deps/rabbitmq_stomp/test/src/test.config b/deps/rabbitmq_stomp/test/src/test.config new file mode 100644 index 0000000000..5968824996 --- /dev/null +++ b/deps/rabbitmq_stomp/test/src/test.config @@ -0,0 +1,13 @@ +[{rabbitmq_stomp, [{default_user, []}, + {ssl_cert_login, true}, + {tcp_listeners, [5673]}, + {ssl_listeners, [5674]} + ]}, + {rabbit, [{ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"}, + {certfile,"%%CERTS_DIR%%/server/cert.pem"}, + {keyfile,"%%CERTS_DIR%%/server/key.pem"}, + {verify,verify_peer}, + {fail_if_no_peer_cert,true} + ]} + ]} +]. diff --git a/deps/rabbitmq_stomp/test/topic_SUITE.erl b/deps/rabbitmq_stomp/test/topic_SUITE.erl new file mode 100644 index 0000000000..4a6421a326 --- /dev/null +++ b/deps/rabbitmq_stomp/test/topic_SUITE.erl @@ -0,0 +1,170 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(topic_SUITE). + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp.hrl"). +-include("rabbit_stomp_frame.hrl"). +-include("rabbit_stomp_headers.hrl"). + +all() -> + [{group, list_to_atom("version_" ++ V)} || V <- ?SUPPORTED_VERSIONS]. + +groups() -> + Tests = [ + publish_topic_authorisation, + subscribe_topic_authorisation, + change_default_topic_exchange + ], + + [{list_to_atom("version_" ++ V), [sequence], Tests} + || V <- ?SUPPORTED_VERSIONS]. + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(Group, Config) -> + Version = string:sub_string(atom_to_list(Group), 9), + rabbit_ct_helpers:set_config(Config, [{version, Version}]). + +end_per_group(_Group, Config) -> Config. + +init_per_testcase(_TestCase, Config) -> + Version = ?config(version, Config), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + {ok, Connection} = amqp_connection:start(#amqp_params_direct{ + node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename) + }), + {ok, Channel} = amqp_connection:open_channel(Connection), + {ok, Client} = rabbit_stomp_client:connect(Version, StompPort), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {amqp_connection, Connection}, + {amqp_channel, Channel}, + {stomp_client, Client} + ]), + init_per_testcase0(Config1). + +end_per_testcase(_TestCase, Config) -> + Connection = ?config(amqp_connection, Config), + Channel = ?config(amqp_channel, Config), + Client = ?config(stomp_client, Config), + rabbit_stomp_client:disconnect(Client), + amqp_channel:close(Channel), + amqp_connection:close(Connection), + end_per_testcase0(Config). + +init_per_testcase0(Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, add_user, + [<<"user">>, <<"pass">>, <<"acting-user">>]), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_permissions, [ + <<"user">>, <<"/">>, <<".*">>, <<".*">>, <<".*">>, <<"acting-user">>]), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [ + <<"user">>, <<"/">>, <<"amq.topic">>, <<"^{username}.Authorised">>, <<"^{username}.Authorised">>, <<"acting-user">>]), + Version = ?config(version, Config), + StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), + {ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort), + rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]). + +end_per_testcase0(Config) -> + ClientFoo = ?config(client_foo, Config), + rabbit_stomp_client:disconnect(ClientFoo), + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, delete_user, + [<<"user">>, <<"acting-user">>]), + Config. + +publish_topic_authorisation(Config) -> + ClientFoo = ?config(client_foo, Config), + + AuthorisedTopic = "/topic/user.AuthorisedTopic", + RestrictedTopic = "/topic/user.RestrictedTopic", + + %% send on authorised topic + rabbit_stomp_client:send( + ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]), + + rabbit_stomp_client:send( + ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]), + + {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"), + [<<"authorised hello">>] = Body, + + %% send on restricted topic + rabbit_stomp_client:send( + ClientFoo, "SEND", [{"destination", RestrictedTopic}], ["hello"]), + {ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"), + "access_refused" = proplists:get_value("message", Hdrs2), + ok. + +subscribe_topic_authorisation(Config) -> + ClientFoo = ?config(client_foo, Config), + + AuthorisedTopic = "/topic/user.AuthorisedTopic", + RestrictedTopic = "/topic/user.RestrictedTopic", + + %% subscribe to authorised topic + rabbit_stomp_client:send( + ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]), + + rabbit_stomp_client:send( + ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]), + + {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"), + [<<"authorised hello">>] = Body, + + %% subscribe to restricted topic + rabbit_stomp_client:send( + ClientFoo, "SUBSCRIBE", [{"destination", RestrictedTopic}]), + {ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"), + "access_refused" = proplists:get_value("message", Hdrs2), + ok. + +change_default_topic_exchange(Config) -> + Channel = ?config(amqp_channel, Config), + ClientFoo = ?config(client_foo, Config), + Ex = <<"my-topic-exchange">>, + AuthorisedTopic = "/topic/user.AuthorisedTopic", + + Declare = #'exchange.declare'{exchange = Ex, type = <<"topic">>}, + #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare), + + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_stomp, default_topic_exchange, Ex]), + + rabbit_stomp_client:send( + ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]), + + rabbit_stomp_client:send( + ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["ohai there"]), + + {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"), + [<<"ohai there">>] = Body, + + Delete = #'exchange.delete'{exchange = Ex}, + #'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, [rabbitmq_stomp, default_topic_exchange]), + ok. + + +stomp_receive(Client, Command) -> + {#stomp_frame{command = Command, + headers = Hdrs, + body_iolist = Body}, Client1} = + rabbit_stomp_client:recv(Client), + {ok, Client1, Hdrs, Body}. + diff --git a/deps/rabbitmq_stomp/test/util_SUITE.erl b/deps/rabbitmq_stomp/test/util_SUITE.erl new file mode 100644 index 0000000000..89d9d9e37e --- /dev/null +++ b/deps/rabbitmq_stomp/test/util_SUITE.erl @@ -0,0 +1,242 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(util_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl"). +-include("rabbit_stomp_frame.hrl"). +-compile(export_all). + +all() -> [ + longstr_field, + message_properties, + message_headers, + minimal_message_headers_with_no_custom, + headers_post_process, + headers_post_process_noop_replyto, + headers_post_process_noop2, + negotiate_version_both_empty, + negotiate_version_no_common, + negotiate_version_simple_common, + negotiate_version_two_choice_common, + negotiate_version_two_choice_common_out_of_order, + negotiate_version_two_choice_big_common, + negotiate_version_choice_mismatched_length, + negotiate_version_choice_duplicates, + trim_headers, + ack_mode_auto, + ack_mode_auto_default, + ack_mode_client, + ack_mode_client_individual, + consumer_tag_id, + consumer_tag_destination, + consumer_tag_invalid, + parse_valid_message_id, + parse_invalid_message_id + ]. + + +%%-------------------------------------------------------------------- +%% Header Processing Tests +%%-------------------------------------------------------------------- + +longstr_field(_) -> + {<<"ABC">>, longstr, <<"DEF">>} = + rabbit_stomp_util:longstr_field("ABC", "DEF"). + +message_properties(_) -> + Headers = [ + {"content-type", "text/plain"}, + {"content-encoding", "UTF-8"}, + {"persistent", "true"}, + {"priority", "1"}, + {"correlation-id", "123"}, + {"reply-to", "something"}, + {"expiration", "my-expiration"}, + {"amqp-message-id", "M123"}, + {"timestamp", "123456"}, + {"type", "freshly-squeezed"}, + {"user-id", "joe"}, + {"app-id", "joe's app"}, + {"str", "foo"}, + {"int", "123"} + ], + + #'P_basic'{ + content_type = <<"text/plain">>, + content_encoding = <<"UTF-8">>, + delivery_mode = 2, + priority = 1, + correlation_id = <<"123">>, + reply_to = <<"something">>, + expiration = <<"my-expiration">>, + message_id = <<"M123">>, + timestamp = 123456, + type = <<"freshly-squeezed">>, + user_id = <<"joe">>, + app_id = <<"joe's app">>, + headers = [{<<"str">>, longstr, <<"foo">>}, + {<<"int">>, longstr, <<"123">>}] + } = + rabbit_stomp_util:message_properties(#stomp_frame{headers = Headers}). + +message_headers(_) -> + Properties = #'P_basic'{ + headers = [{<<"str">>, longstr, <<"foo">>}, + {<<"int">>, signedint, 123}], + content_type = <<"text/plain">>, + content_encoding = <<"UTF-8">>, + delivery_mode = 2, + priority = 1, + correlation_id = 123, + reply_to = <<"something">>, + message_id = <<"M123">>, + timestamp = 123456, + type = <<"freshly-squeezed">>, + user_id = <<"joe">>, + app_id = <<"joe's app">>}, + + Headers = rabbit_stomp_util:message_headers(Properties), + + Expected = [ + {"content-type", "text/plain"}, + {"content-encoding", "UTF-8"}, + {"persistent", "true"}, + {"priority", "1"}, + {"correlation-id", "123"}, + {"reply-to", "something"}, + {"expiration", "my-expiration"}, + {"amqp-message-id", "M123"}, + {"timestamp", "123456"}, + {"type", "freshly-squeezed"}, + {"user-id", "joe"}, + {"app-id", "joe's app"}, + {"str", "foo"}, + {"int", "123"} + ], + + [] = lists:subtract(Headers, Expected). + +minimal_message_headers_with_no_custom(_) -> + Properties = #'P_basic'{}, + + Headers = rabbit_stomp_util:message_headers(Properties), + Expected = [ + {"content-type", "text/plain"}, + {"content-encoding", "UTF-8"}, + {"amqp-message-id", "M123"} + ], + + [] = lists:subtract(Headers, Expected). + +headers_post_process(_) -> + Headers = [{"header1", "1"}, + {"header2", "12"}, + {"reply-to", "something"}], + Expected = [{"header1", "1"}, + {"header2", "12"}, + {"reply-to", "/reply-queue/something"}], + [] = lists:subtract( + rabbit_stomp_util:headers_post_process(Headers), Expected). + +headers_post_process_noop_replyto(_) -> + [begin + Headers = [{"reply-to", Prefix ++ "/something"}], + Headers = rabbit_stomp_util:headers_post_process(Headers) + end || Prefix <- rabbit_routing_util:dest_prefixes()]. + +headers_post_process_noop2(_) -> + Headers = [{"header1", "1"}, + {"header2", "12"}], + Expected = [{"header1", "1"}, + {"header2", "12"}], + [] = lists:subtract( + rabbit_stomp_util:headers_post_process(Headers), Expected). + +negotiate_version_both_empty(_) -> + {error, no_common_version} = rabbit_stomp_util:negotiate_version([],[]). + +negotiate_version_no_common(_) -> + {error, no_common_version} = + rabbit_stomp_util:negotiate_version(["1.2"],["1.3"]). + +negotiate_version_simple_common(_) -> + {ok, "1.2"} = + rabbit_stomp_util:negotiate_version(["1.2"],["1.2"]). + +negotiate_version_two_choice_common(_) -> + {ok, "1.3"} = + rabbit_stomp_util:negotiate_version(["1.2", "1.3"],["1.2", "1.3"]). + +negotiate_version_two_choice_common_out_of_order(_) -> + {ok, "1.3"} = + rabbit_stomp_util:negotiate_version(["1.3", "1.2"],["1.2", "1.3"]). + +negotiate_version_two_choice_big_common(_) -> + {ok, "1.20.23"} = + rabbit_stomp_util:negotiate_version(["1.20.23", "1.30.456"], + ["1.20.23", "1.30.457"]). +negotiate_version_choice_mismatched_length(_) -> + {ok, "1.2.3"} = + rabbit_stomp_util:negotiate_version(["1.2", "1.2.3"], + ["1.2.3", "1.2"]). +negotiate_version_choice_duplicates(_) -> + {ok, "1.2"} = + rabbit_stomp_util:negotiate_version(["1.2", "1.2"], + ["1.2", "1.2"]). +trim_headers(_) -> + #stomp_frame{headers = [{"one", "foo"}, {"two", "baz "}]} = + rabbit_stomp_util:trim_headers( + #stomp_frame{headers = [{"one", " foo"}, {"two", " baz "}]}). + +%%-------------------------------------------------------------------- +%% Frame Parsing Tests +%%-------------------------------------------------------------------- + +ack_mode_auto(_) -> + Frame = #stomp_frame{headers = [{"ack", "auto"}]}, + {auto, _} = rabbit_stomp_util:ack_mode(Frame). + +ack_mode_auto_default(_) -> + Frame = #stomp_frame{headers = []}, + {auto, _} = rabbit_stomp_util:ack_mode(Frame). + +ack_mode_client(_) -> + Frame = #stomp_frame{headers = [{"ack", "client"}]}, + {client, true} = rabbit_stomp_util:ack_mode(Frame). + +ack_mode_client_individual(_) -> + Frame = #stomp_frame{headers = [{"ack", "client-individual"}]}, + {client, false} = rabbit_stomp_util:ack_mode(Frame). + +consumer_tag_id(_) -> + Frame = #stomp_frame{headers = [{"id", "foo"}]}, + {ok, <<"T_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame). + +consumer_tag_destination(_) -> + Frame = #stomp_frame{headers = [{"destination", "foo"}]}, + {ok, <<"Q_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame). + +consumer_tag_invalid(_) -> + Frame = #stomp_frame{headers = []}, + {error, missing_destination_header} = rabbit_stomp_util:consumer_tag(Frame). + +%%-------------------------------------------------------------------- +%% Message ID Parsing Tests +%%-------------------------------------------------------------------- + +parse_valid_message_id(_) -> + {ok, {<<"bar">>, "abc", 123}} = + rabbit_stomp_util:parse_message_id("bar@@abc@@123"). + +parse_invalid_message_id(_) -> + {error, invalid_message_id} = + rabbit_stomp_util:parse_message_id("blah"). + |