summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stomp/test
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stomp/test')
-rw-r--r--deps/rabbitmq_stomp/test/amqqueue_SUITE.erl319
-rw-r--r--deps/rabbitmq_stomp/test/command_SUITE.erl127
-rw-r--r--deps/rabbitmq_stomp/test/config_schema_SUITE.erl55
-rw-r--r--deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem1
-rw-r--r--deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem1
-rw-r--r--deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem1
-rw-r--r--deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets97
-rw-r--r--deps/rabbitmq_stomp/test/connections_SUITE.erl160
-rw-r--r--deps/rabbitmq_stomp/test/frame_SUITE.erl191
-rw-r--r--deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl104
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE.erl72
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile27
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile27
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py252
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py42
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py259
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py51
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py536
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py101
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py187
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py331
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py87
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py40
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py41
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py81
-rwxr-xr-xdeps/rabbitmq_stomp/test/python_SUITE_data/src/test.py21
-rwxr-xr-xdeps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py15
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py26
-rwxr-xr-xdeps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py17
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py52
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py52
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py61
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py71
-rw-r--r--deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py62
-rw-r--r--deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl75
-rw-r--r--deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl80
-rw-r--r--deps/rabbitmq_stomp/test/src/test.config13
-rw-r--r--deps/rabbitmq_stomp/test/topic_SUITE.erl170
-rw-r--r--deps/rabbitmq_stomp/test/util_SUITE.erl242
39 files changed, 4147 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl b/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl
new file mode 100644
index 0000000000..0474fd67d6
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/amqqueue_SUITE.erl
@@ -0,0 +1,319 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(amqqueue_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stomp.hrl").
+-include("rabbit_stomp_frame.hrl").
+-include("rabbit_stomp_headers.hrl").
+
+-define(QUEUE, <<"TestQueue">>).
+-define(DESTINATION, "/amq/queue/TestQueue").
+
+all() ->
+ [{group, version_to_group_name(V)} || V <- ?SUPPORTED_VERSIONS].
+
+groups() ->
+ Tests = [
+ publish_no_dest_error,
+ publish_unauthorized_error,
+ subscribe_error,
+ subscribe,
+ unsubscribe_ack,
+ subscribe_ack,
+ send,
+ delete_queue_subscribe,
+ temp_destination_queue,
+ temp_destination_in_send,
+ blank_destination_in_send
+ ],
+
+ [{version_to_group_name(V), [sequence], Tests}
+ || V <- ?SUPPORTED_VERSIONS].
+
+version_to_group_name(V) ->
+ list_to_atom(re:replace("version_" ++ V,
+ "\\.",
+ "_",
+ [global, {return, list}])).
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(Group, Config) ->
+ Suffix = string:sub_string(atom_to_list(Group), 9),
+ Version = re:replace(Suffix, "_", ".", [global, {return, list}]),
+ rabbit_ct_helpers:set_config(Config, [{version, Version}]).
+
+end_per_group(_Group, Config) -> Config.
+
+init_per_testcase(TestCase, Config) ->
+ Version = ?config(version, Config),
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ {ok, Connection} = amqp_connection:start(#amqp_params_direct{
+ node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)
+ }),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+ {ok, Client} = rabbit_stomp_client:connect(Version, StompPort),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {amqp_connection, Connection},
+ {amqp_channel, Channel},
+ {stomp_client, Client}
+ ]),
+ init_per_testcase0(TestCase, Config1).
+
+end_per_testcase(TestCase, Config) ->
+ Connection = ?config(amqp_connection, Config),
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ rabbit_stomp_client:disconnect(Client),
+ amqp_channel:close(Channel),
+ amqp_connection:close(Connection),
+ end_per_testcase0(TestCase, Config).
+
+init_per_testcase0(publish_unauthorized_error, Config) ->
+ Channel = ?config(amqp_channel, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = <<"RestrictedQueue">>,
+ auto_delete = true}),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, add_user,
+ [<<"user">>, <<"pass">>, <<"acting-user">>]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_permissions, [
+ <<"user">>, <<"/">>, <<"nothing">>, <<"nothing">>, <<"nothing">>, <<"acting-user">>]),
+ Version = ?config(version, Config),
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ {ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort),
+ rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]);
+init_per_testcase0(_, Config) ->
+ Config.
+
+end_per_testcase0(publish_unauthorized_error, Config) ->
+ ClientFoo = ?config(client_foo, Config),
+ rabbit_stomp_client:disconnect(ClientFoo),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, delete_user,
+ [<<"user">>, <<"acting-user">>]),
+ Config;
+end_per_testcase0(_, Config) ->
+ Config.
+
+publish_no_dest_error(Config) ->
+ Client = ?config(stomp_client, Config),
+ rabbit_stomp_client:send(
+ Client, "SEND", [{"destination", "/exchange/non-existent"}], ["hello"]),
+ {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"),
+ "not_found" = proplists:get_value("message", Hdrs),
+ ok.
+
+publish_unauthorized_error(Config) ->
+ ClientFoo = ?config(client_foo, Config),
+ rabbit_stomp_client:send(
+ ClientFoo, "SEND", [{"destination", "/amq/queue/RestrictedQueue"}], ["hello"]),
+ {ok, _Client1, Hdrs, _} = stomp_receive(ClientFoo, "ERROR"),
+ "access_refused" = proplists:get_value("message", Hdrs),
+ ok.
+
+subscribe_error(Config) ->
+ Client = ?config(stomp_client, Config),
+ %% SUBSCRIBE to missing queue
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION}]),
+ {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"),
+ "not_found" = proplists:get_value("message", Hdrs),
+ ok.
+
+subscribe(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+
+ %% subscribe and wait for receipt
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "foo"}]),
+ {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
+
+ %% send from amqp
+ Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE},
+
+ amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
+ payload = <<"hello">>}),
+
+ {ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
+ ok.
+
+unsubscribe_ack(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ Version = ?config(version, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+ %% subscribe and wait for receipt
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION},
+ {"receipt", "rcpt1"},
+ {"ack", "client"},
+ {"id", "subscription-id"}]),
+ {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
+
+ %% send from amqp
+ Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE},
+
+ amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
+ payload = <<"hello">>}),
+
+ {ok, Client2, Hdrs1, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
+
+ rabbit_stomp_client:send(
+ Client2, "UNSUBSCRIBE", [{"destination", ?DESTINATION},
+ {"id", "subscription-id"}]),
+
+ rabbit_stomp_client:send(
+ Client2, "ACK", [{rabbit_stomp_util:ack_header_name(Version),
+ proplists:get_value(
+ rabbit_stomp_util:msg_header_name(Version), Hdrs1)},
+ {"receipt", "rcpt2"}]),
+
+ {ok, _Client3, Hdrs2, _Body2} = stomp_receive(Client2, "ERROR"),
+ ?assertEqual("Subscription not found",
+ proplists:get_value("message", Hdrs2)),
+ ok.
+
+subscribe_ack(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ Version = ?config(version, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+
+ %% subscribe and wait for receipt
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION},
+ {"receipt", "foo"},
+ {"ack", "client"}]),
+ {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
+
+ %% send from amqp
+ Method = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE},
+
+ amqp_channel:call(Channel, Method, #amqp_msg{props = #'P_basic'{},
+ payload = <<"hello">>}),
+
+ {ok, _Client2, Headers, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
+ false = (Version == "1.2") xor proplists:is_defined(?HEADER_ACK, Headers),
+
+ MsgHeader = rabbit_stomp_util:msg_header_name(Version),
+ AckValue = proplists:get_value(MsgHeader, Headers),
+ AckHeader = rabbit_stomp_util:ack_header_name(Version),
+
+ rabbit_stomp_client:send(Client, "ACK", [{AckHeader, AckValue}]),
+ #'basic.get_empty'{} =
+ amqp_channel:call(Channel, #'basic.get'{queue = ?QUEUE}),
+ ok.
+
+send(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+
+ %% subscribe and wait for receipt
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "foo"}]),
+ {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
+
+ %% send from stomp
+ rabbit_stomp_client:send(
+ Client1, "SEND", [{"destination", ?DESTINATION}], ["hello"]),
+
+ {ok, _Client2, _, [<<"hello">>]} = stomp_receive(Client1, "MESSAGE"),
+ ok.
+
+delete_queue_subscribe(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+
+ %% subscribe and wait for receipt
+ rabbit_stomp_client:send(
+ Client, "SUBSCRIBE", [{"destination", ?DESTINATION}, {"receipt", "bah"}]),
+ {ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
+
+ %% delete queue while subscribed
+ #'queue.delete_ok'{} =
+ amqp_channel:call(Channel, #'queue.delete'{queue = ?QUEUE}),
+
+ {ok, _Client2, Headers, _} = stomp_receive(Client1, "ERROR"),
+
+ ?DESTINATION = proplists:get_value("subscription", Headers),
+
+ % server closes connection
+ ok.
+
+temp_destination_queue(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = ?QUEUE,
+ auto_delete = true}),
+ rabbit_stomp_client:send( Client, "SEND", [{"destination", ?DESTINATION},
+ {"reply-to", "/temp-queue/foo"}],
+ ["ping"]),
+ amqp_channel:call(Channel,#'basic.consume'{queue = ?QUEUE, no_ack = true}),
+ receive #'basic.consume_ok'{consumer_tag = _Tag} -> ok end,
+ ReplyTo = receive {#'basic.deliver'{delivery_tag = _DTag},
+ #'amqp_msg'{payload = <<"ping">>,
+ props = #'P_basic'{reply_to = RT}}} -> RT
+ end,
+ ok = amqp_channel:call(Channel,
+ #'basic.publish'{routing_key = ReplyTo},
+ #amqp_msg{payload = <<"pong">>}),
+ {ok, _Client1, _, [<<"pong">>]} = stomp_receive(Client, "MESSAGE"),
+ ok.
+
+temp_destination_in_send(Config) ->
+ Client = ?config(stomp_client, Config),
+ rabbit_stomp_client:send( Client, "SEND", [{"destination", "/temp-queue/foo"}],
+ ["poing"]),
+ {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"),
+ "Invalid destination" = proplists:get_value("message", Hdrs),
+ ok.
+
+blank_destination_in_send(Config) ->
+ Client = ?config(stomp_client, Config),
+ rabbit_stomp_client:send( Client, "SEND", [{"destination", ""}],
+ ["poing"]),
+ {ok, _Client1, Hdrs, _} = stomp_receive(Client, "ERROR"),
+ "Invalid destination" = proplists:get_value("message", Hdrs),
+ ok.
+
+stomp_receive(Client, Command) ->
+ {#stomp_frame{command = Command,
+ headers = Hdrs,
+ body_iolist = Body}, Client1} =
+ rabbit_stomp_client:recv(Client),
+ {ok, Client1, Hdrs, Body}.
+
diff --git a/deps/rabbitmq_stomp/test/command_SUITE.erl b/deps/rabbitmq_stomp/test/command_SUITE.erl
new file mode 100644
index 0000000000..8fe9fa0d0f
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/command_SUITE.erl
@@ -0,0 +1,127 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(command_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stomp.hrl").
+
+
+-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStompConnectionsCommand').
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ merge_defaults,
+ run
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+merge_defaults(_Config) ->
+ {[<<"session_id">>, <<"conn_name">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([], #{}),
+
+ {[<<"other_key">>], #{verbose := true}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
+
+ {[<<"other_key">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
+
+
+run(Config) ->
+
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts = #{node => Node, timeout => 10000, verbose => false},
+
+ %% No connections
+ [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
+
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+
+ {ok, _Client} = rabbit_stomp_client:connect(StompPort),
+ ct:sleep(100),
+
+ [[{session_id, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)),
+
+
+ {ok, _Client2} = rabbit_stomp_client:connect(StompPort),
+ ct:sleep(100),
+
+ [[{session_id, _}], [{session_id, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)),
+
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ start_amqp_connection(network, Node, Port),
+
+ %% There are still just two connections
+ [[{session_id, _}], [{session_id, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)),
+
+ start_amqp_connection(direct, Node, Port),
+
+ %% Still two MQTT connections, one direct AMQP 0-9-1 connection
+ [[{session_id, _}], [{session_id, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"session_id">>], Opts)),
+
+ %% Verbose returns all keys
+ Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
+
+ %% There are two connections
+ [First, _Second] = AllKeys,
+
+ %% Keys are INFO_ITEMS
+ KeysCount = length(?INFO_ITEMS),
+ KeysCount = length(First),
+
+ {Keys, _} = lists:unzip(First),
+
+ [] = Keys -- ?INFO_ITEMS,
+ [] = ?INFO_ITEMS -- Keys.
+
+
+start_amqp_connection(Type, Node, Port) ->
+ Params = amqp_params(Type, Node, Port),
+ {ok, _Connection} = amqp_connection:start(Params).
+
+amqp_params(network, _, Port) ->
+ #amqp_params_network{port = Port};
+amqp_params(direct, Node, _) ->
+ #amqp_params_direct{node = Node}.
diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE.erl b/deps/rabbitmq_stomp/test/config_schema_SUITE.erl
new file mode 100644
index 0000000000..8d340810f7
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/config_schema_SUITE.erl
@@ -0,0 +1,55 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(config_schema_SUITE).
+
+-compile(export_all).
+
+all() ->
+ [
+ run_snippets
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:run_setup_steps(Config),
+ rabbit_ct_config_schema:init_schemas(rabbitmq_stomp, Config1).
+
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Testcase}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+run_snippets(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, run_snippets1, [Config]).
+
+run_snippets1(Config) ->
+ rabbit_ct_config_schema:run_snippets(Config).
+
diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cacert.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/cert.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem
new file mode 100644
index 0000000000..eaf6b67806
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/certs/key.pem
@@ -0,0 +1 @@
+I'm not a certificate
diff --git a/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets
new file mode 100644
index 0000000000..6081240c68
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/config_schema_SUITE_data/rabbitmq_stomp.snippets
@@ -0,0 +1,97 @@
+[{listener_port,
+ "stomp.listeners.tcp.1 = 12345",
+ [{rabbitmq_stomp,[{tcp_listeners,[12345]}]}],
+ [rabbitmq_stomp]},
+ {listeners_ip,
+ "stomp.listeners.tcp.1 = 127.0.0.1:61613
+ stomp.listeners.tcp.2 = ::1:61613",
+ [{rabbitmq_stomp,[{tcp_listeners,[{"127.0.0.1",61613},{"::1",61613}]}]}],
+ [rabbitmq_stomp]},
+
+ {listener_tcp_options,
+ "stomp.listeners.tcp.1 = 127.0.0.1:61613
+ stomp.listeners.tcp.2 = ::1:61613
+
+ stomp.tcp_listen_options.backlog = 2048
+ stomp.tcp_listen_options.recbuf = 8192
+ stomp.tcp_listen_options.sndbuf = 8192
+
+ stomp.tcp_listen_options.keepalive = true
+ stomp.tcp_listen_options.nodelay = true
+
+ stomp.tcp_listen_options.exit_on_close = true
+
+ stomp.tcp_listen_options.send_timeout = 120
+",
+ [{rabbitmq_stomp,[
+ {tcp_listeners,[
+ {"127.0.0.1",61613},
+ {"::1",61613}
+ ]}
+ , {tcp_listen_options, [
+ {backlog, 2048},
+ {exit_on_close, true},
+
+ {recbuf, 8192},
+ {sndbuf, 8192},
+
+ {send_timeout, 120},
+
+ {keepalive, true},
+ {nodelay, true}
+ ]}
+ ]}],
+ [rabbitmq_stomp]},
+
+ {ssl,
+ "ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
+ ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem
+ ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem
+ ssl_options.verify = verify_peer
+ ssl_options.fail_if_no_peer_cert = true
+
+ stomp.listeners.tcp.1 = 61613
+ stomp.listeners.ssl.1 = 61614",
+ [{rabbit,
+ [{ssl_options,
+ [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
+ {certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
+ {keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
+ {verify,verify_peer},
+ {fail_if_no_peer_cert,true}]}]},
+ {rabbitmq_stomp,[{tcp_listeners,[61613]},{ssl_listeners,[61614]}]}],
+ [rabbitmq_stomp]},
+ {defaults,
+ "stomp.default_user = guest
+ stomp.default_pass = guest
+ stomp.proxy_protocol = false
+ stomp.hide_server_info = false",
+ [{rabbitmq_stomp,[{default_user,[{login,"guest"},{passcode,"guest"}]},
+ {proxy_protocol,false},{hide_server_info,false}]}],
+ [rabbitmq_stomp]},
+ {ssl_cert_login,
+ "stomp.ssl_cert_login = true",
+ [{rabbitmq_stomp,[{ssl_cert_login,true}]}],
+ [rabbitmq_stomp]},
+ {proxy_protocol,
+ "stomp.default_user = guest
+ stomp.default_pass = guest
+ stomp.implicit_connect = true
+ stomp.proxy_protocol = true",
+ [{rabbitmq_stomp,[{default_user,[{login,"guest"},{passcode,"guest"}]},
+ {implicit_connect,true},
+ {proxy_protocol,true}]}],
+ [rabbitmq_stomp]},
+ {default_vhost,
+ "stomp.default_vhost = /",
+ [{rabbitmq_stomp,[{default_vhost,<<"/">>}]}],
+ [rabbitmq_stomp]},
+ {default_topic_exchange,
+ "stomp.default_topic_exchange = my.fancy.topic",
+ [{rabbitmq_stomp,[{default_topic_exchange,<<"my.fancy.topic">>}]}],
+ [rabbitmq_stomp]},
+ {hide_server_info,
+ "stomp.hide_server_info = true",
+ [{rabbitmq_stomp,[{hide_server_info,true}]}],
+ [rabbitmq_stomp]}
+].
diff --git a/deps/rabbitmq_stomp/test/connections_SUITE.erl b/deps/rabbitmq_stomp/test/connections_SUITE.erl
new file mode 100644
index 0000000000..4f9b027bb9
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/connections_SUITE.erl
@@ -0,0 +1,160 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(connections_SUITE).
+-compile(export_all).
+
+-import(rabbit_misc, [pget/2]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stomp_frame.hrl").
+-define(DESTINATION, "/queue/bulk-test").
+
+all() ->
+ [
+ messages_not_dropped_on_disconnect,
+ direct_client_connections_are_not_leaked,
+ stats_are_not_leaked,
+ stats,
+ heartbeat
+ ].
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config,
+ {rabbit, [
+ {collect_statistics, basic},
+ {collect_statistics_interval, 100}
+ ]}).
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()).
+
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+-define(GARBAGE, <<"bdaf63dda9d78b075c748b740e7c3510ad203b07\nbdaf63dd">>).
+
+count_connections(Config) ->
+ StompPort = get_stomp_port(Config),
+ %% The default port is 61613 but it's in the middle of the ephemeral
+ %% ports range on many operating systems. Therefore, there is a
+ %% chance this port is already in use. Let's use a port close to the
+ %% AMQP default port.
+ IPv4Count = try
+ %% Count IPv4 connections. On some platforms, the IPv6 listener
+ %% implicitely listens to IPv4 connections too so the IPv4
+ %% listener doesn't exist. Thus this try/catch. This is the case
+ %% with Linux where net.ipv6.bindv6only is disabled (default in
+ %% most cases).
+ rpc_count_connections(Config, {acceptor, {0,0,0,0}, StompPort})
+ catch
+ _:{badarg, _} -> 0;
+ _:Other -> exit({foo, Other})
+ end,
+ IPv6Count = try
+ %% Count IPv6 connections. We also use a try/catch block in case
+ %% the host is not configured for IPv6.
+ rpc_count_connections(Config, {acceptor, {0,0,0,0,0,0,0,0}, StompPort})
+ catch
+ _:{badarg, _} -> 0;
+ _:Other1 -> exit({foo, Other1})
+ end,
+ IPv4Count + IPv6Count.
+
+rpc_count_connections(Config, ConnSpec) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ ranch_server, count_connections, [ConnSpec]).
+
+direct_client_connections_are_not_leaked(Config) ->
+ StompPort = get_stomp_port(Config),
+ N = count_connections(Config),
+ lists:foreach(fun (_) ->
+ {ok, Client = {Socket, _}} = rabbit_stomp_client:connect(StompPort),
+ %% send garbage which trips up the parser
+ gen_tcp:send(Socket, ?GARBAGE),
+ rabbit_stomp_client:send(
+ Client, "LOL", [{"", ""}])
+ end,
+ lists:seq(1, 100)),
+ timer:sleep(5000),
+ N = count_connections(Config),
+ ok.
+
+messages_not_dropped_on_disconnect(Config) ->
+ StompPort = get_stomp_port(Config),
+ N = count_connections(Config),
+ {ok, Client} = rabbit_stomp_client:connect(StompPort),
+ N1 = N + 1,
+ N1 = count_connections(Config),
+ [rabbit_stomp_client:send(
+ Client, "SEND", [{"destination", ?DESTINATION}],
+ [integer_to_list(Count)]) || Count <- lists:seq(1, 1000)],
+ rabbit_stomp_client:disconnect(Client),
+ QName = rabbit_misc:r(<<"/">>, queue, <<"bulk-test">>),
+ timer:sleep(3000),
+ N = count_connections(Config),
+ {ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QName]),
+ Messages = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, info, [Q, [messages]]),
+ 1000 = pget(messages, Messages),
+ ok.
+
+get_stomp_port(Config) ->
+ rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp).
+
+stats_are_not_leaked(Config) ->
+ StompPort = get_stomp_port(Config),
+ N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]),
+ {ok, C} = gen_tcp:connect("localhost", StompPort, []),
+ Bin = <<"GET / HTTP/1.1\r\nHost: www.rabbitmq.com\r\nUser-Agent: curl/7.43.0\r\nAccept: */*\n\n">>,
+ gen_tcp:send(C, Bin),
+ gen_tcp:close(C),
+ timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
+ N = rabbit_ct_broker_helpers:rpc(Config, 0, ets, info, [connection_metrics, size]),
+ ok.
+
+stats(Config) ->
+ StompPort = get_stomp_port(Config),
+ {ok, Client} = rabbit_stomp_client:connect(StompPort),
+ timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
+ %% Retrieve the connection Pid
+ [Reader] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp, list, []),
+ [{_, Pid}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp_reader,
+ info, [Reader, [connection]]),
+ %% Verify the content of the metrics, garbage_collection must be present
+ [{Pid, Props}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup,
+ [connection_metrics, Pid]),
+ true = proplists:is_defined(garbage_collection, Props),
+ 0 = proplists:get_value(timeout, Props),
+ %% If the coarse entry is present, stats were successfully emitted
+ [{Pid, _, _, _, _}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup,
+ [connection_coarse_metrics, Pid]),
+ rabbit_stomp_client:disconnect(Client),
+ ok.
+
+heartbeat(Config) ->
+ StompPort = get_stomp_port(Config),
+ {ok, Client} = rabbit_stomp_client:connect("1.2", "guest", "guest", StompPort,
+ [{"heart-beat", "5000,7000"}]),
+ timer:sleep(1000), %% Wait for stats to be emitted, which it does every 100ms
+ %% Retrieve the connection Pid
+ [Reader] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp, list, []),
+ [{_, Pid}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stomp_reader,
+ info, [Reader, [connection]]),
+ %% Verify the content of the heartbeat timeout
+ [{Pid, Props}] = rabbit_ct_broker_helpers:rpc(Config, 0, ets, lookup,
+ [connection_metrics, Pid]),
+ 5 = proplists:get_value(timeout, Props),
+ rabbit_stomp_client:disconnect(Client),
+ ok.
diff --git a/deps/rabbitmq_stomp/test/frame_SUITE.erl b/deps/rabbitmq_stomp/test/frame_SUITE.erl
new file mode 100644
index 0000000000..da191ac12a
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/frame_SUITE.erl
@@ -0,0 +1,191 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(frame_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stomp_frame.hrl").
+-include("rabbit_stomp_headers.hrl").
+-compile(export_all).
+
+all() ->
+ [
+ parse_simple_frame,
+ parse_simple_frame_crlf,
+ parse_command_only,
+ parse_command_prefixed_with_newline,
+ parse_ignore_empty_frames,
+ parse_heartbeat_interframe,
+ parse_crlf_interframe,
+ parse_carriage_return_not_ignored_interframe,
+ parse_carriage_return_mid_command,
+ parse_carriage_return_end_command,
+ parse_resume_mid_command,
+ parse_resume_mid_header_key,
+ parse_resume_mid_header_val,
+ parse_resume_mid_body,
+ parse_no_header_stripping,
+ parse_multiple_headers,
+ header_no_colon,
+ no_nested_escapes,
+ header_name_with_cr,
+ header_value_with_cr,
+ header_value_with_colon,
+ headers_escaping_roundtrip,
+ headers_escaping_roundtrip_without_trailing_lf
+ ].
+
+parse_simple_frame(_) ->
+ parse_simple_frame_gen("\n").
+
+parse_simple_frame_crlf(_) ->
+ parse_simple_frame_gen("\r\n").
+
+parse_simple_frame_gen(Term) ->
+ Headers = [{"header1", "value1"}, {"header2", "value2"}],
+ Content = frame_string("COMMAND",
+ Headers,
+ "Body Content",
+ Term),
+ {"COMMAND", Frame, _State} = parse_complete(Content),
+ [?assertEqual({ok, Value},
+ rabbit_stomp_frame:header(Frame, Key)) ||
+ {Key, Value} <- Headers],
+ #stomp_frame{body_iolist = Body} = Frame,
+ ?assertEqual(<<"Body Content">>, iolist_to_binary(Body)).
+
+parse_command_only(_) ->
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("COMMAND\n\n\0").
+
+parse_command_prefixed_with_newline(_) ->
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\nCOMMAND\n\n\0").
+
+parse_ignore_empty_frames(_) ->
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\0\0COMMAND\n\n\0").
+
+parse_heartbeat_interframe(_) ->
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\nCOMMAND\n\n\0").
+
+parse_crlf_interframe(_) ->
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse("\r\nCOMMAND\n\n\0").
+
+parse_carriage_return_not_ignored_interframe(_) ->
+ {error, {unexpected_chars_between_frames, "\rC"}} = parse("\rCOMMAND\n\n\0").
+
+parse_carriage_return_mid_command(_) ->
+ {error, {unexpected_chars_in_command, "\rA"}} = parse("COMM\rAND\n\n\0").
+
+parse_carriage_return_end_command(_) ->
+ {error, {unexpected_chars_in_command, "\r\r"}} = parse("COMMAND\r\r\n\n\0").
+
+parse_resume_mid_command(_) ->
+ First = "COMM",
+ Second = "AND\n\n\0",
+ {more, Resume} = parse(First),
+ {ok, #stomp_frame{command = "COMMAND"}, _Rest} = parse(Second, Resume).
+
+parse_resume_mid_header_key(_) ->
+ First = "COMMAND\nheade",
+ Second = "r1:value1\n\n\0",
+ {more, Resume} = parse(First),
+ {ok, Frame = #stomp_frame{command = "COMMAND"}, _Rest} =
+ parse(Second, Resume),
+ ?assertEqual({ok, "value1"},
+ rabbit_stomp_frame:header(Frame, "header1")).
+
+parse_resume_mid_header_val(_) ->
+ First = "COMMAND\nheader1:val",
+ Second = "ue1\n\n\0",
+ {more, Resume} = parse(First),
+ {ok, Frame = #stomp_frame{command = "COMMAND"}, _Rest} =
+ parse(Second, Resume),
+ ?assertEqual({ok, "value1"},
+ rabbit_stomp_frame:header(Frame, "header1")).
+
+parse_resume_mid_body(_) ->
+ First = "COMMAND\n\nABC",
+ Second = "DEF\0",
+ {more, Resume} = parse(First),
+ {ok, #stomp_frame{command = "COMMAND", body_iolist = Body}, _Rest} =
+ parse(Second, Resume),
+ ?assertEqual([<<"ABC">>, <<"DEF">>], Body).
+
+parse_no_header_stripping(_) ->
+ Content = "COMMAND\nheader: foo \n\n\0",
+ {ok, Frame, _} = parse(Content),
+ {ok, Val} = rabbit_stomp_frame:header(Frame, "header"),
+ ?assertEqual(" foo ", Val).
+
+parse_multiple_headers(_) ->
+ Content = "COMMAND\nheader:correct\nheader:incorrect\n\n\0",
+ {ok, Frame, _} = parse(Content),
+ {ok, Val} = rabbit_stomp_frame:header(Frame, "header"),
+ ?assertEqual("correct", Val).
+
+header_no_colon(_) ->
+ Content = "COMMAND\n"
+ "hdr1:val1\n"
+ "hdrerror\n"
+ "hdr2:val2\n"
+ "\n\0",
+ ?assertEqual(parse(Content), {error, {header_no_value, "hdrerror"}}).
+
+no_nested_escapes(_) ->
+ Content = "COM\\\\rAND\n" % no escapes
+ "hdr\\\\rname:" % one escape
+ "hdr\\\\rval\n\n\0", % one escape
+ {ok, Frame, _} = parse(Content),
+ ?assertEqual(Frame,
+ #stomp_frame{command = "COM\\\\rAND",
+ headers = [{"hdr\\rname", "hdr\\rval"}],
+ body_iolist = []}).
+
+header_name_with_cr(_) ->
+ Content = "COMMAND\nhead\rer:val\n\n\0",
+ {error, {unexpected_chars_in_header, "\re"}} = parse(Content).
+
+header_value_with_cr(_) ->
+ Content = "COMMAND\nheader:val\rue\n\n\0",
+ {error, {unexpected_chars_in_header, "\ru"}} = parse(Content).
+
+header_value_with_colon(_) ->
+ Content = "COMMAND\nheader:val:ue\n\n\0",
+ {ok, Frame, _} = parse(Content),
+ ?assertEqual(Frame,
+ #stomp_frame{ command = "COMMAND",
+ headers = [{"header", "val:ue"}],
+ body_iolist = []}).
+
+test_frame_serialization(Expected, TrailingLF) ->
+ {ok, Frame, _} = parse(Expected),
+ {ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),
+ ?assertEqual(":\n\r\\", Val),
+ Serialized = lists:flatten(rabbit_stomp_frame:serialize(Frame, TrailingLF)),
+ ?assertEqual(Expected, rabbit_misc:format("~s", [Serialized])).
+
+headers_escaping_roundtrip(_) ->
+ test_frame_serialization("COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0\n", true).
+
+headers_escaping_roundtrip_without_trailing_lf(_) ->
+ test_frame_serialization("COMMAND\nhead\\r\\c\\ner:\\c\\n\\r\\\\\n\n\0", false).
+
+parse(Content) ->
+ parse(Content, rabbit_stomp_frame:initial_state()).
+parse(Content, State) ->
+ rabbit_stomp_frame:parse(list_to_binary(Content), State).
+
+parse_complete(Content) ->
+ {ok, Frame = #stomp_frame{command = Command}, State} = parse(Content),
+ {Command, Frame, State}.
+
+frame_string(Command, Headers, BodyContent, Term) ->
+ HeaderString =
+ lists:flatten([Key ++ ":" ++ Value ++ Term || {Key, Value} <- Headers]),
+ Command ++ Term ++ HeaderString ++ Term ++ BodyContent ++ "\0" ++ "\n".
+
diff --git a/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl
new file mode 100644
index 0000000000..46c1c6c743
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/proxy_protocol_SUITE.erl
@@ -0,0 +1,104 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(proxy_protocol_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(TIMEOUT, 5000).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ proxy_protocol,
+ proxy_protocol_tls
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_certspwd, "bunnychow"},
+ {rabbitmq_ct_tls_verify, verify_none}
+ ]),
+ MqttConfig = stomp_config(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ [ fun(Conf) -> merge_app_env(MqttConfig, Conf) end ] ++
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+stomp_config() ->
+ {rabbitmq_stomp, [
+ {proxy_protocol, true}
+ ]}.
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+proxy_protocol(Config) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
+ [binary, {active, false}, {packet, raw}]),
+ ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
+ ok = inet:send(Socket, stomp_connect_frame()),
+ {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
+ ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, connection_name, []),
+ match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
+ gen_tcp:close(Socket),
+ ok.
+
+proxy_protocol_tls(Config) ->
+ app_utils:start_applications([asn1, crypto, public_key, ssl]),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
+ {ok, Socket} = gen_tcp:connect({127,0,0,1}, Port,
+ [binary, {active, false}, {packet, raw}]),
+ ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
+ {ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT),
+ ok = ssl:send(SslSocket, stomp_connect_frame()),
+ {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT),
+ ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, connection_name, []),
+ match = re:run(ConnectionName, <<"^192.168.1.1:80 ">>, [{capture, none}]),
+ gen_tcp:close(Socket),
+ ok.
+
+connection_name() ->
+ Connections = ets:tab2list(connection_created),
+ {_Key, Values} = lists:nth(1, Connections),
+ {_, Name} = lists:keyfind(name, 1, Values),
+ Name.
+
+merge_app_env(MqttConfig, Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, MqttConfig).
+
+stomp_connect_frame() ->
+ <<"CONNECT\n",
+ "login:guest\n",
+ "passcode:guest\n",
+ "\n",
+ 0>>. \ No newline at end of file
diff --git a/deps/rabbitmq_stomp/test/python_SUITE.erl b/deps/rabbitmq_stomp/test/python_SUITE.erl
new file mode 100644
index 0000000000..9613b25032
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE.erl
@@ -0,0 +1,72 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(python_SUITE).
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+ [
+ common,
+ ssl,
+ connect_options
+ ].
+
+init_per_testcase(TestCase, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, TestCase, "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_certspwd, "bunnychow"},
+ {rmq_nodename_suffix, Suffix}]),
+ rabbit_ct_helpers:log_environment(),
+ Config2 = rabbit_ct_helpers:run_setup_steps(
+ Config1,
+ rabbit_ct_broker_helpers:setup_steps()),
+ DataDir = ?config(data_dir, Config2),
+ PikaDir = filename:join([DataDir, "deps", "pika"]),
+ StomppyDir = filename:join([DataDir, "deps", "stomppy"]),
+ rabbit_ct_helpers:make(Config2, PikaDir, []),
+ rabbit_ct_helpers:make(Config2, StomppyDir, []),
+ Config2.
+
+end_per_testcase(_, Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+
+common(Config) ->
+ run(Config, filename:join("src", "test.py")).
+
+connect_options(Config) ->
+ run(Config, filename:join("src", "test_connect_options.py")).
+
+ssl(Config) ->
+ run(Config, filename:join("src", "test_ssl.py")).
+
+run(Config, Test) ->
+ DataDir = ?config(data_dir, Config),
+ CertsDir = rabbit_ct_helpers:get_config(Config, rmq_certsdir),
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ StompPortTls = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp_tls),
+ AmqpPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ PythonPath = os:getenv("PYTHONPATH"),
+ os:putenv("PYTHONPATH", filename:join([DataDir, "deps", "pika","pika"])
+ ++":"++
+ filename:join([DataDir, "deps", "stomppy", "stomppy"])
+ ++ ":" ++
+ PythonPath),
+ os:putenv("AMQP_PORT", integer_to_list(AmqpPort)),
+ os:putenv("STOMP_PORT", integer_to_list(StompPort)),
+ os:putenv("STOMP_PORT_TLS", integer_to_list(StompPortTls)),
+ os:putenv("RABBITMQ_NODENAME", atom_to_list(NodeName)),
+ os:putenv("SSL_CERTS_PATH", CertsDir),
+ {ok, _} = rabbit_ct_helpers:exec([filename:join(DataDir, Test)]).
+
+
+cur_dir() ->
+ {ok, Src} = filelib:find_source(?MODULE),
+ filename:dirname(Src).
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile
new file mode 100644
index 0000000000..10aa6f0212
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/pika/Makefile
@@ -0,0 +1,27 @@
+UPSTREAM_GIT=https://github.com/pika/pika.git
+REVISION=1.1.0
+
+LIB_DIR=pika
+CHECKOUT_DIR=pika-$(REVISION)
+
+TARGETS=$(LIB_DIR)
+
+all: $(TARGETS)
+
+clean:
+ rm -rf $(LIB_DIR)
+
+distclean: clean
+ rm -rf $(CHECKOUT_DIR)
+
+$(LIB_DIR) : $(CHECKOUT_DIR)
+ rm -rf $@
+ cp -R $< $@
+
+$(CHECKOUT_DIR):
+ git clone --depth 1 --branch $(REVISION) $(UPSTREAM_GIT) $@ || \
+ (rm -rf $@; exit 1)
+
+echo-revision:
+ @echo $(REVISION)
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile
new file mode 100644
index 0000000000..40f5bd1db7
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/deps/stomppy/Makefile
@@ -0,0 +1,27 @@
+UPSTREAM_GIT=https://github.com/jasonrbriggs/stomp.py.git
+REVISION=v4.0.16
+
+LIB_DIR=stomppy
+CHECKOUT_DIR=stomppy-git
+
+TARGETS=$(LIB_DIR)
+
+all: $(TARGETS)
+
+clean:
+ rm -rf $(LIB_DIR)
+
+distclean: clean
+ rm -rf $(CHECKOUT_DIR)
+
+$(LIB_DIR) : $(CHECKOUT_DIR)
+ rm -rf $@
+ cp -R $< $@
+
+$(CHECKOUT_DIR):
+ git clone $(UPSTREAM_GIT) $@
+ (cd $@ && git checkout $(REVISION)) || rm -rf $@
+
+echo-revision:
+ @echo $(REVISION)
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py
new file mode 100644
index 0000000000..9103bc76ea
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ack.py
@@ -0,0 +1,252 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+import os
+
+class TestAck(base.BaseTest):
+
+ def test_ack_client(self):
+ destination = "/queue/ack-test"
+
+ # subscribe and send message
+ self.listener.reset(2) ## expecting 2 messages
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client',
+ headers={'prefetch-count': '10'})
+ self.conn.send(destination, "test1")
+ self.conn.send(destination, "test2")
+ self.assertTrue(self.listener.wait(4), "initial message not received")
+ self.assertEquals(2, len(self.listener.messages))
+
+ # disconnect with no ack
+ self.conn.disconnect()
+
+ # now reconnect
+ conn2 = self.create_connection()
+ try:
+ listener2 = base.WaitableListener()
+ listener2.reset(2)
+ conn2.set_listener('', listener2)
+ self.subscribe_dest(conn2, destination, None,
+ ack='client',
+ headers={'prefetch-count': '10'})
+ self.assertTrue(listener2.wait(), "message not received again")
+ self.assertEquals(2, len(listener2.messages))
+
+ # now ack only the last message - expecting cumulative behaviour
+ mid = listener2.messages[1]['headers'][self.ack_id_source_header]
+ self.ack_message(conn2, mid, None)
+ finally:
+ conn2.disconnect()
+
+ # now reconnect again, shouldn't see the message
+ conn3 = self.create_connection()
+ try:
+ listener3 = base.WaitableListener()
+ conn3.set_listener('', listener3)
+ self.subscribe_dest(conn3, destination, None)
+ self.assertFalse(listener3.wait(3),
+ "unexpected message. ACK not working?")
+ finally:
+ conn3.disconnect()
+
+ def test_ack_client_individual(self):
+ destination = "/queue/ack-test-individual"
+
+ # subscribe and send message
+ self.listener.reset(2) ## expecting 2 messages
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client-individual',
+ headers={'prefetch-count': '10'})
+ self.conn.send(destination, "test1")
+ self.conn.send(destination, "test2")
+ self.assertTrue(self.listener.wait(4), "Both initial messages not received")
+ self.assertEquals(2, len(self.listener.messages))
+
+ # disconnect without acks
+ self.conn.disconnect()
+
+ # now reconnect
+ conn2 = self.create_connection()
+ try:
+ listener2 = base.WaitableListener()
+ listener2.reset(2) ## expect 2 messages
+ conn2.set_listener('', listener2)
+ self.subscribe_dest(conn2, destination, None,
+ ack='client-individual',
+ headers={'prefetch-count': '10'})
+ self.assertTrue(listener2.wait(2.5), "Did not receive 2 messages")
+ self.assertEquals(2, len(listener2.messages), "Not exactly 2 messages received")
+
+ # now ack only the 'test2' message - expecting individual behaviour
+ nummsgs = len(listener2.messages)
+ mid = None
+ for ind in range(nummsgs):
+ if listener2.messages[ind]['message']=="test2":
+ mid = listener2.messages[ind]['headers'][self.ack_id_source_header]
+ self.assertEquals(1, ind, 'Expecting test2 to be second message')
+ break
+ self.assertTrue(mid, "Did not find test2 message id.")
+ self.ack_message(conn2, mid, None)
+ finally:
+ conn2.disconnect()
+
+ # now reconnect again, shouldn't see the message
+ conn3 = self.create_connection()
+ try:
+ listener3 = base.WaitableListener()
+ listener3.reset(2) ## expecting a single message, but wait for two
+ conn3.set_listener('', listener3)
+ self.subscribe_dest(conn3, destination, None)
+ self.assertFalse(listener3.wait(2.5),
+ "Expected to see only one message. ACK not working?")
+ self.assertEquals(1, len(listener3.messages), "Expecting exactly one message")
+ self.assertEquals("test1", listener3.messages[0]['message'], "Unexpected message remains")
+ finally:
+ conn3.disconnect()
+
+ def test_ack_client_tx(self):
+ destination = "/queue/ack-test-tx"
+
+ # subscribe and send message
+ self.listener.reset()
+ self.subscribe_dest(self.conn, destination, None, ack='client')
+ self.conn.send(destination, "test")
+ self.assertTrue(self.listener.wait(3), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+
+ # disconnect with no ack
+ self.conn.disconnect()
+
+ # now reconnect
+ conn2 = self.create_connection()
+ try:
+ tx = "abc"
+ listener2 = base.WaitableListener()
+ conn2.set_listener('', listener2)
+ conn2.begin(transaction=tx)
+ self.subscribe_dest(conn2, destination, None, ack='client')
+ self.assertTrue(listener2.wait(), "message not received again")
+ self.assertEquals(1, len(listener2.messages))
+
+ # now ack
+ mid = listener2.messages[0]['headers'][self.ack_id_source_header]
+ self.ack_message(conn2, mid, None, transaction=tx)
+
+ #now commit
+ conn2.commit(transaction=tx)
+ finally:
+ conn2.disconnect()
+
+ # now reconnect again, shouldn't see the message
+ conn3 = self.create_connection()
+ try:
+ listener3 = base.WaitableListener()
+ conn3.set_listener('', listener3)
+ self.subscribe_dest(conn3, destination, None)
+ self.assertFalse(listener3.wait(3),
+ "unexpected message. TX ACK not working?")
+ finally:
+ conn3.disconnect()
+
+ def test_topic_prefetch(self):
+ destination = "/topic/prefetch-test"
+
+ # subscribe and send message
+ self.listener.reset(6) ## expect 6 messages
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client',
+ headers={'prefetch-count': '5'})
+
+ for x in range(10):
+ self.conn.send(destination, "test" + str(x))
+
+ self.assertFalse(self.listener.wait(3),
+ "Should not have been able to see 6 messages")
+ self.assertEquals(5, len(self.listener.messages))
+
+ def test_nack(self):
+ destination = "/queue/nack-test"
+
+ #subscribe and send
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client-individual')
+ self.conn.send(destination, "nack-test")
+
+ self.assertTrue(self.listener.wait(), "Not received message")
+ message_id = self.listener.messages[0]['headers'][self.ack_id_source_header]
+ self.listener.reset()
+
+ self.nack_message(self.conn, message_id, None)
+ self.assertTrue(self.listener.wait(), "Not received message after NACK")
+ message_id = self.listener.messages[0]['headers'][self.ack_id_source_header]
+ self.ack_message(self.conn, message_id, None)
+
+ def test_nack_multi(self):
+ destination = "/queue/nack-multi"
+
+ self.listener.reset(2)
+
+ #subscribe and send
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client',
+ headers = {'prefetch-count' : '10'})
+ self.conn.send(destination, "nack-test1")
+ self.conn.send(destination, "nack-test2")
+
+ self.assertTrue(self.listener.wait(), "Not received messages")
+ message_id = self.listener.messages[1]['headers'][self.ack_id_source_header]
+ self.listener.reset(2)
+
+ self.nack_message(self.conn, message_id, None)
+ self.assertTrue(self.listener.wait(), "Not received message again")
+ message_id = self.listener.messages[1]['headers'][self.ack_id_source_header]
+ self.ack_message(self.conn, message_id, None)
+
+ def test_nack_without_requeueing(self):
+ destination = "/queue/nack-test-no-requeue"
+
+ self.subscribe_dest(self.conn, destination, None,
+ ack='client-individual')
+ self.conn.send(destination, "nack-test")
+
+ self.assertTrue(self.listener.wait(), "Not received message")
+ message_id = self.listener.messages[0]['headers'][self.ack_id_source_header]
+ self.listener.reset()
+
+ self.conn.send_frame("NACK", {self.ack_id_header: message_id, "requeue": False})
+ self.assertFalse(self.listener.wait(4), "Received message after NACK with requeue = False")
+
+class TestAck11(TestAck):
+
+ def create_connection_obj(self, version='1.1', vhost='/', heartbeats=(0, 0)):
+ conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
+ vhost=vhost,
+ heartbeats=heartbeats)
+ self.ack_id_source_header = 'message-id'
+ self.ack_id_header = 'message-id'
+ return conn
+
+ def test_version(self):
+ self.assertEquals('1.1', self.conn.version)
+
+class TestAck12(TestAck):
+
+ def create_connection_obj(self, version='1.2', vhost='/', heartbeats=(0, 0)):
+ conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
+ vhost=vhost,
+ heartbeats=heartbeats)
+ self.ack_id_source_header = 'ack'
+ self.ack_id_header = 'id'
+ return conn
+
+ def test_version(self):
+ self.assertEquals('1.2', self.conn.version)
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py
new file mode 100644
index 0000000000..2c5ee45a8e
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/amqp_headers.py
@@ -0,0 +1,42 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import pika
+import base
+import os
+
+class TestAmqpHeaders(base.BaseTest):
+ def test_headers_to_stomp(self):
+ self.listener.reset(1)
+ queueName='test-amqp-headers-to-stomp'
+
+ # Set up STOMP subscription
+ self.subscribe_dest(self.conn, '/topic/test', None, headers={'x-queue-name': queueName})
+
+ # Set up AMQP connection
+ amqp_params = pika.ConnectionParameters(host='localhost', port=int(os.environ["AMQP_PORT"]))
+ amqp_conn = pika.BlockingConnection(amqp_params)
+ amqp_chan = amqp_conn.channel()
+
+ # publish a message with headers to the named AMQP queue
+ amqp_headers = { 'x-custom-hdr-1': 'value1',
+ 'x-custom-hdr-2': 'value2',
+ 'custom-hdr-3': 'value3' }
+ amqp_props = pika.BasicProperties(headers=amqp_headers)
+ amqp_chan.basic_publish(exchange='', routing_key=queueName, body='Hello World!', properties=amqp_props)
+
+ # check if we receive the message from the STOMP subscription
+ self.assertTrue(self.listener.wait(2), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+ msg = self.listener.messages[0]
+ self.assertEquals('Hello World!', msg['message'])
+ self.assertEquals('value1', msg['headers']['x-custom-hdr-1'])
+ self.assertEquals('value2', msg['headers']['x-custom-hdr-2'])
+ self.assertEquals('value3', msg['headers']['custom-hdr-3'])
+
+ self.conn.disconnect()
+ amqp_conn.close()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py
new file mode 100644
index 0000000000..a8f7ef59b9
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py
@@ -0,0 +1,259 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import sys
+import threading
+import os
+
+
+class BaseTest(unittest.TestCase):
+
+ def create_connection_obj(self, version='1.0', vhost='/', heartbeats=(0, 0)):
+ if version == '1.0':
+ conn = stomp.StompConnection10(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
+ self.ack_id_source_header = 'message-id'
+ self.ack_id_header = 'message-id'
+ elif version == '1.1':
+ conn = stomp.StompConnection11(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
+ vhost=vhost,
+ heartbeats=heartbeats)
+ self.ack_id_source_header = 'message-id'
+ self.ack_id_header = 'message-id'
+ elif version == '1.2':
+ conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
+ vhost=vhost,
+ heartbeats=heartbeats)
+ self.ack_id_source_header = 'ack'
+ self.ack_id_header = 'id'
+ else:
+ conn = stomp.StompConnection12(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))],
+ vhost=vhost,
+ heartbeats=heartbeats)
+ conn.version = version
+ return conn
+
+ def create_connection(self, user='guest', passcode='guest', wait=True, **kwargs):
+ conn = self.create_connection_obj(**kwargs)
+ conn.start()
+ conn.connect(user, passcode, wait=wait)
+ return conn
+
+ def subscribe_dest(self, conn, destination, sub_id, **kwargs):
+ if type(conn) is stomp.StompConnection10:
+ # 'id' is optional in STOMP 1.0.
+ if sub_id != None:
+ kwargs['id'] = sub_id
+ conn.subscribe(destination, **kwargs)
+ else:
+ # 'id' is required in STOMP 1.1+.
+ if sub_id == None:
+ sub_id = 'ctag'
+ conn.subscribe(destination, sub_id, **kwargs)
+
+ def unsubscribe_dest(self, conn, destination, sub_id, **kwargs):
+ if type(conn) is stomp.StompConnection10:
+ # 'id' is optional in STOMP 1.0.
+ if sub_id != None:
+ conn.unsubscribe(id=sub_id, **kwargs)
+ else:
+ conn.unsubscribe(destination=destination, **kwargs)
+ else:
+ # 'id' is required in STOMP 1.1+.
+ if sub_id == None:
+ sub_id = 'ctag'
+ conn.unsubscribe(sub_id, **kwargs)
+
+ def ack_message(self, conn, msg_id, sub_id, **kwargs):
+ if type(conn) is stomp.StompConnection10:
+ conn.ack(msg_id, **kwargs)
+ elif type(conn) is stomp.StompConnection11:
+ if sub_id == None:
+ sub_id = 'ctag'
+ conn.ack(msg_id, sub_id, **kwargs)
+ elif type(conn) is stomp.StompConnection12:
+ conn.ack(msg_id, **kwargs)
+
+ def nack_message(self, conn, msg_id, sub_id, **kwargs):
+ if type(conn) is stomp.StompConnection10:
+ # Normally unsupported by STOMP 1.0.
+ conn.send_frame("NACK", {"message-id": msg_id})
+ elif type(conn) is stomp.StompConnection11:
+ if sub_id == None:
+ sub_id = 'ctag'
+ conn.nack(msg_id, sub_id, **kwargs)
+ elif type(conn) is stomp.StompConnection12:
+ conn.nack(msg_id, **kwargs)
+
+ def create_subscriber_connection(self, dest):
+ conn = self.create_connection()
+ listener = WaitableListener()
+ conn.set_listener('', listener)
+ self.subscribe_dest(conn, dest, None, receipt="sub.receipt")
+ listener.wait()
+ self.assertEquals(1, len(listener.receipts))
+ listener.reset()
+ return conn, listener
+
+ def setUp(self):
+ # Note: useful for debugging
+ # import stomp.listener
+ self.conn = self.create_connection()
+ self.listener = WaitableListener()
+ self.conn.set_listener('waitable', self.listener)
+ # Note: useful for debugging
+ # self.printing_listener = stomp.listener.PrintingListener()
+ # self.conn.set_listener('printing', self.printing_listener)
+
+ def tearDown(self):
+ if self.conn.is_connected():
+ self.conn.disconnect()
+ self.conn.stop()
+
+ def simple_test_send_rec(self, dest, headers={}):
+ self.listener.reset()
+
+ self.subscribe_dest(self.conn, dest, None)
+ self.conn.send(dest, "foo", headers=headers)
+
+ self.assertTrue(self.listener.wait(), "Timeout, no message received")
+
+ # assert no errors
+ if len(self.listener.errors) > 0:
+ self.fail(self.listener.errors[0]['message'])
+
+ # check header content
+ msg = self.listener.messages[0]
+ self.assertEquals("foo", msg['message'])
+ self.assertEquals(dest, msg['headers']['destination'])
+ return msg['headers']
+
+ def assertListener(self, errMsg, numMsgs=0, numErrs=0, numRcts=0, timeout=10):
+ if numMsgs + numErrs + numRcts > 0:
+ self._assertTrue(self.listener.wait(timeout), errMsg + " (#awaiting)")
+ else:
+ self._assertFalse(self.listener.wait(timeout), errMsg + " (#awaiting)")
+ self._assertEquals(numMsgs, len(self.listener.messages), errMsg + " (#messages)")
+ self._assertEquals(numErrs, len(self.listener.errors), errMsg + " (#errors)")
+ self._assertEquals(numRcts, len(self.listener.receipts), errMsg + " (#receipts)")
+
+ def _assertTrue(self, bool, msg):
+ if not bool:
+ self.listener.print_state(msg, True)
+ self.assertTrue(bool, msg)
+
+ def _assertFalse(self, bool, msg):
+ if bool:
+ self.listener.print_state(msg, True)
+ self.assertFalse(bool, msg)
+
+ def _assertEquals(self, expected, actual, msg):
+ if expected != actual:
+ self.listener.print_state(msg, True)
+ self.assertEquals(expected, actual, msg)
+
+ def assertListenerAfter(self, verb, errMsg="", numMsgs=0, numErrs=0, numRcts=0, timeout=5):
+ num = numMsgs + numErrs + numRcts
+ self.listener.reset(num if num>0 else 1)
+ verb()
+ self.assertListener(errMsg=errMsg, numMsgs=numMsgs, numErrs=numErrs, numRcts=numRcts, timeout=timeout)
+
+class WaitableListener(object):
+
+ def __init__(self):
+ self.debug = False
+ if self.debug:
+ print('(listener) init')
+ self.messages = []
+ self.errors = []
+ self.receipts = []
+ self.latch = Latch(1)
+ self.msg_no = 0
+
+ def _next_msg_no(self):
+ self.msg_no += 1
+ return self.msg_no
+
+ def _append(self, array, msg, hdrs):
+ mno = self._next_msg_no()
+ array.append({'message' : msg, 'headers' : hdrs, 'msg_no' : mno})
+ self.latch.countdown()
+
+ def on_receipt(self, headers, message):
+ if self.debug:
+ print('(on_receipt) message: {}, headers: {}'.format(message, headers))
+ self._append(self.receipts, message, headers)
+
+ def on_error(self, headers, message):
+ if self.debug:
+ print('(on_error) message: {}, headers: {}'.format(message, headers))
+ self._append(self.errors, message, headers)
+
+ def on_message(self, headers, message):
+ if self.debug:
+ print('(on_message) message: {}, headers: {}'.format(message, headers))
+ self._append(self.messages, message, headers)
+
+ def reset(self, count=1):
+ if self.debug:
+ self.print_state('(reset listener--old state)')
+ self.messages = []
+ self.errors = []
+ self.receipts = []
+ self.latch = Latch(count)
+ self.msg_no = 0
+ if self.debug:
+ self.print_state('(reset listener--new state)')
+
+ def wait(self, timeout=10):
+ return self.latch.wait(timeout)
+
+ def print_state(self, hdr="", full=False):
+ print(hdr)
+ print('#messages: {}'.format(len(self.messages)))
+ print('#errors: {}', len(self.errors))
+ print('#receipts: {}'.format(len(self.receipts)))
+ print('Remaining count: {}'.format(self.latch.get_count()))
+ if full:
+ if len(self.messages) != 0: print('Messages: {}'.format(self.messages))
+ if len(self.errors) != 0: print('Messages: {}'.format(self.errors))
+ if len(self.receipts) != 0: print('Messages: {}'.format(self.receipts))
+
+class Latch(object):
+
+ def __init__(self, count=1):
+ self.cond = threading.Condition()
+ self.cond.acquire()
+ self.count = count
+ self.cond.release()
+
+ def countdown(self):
+ self.cond.acquire()
+ if self.count > 0:
+ self.count -= 1
+ if self.count == 0:
+ self.cond.notify_all()
+ self.cond.release()
+
+ def wait(self, timeout=None):
+ try:
+ self.cond.acquire()
+ if self.count == 0:
+ return True
+ else:
+ self.cond.wait(timeout)
+ return self.count == 0
+ finally:
+ self.cond.release()
+
+ def get_count(self):
+ try:
+ self.cond.acquire()
+ return self.count
+ finally:
+ self.cond.release()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py
new file mode 100644
index 0000000000..f71c4acf70
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/connect_options.py
@@ -0,0 +1,51 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import test_util
+import os
+
+class TestConnectOptions(base.BaseTest):
+
+ def test_implicit_connect(self):
+ ''' Implicit connect with receipt on first command '''
+ self.conn.disconnect()
+ test_util.enable_implicit_connect()
+ listener = base.WaitableListener()
+ new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
+ new_conn.set_listener('', listener)
+
+ new_conn.start() # not going to issue connect
+ self.subscribe_dest(new_conn, "/topic/implicit", 'sub_implicit',
+ receipt='implicit')
+
+ try:
+ self.assertTrue(listener.wait(5))
+ self.assertEquals(1, len(listener.receipts),
+ 'Missing receipt. Likely not connected')
+ self.assertEquals('implicit', listener.receipts[0]['headers']['receipt-id'])
+ finally:
+ new_conn.disconnect()
+ test_util.disable_implicit_connect()
+
+ def test_default_user(self):
+ ''' Default user connection '''
+ self.conn.disconnect()
+ test_util.enable_default_user()
+ listener = base.WaitableListener()
+ new_conn = stomp.Connection(host_and_ports=[('localhost', int(os.environ["STOMP_PORT"]))])
+ new_conn.set_listener('', listener)
+ new_conn.start()
+ new_conn.connect()
+ try:
+ self.assertFalse(listener.wait(3)) # no error back
+ self.assertTrue(new_conn.is_connected())
+ finally:
+ new_conn.disconnect()
+ test_util.disable_default_user()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py
new file mode 100644
index 0000000000..76e5402686
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/destinations.py
@@ -0,0 +1,536 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+
+class TestExchange(base.BaseTest):
+
+
+ def test_amq_direct(self):
+ ''' Test basic send/receive for /exchange/amq.direct '''
+ self.__test_exchange_send_rec("amq.direct", "route")
+
+ def test_amq_topic(self):
+ ''' Test basic send/receive for /exchange/amq.topic '''
+ self.__test_exchange_send_rec("amq.topic", "route")
+
+ def test_amq_fanout(self):
+ ''' Test basic send/receive for /exchange/amq.fanout '''
+ self.__test_exchange_send_rec("amq.fanout", "route")
+
+ def test_amq_fanout_no_route(self):
+ ''' Test basic send/receive, /exchange/amq.direct, no routing key'''
+ self.__test_exchange_send_rec("amq.fanout")
+
+ def test_invalid_exchange(self):
+ ''' Test invalid exchange error '''
+ self.listener.reset(1)
+ self.subscribe_dest(self.conn, "/exchange/does.not.exist", None,
+ ack="auto")
+ self.assertListener("Expecting an error", numErrs=1)
+ err = self.listener.errors[0]
+ self.assertEquals("not_found", err['headers']['message'])
+ self.assertEquals(
+ "NOT_FOUND - no exchange 'does.not.exist' in vhost '/'\n",
+ err['message'])
+ time.sleep(1)
+ self.assertFalse(self.conn.is_connected())
+
+ def __test_exchange_send_rec(self, exchange, route = None):
+ if exchange != "amq.topic":
+ dest = "/exchange/" + exchange
+ else:
+ dest = "/topic"
+ if route != None:
+ dest += "/" + route
+
+ self.simple_test_send_rec(dest)
+
+class TestQueue(base.BaseTest):
+
+ def test_send_receive(self):
+ ''' Test basic send/receive for /queue '''
+ destination = '/queue/test'
+ self.simple_test_send_rec(destination)
+
+ def test_send_receive_in_other_conn(self):
+ ''' Test send in one connection, receive in another '''
+ destination = '/queue/test2'
+
+ # send
+ self.conn.send(destination, "hello")
+
+ # now receive
+ conn2 = self.create_connection()
+ try:
+ listener2 = base.WaitableListener()
+ conn2.set_listener('', listener2)
+
+ self.subscribe_dest(conn2, destination, None, ack="auto")
+ self.assertTrue(listener2.wait(10), "no receive")
+ finally:
+ conn2.disconnect()
+
+ def test_send_receive_in_other_conn_with_disconnect(self):
+ ''' Test send, disconnect, receive '''
+ destination = '/queue/test3'
+
+ # send
+ self.conn.send(destination, "hello thar", receipt="foo")
+ self.listener.wait(3)
+ self.conn.disconnect()
+
+ # now receive
+ conn2 = self.create_connection()
+ try:
+ listener2 = base.WaitableListener()
+ conn2.set_listener('', listener2)
+
+ self.subscribe_dest(conn2, destination, None, ack="auto")
+ self.assertTrue(listener2.wait(10), "no receive")
+ finally:
+ conn2.disconnect()
+
+
+ def test_multi_subscribers(self):
+ ''' Test multiple subscribers against a single /queue destination '''
+ destination = '/queue/test-multi'
+
+ ## set up two subscribers
+ conn1, listener1 = self.create_subscriber_connection(destination)
+ conn2, listener2 = self.create_subscriber_connection(destination)
+
+ try:
+ ## now send
+ self.conn.send(destination, "test1")
+ self.conn.send(destination, "test2")
+
+ ## expect both consumers to get a message?
+ self.assertTrue(listener1.wait(2))
+ self.assertEquals(1, len(listener1.messages),
+ "unexpected message count")
+ self.assertTrue(listener2.wait(2))
+ self.assertEquals(1, len(listener2.messages),
+ "unexpected message count")
+ finally:
+ conn1.disconnect()
+ conn2.disconnect()
+
+ def test_send_with_receipt(self):
+ destination = '/queue/test-receipt'
+ def noop(): pass
+ self.__test_send_receipt(destination, noop, noop)
+
+ def test_send_with_receipt_tx(self):
+ destination = '/queue/test-receipt-tx'
+ tx = 'receipt.tx'
+
+ def before():
+ self.conn.begin(transaction=tx)
+
+ def after():
+ self.assertFalse(self.listener.wait(1))
+ self.conn.commit(transaction=tx)
+
+ self.__test_send_receipt(destination, before, after, {'transaction': tx})
+
+ def test_interleaved_receipt_no_receipt(self):
+ ''' Test i-leaved receipt/no receipt, no-r bracketed by rs '''
+
+ destination = '/queue/ir'
+
+ self.listener.reset(5)
+
+ self.subscribe_dest(self.conn, destination, None, ack="auto")
+ self.conn.send(destination, 'first', receipt='a')
+ self.conn.send(destination, 'second')
+ self.conn.send(destination, 'third', receipt='b')
+
+ self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=3)
+
+ self.assertEquals(set(['a','b']), self.__gather_receipts())
+
+ def test_interleaved_receipt_no_receipt_tx(self):
+ ''' Test i-leaved receipt/no receipt, no-r bracketed by r+xactions '''
+
+ destination = '/queue/ir'
+ tx = 'tx.ir'
+
+ # three messages and two receipts
+ self.listener.reset(5)
+
+ self.subscribe_dest(self.conn, destination, None, ack="auto")
+ self.conn.begin(transaction=tx)
+
+ self.conn.send(destination, 'first', receipt='a', transaction=tx)
+ self.conn.send(destination, 'second', transaction=tx)
+ self.conn.send(destination, 'third', receipt='b', transaction=tx)
+ self.conn.commit(transaction=tx)
+
+ self.assertListener("Missing messages/receipts", numMsgs=3, numRcts=2, timeout=40)
+
+ expected = set(['a', 'b'])
+ missing = expected.difference(self.__gather_receipts())
+
+ self.assertEquals(set(), missing, "Missing receipts: " + str(missing))
+
+ def test_interleaved_receipt_no_receipt_inverse(self):
+ ''' Test i-leaved receipt/no receipt, r bracketed by no-rs '''
+
+ destination = '/queue/ir'
+
+ self.listener.reset(4)
+
+ self.subscribe_dest(self.conn, destination, None, ack="auto")
+ self.conn.send(destination, 'first')
+ self.conn.send(destination, 'second', receipt='a')
+ self.conn.send(destination, 'third')
+
+ self.assertListener("Missing messages/receipt", numMsgs=3, numRcts=1, timeout=3)
+
+ self.assertEquals(set(['a']), self.__gather_receipts())
+
+ def __test_send_receipt(self, destination, before, after, headers = {}):
+ count = 50
+ self.listener.reset(count)
+
+ before()
+ expected_receipts = set()
+
+ for x in range(0, count):
+ receipt = "test" + str(x)
+ expected_receipts.add(receipt)
+ self.conn.send(destination, "test receipt",
+ receipt=receipt, headers=headers)
+ after()
+
+ self.assertTrue(self.listener.wait(5))
+
+ missing_receipts = expected_receipts.difference(
+ self.__gather_receipts())
+
+ self.assertEquals(set(), missing_receipts,
+ "missing receipts: " + str(missing_receipts))
+
+ def __gather_receipts(self):
+ result = set()
+ for r in self.listener.receipts:
+ result.add(r['headers']['receipt-id'])
+ return result
+
+class TestTopic(base.BaseTest):
+
+ def test_send_receive(self):
+ ''' Test basic send/receive for /topic '''
+ destination = '/topic/test'
+ self.simple_test_send_rec(destination)
+
+ def test_send_multiple(self):
+ ''' Test /topic with multiple consumers '''
+ destination = '/topic/multiple'
+
+ ## set up two subscribers
+ conn1, listener1 = self.create_subscriber_connection(destination)
+ conn2, listener2 = self.create_subscriber_connection(destination)
+
+ try:
+ ## listeners are expecting 2 messages
+ listener1.reset(2)
+ listener2.reset(2)
+
+ ## now send
+ self.conn.send(destination, "test1")
+ self.conn.send(destination, "test2")
+
+ ## expect both consumers to get both messages
+ self.assertTrue(listener1.wait(5))
+ self.assertEquals(2, len(listener1.messages),
+ "unexpected message count")
+ self.assertTrue(listener2.wait(5))
+ self.assertEquals(2, len(listener2.messages),
+ "unexpected message count")
+ finally:
+ conn1.disconnect()
+ conn2.disconnect()
+
+ def test_send_multiple_with_a_large_message(self):
+ ''' Test /topic with multiple consumers '''
+ destination = '/topic/16mb'
+ # payload size
+ s = 1024 * 1024 * 16
+ message = 'x' * s
+
+ conn1, listener1 = self.create_subscriber_connection(destination)
+ conn2, listener2 = self.create_subscriber_connection(destination)
+
+ try:
+ listener1.reset(2)
+ listener2.reset(2)
+
+ self.conn.send(destination, message)
+ self.conn.send(destination, message)
+
+ self.assertTrue(listener1.wait(10))
+ self.assertEquals(2, len(listener1.messages),
+ "unexpected message count")
+ self.assertTrue(len(listener2.messages[0]['message']) == s,
+ "unexpected message size")
+
+ self.assertTrue(listener2.wait(10))
+ self.assertEquals(2, len(listener2.messages),
+ "unexpected message count")
+ finally:
+ conn1.disconnect()
+ conn2.disconnect()
+
+class TestReplyQueue(base.BaseTest):
+
+ def test_reply_queue(self):
+ ''' Test with two separate clients. Client 1 sends
+ message to a known destination with a defined reply
+ queue. Client 2 receives on known destination and replies
+ on the reply destination. Client 1 gets the reply message'''
+
+ known = '/queue/known'
+ reply = '/temp-queue/0'
+
+ ## Client 1 uses pre-supplied connection and listener
+ ## Set up client 2
+ conn2, listener2 = self.create_subscriber_connection(known)
+
+ try:
+ self.conn.send(known, "test",
+ headers = {"reply-to": reply})
+
+ self.assertTrue(listener2.wait(5))
+ self.assertEquals(1, len(listener2.messages))
+
+ reply_to = listener2.messages[0]['headers']['reply-to']
+ self.assertTrue(reply_to.startswith('/reply-queue/'))
+
+ conn2.send(reply_to, "reply")
+ self.assertTrue(self.listener.wait(5))
+ self.assertEquals("reply", self.listener.messages[0]['message'])
+ finally:
+ conn2.disconnect()
+
+ def test_reuse_reply_queue(self):
+ ''' Test re-use of reply-to queue '''
+
+ known2 = '/queue/known2'
+ known3 = '/queue/known3'
+ reply = '/temp-queue/foo'
+
+ def respond(cntn, listna):
+ self.assertTrue(listna.wait(5))
+ self.assertEquals(1, len(listna.messages))
+ reply_to = listna.messages[0]['headers']['reply-to']
+ self.assertTrue(reply_to.startswith('/reply-queue/'))
+ cntn.send(reply_to, "reply")
+
+ ## Client 1 uses pre-supplied connection and listener
+ ## Set up clients 2 and 3
+ conn2, listener2 = self.create_subscriber_connection(known2)
+ conn3, listener3 = self.create_subscriber_connection(known3)
+ try:
+ self.listener.reset(2)
+ self.conn.send(known2, "test2",
+ headers = {"reply-to": reply})
+ self.conn.send(known3, "test3",
+ headers = {"reply-to": reply})
+ respond(conn2, listener2)
+ respond(conn3, listener3)
+
+ self.assertTrue(self.listener.wait(5))
+ self.assertEquals(2, len(self.listener.messages))
+ self.assertEquals("reply", self.listener.messages[0]['message'])
+ self.assertEquals("reply", self.listener.messages[1]['message'])
+ finally:
+ conn2.disconnect()
+ conn3.disconnect()
+
+ def test_perm_reply_queue(self):
+ '''As test_reply_queue, but with a non-temp reply queue'''
+
+ known = '/queue/known'
+ reply = '/queue/reply'
+
+ ## Client 1 uses pre-supplied connection and listener
+ ## Set up client 2
+ conn1, listener1 = self.create_subscriber_connection(reply)
+ conn2, listener2 = self.create_subscriber_connection(known)
+
+ try:
+ conn1.send(known, "test",
+ headers = {"reply-to": reply})
+
+ self.assertTrue(listener2.wait(5))
+ self.assertEquals(1, len(listener2.messages))
+
+ reply_to = listener2.messages[0]['headers']['reply-to']
+ self.assertTrue(reply_to == reply)
+
+ conn2.send(reply_to, "reply")
+ self.assertTrue(listener1.wait(5))
+ self.assertEquals("reply", listener1.messages[0]['message'])
+ finally:
+ conn1.disconnect()
+ conn2.disconnect()
+
+class TestDurableSubscription(base.BaseTest):
+
+ ID = 'test.subscription'
+
+ def __subscribe(self, dest, conn=None, id=None):
+ if not conn:
+ conn = self.conn
+ if not id:
+ id = TestDurableSubscription.ID
+
+ self.subscribe_dest(conn, dest, id, ack="auto",
+ headers = {'durable': 'true',
+ 'receipt': 1,
+ 'auto-delete': False})
+
+ def __assert_receipt(self, listener=None, pos=None):
+ if not listener:
+ listener = self.listener
+
+ self.assertTrue(listener.wait(5))
+ self.assertEquals(1, len(self.listener.receipts))
+ if pos is not None:
+ self.assertEquals(pos, self.listener.receipts[0]['msg_no'])
+
+ def __assert_message(self, msg, listener=None, pos=None):
+ if not listener:
+ listener = self.listener
+
+ self.assertTrue(listener.wait(5))
+ self.assertEquals(1, len(listener.messages))
+ self.assertEquals(msg, listener.messages[0]['message'])
+ if pos is not None:
+ self.assertEquals(pos, self.listener.messages[0]['msg_no'])
+
+ def do_test_durable_subscription(self, durability_header):
+ destination = '/topic/durable'
+
+ self.__subscribe(destination)
+ self.__assert_receipt()
+
+ # send first message without unsubscribing
+ self.listener.reset(1)
+ self.conn.send(destination, "first")
+ self.__assert_message("first")
+
+ # now unsubscribe (disconnect only)
+ self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID)
+
+ # send again
+ self.listener.reset(2)
+ self.conn.send(destination, "second")
+
+ # resubscribe and expect receipt
+ self.__subscribe(destination)
+ self.__assert_receipt(pos=1)
+ # and message
+ self.__assert_message("second", pos=2)
+
+ # now unsubscribe (cancel)
+ self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID,
+ headers={durability_header: 'true'})
+
+ # send again
+ self.listener.reset(1)
+ self.conn.send(destination, "third")
+
+ # resubscribe and expect no message
+ self.__subscribe(destination)
+ self.assertTrue(self.listener.wait(3))
+ self.assertEquals(0, len(self.listener.messages))
+ self.assertEquals(1, len(self.listener.receipts))
+
+ def test_durable_subscription(self):
+ self.do_test_durable_subscription('durable')
+
+ def test_durable_subscription_and_legacy_header(self):
+ self.do_test_durable_subscription('persistent')
+
+ def test_share_subscription(self):
+ destination = '/topic/durable-shared'
+
+ conn2 = self.create_connection()
+ conn2.set_listener('', self.listener)
+
+ try:
+ self.__subscribe(destination)
+ self.__assert_receipt()
+ self.listener.reset(1)
+ self.__subscribe(destination, conn2)
+ self.__assert_receipt()
+
+ self.listener.reset(100)
+
+ # send 100 messages
+ for x in range(0, 100):
+ self.conn.send(destination, "msg" + str(x))
+
+ self.assertTrue(self.listener.wait(5))
+ self.assertEquals(100, len(self.listener.messages))
+ finally:
+ conn2.disconnect()
+
+ def test_separate_ids(self):
+ destination = '/topic/durable-separate'
+
+ conn2 = self.create_connection()
+ listener2 = base.WaitableListener()
+ conn2.set_listener('', listener2)
+
+ try:
+ # ensure durable subscription exists for each ID
+ self.__subscribe(destination)
+ self.__assert_receipt()
+ self.__subscribe(destination, conn2, "other.id")
+ self.__assert_receipt(listener2)
+ self.unsubscribe_dest(self.conn, destination, TestDurableSubscription.ID)
+ self.unsubscribe_dest(conn2, destination, "other.id")
+
+ self.listener.reset(101)
+ listener2.reset(101) ## 100 messages and 1 receipt
+
+ # send 100 messages
+ for x in range(0, 100):
+ self.conn.send(destination, "msg" + str(x))
+
+ self.__subscribe(destination)
+ self.__subscribe(destination, conn2, "other.id")
+
+ for l in [self.listener, listener2]:
+ self.assertTrue(l.wait(20))
+ self.assertTrue(len(l.messages) >= 90)
+ self.assertTrue(len(l.messages) <= 100)
+
+ finally:
+ conn2.disconnect()
+
+ def do_test_durable_subscribe_no_id_and_header(self, header):
+ destination = '/topic/durable-invalid'
+
+ self.conn.send_frame('SUBSCRIBE',
+ {'destination': destination, 'ack': 'auto', header: 'true'})
+ self.listener.wait(3)
+ self.assertEquals(1, len(self.listener.errors))
+ self.assertEquals("Missing Header", self.listener.errors[0]['headers']['message'])
+
+ def test_durable_subscribe_no_id(self):
+ self.do_test_durable_subscribe_no_id_and_header('durable')
+
+ def test_durable_subscribe_no_id_and_legacy_header(self):
+ self.do_test_durable_subscribe_no_id_and_header('persistent')
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py
new file mode 100644
index 0000000000..884ada50e8
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/errors.py
@@ -0,0 +1,101 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+
+class TestErrorsAndCloseConnection(base.BaseTest):
+ def __test_duplicate_consumer_tag_with_headers(self, destination, headers):
+ self.subscribe_dest(self.conn, destination, None,
+ headers = headers)
+
+ self.subscribe_dest(self.conn, destination, None,
+ headers = headers)
+
+ self.assertTrue(self.listener.wait())
+
+ self.assertEquals(1, len(self.listener.errors))
+ errorReceived = self.listener.errors[0]
+ self.assertEquals("Duplicated subscription identifier", errorReceived['headers']['message'])
+ self.assertEquals("A subscription identified by 'T_1' already exists.", errorReceived['message'])
+ time.sleep(2)
+ self.assertFalse(self.conn.is_connected())
+
+
+ def test_duplicate_consumer_tag_with_transient_destination(self):
+ destination = "/exchange/amq.direct/duplicate-consumer-tag-test1"
+ self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1})
+
+ def test_duplicate_consumer_tag_with_durable_destination(self):
+ destination = "/queue/duplicate-consumer-tag-test2"
+ self.__test_duplicate_consumer_tag_with_headers(destination, {'id': 1,
+ 'persistent': True})
+
+
+class TestErrors(base.BaseTest):
+
+ def test_invalid_queue_destination(self):
+ self.__test_invalid_destination("queue", "/bah/baz")
+
+ def test_invalid_empty_queue_destination(self):
+ self.__test_invalid_destination("queue", "")
+
+ def test_invalid_topic_destination(self):
+ self.__test_invalid_destination("topic", "/bah/baz")
+
+ def test_invalid_empty_topic_destination(self):
+ self.__test_invalid_destination("topic", "")
+
+ def test_invalid_exchange_destination(self):
+ self.__test_invalid_destination("exchange", "/bah/baz/boo")
+
+ def test_invalid_empty_exchange_destination(self):
+ self.__test_invalid_destination("exchange", "")
+
+ def test_invalid_default_exchange_destination(self):
+ self.__test_invalid_destination("exchange", "//foo")
+
+ def test_unknown_destination(self):
+ self.listener.reset()
+ self.conn.send("/something/interesting", 'test_unknown_destination')
+
+ self.assertTrue(self.listener.wait())
+ self.assertEquals(1, len(self.listener.errors))
+
+ err = self.listener.errors[0]
+ self.assertEquals("Unknown destination", err['headers']['message'])
+
+ def test_send_missing_destination(self):
+ self.__test_missing_destination("SEND")
+
+ def test_send_missing_destination(self):
+ self.__test_missing_destination("SUBSCRIBE")
+
+ def __test_missing_destination(self, command):
+ self.listener.reset()
+ self.conn.send_frame(command)
+
+ self.assertTrue(self.listener.wait())
+ self.assertEquals(1, len(self.listener.errors))
+
+ err = self.listener.errors[0]
+ self.assertEquals("Missing destination", err['headers']['message'])
+
+ def __test_invalid_destination(self, dtype, content):
+ self.listener.reset()
+ self.conn.send("/" + dtype + content, '__test_invalid_destination:' + dtype + content)
+
+ self.assertTrue(self.listener.wait())
+ self.assertEquals(1, len(self.listener.errors))
+
+ err = self.listener.errors[0]
+ self.assertEquals("Invalid destination", err['headers']['message'])
+ self.assertEquals("'" + content + "' is not a valid " +
+ dtype + " destination\n",
+ err['message'])
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py
new file mode 100644
index 0000000000..d7b558e7b5
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/lifecycle.py
@@ -0,0 +1,187 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+
+class TestLifecycle(base.BaseTest):
+
+ def test_unsubscribe_exchange_destination(self):
+ ''' Test UNSUBSCRIBE command with exchange'''
+ d = "/exchange/amq.fanout"
+ self.unsub_test(d, self.sub_and_send(d))
+
+ def test_unsubscribe_exchange_destination_with_receipt(self):
+ ''' Test receipted UNSUBSCRIBE command with exchange'''
+ d = "/exchange/amq.fanout"
+ self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
+
+ def test_unsubscribe_queue_destination(self):
+ ''' Test UNSUBSCRIBE command with queue'''
+ d = "/queue/unsub01"
+ self.unsub_test(d, self.sub_and_send(d))
+
+ def test_unsubscribe_queue_destination_with_receipt(self):
+ ''' Test receipted UNSUBSCRIBE command with queue'''
+ d = "/queue/unsub02"
+ self.unsub_test(d, self.sub_and_send(d, receipt="unsub.rct"), numRcts=1)
+
+ def test_unsubscribe_exchange_id(self):
+ ''' Test UNSUBSCRIBE command with exchange by id'''
+ d = "/exchange/amq.fanout"
+ self.unsub_test(d, self.sub_and_send(d, subid="exchid"))
+
+ def test_unsubscribe_exchange_id_with_receipt(self):
+ ''' Test receipted UNSUBSCRIBE command with exchange by id'''
+ d = "/exchange/amq.fanout"
+ self.unsub_test(d, self.sub_and_send(d, subid="exchid", receipt="unsub.rct"), numRcts=1)
+
+ def test_unsubscribe_queue_id(self):
+ ''' Test UNSUBSCRIBE command with queue by id'''
+ d = "/queue/unsub03"
+ self.unsub_test(d, self.sub_and_send(d, subid="queid"))
+
+ def test_unsubscribe_queue_id_with_receipt(self):
+ ''' Test receipted UNSUBSCRIBE command with queue by id'''
+ d = "/queue/unsub04"
+ self.unsub_test(d, self.sub_and_send(d, subid="queid", receipt="unsub.rct"), numRcts=1)
+
+ def test_connect_version_1_0(self):
+ ''' Test CONNECT with version 1.0'''
+ self.conn.disconnect()
+ new_conn = self.create_connection(version="1.0")
+ try:
+ self.assertTrue(new_conn.is_connected())
+ finally:
+ new_conn.disconnect()
+ self.assertFalse(new_conn.is_connected())
+
+ def test_connect_version_1_1(self):
+ ''' Test CONNECT with version 1.1'''
+ self.conn.disconnect()
+ new_conn = self.create_connection(version="1.1")
+ try:
+ self.assertTrue(new_conn.is_connected())
+ finally:
+ new_conn.disconnect()
+ self.assertFalse(new_conn.is_connected())
+
+ def test_connect_version_1_2(self):
+ ''' Test CONNECT with version 1.2'''
+ self.conn.disconnect()
+ new_conn = self.create_connection(version="1.2")
+ try:
+ self.assertTrue(new_conn.is_connected())
+ finally:
+ new_conn.disconnect()
+ self.assertFalse(new_conn.is_connected())
+
+ def test_heartbeat_disconnects_client(self):
+ ''' Test heart-beat disconnection'''
+ self.conn.disconnect()
+ new_conn = self.create_connection(version='1.1', heartbeats=(1500, 0))
+ try:
+ self.assertTrue(new_conn.is_connected())
+ time.sleep(1)
+ self.assertTrue(new_conn.is_connected())
+ time.sleep(3)
+ self.assertFalse(new_conn.is_connected())
+ finally:
+ if new_conn.is_connected():
+ new_conn.disconnect()
+
+ def test_unsupported_version(self):
+ ''' Test unsupported version on CONNECT command'''
+ self.bad_connect("Supported versions are 1.0,1.1,1.2\n", version='100.1')
+
+ def test_bad_username(self):
+ ''' Test bad username'''
+ self.bad_connect("Access refused for user 'gust'\n", user='gust')
+
+ def test_bad_password(self):
+ ''' Test bad password'''
+ self.bad_connect("Access refused for user 'guest'\n", passcode='gust')
+
+ def test_bad_vhost(self):
+ ''' Test bad virtual host'''
+ self.bad_connect("Virtual host '//' access denied", version='1.1', vhost='//')
+
+ def bad_connect(self, expected, user='guest', passcode='guest', **kwargs):
+ self.conn.disconnect()
+ new_conn = self.create_connection_obj(**kwargs)
+ listener = base.WaitableListener()
+ new_conn.set_listener('', listener)
+ try:
+ new_conn.start()
+ new_conn.connect(user, passcode)
+ self.assertTrue(listener.wait())
+ self.assertEquals(expected, listener.errors[0]['message'])
+ finally:
+ if new_conn.is_connected():
+ new_conn.disconnect()
+
+ def test_bad_header_on_send(self):
+ ''' Test disallowed header on SEND '''
+ self.listener.reset(1)
+ self.conn.send_frame("SEND", {"destination":"a", "message-id":"1"})
+ self.assertTrue(self.listener.wait())
+ self.assertEquals(1, len(self.listener.errors))
+ errorReceived = self.listener.errors[0]
+ self.assertEquals("Invalid header", errorReceived['headers']['message'])
+ self.assertEquals("'message-id' is not allowed on 'SEND'.\n", errorReceived['message'])
+
+ def test_send_recv_header(self):
+ ''' Test sending a custom header and receiving it back '''
+ dest = '/queue/custom-header'
+ hdrs = {'x-custom-header-1': 'value1',
+ 'x-custom-header-2': 'value2',
+ 'custom-header-3': 'value3'}
+ self.listener.reset(1)
+ recv_hdrs = self.simple_test_send_rec(dest, headers=hdrs)
+ self.assertEquals('value1', recv_hdrs['x-custom-header-1'])
+ self.assertEquals('value2', recv_hdrs['x-custom-header-2'])
+ self.assertEquals('value3', recv_hdrs['custom-header-3'])
+
+ def test_disconnect(self):
+ ''' Test DISCONNECT command'''
+ self.conn.disconnect()
+ self.assertFalse(self.conn.is_connected())
+
+ def test_disconnect_with_receipt(self):
+ ''' Test the DISCONNECT command with receipts '''
+ time.sleep(3)
+ self.listener.reset(1)
+ self.conn.send_frame("DISCONNECT", {"receipt": "test"})
+ self.assertTrue(self.listener.wait())
+ self.assertEquals(1, len(self.listener.receipts))
+ receiptReceived = self.listener.receipts[0]['headers']['receipt-id']
+ self.assertEquals("test", receiptReceived
+ , "Wrong receipt received: '" + receiptReceived + "'")
+
+ def unsub_test(self, dest, verbs, numRcts=0):
+ def afterfun():
+ self.conn.send(dest, "after-test")
+ subverb, unsubverb = verbs
+ self.assertListenerAfter(subverb, numMsgs=1,
+ errMsg="FAILED to subscribe and send")
+ self.assertListenerAfter(unsubverb, numRcts=numRcts,
+ errMsg="Incorrect responses from UNSUBSCRIBE")
+ self.assertListenerAfter(afterfun,
+ errMsg="Still receiving messages")
+
+ def sub_and_send(self, dest, subid=None, receipt=None):
+ def subfun():
+ self.subscribe_dest(self.conn, dest, subid)
+ self.conn.send(dest, "test")
+ def unsubfun():
+ headers = {}
+ if receipt != None:
+ headers['receipt'] = receipt
+ self.unsubscribe_dest(self.conn, dest, subid, **headers)
+ return subfun, unsubfun
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py
new file mode 100644
index 0000000000..40f908c5d9
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/parsing.py
@@ -0,0 +1,331 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import re
+import socket
+import functools
+import time
+import sys
+import os
+
+def connect(cnames):
+ ''' Decorator that creates stomp connections and issues CONNECT '''
+ cmd=('CONNECT\n'
+ 'login:guest\n'
+ 'passcode:guest\n'
+ '\n'
+ '\n\0')
+ resp = ('CONNECTED\n'
+ 'server:RabbitMQ/(.*)\n'
+ 'session:(.*)\n'
+ 'heart-beat:0,0\n'
+ 'version:1.0\n'
+ '\n\x00')
+ def w(m):
+ @functools.wraps(m)
+ def wrapper(self, *args, **kwargs):
+ for cname in cnames:
+ sd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sd.settimeout(30000)
+ sd.connect((self.host, self.port))
+ sd.sendall(cmd.encode('utf-8'))
+ self.match(resp, sd.recv(4096).decode('utf-8'))
+ setattr(self, cname, sd)
+ try:
+ r = m(self, *args, **kwargs)
+ finally:
+ for cname in cnames:
+ try:
+ getattr(self, cname).close()
+ except IOError:
+ pass
+ return r
+ return wrapper
+ return w
+
+
+class TestParsing(unittest.TestCase):
+ host='127.0.0.1'
+ # The default port is 61613 but it's in the middle of the ephemeral
+ # ports range on many operating systems. Therefore, there is a
+ # chance this port is already in use. Let's use a port close to the
+ # AMQP default port.
+ port=int(os.environ["STOMP_PORT"])
+
+
+ def match(self, pattern, data):
+ ''' helper: try to match a regexp with a string.
+ Fail test if they do not match.
+ '''
+ matched = re.match(pattern, data)
+ if matched:
+ return matched.groups()
+ self.assertTrue(False, 'No match:\n{}\n\n{}'.format(pattern, data))
+
+ def recv_atleast(self, bufsize):
+ recvhead = []
+ rl = bufsize
+ while rl > 0:
+ buf = self.cd.recv(rl).decode('utf-8')
+ bl = len(buf)
+ if bl==0: break
+ recvhead.append( buf )
+ rl -= bl
+ return ''.join(recvhead)
+
+
+ @connect(['cd'])
+ def test_newline_after_nul(self):
+ cmd = ('\n'
+ 'SUBSCRIBE\n'
+ 'destination:/exchange/amq.fanout\n'
+ '\n\x00\n'
+ 'SEND\n'
+ 'content-type:text/plain\n'
+ 'destination:/exchange/amq.fanout\n\n'
+ 'hello\n\x00\n')
+ self.cd.sendall(cmd.encode('utf-8'))
+ resp = ('MESSAGE\n'
+ 'destination:/exchange/amq.fanout\n'
+ 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
+ 'redelivered:false\n'
+ 'content-type:text/plain\n'
+ 'content-length:6\n'
+ '\n'
+ 'hello\n\0')
+ self.match(resp, self.cd.recv(4096).decode('utf-8'))
+
+ @connect(['cd'])
+ def test_send_without_content_type(self):
+ cmd = ('\n'
+ 'SUBSCRIBE\n'
+ 'destination:/exchange/amq.fanout\n'
+ '\n\x00\n'
+ 'SEND\n'
+ 'destination:/exchange/amq.fanout\n\n'
+ 'hello\n\x00')
+ self.cd.sendall(cmd.encode('utf-8'))
+ resp = ('MESSAGE\n'
+ 'destination:/exchange/amq.fanout\n'
+ 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
+ 'redelivered:false\n'
+ 'content-length:6\n'
+ '\n'
+ 'hello\n\0')
+ self.match(resp, self.cd.recv(4096).decode('utf-8'))
+
+ @connect(['cd'])
+ def test_send_without_content_type_binary(self):
+ msg = 'hello'
+ cmd = ('\n'
+ 'SUBSCRIBE\n'
+ 'destination:/exchange/amq.fanout\n'
+ '\n\x00\n'
+ 'SEND\n'
+ 'destination:/exchange/amq.fanout\n' +
+ 'content-length:{}\n\n'.format(len(msg)) +
+ '{}\x00'.format(msg))
+ self.cd.sendall(cmd.encode('utf-8'))
+ resp = ('MESSAGE\n'
+ 'destination:/exchange/amq.fanout\n'
+ 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
+ 'redelivered:false\n' +
+ 'content-length:{}\n'.format(len(msg)) +
+ '\n{}\0'.format(msg))
+ self.match(resp, self.cd.recv(4096).decode('utf-8'))
+
+ @connect(['cd'])
+ def test_newline_after_nul_and_leading_nul(self):
+ cmd = ('\n'
+ '\x00SUBSCRIBE\n'
+ 'destination:/exchange/amq.fanout\n'
+ '\n\x00\n'
+ '\x00SEND\n'
+ 'destination:/exchange/amq.fanout\n'
+ 'content-type:text/plain\n'
+ '\nhello\n\x00\n')
+ self.cd.sendall(cmd.encode('utf-8'))
+ resp = ('MESSAGE\n'
+ 'destination:/exchange/amq.fanout\n'
+ 'message-id:Q_/exchange/amq.fanout@@session-(.*)\n'
+ 'redelivered:false\n'
+ 'content-type:text/plain\n'
+ 'content-length:6\n'
+ '\n'
+ 'hello\n\0')
+ self.match(resp, self.cd.recv(4096).decode('utf-8'))
+
+ @connect(['cd'])
+ def test_bad_command(self):
+ ''' Trigger an error message. '''
+ cmd = ('WRONGCOMMAND\n'
+ 'destination:a\n'
+ 'exchange:amq.fanout\n'
+ '\n\0')
+ self.cd.sendall(cmd.encode('utf-8'))
+ resp = ('ERROR\n'
+ 'message:Bad command\n'
+ 'content-type:text/plain\n'
+ 'version:1.0,1.1,1.2\n'
+ 'content-length:43\n'
+ '\n'
+ 'Could not interpret command "WRONGCOMMAND"\n'
+ '\0')
+ self.match(resp, self.cd.recv(4096).decode('utf-8'))
+
+ @connect(['sd', 'cd1', 'cd2'])
+ def test_broadcast(self):
+ ''' Single message should be delivered to two consumers:
+ amq.topic --routing_key--> first_queue --> first_connection
+ \--routing_key--> second_queue--> second_connection
+ '''
+ subscribe=( 'SUBSCRIBE\n'
+ 'id: XsKNhAf\n'
+ 'destination:/exchange/amq.topic/da9d4779\n'
+ '\n\0')
+ for cd in [self.cd1, self.cd2]:
+ cd.sendall(subscribe.encode('utf-8'))
+
+ time.sleep(0.1)
+
+ cmd = ('SEND\n'
+ 'content-type:text/plain\n'
+ 'destination:/exchange/amq.topic/da9d4779\n'
+ '\n'
+ 'message'
+ '\n\0')
+ self.sd.sendall(cmd.encode('utf-8'))
+
+ resp=('MESSAGE\n'
+ 'subscription:(.*)\n'
+ 'destination:/topic/da9d4779\n'
+ 'message-id:(.*)\n'
+ 'redelivered:false\n'
+ 'content-type:text/plain\n'
+ 'content-length:8\n'
+ '\n'
+ 'message'
+ '\n\x00')
+ for cd in [self.cd1, self.cd2]:
+ self.match(resp, cd.recv(4096).decode('utf-8'))
+
+ @connect(['cd'])
+ def test_message_with_embedded_nulls(self):
+ ''' Test sending/receiving message with embedded nulls. '''
+ dest='destination:/exchange/amq.topic/test_embed_nulls_message\n'
+ resp_dest='destination:/topic/test_embed_nulls_message\n'
+ subscribe=( 'SUBSCRIBE\n'
+ 'id:xxx\n'
+ +dest+
+ '\n\0')
+ self.cd.sendall(subscribe.encode('utf-8'))
+
+ boilerplate = '0123456789'*1024 # large enough boilerplate
+ message = '01'
+ oldi = 2
+ for i in [5, 90, 256-1, 384-1, 512, 1024, 1024+256+64+32]:
+ message = message + '\0' + boilerplate[oldi+1:i]
+ oldi = i
+ msg_len = len(message)
+
+ cmd = ('SEND\n'
+ +dest+
+ 'content-type:text/plain\n'
+ 'content-length:%i\n'
+ '\n'
+ '%s'
+ '\0' % (len(message), message))
+ self.cd.sendall(cmd.encode('utf-8'))
+
+ headresp=('MESSAGE\n' # 8
+ 'subscription:(.*)\n' # 14 + subscription
+ +resp_dest+ # 44
+ 'message-id:(.*)\n' # 12 + message-id
+ 'redelivered:false\n' # 18
+ 'content-type:text/plain\n' # 24
+ 'content-length:%i\n' # 16 + 4==len('1024')
+ '\n' # 1
+ '(.*)$' # prefix of body+null (potentially)
+ % len(message) )
+ headlen = 8 + 24 + 14 + (3) + 44 + 12 + 18 + (48) + 16 + (4) + 1 + (1)
+
+ headbuf = self.recv_atleast(headlen)
+ self.assertFalse(len(headbuf) == 0)
+
+ (sub, msg_id, bodyprefix) = self.match(headresp, headbuf)
+ bodyresp=( '%s\0' % message )
+ bodylen = len(bodyresp);
+
+ bodybuf = ''.join([bodyprefix,
+ self.recv_atleast(bodylen - len(bodyprefix))])
+
+ self.assertEqual(len(bodybuf), msg_len+1,
+ "body received not the same length as message sent")
+ self.assertEqual(bodybuf, bodyresp,
+ " body (...'%s')\nincorrectly returned as (...'%s')"
+ % (bodyresp[-10:], bodybuf[-10:]))
+
+ @connect(['cd'])
+ def test_message_in_packets(self):
+ ''' Test sending/receiving message in packets. '''
+ base_dest='topic/test_embed_nulls_message\n'
+ dest='destination:/exchange/amq.' + base_dest
+ resp_dest='destination:/'+ base_dest
+ subscribe=( 'SUBSCRIBE\n'
+ 'id:xxx\n'
+ +dest+
+ '\n\0')
+ self.cd.sendall(subscribe.encode('utf-8'))
+
+ boilerplate = '0123456789'*1024 # large enough boilerplate
+
+ message = boilerplate[:1024 + 512 + 256 + 32]
+ msg_len = len(message)
+
+ msg_to_send = ('SEND\n'
+ +dest+
+ 'content-type:text/plain\n'
+ '\n'
+ '%s'
+ '\0' % (message) )
+ packet_size = 191
+ part_index = 0
+ msg_to_send_len = len(msg_to_send)
+ while part_index < msg_to_send_len:
+ part = msg_to_send[part_index:part_index+packet_size]
+ time.sleep(0.1)
+ self.cd.sendall(part.encode('utf-8'))
+ part_index += packet_size
+
+ headresp=('MESSAGE\n' # 8
+ 'subscription:(.*)\n' # 14 + subscription
+ +resp_dest+ # 44
+ 'message-id:(.*)\n' # 12 + message-id
+ 'redelivered:false\n' # 18
+ 'content-type:text/plain\n' # 24
+ 'content-length:%i\n' # 16 + 4==len('1024')
+ '\n' # 1
+ '(.*)$' # prefix of body+null (potentially)
+ % len(message) )
+ headlen = 8 + 24 + 14 + (3) + 44 + 12 + 18 + (48) + 16 + (4) + 1 + (1)
+
+ headbuf = self.recv_atleast(headlen)
+ self.assertFalse(len(headbuf) == 0)
+
+ (sub, msg_id, bodyprefix) = self.match(headresp, headbuf)
+ bodyresp=( '%s\0' % message )
+ bodylen = len(bodyresp);
+
+ bodybuf = ''.join([bodyprefix,
+ self.recv_atleast(bodylen - len(bodyprefix))])
+
+ self.assertEqual(len(bodybuf), msg_len+1,
+ "body received not the same length as message sent")
+ self.assertEqual(bodybuf, bodyresp,
+ " body ('%s')\nincorrectly returned as ('%s')"
+ % (bodyresp, bodybuf))
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py
new file mode 100644
index 0000000000..3761c92360
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/queue_properties.py
@@ -0,0 +1,87 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import pika
+import base
+import time
+import os
+
+class TestQueueProperties(base.BaseTest):
+
+ def test_subscribe(self):
+ destination = "/queue/queue-properties-subscribe-test"
+
+ # subscribe
+ self.subscribe_dest(self.conn, destination, None,
+ headers={
+ 'x-message-ttl': 60000,
+ 'x-expires': 70000,
+ 'x-max-length': 10,
+ 'x-max-length-bytes': 20000,
+ 'x-dead-letter-exchange': 'dead-letter-exchange',
+ 'x-dead-letter-routing-key': 'dead-letter-routing-key',
+ 'x-max-priority': 6,
+ })
+
+ # now try to declare the queue using pika
+ # if the properties are the same we should
+ # not get any error
+ connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+ channel.queue_declare(queue='queue-properties-subscribe-test',
+ durable=True,
+ arguments={
+ 'x-message-ttl': 60000,
+ 'x-expires': 70000,
+ 'x-max-length': 10,
+ 'x-max-length-bytes': 20000,
+ 'x-dead-letter-exchange': 'dead-letter-exchange',
+ 'x-dead-letter-routing-key': 'dead-letter-routing-key',
+ 'x-max-priority': 6,
+ })
+
+ self.conn.disconnect()
+ connection.close()
+
+ def test_send(self):
+ destination = "/queue/queue-properties-send-test"
+
+ # send
+ self.conn.send(destination, "test1",
+ headers={
+ 'x-message-ttl': 60000,
+ 'x-expires': 70000,
+ 'x-max-length': 10,
+ 'x-max-length-bytes': 20000,
+ 'x-dead-letter-exchange': 'dead-letter-exchange',
+ 'x-dead-letter-routing-key': 'dead-letter-routing-key',
+ 'x-max-priority': 6,
+ })
+
+ # now try to declare the queue using pika
+ # if the properties are the same we should
+ # not get any error
+ connection = pika.BlockingConnection(pika.ConnectionParameters(
+ host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+ channel.queue_declare(queue='queue-properties-send-test',
+ durable=True,
+ arguments={
+ 'x-message-ttl': 60000,
+ 'x-expires': 70000,
+ 'x-max-length': 10,
+ 'x-max-length-bytes': 20000,
+ 'x-dead-letter-exchange': 'dead-letter-exchange',
+ 'x-dead-letter-routing-key': 'dead-letter-routing-key',
+ 'x-max-priority': 6,
+ })
+
+ self.conn.disconnect()
+ connection.close()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py
new file mode 100644
index 0000000000..3dfdd72cc9
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/redelivered.py
@@ -0,0 +1,40 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+
+class TestRedelivered(base.BaseTest):
+
+ def test_redelivered(self):
+ destination = "/queue/redelivered-test"
+
+ # subscribe and send message
+ self.subscribe_dest(self.conn, destination, None, ack='client')
+ self.conn.send(destination, "test1")
+ message_receive_timeout = 30
+ self.assertTrue(self.listener.wait(message_receive_timeout), "Test message not received within {0} seconds".format(message_receive_timeout))
+ self.assertEquals(1, len(self.listener.messages))
+ self.assertEquals('false', self.listener.messages[0]['headers']['redelivered'])
+
+ # disconnect with no ack
+ self.conn.disconnect()
+
+ # now reconnect
+ conn2 = self.create_connection()
+ try:
+ listener2 = base.WaitableListener()
+ listener2.reset(1)
+ conn2.set_listener('', listener2)
+ self.subscribe_dest(conn2, destination, None, ack='client')
+ self.assertTrue(listener2.wait(), "message not received again")
+ self.assertEquals(1, len(listener2.messages))
+ self.assertEquals('true', listener2.messages[0]['headers']['redelivered'])
+ finally:
+ conn2.disconnect()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py
new file mode 100644
index 0000000000..6fbcb3d492
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/reliability.py
@@ -0,0 +1,41 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import base
+import stomp
+import unittest
+import time
+
+class TestReliability(base.BaseTest):
+
+ def test_send_and_disconnect(self):
+ ''' Test close socket after send does not lose messages '''
+ destination = "/queue/reliability"
+ pub_conn = self.create_connection()
+ try:
+ msg = "0" * (128)
+
+ count = 10000
+
+ listener = base.WaitableListener()
+ listener.reset(count)
+ self.conn.set_listener('', listener)
+ self.subscribe_dest(self.conn, destination, None)
+
+ for x in range(0, count):
+ pub_conn.send(destination, msg + str(x))
+ time.sleep(2.0)
+ pub_conn.disconnect()
+
+ if listener.wait(30):
+ self.assertEquals(count, len(listener.messages))
+ else:
+ listener.print_state("Final state of listener:")
+ self.fail("Did not receive %s messages in time" % count)
+ finally:
+ if pub_conn.is_connected():
+ pub_conn.disconnect()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py
new file mode 100644
index 0000000000..570ad9f5a3
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/ssl_lifecycle.py
@@ -0,0 +1,81 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import os
+import os.path
+import sys
+
+import stomp
+import base
+import ssl
+
+base_path = os.path.dirname(sys.argv[0])
+
+ssl_key_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'key.pem')
+ssl_cert_file = os.path.join(os.getenv('SSL_CERTS_PATH'), 'client', 'cert.pem')
+ssl_ca_certs = os.path.join(os.getenv('SSL_CERTS_PATH'), 'testca', 'cacert.pem')
+
+class TestSslClient(unittest.TestCase):
+
+ def __ssl_connect(self):
+ conn = stomp.Connection(host_and_ports = [ ('localhost', int(os.environ["STOMP_PORT_TLS"])) ],
+ use_ssl = True, ssl_key_file = ssl_key_file,
+ ssl_cert_file = ssl_cert_file,
+ ssl_ca_certs = ssl_ca_certs)
+ print("FILE: ".format(ssl_cert_file))
+ conn.start()
+ conn.connect("guest", "guest")
+ return conn
+
+ def __ssl_auth_connect(self):
+ conn = stomp.Connection(host_and_ports = [ ('localhost', int(os.environ["STOMP_PORT_TLS"])) ],
+ use_ssl = True, ssl_key_file = ssl_key_file,
+ ssl_cert_file = ssl_cert_file,
+ ssl_ca_certs = ssl_ca_certs)
+ conn.start()
+ conn.connect()
+ return conn
+
+ def test_ssl_connect(self):
+ conn = self.__ssl_connect()
+ conn.disconnect()
+
+ def test_ssl_auth_connect(self):
+ conn = self.__ssl_auth_connect()
+ conn.disconnect()
+
+ def test_ssl_send_receive(self):
+ conn = self.__ssl_connect()
+ self.__test_conn(conn)
+
+ def test_ssl_auth_send_receive(self):
+ conn = self.__ssl_auth_connect()
+ self.__test_conn(conn)
+
+ def __test_conn(self, conn):
+ try:
+ listener = base.WaitableListener()
+
+ conn.set_listener('', listener)
+
+ d = "/topic/ssl.test"
+ conn.subscribe(destination=d, ack="auto", id="ctag", receipt="sub")
+
+ self.assertTrue(listener.wait(1))
+
+ self.assertEquals("sub",
+ listener.receipts[0]['headers']['receipt-id'])
+
+ listener.reset(1)
+ conn.send(body="Hello SSL!", destination=d)
+
+ self.assertTrue(listener.wait())
+
+ self.assertEquals("Hello SSL!", listener.messages[0]['message'])
+ finally:
+ conn.disconnect()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
new file mode 100755
index 0000000000..01967465a2
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+import test_runner
+
+if __name__ == '__main__':
+ modules = [
+ 'parsing',
+ 'errors',
+ 'lifecycle',
+ 'ack',
+ 'amqp_headers',
+ 'queue_properties',
+ 'reliability',
+ 'transactions',
+ 'x_queue_name',
+ 'destinations',
+ 'redelivered',
+ 'topic_permissions',
+ 'x_queue_type_quorum'
+ ]
+ test_runner.run_unittests(modules)
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py
new file mode 100755
index 0000000000..10efa4fbb4
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_connect_options.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import test_runner
+
+if __name__ == '__main__':
+ modules = ['connect_options']
+ test_runner.run_unittests(modules)
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py
new file mode 100644
index 0000000000..9aa5855b02
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import sys
+import os
+
+def run_unittests(modules):
+ suite = unittest.TestSuite()
+ for m in modules:
+ mod = __import__(m)
+ for name in dir(mod):
+ obj = getattr(mod, name)
+ if name.startswith("Test") and issubclass(obj, unittest.TestCase):
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj))
+
+ ts = unittest.TextTestRunner().run(unittest.TestSuite(suite))
+ if ts.errors or ts.failures:
+ sys.exit(1)
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py
new file mode 100755
index 0000000000..95d2d2baa7
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_ssl.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import test_runner
+import test_util
+
+if __name__ == '__main__':
+ modules = ['ssl_lifecycle']
+ test_util.ensure_ssl_auth_user()
+ test_runner.run_unittests(modules)
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py
new file mode 100644
index 0000000000..911100c54f
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_util.py
@@ -0,0 +1,52 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import subprocess
+import socket
+import sys
+import os
+import os.path
+
+def ensure_ssl_auth_user():
+ user = 'O=client,CN=%s' % socket.gethostname()
+ rabbitmqctl(['stop_app'])
+ rabbitmqctl(['reset'])
+ rabbitmqctl(['start_app'])
+ rabbitmqctl(['add_user', user, 'foo'])
+ rabbitmqctl(['clear_password', user])
+ rabbitmqctl(['set_permissions', user, '.*', '.*', '.*'])
+
+def enable_implicit_connect():
+ switch_config(implicit_connect='true', default_user='[{login, "guest"}, {passcode, "guest"}]')
+
+def disable_implicit_connect():
+ switch_config(implicit_connect='false', default_user='[]')
+
+def enable_default_user():
+ switch_config(default_user='[{login, "guest"}, {passcode, "guest"}]')
+
+def disable_default_user():
+ switch_config(default_user='[]')
+
+def switch_config(implicit_connect='', default_user=''):
+ cmd = ''
+ cmd += 'ok = io:format("~n===== Ranch listeners (before stop) =====~n~n~p~n", [ranch:info()]),'
+ cmd += 'ok = application:stop(rabbitmq_stomp),'
+ cmd += 'io:format("~n===== Ranch listeners (after stop) =====~n~n~p~n", [ranch:info()]),'
+ if implicit_connect:
+ cmd += 'ok = application:set_env(rabbitmq_stomp,implicit_connect,{}),'.format(implicit_connect)
+ if default_user:
+ cmd += 'ok = application:set_env(rabbitmq_stomp,default_user,{}),'.format(default_user)
+ cmd += 'ok = application:start(rabbitmq_stomp),'
+ cmd += 'io:format("~n===== Ranch listeners (after start) =====~n~n~p~n", [ranch:info()]).'
+ rabbitmqctl(['eval', cmd])
+
+def rabbitmqctl(args):
+ ctl = os.getenv('RABBITMQCTL')
+ cmdline = [ctl, '-n', os.getenv('RABBITMQ_NODENAME')]
+ cmdline.extend(args)
+ subprocess.check_call(cmdline)
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py
new file mode 100644
index 0000000000..6272f6d8b5
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/topic_permissions.py
@@ -0,0 +1,52 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import base
+import test_util
+import sys
+
+
+class TestTopicPermissions(base.BaseTest):
+ @classmethod
+ def setUpClass(cls):
+ test_util.rabbitmqctl(['set_topic_permissions', 'guest', 'amq.topic', '^{username}.Authorised', '^{username}.Authorised'])
+ cls.authorised_topic = '/topic/guest.AuthorisedTopic'
+ cls.restricted_topic = '/topic/guest.RestrictedTopic'
+
+ @classmethod
+ def tearDownClass(cls):
+ test_util.rabbitmqctl(['clear_topic_permissions', 'guest'])
+
+ def test_publish_authorisation(self):
+ ''' Test topic permissions via publish '''
+ self.listener.reset()
+
+ # send on authorised topic
+ self.subscribe_dest(self.conn, self.authorised_topic, None)
+ self.conn.send(self.authorised_topic, "authorised hello")
+
+ self.assertTrue(self.listener.wait(), "Timeout, no message received")
+
+ # assert no errors
+ if len(self.listener.errors) > 0:
+ self.fail(self.listener.errors[0]['message'])
+
+ # check msg content
+ msg = self.listener.messages[0]
+ self.assertEqual("authorised hello", msg['message'])
+ self.assertEqual(self.authorised_topic, msg['headers']['destination'])
+
+ self.listener.reset()
+
+ # send on restricted topic
+ self.conn.send(self.restricted_topic, "hello")
+
+ self.assertTrue(self.listener.wait(), "Timeout, no message received")
+
+ # assert errors
+ self.assertGreater(len(self.listener.errors), 0)
+ self.assertIn("ACCESS_REFUSED", self.listener.errors[0]['message'])
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py
new file mode 100644
index 0000000000..379806bfb8
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/transactions.py
@@ -0,0 +1,61 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import base
+import time
+
+class TestTransactions(base.BaseTest):
+
+ def test_tx_commit(self):
+ ''' Test TX with a COMMIT and ensure messages are delivered '''
+ destination = "/exchange/amq.fanout"
+ tx = "test.tx"
+
+ self.listener.reset()
+ self.subscribe_dest(self.conn, destination, None)
+ self.conn.begin(transaction=tx)
+ self.conn.send(destination, "hello!", transaction=tx)
+ self.conn.send(destination, "again!")
+
+ ## should see the second message
+ self.assertTrue(self.listener.wait(3))
+ self.assertEquals(1, len(self.listener.messages))
+ self.assertEquals("again!", self.listener.messages[0]['message'])
+
+ ## now look for the first message
+ self.listener.reset()
+ self.conn.commit(transaction=tx)
+ self.assertTrue(self.listener.wait(3))
+ self.assertEquals(1, len(self.listener.messages),
+ "Missing committed message")
+ self.assertEquals("hello!", self.listener.messages[0]['message'])
+
+ def test_tx_abort(self):
+ ''' Test TX with an ABORT and ensure messages are discarded '''
+ destination = "/exchange/amq.fanout"
+ tx = "test.tx"
+
+ self.listener.reset()
+ self.subscribe_dest(self.conn, destination, None)
+ self.conn.begin(transaction=tx)
+ self.conn.send(destination, "hello!", transaction=tx)
+ self.conn.send(destination, "again!")
+
+ ## should see the second message
+ self.assertTrue(self.listener.wait(3))
+ self.assertEquals(1, len(self.listener.messages))
+ self.assertEquals("again!", self.listener.messages[0]['message'])
+
+ ## now look for the first message to be discarded
+ self.listener.reset()
+ self.conn.abort(transaction=tx)
+ self.assertFalse(self.listener.wait(3))
+ self.assertEquals(0, len(self.listener.messages),
+ "Unexpected committed message")
+
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py
new file mode 100644
index 0000000000..f2c90486eb
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py
@@ -0,0 +1,71 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import unittest
+import stomp
+import pika
+import base
+import time
+import os
+
+class TestUserGeneratedQueueName(base.BaseTest):
+
+ def test_exchange_dest(self):
+ queueName='my-user-generated-queue-name-exchange'
+
+ # subscribe
+ self.subscribe_dest(
+ self.conn,
+ '/exchange/amq.direct/test',
+ None,
+ headers={ 'x-queue-name': queueName }
+ )
+
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+
+ # publish a message to the named queue
+ channel.basic_publish(
+ exchange='',
+ routing_key=queueName,
+ body='Hello World!')
+
+ # check if we receive the message from the STOMP subscription
+ self.assertTrue(self.listener.wait(2), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+
+ self.conn.disconnect()
+ connection.close()
+
+ def test_topic_dest(self):
+ queueName='my-user-generated-queue-name-topic'
+
+ # subscribe
+ self.subscribe_dest(
+ self.conn,
+ '/topic/test',
+ None,
+ headers={ 'x-queue-name': queueName }
+ )
+
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+
+ # publish a message to the named queue
+ channel.basic_publish(
+ exchange='',
+ routing_key=queueName,
+ body='Hello World!')
+
+ # check if we receive the message from the STOMP subscription
+ self.assertTrue(self.listener.wait(2), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+
+ self.conn.disconnect()
+ connection.close()
diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py
new file mode 100644
index 0000000000..1018abd0d4
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_quorum.py
@@ -0,0 +1,62 @@
+## This Source Code Form is subject to the terms of the Mozilla Public
+## License, v. 2.0. If a copy of the MPL was not distributed with this
+## file, You can obtain one at https://mozilla.org/MPL/2.0/.
+##
+## Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+##
+
+import pika
+import base
+import time
+import os
+import re
+
+
+class TestUserGeneratedQueueName(base.BaseTest):
+
+ def test_quorum_queue(self):
+ queueName = 'my-quorum-queue'
+
+ # subscribe
+ self.subscribe_dest(
+ self.conn,
+ '/topic/quorum-queue-test',
+ None,
+ headers={
+ 'x-queue-name': queueName,
+ 'x-queue-type': 'quorum',
+ 'durable': True,
+ 'auto-delete': False,
+ 'id': 1234
+ }
+ )
+
+ # let the quorum queue some time to start
+ time.sleep(5)
+
+ connection = pika.BlockingConnection(
+ pika.ConnectionParameters(host='127.0.0.1', port=int(os.environ["AMQP_PORT"])))
+ channel = connection.channel()
+
+ # publish a message to the named queue
+ channel.basic_publish(
+ exchange='',
+ routing_key=queueName,
+ body='Hello World!')
+
+ # could we declare a quorum queue?
+ quorum_queue_supported = True
+ if len(self.listener.errors) > 0:
+ pattern = re.compile(r"feature flag is disabled", re.MULTILINE)
+ for error in self.listener.errors:
+ if pattern.search(error['message']) != None:
+ quorum_queue_supported = False
+ break
+
+ if quorum_queue_supported:
+ # check if we receive the message from the STOMP subscription
+ self.assertTrue(self.listener.wait(5), "initial message not received")
+ self.assertEquals(1, len(self.listener.messages))
+ self.conn.disconnect()
+
+ connection.close()
diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
new file mode 100644
index 0000000000..739512e3b3
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
@@ -0,0 +1,75 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% The stupidest client imaginable, just for testing.
+
+-module(rabbit_stomp_client).
+
+-export([connect/1, connect/2, connect/4, connect/5, disconnect/1, send/2, send/3, send/4, recv/1]).
+
+-include("rabbit_stomp_frame.hrl").
+
+-define(TIMEOUT, 1000). % milliseconds
+
+connect(Port) -> connect0([], "guest", "guest", Port, []).
+connect(V, Port) -> connect0([{"accept-version", V}], "guest", "guest", Port, []).
+connect(V, Login, Pass, Port) -> connect0([{"accept-version", V}], Login, Pass, Port, []).
+connect(V, Login, Pass, Port, Headers) -> connect0([{"accept-version", V}], Login, Pass, Port, Headers).
+
+connect0(Version, Login, Pass, Port, Headers) ->
+ %% The default port is 61613 but it's in the middle of the ephemeral
+ %% ports range on many operating systems. Therefore, there is a
+ %% chance this port is already in use. Let's use a port close to the
+ %% AMQP default port.
+ {ok, Sock} = gen_tcp:connect(localhost, Port, [{active, false}, binary]),
+ Client0 = recv_state(Sock),
+ send(Client0, "CONNECT", [{"login", Login},
+ {"passcode", Pass} | Version] ++ Headers),
+ {#stomp_frame{command = "CONNECTED"}, Client1} = recv(Client0),
+ {ok, Client1}.
+
+disconnect(Client = {Sock, _}) ->
+ send(Client, "DISCONNECT"),
+ gen_tcp:close(Sock).
+
+send(Client, Command) ->
+ send(Client, Command, []).
+
+send(Client, Command, Headers) ->
+ send(Client, Command, Headers, []).
+
+send({Sock, _}, Command, Headers, Body) ->
+ Frame = rabbit_stomp_frame:serialize(
+ #stomp_frame{command = list_to_binary(Command),
+ headers = Headers,
+ body_iolist = Body}),
+ gen_tcp:send(Sock, Frame).
+
+recv_state(Sock) ->
+ {Sock, []}.
+
+recv({_Sock, []} = Client) ->
+ recv(Client, rabbit_stomp_frame:initial_state(), 0);
+recv({Sock, [Frame | Frames]}) ->
+ {Frame, {Sock, Frames}}.
+
+recv(Client = {Sock, _}, FrameState, Length) ->
+ {ok, Payload} = gen_tcp:recv(Sock, Length, ?TIMEOUT),
+ parse(Payload, Client, FrameState, Length).
+
+parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) ->
+ case rabbit_stomp_frame:parse(Payload, FrameState) of
+ {ok, Frame, <<>>} ->
+ recv({Sock, lists:reverse([Frame | FramesRev])});
+ {ok, Frame, <<"\n">>} ->
+ recv({Sock, lists:reverse([Frame | FramesRev])});
+ {ok, Frame, Rest} ->
+ parse(Rest, {Sock, [Frame | FramesRev]},
+ rabbit_stomp_frame:initial_state(), Length);
+ {more, NewState} ->
+ recv(Client, NewState, 0)
+ end.
diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl
new file mode 100644
index 0000000000..6b5b9298fa
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_publish_test.erl
@@ -0,0 +1,80 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stomp_publish_test).
+
+-export([run/0]).
+
+-include("rabbit_stomp_frame.hrl").
+
+-define(DESTINATION, "/queue/test").
+
+-define(MICROS_PER_UPDATE, 5000000).
+-define(MICROS_PER_UPDATE_MSG, 100000).
+-define(MICROS_PER_SECOND, 1000000).
+
+%% A very simple publish-and-consume-as-fast-as-you-can test.
+
+run() ->
+ [put(K, 0) || K <- [sent, recd, last_sent, last_recd]],
+ put(last_ts, erlang:monotonic_time()),
+ {ok, Pub} = rabbit_stomp_client:connect(),
+ {ok, Recv} = rabbit_stomp_client:connect(),
+ Self = self(),
+ spawn(fun() -> publish(Self, Pub, 0, erlang:monotonic_time()) end),
+ rabbit_stomp_client:send(
+ Recv, "SUBSCRIBE", [{"destination", ?DESTINATION}]),
+ spawn(fun() -> recv(Self, Recv, 0, erlang:monotonic_time()) end),
+ report().
+
+report() ->
+ receive
+ {sent, C} -> put(sent, C);
+ {recd, C} -> put(recd, C)
+ end,
+ Diff = erlang:convert_time_unit(
+ erlang:monotonic_time() - get(last_ts), native, microseconds),
+ case Diff > ?MICROS_PER_UPDATE of
+ true -> S = get(sent) - get(last_sent),
+ R = get(recd) - get(last_recd),
+ put(last_sent, get(sent)),
+ put(last_recd, get(recd)),
+ put(last_ts, erlang:monotonic_time()),
+ io:format("Send ~p msg/s | Recv ~p msg/s~n",
+ [trunc(S * ?MICROS_PER_SECOND / Diff),
+ trunc(R * ?MICROS_PER_SECOND / Diff)]);
+ false -> ok
+ end,
+ report().
+
+publish(Owner, Client, Count, TS) ->
+ rabbit_stomp_client:send(
+ Client, "SEND", [{"destination", ?DESTINATION}],
+ [integer_to_list(Count)]),
+ Diff = erlang:convert_time_unit(
+ erlang:monotonic_time() - TS, native, microseconds),
+ case Diff > ?MICROS_PER_UPDATE_MSG of
+ true -> Owner ! {sent, Count + 1},
+ publish(Owner, Client, Count + 1,
+ erlang:monotonic_time());
+ false -> publish(Owner, Client, Count + 1, TS)
+ end.
+
+recv(Owner, Client0, Count, TS) ->
+ {#stomp_frame{body_iolist = Body}, Client1} =
+ rabbit_stomp_client:recv(Client0),
+ BodyInt = list_to_integer(binary_to_list(iolist_to_binary(Body))),
+ Count = BodyInt,
+ Diff = erlang:convert_time_unit(
+ erlang:monotonic_time() - TS, native, microseconds),
+ case Diff > ?MICROS_PER_UPDATE_MSG of
+ true -> Owner ! {recd, Count + 1},
+ recv(Owner, Client1, Count + 1,
+ erlang:monotonic_time());
+ false -> recv(Owner, Client1, Count + 1, TS)
+ end.
+
diff --git a/deps/rabbitmq_stomp/test/src/test.config b/deps/rabbitmq_stomp/test/src/test.config
new file mode 100644
index 0000000000..5968824996
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/src/test.config
@@ -0,0 +1,13 @@
+[{rabbitmq_stomp, [{default_user, []},
+ {ssl_cert_login, true},
+ {tcp_listeners, [5673]},
+ {ssl_listeners, [5674]}
+ ]},
+ {rabbit, [{ssl_options, [{cacertfile,"%%CERTS_DIR%%/testca/cacert.pem"},
+ {certfile,"%%CERTS_DIR%%/server/cert.pem"},
+ {keyfile,"%%CERTS_DIR%%/server/key.pem"},
+ {verify,verify_peer},
+ {fail_if_no_peer_cert,true}
+ ]}
+ ]}
+].
diff --git a/deps/rabbitmq_stomp/test/topic_SUITE.erl b/deps/rabbitmq_stomp/test/topic_SUITE.erl
new file mode 100644
index 0000000000..4a6421a326
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/topic_SUITE.erl
@@ -0,0 +1,170 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(topic_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stomp.hrl").
+-include("rabbit_stomp_frame.hrl").
+-include("rabbit_stomp_headers.hrl").
+
+all() ->
+ [{group, list_to_atom("version_" ++ V)} || V <- ?SUPPORTED_VERSIONS].
+
+groups() ->
+ Tests = [
+ publish_topic_authorisation,
+ subscribe_topic_authorisation,
+ change_default_topic_exchange
+ ],
+
+ [{list_to_atom("version_" ++ V), [sequence], Tests}
+ || V <- ?SUPPORTED_VERSIONS].
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(Group, Config) ->
+ Version = string:sub_string(atom_to_list(Group), 9),
+ rabbit_ct_helpers:set_config(Config, [{version, Version}]).
+
+end_per_group(_Group, Config) -> Config.
+
+init_per_testcase(_TestCase, Config) ->
+ Version = ?config(version, Config),
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ {ok, Connection} = amqp_connection:start(#amqp_params_direct{
+ node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)
+ }),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+ {ok, Client} = rabbit_stomp_client:connect(Version, StompPort),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {amqp_connection, Connection},
+ {amqp_channel, Channel},
+ {stomp_client, Client}
+ ]),
+ init_per_testcase0(Config1).
+
+end_per_testcase(_TestCase, Config) ->
+ Connection = ?config(amqp_connection, Config),
+ Channel = ?config(amqp_channel, Config),
+ Client = ?config(stomp_client, Config),
+ rabbit_stomp_client:disconnect(Client),
+ amqp_channel:close(Channel),
+ amqp_connection:close(Connection),
+ end_per_testcase0(Config).
+
+init_per_testcase0(Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, add_user,
+ [<<"user">>, <<"pass">>, <<"acting-user">>]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_permissions, [
+ <<"user">>, <<"/">>, <<".*">>, <<".*">>, <<".*">>, <<"acting-user">>]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [
+ <<"user">>, <<"/">>, <<"amq.topic">>, <<"^{username}.Authorised">>, <<"^{username}.Authorised">>, <<"acting-user">>]),
+ Version = ?config(version, Config),
+ StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
+ {ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort),
+ rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]).
+
+end_per_testcase0(Config) ->
+ ClientFoo = ?config(client_foo, Config),
+ rabbit_stomp_client:disconnect(ClientFoo),
+ rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_auth_backend_internal, delete_user,
+ [<<"user">>, <<"acting-user">>]),
+ Config.
+
+publish_topic_authorisation(Config) ->
+ ClientFoo = ?config(client_foo, Config),
+
+ AuthorisedTopic = "/topic/user.AuthorisedTopic",
+ RestrictedTopic = "/topic/user.RestrictedTopic",
+
+ %% send on authorised topic
+ rabbit_stomp_client:send(
+ ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
+
+ rabbit_stomp_client:send(
+ ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]),
+
+ {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
+ [<<"authorised hello">>] = Body,
+
+ %% send on restricted topic
+ rabbit_stomp_client:send(
+ ClientFoo, "SEND", [{"destination", RestrictedTopic}], ["hello"]),
+ {ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"),
+ "access_refused" = proplists:get_value("message", Hdrs2),
+ ok.
+
+subscribe_topic_authorisation(Config) ->
+ ClientFoo = ?config(client_foo, Config),
+
+ AuthorisedTopic = "/topic/user.AuthorisedTopic",
+ RestrictedTopic = "/topic/user.RestrictedTopic",
+
+ %% subscribe to authorised topic
+ rabbit_stomp_client:send(
+ ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
+
+ rabbit_stomp_client:send(
+ ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["authorised hello"]),
+
+ {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
+ [<<"authorised hello">>] = Body,
+
+ %% subscribe to restricted topic
+ rabbit_stomp_client:send(
+ ClientFoo, "SUBSCRIBE", [{"destination", RestrictedTopic}]),
+ {ok, _Client2, Hdrs2, _} = stomp_receive(ClientFoo, "ERROR"),
+ "access_refused" = proplists:get_value("message", Hdrs2),
+ ok.
+
+change_default_topic_exchange(Config) ->
+ Channel = ?config(amqp_channel, Config),
+ ClientFoo = ?config(client_foo, Config),
+ Ex = <<"my-topic-exchange">>,
+ AuthorisedTopic = "/topic/user.AuthorisedTopic",
+
+ Declare = #'exchange.declare'{exchange = Ex, type = <<"topic">>},
+ #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare),
+
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbitmq_stomp, default_topic_exchange, Ex]),
+
+ rabbit_stomp_client:send(
+ ClientFoo, "SUBSCRIBE", [{"destination", AuthorisedTopic}]),
+
+ rabbit_stomp_client:send(
+ ClientFoo, "SEND", [{"destination", AuthorisedTopic}], ["ohai there"]),
+
+ {ok, _Client1, _, Body} = stomp_receive(ClientFoo, "MESSAGE"),
+ [<<"ohai there">>] = Body,
+
+ Delete = #'exchange.delete'{exchange = Ex},
+ #'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete),
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env, [rabbitmq_stomp, default_topic_exchange]),
+ ok.
+
+
+stomp_receive(Client, Command) ->
+ {#stomp_frame{command = Command,
+ headers = Hdrs,
+ body_iolist = Body}, Client1} =
+ rabbit_stomp_client:recv(Client),
+ {ok, Client1, Hdrs, Body}.
+
diff --git a/deps/rabbitmq_stomp/test/util_SUITE.erl b/deps/rabbitmq_stomp/test/util_SUITE.erl
new file mode 100644
index 0000000000..89d9d9e37e
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/util_SUITE.erl
@@ -0,0 +1,242 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(util_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
+-include("rabbit_stomp_frame.hrl").
+-compile(export_all).
+
+all() -> [
+ longstr_field,
+ message_properties,
+ message_headers,
+ minimal_message_headers_with_no_custom,
+ headers_post_process,
+ headers_post_process_noop_replyto,
+ headers_post_process_noop2,
+ negotiate_version_both_empty,
+ negotiate_version_no_common,
+ negotiate_version_simple_common,
+ negotiate_version_two_choice_common,
+ negotiate_version_two_choice_common_out_of_order,
+ negotiate_version_two_choice_big_common,
+ negotiate_version_choice_mismatched_length,
+ negotiate_version_choice_duplicates,
+ trim_headers,
+ ack_mode_auto,
+ ack_mode_auto_default,
+ ack_mode_client,
+ ack_mode_client_individual,
+ consumer_tag_id,
+ consumer_tag_destination,
+ consumer_tag_invalid,
+ parse_valid_message_id,
+ parse_invalid_message_id
+ ].
+
+
+%%--------------------------------------------------------------------
+%% Header Processing Tests
+%%--------------------------------------------------------------------
+
+longstr_field(_) ->
+ {<<"ABC">>, longstr, <<"DEF">>} =
+ rabbit_stomp_util:longstr_field("ABC", "DEF").
+
+message_properties(_) ->
+ Headers = [
+ {"content-type", "text/plain"},
+ {"content-encoding", "UTF-8"},
+ {"persistent", "true"},
+ {"priority", "1"},
+ {"correlation-id", "123"},
+ {"reply-to", "something"},
+ {"expiration", "my-expiration"},
+ {"amqp-message-id", "M123"},
+ {"timestamp", "123456"},
+ {"type", "freshly-squeezed"},
+ {"user-id", "joe"},
+ {"app-id", "joe's app"},
+ {"str", "foo"},
+ {"int", "123"}
+ ],
+
+ #'P_basic'{
+ content_type = <<"text/plain">>,
+ content_encoding = <<"UTF-8">>,
+ delivery_mode = 2,
+ priority = 1,
+ correlation_id = <<"123">>,
+ reply_to = <<"something">>,
+ expiration = <<"my-expiration">>,
+ message_id = <<"M123">>,
+ timestamp = 123456,
+ type = <<"freshly-squeezed">>,
+ user_id = <<"joe">>,
+ app_id = <<"joe's app">>,
+ headers = [{<<"str">>, longstr, <<"foo">>},
+ {<<"int">>, longstr, <<"123">>}]
+ } =
+ rabbit_stomp_util:message_properties(#stomp_frame{headers = Headers}).
+
+message_headers(_) ->
+ Properties = #'P_basic'{
+ headers = [{<<"str">>, longstr, <<"foo">>},
+ {<<"int">>, signedint, 123}],
+ content_type = <<"text/plain">>,
+ content_encoding = <<"UTF-8">>,
+ delivery_mode = 2,
+ priority = 1,
+ correlation_id = 123,
+ reply_to = <<"something">>,
+ message_id = <<"M123">>,
+ timestamp = 123456,
+ type = <<"freshly-squeezed">>,
+ user_id = <<"joe">>,
+ app_id = <<"joe's app">>},
+
+ Headers = rabbit_stomp_util:message_headers(Properties),
+
+ Expected = [
+ {"content-type", "text/plain"},
+ {"content-encoding", "UTF-8"},
+ {"persistent", "true"},
+ {"priority", "1"},
+ {"correlation-id", "123"},
+ {"reply-to", "something"},
+ {"expiration", "my-expiration"},
+ {"amqp-message-id", "M123"},
+ {"timestamp", "123456"},
+ {"type", "freshly-squeezed"},
+ {"user-id", "joe"},
+ {"app-id", "joe's app"},
+ {"str", "foo"},
+ {"int", "123"}
+ ],
+
+ [] = lists:subtract(Headers, Expected).
+
+minimal_message_headers_with_no_custom(_) ->
+ Properties = #'P_basic'{},
+
+ Headers = rabbit_stomp_util:message_headers(Properties),
+ Expected = [
+ {"content-type", "text/plain"},
+ {"content-encoding", "UTF-8"},
+ {"amqp-message-id", "M123"}
+ ],
+
+ [] = lists:subtract(Headers, Expected).
+
+headers_post_process(_) ->
+ Headers = [{"header1", "1"},
+ {"header2", "12"},
+ {"reply-to", "something"}],
+ Expected = [{"header1", "1"},
+ {"header2", "12"},
+ {"reply-to", "/reply-queue/something"}],
+ [] = lists:subtract(
+ rabbit_stomp_util:headers_post_process(Headers), Expected).
+
+headers_post_process_noop_replyto(_) ->
+ [begin
+ Headers = [{"reply-to", Prefix ++ "/something"}],
+ Headers = rabbit_stomp_util:headers_post_process(Headers)
+ end || Prefix <- rabbit_routing_util:dest_prefixes()].
+
+headers_post_process_noop2(_) ->
+ Headers = [{"header1", "1"},
+ {"header2", "12"}],
+ Expected = [{"header1", "1"},
+ {"header2", "12"}],
+ [] = lists:subtract(
+ rabbit_stomp_util:headers_post_process(Headers), Expected).
+
+negotiate_version_both_empty(_) ->
+ {error, no_common_version} = rabbit_stomp_util:negotiate_version([],[]).
+
+negotiate_version_no_common(_) ->
+ {error, no_common_version} =
+ rabbit_stomp_util:negotiate_version(["1.2"],["1.3"]).
+
+negotiate_version_simple_common(_) ->
+ {ok, "1.2"} =
+ rabbit_stomp_util:negotiate_version(["1.2"],["1.2"]).
+
+negotiate_version_two_choice_common(_) ->
+ {ok, "1.3"} =
+ rabbit_stomp_util:negotiate_version(["1.2", "1.3"],["1.2", "1.3"]).
+
+negotiate_version_two_choice_common_out_of_order(_) ->
+ {ok, "1.3"} =
+ rabbit_stomp_util:negotiate_version(["1.3", "1.2"],["1.2", "1.3"]).
+
+negotiate_version_two_choice_big_common(_) ->
+ {ok, "1.20.23"} =
+ rabbit_stomp_util:negotiate_version(["1.20.23", "1.30.456"],
+ ["1.20.23", "1.30.457"]).
+negotiate_version_choice_mismatched_length(_) ->
+ {ok, "1.2.3"} =
+ rabbit_stomp_util:negotiate_version(["1.2", "1.2.3"],
+ ["1.2.3", "1.2"]).
+negotiate_version_choice_duplicates(_) ->
+ {ok, "1.2"} =
+ rabbit_stomp_util:negotiate_version(["1.2", "1.2"],
+ ["1.2", "1.2"]).
+trim_headers(_) ->
+ #stomp_frame{headers = [{"one", "foo"}, {"two", "baz "}]} =
+ rabbit_stomp_util:trim_headers(
+ #stomp_frame{headers = [{"one", " foo"}, {"two", " baz "}]}).
+
+%%--------------------------------------------------------------------
+%% Frame Parsing Tests
+%%--------------------------------------------------------------------
+
+ack_mode_auto(_) ->
+ Frame = #stomp_frame{headers = [{"ack", "auto"}]},
+ {auto, _} = rabbit_stomp_util:ack_mode(Frame).
+
+ack_mode_auto_default(_) ->
+ Frame = #stomp_frame{headers = []},
+ {auto, _} = rabbit_stomp_util:ack_mode(Frame).
+
+ack_mode_client(_) ->
+ Frame = #stomp_frame{headers = [{"ack", "client"}]},
+ {client, true} = rabbit_stomp_util:ack_mode(Frame).
+
+ack_mode_client_individual(_) ->
+ Frame = #stomp_frame{headers = [{"ack", "client-individual"}]},
+ {client, false} = rabbit_stomp_util:ack_mode(Frame).
+
+consumer_tag_id(_) ->
+ Frame = #stomp_frame{headers = [{"id", "foo"}]},
+ {ok, <<"T_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame).
+
+consumer_tag_destination(_) ->
+ Frame = #stomp_frame{headers = [{"destination", "foo"}]},
+ {ok, <<"Q_foo">>, _} = rabbit_stomp_util:consumer_tag(Frame).
+
+consumer_tag_invalid(_) ->
+ Frame = #stomp_frame{headers = []},
+ {error, missing_destination_header} = rabbit_stomp_util:consumer_tag(Frame).
+
+%%--------------------------------------------------------------------
+%% Message ID Parsing Tests
+%%--------------------------------------------------------------------
+
+parse_valid_message_id(_) ->
+ {ok, {<<"bar">>, "abc", 123}} =
+ rabbit_stomp_util:parse_message_id("bar@@abc@@123").
+
+parse_invalid_message_id(_) ->
+ {error, invalid_message_id} =
+ rabbit_stomp_util:parse_message_id("blah").
+