diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-04-27 05:12:25 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-04-27 05:12:25 +0100 |
commit | 922615eb75d9b6f52c7eb27d6412c42c7f042c9b (patch) | |
tree | 2818939f5174705a9f9daf930cb5de44fe352c6e | |
parent | b26c4c39b25751f9126e97cce9dd59380912cc67 (diff) | |
download | rabbitmq-server-922615eb75d9b6f52c7eb27d6412c42c7f042c9b.tar.gz |
turn queue recovery upside down
Instead of the persister pushing recovered messages to the queues, the
queues pull recovered messages from the persister.
This allows us to perform queue recovery before recording the
existence of the queues in mnesia, and thus prevents access to
potentially uninitialised queues from other cluster nodes.
It also makes the dependency between queues and the persister one way.
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 39 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 122 |
4 files changed, 79 insertions, 111 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f2dce303..17e18e0e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -128,12 +128,8 @@ -rabbit_boot_step({queue_sup_queue_recovery, [{description, "queue supervisor and queue recovery"}, {mfa, {rabbit_amqqueue, start, []}}, - {requires, empty_db_check}]}). - --rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, - [rabbit_persister]}}, - {requires, queue_sup_queue_recovery}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f6278836..5f045b27 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). @@ -88,7 +88,6 @@ {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), delivery()) -> boolean()). --spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). @@ -118,6 +117,9 @@ start() -> DurableQueues = find_durable_queues(), + ok = rabbit_sup:start_child( + rabbit_persister, + [[QName || #amqqueue{name = QName} <- DurableQueues]]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, @@ -137,27 +139,13 @@ find_durable_queues() -> end). 
recover_durable_queues(DurableQueues) -> - lists:foldl( - fun (RecoveredQ, Acc) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, - read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> [Q | Acc]; - false -> exit(Q#amqqueue.pid, shutdown), - Acc - end - end, [], DurableQueues). + Qs = [start_queue_process(Q) || Q <- DurableQueues], + %% Issue inits to *all* the queues so that they all init at the same time + [ok = gen_server2:cast(Q#amqqueue.pid, {init, true}) || Q <- Qs], + [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], + rabbit_misc:execute_mnesia_transaction( + fun () -> [ok = store_queue(Q) || Q <- Qs] end), + Qs. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -165,6 +153,8 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), + ok = gen_server2:cast(Q#amqqueue.pid, {init, false}), + ok = gen_server2:call(Q#amqqueue.pid, sync, infinity), internal_declare(Q, true). internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> @@ -279,9 +269,6 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. -redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). - requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 449e79ea..18c98f14 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -92,7 +92,7 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. 
- + %%---------------------------------------------------------------------------- init(Q) -> @@ -102,11 +102,13 @@ init(Q) -> exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, - message_buffer = queue:new(), + message_buffer = undefined, active_consumers = queue:new(), blocked_consumers = queue:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +terminate(_Reason, #q{message_buffer = undefined}) -> + ok; terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), @@ -541,6 +543,9 @@ i(Item, _) -> %--------------------------------------------------------------------------- +handle_call(sync, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -748,6 +753,15 @@ handle_call({claim_queue, ReaderPid}, _From, reply(locked, State) end. +handle_cast({init, Recover}, State = #q{message_buffer = undefined}) -> + Messages = case Recover of + true -> rabbit_persister:queue_content(qname(State)); + false -> [] + end, + noreply(State#q{message_buffer = queue:from_list(Messages)}); +handle_cast(init, State) -> + noreply(State); + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), @@ -775,9 +789,6 @@ handle_cast({rollback, Txn, ChPid}, State) -> record_current_channel_tx(ChPid, none), noreply(State); -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); - handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index a9e0cab9..a8e41baf 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -33,14 +33,14 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/1]). 
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0]). + force_snapshot/0, queue_content/1]). -include("rabbit.hrl"). @@ -52,8 +52,7 @@ -define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). + pending_logs, pending_replies, snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -72,20 +71,22 @@ {deliver, pmsg()} | {ack, pmsg()}). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: ([queue_name()]) -> + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). -spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). -spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). +-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(DurableQueues) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), @@ -111,15 +112,18 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). +queue_content(QName) -> + gen_server:call(?SERVER, {queue_content, QName}, infinity). 
+ %%-------------------------------------------------------------------- -init(_Args) -> +init([DurableQueues]) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, []), + queues = ets:new(queues, [ordered_set]), next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, @@ -135,7 +139,8 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), + {Res, NewSnapshot} = + internal_load_snapshot(LogHandle, DurableQueues, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -143,12 +148,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], + pending_replies = [], + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -158,6 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); +handle_call({queue_content, QName}, _From, + State = #pstate{snapshot = #psnapshot{messages = Messages, + queues = Queues}}) -> + MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), D} || + {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + State); handle_call(_Request, _From, State) -> {noreply, State}. 
@@ -339,10 +351,10 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues), + prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, {queues, ets:tab2list(Queues)}, @@ -351,20 +363,21 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. -prune_table(Tab, Keys) -> +prune_table(Tab, Pred) -> true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), + ok = prune_table(Tab, Pred, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of +prune_table(_Tab, _Pred, '$end_of_table') -> ok; +prune_table(Tab, Pred, Key) -> + case Pred(Key) of true -> ok; false -> ets:delete(Tab, Key) end, - prune_table(Tab, Keys, ets:next(Tab, Key)). + prune_table(Tab, Pred, ets:next(Tab, Key)). internal_load_snapshot(LogHandle, + DurableQueues, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), @@ -378,11 +391,18 @@ internal_load_snapshot(LogHandle, Snapshot#psnapshot{ transactions = Ts, next_seq_id = NextSeqId}), - Snapshot2 = requeue_messages(Snapshot1), + %% Remove all entries for queues that no longer exist. + %% Note that the 'messages' table is pruned when the next + %% snapshot is taken. 
+ DurableQueuesSet = sets:from_list(DurableQueues), + prune_table(Snapshot1#psnapshot.queues, + fun ({QName, _PKey}) -> + sets:is_element(QName, DurableQueuesSet) + end), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; + {ok, Snapshot1#psnapshot{transactions = dict:new()}}; {error, Reason} -> {{error, Reason}, Snapshot} end. @@ -394,52 +414,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> check_version(_Other) -> {error, unrecognised_persister_log_format}. -requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl( - fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> - rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) - end, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], - NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{SeqId, QName, PKey, Message, Delivered} || - {SeqId, PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. 
Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, _, _, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - replay([], LogHandle, K, Snapshot) -> case disk_log:chunk(LogHandle, K) of {K1, Items} -> |