diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:50:45 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-10 12:50:45 +0100 |
commit | 4da65790598f6387d99092e7d67745d34d3ab821 (patch) | |
tree | dfaf024830bd3194b7d7740527f0a676be33a046 | |
parent | 7f3db7d3444eab86adc543b35cdd39614a4163b2 (diff) | |
download | rabbitmq-server-4da65790598f6387d99092e7d67745d34d3ab821.tar.gz |
Move some chunks of code around and rename a couple of functions in order to be closer to bug 21446.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 199 |
1 files changed, 100 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index db297c1d..42c96807 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -115,25 +115,6 @@ init(Q) -> {ok, init_state(Q#amqqueue{pid = self()}), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, Deliveries, Senders, MTC) -> - case Owner of - none -> ok; - _ -> erlang:monitor(process, Owner) - end, - State = init_state(Q), - State1 = State#q{backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = RateTRef, - senders = Senders, - msg_id_to_channel = MTC}, - State2 = process_args_policy(State1), - State3 = lists:foldl(fun (Delivery, StateN) -> - deliver_or_enqueue(Delivery, true, StateN) - end, State2, Deliveries), - notify_decorators(startup, State3), - State3. - init_state(Q) -> State = #q{q = Q, exclusive_consumer = none, @@ -145,38 +126,32 @@ init_state(Q) -> args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). -terminate(shutdown = R, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate({shutdown, missing_owner} = Reason, State) -> - %% if the owner was missing then there will be no queue, so don't emit stats - terminate_shutdown(terminate_delete(false, Reason, State), State); -terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State) -> - terminate_shutdown(terminate_delete(true, Reason, State), State). +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> + init_it2(Recover, From, State); -terminate_delete(EmitStats, Reason, - State = #q{q = #amqqueue{name = QName}, - backing_queue = BQ}) -> - fun (BQS) -> - BQS1 = BQ:delete_and_terminate(Reason, BQS), - if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer, - fun() -> emit_stats(State) end); - true -> ok - end, - %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName), - BQS1 +%% You used to be able to declare an exclusive durable queue. Sadly we +%% need to still tidy up after that case, there could be the remnants +%% of one left over from an upgrade. So that's why we don't enforce +%% Recover = new here. +init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + init_it2(Recover, From, State); + false -> #q{backing_queue = undefined, + backing_queue_state = undefined, + q = Q} = State, + gen_server2:reply(From, {owner_died, Q}), + BQ = backing_queue_module(Q), + {_, Terms} = recovery_status(Recover), + BQS = bq_init(BQ, Q, Terms), + %% Rely on terminate to delete the queue. + {stop, {shutdown, missing_owner}, + State#q{backing_queue = BQ, backing_queue_state = BQS}} end. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- - -declare(Recover, From, State = #q{q = Q, - backing_queue = undefined, - backing_queue_state = undefined}) -> +init_it2(Recover, From, State = #q{q = Q, + backing_queue = undefined, + backing_queue_state = undefined}) -> {Recovery, TermsOrNew} = recovery_status(Recover), case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of #amqqueue{} = Q1 -> @@ -222,6 +197,81 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. +recovery_barrier(new) -> + ok; +recovery_barrier(BarrierPid) -> + MRef = erlang:monitor(process, BarrierPid), + receive + {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); + {'DOWN', MRef, process, _, _} -> ok + end. + +init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, Deliveries, Senders, MTC) -> + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = init_state(Q), + State1 = State#q{backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + senders = Senders, + msg_id_to_channel = MTC}, + State2 = process_args_policy(State1), + State3 = lists:foldl(fun (Delivery, StateN) -> + deliver_or_enqueue(Delivery, true, StateN) + end, State2, Deliveries), + notify_decorators(startup, State3), + State3. + +terminate(shutdown = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate({shutdown, missing_owner} = Reason, State) -> + %% if the owner was missing then there will be no queue, so don't emit stats + terminate_shutdown(terminate_delete(false, Reason, State), State); +terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); +terminate(Reason, State) -> + terminate_shutdown(terminate_delete(true, Reason, State), State). + +terminate_delete(EmitStats, Reason, + State = #q{q = #amqqueue{name = QName}, + backing_queue = BQ}) -> + fun (BQS) -> + BQS1 = BQ:delete_and_terminate(Reason, BQS), + if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer, + fun() -> emit_stats(State) end); + true -> ok + end, + %% don't care if the internal delete doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName), + BQS1 + end. + +terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue_state = BQS, consumers = Consumers} = + lists:foldl(fun (F, S) -> F(S) end, State, + [fun stop_sync_timer/1, + fun stop_rate_timer/1, + fun stop_expiry_timer/1, + fun stop_ttl_timer/1]), + case BQS of + undefined -> State1; + _ -> ok = rabbit_memory_monitor:deregister(self()), + QName = qname(State), + notify_decorators(shutdown, State), + [emit_consumer_deleted(Ch, CTag, QName) || + {Ch, CTag, _, _, _} <- + rabbit_queue_consumers:all(Consumers)], + State1#q{backing_queue_state = Fun(BQS)} + end. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + maybe_notify_decorators(false, State) -> State; maybe_notify_decorators(true, State) -> notify_decorators(State), State. @@ -250,15 +300,6 @@ bq_init(BQ, Q, Recover) -> rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). -recovery_barrier(new) -> - ok; -recovery_barrier(BarrierPid) -> - MRef = erlang:monitor(process, BarrierPid), - receive - {BarrierPid, go} -> erlang:demonitor(MRef, [flush]); - {'DOWN', MRef, process, _, _} -> ok - end. - process_args_policy(State = #q{q = Q, args_policy_version = N}) -> ArgsTable = @@ -310,24 +351,6 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. -terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue_state = BQS, consumers = Consumers} = - lists:foldl(fun (F, S) -> F(S) end, State, - [fun stop_sync_timer/1, - fun stop_rate_timer/1, - fun stop_expiry_timer/1, - fun stop_ttl_timer/1]), - case BQS of - undefined -> State1; - _ -> ok = rabbit_memory_monitor:deregister(self()), - QName = qname(State), - notify_decorators(shutdown, State), - [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _, _} <- - rabbit_queue_consumers:all(Consumers)], - State1#q{backing_queue_state = Fun(BQS)} - end. - reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. @@ -915,30 +938,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = none}}) -> - declare(Recover, From, State); - -%% You used to be able to declare an exclusive durable queue. Sadly we -%% need to still tidy up after that case, there could be the remnants -%% of one left over from an upgrade. So that's why we don't enforce -%% Recover = new here. -handle_call({init, Recover}, From, - State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rabbit_misc:is_process_alive(Owner) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - false -> #q{backing_queue = undefined, - backing_queue_state = undefined, - q = Q} = State, - gen_server2:reply(From, {owner_died, Q}), - BQ = backing_queue_module(Q), - {_, Terms} = recovery_status(Recover), - BQS = bq_init(BQ, Q, Terms), - %% Rely on terminate to delete the queue. - {stop, {shutdown, missing_owner}, - State#q{backing_queue = BQ, backing_queue_state = BQS}} - end; +handle_call({init, Recover}, From, State) -> + init_it(Recover, From, State); handle_call(info, _From, State) -> reply(infos(info_keys(), State), State); |