diff options
author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-23 15:41:28 +0100 |
---|---|---|
committer | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-24 10:19:33 +0100 |
commit | f5a4d26d638be14a139e72335c0bb65a6ff363c5 (patch) | |
tree | 612fcd1e5b5fc4807e9cdcd0e5c9307f8bac41c3 /test/priority_queue_SUITE.erl | |
parent | 716a43bbd34786b5052b2ca4f2b46e6516958aa0 (diff) | |
download | rabbitmq-server-git-f5a4d26d638be14a139e72335c0bb65a6ff363c5.tar.gz |
Tests for 802 issues and related race conditions
Diffstat (limited to 'test/priority_queue_SUITE.erl')
-rw-r--r-- | test/priority_queue_SUITE.erl | 94 |
1 files changed, 88 insertions, 6 deletions
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 56b44d423e..db5db78155 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -44,7 +44,8 @@ groups() -> requeue, resume, simple_order, - straight_through + straight_through, + invoke ]}, {non_parallel_tests, [], [ recovery %% Restart RabbitMQ. @@ -52,7 +53,9 @@ groups() -> ]}, {cluster_size_3, [], [ {parallel_tests, [parallel], [ - mirror_queue_auto_ack + mirror_queue_auto_ack, + mirror_fast_reset_policy, + mirror_reset_policy ]} ]} ]. @@ -206,6 +209,29 @@ straight_through(Config) -> rabbit_ct_client_helpers:close_channel(Ch), passed. +invoke(Config) -> + %% Synthetic test to check the invoke callback, as the bug tested here + %% is only triggered with a race condition. + %% When mirroring is stopped, the backing queue of rabbit_amqqueue_process + %% changes from rabbit_mirror_queue_master to rabbit_priority_queue, + %% which shouldn't receive any invoke call. However, there might + %% be pending messages so the priority queue receives the + %% `run_backing_queue` cast message sent to the old master. + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + Q = <<"invoke-queue">>, + declare(Ch, Q, 3), + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + rabbit_ct_broker_helpers:rpc( + Config, A, gen_server, cast, + [Pid, + {run_backing_queue, ?MODULE, fun(_, _) -> ok end}]), + Pid2 = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + Pid = Pid2, + delete(Ch, Q), + rabbit_ct_client_helpers:close_channel(Ch), + passed. + dropwhile_fetchwhile(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, 0), Q = <<"dropwhile_fetchwhile-queue">>, @@ -460,6 +486,47 @@ mirror_queue_sync_order(Config) -> delete(Ch2, Q), passed. + +mirror_reset_policy(Config) -> + %% Gives time to the master to go through all stages. + %% Might eventually trigger some race conditions from #802, + %% although for that I would expect a longer run and higher + %% number of messages in the system. + mirror_reset_policy(Config, 5000). + +mirror_fast_reset_policy(Config) -> + %% This test seems to trigger the bug tested in invoke/1, but it + %% cannot guarantee it will always happen. Thus, both tests + %% should stay in the test suite. + mirror_reset_policy(Config, 5). + + +mirror_reset_policy(Config, Wait) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + Q = <<"mirror_reset_policy-queue">>, + declare(Ch, Q, 5), + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + publish_many(Ch, Q, 20000), + [begin + rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]), + timer:sleep(Wait), + rabbit_ct_broker_helpers:clear_policy( + Config, A, <<"^mirror_reset_policy-queue$">>), + timer:sleep(Wait) + end || _ <- lists:seq(1, 10)], + timer:sleep(1000), + ok = rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]), + wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2), + %% Verify master has not crashed + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + delete(Ch, Q), + passed. + %%---------------------------------------------------------------------------- open(Config) -> @@ -564,18 +631,26 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)). %%---------------------------------------------------------------------------- wait_for_sync(Config, Nodename, Q) -> - case synced(Config, Nodename, Q) of + wait_for_sync(Config, Nodename, Q, 1). + +wait_for_sync(Config, Nodename, Q, Nodes) -> + wait_for_sync(Config, Nodename, Q, Nodes, 600). + +wait_for_sync(_, _, _, _, 0) -> + throw(sync_timeout); +wait_for_sync(Config, Nodename, Q, Nodes, N) -> + case synced(Config, Nodename, Q, Nodes) of true -> ok; false -> timer:sleep(100), - wait_for_sync(Config, Nodename, Q) + wait_for_sync(Config, Nodename, Q, Nodes, N-1) end. -synced(Config, Nodename, Q) -> +synced(Config, Nodename, Q, Nodes) -> Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]), [SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1], - length(SSPids) =:= 1. + length(SSPids) =:= Nodes. synced_msgs(Config, Nodename, Q, Expected) -> Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, @@ -593,4 +668,11 @@ slave_pids(Config, Nodename, Q) -> Q =:= Q1], SPids. +queue_pid(Config, Nodename, Q) -> + Info = rabbit_ct_broker_helpers:rpc( + Config, Nodename, + rabbit_amqqueue, info_all, [<<"/">>, [name, pid]]), + [Pid] = [P || [{name, Q1}, {pid, P}] <- Info, Q =:= Q1], + Pid. + %%---------------------------------------------------------------------------- |