summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2014-01-15 16:53:09 +0000
committerTim Watson <tim@rabbitmq.com>2014-01-15 16:53:09 +0000
commitaf00f09c66d5db546e3e59adafdf86ba5230ec1d (patch)
tree461389bee1d9acdfedc1b2d7086007ddd86677fd
parentebc2aa448dcf2825508e1a8a101844e133b88e5d (diff)
parentd9506fe01ba41065df37e4f95de550918e08c2c8 (diff)
downloadrabbitmq-server-af00f09c66d5db546e3e59adafdf86ba5230ec1d.tar.gz
merge default into bug25827
-rw-r--r--src/rabbit_amqqueue.erl18
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_backing_queue.erl18
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_queue_index.erl102
-rw-r--r--src/rabbit_recovery_terms.erl120
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_variable_queue.erl34
9 files changed, 226 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7..55b98a0c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,13 +195,18 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+
+ %% We rely on BQ:start/1 returning the recovery terms in the same
+ %% order as the supplied queue names, so that we can zip them together
+ %% for further processing in recover_durable_queues.
+ {ok, OrderedRecoveryTerms} =
+ BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues).
+ recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -229,10 +234,11 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q = #amqqueue{pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
+recover_durable_queues(QueuesAndRecoveryTerms) ->
+ Qs = [{start_queue_process(node(), Q), Terms} ||
+ {Q, Terms} <- QueuesAndRecoveryTerms],
+ [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
+ gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 08509c96..51032fc7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -173,9 +173,10 @@ code_change(_OldVsn, State, _Extra) ->
declare(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+ {Recovery, TermsOrNew} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
#amqqueue{} = Q1 ->
- case matches(Recover, Q, Q1) of
+ case matches(Recovery, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -184,8 +185,8 @@ declare(Recover, From, State = #q{q = Q,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
- BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(Recover),
+ BQS = bq_init(BQ, Q, TermsOrNew),
+ recovery_barrier(Recovery),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -202,6 +203,9 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
+recovery_status(new) -> {new, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
+
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
@@ -237,7 +241,7 @@ decorator_callback(QName, F, A) ->
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover =/= new,
+ BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
@@ -855,7 +859,7 @@ handle_call({init, Recover}, From,
%% You used to be able to declare an exclusive durable queue. Sadly we
%% need to still tidy up after that case, there could be the remnants
%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = false here.
+%% Recover = new here.
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rabbit_misc:is_process_alive(Owner) of
@@ -866,7 +870,8 @@ handle_call({init, Recover}, From,
q = Q} = State,
gen_server2:reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
- BQS = bq_init(BQ, Q, Recover),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc..f66327ac 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -27,7 +27,8 @@
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
--type(attempt_recovery() :: boolean()).
+-type(recovery_terms() :: [term()] | 'non_clean_shutdown').
+-type(recovery_info() :: 'new' | recovery_terms()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
@@ -40,7 +41,10 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> 'ok'.
+%%
+%% The list of queue recovery terms returned as {ok, Terms} must be given
+%% in the same order as the list of queue names supplied.
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
@@ -51,15 +55,19 @@
%%
%% Takes
%% 1. the amqqueue record
-%% 2. a boolean indicating whether the queue is an existing queue that
-%% should be recovered
+%% 2. a term indicating whether the queue is an existing queue that
+%% should be recovered or not. When 'new' is given, no recovery is
+%% taking place, otherwise a tuple containing the process id of a
+%% "barrier process" (on which to wait for a {BarrierPid, 'go'}
+%% notification) is passed as the first element, whilst the second
+%% holds either a list of recovery terms or the atom 'non_clean_shutdown'.
%% 3. an asynchronous callback which accepts a function of type
%% backing-queue-state to backing-queue-state. This callback
%% function can be safely invoked from any process, which makes it
%% useful for passing messages back into the backing queue,
%% especially as the backing queue does not have control of its own
%% mailbox.
--callback init(rabbit_types:amqqueue(), attempt_recovery(),
+-callback init(rabbit_types:amqqueue(), recovery_info(),
async_callback()) -> state().
%% Called on queue shutdown when queue isn't being deleted.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e2bc3247..ddd9a6f2 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -373,7 +373,7 @@ qc_default_exchange() ->
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, {false, []}, function(2, ok)]}.
qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d562210a..da185dce 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -115,7 +115,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, false),
+ BQS = bq_init(BQ, Q1, []),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f69d8355..292ad983 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,13 +16,11 @@
-module(rabbit_queue_index).
--export([init/2, shutdown_terms/1, recover/5,
+-export([init/2, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
--export([scan/3]).
-
-export([add_queue_ttl/0, avoid_zeroes/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -196,10 +194,9 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(shutdown_terms() :: [any()]).
+-type(shutdown_terms() :: [any()] | 'non_clean_shutdown').
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
contains_predicate(), on_sync_fun()) ->
{'undefined' | non_neg_integer(), qistate()}).
@@ -222,12 +219,6 @@
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
--spec(scan/3 :: (file:filename(),
- fun ((seq_id(), rabbit_types:msg_id(),
- rabbit_types:message_properties(), boolean(),
- ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
- A) -> A).
-
-spec(add_queue_ttl/0 :: () -> 'ok').
-endif.
@@ -242,26 +233,20 @@ init(Name, OnSyncFun) ->
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
-shutdown_terms(Name) ->
- #qistate { dir = Dir } = blank_state(Name),
- case read_shutdown_terms(Dir) of
- {error, _} -> [];
- {ok, Terms1} -> Terms1
- end.
-
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+recover(Name, Terms, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun) ->
+ State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown = detect_clean_shutdown(Dir),
+ CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
-terminate(Terms, State) ->
- {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+terminate(Terms, State = #qistate { dir = Dir }) ->
+ {SegmentCounts, State1} = terminate(State),
+ rabbit_recovery_terms:store(Dir, [{segments, SegmentCounts} | Terms]),
State1.
delete_and_terminate(State) ->
@@ -357,31 +342,33 @@ bounds(State = #qistate { segments = Segments }) ->
end,
{LowSeqId, NextSeqId, State}.
-recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
- QueuesDir = queues_dir(),
- QueueDirNames = all_queue_directory_names(QueuesDir),
- DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
- {DurableQueueNames, DurableTerms} =
+recover(DurableQueueNames) ->
+ rabbit_recovery_terms:recover(),
+ {DurableTerms, DurableDirectories} =
lists:foldl(
- fun (QueueDirName, {DurableAcc, TermsAcc}) ->
- QueueDirPath = filename:join(QueuesDir, QueueDirName),
- case sets:is_element(QueueDirName, DurableDirectories) of
- true ->
- TermsAcc1 =
- case read_shutdown_terms(QueueDirPath) of
- {error, _} -> TermsAcc;
- {ok, Terms} -> [Terms | TermsAcc]
- end,
- {[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
- TermsAcc1};
- false ->
- ok = rabbit_file:recursive_delete([QueueDirPath]),
- {DurableAcc, TermsAcc}
- end
- end, {[], []}, QueueDirNames),
- {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
+
+ %% Any queue directory we've not been asked to recover is considered garbage
+ QueuesDir = queues_dir(),
+ [rabbit_file:recursive_delete([QueueDir]) ||
+ QueueDir <- all_queue_directory_names(QueuesDir),
+ not sets:is_element(filename:basename(QueueDir),
+ DurableDirectories)],
+
+ rabbit_recovery_terms:clear(),
+
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
case rabbit_file:list_dir(Dir) of
@@ -410,22 +397,6 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unconfirmed = gb_sets:new() }.
-clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
-
-detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_filename(Dir)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_filename(Dir)).
-
-store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_filename(Dir),
- ok = rabbit_file:ensure_dir(CleanFileName),
- rabbit_file:write_term_file(CleanFileName, Terms).
-
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.
@@ -554,9 +525,6 @@ queue_index_walker_reader(QueueName, Gatherer) ->
end, ok, State),
ok = gatherer:finish(Gatherer).
-scan(Dir, Fun, Acc) ->
- scan_segments(Fun, Acc, blank_state_dir(Dir)).
-
scan_segments(Fun, Acc, State) ->
State1 = #qistate { segments = Segments, dir = Dir } =
recover_journal(State),
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
new file mode 100644
index 00000000..9ff8222d
--- /dev/null
+++ b/src/rabbit_recovery_terms.erl
@@ -0,0 +1,120 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% We use a gen_server simply so that during the terminate/2 call
+%% (i.e., during shutdown), we can sync/flush the dets table to disk.
+
+-module(rabbit_recovery_terms).
+
+-behaviour(gen_server).
+
+-export([recover/0, upgrade_recovery_terms/0, start_link/0,
+ store/2, read/1, clear/0, flush/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-rabbit_upgrade({upgrade_recovery_terms, local, []}).
+
+-ifdef(use_specs).
+
+-spec(recover() -> 'ok').
+-spec(upgrade_recovery_terms() -> 'ok').
+-spec(start_link() -> rabbit_types:ok_pid_or_error()).
+-spec(store(file:filename(), term()) -> rabbit_types:ok_or_error(term())).
+-spec(read(file:filename()) -> rabbit_types:ok_or_error2(term(), not_found)).
+-spec(clear() -> 'ok').
+
+-endif. % use_specs
+
+-include("rabbit.hrl").
+-define(SERVER, ?MODULE).
+-define(UPGRADE_TABLE, rabbit_recovery_upgrades).
+
+recover() ->
+ case supervisor:start_child(rabbit_sup,
+ {?SERVER, {?MODULE, start_link, []},
+ permanent, ?MAX_WAIT, worker,
+ [?SERVER]}) of
+ {ok, _} -> ok;
+ {error, {already_started, _}} -> ok;
+ {error, _}=Err -> Err
+ end.
+
+upgrade_recovery_terms() ->
+ create_table(),
+ try
+ QueuesDir = filename:join(rabbit_mnesia:dir(), "queues"),
+ DotFiles = filelib:fold_files(QueuesDir, "clean.dot", true,
+ fun(F, Acc) -> [F|Acc] end, []),
+ [begin
+ {ok, Terms} = rabbit_file:read_term_file(File),
+ ok = store(filename:dirname(File), Terms),
+ case file:delete(File) of
+ {error, E} ->
+ rabbit_log:warning("Unable to delete recovery index"
+ "~s during upgrade: ~p~n", [File, E]);
+ ok ->
+ ok
+ end
+ end || File <- DotFiles],
+ ok
+ after
+ flush()
+ end.
+
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
+store(QueueDir, Terms) -> dets:insert(?MODULE, {to_key(QueueDir), Terms}).
+
+read(QueueDir) ->
+ case dets:lookup(?MODULE, QueueDir) of
+ [{_, Terms}] -> {ok, Terms};
+ _ -> {error, not_found}
+ end.
+
+clear() ->
+ dets:delete_all_objects(?MODULE),
+ flush().
+
+init(_) ->
+ process_flag(trap_exit, true),
+ create_table(),
+ {ok, undefined}.
+
+handle_call(Msg, _, State) -> {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok = flush(),
+ ok = dets:close(?MODULE).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+flush() -> dets:sync(?MODULE).
+
+create_table() ->
+ File = filename:join(rabbit_mnesia:dir(), "recovery.dets"),
+ {ok, _} = dets:open_file(?MODULE, [{file, File},
+ {ram_file, true},
+ {auto_save, infinity}]).
+
+to_key(QueueDir) -> filename:basename(QueueDir).
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 054db8ae..af2a6e86 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2129,11 +2129,10 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
- Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
+ PRef = rabbit_guid:gen(),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, Terms, false,
+ TestQueue, [], false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
@@ -2144,12 +2143,12 @@ init_test_queue() ->
restart_test_queue(Qi) ->
_ = rabbit_queue_index:terminate([], Qi),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([test_queue()]),
+ {ok, _} = rabbit_variable_queue:start([test_queue()]),
init_test_queue().
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
{0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
@@ -2205,7 +2204,7 @@ test_queue_index_props() ->
end),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
passed.
@@ -2329,13 +2328,16 @@ test_queue_index() ->
end),
ok = rabbit_variable_queue:stop(),
- ok = rabbit_variable_queue:start([]),
+ {ok, _} = rabbit_variable_queue:start([]),
passed.
variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
- Q, Recover, fun nop/2, fun nop/2, fun nop/1).
+ Q, case Recover of
+ true -> non_clean_shutdown;
+ false -> new
+ end, fun nop/2, fun nop/2, fun nop/1).
variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52..58647d3b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -392,11 +392,13 @@ start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
start_msg_store(
[Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- StartFunState).
+ StartFunState),
+ {ok, AllTerms}.
stop() -> stop_msg_store().
@@ -419,7 +421,7 @@ init(Queue, Recover, AsyncCallback) ->
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
@@ -430,29 +432,32 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, true,
+init(#amqqueue { name = QueueName, durable = true }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, Terms1} =
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef1 -> {PRef1, Terms}
- end,
+ {PRef, RecoveryTerms} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, Terms1,
+ QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
+ init(true, IndexState, DeltaCount, RecoveryTerms,
PersistentClient, TransientClient).
+process_recovery_terms(Terms=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Terms};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), []};
+ PRef1 -> {PRef1, Terms}
+ end.
+
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
@@ -1003,7 +1008,12 @@ init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
- DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
+ DeltaCount1 =
+ case Terms of
+ non_clean_shutdown -> DeltaCount;
+ _ -> proplists:get_value(persistent_count,
+ Terms, DeltaCount)
+ end,
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
false -> d(#delta { start_seq_id = LowSeqId,