summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-20 15:45:56 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-20 15:45:56 +0100
commit59d4fa4e0bb24657f569d4343b3e853c007be18b (patch)
tree5dea958578bbaa302760207e56859e3f7d3059a0
parentd4098eb629aebb2866f1f2299098d45b82be1e21 (diff)
downloadrabbitmq-server-59d4fa4e0bb24657f569d4343b3e853c007be18b.tar.gz
Use a sequence ID when combining entries into the snapshot. This can then be used to correctly sort the messages and remove the dependence on increasing guids
-rw-r--r--src/rabbit_persister.erl115
1 files changed, 64 insertions, 51 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 8aa5ad8d..aee8cfd4 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -53,7 +53,7 @@
-define(MAX_WRAP_ENTRIES, 500).
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +64,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -128,7 +128,8 @@ init(_Args) ->
Snapshot = #psnapshot{serial = 0,
transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -153,12 +154,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -343,20 +344,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{serial = Serial,
+ transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
InnerSnapshot = {{serial, Serial},
{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
@@ -380,14 +383,15 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
serial = Serial,
- transactions = Ts}),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
@@ -407,8 +411,8 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
Work = ets:foldl(
- fun ({{QName, PKey}, Delivered}, Acc) ->
- rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc)
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
@@ -420,7 +424,7 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
requeue(QName, Requeues, Messages)
end, dict:to_list(Work))),
NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {{S, Q, K}, _M, D} <- L],
ets:delete_all_objects(Messages),
ets:delete_all_objects(Queues),
true = ets:insert(Messages, NewMessages),
@@ -432,8 +436,8 @@ requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ [{{SeqId, QName, PKey}, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -477,39 +481,48 @@ internal_integrate1({rollback_transaction, Key},
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.