summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/test/rabbit_stream_queue_SUITE.erl')
-rw-r--r--deps/rabbit/test/rabbit_stream_queue_SUITE.erl104
1 files changed, 104 insertions, 0 deletions
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
index 92d9f8c806..a010869043 100644
--- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl
@@ -47,6 +47,7 @@ groups() ->
delete_quorum_replica,
consume_from_replica,
leader_failover,
+ leader_failover_dedupe,
initial_cluster_size_one,
initial_cluster_size_two,
initial_cluster_size_one_policy,
@@ -1194,6 +1195,76 @@ leader_failover(Config) ->
?assert(NewLeader =/= Server1),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1).
+leader_failover_dedupe(Config) ->
+ %% tests that in-flight messages are automatically handled in the case where
+ %% a leader change happens during publishing
+ [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
+
+ check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),
+
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}),
+
+ Self= self(),
+ F = fun F(N) ->
+ receive
+ go ->
+ [publish(Ch2, Q, integer_to_binary(N + I))
+ || I <- lists:seq(1, 100)],
+ true = amqp_channel:wait_for_confirms(Ch2, 25),
+ F(N + 100);
+ stop ->
+ Self ! {last_msg, N},
+ ct:pal("stop"),
+ ok
+ after 2 ->
+ self() ! go,
+ F(N)
+ end
+ end,
+ Pid = spawn(fun () ->
+ amqp_channel:register_confirm_handler(Ch2, self()),
+ F(0)
+ end),
+ erlang:monitor(process, Pid),
+ Pid ! go,
+ timer:sleep(10),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ %% this should cause a new leader to be elected and the channel on node 2
+ %% to have to resend any pending messages to ensure none is lost
+ timer:sleep(30000),
+ [Info] = lists:filter(
+ fun(Props) ->
+ QName = rabbit_misc:r(<<"/">>, queue, Q),
+ lists:member({name, QName}, Props)
+ end,
+ rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue,
+ info_all, [<<"/">>, [name, leader, members]])),
+ NewLeader = proplists:get_value(leader, Info),
+ ?assert(NewLeader =/= Server1),
+ flush(),
+ ?assert(erlang:is_process_alive(Pid)),
+ Pid ! stop,
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+
+ N = receive
+ {last_msg, X} -> X
+ after 2000 ->
+ exit(last_msg_timeout)
+ end,
+ %% validate that no duplicates were written even though an internal
+ %% resend might have taken place
+ qos(Ch2, 100, false),
+ subscribe(Ch2, Q, false, 0),
+ validate_dedupe(Ch2, 1, N),
+
+ ok.
+
initial_cluster_size_one(Config) ->
[Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) ->
amqp_channel:call(Ch, #'basic.qos'{global = Global,
prefetch_count = Prefetch})).
+validate_dedupe(Ch, N, N) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ after 60000 ->
+ exit({missing_record, N})
+ end;
+validate_dedupe(Ch, N, M) ->
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{payload = B}} ->
+ I = binary_to_integer(B),
+ ?assertEqual(N, I),
+ ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ validate_dedupe(Ch, N + 1, M)
+ after 60000 ->
+ exit({missing_record, N})
+ end.
+
receive_batch(Ch, N, N) ->
receive
{#'basic.deliver'{delivery_tag = DeliveryTag},
@@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) ->
{on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
(F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
end}])).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.