diff options
Diffstat (limited to 'deps/rabbit/test/crashing_queues_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/crashing_queues_SUITE.erl | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/deps/rabbit/test/crashing_queues_SUITE.erl b/deps/rabbit/test/crashing_queues_SUITE.erl new file mode 100644 index 0000000000..cf88fb00f0 --- /dev/null +++ b/deps/rabbit/test/crashing_queues_SUITE.erl @@ -0,0 +1,267 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(crashing_queues_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_2} + ]. + +groups() -> + [ + {cluster_size_2, [], [ + crashing_unmirrored, + crashing_mirrored, + give_up_after_repeated_crashes + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_2, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 2} + ]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +crashing_unmirrored(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ChA = rabbit_ct_client_helpers:open_channel(Config, A), + ConnB = rabbit_ct_client_helpers:open_connection(Config, B), + QName = <<"crashing_unmirrored-q">>, + amqp_channel:call(ChA, #'confirm.select'{}), + test_queue_failure(A, ChA, ConnB, 1, 0, + #'queue.declare'{queue = QName, durable = true}), + test_queue_failure(A, ChA, ConnB, 0, 0, + #'queue.declare'{queue = QName, durable = false}), + ok. + +crashing_mirrored(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<".*">>, <<"all">>), + ChA = rabbit_ct_client_helpers:open_channel(Config, A), + ConnB = rabbit_ct_client_helpers:open_connection(Config, B), + QName = <<"crashing_mirrored-q">>, + amqp_channel:call(ChA, #'confirm.select'{}), + test_queue_failure(A, ChA, ConnB, 2, 1, + #'queue.declare'{queue = QName, durable = true}), + ok. + +test_queue_failure(Node, Ch, RaceConn, MsgCount, FollowerCount, Decl) -> + #'queue.declare_ok'{queue = QName} = amqp_channel:call(Ch, Decl), + try + publish(Ch, QName, transient), + publish(Ch, QName, durable), + Racer = spawn_declare_racer(RaceConn, Decl), + kill_queue(Node, QName), + assert_message_count(MsgCount, Ch, QName), + assert_follower_count(FollowerCount, Node, QName), + stop_declare_racer(Racer) + after + amqp_channel:call(Ch, #'queue.delete'{queue = QName}) + end. + +give_up_after_repeated_crashes(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ChA = rabbit_ct_client_helpers:open_channel(Config, A), + ChB = rabbit_ct_client_helpers:open_channel(Config, B), + QName = <<"give_up_after_repeated_crashes-q">>, + amqp_channel:call(ChA, #'confirm.select'{}), + amqp_channel:call(ChA, #'queue.declare'{queue = QName, + durable = true}), + await_state(A, QName, running), + publish(ChA, QName, durable), + kill_queue_hard(A, QName), + {'EXIT', _} = (catch amqp_channel:call( + ChA, #'queue.declare'{queue = QName, + durable = true})), + await_state(A, QName, crashed), + amqp_channel:call(ChB, #'queue.delete'{queue = QName}), + amqp_channel:call(ChB, #'queue.declare'{queue = QName, + durable = true}), + await_state(A, QName, running), + + %% Since it's convenient, also test absent queue status here. + rabbit_ct_broker_helpers:stop_node(Config, B), + await_state(A, QName, down), + ok. + + +publish(Ch, QName, DelMode) -> + Publish = #'basic.publish'{exchange = <<>>, routing_key = QName}, + Msg = #amqp_msg{props = #'P_basic'{delivery_mode = del_mode(DelMode)}}, + amqp_channel:cast(Ch, Publish, Msg), + amqp_channel:wait_for_confirms(Ch). + +del_mode(transient) -> 1; +del_mode(durable) -> 2. + +spawn_declare_racer(Conn, Decl) -> + Self = self(), + spawn_link(fun() -> declare_racer_loop(Self, Conn, Decl) end). + +stop_declare_racer(Pid) -> + Pid ! stop, + MRef = erlang:monitor(process, Pid), + receive + {'DOWN', MRef, process, Pid, _} -> ok + end. + +declare_racer_loop(Parent, Conn, Decl) -> + receive + stop -> unlink(Parent) + after 0 -> + %% Catch here because we might happen to catch the queue + %% while it is in the middle of recovering and thus + %% explode with NOT_FOUND because crashed. Doesn't matter, + %% we are only in this loop to try to fool the recovery + %% code anyway. + try + case amqp_connection:open_channel(Conn) of + {ok, Ch} -> amqp_channel:call(Ch, Decl); + closing -> ok + end + catch + exit:_ -> + ok + end, + declare_racer_loop(Parent, Conn, Decl) + end. + +await_state(Node, QName, State) -> + await_state(Node, QName, State, 30000). + +await_state(Node, QName, State, Time) -> + case state(Node, QName) of + State -> + ok; + Other -> + case Time of + 0 -> exit({timeout_awaiting_state, State, Other}); + _ -> timer:sleep(100), + await_state(Node, QName, State, Time - 100) + end + end. + +state(Node, QName) -> + V = <<"/">>, + Res = rabbit_misc:r(V, queue, QName), + Infos = rpc:call(Node, rabbit_amqqueue, info_all, [V, [name, state]]), + case Infos of + [] -> undefined; + [[{name, Res}, {state, State}]] -> State + end. + +kill_queue_hard(Node, QName) -> + case kill_queue(Node, QName) of + crashed -> ok; + _NewPid -> timer:sleep(100), + kill_queue_hard(Node, QName) + end. + +kill_queue(Node, QName) -> + Pid1 = queue_pid(Node, QName), + exit(Pid1, boom), + await_new_pid(Node, QName, Pid1). + +queue_pid(Node, QName) -> + Q = lookup(Node, QName), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), + case State of + crashed -> + case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of + {error, {queue_supervisor_not_found, _}} -> {error, no_sup}; + {ok, SPid} -> + case sup_child(Node, SPid) of + {ok, _} -> QPid; %% restarting + {error, no_child} -> crashed %% given up + end + end; + _ -> QPid + end. + +sup_child(Node, Sup) -> + case rpc:call(Node, supervisor2, which_children, [Sup]) of + [{_, Child, _, _}] -> {ok, Child}; + [] -> {error, no_child}; + {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup} + end. + +lookup(Node, QName) -> + {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, QName)]), + Q. + +await_new_pid(Node, QName, OldPid) -> + case queue_pid(Node, QName) of + OldPid -> timer:sleep(10), + await_new_pid(Node, QName, OldPid); + New -> New + end. + +assert_message_count(Count, Ch, QName) -> + #'queue.declare_ok'{message_count = Count} = + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + passive = true}). + +assert_follower_count(Count, Node, QName) -> + Q = lookup(Node, QName), + [{_, Pids}] = rpc:call(Node, rabbit_amqqueue, info, [Q, [slave_pids]]), + RealCount = case Pids of + '' -> 0; + _ -> length(Pids) + end, + case RealCount of + Count -> + ok; + _ when RealCount < Count -> + timer:sleep(10), + assert_follower_count(Count, Node, QName); + _ -> + exit({too_many_replicas, Count, RealCount}) + end. |