diff options
Diffstat (limited to 'deps/rabbit/test/eager_sync_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/eager_sync_SUITE.erl | 271 |
1 files changed, 271 insertions, 0 deletions
diff --git a/deps/rabbit/test/eager_sync_SUITE.erl b/deps/rabbit/test/eager_sync_SUITE.erl new file mode 100644 index 0000000000..a9e2ea2107 --- /dev/null +++ b/deps/rabbit/test/eager_sync_SUITE.erl @@ -0,0 +1,271 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(eager_sync_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(QNAME, <<"ha.two.test">>). +-define(QNAME_AUTO, <<"ha.auto.test">>). +-define(MESSAGE_COUNT, 200000). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + eager_sync, + eager_sync_cancel, + eager_sync_auto, + eager_sync_auto_on_policy_change, + eager_sync_requeue + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = 3, + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, ClusterSize}, + {rmq_nodes_clustered, true}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ [ + fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1, + fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1 + ]). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +eager_sync(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% Queue is on AB but not C. + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Ch = rabbit_ct_client_helpers:open_channel(Config, C), + amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, + durable = true}), + + %% Don't sync, lose messages + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + restart(Config, A), + restart(Config, B), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, 0), + + %% Sync, keep messages + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + restart(Config, A), + ok = sync(C, ?QNAME), + restart(Config, B), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT), + + %% Check the no-need-to-sync path + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + ok = sync(C, ?QNAME), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT), + + %% keep unacknowledged messages + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + rabbit_ct_client_helpers:fetch(Ch, ?QNAME, 2), + restart(Config, A), + rabbit_ct_client_helpers:fetch(Ch, ?QNAME, 3), + sync(C, ?QNAME), + restart(Config, B), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT), + + ok. + +eager_sync_cancel(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% Queue is on AB but not C. + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Ch = rabbit_ct_client_helpers:open_channel(Config, C), + + set_app_sync_batch_size(A), + set_app_sync_batch_size(B), + set_app_sync_batch_size(C), + + amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, + durable = true}), + {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence + eager_sync_cancel_test2(Config, A, B, C, Ch, 100). + +eager_sync_cancel_test2(_, _, _, _, _, 0) -> + error(no_more_attempts_left); +eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts) -> + %% Sync then cancel + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + restart(Config, A), + set_app_sync_batch_size(A), + spawn_link(fun() -> ok = sync_nowait(C, ?QNAME) end), + case wait_for_syncing(C, ?QNAME, 1) of + ok -> + case sync_cancel(C, ?QNAME) of + ok -> + wait_for_running(C, ?QNAME), + restart(Config, B), + set_app_sync_batch_size(B), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, 0), + + {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence + ok; + {ok, not_syncing} -> + %% Damn. Syncing finished between wait_for_syncing/3 and + %% sync_cancel/2 above. Start again. + amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) + end; + synced_already -> + %% Damn. Syncing finished before wait_for_syncing/3. Start again. + amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}), + eager_sync_cancel_test2(Config, A, B, C, Ch, Attempts - 1) + end. + +eager_sync_auto(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Ch = rabbit_ct_client_helpers:open_channel(Config, C), + amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME_AUTO, + durable = true}), + + %% Sync automatically, don't lose messages + rabbit_ct_client_helpers:publish(Ch, ?QNAME_AUTO, ?MESSAGE_COUNT), + restart(Config, A), + wait_for_sync(C, ?QNAME_AUTO), + restart(Config, B), + wait_for_sync(C, ?QNAME_AUTO), + rabbit_ct_client_helpers:consume(Ch, ?QNAME_AUTO, ?MESSAGE_COUNT), + + ok. + +eager_sync_auto_on_policy_change(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% Queue is on AB but not C. + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Ch = rabbit_ct_client_helpers:open_channel(Config, C), + amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, + durable = true}), + + %% Sync automatically once the policy is changed to tell us to. + rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT), + restart(Config, A), + Params = [rabbit_misc:atom_to_binary(N) || N <- [A, B]], + rabbit_ct_broker_helpers:set_ha_policy(Config, + A, <<"^ha.two.">>, {<<"nodes">>, Params}, + [{<<"ha-sync-mode">>, <<"automatic">>}]), + wait_for_sync(C, ?QNAME), + + ok. + +eager_sync_requeue(Config) -> + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + %% Queue is on AB but not C. + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + Ch = rabbit_ct_client_helpers:open_channel(Config, C), + amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME, + durable = true}), + + rabbit_ct_client_helpers:publish(Ch, ?QNAME, 2), + {#'basic.get_ok'{delivery_tag = TagA}, _} = + amqp_channel:call(Ch, #'basic.get'{queue = ?QNAME}), + {#'basic.get_ok'{delivery_tag = TagB}, _} = + amqp_channel:call(Ch, #'basic.get'{queue = ?QNAME}), + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = TagA, requeue = true}), + restart(Config, B), + ok = sync(C, ?QNAME), + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = TagB, requeue = true}), + rabbit_ct_client_helpers:consume(Ch, ?QNAME, 2), + + ok. + +restart(Config, Node) -> + rabbit_ct_broker_helpers:restart_broker(Config, Node). + +sync(Node, QName) -> + case sync_nowait(Node, QName) of + ok -> wait_for_sync(Node, QName), + ok; + R -> R + end. + +sync_nowait(Node, QName) -> action(Node, sync_queue, QName). +sync_cancel(Node, QName) -> action(Node, cancel_sync_queue, QName). + +wait_for_sync(Node, QName) -> + sync_detection_SUITE:wait_for_sync_status(true, Node, QName). + +action(Node, Action, QName) -> + rabbit_control_helper:command_with_output( + Action, Node, [binary_to_list(QName)], [{"-p", "/"}]). + +queue(Node, QName) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, QName), + {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QNameRes]), + Q. + +wait_for_syncing(Node, QName, Target) -> + case state(Node, QName) of + {{syncing, _}, _} -> ok; + {running, Target} -> synced_already; + _ -> timer:sleep(100), + wait_for_syncing(Node, QName, Target) + end. + +wait_for_running(Node, QName) -> + case state(Node, QName) of + {running, _} -> ok; + _ -> timer:sleep(100), + wait_for_running(Node, QName) + end. + +state(Node, QName) -> + [{state, State}, {synchronised_slave_pids, Pids}] = + rpc:call(Node, rabbit_amqqueue, info, + [queue(Node, QName), [state, synchronised_slave_pids]]), + {State, length(Pids)}. + +%% eager_sync_cancel_test needs a batch size that's < ?MESSAGE_COUNT +%% in order to pass, because a SyncBatchSize >= ?MESSAGE_COUNT will +%% always finish before the test is able to cancel the sync. +set_app_sync_batch_size(Node) -> + rabbit_control_helper:command( + eval, Node, + ["application:set_env(rabbit, mirroring_sync_batch_size, 1)."]). |