summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-15 13:23:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-15 13:23:37 +0000
commitb255ad4ce497b0926c776db2a3bcf09e795a30d8 (patch)
tree507ba4338c932fceee95671e57657e245730a62d
parent58ba91b595d7a65c345b94ec38b0b3538e909f35 (diff)
downloadrabbitmq-server-bug25400.tar.gz
filter out pending acks when folding over deltabug25400
-rw-r--r--src/rabbit_variable_queue.erl20
1 files changed, 14 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dc32902f..8a7045ea 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1475,7 +1475,9 @@ delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
{cont, {Acc, State}};
delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { index_state = IndexState,
+ #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ index_state = IndexState,
msg_store_clients = MSCState } = State) ->
DeltaSeqId1 = lists:min(
[rabbit_queue_index:next_segment_boundary(DeltaSeqId),
@@ -1483,12 +1485,18 @@ delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
+ lfoldl(fun ({MsgId, SeqId, MsgProps, IsPersistent, _IsDelivered},
{Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}}
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
+ false -> {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent,
+ MsgId),
+ {StopCont, AccNext} =
+ Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}};
+ true -> {cont, {Acc0, MSCState0}}
+ end
end, {cont, {Acc, MSCState}}, List),
delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
State #vqstate { index_state = IndexState1,