summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-19 16:15:41 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-19 16:15:41 +0100
commit56aded725eb8adbb971a5f221969df37f62b53f0 (patch)
tree9d3ce58bc220a86018faa33bf9fb957be8714d50
parent9bf26ff9baa023dedd9192d3f440619f80246465 (diff)
downloadrabbitmq-server-56aded725eb8adbb971a5f221969df37f62b53f0.tar.gz
simplify qi tests
-rw-r--r--src/rabbit_tests.erl137
1 files changed, 73 insertions, 64 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 30b82650..21cb41df 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1541,6 +1541,12 @@ init_test_queue() ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end).
+restart_test_queue(Qi) ->
+ _ = rabbit_queue_index:terminate([], Qi),
+ rabbit_variable_queue:stop_msg_store(),
+ ok = rabbit_variable_queue:start([test_queue()]),
+ init_test_queue().
+
empty_test_queue() ->
rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([]),
@@ -1553,6 +1559,9 @@ init_empty_test_queue() ->
{0, _Terms, Qi} = init_test_queue(),
Qi.
+with_empty_test_queue(Fun) ->
+ rabbit_queue_index:delete_and_terminate(Fun(init_empty_test_queue())).
+
queue_index_publish(SeqIds, Persistent, Qi) ->
Ref = rabbit_guid:guid(),
MsgStore = case Persistent of
@@ -1595,23 +1604,17 @@ test_queue_index() ->
{ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
- _Qi5 = rabbit_queue_index:terminate([], Qi4),
- rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0, as all the msgs were transient
- {0, _Terms1, Qi6} = init_test_queue(),
+ {0, _Terms1, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
{ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
- _Qi11 = rabbit_queue_index:terminate([], Qi10),
- rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, _Terms2, Qi12} = init_test_queue(),
+ {LenB, _Terms2, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -1621,79 +1624,85 @@ test_queue_index() ->
Qi17 = rabbit_queue_index:flush(Qi16),
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
- _Qi19 = rabbit_queue_index:terminate([], Qi18),
- rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
- {0, _Terms3, Qi20} = init_test_queue(),
+ {0, _Terms3, Qi20} = restart_test_queue(Qi18),
_Qi21 = rabbit_queue_index:delete_and_terminate(Qi20),
+ SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
%% These next bits are just to hit the auto deletion of segment files.
%% First, partials:
%% a) partial pub+del+ack, then move to new segment
- SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
- Qi22 = init_empty_test_queue(),
- {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
- Qi24 = rabbit_queue_index:deliver(SeqIdsC, Qi23),
- Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24),
- Qi26 = rabbit_queue_index:flush(Qi25),
- {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
- _Qi28 = rabbit_queue_index:delete_and_terminate(Qi27),
+ with_empty_test_queue(
+ fun (Qi22) ->
+ {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
+ Qi24 = rabbit_queue_index:deliver(SeqIdsC, Qi23),
+ Qi25 = rabbit_queue_index:ack(SeqIdsC, Qi24),
+ Qi26 = rabbit_queue_index:flush(Qi25),
+ {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize],
+ false, Qi26),
+ Qi27
+ end),
%% b) partial pub+del, then move to new segment, then ack all in old segment
- Qi29 = init_empty_test_queue(),
- {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
- Qi31 = rabbit_queue_index:deliver(SeqIdsC, Qi30),
- {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
- Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
- Qi34 = rabbit_queue_index:flush(Qi33),
- _Qi35 = rabbit_queue_index:delete_and_terminate(Qi34),
+ with_empty_test_queue(
+ fun (Qi29) ->
+ {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC,
+ false, Qi29),
+ Qi31 = rabbit_queue_index:deliver(SeqIdsC, Qi30),
+ {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize],
+ false, Qi31),
+ Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
+ rabbit_queue_index:flush(Qi33)
+ end),
%% c) just fill up several segments of all pubs, then +dels, then +acks
- SeqIdsD = lists:seq(0,SegmentSize*4),
- Qi36 = init_empty_test_queue(),
- {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
- Qi38 = rabbit_queue_index:deliver(SeqIdsD, Qi37),
- Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
- Qi40 = rabbit_queue_index:flush(Qi39),
- _Qi41 = rabbit_queue_index:delete_and_terminate(Qi40),
+ with_empty_test_queue(
+ fun (Qi36) ->
+ SeqIdsD = lists:seq(0,SegmentSize*4),
+ {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
+ Qi38 = rabbit_queue_index:deliver(SeqIdsD, Qi37),
+ Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
+ rabbit_queue_index:flush(Qi39)
+ end),
%% d) get messages in all states to a segment, then flush, then do
%% the same again, don't flush and read. This will hit all
%% possibilities in combining the segment with the journal.
- Qi42 = init_empty_test_queue(),
- {Qi43, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], false, Qi42),
- Qi44 = rabbit_queue_index:deliver([0,1,4], Qi43),
- Qi45 = rabbit_queue_index:ack([0], Qi44),
- Qi46 = rabbit_queue_index:flush(Qi45),
- {Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
- Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47),
- Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
- {[], Qi50} = rabbit_queue_index:read(0, 4, Qi49),
- {ReadD, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
- ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]),
- {ReadE, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
- ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
- _Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
+ with_empty_test_queue(
+ fun (Qi42) ->
+ {Qi43, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7],
+ false, Qi42),
+ Qi44 = rabbit_queue_index:deliver([0,1,4], Qi43),
+ Qi45 = rabbit_queue_index:ack([0], Qi44),
+ Qi46 = rabbit_queue_index:flush(Qi45),
+ {Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
+ Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47),
+ Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
+ {[], Qi50} = rabbit_queue_index:read(0, 4, Qi49),
+ {ReadD, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
+ ok = verify_read_with_published(true, false, ReadD,
+ [Four, Five, Six]),
+ {ReadE, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
+ ok = verify_read_with_published(false, false, ReadE,
+ [Seven, Eight]),
+ Qi52
+ end),
%% e) as for (d), but use terminate instead of read, which will
%% exercise journal_minus_segment, not segment_plus_journal.
- Qi54 = init_empty_test_queue(),
- {Qi55, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7], true, Qi54),
- Qi56 = rabbit_queue_index:deliver([0,1,4], Qi55),
- Qi57 = rabbit_queue_index:ack([0], Qi56),
- _Qi58 = rabbit_queue_index:terminate([], Qi57),
- rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start([test_queue()]),
- {5, _Terms9, Qi59} = init_test_queue(),
- {Qi60, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi59),
- Qi61 = rabbit_queue_index:deliver([2,3,5,6], Qi60),
- Qi62 = rabbit_queue_index:ack([1,2,3], Qi61),
- _Qi63 = rabbit_queue_index:terminate([], Qi62),
- rabbit_variable_queue:stop_msg_store(),
- ok = rabbit_variable_queue:start([test_queue()]),
- {5, _Terms10, Qi64} = init_test_queue(),
- _Qi65 = rabbit_queue_index:delete_and_terminate(Qi64),
+ with_empty_test_queue(
+ fun (Qi54) ->
+ {Qi55, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7],
+ true, Qi54),
+ Qi56 = rabbit_queue_index:deliver([0,1,4], Qi55),
+ Qi57 = rabbit_queue_index:ack([0], Qi56),
+ {5, _Terms9, Qi59} = restart_test_queue(Qi57),
+ {Qi60, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi59),
+ Qi61 = rabbit_queue_index:deliver([2,3,5,6], Qi60),
+ Qi62 = rabbit_queue_index:ack([1,2,3], Qi61),
+ {5, _Terms10, Qi64} = restart_test_queue(Qi62),
+ Qi64
+ end),
rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([]),