diff options
-rw-r--r-- | src/rabbit_disk_queue.erl | 130 |
1 files changed, 56 insertions, 74 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index fe8c433c..75892f68 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -980,11 +980,6 @@ internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). %% Q is only needed if MnesiaDelete /= false -%% called from ack with MnesiaDelete = true -%% called from tx_commit with MnesiaDelete = txn -%% called from tx_cancel with MnesiaDelete = false -%% called from purge with MnesiaDelete = txn -%% called from delete_queue with MnesiaDelete = txn remove_messages(Q, MsgSeqIds, MnesiaDelete, State = #dqstate { file_summary = FileSummary, current_file_name = CurName @@ -1092,8 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, State = #dqstate { sequences = Sequences }) -> {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - {atomic, {WriteSeqId, State1}} = - mnesia:transaction( + WriteSeqId = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), {ok, WriteSeqId1} = @@ -1107,9 +1102,9 @@ internal_do_tx_commit({Q, PubMsgIds, AckSeqIds, From}, }, write), SeqId + 1} end, {ok, InitWriteSeqId}, PubMsgIds), - {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), - {WriteSeqId1, State2} + WriteSeqId1 end), + {ok, State1} = remove_messages(Q, AckSeqIds, true, State), true = case PubMsgIds of [] -> true; _ -> ets:insert(Sequences, {Q, InitReadSeqId, WriteSeqId}) @@ -1162,17 +1157,18 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% as they have no concept of sequence id anyway). {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - {atomic, {WriteSeqId1, Q, State}} = - mnesia:transaction( + {WriteSeqId1, Q, MsgIds} = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State}, + lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []}, MsgSeqIds) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), {ok, State}. -requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) -> +requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), ok = mnesia:write(rabbit_disk_queue, @@ -1181,57 +1177,50 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, State}) -> }, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), - decrement_cache(MsgId, State), - {WriteSeqId + 1, Q, State}. + {WriteSeqId + 1, Q, [MsgId | Acc]}. %% move the next N messages from the front of the queue to the back. internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; true -> - {atomic, {ReadSeqIdN, WriteSeqIdN}} = - mnesia:transaction( + {ReadSeqIdN, WriteSeqIdN, MsgIds} = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - requeue_next_messages(Q, State, N, ReadSeqId, WriteSeqId) + requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, []) end ), true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), + lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), {ok, State} end. -requeue_next_messages(_Q, _State, 0, ReadSeq, WriteSeq) -> - {ReadSeq, WriteSeq}; -requeue_next_messages(Q, State, N, ReadSeq, WriteSeq) -> +requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> + {ReadSeq, WriteSeq, Acc}; +requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) -> [Obj = #dq_msg_loc { msg_id = MsgId }] = mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}}, write), ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write), - decrement_cache(MsgId, State), - requeue_next_messages(Q, State, N - 1, ReadSeq + 1, WriteSeq + 1). + requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]). internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case sequence_lookup(Sequences, Q) of {SeqId, SeqId} -> {ok, 0, State}; {ReadSeqId, WriteSeqId} -> - {atomic, {ok, State1}} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - {MsgSeqIds, WriteSeqId} = - rabbit_misc:unfold( - fun (SeqId) when SeqId == WriteSeqId -> false; - (SeqId) -> - [#dq_msg_loc { msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, - {Q, SeqId}, write), - {true, {MsgId, SeqId}, SeqId + 1} - end, ReadSeqId), - remove_messages(Q, MsgSeqIds, txn, State) - end), + {MsgSeqIds, WriteSeqId} = + rabbit_misc:unfold( + fun (SeqId) when SeqId == WriteSeqId -> false; + (SeqId) -> + [#dq_msg_loc { msg_id = MsgId }] = + mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), + {true, {MsgId, SeqId}, SeqId + 1} + end, ReadSeqId), true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + {ok, State1} = remove_messages(Q, MsgSeqIds, true, State), {ok, WriteSeqId - ReadSeqId, State1} end. @@ -1239,26 +1228,19 @@ internal_delete_queue(Q, State) -> {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State), %% remove everything undelivered true = ets:delete(Sequences, Q), - {atomic, {ok, State2}} = - mnesia:transaction( - fun() -> %% now remove everything already delivered - ok = mnesia:write_lock_table(rabbit_disk_queue), - Objs = - mnesia:match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, - msg_id = '_', - is_delivered = '_' - }, - write), - MsgSeqIds = - lists:map( - fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId }) -> - {MsgId, SeqId} end, Objs), - remove_messages(Q, MsgSeqIds, txn, State1) - end), - {ok, State2}. + %% now remove everything already delivered + Objs = mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_' + }), + MsgSeqIds = + lists:map( + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId }) -> + {MsgId, SeqId} end, Objs), + remove_messages(Q, MsgSeqIds, true, State1). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> @@ -1563,8 +1545,8 @@ load_from_disk(State) -> State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded - {atomic, State2} = - mnesia:transaction( + State2 = + rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), {State6, FinalQ, MsgSeqIds2, _Len} = @@ -1605,7 +1587,7 @@ load_from_disk(State) -> {ok, State8}. extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - {atomic, true} = mnesia:transaction( + true = rabbit_misc:execute_mnesia_transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), mnesia:foldl( @@ -1624,7 +1606,7 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> end end, true, rabbit_disk_queue) end), - remove_gaps_in_sequences(State), + ok = remove_gaps_in_sequences(State), State. remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> @@ -1637,18 +1619,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> %% we could shuffle downwards. However, I think there's greater %% likelihood of gaps being at the bottom rather than the top of %% the queue, so shuffling up should be the better bet. - {atomic, _} = - mnesia:transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> - Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - ReadSeqId1 = ReadSeqId + Gap, - true = ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) - end, ets:match_object(Sequences, '_')) - end). + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foreach( + fun ({Q, ReadSeqId, WriteSeqId}) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + ReadSeqId1 = ReadSeqId + Gap, + true = ets:insert(Sequences, + {Q, ReadSeqId1, WriteSeqId}) + end, ets:match_object(Sequences, '_')) + end), + ok. shuffle_up(_Q, SeqId, SeqId, Gap) -> Gap; |