diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:56:56 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:56:56 +0100 |
commit | 8694829bcae54c2d5d52730f94a37492c5c35257 (patch) | |
tree | 5e3c081b275a031a9621aefdb9cdcd5ae420a5d6 | |
parent | 4da65790598f6387d99092e7d67745d34d3ab821 (diff) | |
parent | 3a12c784c5b70b417b979f3cf0e789bb1915dcce (diff) | |
download | rabbitmq-server-8694829bcae54c2d5d52730f94a37492c5c35257.tar.gz |
Merge in default (no-op)
-rw-r--r-- | src/gen_server2.erl | 14 | ||||
-rw-r--r-- | src/rabbit.erl | 9 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 96 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 41 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 34 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 52 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 31 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 37 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 103 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 10 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 19 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 25 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 8 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 2 |
20 files changed, 352 insertions, 182 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ee82bcb3..d2f96b52 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -69,7 +69,9 @@ %% which will be passed into any of the callback functions in the new %% module. Note there is no form also encompassing a reply, thus if %% you wish to reply in handle_call/3 and change the callback module, -%% you need to use gen_server2:reply/2 to issue the reply manually. +%% you need to use gen_server2:reply/2 to issue the reply +%% manually. The init function can similarly return a 5th argument, +%% Module, in order to dynamically decide the callback module on init. %% %% 8) The callback module can optionally implement %% format_message_queue/2 which is the equivalent of format_status/2 @@ -125,6 +127,7 @@ %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} +%%% {ok, State, Timeout, Backoff, Module} %%% ignore %%% {stop, Reason} %%% @@ -242,6 +245,8 @@ {ok, State :: term(), timeout() | hibernate} | {ok, State :: term(), timeout() | hibernate, {backoff, millis(), millis(), millis()}} | + {ok, State :: term(), timeout() | hibernate, + {backoff, millis(), millis(), millis()}, atom()} | ignore | {stop, Reason :: term()}. -callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, @@ -568,6 +573,13 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(GS2State #gs2_state { state = State, time = Timeout, timeout_state = Backoff1 }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}, Mod1} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { mod = Mod1, + state = State, + time = Timeout, + timeout_state = Backoff1 }); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before diff --git a/src/rabbit.erl b/src/rabbit.erl index b00a1ad7..bd34cf8b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -134,17 +134,10 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({mirror_queue_slave_sup, - [{description, "mirror queue slave sup"}, - {mfa, {rabbit_sup, start_supervisor_child, - [rabbit_mirror_queue_slave_sup]}}, - {requires, recovery}, - {enables, routing_ready}]}). - -rabbit_boot_step({mirrored_queues, [{description, "adding mirrors to queues"}, {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, - {requires, mirror_queue_slave_sup}, + {requires, recovery}, {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 692179fc..e026279f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,7 +17,8 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, declare/6, - delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). + delete_immediately/1, delete/3, purge/1, forget_all_durable/1, + delete_crashed/1, delete_crashed_internal/1]). -export([pseudo_queue/2, immutable/1]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -49,7 +50,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0]). +-export_type([name/0, qmsg/0, absent_reason/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -59,10 +60,11 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). +-type(absent_reason() :: 'nodedown' | 'crashed'). -type(queue_or_absent() :: rabbit_types:amqqueue() | - {'absent', rabbit_types:amqqueue()}). --type(not_found_or_absent() :: 'not_found' | - {'absent', rabbit_types:amqqueue()}). + {'absent', rabbit_types:amqqueue(),absent_reason()}). +-type(not_found_or_absent() :: + 'not_found' | {'absent', rabbit_types:amqqueue(), absent_reason()}). -spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(start/1 :: ([rabbit_types:amqqueue()]) -> 'ok'). @@ -74,8 +76,9 @@ -spec(declare/6 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid()), node()) - -> {'new' | 'existing' | 'absent' | 'owner_died', - rabbit_types:amqqueue()} | rabbit_types:channel_exit()). + -> {'new' | 'existing' | 'owner_died', rabbit_types:amqqueue()} | + {'absent', rabbit_types:amqqueue(), absent_reason()} | + rabbit_types:channel_exit()). -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_absent() | rabbit_misc:thunk(queue_or_absent())). @@ -138,6 +141,8 @@ -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty')). +-spec(delete_crashed/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_crashed_internal/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> @@ -209,14 +214,14 @@ recover() -> BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, - {rabbit_amqqueue_sup, - {rabbit_amqqueue_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}), recover_durable_queues(lists:zip(DurableQueues, OrderedRecoveryTerms)). stop() -> - ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup), - ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup), + ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup_sup), + ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup_sup), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:stop(). @@ -241,9 +246,9 @@ find_durable_queues() -> recover_durable_queues(QueuesAndRecoveryTerms) -> {Results, Failures} = - gen_server2:mcall([{start_queue_process(node(), Q), - {init, {self(), Terms}}} || - {Q, Terms} <- QueuesAndRecoveryTerms]), + gen_server2:mcall( + [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery), + {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]), [rabbit_log:error("Queue ~p failed to initialise: ~p~n", [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. @@ -269,7 +274,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> down_slave_nodes = [], gm_pids = []})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), - gen_server2:call(start_queue_process(Node, Q), {init, new}, infinity). + gen_server2:call( + rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), + {init, new}, infinity). internal_declare(Q, true) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -280,18 +287,14 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case not_found_or_absent(QueueName) of - not_found -> Q1 = rabbit_policy:set(Q), - ok = store_queue(Q1), - B = add_default_binding(Q1), - fun () -> B(), Q1 end; - {absent, _Q} = R -> rabbit_misc:const(R) + not_found -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; + {absent, _Q, _} = R -> rabbit_misc:const(R) end; - [ExistingQ = #amqqueue{pid = QPid}] -> - case rabbit_misc:is_process_alive(QPid) of - true -> rabbit_misc:const(ExistingQ); - false -> TailFun = internal_delete(QueueName), - fun () -> TailFun(), ExistingQ end - end + [ExistingQ] -> + rabbit_misc:const(ExistingQ) end end). @@ -342,10 +345,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -start_queue_process(Node, Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), - Pid. - add_default_binding(#amqqueue{name = QueueName}) -> ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, @@ -368,7 +367,7 @@ not_found_or_absent(Name) -> %% rabbit_queue and not found anything case mnesia:read({rabbit_durable_queue, Name}) of [] -> not_found; - [Q] -> {absent, Q} %% Q exists on stopped node + [Q] -> {absent, Q, nodedown} %% Q exists on stopped node end. not_found_or_absent_dirty(Name) -> @@ -377,7 +376,7 @@ not_found_or_absent_dirty(Name) -> %% and only affect the error kind. case rabbit_misc:dirty_read({rabbit_durable_queue, Name}) of {error, not_found} -> not_found; - {ok, Q} -> {absent, Q} + {ok, Q} -> {absent, Q, nodedown} end. with(Name, F, E) -> @@ -391,8 +390,11 @@ with(Name, F, E) -> %% the retry loop. rabbit_misc:with_exit_handler( fun () -> false = rabbit_misc:is_process_alive(QPid), - timer:sleep(25), - with(Name, F, E) + case crashed_or_recovering(Q) of + crashed -> E({absent, Q, crashed}); + recovering -> timer:sleep(25), + with(Name, F, E) + end end, fun () -> F(Q) end); {error, not_found} -> E(not_found_or_absent_dirty(Name)) @@ -401,10 +403,22 @@ with(Name, F, E) -> with(Name, F) -> with(Name, F, fun (E) -> {error, E} end). with_or_die(Name, F) -> - with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); - ({absent, Q}) -> rabbit_misc:absent(Q) + with(Name, F, fun (not_found) -> rabbit_misc:not_found(Name); + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end). +%% TODO we could still be wrong here if we happen to call in the +%% middle of a crash-failover. We could try to figure out whether +%% that's happening by looking for the supervisor - but we'd need some +%% additional book keeping to know what it is... +crashed_or_recovering(#amqqueue{pid = QPid, slave_pids = []}) -> + case lists:member(node(QPid), [node() | nodes()]) of + true -> crashed; + false -> recovering + end; +crashed_or_recovering(_Q) -> + recovering. + assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> @@ -565,6 +579,14 @@ delete_immediately(QPids) -> delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate:call(QPid, {delete, IfUnused, IfEmpty}). +delete_crashed(#amqqueue{ pid = QPid } = Q) -> + rpc:call(node(QPid), ?MODULE, delete_crashed_internal, [Q]). + +delete_crashed_internal(#amqqueue{ name = QName }) -> + {ok, BQ} = application:get_env(backing_queue_module), + BQ:delete_crashed(QName), + ok = internal_delete(QName). + purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 42c96807..d37e95c3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,7 +24,7 @@ -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster --export([start_link/1, info_keys/0]). +-export([info_keys/0]). -export([init_with_backing_queue_state/7]). @@ -61,8 +61,6 @@ -ifdef(use_specs). --spec(start_link/1 :: - (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(init_with_backing_queue_state/7 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), @@ -102,8 +100,6 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys(). statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys(). @@ -113,7 +109,8 @@ init(Q) -> process_flag(trap_exit, true), ?store_proc_name(Q#amqqueue.name), {ok, init_state(Q#amqqueue{pid = self()}), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}, + ?MODULE}. init_state(Q) -> State = #q{q = Q, @@ -140,7 +137,7 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> false -> #q{backing_queue = undefined, backing_queue_state = undefined, q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), + send_reply(From, {owner_died, Q}), BQ = backing_queue_module(Q), {_, Terms} = recovery_status(Recover), BQS = bq_init(BQ, Q, Terms), @@ -152,12 +149,12 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> init_it2(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of + {Barrier, TermsOrNew} = recovery_status(Recover), + case rabbit_amqqueue:internal_declare(Q, Recover /= new) of #amqqueue{} = Q1 -> - case matches(Recovery, Q, Q1) of + case matches(Recover, Q, Q1) of true -> - gen_server2:reply(From, {new, Q}), + send_reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( @@ -165,7 +162,7 @@ init_it2(Recover, From, State = #q{q = Q, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q1), BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), + recovery_barrier(Barrier), State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), @@ -182,8 +179,11 @@ init_it2(Recover, From, State = #q{q = Q, {stop, normal, Err, State} end. -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. +recovery_status(new) -> {no_barrier, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. + +send_reply(none, _Q) -> ok; +send_reply(From, Q) -> gen_server2:reply(From, Q). matches(new, Q1, Q2) -> %% i.e. not policy @@ -197,7 +197,7 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -recovery_barrier(new) -> +recovery_barrier(no_barrier) -> ok; recovery_barrier(BarrierPid) -> MRef = erlang:monitor(process, BarrierPid), @@ -206,6 +206,7 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. +%% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, Deliveries, Senders, MTC) -> case Owner of @@ -232,8 +233,11 @@ terminate({shutdown, missing_owner} = Reason, State) -> terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State) -> - terminate_shutdown(terminate_delete(true, Reason, State), State). +terminate(normal, State) -> %% delete case + terminate_shutdown(terminate_delete(true, normal, State), State); +%% If we crashed don't try to clean up the BQS, probably best to leave it. +terminate(_Reason, State) -> + terminate_shutdown(fun (BQS) -> BQS end, State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, @@ -1084,6 +1088,9 @@ handle_call(sync_mirrors, _From, State) -> handle_call(cancel_sync_mirrors, _From, State) -> reply({ok, not_syncing}, State). +handle_cast(init, State) -> + init_it({no_barrier, non_clean_shutdown}, none, State); + handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0fd64c26..465c0412 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -18,35 +18,33 @@ -behaviour(supervisor2). --export([start_link/0, start_child/2]). +-export([start_link/2]). -export([init/1]). -include("rabbit.hrl"). --define(SERVER, ?MODULE). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(start_child/2 :: - (node(), [any()]) -> rabbit_types:ok(pid() | undefined) | - rabbit_types:ok({pid(), any()}) | - rabbit_types:error(any())). +-spec(start_link/2 :: (rabbit_types:amqqueue(), rabbit_prequeue:start_mode()) -> + {'ok', pid(), pid()}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> - supervisor2:start_child({?SERVER, Node}, Args). - -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. +start_link(Q, StartMode) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + ChildSpec = {rabbit_amqqueue, + {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + rabbit_mirror_queue_slave]}, + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), + unlink(Marker), + Marker ! stop, + {ok, SupPid, QPid}. + +init([]) -> {ok, {{one_for_one, 5, 10}, []}}. diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl new file mode 100644 index 00000000..793cb7c9 --- /dev/null +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_sup_sup). + +-behaviour(supervisor2). + +-export([start_link/0, start_queue_process/3]). + +-export([init/1]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_queue_process/3 :: (node(), rabbit_types:amqqueue(), + 'declare' | 'recovery' | 'slave') -> pid()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_queue_process(Node, Q, StartMode) -> + {ok, _SupPid, QPid} = supervisor2:start_child( + {?SERVER, Node}, [Q, StartMode]), + QPid. + +init([]) -> + {ok, {{simple_one_for_one, 10, 10}, + [{rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, + temporary, ?MAX_WAIT, supervisor, [rabbit_amqqueue_sup]}]}}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 098f5f43..310b8220 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -85,6 +85,10 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). +%% Called to clean up after a crashed queue. In this case we don't +%% have a process and thus a state(), we are just removing on-disk data. +-callback delete_crashed(rabbit_amqqueue:name()) -> 'ok'. + %% Remove all 'fetchable' messages from the queue, i.e. all messages %% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index d887f26a..12082af8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -363,7 +363,7 @@ not_found_or_absent_errs(Names) -> absent_errs_only(Names) -> Errs = [E || Name <- Names, - {absent, _Q} = E <- [not_found_or_absent(Name)]], + {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]], rabbit_misc:const(case Errs of [] -> ok; _ -> {error, {resources_missing, Errs}} @@ -376,8 +376,8 @@ not_found_or_absent(#resource{kind = exchange} = Name) -> {not_found, Name}; not_found_or_absent(#resource{kind = queue} = Name) -> case rabbit_amqqueue:not_found_or_absent(Name) of - not_found -> {not_found, Name}; - {absent, _Q} = R -> R + not_found -> {not_found, Name}; + {absent, _Q, _Reason} = R -> R end. contains(Table, MatchHead) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e5a90410..fc433898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1189,16 +1189,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% must have been created between the stat and the %% declare. Loop around again. handle_method(Declare, none, State); - {absent, Q} -> - rabbit_misc:absent(Q); + {absent, Q, Reason} -> + rabbit_misc:absent(Q, Reason); {owner_died, _Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. return_queue_declare_ok(QueueName, NoWait, 0, 0, State) end; - {error, {absent, Q}} -> - rabbit_misc:absent(Q) + {error, {absent, Q, Reason}} -> + rabbit_misc:absent(Q, Reason) end; handle_method(#'queue.declare'{queue = QueueNameBin, @@ -1227,8 +1227,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin, rabbit_amqqueue:check_exclusive_access(Q, ConnPid), rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end, - fun (not_found) -> {ok, 0}; - ({absent, Q}) -> rabbit_misc:absent(Q) + fun (not_found) -> {ok, 0}; + ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q), + {ok, 0}; + ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason) end) of {error, in_use} -> precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]); @@ -1477,8 +1479,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end) of {error, {resources_missing, [{not_found, Name} | _]}} -> rabbit_misc:not_found(Name); - {error, {resources_missing, [{absent, Q} | _]}} -> - rabbit_misc:absent(Q); + {error, {resources_missing, [{absent, Q, Reason} | _]}} -> + rabbit_misc:absent(Q, Reason); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9bccf5dd..1bea8042 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -24,7 +24,7 @@ needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2]). --export([start/1, stop/0]). +-export([start/1, stop/0, delete_crashed/1]). -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). @@ -90,6 +90,9 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). +delete_crashed(_QName) -> + exit({not_valid_for_generic_backing_queue, ?MODULE}). + init(Q, Recover, AsyncCallback) -> {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9e8c4a18..aec6f93d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -202,31 +202,20 @@ add_mirrors(QName, Nodes, SyncMode) -> add_mirror(QName, MirrorNode, SyncMode) -> case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q} -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q, SyncMode); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> {ok, already_mirrored}; - false -> start_child(Name, MirrorNode, Q, SyncMode) - end - end; + {ok, Q} -> + rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun () -> + SPid = rabbit_amqqueue_sup_sup:start_queue_process( + MirrorNode, Q, slave), + log_info(QName, "Adding mirror on node ~p: ~p~n", + [MirrorNode, SPid]), + rabbit_mirror_queue_slave:go(SPid, SyncMode) + end); {error, not_found} = E -> E end. -start_child(Name, MirrorNode, Q, SyncMode) -> - rabbit_misc:with_exit_handler( - rabbit_misc:const(ok), - fun () -> - {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), - log_info(Name, "Adding mirror on node ~p: ~p~n", - [MirrorNode, SPid]), - rabbit_mirror_queue_slave:go(SPid, SyncMode) - end). - report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6d0064ab..2da2e7a5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1, go/2]). +-export([set_maximum_since_use/2, info/1, go/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -71,8 +71,6 @@ %%---------------------------------------------------------------------------- -start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). - set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). @@ -82,7 +80,7 @@ init(Q) -> ?store_proc_name(Q#amqqueue.name), {ok, {not_started, Q}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}. + ?DESIRED_HIBERNATE}, ?MODULE}. go(SPid, sync) -> gen_server2:call(SPid, go, infinity); go(SPid, async) -> gen_server2:cast(SPid, go). @@ -122,6 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, + ok = rabbit_queue_index:erase(QName), %% For crash recovery BQS = bq_init(BQ, Q1, new), State = #state { q = Q1, gm = GM, diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl deleted file mode 100644 index b631cc31..00000000 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ /dev/null @@ -1,37 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. -%% - --module(rabbit_mirror_queue_slave_sup). - --behaviour(supervisor2). - --export([start_link/0, start_child/2]). - --export([init/1]). - --include_lib("rabbit.hrl"). - --define(SERVER, ?MODULE). - -start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). - -init([]) -> - {ok, {{simple_one_for_one, 10, 10}, - [{rabbit_mirror_queue_slave, - {rabbit_mirror_queue_slave, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c4148bbf..77ac5c44 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -21,7 +21,7 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1, absent/1]). +-export([not_found/1, absent/2]). -export([type_class/1, assert_args_equivalence/4]). -export([dirty_read/1]). -export([table_lookup/2, set_table_value/4]). @@ -119,7 +119,8 @@ -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> channel_or_connection_exit()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). --spec(absent/1 :: (rabbit_types:amqqueue()) -> rabbit_types:channel_exit()). +-spec(absent/2 :: (rabbit_types:amqqueue(), rabbit_amqqueue:absent_reason()) + -> rabbit_types:channel_exit()). -spec(type_class/1 :: (rabbit_framing:amqp_field_type()) -> atom()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), @@ -292,14 +293,18 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). -absent(#amqqueue{name = QueueName, pid = QPid, durable = true}) -> +absent(#amqqueue{name = QueueName, pid = QPid, durable = true}, nodedown) -> %% The assertion of durability is mainly there because we mention %% durability in the error message. That way we will hopefully %% notice if at some future point our logic changes s.t. we get %% here with non-durable queues. protocol_error(not_found, "home node '~s' of durable ~s is down or inaccessible", - [node(QPid), rs(QueueName)]). + [node(QPid), rs(QueueName)]); + +absent(#amqqueue{name = QueueName}, crashed) -> + protocol_error(not_found, + "~s has crashed and failed to restart", [rs(QueueName)]). type_class(byte) -> int; type_class(short) -> int; diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl new file mode 100644 index 00000000..b1d92b89 --- /dev/null +++ b/src/rabbit_prequeue.erl @@ -0,0 +1,103 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_prequeue). + +%% This is the initial gen_server that all queue processes start off +%% as. It handles the decision as to whether we need to start a new +%% slave, a new master/unmirrored, or whether we are restarting (and +%% if so, as what). Thus a crashing queue process can restart from here +%% and always do the right thing. + +-export([start_link/3]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-behaviour(gen_server2). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([start_mode/0]). + +-type(start_mode() :: 'declare' | 'recovery' | 'slave'). + +-spec(start_link/3 :: (rabbit_types:amqqueue(), start_mode(), pid()) + -> rabbit_types:ok_pid_or_error()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Q, StartMode, Marker) -> + gen_server2:start_link(?MODULE, {Q, StartMode, Marker}, []). + +%%---------------------------------------------------------------------------- + +init({Q, StartMode, Marker}) -> + init(Q, case {is_process_alive(Marker), StartMode} of + {true, slave} -> slave; + {true, _} -> master; + {false, _} -> restart + end). + +init(Q, master) -> rabbit_amqqueue_process:init(Q); +init(Q, slave) -> rabbit_mirror_queue_slave:init(Q); + +init(#amqqueue{name = QueueName}, restart) -> + {ok, Q = #amqqueue{pid = QPid, + slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), + Local = node(QPid) =:= node(), + Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], + case rabbit_misc:is_process_alive(QPid) of + true -> false = Local, %% assertion + rabbit_mirror_queue_slave:go(self(), async), + rabbit_mirror_queue_slave:init(Q); %% [1] + false -> case Local andalso Slaves =:= [] of + true -> crash_restart(Q); %% [2] + false -> timer:sleep(25), + init(Q, restart) %% [3] + end + end. +%% [1] There is a master on another node. Regardless of whether we +%% were originally a master or a slave, we are now a new slave. +%% +%% [2] Nothing is alive. We are the last best hope. Try to restart as a master. +%% +%% [3] The current master is dead but either there are alive slaves to +%% take over or it's all happening on a different node anyway. This is +%% not a stable situation. Sleep and wait for somebody else to make a +%% move. + +crash_restart(Q = #amqqueue{name = QueueName}) -> + rabbit_log:error("Restarting crashed ~s.~n", [rabbit_misc:rs(QueueName)]), + gen_server2:cast(self(), init), + rabbit_amqqueue_process:init(Q#amqqueue{pid = self()}). + +%%---------------------------------------------------------------------------- + +%% This gen_server2 always hands over to some other module at the end +%% of init/1. +handle_call(_Msg, _From, _State) -> exit(unreachable). +handle_cast(_Msg, _State) -> exit(unreachable). +handle_info(_Msg, _State) -> exit(unreachable). +terminate(_Reason, _State) -> exit(unreachable). +code_change(_OldVsn, _State, _Extra) -> exit(unreachable). + diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0f572866..f21b44bc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_index). --export([init/2, recover/5, +-export([erase/1, init/2, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -200,6 +200,7 @@ {rabbit_types:msg_id(), non_neg_integer(), A})). -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). +-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), contains_predicate(), on_sync_fun()) -> @@ -233,6 +234,13 @@ %% public API %%---------------------------------------------------------------------------- +erase(Name) -> + #qistate { dir = Dir } = blank_state(Name), + case rabbit_file:is_dir(Dir) of + true -> rabbit_file:recursive_delete([Dir]); + false -> ok + end. + init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = rabbit_file:is_file(Dir), %% is_file == is file or dir diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a186fb7a..7018bffe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1669,19 +1669,22 @@ test_declare_on_dead_queue(SecondaryNode) -> Self ! {self(), killed, QPid} end), receive - {Pid, killed, QPid} -> - {existing, #amqqueue{name = QueueName, - pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], none), - false = rabbit_misc:is_process_alive(QPid), - {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], - none), - true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {Pid, killed, OldPid} -> + Q = dead_queue_loop(QueueName, OldPid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. +dead_queue_loop(QueueName, OldPid) -> + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none), + case Q#amqqueue.pid of + OldPid -> timer:sleep(25), + dead_queue_loop(QueueName, OldPid); + _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + Q + end. + %%--------------------------------------------------------------------- control_action(Command, Args) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e97ed491..e858fb3d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,8 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, +-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, @@ -443,22 +444,25 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(#amqqueue { name = QueueName, durable = true }, Terms, +%% We can be recovering a transient queue if it crashed +init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> {PRef, RecoveryTerms} = process_recovery_terms(Terms), - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback), + {PersistentClient, ContainsCheckFun} = + case IsDurable of + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun, AsyncCallback), + {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end}; + false -> {undefined, fun(_MsgId) -> false end} + end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (MsgId) -> - rabbit_msg_store:contains(MsgId, PersistentClient) - end, - MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, + ContainsCheckFun, MsgIdxOnDiskFun), + init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -507,6 +511,9 @@ delete_and_terminate(_Reason, State) -> a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). +delete_crashed(QName) -> + ok = rabbit_queue_index:erase(QName). + purge(State = #vqstate { q4 = Q4, index_state = IndexState, msg_store_clients = MSCState, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index cfa3add4..2c1e15f0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -94,10 +94,10 @@ delete(VHostPath) -> [ok = Fun() || Fun <- Funs], ok. -assert_benign(ok) -> ok; -assert_benign({ok, _}) -> ok; -assert_benign({error, not_found}) -> ok; -assert_benign({error, {absent, Q}}) -> +assert_benign(ok) -> ok; +assert_benign({ok, _}) -> ok; +assert_benign({error, not_found}) -> ok; +assert_benign({error, {absent, Q, nodedown}}) -> %% We have a durable queue on a down node. Removing the mnesia %% entries here is safe. If/when the down node restarts, it will %% clear out the on-disk storage of the queue. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 6fe65c12..212cf973 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -34,7 +34,7 @@ %% Like erlang:memory(), but with awareness of rabbit-y things memory() -> ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], - QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup], + QProcs = [rabbit_amqqueue_sup], MsgIndexProcs = [msg_store_transient, msg_store_persistent], MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), |