summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2020-11-03 17:48:50 +0300
committerGitHub <noreply@github.com>2020-11-03 17:48:50 +0300
commit3d2487323d048397a9e89af9acbdf8ff7489af5c (patch)
treeb1c819a6c59fef89cea423b3bb1c78f5d717ac96
parent635a789a362e641aadefaa7df7b470b98d004c83 (diff)
parentef30519c8d956bfb86d364c892426c1724880a50 (diff)
downloadrabbitmq-server-git-3d2487323d048397a9e89af9acbdf8ff7489af5c.tar.gz
Merge pull request #2490 from rabbitmq/lrb-confirm-timeout-is-seconds
amqp_channel:wait_for_confirms timeout is in seconds
-rw-r--r--test/publisher_confirms_parallel_SUITE.erl4
-rw-r--r--test/queue_length_limits_SUITE.erl16
-rw-r--r--test/rabbit_stream_queue_SUITE.erl38
3 files changed, 29 insertions, 29 deletions
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl
index ae755dc8cd..f79fcae3ce 100644
--- a/test/publisher_confirms_parallel_SUITE.erl
+++ b/test/publisher_confirms_parallel_SUITE.erl
@@ -131,7 +131,7 @@ publisher_confirms(Config) ->
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
amqp_channel:unregister_confirm_handler(Ch),
ok.
@@ -143,7 +143,7 @@ publisher_confirms_with_deleted_queue(Config) ->
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, QName, [<<"msg1">>]),
amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
- amqp_channel:wait_for_confirms_or_die(Ch, 5000),
+ amqp_channel:wait_for_confirms_or_die(Ch, 5),
amqp_channel:unregister_confirm_handler(Ch).
%% Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok
diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl
index f9b6f2f368..b86f502869 100644
--- a/test/queue_length_limits_SUITE.erl
+++ b/test/queue_length_limits_SUITE.erl
@@ -260,7 +260,7 @@ check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{delivery_tag = DeliveryTag},
#amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -268,7 +268,7 @@ check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) ->
%% Another message is published
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
@@ -286,7 +286,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -294,7 +294,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
%% Message 2 is dropped, message 1 stays
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -302,7 +302,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3)
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
@@ -348,7 +348,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
%% A single message is published and consumed
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -356,7 +356,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
%% Message 1 is replaced by message 2
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
@@ -365,7 +365,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index 718c2e2c34..fdd1bb9cef 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -465,7 +465,7 @@ publish_confirm(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]).
restart_single_node(Config) ->
@@ -546,7 +546,7 @@ consume(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -574,7 +574,7 @@ consume_offset(Config) ->
amqp_channel:register_confirm_handler(Ch, self()),
Payload = << <<"1">> || _ <- lists:seq(1, 500) >>,
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
run_proper(
fun () ->
@@ -634,7 +634,7 @@ consume_and_nack(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -664,7 +664,7 @@ basic_cancel(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -693,7 +693,7 @@ consume_and_reject(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -722,7 +722,7 @@ consume_and_ack(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
publish(Ch, Q),
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -753,7 +753,7 @@ consume_from_last(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -783,7 +783,7 @@ consume_from_last(Config) ->
%% Publish a few more
[publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
%% Yeah! we got them
receive_batch(Ch1, 100, 199).
@@ -806,7 +806,7 @@ consume_from_next(Config, Args) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
qos(Ch1, 10, false),
@@ -833,7 +833,7 @@ consume_from_next(Config, Args) ->
%% Publish a few more
[publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
%% Yeah! we got them
receive_batch(Ch1, 100, 199).
@@ -850,7 +850,7 @@ consume_from_replica(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
[publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch1, 5000),
+ amqp_channel:wait_for_confirms(Ch1, 5),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
qos(Ch2, 10, false),
@@ -877,7 +877,7 @@ consume_credit(Config) ->
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -937,7 +937,7 @@ consume_credit_out_of_order_ack(Config) ->
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -998,7 +998,7 @@ consume_credit_multiple_ack(Config) ->
%% Let's publish a big batch, to ensure we have more than a chunk available
NumMsgs = 100,
[publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -1037,7 +1037,7 @@ max_length_bytes(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
%% We don't yet have reliable metrics, as the committed offset doesn't work
%% as a counter once we start applying retention policies.
@@ -1063,13 +1063,13 @@ max_age(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
timer:sleep(10000),
%% Let's publish again so the new segments will trigger the retention policy
[publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:wait_for_confirms(Ch, 5),
timer:sleep(5000),
@@ -1090,7 +1090,7 @@ leader_failover(Config) ->
#'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch1, self()),
[publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)],
- amqp_channel:wait_for_confirms(Ch1, 5000),
+ amqp_channel:wait_for_confirms(Ch1, 5),
check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]),