diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-23 17:00:22 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-23 17:00:22 +0100 |
commit | 7eeec412c60abf4172dff808189742e7ec184071 (patch) | |
tree | 8f71281f17076ddee674fb1c232d07f0e2755b7f | |
parent | 45cf652845d63f75d827f204edc70d234ff42a2b (diff) | |
download | rabbitmq-server-7eeec412c60abf4172dff808189742e7ec184071.tar.gz |
Sorted out transactions within the disk_queue, ensuring that if a transaction restarts, other data structures cannot be left partially updated and the transaction can still continue successfully. In particular, this covers manipulation of ets tables within mnesia transactions.
-rw-r--r-- | src/rabbit_disk_queue.erl | 166 |
1 files changed, 95 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 344aff91..835043c3 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1012,8 +1012,6 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, ok = case MnesiaDelete of true -> mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); - txn -> mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write); _ -> ok end, Files2 @@ -1542,67 +1540,92 @@ load_from_disk(State) -> State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded - State2 = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - {State6, FinalQ, MsgSeqIds2, _Len} = - mnesia:foldl( - fun (#dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = {Q, SeqId} }, - {State3, OldQ, MsgSeqIds, Len}) -> - {State4, MsgSeqIds1, Len1} = - case {OldQ == Q, MsgSeqIds} of - {true, _} when Len < ?BATCH_SIZE -> - {State3, MsgSeqIds, Len}; - {false, []} -> {State3, MsgSeqIds, Len}; - {_, _} -> - {ok, State5} = - remove_messages(Q, MsgSeqIds, - txn, State3), - {State5, [], 0} - end, - case dets_ets_lookup(State4, MsgId) of - [] -> ok = mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write), - {State4, Q, MsgSeqIds1, Len1}; - [{MsgId, _RefCount, _File, _Offset, - _TotalSize, true}] -> - {State4, Q, MsgSeqIds1, Len1}; - [{MsgId, _RefCount, _File, _Offset, - _TotalSize, false}] -> - {State4, Q, - [{MsgId, SeqId} | MsgSeqIds1], Len1+1} - end - end, {State1, undefined, [], 0}, rabbit_disk_queue), - {ok, State7} = - remove_messages(FinalQ, MsgSeqIds2, txn, State6), - State7 - end), - State8 = extract_sequence_numbers(State2), + Key = mnesia:dirty_first(rabbit_disk_queue), + {ok, State2} = prune_mnesia(State1, Key, [], [], 0), + State3 = extract_sequence_numbers(State2), ok = del_index(), - {ok, State8}. + {ok, State3}. 
+ +prune_mnesia(State, DeleteAcc, RemoveAcc) -> + ok = lists:foldl(fun (Key, ok) -> + mnesia:dirty_delete(rabbit_disk_queue, Key) + end, ok, DeleteAcc), + {ok, _State1} = lists:foldl( + fun ({Q, MsgSeqIds}, {ok, State2}) -> + remove_messages(Q, MsgSeqIds, true, State2) + end, {ok, State}, RemoveAcc). + +prune_mnesia(State, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) -> + {ok, State}; +prune_mnesia(State, '$end_of_table', DeleteAcc, RemoveAcc, _Len) -> + prune_mnesia(State, DeleteAcc, RemoveAcc); +prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) -> + [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = + mnesia:dirty_read(rabbit_disk_queue, Key), + {AccHeadLst, RemoveAcc1} = + case RemoveAcc of + [] -> {[], []}; + [{Q, Lst} | Acc2] -> {Lst, Acc2}; + [{_OldQ, []} | Acc2] -> {[], Acc2}; + Acc2 -> {[], Acc2} + end, + {DeleteAcc1, AccHeadLst1, Len1} = + case dets_ets_lookup(State, MsgId) of + [] -> + %% msg hasn't been found on disk, delete it + {[{Q, SeqId} | DeleteAcc], AccHeadLst, Len + 1}; + [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] -> + %% msg is persistent, keep it + {DeleteAcc, AccHeadLst, Len}; + [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] -> + %% msg is not persistent, delete it + {DeleteAcc, [{MsgId, SeqId} | AccHeadLst], Len + 1} + end, + RemoveAcc2 = [{Q, AccHeadLst1} | RemoveAcc1], + {State1, Key1, DeleteAcc2, RemoveAcc3, Len2} = + if + Len1 >= ?BATCH_SIZE -> + %% We have no way of knowing how flushing the batch + %% will affect ordering of records within the table, + %% so have no choice but to start again. 
Although this + %% will make recovery slower for large queues, we + %% guarantee we can start up in constant memory + {ok, State2} = prune_mnesia(State, DeleteAcc1, RemoveAcc2), + Key2 = mnesia:dirty_first(rabbit_disk_queue), + {State2, Key2, [], [], 0}; + true -> + Key2 = mnesia:dirty_next(rabbit_disk_queue, Key), + {State, Key2, DeleteAcc1, RemoveAcc2, Len1} + end, + prune_mnesia(State1, Key1, DeleteAcc2, RemoveAcc3, Len2). extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - true = rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> - NextWrite = SeqId + 1, - case ets:lookup(Sequences, Q) of - [] -> ets:insert_new(Sequences, - {Q, SeqId, NextWrite}); - [Orig = {Q, Read, Write}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite])}, - case Orig == Repl of - true -> true; - false -> ets:insert(Sequences, Repl) - end - end - end, true, rabbit_disk_queue) - end), + true = + rabbit_misc:execute_mnesia_transaction( + %% the ets manipulation within this transaction is + %% idempotent, in particular we're only reading from mnesia, + %% and combining what we read with what we find in + %% ets. Should the transaction restart, the non-rolledback + %% data in ets can still be successfully combined with what + %% we find in mnesia + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + case Orig == Repl of + true -> true; + false -> ets:insert(Sequences, Repl) + end + end + end, true, rabbit_disk_queue) + end), ok = remove_gaps_in_sequences(State), State. 
@@ -1616,17 +1639,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> %% we could shuffle downwards. However, I think there's greater %% likelihood of gaps being at the bottom rather than the top of %% the queue, so shuffling up should be the better bet. - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> - Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - ReadSeqId1 = ReadSeqId + Gap, - true = ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) - end, ets:match_object(Sequences, '_')) - end), + QueueBoundaries = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({Q, ReadSeqId, WriteSeqId}, Acc) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + [{Q, ReadSeqId + Gap, WriteSeqId} | Acc] + end, [], ets:match_object(Sequences, '_')) + end), + true = lists:foldl(fun (Obj, true) -> ets:insert(Sequences, Obj) end, + true, QueueBoundaries), ok. shuffle_up(_Q, SeqId, SeqId, Gap) -> |