diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-20 23:20:43 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-20 23:20:43 +0100 |
commit | 0ce01488ba4be1e305744ae831f13e5328ab56f5 (patch) | |
tree | c729e6c0906082252a3bdebf84a76b6ad12b9074 | |
parent | 02f10f61c710d1244748fbc64469583193e20925 (diff) | |
download | rabbitmq-server-0ce01488ba4be1e305744ae831f13e5328ab56f5.tar.gz |
fixed bug documented in preceeding comment
-rw-r--r-- | src/rabbit_disk_queue.erl | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0dcbb5ae..5829533e 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1002,11 +1002,11 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}), - {atomic, {WriteSeqId1, Q}} = + {atomic, {WriteSeqId1, Q, State}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun requeue_message/2, {WriteSeqId, Q}, + lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State}, MsgSeqIdsZipped) end), true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1, @@ -1015,7 +1015,7 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}}, - {ExpectedSeqIdTo, Q}) -> + {ExpectedSeqIdTo, Q, State}) -> SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo), [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId, @@ -1031,7 +1031,8 @@ requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) end, - {NextSeqIdTo1, Q}. + decrement_cache(MsgId, State), + {NextSeqIdTo1, Q, State}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of |