diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-04-20 21:56:22 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-04-20 21:56:22 +0100 |
commit | 7e40e50743cb6166cf3e02223585f6b5e0804658 (patch) | |
tree | 2772e88819dffa789ce2e9726fcf52712bedea77 | |
parent | c745a388941cb2050edee2a6be17b6e67a5d4b2a (diff) | |
parent | 4e084ecf06e994046450a98f4162050ce0b5d1cf (diff) | |
download | rabbitmq-server-7e40e50743cb6166cf3e02223585f6b5e0804658.tar.gz |
merge bug22628 into default
-rw-r--r-- | src/rabbit_persister.erl | 119 |
1 files changed, 66 insertions, 53 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 8f04123e..dd115f1c 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -53,7 +53,7 @@ -define(MAX_WRAP_ENTRIES, 500). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,7 +64,7 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- @@ -128,7 +128,8 @@ init(_Args) -> Snapshot = #psnapshot{serial = 0, transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, []), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -153,12 +154,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], pending_replies = [], - snapshot = NewSnapshot}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -342,20 +343,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{serial = Serial, + transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> + fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), InnerSnapshot = {{serial, Serial}, {txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. @@ -379,14 +382,15 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ serial = Serial, - transactions = Ts}), + transactions = Ts, + next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so @@ -406,8 +410,8 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> Work = ets:foldl( - fun ({{QName, PKey}, Delivered}, Acc) -> - rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> + rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( @@ -418,8 +422,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, fun ({QName, Requeues}) -> requeue(QName, Requeues, Messages) end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], + NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], + NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], ets:delete_all_objects(Messages), ets:delete_all_objects(Queues), true = ets:insert(Messages, NewMessages), @@ -431,8 +435,8 @@ requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + [{SeqId, QName, PKey, Message, Delivered} || + {SeqId, PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -442,7 +446,7 @@ requeue(QName, Requeues, Messages) -> %% per-channel basis, and channels are bound to specific %% processes, sorting the list does provide the correct %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- + [{Message, Delivered} || {_, _, _, Message, Delivered} <- lists:sort(RequeueMessages)]), RequeueMessages; {error, not_found} -> @@ -476,39 +480,48 @@ internal_integrate1({rollback_transaction, Key}, Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. |