summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl130
1 files changed, 56 insertions, 74 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index fe8c433c..75892f68 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -980,11 +980,6 @@ internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-%% called from ack with MnesiaDelete = true
-%% called from tx_commit with MnesiaDelete = txn
-%% called from tx_cancel with MnesiaDelete = false
-%% called from purge with MnesiaDelete = txn
-%% called from delete_queue with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
@@ -1092,8 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
State = #dqstate { sequences = Sequences }) ->
{InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId, State1}} =
- mnesia:transaction(
+ WriteSeqId =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{ok, WriteSeqId1} =
@@ -1107,9 +1102,9 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From},
}, write),
SeqId + 1}
end, {ok, InitWriteSeqId}, PubMsgIds),
- {ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
- {WriteSeqId1, State2}
+ WriteSeqId1
end),
+ {ok, State1} = remove_messages(Q, AckSeqIds, true, State),
true = case PubMsgIds of
[] -> true;
_ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId})
@@ -1162,17 +1157,18 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% as they have no concept of sequence id anyway).
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- {atomic, {WriteSeqId1, Q, State}} =
- mnesia:transaction(
+ {WriteSeqId1, Q, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State},
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
MsgSeqIds)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}.
-requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
+requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
ok = mnesia:write(rabbit_disk_queue,
@@ -1181,57 +1177,50 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) ->
},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- decrement_cache(MsgId, State),
- {WriteSeqId + 1, Q, State}.
+ {WriteSeqId + 1, Q, [MsgId | Acc]}.
%% move the next N messages from the front of the queue to the back.
internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
true ->
- {atomic, {ReadSeqIdN, WriteSeqIdN}} =
- mnesia:transaction(
+ {ReadSeqIdN, WriteSeqIdN, MsgIds} =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId)
+ requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, [])
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
+ lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
{ok, State}
end.
-requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) ->
- {ReadSeq, WriteSeq};
-requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) ->
+requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
+ {ReadSeq, WriteSeq, Acc};
+requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) ->
[Obj = #dq_msg_loc { msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
- decrement_cache(MsgId, State),
- requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1).
+ requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]).
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case sequence_lookup(Sequences, Q) of
{SeqId, SeqId} -> {ok, 0, State};
{ReadSeqId, WriteSeqId} ->
- {atomic, {ok, State1}} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- {MsgSeqIds, WriteSeqId} =
- rabbit_misc:unfold(
- fun (SeqId) when SeqId == WriteSeqId -> false;
- (SeqId) ->
- [#dq_msg_loc { msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue,
- {Q, SeqId}, write),
- {true, {MsgId, SeqId}, SeqId + 1}
- end, ReadSeqId),
- remove_messages(Q, MsgSeqIds, txn, State)
- end),
+ {MsgSeqIds, WriteSeqId} =
+ rabbit_misc:unfold(
+ fun (SeqId) when SeqId == WriteSeqId -> false;
+ (SeqId) ->
+ [#dq_msg_loc { msg_id = MsgId }] =
+ mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
+ {true, {MsgId, SeqId}, SeqId + 1}
+ end, ReadSeqId),
true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, State1} = remove_messages(Q, MsgSeqIds, true, State),
{ok, WriteSeqId - ReadSeqId, State1}
end.
@@ -1239,26 +1228,19 @@ internal_delete_queue(Q, State) ->
{ok, _Count, State1 = #dqstate { sequences = Sequences }} =
internal_purge(Q, State), %% remove everything undelivered
true = ets:delete(Sequences, Q),
- {atomic, {ok, State2}} =
- mnesia:transaction(
- fun() -> %% now remove everything already delivered
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- Objs =
- mnesia:match_object(
- rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_'
- },
- write),
- MsgSeqIds =
- lists:map(
- fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId }) ->
- {MsgId, SeqId} end, Objs),
- remove_messages(Q, MsgSeqIds, txn, State1)
- end),
- {ok, State2}.
+ %% now remove everything already delivered
+ Objs = mnesia:dirty_match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_'
+ }),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
+ remove_messages(Q, MsgSeqIds, true, State1).
internal_delete_non_durable_queues(
DurableQueues, State = #dqstate { sequences = Sequences }) ->
@@ -1563,8 +1545,8 @@ load_from_disk(State) ->
State1 = load_messages(undefined, Files, State),
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
- {atomic, State2} =
- mnesia:transaction(
+ State2 =
+ rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
{State6, FinalQ, MsgSeqIds2, _Len} =
@@ -1605,7 +1587,7 @@ load_from_disk(State) ->
{ok, State8}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
- {atomic, true} = mnesia:transaction(
+ true = rabbit_misc:execute_mnesia_transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
mnesia:foldl(
@@ -1624,7 +1606,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
end
end, true, rabbit_disk_queue)
end),
- remove_gaps_in_sequences(State),
+ ok = remove_gaps_in_sequences(State),
State.
remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
@@ -1637,18 +1619,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
%% we could shuffle downwards. However, I think there's greater
%% likelihood of gaps being at the bottom rather than the top of
%% the queue, so shuffling up should be the better bet.
- {atomic, _} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- ReadSeqId1 = ReadSeqId + Gap,
- true = ets:insert(Sequences,
- {Q, ReadSeqId1, WriteSeqId})
- end, ets:match_object(Sequences, '_'))
- end).
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
+ ReadSeqId1 = ReadSeqId + Gap,
+ true = ets:insert(Sequences,
+ {Q, ReadSeqId1, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end),
+ ok.
shuffle_up(_Q, SeqId, SeqId, Gap) ->
Gap;