summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2013-12-19 10:19:44 +0000
committerTim Watson <watson.timothy@gmail.com>2013-12-19 10:19:44 +0000
commitcaee64cd25e429f094b0a7528a335add13bf4d22 (patch)
tree3e99d55dbb7558955dbc5d65989ca316bb28d679 /src
parent9db4fd04f9bbda5a1a3d2b956db68cedffd1fc99 (diff)
downloadrabbitmq-server-caee64cd25e429f094b0a7528a335add13bf4d22.tar.gz
Rework/Refactor to handle recovery terms up-front
We process all the recovery terms up-front, during qi recovery, and clear + sync the dets table immediately afterwards. The recovery terms and keys, based on the queue directory?s ?basename?, are then passed throughout the initialisation process and checked in the various places they?re used.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_queue_index.erl44
-rw-r--r--src/rabbit_recovery_terms.erl43
-rw-r--r--src/rabbit_tests.erl3
-rw-r--r--src/rabbit_variable_queue.erl27
10 files changed, 105 insertions, 71 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c0010d62..0203b4e9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -595,12 +595,11 @@ boot_delegate() ->
recover() ->
rabbit_policy:recover(),
- ok = rabbit_recovery_indexes:recover(),
+ ok = rabbit_recovery_terms:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
- rabbit_amqqueue:start(Qs),
- ok = rabbit_recovery_indexes:flush().
+ rabbit_amqqueue:start(Qs).
maybe_insert_default_data() ->
case rabbit_table:is_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7..f611573b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- assert_equivalence/5,
+ assert_equivalence/5, queue_name_to_dir_name/1,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -117,6 +117,7 @@
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
rabbit_framing:amqp_table()}]).
+-spec(queue_name_to_dir_name/1 :: (rabbit_types:amqqueue()) -> string()).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -195,13 +196,13 @@ recover() ->
on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
- ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
+ {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
- recover_durable_queues(DurableQueues).
+ recover_durable_queues(DurableQueues, Terms).
stop() ->
ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
@@ -229,10 +230,17 @@ find_durable_queues() ->
node(Pid) == Node]))
end).
-recover_durable_queues(DurableQueues) ->
+recover_durable_queues(DurableQueues, RecoveryTerms) ->
Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
- [Q || Q = #amqqueue{pid = Pid} <- Qs,
- gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}].
+ [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}].
+
+queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) ->
+ RecoveryKey = queue_name_to_dir_name(Name),
+ QueueRecoveryTerms = case lists:keyfind(RecoveryKey, 1, RecoveryTerms) of
+ {_, Terms} -> Terms;
+ false -> non_clean_shutdown
+ end,
+ gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity).
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
@@ -519,6 +527,10 @@ notify_policy_changed(#amqqueue{pid = QPid}) ->
consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
consumers_all(VHostPath) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7002fd36..3af2993e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -187,12 +187,14 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
- case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of
+declare(Recover, From,
+ State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
+ {IsRecovering, MediatorPid} = recovery_status(Recover),
+ case rabbit_amqqueue:internal_declare(Q, IsRecovering) of
#amqqueue{} = Q1 ->
- case matches(Recover, Q, Q1) of
+ case matches(IsRecovering, Q, Q1) of
true ->
gen_server2:reply(From, {new, Q}),
ok = file_handle_cache:register_callback(
@@ -202,7 +204,7 @@ declare(Recover, From, State = #q{q = Q,
set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q1),
BQS = bq_init(BQ, Q, Recover),
- recovery_barrier(Recover),
+ recovery_barrier(MediatorPid),
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
@@ -219,6 +221,11 @@ declare(Recover, From, State = #q{q = Q,
{stop, normal, Err, State}
end.
+recovery_status(new) ->
+ {false, new};
+recovery_status({Recover, _}) ->
+ {true, Recover}.
+
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
@@ -254,7 +261,7 @@ decorator_callback(QName, F, A) ->
bq_init(BQ, Q, Recover) ->
Self = self(),
- BQ:init(Q, Recover =/= new,
+ BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 61b504bc..603c34a9 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -27,7 +27,8 @@
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
('empty' | {rabbit_types:msg_id(), Ack})).
--type(attempt_recovery() :: boolean()).
+-type(recovery_terms() :: [{file:filename(), [term()]}]).
+-type(attempt_recovery() :: {boolean(), recovery_terms()}).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
@@ -40,7 +41,7 @@
%% aren't being started at this point, but this call allows the
%% backing queue to perform any checking necessary for the consistency
%% of those queues, or initialise any other shared resources.
--callback start([rabbit_amqqueue:name()]) -> 'ok'.
+-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()).
%% Called to tear down any state/resources. NB: Implementations should
%% not depend on this function being called on shutdown and instead
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index e2bc3247..ddd9a6f2 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -373,7 +373,7 @@ qc_default_exchange() ->
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, {false, []}, function(2, ok)]}.
qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc..b578d1a6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -114,7 +114,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- BQS = bq_init(BQ, Q1, false),
+ BQS = bq_init(BQ, Q1, {false, []}),
State = #state { q = Q1,
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4349a2f0..95cb9d97 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,7 +196,8 @@
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
--type(shutdown_terms() :: [any()]).
+-type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown').
+-type(shutdown_terms() :: {recovery_type(), [any()]}).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
@@ -244,19 +245,16 @@ init(Name, OnSyncFun) ->
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case rabbit_recovery_indexes:read_recovery_terms(Dir) of
+ case rabbit_recovery_terms:read(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+recover(Name, {Recovery, Terms}, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun) ->
+ State = blank_state(Name),
State1 = State #qistate { on_sync = OnSyncFun },
- CleanShutdown =
- case rabbit_recovery_indexes:remove_recovery_terms(Dir) of
- ok -> true;
- {error, not_found} -> false
- end,
+ CleanShutdown = Recovery =/= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
init_clean(RecoveredCounts, State1);
@@ -265,7 +263,7 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
terminate(Terms, State = #qistate { dir = Dir }) ->
{SegmentCounts, State1} = terminate(State),
- rabbit_recovery_indexes:store_recovery_terms(
+ rabbit_recovery_terms:store(
Dir, [{segments, SegmentCounts} | Terms]),
State1.
@@ -363,8 +361,12 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
- Queue <- DurableQueues ]),
+ DurableDict =
+ dict:from_list(
+ [ begin
+ DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue),
+ {DirName, Queue}
+ end || Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
@@ -375,20 +377,23 @@ recover(DurableQueues) ->
case sets:is_element(QueueDirName, DurableDirectories) of
true ->
TermsAcc1 =
- case rabbit_recovery_indexes:read_recovery_terms(
+ case rabbit_recovery_terms:read(
QueueDirPath) of
{error, _} -> TermsAcc;
- {ok, Terms} -> [Terms | TermsAcc]
+ {ok, Terms} -> [{QueueDirPath, Terms} |
+ TermsAcc]
end,
{[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
ok = rabbit_file:recursive_delete([QueueDirPath]),
- rabbit_recovery_indexes:remove_recovery_terms(
- QueueDirPath),
+ %rabbit_recovery_indexes:remove_recovery_terms(
+ % QueueDirPath),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
+ rabbit_recovery_terms:clear(),
+ rabbit_recovery_terms:flush(),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
@@ -405,7 +410,8 @@ all_queue_directory_names(Dir) ->
blank_state(QueueName) ->
blank_state_dir(
- filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+ filename:join(queues_dir(),
+ rabbit_amqqueue:queue_name_to_dir_name(QueueName))).
blank_state_dir(Dir) ->
{ok, MaxJournal} =
@@ -501,10 +507,6 @@ recover_message(false, _, del, RelSeq, Segment) ->
recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 48af9530..f8138e0e 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -24,9 +24,9 @@
-export([recover/0,
upgrade_recovery_indexes/0,
start_link/0,
- store_recovery_terms/2,
- read_recovery_terms/1,
- remove_recovery_terms/1,
+ store/2,
+ read/1,
+ clear/0,
flush/0]).
-export([init/1,
@@ -43,15 +43,16 @@
-spec(recover() -> 'ok').
-spec(upgrade_recovery_indexes() -> 'ok').
-spec(start_link() -> rabbit_types:ok_pid_or_error()).
--spec(store_recovery_terms(
+-spec(store(
Name :: file:filename(),
Terms :: term()) -> rabbit_types:ok_or_error(term())).
--spec(read_recovery_terms(
- file:filename()) ->
- rabbit_types:ok_or_error(not_found)).
--spec(remove_recovery_terms(
+-spec(read(
file:filename()) ->
rabbit_types:ok_or_error(not_found)).
+-spec(clear() -> 'ok').
+%-spec(remove_recovery_terms(
+% file:filename()) ->
+% rabbit_types:ok_or_error(not_found)).
-endif. % use_specs
@@ -76,7 +77,7 @@ upgrade_recovery_indexes() ->
fun(F, Acc) -> [F|Acc] end, []),
[begin
{ok, Terms} = rabbit_file:read_term_file(File),
- ok = store_recovery_terms(File, Terms),
+ ok = store(File, Terms),
case file:delete(File) of
{error, E} ->
rabbit_log:warning("Unable to delete recovery index"
@@ -93,20 +94,26 @@ upgrade_recovery_indexes() ->
start_link() ->
gen_server:start_link(?MODULE, [], []).
-store_recovery_terms(Name, Terms) ->
- dets:insert(?MODULE, {Name, Terms}).
+store(Name, Terms) ->
+ dets:insert(?MODULE, {scrub(Name), Terms}).
-read_recovery_terms(Name) ->
- case dets:lookup(?MODULE, Name) of
+read(Name) ->
+ case dets:lookup(?MODULE, scrub(Name)) of
[{_, Terms}] -> {ok, Terms};
_ -> {error, not_found}
end.
-remove_recovery_terms(Name) ->
- case dets:member(?MODULE, Name) of
- true -> dets:delete(?MODULE, Name);
- _ -> {error, not_found}
- end.
+scrub(Name) ->
+ filename:basename(Name).
+
+%remove_recovery_terms(Name) ->
+% case dets:member(?MODULE, Name) of
+% true -> dets:delete(?MODULE, Name);
+% _ -> {error, not_found}
+% end.
+
+clear() ->
+ dets:delete_all_objects(?MODULE).
flush() ->
dets:sync(?MODULE),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3..7aafb23d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2129,11 +2129,12 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
+ %% TODO: shutdown_terms is no longer relevant - rework this test case
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
- TestQueue, Terms, false,
+ TestQueue, {clean_shutdown, Terms}, false,
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ac2b9f52..1b29ceb3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -391,12 +391,13 @@
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues),
start_msg_store(
- [Ref || Terms <- AllTerms,
+ [Ref || {_, Terms} <- AllTerms,
begin
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- StartFunState).
+ StartFunState),
+ {ok, AllTerms}.
stop() -> stop_msg_store().
@@ -419,7 +420,7 @@ init(Queue, Recover, AsyncCallback) ->
end,
fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
-init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
@@ -430,21 +431,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
-init(#amqqueue { name = QueueName, durable = true }, true,
+init(#amqqueue { name = QueueName, durable = true }, {_, Terms},
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, Terms1} =
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef1 -> {PRef1, Terms}
- end,
+ %% Terms = rabbit_queue_index:shutdown_terms(QueueName),
+ {PRef, Recovery, Terms1} = process_recovery_terms(Terms),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
- QueueName, Terms1,
+ QueueName, {Recovery, Terms1},
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (MsgId) ->
rabbit_msg_store:contains(MsgId, PersistentClient)
@@ -453,6 +450,14 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
+process_recovery_terms(Recovery=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Recovery, []};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), clean_shutdown, []};
+ PRef1 -> {PRef1, clean_shutdown, Terms}
+ end.
+
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,