summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl')
-rw-r--r--deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl410
1 files changed, 410 insertions, 0 deletions
diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl
new file mode 100644
index 0000000000..c8375ead1a
--- /dev/null
+++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl
@@ -0,0 +1,410 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(amqp10_dynamic_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel_tests},
+ {group, with_map_config}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ simple,
+ change_definition,
+ autodelete_amqp091_src_on_confirm,
+ autodelete_amqp091_src_on_publish,
+ autodelete_amqp091_dest_on_confirm,
+ autodelete_amqp091_dest_on_publish,
+ simple_amqp10_dest,
+ simple_amqp10_src
+ ]},
+ {with_map_config, [], [
+ simple,
+ simple_amqp10_dest,
+ simple_amqp10_src
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config0) ->
+ {ok, _} = application:ensure_all_started(amqp10_client),
+ rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(Config0,
+ [{lager, [{error_logger_hwm, 200}]}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ application:stop(amqp10_client),
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(with_map_config, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{map_config, true}]);
+init_per_group(_, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{map_config, false}]).
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config0) ->
+ SrcQ = list_to_binary(atom_to_list(Testcase) ++ "_src"),
+ DestQ = list_to_binary(atom_to_list(Testcase) ++ "_dest"),
+ DestQ2 = list_to_binary(atom_to_list(Testcase) ++ "_dest2"),
+ Config = [{srcq, SrcQ}, {destq, DestQ}, {destq2, DestQ2} | Config0],
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+simple(Config) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ with_session(Config,
+ fun (Sess) ->
+ test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp10">>,
+ <<"src-address">>)
+ end).
+
+simple_amqp10_dest(Config) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ with_session(Config,
+ fun (Sess) ->
+ test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>,
+ <<"src-queue">>)
+ end).
+
+test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
+ MapConfig = ?config(map_config, Config),
+ shovel_test_utils:set_param(Config, <<"test">>,
+ [{<<"src-protocol">>, Protocol},
+ {ProtocolSrc, Src},
+ {<<"dest-protocol">>, <<"amqp10">>},
+ {<<"dest-address">>, Dest},
+ {<<"dest-add-forward-headers">>, true},
+ {<<"dest-add-timestamp-header">>, true},
+ {<<"dest-application-properties">>,
+ case MapConfig of
+ true ->
+ #{<<"app-prop-key">> => <<"app-prop-value">>};
+ _ ->
+ [{<<"app-prop-key">>, <<"app-prop-value">>}]
+ end},
+ {<<"dest-properties">>,
+ case MapConfig of
+ true ->
+ #{<<"user_id">> => <<"guest">>};
+ _ ->
+ [{<<"user_id">>, <<"guest">>}]
+ end},
+ {<<"dest-message-annotations">>,
+ case MapConfig of
+ true ->
+ #{<<"message-ann-key">> =>
+ <<"message-ann-value">>};
+ _ ->
+ [{<<"message-ann-key">>,
+ <<"message-ann-value">>}]
+ end}]),
+ Msg = publish_expect(Sess, Src, Dest, <<"tag1">>, <<"hello">>),
+ ?assertMatch((#{user_id := <<"guest">>, creation_time := _}),
+ (amqp10_msg:properties(Msg))),
+ ?assertMatch((#{<<"shovel-name">> := <<"test">>,
+ <<"shovel-type">> := <<"dynamic">>, <<"shovelled-by">> := _,
+ <<"app-prop-key">> := <<"app-prop-value">>}),
+ (amqp10_msg:application_properties(Msg))),
+ ?assertMatch((#{<<"message-ann-key">> := <<"message-ann-value">>}),
+ (amqp10_msg:message_annotations(Msg))).
+
+simple_amqp10_src(Config) ->
+ MapConfig = ?config(map_config, Config),
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ with_session(Config,
+ fun (Sess) ->
+ shovel_test_utils:set_param(
+ Config,
+ <<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
+ {<<"src-address">>, Src},
+ {<<"dest-protocol">>, <<"amqp091">>},
+ {<<"dest-queue">>, Dest},
+ {<<"add-forward-headers">>, true},
+ {<<"dest-add-timestamp-header">>, true},
+ {<<"publish-properties">>,
+ case MapConfig of
+ true -> #{<<"cluster_id">> => <<"x">>};
+ _ -> [{<<"cluster_id">>, <<"x">>}]
+ end}
+ ]),
+ _Msg = publish_expect(Sess, Src, Dest, <<"tag1">>,
+ <<"hello">>),
+ % the fidelity loss is quite high when consuming using the amqp10
+ % plugin. For example custom headers aren't current translated.
+ % This isn't due to the shovel though.
+ ok
+ end).
+
+change_definition(Config) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ Dest2 = ?config(destq2, Config),
+ with_session(Config,
+ fun (Sess) ->
+ shovel_test_utils:set_param(Config, <<"test">>,
+ [{<<"src-address">>, Src},
+ {<<"src-protocol">>, <<"amqp10">>},
+ {<<"dest-protocol">>, <<"amqp10">>},
+ {<<"dest-address">>, Dest}]),
+ publish_expect(Sess, Src, Dest, <<"tag2">>,<<"hello">>),
+ shovel_test_utils:set_param(Config, <<"test">>,
+ [{<<"src-address">>, Src},
+ {<<"src-protocol">>, <<"amqp10">>},
+ {<<"dest-protocol">>, <<"amqp10">>},
+ {<<"dest-address">>, Dest2}]),
+ publish_expect(Sess, Src, Dest2, <<"tag3">>, <<"hello">>),
+ expect_empty(Sess, Dest),
+ shovel_test_utils:clear_param(Config, <<"test">>),
+ publish_expect(Sess, Src, Src, <<"tag4">>, <<"hello2">>),
+ expect_empty(Sess, Dest),
+ expect_empty(Sess, Dest2)
+ end).
+
+autodelete_amqp091_src_on_confirm(Config) ->
+ autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50},
+ fun autodelete_amqp091_src/2),
+ ok.
+
+autodelete_amqp091_src_on_publish(Config) ->
+ autodelete_case(Config, {<<"on-publish">>, 50, 50, 50},
+ fun autodelete_amqp091_src/2),
+ ok.
+
+autodelete_amqp091_dest_on_confirm(Config) ->
+ autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50},
+ fun autodelete_amqp091_dest/2),
+ ok.
+
+autodelete_amqp091_dest_on_publish(Config) ->
+ autodelete_case(Config, {<<"on-publish">>, 50, 50, 50},
+ fun autodelete_amqp091_dest/2),
+ ok.
+
+autodelete_case(Config, Args, CaseFun) ->
+ with_session(Config, CaseFun(Config, Args)).
+
+autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ fun (Session) ->
+ publish_count(Session, Src, <<"hello">>, 100),
+ shovel_test_utils:set_param_nowait(
+ Config,
+ <<"test">>, [{<<"src-address">>, Src},
+ {<<"src-protocol">>, <<"amqp10">>},
+ {<<"src-delete-after">>, After},
+ {<<"src-prefetch-count">>, 5},
+ {<<"dest-address">>, Dest},
+ {<<"dest-protocol">>, <<"amqp10">>},
+ {<<"ack-mode">>, AckMode}
+ ]),
+ await_autodelete(Config, <<"test">>),
+ expect_count(Session, Dest, <<"hello">>, ExpDest),
+ expect_count(Session, Src, <<"hello">>, ExpSrc)
+ end.
+
+autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ fun (Session) ->
+ publish_count(Session, Src, <<"hello">>, 100),
+ shovel_test_utils:set_param_nowait(
+ Config,
+ <<"test">>, [{<<"src-queue">>, Src},
+ {<<"src-protocol">>, <<"amqp091">>},
+ {<<"src-delete-after">>, After},
+ {<<"src-prefetch-count">>, 5},
+ {<<"dest-address">>, Dest},
+ {<<"dest-protocol">>, <<"amqp10">>},
+ {<<"ack-mode">>, AckMode}
+ ]),
+ await_autodelete(Config, <<"test">>),
+ expect_count(Session, Dest, <<"hello">>, ExpDest),
+ expect_count(Session, Src, <<"hello">>, ExpSrc)
+ end.
+
+autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
+ Src = ?config(srcq, Config),
+ Dest = ?config(destq, Config),
+ fun (Session) ->
+ publish_count(Session, Src, <<"hello">>, 100),
+ shovel_test_utils:set_param_nowait(
+ Config,
+ <<"test">>, [{<<"src-address">>, Src},
+ {<<"src-protocol">>, <<"amqp10">>},
+ {<<"src-delete-after">>, After},
+ {<<"src-prefetch-count">>, 5},
+ {<<"dest-queue">>, Dest},
+ {<<"dest-protocol">>, <<"amqp091">>},
+ {<<"ack-mode">>, AckMode}
+ ]),
+ await_autodelete(Config, <<"test">>),
+ expect_count(Session, Dest, <<"hello">>, ExpDest),
+ expect_count(Session, Src, <<"hello">>, ExpSrc)
+ end.
+
+%%----------------------------------------------------------------------------
+
+with_session(Config, Fun) ->
+ Hostname = ?config(rmq_hostname, Config),
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ {ok, Conn} = amqp10_client:open_connection(Hostname, Port),
+ {ok, Sess} = amqp10_client:begin_session(Conn),
+ Fun(Sess),
+ amqp10_client:close_connection(Conn),
+ ok.
+
+publish(Sender, Tag, Payload) when is_binary(Payload) ->
+ Headers = #{durable => true},
+ Msg = amqp10_msg:set_headers(Headers,
+ amqp10_msg:new(Tag, Payload, false)),
+ ok = amqp10_client:send_msg(Sender, Msg),
+ receive
+ {amqp10_disposition, {accepted, Tag}} -> ok
+ after 3000 ->
+ exit(publish_disposition_not_received)
+ end.
+
+publish_expect(Session, Source, Dest, Tag, Payload) ->
+ LinkName = <<"dynamic-sender-", Dest/binary>>,
+ {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
+ unsettled, unsettled_state),
+ ok = await_amqp10_event(link, Sender, attached),
+ publish(Sender, Tag, Payload),
+ amqp10_client:detach_link(Sender),
+ expect_one(Session, Dest, Payload).
+
+await_amqp10_event(On, Ref, Evt) ->
+ receive
+ {amqp10_event, {On, Ref, Evt}} -> ok
+ after 5000 ->
+ exit({amqp10_event_timeout, On, Ref, Evt})
+ end.
+
+expect_one(Session, Dest, Payload) ->
+ LinkName = <<"dynamic-receiver-", Dest/binary>>,
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session, LinkName,
+ Dest, settled,
+ unsettled_state),
+ ok = amqp10_client:flow_link_credit(Receiver, 1, never),
+ Msg = expect(Receiver, Payload),
+ amqp10_client:detach_link(Receiver),
+ Msg.
+
+expect(Receiver, _Payload) ->
+ receive
+ {amqp10_msg, Receiver, InMsg} ->
+ InMsg
+ after 4000 ->
+ throw(timeout_in_expect_waiting_for_delivery)
+ end.
+
+expect_empty(Session, Dest) ->
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
+ <<"dynamic-receiver">>,
+ Dest, settled,
+ unsettled_state),
+ % probably good enough given we don't currently have a means of
+ % echoing flow state
+ {error, timeout} = amqp10_client:get_msg(Receiver, 250),
+ amqp10_client:detach_link(Receiver).
+
+publish_count(Session, Address, Payload, Count) ->
+ LinkName = <<"dynamic-sender-", Address/binary>>,
+ {ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName,
+ Address, unsettled,
+ unsettled_state),
+ ok = await_amqp10_event(link, Sender, attached),
+ [begin
+ Tag = rabbit_data_coercion:to_binary(I),
+ publish(Sender, Tag, <<Payload/binary, Tag/binary>>)
+ end || I <- lists:seq(1, Count)],
+ amqp10_client:detach_link(Sender).
+
+expect_count(Session, Address, Payload, Count) ->
+ {ok, Receiver} = amqp10_client:attach_receiver_link(Session,
+ <<"dynamic-receiver",
+ Address/binary>>,
+ Address, settled,
+ unsettled_state),
+ ok = amqp10_client:flow_link_credit(Receiver, Count, never),
+ [begin
+ expect(Receiver, Payload)
+ end || _ <- lists:seq(1, Count)],
+ expect_empty(Session, Address),
+ amqp10_client:detach_link(Receiver).
+
+
+invalid_param(Config, Value, User) ->
+ {error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_runtime_parameters, set,
+ [<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]).
+
+valid_param(Config, Value, User) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, valid_param1, [Config, Value, User]).
+
+valid_param1(_Config, Value, User) ->
+ ok = rabbit_runtime_parameters:set(
+ <<"/">>, <<"shovel">>, <<"a">>, Value, User),
+ ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"a">>, <<"acting-user">>).
+
+invalid_param(Config, Value) -> invalid_param(Config, Value, none).
+valid_param(Config, Value) -> valid_param(Config, Value, none).
+
+lookup_user(Config, Name) ->
+ {ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_access_control, check_user_login, [Name, []]),
+ User.
+
+await_autodelete(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, await_autodelete1, [Config, Name], 10000).
+
+await_autodelete1(_Config, Name) ->
+ shovel_test_utils:await(
+ fun () -> not lists:member(Name, shovels_from_parameters()) end),
+ shovel_test_utils:await(
+ fun () ->
+ not lists:member(Name,
+ shovel_test_utils:shovels_from_status())
+ end).
+
+shovels_from_parameters() ->
+ L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
+ [rabbit_misc:pget(name, Shovel) || Shovel <- L].