summaryrefslogtreecommitdiff
path: root/src/rabbit_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r--src/rabbit_tests.erl49
1 files changed, 28 insertions, 21 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3ee71a6d..63676fef 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -705,7 +705,6 @@ test_topic_expect_match(X, List) ->
Res = rabbit_exchange_type_topic:route(
X, #delivery{mandatory = false,
immediate = false,
- txn = none,
sender = self(),
message = Message}),
ExpectedRes = lists:map(
@@ -2084,7 +2083,7 @@ test_queue_index() ->
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/2, fun nop/2, fun nop/2, fun nop/1).
+ Q, Recover, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
@@ -2132,6 +2131,29 @@ with_fresh_variable_queue(Fun) ->
_ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
+publish_and_confirm(QPid, Payload, Count) ->
+ Seqs = lists:seq(1, Count),
+ [begin
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2},
+ Payload),
+ Delivery = #delivery{mandatory = false, immediate = false,
+ sender = self(), message = Msg, msg_seq_no = Seq},
+ true = rabbit_amqqueue:deliver(QPid, Delivery)
+ end || Seq <- Seqs],
+ wait_for_confirms(gb_sets:from_list(Seqs)).
+
+wait_for_confirms(Unconfirmed) ->
+ case gb_sets:is_empty(Unconfirmed) of
+ true -> ok;
+ false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+ wait_for_confirms(
+ gb_sets:difference(Unconfirmed,
+ gb_sets:from_list(Confirmed)))
+ after 1000 -> exit(timeout_waiting_for_confirm)
+ end
+ end.
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
@@ -2325,17 +2347,10 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- TxID = rabbit_guid:guid(),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
- Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ publish_and_confirm(QPid, <<>>, Count),
+
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
receive {'DOWN', MRef, process, QPid, _Info} -> ok
@@ -2362,18 +2377,10 @@ test_variable_queue_delete_msg_store_files_callback() ->
ok = restart_msg_store_empty(),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- TxID = rabbit_guid:guid(),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- [begin
- Msg = rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, Payload),
- Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ publish_and_confirm(QPid, Payload, Count),
+
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
CountMinusOne = Count - 1,