summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 12:05:05 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 12:05:05 +0000
commit12eea5e00a656f2dda3888ff089f7c4d47acb40c (patch)
tree82ab6fb66a67644e17cb7048ea83b9064530ccdf
parent62facff84cb5aee19de9385d618fcea3ef622218 (diff)
downloadrabbitmq-server-12eea5e00a656f2dda3888ff089f7c4d47acb40c.tar.gz
amqqueue_process re-publishes with confirms
-rw-r--r--include/rabbit_backing_queue_spec.hrl10
-rw-r--r--src/rabbit_amqqueue_process.erl73
-rw-r--r--src/rabbit_backing_queue.erl11
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl33
-rw-r--r--src/rabbit_tests.erl40
-rw-r--r--src/rabbit_variable_queue.erl35
7 files changed, 121 insertions, 87 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1e870bb7..602f598e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -48,14 +48,14 @@
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/3 ::
+-spec(dropwhile/4 ::
(fun ((rabbit_types:message_properties()) -> boolean()),
- msg_fun(), state())
- -> state()).
+ msg_fun(), non_neg_integer(), state())
+ -> {non_neg_integer(), state()}).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/3 :: ([ack()], msg_fun(), state()) ->
- {[rabbit_guid:guid()], state()}).
+-spec(ack/4 :: ([ack()], msg_fun(), non_neg_integer(), state()) ->
+ {[rabbit_guid:guid()], non_neg_integer(), state()}).
-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b314ddef..24bebc36 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,6 +50,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ publish_seqno,
dlx
}).
@@ -131,6 +132,7 @@ init(Q) ->
expiry_timer_ref = undefined,
ttl = undefined,
dlx = undefined,
+ publish_seqno = 1,
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -153,6 +155,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ publish_seqno = 1,
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -471,9 +474,12 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
- rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ rabbit_misc:gb_trees_foreach(fun confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
+confirm_to_sender(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
should_confirm_message(#delivery{sender = ChPid,
@@ -510,7 +516,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
case Confirm of
- immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ immediately -> confirm_to_sender(ChPid, [MsgSeqNo]);
_ -> ok
end,
case BQ:is_duplicate(Message, BQS) of
@@ -686,13 +692,17 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ,
+ publish_seqno = MsgSeqNo}) ->
Now = now_micros(),
- BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- mk_dead_letter_fun(expired, State),
- BQS),
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
+ {MsgSeqNo1, BQS1} =
+ BQ:dropwhile(
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ mk_dead_letter_fun(expired, State),
+ MsgSeqNo,
+ BQS),
+ ensure_ttl_timer(State#q{backing_queue_state = BQS1,
+ publish_seqno = MsgSeqNo1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -708,35 +718,38 @@ ensure_ttl_timer(State) ->
State.
mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgLookupFun, BQS) -> BQS end;
+ fun(_MsgLookupFun, MsgSeqNo, BQS) -> {MsgSeqNo, BQS} end;
mk_dead_letter_fun(Reason, State) ->
- fun(MsgLookupFun, BQS) ->
+ fun(MsgLookupFun, MsgSeqNo, BQS) ->
{Msg, BQS1} = MsgLookupFun(BQS),
- dead_letter_msg(Msg, Reason, State),
- BQS1
+ MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State),
+ {MsgSeqNo1, BQS1}
end.
maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) ->
State;
maybe_dead_letter_queue(Reason, State = #q{
backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ,
+ publish_seqno = MsgSeqNo}) ->
case BQ:fetch(false, BQS) of
{empty, BQS1} ->
State#q{backing_queue_state = BQS1};
{{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} ->
- dead_letter_msg(Msg, Reason, State),
- maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1})
+ MsgSeqNo1 = dead_letter_msg(Msg, Reason, MsgSeqNo, State),
+ maybe_dead_letter_queue(Reason,
+ State#q{backing_queue_state = BQS1,
+ publish_seqno = MsgSeqNo1})
end.
-dead_letter_msg(Msg, Reason, State = #q{dlx = DLX}) ->
+dead_letter_msg(Msg, Reason, MsgSeqNo, State = #q{dlx = DLX}) ->
rabbit_exchange:lookup_or_die(DLX),
rabbit_basic:publish(
rabbit_basic:delivery(
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- undefined)),
- ok.
+ MsgSeqNo)),
+ MsgSeqNo+1.
make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
State) ->
@@ -1096,22 +1109,28 @@ handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- Fun = fun(_, BQS0) -> BQS0 end,
- {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
- State1#q{backing_queue_state = BQS1}
+ backing_queue_state = BQS,
+ publish_seqno = MsgSeqNo}) ->
+ Fun = fun(_, MsgSeqNo1, BQS0) -> {MsgSeqNo1, BQS0} end,
+ {_Guids, MsgSeqNo1, BQS1} =
+ BQ:ack(AckTags, Fun, MsgSeqNo, BQS),
+ State1#q{backing_queue_state = BQS1,
+ publish_seqno = MsgSeqNo1}
end));
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ publish_seqno = MsgSeqNo}) ->
case Requeue of
true -> requeue_and_run(AckTags, State1);
false -> Fun = mk_dead_letter_fun(rejected, State),
- {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS),
- State1#q{backing_queue_state = BQS1}
+ {_Guids, MsgSeqNo1, BQS1} =
+ BQ:ack(AckTags, Fun, MsgSeqNo, BQS),
+ State1#q{backing_queue_state = BQS1,
+ publish_seqno = MsgSeqNo1}
end
end));
@@ -1167,6 +1186,10 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
emit_consumer_created(Ch, CTag, true, AckRequired)
end,
+ noreply(State);
+
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ rabbit_log:info("Got a confirm for ~p~n", [MsgSeqNos]),
noreply(State).
handle_info(maybe_expire, State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 72c00e3d..a80d656d 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -97,8 +97,9 @@ behaviour_info(callbacks) ->
%% Drop messages from the head of the queue while the supplied
%% predicate returns true. A callback function is supplied
%% allowing callers access to messages that are about to be
- %% dropped.
- {dropwhile, 3},
+ %% dropped; the callback may publish messages and requires the
+ %% next message sequence number, which must also be supplied.
+ {dropwhile, 4},
%% Produce the next message.
{fetch, 2},
@@ -106,8 +107,10 @@ behaviour_info(callbacks) ->
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as
%% Acks. A callback function is supplied allowing callers to
- %% access messages that are being acked.
- {ack, 3},
+ %% access messages that are being acked; the callback may publish
+ %% messages and requires the next message sequence number, which
+ %% must also be supplied.
+ {ack, 4},
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b2fe28c..9df862c6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,7 @@
-behaviour(gen_server2).
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -85,7 +85,6 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -128,9 +127,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNos) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
rabbit_channel, list_local, []).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7b844b20..c048d4a8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/3,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/4,
+ requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/4,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3]).
@@ -172,17 +172,18 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun, State = #state{gm = GM,
- backing_queue = BQ,
- set_delivered = SetDelivered,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, MsgFun, MsgSeqNo,
+ State = #state{gm = GM,
+ backing_queue = BQ,
+ set_delivered = SetDelivered,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
+ {MsgSeqNo1, BQS1} = BQ:dropwhile(Pred, MsgFun, MsgSeqNo, BQS),
Dropped = Len - BQ:len(BQS1),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }.
+ {MsgSeqNo1, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -235,18 +236,18 @@ fetch(AckRequired, State = #state { gm = GM,
ack_msg_id = AM1 }}
end.
-ack(AckTags, MsgFun, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- {MsgIds, BQS1} = BQ:ack(AckTags, MsgFun, BQS),
+ack(AckTags, MsgFun, MsgSeqNo, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, MsgSeqNo1, BQS1} = BQ:ack(AckTags, MsgFun, MsgSeqNo, BQS),
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgFun, MsgIds})
end,
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
+ {MsgIds, MsgSeqNo1, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6e5e86c6..d9a36664 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2331,12 +2331,13 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end,
- dummy_msg_fun(),
- VQ1),
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end,
+ dummy_msg_fun(),
+ dummy_msgseqno(),
+ VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2350,18 +2351,19 @@ test_dropwhile(VQ0) ->
VQ4.
-dummy_msg_fun() ->
- fun(_Fun, State) -> State end.
+dummy_msg_fun() -> fun(_Fun, MsgSeqNo, State) -> {MsgSeqNo, State} end.
+dummy_msgseqno() -> 1.
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), VQ2),
+ {_, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(
- fun(_) -> false end, dummy_msg_fun(), VQ5).
+ {_, VQ6} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, dummy_msg_fun(), dummy_msgseqno(), VQ5),
+ VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -2386,7 +2388,9 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, dummy_msg_fun(), VQ8),
+ {_Guids, _, VQ9} =
+ rabbit_variable_queue:ack(AckTags, dummy_msg_fun(),
+ dummy_msgseqno(), VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2396,7 +2400,9 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], dummy_msg_fun(), VQ2),
+ {_Guids, _, VQ3} =
+ rabbit_variable_queue:ack([AckTag], dummy_msg_fun(),
+ dummy_msgseqno(), VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2430,8 +2436,10 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
- dummy_msg_fun(), VQ8),
+ {_Guids, _, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1,
+ dummy_msg_fun(),
+ dummy_msgseqno(),
+ VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6174daa8..34a28afe 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/3, requeue/2, len/1, is_empty/1,
+ dropwhile/4, fetch/2, ack/4, requeue/2, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3,
@@ -581,18 +581,19 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, MsgFun, MsgSeqNo, State) ->
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ {MsgSeqNo, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true ->
- State2 = MsgFun(read_msg_callback(MsgStatus), State1),
+ {MsgSeqNo1, State2} =
+ MsgFun(read_msg_callback(MsgStatus), MsgSeqNo, State1),
{_, State3} = internal_fetch(false, MsgStatus, State2),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, MsgFun, MsgSeqNo1, State3);
false ->
- a(in_r(MsgStatus, State1))
+ {MsgSeqNo, a(in_r(MsgStatus, State1))}
end
end.
@@ -625,28 +626,30 @@ read_msg_callback1(MsgId, IsPersistent,
msg_store_read(MSCState, IsPersistent, MsgId),
{Msg, State #vqstate { msg_store_clients = MSCState1 }}.
-ack([], _Fun, State) ->
- {[], State};
+ack([], _Fun, MsgSeqNo, State) ->
+ {[], MsgSeqNo, State};
-ack(AckTags, Fun, State) ->
+ack(AckTags, MsgFun, MsgSeqNo, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }} =
+ {MsgSeqNo2,
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }}} =
lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate{pending_ack = PA}}) ->
+ fun (SeqId, {Acc, {MsgSeqNo1, State2 = #vqstate{pending_ack = PA}}}) ->
AckEntry = gb_trees:get(SeqId, PA),
{MsgStatus, State3} = remove_pending_ack(SeqId, State2),
{accumulate_ack(MsgStatus, Acc),
- Fun(read_msg_callback(AckEntry), State3)}
- end, {accumulate_ack_init(), State}, AckTags),
+ MsgFun(read_msg_callback(AckEntry), MsgSeqNo1, State3)}
+ end, {accumulate_ack_init(), {MsgSeqNo, State}}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
{lists:reverse(AllMsgIds),
+ MsgSeqNo2,
a(State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.