author     Matthias Radestock <matthias@lshift.net>  2010-04-20 21:56:22 +0100
committer  Matthias Radestock <matthias@lshift.net>  2010-04-20 21:56:22 +0100
commit     7e40e50743cb6166cf3e02223585f6b5e0804658 (patch)
tree       2772e88819dffa789ce2e9726fcf52712bedea77
parent     c745a388941cb2050edee2a6be17b6e67a5d4b2a (diff)
parent     4e084ecf06e994046450a98f4162050ce0b5d1cf (diff)
download   rabbitmq-server-7e40e50743cb6166cf3e02223585f6b5e0804658.tar.gz

merge bug22628 into default

-rw-r--r--  src/rabbit_persister.erl  119
1 file changed, 66 insertions, 53 deletions
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 8f04123e..dd115f1c 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -53,7 +53,7 @@
-define(MAX_WRAP_ENTRIES, 500).
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +64,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overhead of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
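The new next_seq_id field is a monotonically increasing counter; every row written to the queues table is stamped with its current value, so the global publish order can be reconstructed after a restart. A sketch of the table shapes as they stand after this patch (derived from the hunks below):

    %% Messages table: {PKey, Message}
    %%   - one row per message, shared by all queues that hold it
    %% Queues table:   {{QName, PKey}, Delivered, SeqId}
    %%   - one row per (queue, message) pair; SeqId records arrival order,
    %%     e.g. ets:insert(Queues, {{<<"q1">>, PKey}, false, 42})
    %% next_seq_id:    the SeqId the next publish/tied entry will receive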
@@ -128,7 +128,8 @@ init(_Args) ->
Snapshot = #psnapshot{serial = 0,
transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -153,12 +154,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
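init/1 passes the freshly built snapshot as the log's {head, ...} option: when disk_log creates or truncates the file it writes that term first, so internal_load_snapshot/2 can always read a snapshot back, even from a brand-new log. A minimal usage sketch of the disk_log calls involved (the file path and the handling of the repaired case are illustrative assumptions):

    %% Sketch: open a disk_log whose first item is a snapshot term.
    open_log(Snapshot) ->
        case disk_log:open([{name, rabbit_persister},
                            {head, current_snapshot(Snapshot)},
                            {file, "/tmp/persister_example.LOG"}]) of %% path assumed
            {ok, LogHandle} ->
                LogHandle;
            {repaired, LogHandle, {recovered, _Terms}, {badbytes, _Bytes}} ->
                %% dirty shutdown: the log was repaired and must be replayed
                LogHandle
        end.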
@@ -342,20 +343,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{serial = Serial,
+ transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
InnerSnapshot = {{serial, Serial},
{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
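Because queues-table rows are now three-tuples, the pruning fold has to match the extra _SeqId field. The fold collects every PKey still referenced by at least one queue, and prune_table/2 (defined elsewhere in this module, unchanged here) drops the messages outside that set. A sketch of what the pruning amounts to, assuming prune_table keeps exactly the keys in the set:

    %% Sketch: delete every Messages row whose key no queue references.
    prune_messages(Messages, Queues) ->
        Keep = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
                                 sets:add_element(PKey, S)
                         end, sets:new(), Queues),
        [ets:delete(Messages, PKey) || {PKey, _Msg} <- ets:tab2list(Messages),
                                       not sets:is_element(PKey, Keep)],
        ok.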
@@ -379,14 +382,15 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
serial = Serial,
- transactions = Ts}),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
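Loading mirrors saving: the head term is decoded with binary_to_term/1 and pattern-matched against the same five-element tuple that current_snapshot/1 now writes, so the counter resumes where it left off instead of restarting at 0, which would hand out SeqIds colliding with rows already in the tables. A round-trip sketch:

    %% Sketch: what current_snapshot/1 encodes, internal_load_snapshot/2
    %% pattern-matches straight back out.
    roundtrip(Serial, Ts, Ms, Qs, NextSeqId) ->
        Bin = term_to_binary({{serial, Serial}, {txns, Ts},
                              {messages, Ms}, {queues, Qs},
                              {next_seq_id, NextSeqId}}),
        {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs},
         {next_seq_id, NextSeqId}} = binary_to_term(Bin),
        ok.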
@@ -406,8 +410,8 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
Work = ets:foldl(
- fun ({{QName, PKey}, Delivered}, Acc) ->
- rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc)
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
@@ -418,8 +422,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
fun ({QName, Requeues}) ->
requeue(QName, Requeues, Messages)
end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
+ NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
ets:delete_all_objects(Messages),
ets:delete_all_objects(Queues),
true = ets:insert(Messages, NewMessages),
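requeue_messages/1 threads the SeqId through both directions: the Work dict groups {SeqId, PKey, Delivered} triples per queue, requeue/3 returns flat {SeqId, QName, PKey, Message, Delivered} tuples, and the two comprehensions split those back into the Messages and Queues row shapes. A worked sketch of the rebuild on literal data:

    %% Sketch: L is requeue/3 output, one tuple per (queue, message) pair.
    rebuild_example() ->
        L = [{0, <<"q1">>, key1, msg1, false},
             {1, <<"q2">>, key1, msg1, true}],
        NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
        NewQueues   = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
        %% NewMessages: [{key1,msg1}, {key1,msg1}]  (the set table dedupes)
        %% NewQueues:   [{{<<"q1">>,key1},false,0}, {{<<"q2">>,key1},true,1}]
        {NewMessages, NewQueues}.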
@@ -431,8 +435,8 @@ requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ [{SeqId, QName, PKey, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -442,7 +446,7 @@ requeue(QName, Requeues, Messages) ->
%% per-channel basis, and channels are bound to specific
%% processes, sorting the list does provide the correct
%% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
+ [{Message, Delivered} || {_, _, _, Message, Delivered} <-
lists:sort(RequeueMessages)]),
RequeueMessages;
{error, not_found} ->
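This is the heart of the fix: RequeueMessages tuples now lead with SeqId, and Erlang's term order compares tuples element by element from the left, so lists:sort/1 recovers the original global publish order before redelivery. The old tuples led with {QName, PKey}, which carries no ordering information at all. For example, in the shell:

    1> lists:sort([{2, b}, {0, a}, {1, c}]).
    [{0,a},{1,c},{2,b}]

As the comment in the hunk above notes, global ordering is stronger than strictly required, but it implies the per-channel ordering that actually matters.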
@@ -476,39 +480,48 @@ internal_integrate1({rollback_transaction, Key},
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.
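perform_work/4 is now a fold rather than a foreach, threading the counter through each item: publish and tied stamp the current NextSeqId and advance it, deliver flips the Delivered flag in place with ets:update_element/3 (which is why the old delete-then-insert pair and its pre-R12B-2 comment disappear), and ack deletes the row without consuming a SeqId. In commit_transaction, the lists:foldr/3 runs perform_work on the oldest message list first, matching the effect of the old lists:reverse/1 plus foreach while also threading the counter. A small usage sketch on a literal work list (names are illustrative):

    %% Sketch: thread the sequence counter through a message list.
    demo() ->
        Messages = ets:new(messages, []),
        Queues   = ets:new(queues, []),
        QK = {<<"q1">>, pkey1},
        Next = perform_work([{publish, some_message, QK}, {deliver, QK}],
                            Messages, Queues, 0),
        %% Next =:= 1: publish consumed SeqId 0, deliver consumed none.
        [{QK, true, 0}] = ets:lookup(Queues, QK),
        Next.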