diff options
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r-- | src/rabbit_tests.erl | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3ee71a6d..63676fef 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -705,7 +705,6 @@ test_topic_expect_match(X, List) -> Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, immediate = false, - txn = none, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -2084,7 +2083,7 @@ test_queue_index() -> variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1). + Q, Recover, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). @@ -2132,6 +2131,29 @@ with_fresh_variable_queue(Fun) -> _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)), passed. +publish_and_confirm(QPid, Payload, Count) -> + Seqs = lists:seq(1, Count), + [begin + Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = 2}, + Payload), + Delivery = #delivery{mandatory = false, immediate = false, + sender = self(), message = Msg, msg_seq_no = Seq}, + true = rabbit_amqqueue:deliver(QPid, Delivery) + end || Seq <- Seqs], + wait_for_confirms(gb_sets:from_list(Seqs)). + +wait_for_confirms(Unconfirmed) -> + case gb_sets:is_empty(Unconfirmed) of + true -> ok; + false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> + wait_for_confirms( + gb_sets:difference(Unconfirmed, + gb_sets:from_list(Confirmed))) + after 1000 -> exit(timeout_waiting_for_confirm) + end + end. + test_variable_queue() -> [passed = with_fresh_variable_queue(F) || F <- [fun test_variable_queue_dynamic_duration_change/1, @@ -2325,17 +2347,10 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - TxID = rabbit_guid:guid(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - [begin - Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, <<>>), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, <<>>, Count), + exit(QPid, kill), MRef = erlang:monitor(process, QPid), receive {'DOWN', MRef, process, QPid, _Info} -> ok @@ -2362,18 +2377,10 @@ test_variable_queue_delete_msg_store_files_callback() -> ok = restart_msg_store_empty(), {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), - TxID = rabbit_guid:guid(), Payload = <<0:8388608>>, %% 1MB Count = 30, - [begin - Msg = rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, txn = TxID, - sender = self(), message = Msg}, - true = rabbit_amqqueue:deliver(QPid, Delivery) - end || _ <- lists:seq(1, Count)], - rabbit_amqqueue:commit_all([QPid], TxID, self()), + publish_and_confirm(QPid, Payload, Count), + rabbit_amqqueue:set_ram_duration_target(QPid, 0), CountMinusOne = Count - 1, |