diff options
author | Michael Klishin <klishinm@vmware.com> | 2020-11-03 17:48:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-03 17:48:50 +0300 |
commit | 3d2487323d048397a9e89af9acbdf8ff7489af5c (patch) | |
tree | b1c819a6c59fef89cea423b3bb1c78f5d717ac96 | |
parent | 635a789a362e641aadefaa7df7b470b98d004c83 (diff) | |
parent | ef30519c8d956bfb86d364c892426c1724880a50 (diff) | |
download | rabbitmq-server-git-3d2487323d048397a9e89af9acbdf8ff7489af5c.tar.gz |
Merge pull request #2490 from rabbitmq/lrb-confirm-timeout-is-seconds
amqp_channel:wait_for_confirms timeout is in seconds
-rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 4 | ||||
-rw-r--r-- | test/queue_length_limits_SUITE.erl | 16 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 38 |
3 files changed, 29 insertions, 29 deletions
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index ae755dc8cd..f79fcae3ce 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -131,7 +131,7 @@ publisher_confirms(Config) -> amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), amqp_channel:unregister_confirm_handler(Ch), ok. @@ -143,7 +143,7 @@ publisher_confirms_with_deleted_queue(Config) -> amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, QName, [<<"msg1">>]), amqp_channel:call(Ch, #'queue.delete'{queue = QName}), - amqp_channel:wait_for_confirms_or_die(Ch, 5000), + amqp_channel:wait_for_confirms_or_die(Ch, 5), amqp_channel:unregister_confirm_handler(Ch). %% Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok diff --git a/test/queue_length_limits_SUITE.erl b/test/queue_length_limits_SUITE.erl index f9b6f2f368..b86f502869 100644 --- a/test/queue_length_limits_SUITE.erl +++ b/test/queue_length_limits_SUITE.erl @@ -260,7 +260,7 @@ check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{delivery_tag = DeliveryTag}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -268,7 +268,7 @@ check_max_length_requeue(Config, QName, Ch, Payload1, Payload2) -> %% Another message is published amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, @@ -286,7 +286,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -294,7 +294,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) %% Message 2 is dropped, message 1 stays amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -302,7 +302,7 @@ check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). @@ -348,7 +348,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), %% A single message is published and consumed amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -356,7 +356,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> %% Message 1 is replaced by message 2 amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), @@ -365,7 +365,7 @@ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}), #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index 718c2e2c34..fdd1bb9cef 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -465,7 +465,7 @@ publish_confirm(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). restart_single_node(Config) -> @@ -546,7 +546,7 @@ consume(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -574,7 +574,7 @@ consume_offset(Config) -> amqp_channel:register_confirm_handler(Ch, self()), Payload = << <<"1">> || _ <- lists:seq(1, 500) >>, [publish(Ch, Q, Payload) || _ <- lists:seq(1, 1000)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), run_proper( fun () -> @@ -634,7 +634,7 @@ consume_and_nack(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -664,7 +664,7 @@ basic_cancel(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -693,7 +693,7 @@ consume_and_reject(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -722,7 +722,7 @@ consume_and_ack(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -753,7 +753,7 @@ consume_from_last(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -783,7 +783,7 @@ consume_from_last(Config) -> %% Publish a few more [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), %% Yeah! we got them receive_batch(Ch1, 100, 199). @@ -806,7 +806,7 @@ consume_from_next(Config, Args) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), @@ -833,7 +833,7 @@ consume_from_next(Config, Args) -> %% Publish a few more [publish(Ch, Q, <<"msg2">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), %% Yeah! we got them receive_batch(Ch1, 100, 199). @@ -850,7 +850,7 @@ consume_from_replica(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch1, self()), [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch1, 5000), + amqp_channel:wait_for_confirms(Ch1, 5), Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), qos(Ch2, 10, false), @@ -877,7 +877,7 @@ consume_credit(Config) -> %% Let's publish a big batch, to ensure we have more than a chunk available NumMsgs = 100, [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -937,7 +937,7 @@ consume_credit_out_of_order_ack(Config) -> %% Let's publish a big batch, to ensure we have more than a chunk available NumMsgs = 100, [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -998,7 +998,7 @@ consume_credit_multiple_ack(Config) -> %% Let's publish a big batch, to ensure we have more than a chunk available NumMsgs = 100, [publish(Ch, Q, <<"msg1">>) || _ <- lists:seq(1, NumMsgs)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -1037,7 +1037,7 @@ max_length_bytes(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), %% We don't yet have reliable metrics, as the committed offset doesn't work %% as a counter once we start applying retention policies. @@ -1063,13 +1063,13 @@ max_age(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), timer:sleep(10000), %% Let's publish again so the new segments will trigger the retention policy [publish(Ch, Q, Payload) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:wait_for_confirms(Ch, 5), timer:sleep(5000), @@ -1090,7 +1090,7 @@ leader_failover(Config) -> #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch1, self()), [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)], - amqp_channel:wait_for_confirms(Ch1, 5000), + amqp_channel:wait_for_confirms(Ch1, 5), check_leader_and_replicas(Config, Q, Server1, [Server2, Server3]), |