summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-26 11:57:18 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-26 11:57:18 +0000
commitbfa4903c93e1bd04cfe8e053d5066e6460883c5c (patch)
treebc1d35e1448802efe6baab6bdcde18dc2f25a907
parentb872b881f803417ed32e3b31a199a237998eceff (diff)
parentbf87506cea0640482af76fcb580229bed2e2c74e (diff)
downloadrabbitmq-server-bfa4903c93e1bd04cfe8e053d5066e6460883c5c.tar.gz
Merged bug25311
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_backing_queue_qc.erl29
-rw-r--r--src/rabbit_mirror_queue_master.erl38
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_tests.erl45
-rw-r--r--src/rabbit_variable_queue.erl58
7 files changed, 140 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a3e4ca31..5ddafba8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -108,7 +108,7 @@
owner_pid
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -484,11 +484,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -1062,8 +1061,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case fetch(AckRequired, drop_expired_messages(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
ChAckTags1 = sets:add_element(AckTag, ChAckTags),
@@ -1072,7 +1071,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 329144f9..9e99ca5e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,13 +26,9 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
- ('empty' |
- %% MessageId, AckTag, Remaining_Len
- {rabbit_types:msg_id(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
@@ -159,6 +155,11 @@
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(), A) -> A), A, state())
+ -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -220,7 +221,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 3168ca5c..03808859 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -106,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -157,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [fun foldfun/2, foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -271,19 +275,23 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -291,14 +299,14 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgIdFetched, AckTag, RemainingLen}, _BQ} ->
+ {{MsgIdFetched, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgId = eval({call, erlang, element,
[?RECORD_INDEX(id, basic_message), Msg]}),
MsgIdFetched =:= MsgId andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -321,6 +329,12 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, _Args}, {Res, _BQ}) ->
+ #state{messages = Messages} = S,
+ lists:foldl(fun ({_SeqId, {_MsgProps, Msg}}, Acc) ->
+ foldfun(Msg, Acc)
+ end, foldacc(), gb_trees:to_list(Messages)) =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -379,6 +393,9 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+foldfun(Msg, Acc) -> [Msg | Acc].
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 39060c09..8fcd1893 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -19,7 +19,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -268,8 +268,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+fetch(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
@@ -277,25 +276,19 @@ fetch(AckRequired, State = #state { gm = GM,
case Result of
empty ->
{Result, State1};
- {Message, IsDelivered, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- {{Message, IsDelivered1, AckTag, Remaining},
+ {Message, IsDelivered, AckTag} ->
+ {{Message, IsDelivered orelse SetDelivered > 0, AckTag},
drop(Message#basic_message.id, AckTag, State1)}
end.
-drop(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+drop(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:drop(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {MsgId, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- {Result, drop(MsgId, AckTag, State1)}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -321,6 +314,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -453,8 +451,12 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
+ ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
State #state { set_delivered = lists:max([0, SetDelivered - 1]),
ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3ad8eb77..cb7a2135 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -727,8 +727,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{MsgId, AckTag, _Remaining}, BQSN1} =
- BQ:drop(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 03500d71..81180ebe 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2216,6 +2216,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2224,7 +2228,8 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
+ end},
+ PayloadFun(N)),
PropFun(N, #message_properties{}), self(), VQN)
end, VQ, lists:seq(1, Count)).
@@ -2232,8 +2237,9 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2304,9 +2310,22 @@ test_variable_queue() ->
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
+test_variable_queue_fold(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 1,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {Acc, VQ3} = rabbit_variable_queue:fold(fun (M, A) -> [M | A] end, [], VQ2),
+ true = [term_to_binary(N) || N <- lists:seq(Count, 1, -1)] ==
+ [list_to_binary(lists:reverse(P)) ||
+ #basic_message{ content = #content{ payload_fragments_rev = P}} <-
+ Acc],
+ VQ3.
+
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
@@ -2326,7 +2345,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2366,14 +2385,16 @@ test_drop(VQ0) ->
%% start by sending a messages
VQ1 = variable_queue_publish(false, 1, VQ0),
%% drop message with AckRequired = true
- {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
true = AckTag =/= undefinded,
%% drop again -> empty
{empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
%% requeue
{[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
%% drop message with AckRequired = false
- {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
VQ5.
test_dropwhile(VQ0) ->
@@ -2392,7 +2413,7 @@ test_dropwhile(VQ0) ->
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {{#basic_message{}, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ2, lists:seq(6, Count)),
@@ -2445,7 +2466,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2510,8 +2532,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2558,8 +2580,9 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1aea8a3b..4f2668d9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
@@ -591,8 +591,8 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
+ {{Msg, _IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
{true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
@@ -619,9 +619,9 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {{_Msg, _IsDelivered, AckTag, Remaining}, State2} =
+ {{_Msg, _IsDelivered, AckTag}, State2} =
internal_fetch(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)}
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -678,6 +678,24 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
+ q3 = Q3,
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {#msg_status { msg = Msg }, State1 } =
+ read_msg(MsgStatus, false, State0),
+ {Fun(Msg, Acc0), State1}
+ end,
+ {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
+ {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
+ {Acc3, State3} = delta_fold(Fun, Acc2, DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
+ {Acc5, State5} = ?QUEUE:foldl(QFun, {Acc4, State4}, Q1),
+ {Acc5, State5}.
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -1127,14 +1145,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, IsDelivered, AckTag},
State1 #vqstate { ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
- len = Len1,
+ len = Len - 1,
persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
@@ -1352,7 +1369,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1421,6 +1438,27 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+delta_fold(_Fun, Acc, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {Acc, State};
+delta_fold(Fun, Acc, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Acc1, MSCState1} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent,
+ _IsDelivered}, {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {Fun(Msg, Acc0), MSCState1}
+ end, {Acc, MSCState}, List),
+ delta_fold(Fun, Acc1, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------