summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-09-10 12:50:45 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-09-10 12:50:45 +0100
commit4da65790598f6387d99092e7d67745d34d3ab821 (patch)
treedfaf024830bd3194b7d7740527f0a676be33a046
parent7f3db7d3444eab86adc543b35cdd39614a4163b2 (diff)
downloadrabbitmq-server-4da65790598f6387d99092e7d67745d34d3ab821.tar.gz
Move some chunks of code around and rename a couple of functions in order to be closer to bug 21446.
-rw-r--r--src/rabbit_amqqueue_process.erl199
1 files changed, 100 insertions, 99 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index db297c1d..42c96807 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -115,25 +115,6 @@ init(Q) ->
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, Deliveries, Senders, MTC) ->
- case Owner of
- none -> ok;
- _ -> erlang:monitor(process, Owner)
- end,
- State = init_state(Q),
- State1 = State#q{backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = RateTRef,
- senders = Senders,
- msg_id_to_channel = MTC},
- State2 = process_args_policy(State1),
- State3 = lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State2, Deliveries),
- notify_decorators(startup, State3),
- State3.
-
init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
@@ -145,38 +126,32 @@ init_state(Q) ->
args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
-terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
-terminate({shutdown, missing_owner} = Reason, State) ->
- %% if the owner was missing then there will be no queue, so don't emit stats
- terminate_shutdown(terminate_delete(false, Reason, State), State);
-terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
-terminate(Reason, State) ->
- terminate_shutdown(terminate_delete(true, Reason, State), State).
+init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
+ init_it2(Recover, From, State);
-terminate_delete(EmitStats, Reason,
- State = #q{q = #amqqueue{name = QName},
- backing_queue = BQ}) ->
- fun (BQS) ->
- BQS1 = BQ:delete_and_terminate(Reason, BQS),
- if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
- fun() -> emit_stats(State) end);
- true -> ok
- end,
- %% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName),
- BQS1
+%% You used to be able to declare an exclusive durable queue. Sadly we
+%% need to still tidy up after that case, there could be the remnants
+%% of one left over from an upgrade. So that's why we don't enforce
+%% Recover = new here.
+init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ init_it2(Recover, From, State);
+ false -> #q{backing_queue = undefined,
+ backing_queue_state = undefined,
+ q = Q} = State,
+ gen_server2:reply(From, {owner_died, Q}),
+ BQ = backing_queue_module(Q),
+ {_, Terms} = recovery_status(Recover),
+ BQS = bq_init(BQ, Q, Terms),
+ %% Rely on terminate to delete the queue.
+ {stop, {shutdown, missing_owner},
+ State#q{backing_queue = BQ, backing_queue_state = BQS}}
end.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-
-declare(Recover, From, State = #q{q = Q,
- backing_queue = undefined,
- backing_queue_state = undefined}) ->
+init_it2(Recover, From, State = #q{q = Q,
+ backing_queue = undefined,
+ backing_queue_state = undefined}) ->
{Recovery, TermsOrNew} = recovery_status(Recover),
case rabbit_amqqueue:internal_declare(Q, Recovery /= new) of
#amqqueue{} = Q1 ->
@@ -222,6 +197,81 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
+recovery_barrier(new) ->
+ ok;
+recovery_barrier(BarrierPid) ->
+ MRef = erlang:monitor(process, BarrierPid),
+ receive
+ {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
+ {'DOWN', MRef, process, _, _} -> ok
+ end.
+
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+ RateTRef, Deliveries, Senders, MTC) ->
+ case Owner of
+ none -> ok;
+ _ -> erlang:monitor(process, Owner)
+ end,
+ State = init_state(Q),
+ State1 = State#q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ senders = Senders,
+ msg_id_to_channel = MTC},
+ State2 = process_args_policy(State1),
+ State3 = lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries),
+ notify_decorators(startup, State3),
+ State3.
+
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, missing_owner} = Reason, State) ->
+ %% if the owner was missing then there will be no queue, so don't emit stats
+ terminate_shutdown(terminate_delete(false, Reason, State), State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State) ->
+ terminate_shutdown(terminate_delete(true, Reason, State), State).
+
+terminate_delete(EmitStats, Reason,
+ State = #q{q = #amqqueue{name = QName},
+ backing_queue = BQ}) ->
+ fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
+ if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end);
+ true -> ok
+ end,
+ %% don't care if the internal delete doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(QName),
+ BQS1
+ end.
+
+terminate_shutdown(Fun, State) ->
+ State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
+ lists:foldl(fun (F, S) -> F(S) end, State,
+ [fun stop_sync_timer/1,
+ fun stop_rate_timer/1,
+ fun stop_expiry_timer/1,
+ fun stop_ttl_timer/1]),
+ case BQS of
+ undefined -> State1;
+ _ -> ok = rabbit_memory_monitor:deregister(self()),
+ QName = qname(State),
+ notify_decorators(shutdown, State),
+ [emit_consumer_deleted(Ch, CTag, QName) ||
+ {Ch, CTag, _, _, _} <-
+ rabbit_queue_consumers:all(Consumers)],
+ State1#q{backing_queue_state = Fun(BQS)}
+ end.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
maybe_notify_decorators(false, State) -> State;
maybe_notify_decorators(true, State) -> notify_decorators(State), State.
@@ -250,15 +300,6 @@ bq_init(BQ, Q, Recover) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
-recovery_barrier(new) ->
- ok;
-recovery_barrier(BarrierPid) ->
- MRef = erlang:monitor(process, BarrierPid),
- receive
- {BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
- {'DOWN', MRef, process, _, _} -> ok
- end.
-
process_args_policy(State = #q{q = Q,
args_policy_version = N}) ->
ArgsTable =
@@ -310,24 +351,6 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
-terminate_shutdown(Fun, State) ->
- State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
- lists:foldl(fun (F, S) -> F(S) end, State,
- [fun stop_sync_timer/1,
- fun stop_rate_timer/1,
- fun stop_expiry_timer/1,
- fun stop_ttl_timer/1]),
- case BQS of
- undefined -> State1;
- _ -> ok = rabbit_memory_monitor:deregister(self()),
- QName = qname(State),
- notify_decorators(shutdown, State),
- [emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _, _} <-
- rabbit_queue_consumers:all(Consumers)],
- State1#q{backing_queue_state = Fun(BQS)}
- end.
-
reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
@@ -915,30 +938,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = none}}) ->
- declare(Recover, From, State);
-
-%% You used to be able to declare an exclusive durable queue. Sadly we
-%% need to still tidy up after that case, there could be the remnants
-%% of one left over from an upgrade. So that's why we don't enforce
-%% Recover = new here.
-handle_call({init, Recover}, From,
- State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rabbit_misc:is_process_alive(Owner) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- false -> #q{backing_queue = undefined,
- backing_queue_state = undefined,
- q = Q} = State,
- gen_server2:reply(From, {owner_died, Q}),
- BQ = backing_queue_module(Q),
- {_, Terms} = recovery_status(Recover),
- BQS = bq_init(BQ, Q, Terms),
- %% Rely on terminate to delete the queue.
- {stop, {shutdown, missing_owner},
- State#q{backing_queue = BQ, backing_queue_state = BQS}}
- end;
+handle_call({init, Recover}, From, State) ->
+ init_it(Recover, From, State);
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);