diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 13:23:37 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 13:23:37 +0000 |
commit | b255ad4ce497b0926c776db2a3bcf09e795a30d8 (patch) | |
tree | 507ba4338c932fceee95671e57657e245730a62d | |
parent | 58ba91b595d7a65c345b94ec38b0b3538e909f35 (diff) | |
download | rabbitmq-server-b255ad4ce497b0926c776db2a3bcf09e795a30d8.tar.gz |
filter out pending acks when folding over deltabug25400
-rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dc32902f..8a7045ea 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1475,7 +1475,9 @@ delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> {cont, {Acc, State}}; delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { index_state = IndexState, + #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + index_state = IndexState, msg_store_clients = MSCState } = State) -> DeltaSeqId1 = lists:min( [rabbit_queue_index:next_segment_boundary(DeltaSeqId), @@ -1483,12 +1485,18 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, + lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered}, {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}} + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA)) of + false -> {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState0, IsPersistent, + MsgId), + {StopCont, AccNext} = + Fun(Msg, MsgProps, Acc0), + {StopCont, {AccNext, MSCState1}}; + true -> {cont, {Acc0, MSCState0}} + end end, {cont, {Acc, MSCState}}, List), delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, State #vqstate { index_state = IndexState1, |