diff options
authorMatthew Sackman <>2009-08-23 17:00:22 +0100
committerMatthew Sackman <>2009-08-23 17:00:22 +0100
commit7eeec412c60abf4172dff808189742e7ec184071 (patch)
parent45cf652845d63f75d827f204edc70d234ff42a2b (diff)
Sorted out transactions within the disk_queue, ensuring that if they do restart that other data structures cannot be left partially updated, and can continue successfully, in particular, manipulation of ets tables within mnesia transactions.
1 files changed, 95 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 344aff91..835043c3 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1012,8 +1012,6 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
ok = case MnesiaDelete of
true -> mnesia:dirty_delete(rabbit_disk_queue,
{Q, SeqId});
- txn -> mnesia:delete(rabbit_disk_queue,
- {Q, SeqId}, write);
_ -> ok
@@ -1542,67 +1540,92 @@ load_from_disk(State) ->
State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
- State2 =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- {State6, FinalQ, MsgSeqIds2, _Len} =
- mnesia:foldl(
- fun (#dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = {Q, SeqId} },
- {State3, OldQ, MsgSeqIds, Len}) ->
- {State4, MsgSeqIds1, Len1} =
- case {OldQ == Q, MsgSeqIds} of
- {true, _} when Len < ?BATCH_SIZE ->
- {State3, MsgSeqIds, Len};
- {false, []} -> {State3, MsgSeqIds, Len};
- {_, _} ->
- {ok, State5} =
- remove_messages(Q, MsgSeqIds,
- txn, State3),
- {State5, [], 0}
- end,
- case dets_ets_lookup(State4, MsgId) of
- [] -> ok = mnesia:delete(rabbit_disk_queue,
- {Q, SeqId}, write),
- {State4, Q, MsgSeqIds1, Len1};
- [{MsgId, _RefCount, _File, _Offset,
- _TotalSize, true}] ->
- {State4, Q, MsgSeqIds1, Len1};
- [{MsgId, _RefCount, _File, _Offset,
- _TotalSize, false}] ->
- {State4, Q,
- [{MsgId, SeqId} | MsgSeqIds1], Len1+1}
- end
- end, {State1, undefined, [], 0}, rabbit_disk_queue),
- {ok, State7} =
- remove_messages(FinalQ, MsgSeqIds2, txn, State6),
- State7
- end),
- State8 = extract_sequence_numbers(State2),
+ Key = mnesia:dirty_first(rabbit_disk_queue),
+ {ok, State2} = prune_mnesia(State1, Key, [], [], 0),
+ State3 = extract_sequence_numbers(State2),
ok = del_index(),
- {ok, State8}.
+ {ok, State3}.
+prune_mnesia(State, DeleteAcc, RemoveAcc) ->
+ ok = lists:foldl(fun (Key, ok) ->
+ mnesia:dirty_delete(rabbit_disk_queue, Key)
+ end, ok, DeleteAcc),
+ {ok, _State1} = lists:foldl(
+ fun ({Q, MsgSeqIds}, {ok, State2}) ->
+ remove_messages(Q, MsgSeqIds, true, State2)
+ end, {ok, State}, RemoveAcc).
+prune_mnesia(State, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) ->
+ {ok, State};
+prune_mnesia(State, '$end_of_table', DeleteAcc, RemoveAcc, _Len) ->
+ prune_mnesia(State, DeleteAcc, RemoveAcc);
+prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) ->
+ [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
+ mnesia:dirty_read(rabbit_disk_queue, Key),
+ {AccHeadLst, RemoveAcc1} =
+ case RemoveAcc of
+ [] -> {[], []};
+ [{Q, Lst} | Acc2] -> {Lst, Acc2};
+ [{_OldQ, []} | Acc2] -> {[], Acc2};
+ Acc2 -> {[], Acc2}
+ end,
+ {DeleteAcc1, AccHeadLst1, Len1} =
+ case dets_ets_lookup(State, MsgId) of
+ [] ->
+ %% msg hasn't been found on disk, delete it
+ {[{Q, SeqId} | DeleteAcc], AccHeadLst, Len + 1};
+ [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] ->
+ %% msg is persistent, keep it
+ {DeleteAcc, AccHeadLst, Len};
+ [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] ->
+ %% msg is not persistent, delete it
+ {DeleteAcc, [{MsgId, SeqId} | AccHeadLst], Len + 1}
+ end,
+ RemoveAcc2 = [{Q, AccHeadLst1} | RemoveAcc1],
+ {State1, Key1, DeleteAcc2, RemoveAcc3, Len2} =
+ if
+ Len1 >= ?BATCH_SIZE ->
+ %% We have no way of knowing how flushing the batch
+ %% will affect ordering of records within the table,
+ %% so have no choice but to start again. Although this
+ %% will make recovery slower for large queues, we
+ %% guarantee we can start up in constant memory
+ {ok, State2} = prune_mnesia(State, DeleteAcc1, RemoveAcc2),
+ Key2 = mnesia:dirty_first(rabbit_disk_queue),
+ {State2, Key2, [], [], 0};
+ true ->
+ Key2 = mnesia:dirty_next(rabbit_disk_queue, Key),
+ {State, Key2, DeleteAcc1, RemoveAcc2, Len1}
+ end,
+ prune_mnesia(State1, Key1, DeleteAcc2, RemoveAcc3, Len2).
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- true = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
- NextWrite = SeqId + 1,
- case ets:lookup(Sequences, Q) of
- [] -> ets:insert_new(Sequences,
- {Q, SeqId, NextWrite});
- [Orig = {Q, Read, Write}] ->
- Repl = {Q, lists:min([Read, SeqId]),
- lists:max([Write, NextWrite])},
- case Orig == Repl of
- true -> true;
- false -> ets:insert(Sequences, Repl)
- end
- end
- end, true, rabbit_disk_queue)
- end),
+ true =
+ rabbit_misc:execute_mnesia_transaction(
+ %% the ets manipulation within this transaction is
+ %% idempotent, in particular we're only reading from mnesia,
+ %% and combining what we read with what we find in
+ %% ets. Should the transaction restart, the non-rolledback
+ %% data in ets can still be successfully combined with what
+ %% we find in mnesia
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] -> ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, SeqId]),
+ lists:max([Write, NextWrite])},
+ case Orig == Repl of
+ true -> true;
+ false -> ets:insert(Sequences, Repl)
+ end
+ end
+ end, true, rabbit_disk_queue)
+ end),
ok = remove_gaps_in_sequences(State),
@@ -1616,17 +1639,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
%% we could shuffle downwards. However, I think there's greater
%% likelihood of gaps being at the bottom rather than the top of
%% the queue, so shuffling up should be the better bet.
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- ReadSeqId1 = ReadSeqId + Gap,
- true = ets:insert(Sequences,
- {Q, ReadSeqId1, WriteSeqId})
- end, ets:match_object(Sequences, '_'))
- end),
+ QueueBoundaries =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(
+ fun ({Q, ReadSeqId, WriteSeqId}, Acc) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ [{Q, ReadSeqId + Gap, WriteSeqId} | Acc]
+ end, [], ets:match_object(Sequences, '_'))
+ end),
+ true = lists:foldl(fun (Obj, true) -> ets:insert(Sequences, Obj) end,
+ true, QueueBoundaries),
shuffle_up(_Q, SeqId, SeqId, Gap) ->