diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-10-11 14:02:44 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-10-11 14:02:44 +0100 |
commit | f528b2da3af823b457a11e86b901eb6b98fd6f9e (patch) | |
tree | 30958f64739468ffb00d5c2c535b858a62a34c64 | |
parent | 0bcf01d6c9dcf28709e5ce668ba11850a4bc720d (diff) | |
download | rabbitmq-server-bug21368.tar.gz |
rollback transactions on queue terminationbug21368
That way we don't leave garbage - transactionally published, but
uncommitted messages - in the message store.
Also, we we can get rid of the pending_commits state wart in
disk_queue. That is possible because both tx commits and queue
deletions are issued by the queue process and tx commits are
synchronous, so there is never a chance of there being a pending commit
when doing a deletion.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_disk_queue.erl | 128 |
2 files changed, 59 insertions, 80 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5789b105..0c334bc3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -118,16 +118,23 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> {ok, start_memory_timer(State), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, State) -> +terminate(_Reason, State = #q{mixed_state = MS}) -> %% FIXME: How do we cancel active subscriptions? State1 = stop_memory_timer(State), + %% Ensure that any persisted tx messages are removed; + %% mixed_queue:delete_queue cannot do that for us since neither + %% mixed_queue nor disk_queue keep a record of uncommitted tx + %% messages. + {ok, MS1} = rabbit_mixed_queue:tx_rollback( + lists:concat([PM || #tx { pending_messages = PM } <- + all_tx_record()]), MS), %% Delete from disk queue first. If we crash at this point, when a %% durable queue, we will be recreated at startup, possibly with %% partial content. The alternative is much worse however - if we %% called internal_delete first, we would then have a race between %% the disk_queue delete and a new queue with the same name being %% created and published to. - {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state), + {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1), ok = rabbit_amqqueue:internal_delete(qname(State1)). code_change(_OldVsn, State, _Extra) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 42c4ed8b..7d44dd9d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -66,10 +66,7 @@ -define(SERVER, ?MODULE). --record(dqstate, - { sequences, %% next read and write for each q - pending_commits %% dict of txns waiting for msg_store - }). +-record(dqstate, { sequences }). %% next read and write for each q %%---------------------------------------------------------------------------- @@ -170,8 +167,8 @@ stop_and_obliterate() -> %% private -finalise_commit(TxId) -> - gen_server2:cast(?SERVER, {finalise_commit, TxId}). +finalise_commit(TxDetails) -> + gen_server2:cast(?SERVER, {finalise_commit, TxDetails}). %%---------------------------------------------------------------------------- %% gen_server behaviour @@ -200,7 +197,7 @@ init([]) -> Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), - State = #dqstate { sequences = Sequences, pending_commits = dict:new() }, + State = #dqstate { sequences = Sequences }, {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -269,8 +266,8 @@ handle_cast({prefetch, Q, From}, State) -> false -> ok end, noreply(State1); -handle_cast({finalise_commit, TxId}, State) -> - noreply(finalise_commit(TxId, State)). +handle_cast({finalise_commit, TxDetails}, State) -> + noreply(finalise_commit(TxDetails, State)). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -390,54 +387,40 @@ internal_tx_publish(Message = #basic_message { guid = MsgId, MsgId, Message #basic_message { content = ClearedContent }), {ok, State}. -internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { pending_commits = PendingCommits }) -> +internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) -> + TxDetails = {Q, PubMsgIds, AckSeqIds, From}, ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds], - fun () -> finalise_commit({Q, From}) end), - PendingCommits1 = dict:store(Q, {PubMsgIds, AckSeqIds, From}, - PendingCommits), - State #dqstate { pending_commits = PendingCommits1 }. - -finalise_commit({Q, From}, - State = #dqstate { sequences = Sequences, - pending_commits = PendingCommits }) -> - case dict:find(Q, PendingCommits) of - {ok, {PubMsgIds, AckSeqIds, From}} -> - {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - WriteSeqId = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun ({MsgId, IsDelivered, IsPersistent}, SeqId) -> - ok = mnesia:write( - rabbit_disk_queue, - #dq_msg_loc { - queue_and_seq_id = {Q, SeqId}, - msg_id = MsgId, - is_delivered = IsDelivered, - is_persistent = IsPersistent - }, write), - SeqId + 1 - end, InitWriteSeqId, PubMsgIds) - end), - {ok, State1} = remove_messages(Q, AckSeqIds, State), - true = case PubMsgIds of - [] -> true; - _ -> ets:insert(Sequences, - {Q, InitReadSeqId, WriteSeqId}) - end, - gen_server2:reply(From, ok), - State1 # dqstate { pending_commits = - dict:erase(Q, PendingCommits) }; - {ok, _} -> - %% sync notification for a deleted queue which has since - %% been recreated - State; - error -> - %% sync notification for a deleted queue - State - end. + fun () -> finalise_commit(TxDetails) end), + State. + +finalise_commit({Q, PubMsgIds, AckSeqIds, From}, + State = #dqstate { sequences = Sequences }) -> + {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), + WriteSeqId = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({MsgId, IsDelivered, IsPersistent}, SeqId) -> + ok = mnesia:write( + rabbit_disk_queue, + #dq_msg_loc { + queue_and_seq_id = {Q, SeqId}, + msg_id = MsgId, + is_delivered = IsDelivered, + is_persistent = IsPersistent + }, write), + SeqId + 1 + end, InitWriteSeqId, PubMsgIds) + end), + {ok, State1} = remove_messages(Q, AckSeqIds, State), + true = case PubMsgIds of + [] -> true; + _ -> ets:insert(Sequences, + {Q, InitReadSeqId, WriteSeqId}) + end, + gen_server2:reply(From, ok), + State1. internal_publish(Q, Message = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -551,31 +534,20 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> {ok, WriteSeqId - ReadSeqId, State1} end. -internal_delete_queue(Q, - State = #dqstate { pending_commits = PendingCommits }) -> - %% remove pending commits - State1 = case dict:find(Q, PendingCommits) of - {ok, {PubMsgIds, _, _}} -> - ok = rabbit_msg_store:remove( - [MsgId || {MsgId, _, _} <- PubMsgIds]), - State # dqstate { pending_commits = - dict:erase(Q, PendingCommits) }; - error -> - State - end, +internal_delete_queue(Q, State) -> %% remove everything undelivered - {ok, _Count, State2 = #dqstate { sequences = Sequences }} = - internal_purge(Q, State1), + {ok, _Count, State1 = #dqstate { sequences = Sequences }} = + internal_purge(Q, State), true = ets:delete(Sequences, Q), %% remove everything already delivered - Objs = mnesia:dirty_match_object( - rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, '_'}, _ = '_' }), - MsgSeqIds = lists:map(fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId }) -> - {MsgId, SeqId} - end, Objs), - remove_messages(Q, MsgSeqIds, State2). + remove_messages( + Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, + msg_id = MsgId } <- + mnesia:dirty_match_object( + rabbit_disk_queue, + #dq_msg_loc { + queue_and_seq_id = {Q, '_'}, + _ = '_' })], State1). internal_delete_non_durable_queues( DurableQueues, State = #dqstate { sequences = Sequences }) -> |