summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-06-25 07:03:13 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-06-25 07:03:13 +0100
commit837bbfce10e9ea2fdcb64cab40a9e43a72c5db79 (patch)
tree3ddb19a31a7b39bd7759839a940224e5c6d11976 /src
parentbee6e92332ca8f3de1f7b043e04dd2fa4eea5eab (diff)
downloadrabbitmq-server-837bbfce10e9ea2fdcb64cab40a9e43a72c5db79.tar.gz
fix tests
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl47
1 files changed, 28 insertions, 19 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bc1b00b2..033b65a0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1669,8 +1669,7 @@ test_backing_queue() ->
passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_variable_queue_delete_msg_store_files_callback(),
- %% FIXME: re-enable once fixed
- %% passed = test_queue_recover(),
+ passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
MaxJournal, infinity),
passed;
@@ -2129,6 +2128,29 @@ with_fresh_variable_queue(Fun) ->
_ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
+publish_and_confirm(QPid, Payload, Count) ->
+ Seqs = lists:seq(1, Count),
+ [begin
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2},
+ Payload),
+ Delivery = #delivery{mandatory = false, immediate = false,
+ sender = self(), message = Msg, msg_seq_no = Seq},
+ true = rabbit_amqqueue:deliver(QPid, Delivery)
+ end || Seq <- Seqs],
+ wait_for_confirms(gb_sets:from_list(Seqs)).
+
+wait_for_confirms(Unconfirmed) ->
+ case gb_sets:is_empty(Unconfirmed) of
+ true -> ok;
+ false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
+ wait_for_confirms(
+ gb_sets:difference(Unconfirmed,
+ gb_sets:from_list(Confirmed)))
+ after 1000 -> exit(timeout_waiting_for_confirm)
+ end
+ end.
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,
@@ -2320,14 +2342,8 @@ test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
{new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
- [begin
- Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
- Delivery = #delivery{mandatory = false, immediate = false,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- %% FIXME: wait for confirms of all publishes
+ publish_and_confirm(QPid, <<>>, Count),
+
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
receive {'DOWN', MRef, process, QPid, _Info} -> ok
@@ -2356,15 +2372,8 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
- [begin
- Msg = rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = 2}, Payload),
- Delivery = #delivery{mandatory = false, immediate = false,
- sender = self(), message = Msg},
- true = rabbit_amqqueue:deliver(QPid, Delivery)
- end || _ <- lists:seq(1, Count)],
- %% FIXME: wait for confirms of all publishes
+ publish_and_confirm(QPid, Payload, Count),
+
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
CountMinusOne = Count - 1,