diff options
author | Tim Watson <watson.timothy@gmail.com> | 2013-12-19 10:19:44 +0000 |
---|---|---|
committer | Tim Watson <watson.timothy@gmail.com> | 2013-12-19 10:19:44 +0000 |
commit | caee64cd25e429f094b0a7528a335add13bf4d22 (patch) | |
tree | 3e99d55dbb7558955dbc5d65989ca316bb28d679 /src | |
parent | 9db4fd04f9bbda5a1a3d2b956db68cedffd1fc99 (diff) | |
download | rabbitmq-server-caee64cd25e429f094b0a7528a335add13bf4d22.tar.gz |
Rework/Refactor to handle recovery terms up-front
We process all the recovery terms up-front, during qi recovery, and
clear + sync the dets table immediately afterwards. The recovery terms
and keys, based on the queue directory?s ?basename?, are then passed
throughout the initialisation process and checked in the various
places they?re used.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 24 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 44 | ||||
-rw-r--r-- | src/rabbit_recovery_terms.erl | 43 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 27 |
10 files changed, 105 insertions, 71 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c0010d62..0203b4e9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -595,12 +595,11 @@ boot_delegate() -> recover() -> rabbit_policy:recover(), - ok = rabbit_recovery_indexes:recover(), + ok = rabbit_recovery_terms:recover(), Qs = rabbit_amqqueue:recover(), ok = rabbit_binding:recover(rabbit_exchange:recover(), [QName || #amqqueue{name = QName} <- Qs]), - rabbit_amqqueue:start(Qs), - ok = rabbit_recovery_indexes:flush(). + rabbit_amqqueue:start(Qs). maybe_insert_default_data() -> case rabbit_table:is_empty() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6b1e00b7..f611573b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, - assert_equivalence/5, + assert_equivalence/5, queue_name_to_dir_name/1, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -117,6 +117,7 @@ (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table()}]). +-spec(queue_name_to_dir_name/1 :: (rabbit_types:amqqueue()) -> string()). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) @@ -195,13 +196,13 @@ recover() -> on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), - ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), + {ok, Terms} = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(DurableQueues). + recover_durable_queues(DurableQueues, Terms). stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -229,10 +230,17 @@ find_durable_queues() -> node(Pid) == Node])) end). -recover_durable_queues(DurableQueues) -> +recover_durable_queues(DurableQueues, RecoveryTerms) -> Qs = [start_queue_process(node(), Q) || Q <- DurableQueues], - [Q || Q = #amqqueue{pid = Pid} <- Qs, - gen_server2:call(Pid, {init, self()}, infinity) == {new, Q}]. + [Q || Q <- Qs, queue_init(Q, RecoveryTerms) == {new, Q}]. + +queue_init(#amqqueue{ pid = Pid, name = Name }, RecoveryTerms) -> + RecoveryKey = queue_name_to_dir_name(Name), + QueueRecoveryTerms = case lists:keyfind(RecoveryKey, 1, RecoveryTerms) of + {_, Terms} -> Terms; + false -> non_clean_shutdown + end, + gen_server2:call(Pid, {init, {self(), QueueRecoveryTerms}}, infinity). declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), @@ -519,6 +527,10 @@ notify_policy_changed(#amqqueue{pid = QPid}) -> consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). +queue_name_to_dir_name(Name = #resource { kind = queue }) -> + <<Num:128>> = erlang:md5(term_to_binary(Name)), + rabbit_misc:format("~.36B", [Num]). + consumer_info_keys() -> ?CONSUMER_INFO_KEYS. consumers_all(VHostPath) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7002fd36..3af2993e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -187,12 +187,14 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> - case rabbit_amqqueue:internal_declare(Q, Recover =/= new) of +declare(Recover, From, + State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> + {IsRecovering, MediatorPid} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, IsRecovering) of #amqqueue{} = Q1 -> - case matches(Recover, Q, Q1) of + case matches(IsRecovering, Q, Q1) of true -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( @@ -202,7 +204,7 @@ declare(Recover, From, State = #q{q = Q, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, Recover), - recovery_barrier(Recover), + recovery_barrier(MediatorPid), State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), @@ -219,6 +221,11 @@ declare(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. +recovery_status(new) -> + {false, new}; +recovery_status({Recover, _}) -> + {true, Recover}. + matches(new, Q1, Q2) -> %% i.e. not policy Q1#amqqueue.name =:= Q2#amqqueue.name andalso @@ -254,7 +261,7 @@ decorator_callback(QName, F, A) -> bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(Q, Recover =/= new, + BQ:init(Q, Recover, fun (Mod, Fun) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 61b504bc..603c34a9 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -27,7 +27,8 @@ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: ('empty' | {rabbit_types:msg_id(), Ack})). --type(attempt_recovery() :: boolean()). +-type(recovery_terms() :: [{file:filename(), [term()]}]). +-type(attempt_recovery() :: {boolean(), recovery_terms()}). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). @@ -40,7 +41,7 @@ %% aren't being started at this point, but this call allows the %% backing queue to perform any checking necessary for the consistency %% of those queues, or initialise any other shared resources. --callback start([rabbit_amqqueue:name()]) -> 'ok'. +-callback start([rabbit_amqqueue:name()]) -> rabbit_types:ok(recovery_terms()). %% Called to tear down any state/resources. NB: Implementations should %% not depend on this function being called on shutdown and instead diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index e2bc3247..ddd9a6f2 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -373,7 +373,7 @@ qc_default_exchange() -> qc_variable_queue_init(Q) -> {call, ?BQMOD, init, - [Q, false, function(2, ok)]}. + [Q, {false, []}, function(2, ok)]}. qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 96f89ecc..b578d1a6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -114,7 +114,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - BQS = bq_init(BQ, Q1, false), + BQS = bq_init(BQ, Q1, {false, []}), State = #state { q = Q1, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4349a2f0..95cb9d97 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -196,7 +196,8 @@ -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(shutdown_terms() :: [any()]). +-type(recovery_type() :: 'clean_shutdown' | 'non_clean_shutdown'). +-type(shutdown_terms() :: {recovery_type(), [any()]}). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). @@ -244,19 +245,16 @@ init(Name, OnSyncFun) -> shutdown_terms(Name) -> #qistate { dir = Dir } = blank_state(Name), - case rabbit_recovery_indexes:read_recovery_terms(Dir) of + case rabbit_recovery_terms:read(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 end. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +recover(Name, {Recovery, Terms}, MsgStoreRecovered, + ContainsCheckFun, OnSyncFun) -> + State = blank_state(Name), State1 = State #qistate { on_sync = OnSyncFun }, - CleanShutdown = - case rabbit_recovery_indexes:remove_recovery_terms(Dir) of - ok -> true; - {error, not_found} -> false - end, + CleanShutdown = Recovery =/= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), init_clean(RecoveredCounts, State1); @@ -265,7 +263,7 @@ recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> terminate(Terms, State = #qistate { dir = Dir }) -> {SegmentCounts, State1} = terminate(State), - rabbit_recovery_indexes:store_recovery_terms( + rabbit_recovery_terms:store( Dir, [{segments, SegmentCounts} | Terms]), State1. @@ -363,8 +361,12 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> - DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || - Queue <- DurableQueues ]), + DurableDict = + dict:from_list( + [ begin + DirName = rabbit_amqqueue:queue_name_to_dir_name(Queue), + {DirName, Queue} + end || Queue <- DurableQueues ]), QueuesDir = queues_dir(), QueueDirNames = all_queue_directory_names(QueuesDir), DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)), @@ -375,20 +377,23 @@ recover(DurableQueues) -> case sets:is_element(QueueDirName, DurableDirectories) of true -> TermsAcc1 = - case rabbit_recovery_indexes:read_recovery_terms( + case rabbit_recovery_terms:read( QueueDirPath) of {error, _} -> TermsAcc; - {ok, Terms} -> [Terms | TermsAcc] + {ok, Terms} -> [{QueueDirPath, Terms} | + TermsAcc] end, {[dict:fetch(QueueDirName, DurableDict) | DurableAcc], TermsAcc1}; false -> ok = rabbit_file:recursive_delete([QueueDirPath]), - rabbit_recovery_indexes:remove_recovery_terms( - QueueDirPath), + %rabbit_recovery_indexes:remove_recovery_terms( + % QueueDirPath), {DurableAcc, TermsAcc} end end, {[], []}, QueueDirNames), + rabbit_recovery_terms:clear(), + rabbit_recovery_terms:flush(), {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. all_queue_directory_names(Dir) -> @@ -405,7 +410,8 @@ all_queue_directory_names(Dir) -> blank_state(QueueName) -> blank_state_dir( - filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + filename:join(queues_dir(), + rabbit_amqqueue:queue_name_to_dir_name(QueueName))). blank_state_dir(Dir) -> {ok, MaxJournal} = @@ -501,10 +507,6 @@ recover_message(false, _, del, RelSeq, Segment) -> recover_message(false, _, no_del, RelSeq, Segment) -> add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary(Name)), - rabbit_misc:format("~.36B", [Num]). - queues_dir() -> filename:join(rabbit_mnesia:dir(), "queues"). diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index 48af9530..f8138e0e 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -24,9 +24,9 @@ -export([recover/0, upgrade_recovery_indexes/0, start_link/0, - store_recovery_terms/2, - read_recovery_terms/1, - remove_recovery_terms/1, + store/2, + read/1, + clear/0, flush/0]). -export([init/1, @@ -43,15 +43,16 @@ -spec(recover() -> 'ok'). -spec(upgrade_recovery_indexes() -> 'ok'). -spec(start_link() -> rabbit_types:ok_pid_or_error()). --spec(store_recovery_terms( +-spec(store( Name :: file:filename(), Terms :: term()) -> rabbit_types:ok_or_error(term())). --spec(read_recovery_terms( - file:filename()) -> - rabbit_types:ok_or_error(not_found)). --spec(remove_recovery_terms( +-spec(read( file:filename()) -> rabbit_types:ok_or_error(not_found)). +-spec(clear() -> 'ok'). +%-spec(remove_recovery_terms( +% file:filename()) -> +% rabbit_types:ok_or_error(not_found)). -endif. % use_specs @@ -76,7 +77,7 @@ upgrade_recovery_indexes() -> fun(F, Acc) -> [F|Acc] end, []), [begin {ok, Terms} = rabbit_file:read_term_file(File), - ok = store_recovery_terms(File, Terms), + ok = store(File, Terms), case file:delete(File) of {error, E} -> rabbit_log:warning("Unable to delete recovery index" @@ -93,20 +94,26 @@ upgrade_recovery_indexes() -> start_link() -> gen_server:start_link(?MODULE, [], []). -store_recovery_terms(Name, Terms) -> - dets:insert(?MODULE, {Name, Terms}). +store(Name, Terms) -> + dets:insert(?MODULE, {scrub(Name), Terms}). -read_recovery_terms(Name) -> - case dets:lookup(?MODULE, Name) of +read(Name) -> + case dets:lookup(?MODULE, scrub(Name)) of [{_, Terms}] -> {ok, Terms}; _ -> {error, not_found} end. -remove_recovery_terms(Name) -> - case dets:member(?MODULE, Name) of - true -> dets:delete(?MODULE, Name); - _ -> {error, not_found} - end. +scrub(Name) -> + filename:basename(Name). + +%remove_recovery_terms(Name) -> +% case dets:member(?MODULE, Name) of +% true -> dets:delete(?MODULE, Name); +% _ -> {error, not_found} +% end. + +clear() -> + dets:delete_all_objects(?MODULE). flush() -> dets:sync(?MODULE), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5fe319d3..7aafb23d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2129,11 +2129,12 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), + %% TODO: shutdown_terms is no longer relevant - rework this test case Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( - TestQueue, Terms, false, + TestQueue, {clean_shutdown, Terms}, false, fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) end, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ac2b9f52..1b29ceb3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -391,12 +391,13 @@ start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:recover(DurableQueues), start_msg_store( - [Ref || Terms <- AllTerms, + [Ref || {_, Terms} <- AllTerms, begin Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - StartFunState). + StartFunState), + {ok, AllTerms}. stop() -> stop_msg_store(). @@ -419,7 +420,7 @@ init(Queue, Recover, AsyncCallback) -> end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(#amqqueue { name = QueueName, durable = IsDurable }, false, +init(#amqqueue { name = QueueName, durable = IsDurable }, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], @@ -430,21 +431,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, true, +init(#amqqueue { name = QueueName, durable = true }, {_, Terms}, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> - Terms = rabbit_queue_index:shutdown_terms(QueueName), - {PRef, Terms1} = - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef1 -> {PRef1, Terms} - end, + %% Terms = rabbit_queue_index:shutdown_terms(QueueName), + {PRef, Recovery, Terms1} = process_recovery_terms(Terms), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, AsyncCallback), TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, IndexState} = rabbit_queue_index:recover( - QueueName, Terms1, + QueueName, {Recovery, Terms1}, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (MsgId) -> rabbit_msg_store:contains(MsgId, PersistentClient) @@ -453,6 +450,14 @@ init(#amqqueue { name = QueueName, durable = true }, true, init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). +process_recovery_terms(Recovery=non_clean_shutdown) -> + {rabbit_guid:gen(), Recovery, []}; +process_recovery_terms(Terms) -> + case proplists:get_value(persistent_ref, Terms) of + undefined -> {rabbit_guid:gen(), clean_shutdown, []}; + PRef1 -> {PRef1, clean_shutdown, Terms} + end. + terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, |