diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 13:29:19 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 13:29:19 +0100 |
commit | c06cf2cbf604aa26936d5a2f67705b3a55ee43ce (patch) | |
tree | daf594b94cc9d85c83af3f8b92120f602fda3336 | |
parent | 508e72d273eb223be2f0b36f04eecb88f87054e8 (diff) | |
download | rabbitmq-server-c06cf2cbf604aa26936d5a2f67705b3a55ee43ce.tar.gz |
First step along the way to queue restarts. Create a "prequeue" module whose job is to decide how to proceed, and then have the supervisor start that. At the moment we only handle rabbit_amqqueue_process startup through this, but that will change. Also move the Mnesia transaction which decides whether we successfully declared the queue into the first part of the queue startup (before we call into the queue), since in future it will be needed for other things.
-rw-r--r-- | src/rabbit_amqqueue.erl | 47 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 138 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 104 |
4 files changed, 179 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 692179fc..e625572e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ cancel_sync_mirrors/1]). %% internal --export([internal_declare/2, internal_delete/1, run_backing_queue/3, +-export([internal_declare/1, internal_delete/1, run_backing_queue/3, set_ram_duration_target/2, set_maximum_since_use/2]). -include("rabbit.hrl"). @@ -76,9 +76,9 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) -> {'new' | 'existing' | 'absent' | 'owner_died', rabbit_types:amqqueue()} | rabbit_types:channel_exit()). --spec(internal_declare/2 :: - (rabbit_types:amqqueue(), boolean()) - -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). +%% -spec(internal_declare/2 :: +%% (rabbit_types:amqqueue(), boolean()) +%% -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). -spec(update/2 :: (name(), fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) @@ -196,6 +196,8 @@ arguments]). recover() -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_recovery, Marker), %% Clear out remnants of old incarnation, in case we restarted %% faster than other nodes handled DOWN messages from us. on_node_down(node()), @@ -212,7 +214,11 @@ recover() -> {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), - recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). + Recovered = recover_durable_queues( + lists:zip(DurableQueues, OrderedRecoveryTerms)), + unlink(Marker), + Marker ! stop, + Recovered. stop() -> ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), @@ -271,29 +277,14 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). 
-internal_declare(Q, true) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> ok = store_queue(Q), rabbit_misc:const(Q) end); -internal_declare(Q = #amqqueue{name = QueueName}, false) -> - rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case not_found_or_absent(QueueName) of - not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - {absent, _Q} = R -> rabbit_misc:const(R) - end; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), - fun () -> TailFun(), ExistingQ end - end - end - end). +internal_declare(Q = #amqqueue{name = QueueName}) -> + case not_found_or_absent(QueueName) of + not_found -> ok = store_queue(Q), + B = add_default_binding(Q), + %% TODO can we simplify return here? + {new, fun () -> B(), Q end}; + {absent, _Q} = R -> R + end. update(Name, Fun) -> case mnesia:wread({rabbit_queue, Name}) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index db297c1d..1c982dbb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,9 @@ -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster --export([start_link/1, info_keys/0]). +-export([info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_declared/3, init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -61,8 +61,8 @@ -ifdef(use_specs). --spec(start_link/1 :: - (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). +%% -spec(start_link/1 :: +%% (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). 
-spec(init_with_backing_queue_state/7 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), @@ -102,19 +102,64 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). %%---------------------------------------------------------------------------- -init(Q) -> +init(_) -> + exit(cannot_be_called_directly). + +%% We have just been declared or recovered +init_declared(Recover, From, Q = #amqqueue{name = QName, + exclusive_owner = Owner}) -> process_flag(trap_exit, true), - ?store_proc_name(Q#amqqueue.name), - {ok, init_state(Q#amqqueue{pid = self()}), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + ?store_proc_name(QName), + State = init_state(Q), + case Owner of + none -> finish_init(Recover, From, State); + _ -> case rabbit_misc:is_process_alive(Owner) of %% [1] + true -> erlang:monitor(process, Owner), + finish_init(Recover, From, State); + false -> gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, + backing_queue_state = BQS}} + end + end. +%% [1] You used to be able to declare an exclusive durable +%% queue. Sadly we need to still tidy up after that case, there could +%% be the remnants of one left over from an upgrade. So that's why we +%% don't enforce Recover = new here. 
+ +finish_init(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> + {Recovery, TermsOrNew} = recovery_status(Recover), + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + BQ = backing_queue_module(Q), + BQS = bq_init(BQ, Q, TermsOrNew), + recovery_barrier(Recovery), + State1 = process_args_policy(State#q{backing_queue = BQ, + backing_queue_state = BQS}), + notify_decorators(startup, State1), + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + {become, ?MODULE, State1, hibernate}. + +recovery_status(new) -> {new, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. +%% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> case Owner of @@ -174,54 +219,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of - #amqqueue{} = Q1 -> - case matches(Recovery, Q, Q1) of - true -> - gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQ = backing_queue_module(Q1), - BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), - State1 = process_args_policy( - State#q{backing_queue = BQ, - backing_queue_state = BQS}), - notify_decorators(startup, State), - 
rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - false -> - {stop, normal, {existing, Q1}, State} - end; - Err -> - {stop, normal, Err, State} - end. - -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. - -matches(new, Q1, Q2) -> - %% i.e. not policy - Q1#amqqueue.name =:= Q2#amqqueue.name andalso - Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso - Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso - Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso - Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso - Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids; -matches(_, Q, Q) -> true; -matches(_, _Q, _Q1) -> false. - maybe_notify_decorators(false, State) -> State; maybe_notify_decorators(true, State) -> notify_decorators(State), State. @@ -915,31 +912,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - declare(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. 
- {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; - handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0fd64c26..137422d4 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -48,5 +48,5 @@ start_child(Node, Args) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, + [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl new file mode 100644 index 00000000..148f5968 --- /dev/null +++ b/src/rabbit_prequeue.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_prequeue). + +%% This is the initial gen_server that all queue processes start off +%% as. It handles the decision as to whether we need to start a new +%% slave, a new master/unmirrored, whether we lost a race to declare a +%% new queue, or whether we are in recovery. Thus a crashing queue +%% process can restart from here and always do the right thing. + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-behaviour(gen_server2). 
+ +-include("rabbit.hrl"). + +start_link(Q) -> + gen_server2:start_link(?MODULE, Q, []). + +%%---------------------------------------------------------------------------- + +init(Q) -> + %% Hand back to supervisor ASAP + gen_server2:cast(self(), init), + {ok, Q#amqqueue{pid = self()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(init, Q) -> + case whereis(rabbit_recovery) of + undefined -> init_non_recovery(Q); + _Pid -> init_recovery(Q) + end; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +init_non_recovery(Q = #amqqueue{name = QueueName}) -> + Result = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + {decl, rabbit_amqqueue:internal_declare(Q)}; + [ExistingQ = #amqqueue{pid = QPid}] -> + case rabbit_misc:is_process_alive(QPid) of + true -> {decl, {existing, ExistingQ}}; + false -> exit(todo) + end + end + end), + case Result of + {decl, DeclResult} -> + %% We have just been declared. Block waiting for an init + %% call so that we don't respond to any other message first + receive {'$gen_call', From, {init, new}} -> + case DeclResult of + {new, Fun} -> + Q1 = Fun(), + rabbit_amqqueue_process:init_declared(new,From, Q1); + {F, _} when F =:= absent; F =:= existing -> + gen_server2:reply(From, DeclResult), + {stop, normal, Q} + end + end + end. + +init_recovery(Q) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok = rabbit_amqqueue:store_queue(Q) end), + %% Again block waiting for an init call. 
+ receive {'$gen_call', From, {init, Terms}} -> + rabbit_amqqueue_process:init_declared(Terms, From, Q) + end. |