diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 16:12:55 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-19 16:12:55 +0100 |
commit | 08ed6e8c220693d8dfe94f28de84debe8aa4383d (patch) | |
tree | 07f94aaf530fffc93723d841658412eb86c025ca | |
parent | e00d04982ae1b630939120dc4f1c94e95a1cc63d (diff) | |
download | rabbitmq-server-08ed6e8c220693d8dfe94f28de84debe8aa4383d.tar.gz |
Allow crashing queues to recover themselves.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 79 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 2 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 3 |
3 files changed, 48 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 84832f9f..590a8be0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -136,15 +136,15 @@ init_declared(Recover, From, Q = #amqqueue{name = QName, finish_init(Recover, From, State = #q{q = Q, backing_queue = undefined, backing_queue_state = undefined}) -> - {Recovery, TermsOrNew} = recovery_status(Recover), - gen_server2:reply(From, {new, Q}), + send_reply(From, Q), + {RecoveryPid, TermsOrNew} = recovery_status(Recover), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQ = backing_queue_module(Q), BQS = bq_init(BQ, Q, TermsOrNew), - recovery_barrier(Recovery), + recovery_barrier(RecoveryPid), State1 = process_args_policy(State#q{backing_queue = BQ, backing_queue_state = BQS}), notify_decorators(startup, State1), @@ -153,8 +153,20 @@ finish_init(Recover, From, State = #q{q = Q, fun() -> emit_stats(State1) end), {become, ?MODULE, State1, hibernate}. -recovery_status(new) -> {new, new}; -recovery_status({Recover, Terms}) -> {Recover, Terms}. +recovery_status(new) -> {no_barrier, new}; +recovery_status({Recover, Terms}) -> {Recover, Terms}. + +recovery_barrier(no_barrier) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + +send_reply(none, _Q) -> ok; +send_reply(From, Q) -> gen_server2:reply(From, {new, Q}). %% We have been promoted init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -194,8 +206,10 @@ terminate({shutdown, missing_owner} = Reason, State) -> terminate_shutdown(terminate_delete(false, Reason, State), State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State) -> - terminate_shutdown(terminate_delete(true, Reason, State), State). +terminate(normal, State) -> %% auto-delete case + terminate_shutdown(terminate_delete(true, normal, State), State); +terminate(_Reason, State) -> + terminate_crash(State). terminate_delete(EmitStats, Reason, State = #q{q = #amqqueue{name = QName}, @@ -211,6 +225,30 @@ terminate_delete(EmitStats, Reason, BQS1 end. +terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = + lists:foldl(fun (F, S) -> F(S) end, State, + [fun stop_sync_timer/1, + fun stop_rate_timer/1, + fun stop_expiry_timer/1, + fun stop_ttl_timer/1]), + case BQS of + undefined -> State1; + _ -> ok = rabbit_memory_monitor:deregister(self()), + QName = qname(State), + notify_decorators(shutdown, State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _, _} <- + rabbit_queue_consumers:all(Consumers)], + State1#q{backing_queue_state = Fun(BQS)} + end. + +terminate_crash(State = #q{consumers = Consumers}) -> + QName = qname(State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], + ok. + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -244,15 +282,6 @@ bq_init(BQ, Q, Recover) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). -recovery_barrier(new) -> - ok; -recovery_barrier(BarrierPid) -> - MRef = erlang:monitor(process, BarrierPid), - receive - {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); - {'DOWN', MRef, process, _, _} -> ok - end. - process_args_policy(State = #q{q = Q, args_policy_version = N}) -> ArgsTable = @@ -304,24 +333,6 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. -terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS, consumers = Consumers} = - lists:foldl(fun (F, S) -> F(S) end, State, - [fun stop_sync_timer/1, - fun stop_rate_timer/1, - fun stop_expiry_timer/1, - fun stop_ttl_timer/1]), - case BQS of - undefined -> State1; - _ -> ok = rabbit_memory_monitor:deregister(self()), - QName = qname(State), - notify_decorators(shutdown, State), - [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _, _} <- - rabbit_queue_consumers:all(Consumers)], - State1#q{backing_queue_state = Fun(BQS)} - end. - reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 8b6fcc01..99909e55 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -48,5 +48,5 @@ start_queue_process(Node, Q, Hint) -> init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_prequeue, start_link, []}, - temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process, + transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index 2bfffa28..20808b1f 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -102,7 +102,8 @@ init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) -> new_slave -> rabbit_mirror_queue_slave:init_slave(Q); crash_restart -> - exit(todo); + rabbit_amqqueue_process:init_declared( + {no_barrier, non_clean_shutdown}, none, Q); sleep_retry -> timer:sleep(25), init_non_recovery(Q, Hint); |