diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-26 11:57:18 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-26 11:57:18 +0000 |
commit | bfa4903c93e1bd04cfe8e053d5066e6460883c5c (patch) | |
tree | bc1d35e1448802efe6baab6bdcde18dc2f25a907 | |
parent | b872b881f803417ed32e3b31a199a237998eceff (diff) | |
parent | bf87506cea0640482af76fcb580229bed2e2c74e (diff) | |
download | rabbitmq-server-bfa4903c93e1bd04cfe8e053d5066e6460883c5c.tar.gz |
Merged bug25311
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 29 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 38 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 45 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 58 |
7 files changed, 140 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a3e4ca31..5ddafba8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -108,7 +108,7 @@ owner_pid ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]). +-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]). %%---------------------------------------------------------------------------- @@ -484,11 +484,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = - fetch(AckRequired, State), + {Result, State1} = fetch(AckRequired, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {Result, BQ:is_empty(BQS), State2}. confirm_messages([], State) -> State; @@ -1062,8 +1061,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag, Remaining}, State2} -> - State3 = + {{Message, IsDelivered, AckTag}, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), ChAckTags1 = sets:add_element(AckTag, ChAckTags), @@ -1072,7 +1071,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, Limiter, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 329144f9..9e99ca5e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,13 +26,9 @@ -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: - ('empty' | - %% MessageId, AckTag, Remaining_Len - {rabbit_types:msg_id(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:msg_id(), Ack})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: @@ -159,6 +155,11 @@ %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state()) + -> {A, state()}. + %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -220,7 +221,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 3168ca5c..03808859 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -106,7 +106,8 @@ command(S) -> {1, qc_dropwhile(S)}, {1, qc_is_empty(S)}, {1, qc_timeout(S)}, - {1, qc_purge(S)}]). + {1, qc_purge(S)}, + {1, qc_fold(S)}]). qc_publish(#state{bqstate = BQ}) -> {call, ?BQMOD, publish, @@ -157,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) -> qc_purge(#state{bqstate = BQ}) -> {call, ?BQMOD, purge, [BQ]}. +qc_fold(#state{bqstate = BQ}) -> + {call, ?BQMOD, fold, [fun foldfun/2, foldacc(), BQ]}. + %% Preconditions %% Create long queues by only allowing publishing @@ -271,19 +275,23 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}. + S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}; + +next_state(S, Res, {call, ?BQMOD, fold, _Args}) -> + BQ1 = {call, erlang, element, [2, Res]}, + S#state{bqstate = BQ1}. %% Postconditions postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of - {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> + {{MsgFetched, _IsDelivered, AckTag}, _BQ} -> {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgFetched =:= Msg andalso not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso - RemainingLen =:= Len - 1; + Len =/= 0; {empty, _BQ} -> Len =:= 0 end; @@ -291,14 +299,14 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> postcondition(S, {call, ?BQMOD, drop, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of - {{MsgIdFetched, AckTag, RemainingLen}, _BQ} -> + {{MsgIdFetched, AckTag}, _BQ} -> {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgId = eval({call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}), MsgIdFetched =:= MsgId andalso not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso - RemainingLen =:= Len - 1; + Len =/= 0; {empty, _BQ} -> Len =:= 0 end; @@ -321,6 +329,12 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end, ReportedConfirmed); +postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) -> + #state{messages = Messages} = S, + lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) -> + foldfun(Msg, Acc) + end, foldacc(), gb_trees:to_list(Messages)) =:= Res; + postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> ?BQMOD:len(BQ) =:= Len. @@ -379,6 +393,9 @@ rand_choice(List, Selection, N) -> rand_choice(List -- [Picked], [Picked | Selection], N - 1). +foldfun(Msg, Acc) -> [Msg | Acc]. +foldacc() -> []. + dropfun(Props) -> Expiry = eval({call, erlang, element, [?RECORD_INDEX(expiry, message_properties), Props]}), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 39060c09..8fcd1893 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -19,7 +19,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, - requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -268,8 +268,7 @@ drain_confirmed(State = #state { backing_queue = BQ, seen_status = SS1, confirmed = [] }}. -fetch(AckRequired, State = #state { gm = GM, - backing_queue = BQ, +fetch(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS, set_delivered = SetDelivered }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), @@ -277,25 +276,19 @@ fetch(AckRequired, State = #state { gm = GM, case Result of empty -> {Result, State1}; - {Message, IsDelivered, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, - {{Message, IsDelivered1, AckTag, Remaining}, + {Message, IsDelivered, AckTag} -> + {{Message, IsDelivered orelse SetDelivered > 0, AckTag}, drop(Message#basic_message.id, AckTag, State1)} end. -drop(AckRequired, State = #state { gm = GM, - backing_queue = BQ, +drop(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:drop(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, - case Result of - empty -> - {Result, State1}; - {MsgId, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - {Result, drop(MsgId, AckTag, State1)} - end. + {Result, case Result of + empty -> State1; + {MsgId, AckTag} -> drop(MsgId, AckTag, State1) + end}. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, @@ -321,6 +314,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +fold(Fun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:fold(Fun, Acc, BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:len(BQS). @@ -453,8 +451,12 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, - ack_msg_id = AM }) -> +drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, + ack_msg_id = AM, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), State #state { set_delivered = lists:max([0, SetDelivered - 1]), ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3ad8eb77..cb7a2135 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -727,8 +727,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{MsgId, AckTag, _Remaining}, BQSN1} = - BQ:drop(AckRequired, BQSN), + {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 03500d71..81180ebe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2216,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_publish(IsPersistent, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> lists:foldl( fun (N, VQN) -> rabbit_variable_queue:publish( @@ -2224,7 +2228,8 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), + end}, + PayloadFun(N)), PropFun(N, #message_properties{}), self(), VQN) end, VQ, lists:seq(1, Count)). @@ -2232,8 +2237,9 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN}, VQM} = rabbit_variable_queue:fetch(true, VQN), + Rem = rabbit_variable_queue:len(VQM), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -2304,9 +2310,22 @@ test_variable_queue() -> fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_requeue/1]], + fun test_variable_queue_requeue/1, + fun test_variable_queue_fold/1]], passed. +test_variable_queue_fold(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1, + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish( + true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2), + true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] == + [list_to_binary(lists:reverse(P)) || + #basic_message{ content = #content{ payload_fragments_rev = P}} <- + Acc], + VQ3. + test_variable_queue_requeue(VQ0) -> Interval = 50, Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, @@ -2326,7 +2345,7 @@ test_variable_queue_requeue(VQ0) -> VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag, _}, VQb} = + {{#basic_message{}, true, AckTag}, VQb} = rabbit_variable_queue:fetch(true, VQa), VQb end, VQ5, lists:reverse(Acks)), @@ -2366,14 +2385,16 @@ test_drop(VQ0) -> %% start by sending a messages VQ1 = variable_queue_publish(false, 1, VQ0), %% drop message with AckRequired = true - {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = rabbit_variable_queue:is_empty(VQ2), true = AckTag =/= undefinded, %% drop again -> empty {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), %% requeue {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), %% drop message with AckRequired = false - {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + true = rabbit_variable_queue:is_empty(VQ5), VQ5. test_dropwhile(VQ0) -> @@ -2392,7 +2413,7 @@ test_dropwhile(VQ0) -> %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _, _}, VQM} = + {{#basic_message{}, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), VQM end, VQ2, lists:seq(6, Count)), @@ -2445,7 +2466,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + Len = rabbit_variable_queue:len(VQ2), {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). @@ -2510,8 +2532,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = - rabbit_variable_queue:fetch(true, VQ7), + {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), + Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), @@ -2558,8 +2580,9 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + CountMinusOne = rabbit_variable_queue:len(VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName) end), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1aea8a3b..4f2668d9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,8 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1, - depth/1, set_ram_duration_target/2, ram_duration/1, + dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). @@ -591,8 +591,8 @@ dropwhile(Pred, AckRequired, State, Msgs) -> case {Pred(MsgProps), AckRequired} of {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _, AckTag, _}, State3} = - internal_fetch(true, MsgStatus1, State2), + {{Msg, _IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), @@ -619,9 +619,9 @@ drop(AckRequired, State) -> {empty, State1} -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> - {{_Msg, _IsDelivered, AckTag, Remaining}, State2} = + {{_Msg, _IsDelivered, AckTag}, State2} = internal_fetch(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)} + {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. ack([], State) -> @@ -678,6 +678,24 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +fold(Fun, Acc, #vqstate { q1 = Q1, + q2 = Q2, + delta = #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd }, + q3 = Q3, + q4 = Q4 } = State) -> + QFun = fun(MsgStatus, {Acc0, State0}) -> + {#msg_status { msg = Msg }, State1 } = + read_msg(MsgStatus, false, State0), + {Fun(Msg, Acc0), State1} + end, + {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), + {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), + {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2), + {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), + {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1), + {Acc5, State5}. + len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -1127,14 +1145,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag}, State1 #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, - len = Len1, + len = Len - 1, persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, @@ -1352,7 +1369,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue +%% Internal plumbing for requeue and fold %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1421,6 +1438,27 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. +delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> + {Acc, State}; +delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd, + #vqstate { index_state = IndexState, + msg_store_clients = MSCState } = State) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqId), + DeltaSeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, + IndexState), + {Acc1, MSCState1} = + lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, + _IsDelivered}, {Acc0, MSCState0}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, MsgId), + {Fun(Msg, Acc0), MSCState1} + end, {Acc, MSCState}, List), + delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1, + msg_store_clients = MSCState1 }). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |