diff options
author | Tim Watson <tim@rabbitmq.com> | 2014-01-15 16:53:09 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2014-01-15 16:53:09 +0000 |
commit | af00f09c66d5db546e3e59adafdf86ba5230ec1d (patch) | |
tree | 461389bee1d9acdfedc1b2d7086007ddd86677fd | |
parent | ebc2aa448dcf2825508e1a8a101844e133b88e5d (diff) | |
parent | d9506fe01ba41065df37e4f95de550918e08c2c8 (diff) | |
download | rabbitmq-server-af00f09c66d5db546e3e59adafdf86ba5230ec1d.tar.gz |
merge default into bug25827
-rw-r--r-- | src/rabbit_amqqueue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 18 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 102 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 120 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 18 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 34 |
9 files changed, 226 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6b1e00b7..55b98a0c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,13 +195,18 @@ recover() -> on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), - ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), + + %% We rely on BQ:start/1 returning the recovery terms in the same + %% order as the supplied queue names, so that we can zip them together + %% for further processing in recover_durable_queues. + {ok, OrderedRecoveryTerms} = + BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(DurableQueues). + recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -229,10 +234,11 @@ find_durable_queues() -> node(Pid) == Node])) end). -recover_durable_queues(DurableQueues) -> - Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [Q || Q = #amqqueue{pid = Pid} <- Qs, - gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}]. +recover_durable_queues(QueuesAndRecoveryTerms) -> + Qs = [{start_queue_process(node(), Q), Terms} || + {Q, Terms} <- QueuesAndRecoveryTerms], + [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, + gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08509c96..51032fc7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -173,9 +173,10 @@ code_change(_OldVsn, State, _Extra) -> declare(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of + {Recovery, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of #amqqueue{} = Q1 -> - case matches(Recover, Q, Q1) of + case matches(Recovery, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( @@ -184,8 +185,8 @@ declare(Recover, From, State = #q{q = Q, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, Recover), - recovery_barrier(Recover), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), @@ -202,6 +203,9 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. + matches(new, Q1, Q2) -> %% i.e. not policy Q1#amqqueue.name =:= Q2#amqqueue.name andalso @@ -237,7 +241,7 @@ decorator_callback(QName, F, A) -> bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover =/= new, + BQ:init(Q, Recover, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). @@ -855,7 +859,7 @@ handle_call({init, Recover}, From, %% You used to be able to declare an exclusive durable queue. Sadly we %% need to still tidy up after that case, there could be the remnants %% of one left over from an upgrade. So that's why we don't enforce -%% Recover = false here. +%% Recover = new here. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> case rabbit_misc:is_process_alive(Owner) of @@ -866,7 +870,8 @@ handle_call({init, Recover}, From, q = Q} = State, gen_server2:reply(From, {owner_died, Q}), BQ = backing_queue_module(Q), - BQS = bq_init(BQ, Q, Recover), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), %% Rely on terminate to delete the queue. {stop, {shutdown, missing_owner}, State#q{backing_queue = BQ, backing_queue_state = BQS}} diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 61b504bc..f66327ac 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -27,7 +27,8 @@ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: ('empty' | {rabbit_types:msg_id(), Ack})). --type(attempt_recovery() :: boolean()). +-type(recovery_terms() :: [term()] | 'non_clean_shutdown'). +-type(recovery_info() :: 'new' | recovery_terms()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). @@ -40,7 +41,10 @@ %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency %% of those queues, or initialise any other shared resources. --callback start([rabbit_amqqueue:name()]) -> 'ok'. +%% +%% The list of queue recovery terms returned as {ok, Terms} must be given +%% in the same order as the list of queue names supplied. +-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). %% Called to tear down any state/resources. NB: Implementations should %% not depend on this function being called on shutdown and instead @@ -51,15 +55,19 @@ %% %% Takes %% 1. the amqqueue record -%% 2. a boolean indicating whether the queue is an existing queue that -%% should be recovered +%% 2. a term indicating whether the queue is an existing queue that +%% should be recovered or not. When 'new' is given, no recovery is +%% taking place, otherwise a tuple containing the process id of a +%% "barrier process" (on which to wait for a {BarrierPid, 'go'} +%% notification) is passed as the first element, whilst the second +%% holds either a list of recovery terms or the atom 'non_clean_shutdown'. %% 3. an asynchronous callback which accepts a function of type %% backing-queue-state to backing-queue-state. This callback %% function can be safely invoked from any process, which makes it %% useful for passing messages back into the backing queue, %% especially as the backing queue does not have control of its own %% mailbox. --callback init(rabbit_types:amqqueue(), attempt_recovery(), +-callback init(rabbit_types:amqqueue(), recovery_info(), async_callback()) -> state(). %% Called on queue shutdown when queue isn't being deleted. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index e2bc3247..ddd9a6f2 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -373,7 +373,7 @@ qc_default_exchange() -> qc_variable_queue_init(Q) -> {call, ?BQMOD, init, - [Q, false, function(2, ok)]}. + [Q, {false, []}, function(2, ok)]}. qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d562210a..da185dce 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, false), + BQS = bq_init(BQ, Q1, []), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f69d8355..292ad983 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,13 +16,11 @@ -module(rabbit_queue_index). --export([init/2, shutdown_terms/1, recover/5, +-export([init/2, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). --export([scan/3]). - -export([add_queue_ttl/0, avoid_zeroes/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -196,10 +194,9 @@ -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(shutdown_terms() :: [any()]). +-type(shutdown_terms() :: [any()] | 'non_clean_shutdown'). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). --spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> {'undefined' | non_neg_integer(), qistate()}). @@ -222,12 +219,6 @@ {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). --spec(scan/3 :: (file:filename(), - fun ((seq_id(), rabbit_types:msg_id(), - rabbit_types:message_properties(), boolean(), - ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), - A) -> A). - -spec(add_queue_ttl/0 :: () -> 'ok'). -endif. @@ -242,26 +233,20 @@ init(Name, OnSyncFun) -> false = rabbit_file:is_file(Dir), %% is_file == is file or dir State #qistate { on_sync = OnSyncFun }. -shutdown_terms(Name) -> - #qistate { dir = Dir } = blank_state(Name), - case read_shutdown_terms(Dir) of - {error, _} -> []; - {ok, Terms1} -> Terms1 - end. - -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +recover(Name, Terms, MsgStoreRecovered, + ContainsCheckFun, OnSyncFun) -> + State = blank_state(Name), State1 = State #qistate { on_sync = OnSyncFun }, - CleanShutdown = detect_clean_shutdown(Dir), + CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), init_clean(RecoveredCounts, State1); false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -terminate(Terms, State) -> - {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), - store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), +terminate(Terms, State = #qistate { dir = Dir }) -> + {SegmentCounts, State1} = terminate(State), + rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]), State1. delete_and_terminate(State) -> @@ -357,31 +342,33 @@ bounds(State = #qistate { segments = Segments }) -> end, {LowSeqId, NextSeqId, State}. -recover(DurableQueues) -> - DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || - Queue <- DurableQueues ]), - QueuesDir = queues_dir(), - QueueDirNames = all_queue_directory_names(QueuesDir), - DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), - {DurableQueueNames, DurableTerms} = +recover(DurableQueueNames) -> + rabbit_recovery_terms:recover(), + {DurableTerms, DurableDirectories} = lists:foldl( - fun (QueueDirName, {DurableAcc, TermsAcc}) -> - QueueDirPath = filename:join(QueuesDir, QueueDirName), - case sets:is_element(QueueDirName, DurableDirectories) of - true -> - TermsAcc1 = - case read_shutdown_terms(QueueDirPath) of - {error, _} -> TermsAcc; - {ok, Terms} -> [Terms | TermsAcc] - end, - {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], - TermsAcc1}; - false -> - ok = rabbit_file:recursive_delete([QueueDirPath]), - {DurableAcc, TermsAcc} - end - end, {[], []}, QueueDirNames), - {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + fun(QName, {RecoveryTerms, ValidDirectories}) -> + DirName = queue_name_to_dir_name(QName), + RecoveryInfo = case rabbit_recovery_terms:read(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + {[RecoveryInfo | RecoveryTerms], + sets:add_element(DirName, ValidDirectories)} + end, {[], sets:new()}, DurableQueueNames), + + %% Any queue directory we've not been asked to recover is considered garbage + QueuesDir = queues_dir(), + [rabbit_file:recursive_delete([QueueDir]) || + QueueDir <- all_queue_directory_names(QueuesDir), + not sets:is_element(filename:basename(QueueDir), + DurableDirectories)], + + rabbit_recovery_terms:clear(), + + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> case rabbit_file:list_dir(Dir) of @@ -410,22 +397,6 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, unconfirmed = gb_sets:new() }. -clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). - -detect_clean_shutdown(Dir) -> - case rabbit_file:delete(clean_filename(Dir)) of - ok -> true; - {error, enoent} -> false - end. - -read_shutdown_terms(Dir) -> - rabbit_file:read_term_file(clean_filename(Dir)). - -store_clean_shutdown(Terms, Dir) -> - CleanFileName = clean_filename(Dir), - ok = rabbit_file:ensure_dir(CleanFileName), - rabbit_file:write_term_file(CleanFileName, Terms). - init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. @@ -554,9 +525,6 @@ queue_index_walker_reader(QueueName, Gatherer) -> end, ok, State), ok = gatherer:finish(Gatherer). -scan(Dir, Fun, Acc) -> - scan_segments(Fun, Acc, blank_state_dir(Dir)). - scan_segments(Fun, Acc, State) -> State1 = #qistate { segments = Segments, dir = Dir } = recover_journal(State), diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl new file mode 100644 index 00000000..9ff8222d --- /dev/null +++ b/src/rabbit_recovery_terms.erl @@ -0,0 +1,120 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% We use a gen_server simply so that during the terminate/2 call +%% (i.e., during shutdown), we can sync/flush the dets table to disk. + +-module(rabbit_recovery_terms). + +-behaviour(gen_server). + +-export([recover/0, upgrade_recovery_terms/0, start_link/0, + store/2, read/1, clear/0, flush/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-rabbit_upgrade({upgrade_recovery_terms, local, []}). + +-ifdef(use_specs). + +-spec(recover() -> 'ok'). +-spec(upgrade_recovery_terms() -> 'ok'). +-spec(start_link() -> rabbit_types:ok_pid_or_error()). +-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())). +-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)). +-spec(clear() -> 'ok'). + +-endif. % use_specs + +-include("rabbit.hrl"). +-define(SERVER, ?MODULE). +-define(UPGRADE_TABLE, rabbit_recovery_upgrades). + +recover() -> + case supervisor:start_child(rabbit_sup, + {?SERVER, {?MODULE, start_link, []}, + permanent, ?MAX_WAIT, worker, + [?SERVER]}) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok; + {error, _}=Err -> Err + end. + +upgrade_recovery_terms() -> + create_table(), + try + QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"), + DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true, + fun(F, Acc) -> [F|Acc] end, []), + [begin + {ok, Terms} = rabbit_file:read_term_file(File), + ok = store(filename:dirname(File), Terms), + case file:delete(File) of + {error, E} -> + rabbit_log:warning("Unable to delete recovery index" + "~s during upgrade: ~p~n", [File, E]); + ok -> + ok + end + end || File <- DotFiles], + ok + after + flush() + end. + +start_link() -> gen_server:start_link(?MODULE, [], []). + +store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}). + +read(QueueDir) -> + case dets:lookup(?MODULE, QueueDir) of + [{_, Terms}] -> {ok, Terms}; + _ -> {error, not_found} + end. + +clear() -> + dets:delete_all_objects(?MODULE), + flush(). + +init(_) -> + process_flag(trap_exit, true), + create_table(), + {ok, undefined}. + +handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. + +handle_info(_Info, State) -> {noreply, State}. + +terminate(_Reason, _State) -> + ok = flush(), + ok = dets:close(?MODULE). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +flush() -> dets:sync(?MODULE). + +create_table() -> + File = filename:join(rabbit_mnesia:dir(), "recovery.dets"), + {ok, _} = dets:open_file(?MODULE, [{file, File}, + {ram_file, true}, + {auto_save, infinity}]). + +to_key(QueueDir) -> filename:basename(QueueDir). + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 054db8ae..af2a6e86 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2129,11 +2129,10 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), - Terms = rabbit_queue_index:shutdown_terms(TestQueue), - PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), + PRef = rabbit_guid:gen(), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( - TestQueue, Terms, false, + TestQueue, [], false, fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, @@ -2144,12 +2143,12 @@ init_test_queue() -> restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([test_queue()]), + {ok, _} = rabbit_variable_queue:start([test_queue()]), init_test_queue(). empty_test_queue() -> ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. @@ -2205,7 +2204,7 @@ test_queue_index_props() -> end), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), passed. @@ -2329,13 +2328,16 @@ test_queue_index() -> end), ok = rabbit_variable_queue:stop(), - ok = rabbit_variable_queue:start([]), + {ok, _} = rabbit_variable_queue:start([]), passed. variable_queue_init(Q, Recover) -> rabbit_variable_queue:init( - Q, Recover, fun nop/2, fun nop/2, fun nop/1). + Q, case Recover of + true -> non_clean_shutdown; + false -> new + end, fun nop/2, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52..58647d3b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -392,11 +392,13 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), start_msg_store( [Ref || Terms <- AllTerms, + Terms /= non_clean_shutdown, begin Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - StartFunState). + StartFunState), + {ok, AllTerms}. stop() -> stop_msg_store(). @@ -419,7 +421,7 @@ init(Queue, Recover, AsyncCallback) -> end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(#amqqueue { name = QueueName, durable = IsDurable }, false, +init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], @@ -430,29 +432,32 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, true, +init(#amqqueue { name = QueueName, durable = true }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - Terms = rabbit_queue_index:shutdown_terms(QueueName), - {PRef, Terms1} = - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef1 -> {PRef1, Terms} - end, + {PRef, RecoveryTerms} = process_recovery_terms(Terms), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( - QueueName, Terms1, + QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, + init(true, IndexState, DeltaCount, RecoveryTerms, PersistentClient, TransientClient). +process_recovery_terms(Terms=non_clean_shutdown) -> + {rabbit_guid:gen(), Terms}; +process_recovery_terms(Terms) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {rabbit_guid:gen(), []}; + PRef1 -> {PRef1, Terms} + end. + terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, @@ -1003,7 +1008,12 @@ init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + DeltaCount1 = + case Terms of + non_clean_shutdown -> DeltaCount; + _ -> proplists:get_value(persistent_count, + Terms, DeltaCount) + end, Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; false -> d(#delta { start_seq_id = LowSeqId, |