diff options
-rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dc32902f..8a7045ea 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1475,7 +1475,9 @@ delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> {cont, {Acc, State}}; delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { index_state = IndexState, + #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + index_state = IndexState, msg_store_clients = MSCState } = State) -> DeltaSeqId1 = lists:min( [rabbit_queue_index:next_segment_boundary(DeltaSeqId), @@ -1483,12 +1485,18 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, + lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}} + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA)) of + false -> {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, + MsgId), + {StopCont, AccNext} = + Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}}; + true -> {cont, {Acc0, MSCState0}} + end end, {cont, {Acc, MSCState}}, List), delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, State #vqstate { index_state = IndexState1, |