summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-19 16:12:55 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-19 16:12:55 +0100
commit08ed6e8c220693d8dfe94f28de84debe8aa4383d (patch)
tree07f94aaf530fffc93723d841658412eb86c025ca
parente00d04982ae1b630939120dc4f1c94e95a1cc63d (diff)
downloadrabbitmq-server-08ed6e8c220693d8dfe94f28de84debe8aa4383d.tar.gz
Allow crashing queues to recover themselves.
-rw-r--r--src/rabbit_amqqueue_process.erl79
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_prequeue.erl3
3 files changed, 48 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 84832f9f..590a8be0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -136,15 +136,15 @@ init_declared(Recover, From, Q = #amqqueue{name = QName,
finish_init(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
- {Recovery, TermsOrNew} = recovery_status(Recover),
- gen_server2:reply(From, {new, Q}),
+ send_reply(From, Q),
+ {RecoveryPid, TermsOrNew} = recovery_status(Recover),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
BQ = backing_queue_module(Q),
BQS = bq_init(BQ, Q, TermsOrNew),
- recovery_barrier(Recovery),
+ recovery_barrier(RecoveryPid),
State1 = process_args_policy(State#q{backing_queue = BQ,
backing_queue_state = BQS}),
notify_decorators(startup, State1),
@@ -153,8 +153,20 @@ finish_init(Recover, From, State = #q{q = Q,
fun() -> emit_stats(State1) end),
{become, ?MODULE, State1, hibernate}.
-recovery_status(new) -> {new, new};
-recovery_status({Recover, Terms}) -> {Recover, Terms}.
+recovery_status(new) -> {no_barrier, new};
+recovery_status({Recover, Terms}) -> {Recover, Terms}.
+
+recovery_barrier(no_barrier) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
+send_reply(none, _Q) -> ok;
+send_reply(From, Q) -> gen_server2:reply(From, {new, Q}).
%% We have been promoted
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -194,8 +206,10 @@ terminate({shutdown, missing_owner} = Reason, State) ->
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
-terminate(Reason, State) ->
- terminate_shutdown(terminate_delete(true, Reason, State), State).
+terminate(normal, State) -> %% auto-delete case
+ terminate_shutdown(terminate_delete(true, normal, State), State);
+terminate(_Reason, State) ->
+ terminate_crash(State).
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
@@ -211,6 +225,30 @@ terminate_delete(EmitStats, Reason,
BQS1
end.
+terminate_shutdown(Fun, State) ->
+ State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
+ lists:foldl(fun (F, S) -> F(S) end, State,
+ [fun stop_sync_timer/1,
+ fun stop_rate_timer/1,
+ fun stop_expiry_timer/1,
+ fun stop_ttl_timer/1]),
+ case BQS of
+ undefined -> State1;
+ _ -> ok = rabbit_memory_monitor:deregister(self()),
+ QName = qname(State),
+ notify_decorators(shutdown, State),
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _, _, _} <-
+ rabbit_queue_consumers:all(Consumers)],
+ State1#q{backing_queue_state = Fun(BQS)}
+ end.
+
+terminate_crash(State = #q{consumers = Consumers}) ->
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)],
+ ok.
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -244,15 +282,6 @@ bq_init(BQ, Q, Recover) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
-recovery_barrier(new) ->
- ok;
-recovery_barrier(BarrierPid) ->
- MRef = erlang:monitor(process, BarrierPid),
- receive
- {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
- {'DOWN', MRef, process, _, _} -> ok
- end.
-
process_args_policy(State = #q{q = Q,
args_policy_version = N}) ->
ArgsTable =
@@ -304,24 +333,6 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
-terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
- lists:foldl(fun (F, S) -> F(S) end, State,
- [fun stop_sync_timer/1,
- fun stop_rate_timer/1,
- fun stop_expiry_timer/1,
- fun stop_ttl_timer/1]),
- case BQS of
- undefined -> State1;
- _ -> ok = rabbit_memory_monitor:deregister(self()),
- QName = qname(State),
- notify_decorators(shutdown, State),
- [emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _, _} <-
- rabbit_queue_consumers:all(Consumers)],
- State1#q{backing_queue_state = Fun(BQS)}
- end.
-
reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 8b6fcc01..99909e55 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -48,5 +48,5 @@ start_queue_process(Node, Q, Hint) ->
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_prequeue, start_link, []},
- temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
+ transient, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index 2bfffa28..20808b1f 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -102,7 +102,8 @@ init_non_recovery(Q = #amqqueue{name = QueueName}, Hint) ->
new_slave ->
rabbit_mirror_queue_slave:init_slave(Q);
crash_restart ->
- exit(todo);
+ rabbit_amqqueue_process:init_declared(
+ {no_barrier, non_clean_shutdown}, none, Q);
sleep_retry ->
timer:sleep(25),
init_non_recovery(Q, Hint);