author     Matthias Radestock <matthias@lshift.net>    2010-04-27 05:12:25 +0100
committer  Matthias Radestock <matthias@lshift.net>    2010-04-27 05:12:25 +0100
commit     922615eb75d9b6f52c7eb27d6412c42c7f042c9b (patch)
tree       2818939f5174705a9f9daf930cb5de44fe352c6e
parent     b26c4c39b25751f9126e97cce9dd59380912cc67 (diff)
download   rabbitmq-server-922615eb75d9b6f52c7eb27d6412c42c7f042c9b.tar.gz
turn queue recovery upside down
Instead of the persister pushing recovered messages to the queues, the queues pull recovered messages from the persister. This allows us to perform queue recovery before recording the existence of the queues in mnesia, and thus prevents access to potentially uninitialised queues from other cluster nodes. It also makes the dependency between queues and the persister one-way.
-rw-r--r--  src/rabbit.erl                  |   8
-rw-r--r--  src/rabbit_amqqueue.erl         |  39
-rw-r--r--  src/rabbit_amqqueue_process.erl |  21
-rw-r--r--  src/rabbit_persister.erl        | 122
4 files changed, 79 insertions(+), 111 deletions(-)
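A condensed outline of the new start-up order that the hunks below implement (names taken from the diff; a sketch of the shape of rabbit_amqqueue:start/0, not the literal code):

    start() ->
        DurableQueues = find_durable_queues(),
        QNames = [QName || #amqqueue{name = QName} <- DurableQueues],
        %% (1) the persister starts first, seeded with the surviving
        %%     durable queue names, so the queues can pull from it
        ok = rabbit_sup:start_child(rabbit_persister, [QNames]),
        %% (2) queue processes start and each pulls its own backlog
        %%     via rabbit_persister:queue_content/1; (3) only then are
        %%     the queues recorded in mnesia, so other cluster nodes
        %%     never see an uninitialised queue
        recover_durable_queues(DurableQueues),
        ok.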
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f2dce303..17e18e0e 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -128,12 +128,8 @@
-rabbit_boot_step({queue_sup_queue_recovery,
[{description, "queue supervisor and queue recovery"},
{mfa, {rabbit_amqqueue, start, []}},
- {requires, empty_db_check}]}).
-
--rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child,
- [rabbit_persister]}},
- {requires, queue_sup_queue_recovery}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f6278836..5f045b27 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
@@ -88,7 +88,6 @@
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
@@ -118,6 +117,9 @@
start() ->
DurableQueues = find_durable_queues(),
+ ok = rabbit_sup:start_child(
+ rabbit_persister,
+ [[QName || #amqqueue{name = QName} <- DurableQueues]]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
@@ -137,27 +139,13 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ,
- read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> [Q | Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end
- end, [], DurableQueues).
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ %% Issue inits to *all* the queues so that they all init at the same time
+ [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs],
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> [ok = store_queue(Q) || Q <- Qs] end),
+ Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -165,6 +153,8 @@ declare(QueueName, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args,
pid = none}),
+ ok = gen_server2:cast(Q#amqqueue.pid, {init, false}),
+ ok = gen_server2:call(Q#amqqueue.pid, sync, infinity),
internal_declare(Q, true).
internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
@@ -279,9 +269,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
-redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
-
requeue(QPid, MsgIds, ChPid) ->
gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 449e79ea..18c98f14 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,7 +92,7 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q) ->
@@ -102,11 +102,13 @@ init(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
- message_buffer = queue:new(),
+ message_buffer = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+terminate(_Reason, #q{message_buffer = undefined}) ->
+ ok;
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
@@ -541,6 +543,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -748,6 +753,15 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+handle_cast({init, Recover}, State = #q{message_buffer = undefined}) ->
+ Messages = case Recover of
+ true -> rabbit_persister:queue_content(qname(State));
+ false -> []
+ end,
+ noreply(State#q{message_buffer = queue:from_list(Messages)});
+handle_cast({init, _Recover}, State) ->
+ noreply(State);
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
@@ -775,9 +789,6 @@ handle_cast({rollback, Txn, ChPid}, State) ->
record_current_channel_tx(ChPid, none),
noreply(State);
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
-
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
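The {init, Recover} cast followed by the no-op sync call forms a barrier: a gen_server processes its mailbox in order, so by the time the synchronous sync returns, the earlier asynchronous init must have run. Casting init to every queue before the first sync lets all recoveries proceed concurrently rather than one queue at a time. A minimal self-contained sketch of the pattern (hypothetical module; stock gen_server standing in for RabbitMQ's gen_server2):

    -module(init_barrier).
    -behaviour(gen_server).

    -export([start_link/0, recover_all/1]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
             terminate/2, code_change/3]).

    start_link() -> gen_server:start_link(?MODULE, [], []).

    %% Cast {init, true} to *all* servers first so they initialise
    %% concurrently, then make a synchronous no-op call per server as
    %% a barrier; mailbox ordering guarantees init ran before sync
    %% returns.
    recover_all(Pids) ->
        [ok = gen_server:cast(Pid, {init, true}) || Pid <- Pids],
        [ok = gen_server:call(Pid, sync, infinity) || Pid <- Pids],
        ok.

    init([]) -> {ok, uninitialised}.

    handle_call(sync, _From, State) -> {reply, ok, State}.

    handle_cast({init, _Recover}, uninitialised) ->
        %% the real queue process pulls its backlog from the
        %% persister here
        {noreply, initialised};
    handle_cast({init, _Recover}, State) ->
        {noreply, State}.  %% duplicate init: ignore

    handle_info(_Msg, State) -> {noreply, State}.
    terminate(_Reason, _State) -> ok.
    code_change(_OldVsn, State, _Extra) -> {ok, State}.

With this in place, recover_all/1 returns only after every server has initialised, which is what recover_durable_queues/1 above does with the real queue processes before touching mnesia.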
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index a9e0cab9..a8e41baf 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -33,14 +33,14 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -52,8 +52,7 @@
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
@@ -72,20 +71,22 @@
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
@@ -111,15 +112,18 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
+
%%--------------------------------------------------------------------
-init(_Args) ->
+init([DurableQueues]) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, []),
+ queues = ets:new(queues, [ordered_set]),
next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
@@ -135,7 +139,8 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
+ {Res, NewSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -143,12 +148,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec = [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
{queues, ets:tab2list(Queues)},
@@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
+ DurableQueues,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
@@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle,
Snapshot#psnapshot{
transactions = Ts,
next_seq_id = NextSeqId}),
- Snapshot2 = requeue_messages(Snapshot1),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
@@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(
- fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
- rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
- end, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
- NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{SeqId, QName, PKey, Message, Delivered} ||
- {SeqId, PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, _, _, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->
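The new queue_content/1 works because the queues table is now an ordered_set keyed on {QName, PKey}, and the match spec re-orders each row into {SeqId, PKey, Delivered}, so a plain lists:sort/1 recovers publication order before the message bodies are fetched from the messages table. A runnable toy version of the same lookup (hypothetical module and data):

    -module(queue_content_demo).
    -export([demo/0]).

    %% Mirrors the snapshot layout: 'queues' rows are
    %% {{QName, PKey}, Delivered, SeqId}; 'messages' rows are {PKey, Msg}.
    demo() ->
        Queues   = ets:new(queues,   [ordered_set]),
        Messages = ets:new(messages, []),
        true = ets:insert(Queues,   [{{q1, k1}, false, 2},
                                     {{q1, k2}, true,  1}]),
        true = ets:insert(Messages, [{k1, msg_a}, {k2, msg_b}]),
        %% select every row of q1, re-ordered to {SeqId, PKey, Delivered}
        MatchSpec = [{{{q1, '$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
        %% sort on SeqId, then swap each key for its message body
        [{ets:lookup_element(Messages, K, 2), D} ||
            {_SeqId, K, D} <- lists:sort(ets:select(Queues, MatchSpec))].
        %% demo() =:= [{msg_b, true}, {msg_a, false}]

The doubled braces in the match-spec body are deliberate: inside a match specification a tuple result must be wrapped in an extra tuple, so {{'$3', '$1', '$2'}} emits the 3-tuple {SeqId, PKey, Delivered}.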