diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-24 20:29:32 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-24 20:29:32 +0000 |
commit | fe0851e4573f2abe13c847e2f3c3f36c3f307d39 (patch) | |
tree | d0954723bb417c7cf17251962eff55fe06c6f307 | |
parent | b0a25005e07f58cd04f4c130d490582d3db31dce (diff) | |
parent | f46058f4e138f8c9adf4526569440d17ec6b701e (diff) | |
download | rabbitmq-server-fe0851e4573f2abe13c847e2f3c3f36c3f307d39.tar.gz |
merge bug25827 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 16 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 105 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 121 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 18 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 40 |
9 files changed, 231 insertions, 110 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b8863aca..eeb0e0bf 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -192,13 +192,18 @@ recover() -> on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), - ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), + + %% We rely on BQ:start/1 returning the recovery terms in the same + %% order as the supplied queue names, so that we can zip them together + %% for further processing in recover_durable_queues. + {ok, OrderedRecoveryTerms} = + BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(DurableQueues). + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -226,10 +231,11 @@ find_durable_queues() -> node(Pid) == Node])) end). -recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [Q || Q = #amqqueue{pid = Pid} <- Qs, - gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}]. +recover_durable_queues(QueuesAndRecoveryTerms) -> + Qs = [{start_queue_process(node(), Q), Terms} || + {Q, Terms} <- QueuesAndRecoveryTerms], + [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, + gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0882b8..da8c0607 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -173,9 +173,10 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of + {Recovery, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of #amqqueue{} = Q1 -> - case matches(Recover, Q, Q1) of + case matches(Recovery, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( @@ -184,8 +185,8 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, Recover), - recovery_barrier(Recover), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), @@ -202,6 +203,9 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. + matches(new, Q1, Q2) -> %% i.e. not policy Q1#amqqueue.name =:= Q2#amqqueue.name andalso @@ -237,7 +241,7 @@ decorator_callback(QName, F, A) -> bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover =/= new, + BQ:init(Q, Recover, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). @@ -865,7 +869,7 @@ handle_call({init, Recover}, From, %% You used to be able to declare an exclusive durable queue. Sadly we %% need to still tidy up after that case, there could be the remnants %% of one left over from an upgrade. So that's why we don't enforce -%% Recover = false here. +%% Recover = new here. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> case rabbit_misc:is_process_alive(Owner) of @@ -876,7 +880,8 @@ handle_call({init, Recover}, From, q = Q} = State, gen_server2:reply(From, {owner_died, Q}), BQ = backing_queue_module(Q), - BQS = bq_init(BQ, Q, Recover), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), %% Rely on terminate to delete the queue. {stop, {shutdown, missing_owner}, State#q{backing_queue = BQ, backing_queue_state = BQS}} diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 61b504bc..2b561900 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -27,7 +27,8 @@ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: ('empty' | {rabbit_types:msg_id(), Ack})). --type(attempt_recovery() :: boolean()). +-type(recovery_terms() :: [term()] | 'non_clean_shutdown'). +-type(recovery_info() :: 'new' | recovery_terms()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). @@ -40,7 +41,10 @@ %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency %% of those queues, or initialise any other shared resources. --callback start([rabbit_amqqueue:name()]) -> 'ok'. +%% +%% The list of queue recovery terms returned as {ok, Terms} must be given +%% in the same order as the list of queue names supplied. +-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). %% Called to tear down any state/resources. NB: Implementations should %% not depend on this function being called on shutdown and instead @@ -51,15 +55,17 @@ %% %% Takes %% 1. the amqqueue record -%% 2. a boolean indicating whether the queue is an existing queue that -%% should be recovered +%% 2. a term indicating whether the queue is an existing queue that +%% should be recovered or not. When 'new' is given, no recovery is +%% taking place, otherwise a list of recovery terms is given, or +%% the atom 'non_clean_shutdown' if no recovery terms are available. %% 3. an asynchronous callback which accepts a function of type %% backing-queue-state to backing-queue-state. This callback %% function can be safely invoked from any process, which makes it %% useful for passing messages back into the backing queue, %% especially as the backing queue does not have control of its own %% mailbox. --callback init(rabbit_types:amqqueue(), attempt_recovery(), +-callback init(rabbit_types:amqqueue(), recovery_info(), async_callback()) -> state(). %% Called on queue shutdown when queue isn't being deleted. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index e2bc3247..b0545915 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -373,7 +373,7 @@ qc_default_exchange() -> qc_variable_queue_init(Q) -> {call, ?BQMOD, init, - [Q, false, function(2, ok)]}. + [Q, new, function(2, {ok, []})]}. qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d562210a..da185dce 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, false), + BQS = bq_init(BQ, Q1, []), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f69d8355..b5a316f0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,12 +16,10 @@ -module(rabbit_queue_index). --export([init/2, shutdown_terms/1, recover/5, +-export([init/2, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, - read/3, next_segment_boundary/1, bounds/1, recover/1]). - --export([scan/3]). + read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). -export([add_queue_ttl/0, avoid_zeroes/0]). @@ -196,10 +194,9 @@ -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(shutdown_terms() :: [any()]). +-type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> {'undefined' | non_neg_integer(), qistate()}). @@ -220,13 +217,7 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). - --spec(scan/3 :: (file:filename(), - fun ((seq_id(), rabbit_types:msg_id(), - rabbit_types:message_properties(), boolean(), - ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), - A) -> A). +-spec(start/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). -spec(add_queue_ttl/0 :: () -> 'ok'). @@ -242,26 +233,20 @@ init(Name, OnSyncFun) -> false = rabbit_file:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. -shutdown_terms(Name) -> - #qistate { dir = Dir } = blank_state(Name), - case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 - end. - recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> - State = #qistate { dir = Dir } = blank_state(Name), + State = blank_state(Name), State1 = State #qistate { on_sync = OnSyncFun }, - CleanShutdown = detect_clean_shutdown(Dir), + CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), init_clean(RecoveredCounts, State1); false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State) -> - {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), - store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), +terminate(Terms, State = #qistate { dir = Dir }) -> + {SegmentCounts, State1} = terminate(State), + rabbit_recovery_terms:store(filename:basename(Dir), + [{segments, SegmentCounts} | Terms]), State1. delete_and_terminate(State) -> @@ -357,31 +342,34 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. -recover(DurableQueues) -> - DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || - Queue <- DurableQueues ]), - QueuesDir = queues_dir(), - QueueDirNames = all_queue_directory_names(QueuesDir), - DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, DurableTerms} = +start(DurableQueueNames) -> + ok = rabbit_recovery_terms:start(), + {DurableTerms, DurableDirectories} = lists:foldl( - fun (QueueDirName, {DurableAcc, TermsAcc}) -> - QueueDirPath = filename:join(QueuesDir, QueueDirName), - case sets:is_element(QueueDirName, DurableDirectories) of - true -> - TermsAcc1 = - case read_shutdown_terms(QueueDirPath) of - {error, _} -> TermsAcc; - {ok, Terms} -> [Terms | TermsAcc] - end, - {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], - TermsAcc1}; - false -> - ok = rabbit_file:recursive_delete([QueueDirPath]), - {DurableAcc, TermsAcc} - end - end, {[], []}, QueueDirNames), - {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + fun(QName, {RecoveryTerms, ValidDirectories}) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + {[RecoveryInfo | RecoveryTerms], + sets:add_element(DirName, ValidDirectories)} + end, {[], sets:new()}, DurableQueueNames), + + %% Any queue directory we've not been asked to recover is considered garbage + QueuesDir = queues_dir(), + [rabbit_file:recursive_delete([QueueDir]) || + QueueDir <- all_queue_directory_names(QueuesDir), + not sets:is_element(filename:basename(QueueDir), DurableDirectories)], + + rabbit_recovery_terms:clear(), + + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +stop() -> rabbit_recovery_terms:stop(). all_queue_directory_names(Dir) -> case rabbit_file:list_dir(Dir) of @@ -410,22 +398,6 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, unconfirmed = gb_sets:new() }. -clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). - -detect_clean_shutdown(Dir) -> - case rabbit_file:delete(clean_filename(Dir)) of - ok -> true; - {error, enoent} -> false - end. - -read_shutdown_terms(Dir) -> - rabbit_file:read_term_file(clean_filename(Dir)). - -store_clean_shutdown(Terms, Dir) -> - CleanFileName = clean_filename(Dir), - ok = rabbit_file:ensure_dir(CleanFileName), - rabbit_file:write_term_file(CleanFileName, Terms). - init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. @@ -554,9 +526,6 @@ queue_index_walker_reader(QueueName, Gatherer) -> end, ok, State), ok = gatherer:finish(Gatherer). -scan(Dir, Fun, Acc) -> - scan_segments(Fun, Acc, blank_state_dir(Dir)). - scan_segments(Fun, Acc, State) -> State1 = #qistate { segments = Segments, dir = Dir } = recover_journal(State), diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl new file mode 100644 index 00000000..efb94b81 --- /dev/null +++ b/src/rabbit_recovery_terms.erl @@ -0,0 +1,121 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% We use a gen_server simply so that during the terminate/2 call +%% (i.e., during shutdown), we can sync/flush the dets table to disk. + +-module(rabbit_recovery_terms). + +-behaviour(gen_server). + +-export([start/0, stop/0, store/2, read/1, clear/0]). + +-export([upgrade_recovery_terms/0, start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-rabbit_upgrade({upgrade_recovery_terms, local, []}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start() -> rabbit_types:ok_or_error(term())). +-spec(stop() -> rabbit_types:ok_or_error(term())). +-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())). +-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)). +-spec(clear() -> 'ok'). + +-endif. % use_specs + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +start() -> rabbit_sup:start_child(?MODULE). + +stop() -> rabbit_sup:stop_child(?MODULE). + +store(DirBaseName, Terms) -> dets:insert(?MODULE, {DirBaseName, Terms}). + +read(DirBaseName) -> + case dets:lookup(?MODULE, DirBaseName) of + [{_, Terms}] -> {ok, Terms}; + _ -> {error, not_found} + end. + +clear() -> + dets:delete_all_objects(?MODULE), + flush(). + +%%---------------------------------------------------------------------------- + +upgrade_recovery_terms() -> + open_table(), + try + QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), + Dirs = case rabbit_file:list_dir(QueuesDir) of + {ok, Entries} -> Entries; + {error, _} -> [] + end, + [begin + File = filename:join([QueuesDir, Dir, "clean.dot"]), + case rabbit_file:read_term_file(File) of + {ok, Terms} -> ok = store(Dir, Terms); + {error, _} -> ok + end, + file:delete(File) + end || Dir <- Dirs], + ok + after + close_table() + end. + +start_link() -> gen_server:start_link(?MODULE, [], []). + +%%---------------------------------------------------------------------------- + +init(_) -> + process_flag(trap_exit, true), + open_table(), + {ok, undefined}. + +handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. + +handle_info(_Info, State) -> {noreply, State}. + +terminate(_Reason, _State) -> + close_table(). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +open_table() -> + File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), + {ok, _} = dets:open_file(?MODULE, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]). + +flush() -> dets:sync(?MODULE). + +close_table() -> + ok = flush(), + ok = dets:close(?MODULE). + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f267467e..2d6ff73b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2129,11 +2129,10 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), - Terms = rabbit_queue_index:shutdown_terms(TestQueue), - PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), + PRef = rabbit_guid:gen(), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( - TestQueue, Terms, false, + TestQueue, [], false, fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, @@ -2144,12 +2143,12 @@ init_test_queue() -> restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([test_queue()]), + {ok, _} = rabbit_variable_queue:start([test_queue()]), init_test_queue(). empty_test_queue() -> ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. @@ -2205,7 +2204,7 @@ test_queue_index_props() -> end), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), passed. @@ -2329,13 +2328,16 @@ test_queue_index() -> end), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), passed. variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/1). + Q, case Recover of + true -> non_clean_shutdown; + false -> new + end, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52..8711d139 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -389,16 +389,20 @@ %%---------------------------------------------------------------------------- start(DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), + {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), start_msg_store( [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, begin Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - StartFunState). + StartFunState), + {ok, AllTerms}. -stop() -> stop_msg_store(). +stop() -> + ok = stop_msg_store(), + ok = rabbit_queue_index:stop(). start_msg_store(Refs, StartFunState) -> ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, @@ -419,7 +423,7 @@ init(Queue, Recover, AsyncCallback) -> end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(#amqqueue { name = QueueName, durable = IsDurable }, false, +init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], @@ -430,29 +434,32 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, true, +init(#amqqueue { name = QueueName, durable = true }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - Terms = rabbit_queue_index:shutdown_terms(QueueName), - {PRef, Terms1} = - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef1 -> {PRef1, Terms} - end, + {PRef, RecoveryTerms} = process_recovery_terms(Terms), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( - QueueName, Terms1, + QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, RecoveryTerms, PersistentClient, TransientClient). +process_recovery_terms(Terms=non_clean_shutdown) -> + {rabbit_guid:gen(), Terms}; +process_recovery_terms(Terms) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {rabbit_guid:gen(), []}; + PRef -> {PRef, Terms} + end. + terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, @@ -1003,7 +1010,12 @@ init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + DeltaCount1 = + case Terms of + non_clean_shutdown -> DeltaCount; + _ -> proplists:get_value(persistent_count, + Terms, DeltaCount) + end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; false -> d(#delta { start_seq_id = LowSeqId, |