diff options
Diffstat (limited to 'deps/rabbit/test/rabbit_stream_queue_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 92d9f8c806..a010869043 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -47,6 +47,7 @@ groups() -> delete_quorum_replica, consume_from_replica, leader_failover, + leader_failover_dedupe, initial_cluster_size_one, initial_cluster_size_two, initial_cluster_size_one_policy, @@ -1194,6 +1195,76 @@ leader_failover(Config) -> ?assert(NewLeader =/= Server1), ok = rabbit_ct_broker_helpers:start_node(Config, Server1). +leader_failover_dedupe(Config) -> + %% tests that in-flight messages are automatically handled in the case where + %% a leader change happens during publishing + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]), + + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), + #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}), + + Self= self(), + F = fun F(N) -> + receive + go -> + [publish(Ch2, Q, integer_to_binary(N + I)) + || I <- lists:seq(1, 100)], + true = amqp_channel:wait_for_confirms(Ch2, 25), + F(N + 100); + stop -> + Self ! {last_msg, N}, + ct:pal("stop"), + ok + after 2 -> + self() ! go, + F(N) + end + end, + Pid = spawn(fun () -> + amqp_channel:register_confirm_handler(Ch2, self()), + F(0) + end), + erlang:monitor(process, Pid), + Pid ! go, + timer:sleep(10), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + %% this should cause a new leader to be elected and the channel on node 2 + %% to have to resend any pending messages to ensure none is lost + timer:sleep(30000), + [Info] = lists:filter( + fun(Props) -> + QName = rabbit_misc:r(<<"/">>, queue, Q), + lists:member({name, QName}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + NewLeader = proplists:get_value(leader, Info), + ?assert(NewLeader =/= Server1), + flush(), + ?assert(erlang:is_process_alive(Pid)), + Pid ! stop, + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + + N = receive + {last_msg, X} -> X + after 2000 -> + exit(last_msg_timeout) + end, + %% validate that no duplicates were written even though an internal + %% resend might have taken place + qos(Ch2, 100, false), + subscribe(Ch2, Q, false, 0), + validate_dedupe(Ch2, 1, N), + + ok. + initial_cluster_size_one(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1598,6 +1669,30 @@ qos(Ch, Prefetch, Global) -> amqp_channel:call(Ch, #'basic.qos'{global = Global, prefetch_count = Prefetch})). +validate_dedupe(Ch, N, N) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + after 60000 -> + exit({missing_record, N}) + end; +validate_dedupe(Ch, N, M) -> + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{payload = B}} -> + I = binary_to_integer(B), + ?assertEqual(N, I), + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + validate_dedupe(Ch, N + 1, M) + after 60000 -> + exit({missing_record, N}) + end. + receive_batch(Ch, N, N) -> receive {#'basic.deliver'{delivery_tag = DeliveryTag}, @@ -1642,3 +1737,12 @@ run_proper(Fun, Args, NumTests) -> {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) end}])). + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. |