summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-01-17 09:44:36 +0000
committerTim Watson <tim@rabbitmq.com>2013-01-17 09:44:36 +0000
commitc720e3c00fdd00313f0770e23ba2d40875a1fd4c (patch)
tree2d8ba9ec732e740b3f3a0d89a765aaa905267686
parent6b6d3d4ad00c748e1906295b8e6ab0453d5d7727 (diff)
parentb255ad4ce497b0926c776db2a3bcf09e795a30d8 (diff)
downloadrabbitmq-server-c720e3c00fdd00313f0770e23ba2d40875a1fd4c.tar.gz
merge bug25400 into default
-rw-r--r--src/rabbit_tests.erl33
-rw-r--r--src/rabbit_variable_queue.erl20
2 files changed, 24 insertions, 29 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b6969d06..7257827a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2327,36 +2327,23 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- JustOverTwoSegs = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(
- true, 1, JustOverTwoSegs,
- fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- VQ3 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ2),
- VQ4 = variable_queue_publish(
- true, JustOverTwoSegs + 1, 64,
- fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ3),
- [false = case V of
- {delta, _, 0, _} -> true;
- 0 -> true;
- _ -> false
- end || {K, V} <- rabbit_variable_queue:status(VQ4),
- lists:member(K, [q1, delta, q3])], %% precondition
- Count = JustOverTwoSegs + 64,
+ {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ Msgs = RequeuedMsgs ++ FreshMsgs,
lists:foldl(
- fun (Cut, VQ5) -> test_variable_queue_fold(Cut, Count, VQ5) end,
- VQ4, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+ fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
+ VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
-test_variable_queue_fold(Cut, Count, VQ0) ->
+test_variable_queue_fold(Cut, Msgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
fun (M, _, A) ->
- case msg2int(M) =< Cut of
- true -> {cont, [M | A]};
+ MInt = msg2int(M),
+ case MInt =< Cut of
+ true -> {cont, [MInt | A]};
false -> {stop, A}
end
end, [], VQ0),
- true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
- [msg2int(M) || M <- Acc],
+ Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
+ Expected = lists:reverse(Acc), %% assertion
VQ1.
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dc32902f..8a7045ea 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1475,7 +1475,9 @@ delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
{cont, {Acc, State}};
delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { index_state = IndexState,
+ #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ index_state = IndexState,
msg_store_clients = MSCState } = State) ->
DeltaSeqId1 = lists:min(
[rabbit_queue_index:next_segment_boundary(DeltaSeqId),
@@ -1483,12 +1485,18 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
+ lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered},
{Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}}
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
+ false -> {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent,
+ MsgId),
+ {StopCont, AccNext} =
+ Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}};
+ true -> {cont, {Acc0, MSCState0}}
+ end
end, {cont, {Acc, MSCState}}, List),
delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,