summaryrefslogtreecommitdiff
path: root/test/priority_queue_SUITE.erl
diff options
context:
space:
mode:
authorDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-23 15:41:28 +0100
committerDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-24 10:19:33 +0100
commitf5a4d26d638be14a139e72335c0bb65a6ff363c5 (patch)
tree612fcd1e5b5fc4807e9cdcd0e5c9307f8bac41c3 /test/priority_queue_SUITE.erl
parent716a43bbd34786b5052b2ca4f2b46e6516958aa0 (diff)
downloadrabbitmq-server-git-f5a4d26d638be14a139e72335c0bb65a6ff363c5.tar.gz
Tests for 802 issues and related race conditions
Diffstat (limited to 'test/priority_queue_SUITE.erl')
-rw-r--r--test/priority_queue_SUITE.erl94
1 files changed, 88 insertions, 6 deletions
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 56b44d423e..db5db78155 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
requeue,
resume,
simple_order,
- straight_through
+ straight_through,
+ invoke
]},
{non_parallel_tests, [], [
recovery %% Restart RabbitMQ.
@@ -52,7 +53,9 @@ groups() ->
]},
{cluster_size_3, [], [
{parallel_tests, [parallel], [
- mirror_queue_auto_ack
+ mirror_queue_auto_ack,
+ mirror_fast_reset_policy,
+ mirror_reset_policy
]}
]}
].
@@ -206,6 +209,29 @@ straight_through(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
passed.
+invoke(Config) ->
+ %% Synthetic test to check the invoke callback, as the bug tested here
+ %% is only triggered with a race condition.
+ %% When mirroring is stopped, the backing queue of rabbit_amqqueue_process
+ %% changes from rabbit_mirror_queue_master to rabbit_priority_queue,
+ %% which shouldn't receive any invoke call. However, there might
+ %% be pending messages so the priority queue receives the
+ %% `run_backing_queue` cast message sent to the old master.
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ Q = <<"invoke-queue">>,
+ declare(Ch, Q, 3),
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ rabbit_ct_broker_helpers:rpc(
+ Config, A, gen_server, cast,
+ [Pid,
+ {run_backing_queue, ?MODULE, fun(_, _) -> ok end}]),
+ Pid2 = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ Pid = Pid2,
+ delete(Ch, Q),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ passed.
+
dropwhile_fetchwhile(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Q = <<"dropwhile_fetchwhile-queue">>,
@@ -460,6 +486,47 @@ mirror_queue_sync_order(Config) ->
delete(Ch2, Q),
passed.
+
+mirror_reset_policy(Config) ->
+ %% Gives time to the master to go through all stages.
+ %% Might eventually trigger some race conditions from #802,
+ %% although for that I would expect a longer run and higher
+ %% number of messages in the system.
+ mirror_reset_policy(Config, 5000).
+
+mirror_fast_reset_policy(Config) ->
+ %% This test seems to trigger the bug tested in invoke/1, but it
+ %% cannot guarantee it will always happen. Thus, both tests
+ %% should stay in the test suite.
+ mirror_reset_policy(Config, 5).
+
+
+mirror_reset_policy(Config, Wait) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ Q = <<"mirror_reset_policy-queue">>,
+ declare(Ch, Q, 5),
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ publish_many(Ch, Q, 20000),
+ [begin
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ timer:sleep(Wait),
+ rabbit_ct_broker_helpers:clear_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>),
+ timer:sleep(Wait)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(1000),
+ ok = rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
+ %% Verify master has not crashed
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ delete(Ch, Q),
+ passed.
+
%%----------------------------------------------------------------------------
open(Config) ->
@@ -564,18 +631,26 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
%%----------------------------------------------------------------------------
wait_for_sync(Config, Nodename, Q) ->
- case synced(Config, Nodename, Q) of
+ wait_for_sync(Config, Nodename, Q, 1).
+
+wait_for_sync(Config, Nodename, Q, Nodes) ->
+ wait_for_sync(Config, Nodename, Q, Nodes, 600).
+
+wait_for_sync(_, _, _, _, 0) ->
+ throw(sync_timeout);
+wait_for_sync(Config, Nodename, Q, Nodes, N) ->
+ case synced(Config, Nodename, Q, Nodes) of
true -> ok;
false -> timer:sleep(100),
- wait_for_sync(Config, Nodename, Q)
+ wait_for_sync(Config, Nodename, Q, Nodes, N-1)
end.
-synced(Config, Nodename, Q) ->
+synced(Config, Nodename, Q, Nodes) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]),
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info,
Q =:= Q1],
- length(SSPids) =:= 1.
+ length(SSPids) =:= Nodes.
synced_msgs(Config, Nodename, Q, Expected) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
@@ -593,4 +668,11 @@ slave_pids(Config, Nodename, Q) ->
Q =:= Q1],
SPids.
+queue_pid(Config, Nodename, Q) ->
+ Info = rabbit_ct_broker_helpers:rpc(
+ Config, Nodename,
+ rabbit_amqqueue, info_all, [<<"/">>, [name, pid]]),
+ [Pid] = [P || [{name, Q1}, {pid, P}] <- Info, Q =:= Q1],
+ Pid.
+
%%----------------------------------------------------------------------------