diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-20 15:45:56 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-20 15:45:56 +0100 |
commit | 59d4fa4e0bb24657f569d4343b3e853c007be18b (patch) | |
tree | 5dea958578bbaa302760207e56859e3f7d3059a0 | |
parent | d4098eb629aebb2866f1f2299098d45b82be1e21 (diff) | |
download | rabbitmq-server-59d4fa4e0bb24657f569d4343b3e853c007be18b.tar.gz |
Use a sequence ID when combining entries into the snapshot. The sequence ID can then be used to sort the messages correctly, which removes the dependence on increasing guids.
-rw-r--r-- | src/rabbit_persister.erl | 115 |
1 files changed, 64 insertions, 51 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 8aa5ad8d..aee8cfd4 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -53,7 +53,7 @@ -define(MAX_WRAP_ENTRIES, 500). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,7 +64,7 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- @@ -128,7 +128,8 @@ init(_Args) -> Snapshot = #psnapshot{serial = 0, transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, []), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -153,12 +154,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], pending_replies = [], - snapshot = NewSnapshot}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -343,20 +344,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. 
-current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{serial = Serial, + transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> + fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), InnerSnapshot = {{serial, Serial}, {txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. @@ -380,14 +383,15 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ serial = Serial, - transactions = Ts}), + transactions = Ts, + next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so @@ -407,8 +411,8 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> Work = ets:foldl( - fun ({{QName, PKey}, Delivered}, Acc) -> - rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> + rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) end, 
dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( @@ -420,7 +424,7 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, requeue(QName, Requeues, Messages) end, dict:to_list(Work))), NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], + NewQueues = [{{Q, K}, D, S} || {{S, Q, K}, _M, D} <- L], ets:delete_all_objects(Messages), ets:delete_all_objects(Queues), true = ets:insert(Messages, NewMessages), @@ -432,8 +436,8 @@ requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + [{{SeqId, QName, PKey}, Message, Delivered} || + {SeqId, PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -477,39 +481,48 @@ internal_integrate1({rollback_transaction, Key}, Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. 
- -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. |