summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-20 23:20:43 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-20 23:20:43 +0100
commit0ce01488ba4be1e305744ae831f13e5328ab56f5 (patch)
treec729e6c0906082252a3bdebf84a76b6ad12b9074
parent02f10f61c710d1244748fbc64469583193e20925 (diff)
downloadrabbitmq-server-0ce01488ba4be1e305744ae831f13e5328ab56f5.tar.gz
fixed bug documented in preceeding comment
-rw-r--r--src/rabbit_disk_queue.erl9
1 files changed, 5 insertions, 4 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0dcbb5ae..5829533e 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1002,11 +1002,11 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
{ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}),
- {atomic, {WriteSeqId1, Q}} =
+ {atomic, {WriteSeqId1, Q, State}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q},
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State},
MsgSeqIdsZipped)
end),
true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1,
@@ -1015,7 +1015,7 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
{_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}},
- {ExpectedSeqIdTo, Q}) ->
+ {ExpectedSeqIdTo, Q, State}) ->
SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo),
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId,
@@ -1031,7 +1031,8 @@ requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
end,
- {NextSeqIdTo1, Q}.
+ decrement_cache(MsgId, State),
+ {NextSeqIdTo1, Q, State}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of