summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-03-23 19:29:58 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-03-23 19:29:58 +0000
commitfd6e0901bbb462a17d9d1e28b9b65cab00a0a265 (patch)
tree64bd57ce42ab1e1f1c0ec5c0ea39dcb75ac831c8
parent75f8cb8c6f20ae59dd2417ebc136d269ff040884 (diff)
downloadrabbitmq-server-git-test-wait-for-confirms.tar.gz
Wait for commits on test suitetest-wait-for-confirms
Don't wait for consensus as the publish could be delayed
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl71
1 files changed, 25 insertions, 46 deletions
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 2a5e04c870..45a889cb20 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -248,30 +248,6 @@ variable_queue_init(Q, Recover) ->
false -> new
end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).
-publish_and_confirm(Q, Payload, Count) ->
- Seqs = lists:seq(1, Count),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2},
- Payload),
- Delivery = #delivery{mandatory = false, sender = self(),
- confirm = true, message = Msg, msg_seq_no = Seq,
- flow = noflow},
- _QPids = rabbit_amqqueue:deliver([Q], Delivery)
- end || Seq <- Seqs],
- wait_for_confirms(gb_sets:from_list(Seqs)).
-
-wait_for_confirms(Unconfirmed) ->
- case gb_sets:is_empty(Unconfirmed) of
- true -> ok;
- false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
- wait_for_confirms(
- rabbit_misc:gb_sets_difference(
- Unconfirmed, gb_sets:from_list(Confirmed)))
- after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
- end
- end.
-
test_amqqueue(Durable) ->
rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
@@ -1207,21 +1183,26 @@ max_length_drop_publish_requeue(Config) ->
check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
{#'basic.get_ok'{delivery_tag = DeliveryTag},
#amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Another message is published
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
- wait_for_consensus(QName, Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
@@ -1244,41 +1225,33 @@ max_length_bytes_drop_publish(Config) ->
check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Messages 2 and 3 are dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- wait_for_consensus(QName, Config),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
-wait_for_consensus(QName, Config) ->
- case lists:keyfind(<<"x-queue-type">>, 1, ?config(queue_args, Config)) of
- {_, _, <<"quorum">>} ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
- RaName = binary_to_atom(<<"%2F_", QName/binary>>, utf8),
- {ok, _, _} = ra:members({RaName, Server});
- _ ->
- ok
- end.
-
check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -1314,17 +1287,23 @@ check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
sync_mirrors(QName, Config),
+
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -1332,7 +1311,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- wait_for_consensus(QName, Config),
+ amqp_channel:wait_for_confirms(Ch, 5000),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).