diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-01-17 09:44:36 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-01-17 09:44:36 +0000 |
commit | c720e3c00fdd00313f0770e23ba2d40875a1fd4c (patch) | |
tree | 2d8ba9ec732e740b3f3a0d89a765aaa905267686 | |
parent | 6b6d3d4ad00c748e1906295b8e6ab0453d5d7727 (diff) | |
parent | b255ad4ce497b0926c776db2a3bcf09e795a30d8 (diff) | |
download | rabbitmq-server-c720e3c00fdd00313f0770e23ba2d40875a1fd4c.tar.gz |
merge bug25400 into default
-rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
2 files changed, 24 insertions, 29 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b6969d06..7257827a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2327,36 +2327,23 @@ test_variable_queue() -> passed. test_variable_queue_fold(VQ0) -> - JustOverTwoSegs = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish( - true, 1, JustOverTwoSegs, - fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ2), - VQ4 = variable_queue_publish( - true, JustOverTwoSegs + 1, 64, - fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ3), - [false = case V of - {delta, _, 0, _} -> true; - 0 -> true; - _ -> false - end || {K, V} <- rabbit_variable_queue:status(VQ4), - lists:member(K, [q1, delta, q3])], %% precondition - Count = JustOverTwoSegs + 64, + {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + Msgs = RequeuedMsgs ++ FreshMsgs, lists:foldl( - fun (Cut, VQ5) -> test_variable_queue_fold(Cut, Count, VQ5) end, - VQ4, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). + fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, + VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). -test_variable_queue_fold(Cut, Count, VQ0) -> +test_variable_queue_fold(Cut, Msgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( fun (M, _, A) -> - case msg2int(M) =< Cut of - true -> {cont, [M | A]}; + MInt = msg2int(M), + case MInt =< Cut of + true -> {cont, [MInt | A]}; false -> {stop, A} end end, [], VQ0), - true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] == - [msg2int(M) || M <- Acc], + Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs), + Expected = lists:reverse(Acc), %% assertion VQ1. msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dc32902f..8a7045ea 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1475,7 +1475,9 @@ delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> {cont, {Acc, State}}; delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { index_state = IndexState, + #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + index_state = IndexState, msg_store_clients = MSCState } = State) -> DeltaSeqId1 = lists:min( [rabbit_queue_index:next_segment_boundary(DeltaSeqId), @@ -1483,12 +1485,18 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, + lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}} + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA)) of + false -> {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, + MsgId), + {StopCont, AccNext} = + Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}}; + true -> {cont, {Acc0, MSCState0}} + end end, {cont, {Acc, MSCState}}, List), delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, State #vqstate { index_state = IndexState1, |