diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 |
1 files changed, 34 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 91877efb..2c53a8e3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,7 +43,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -import(queue). -import(erlang). @@ -153,7 +153,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined}) -> + backing_queue = BQ, backing_queue_state = undefined, + stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -164,9 +165,12 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + State1 = init_expires(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State)), - noreply(init_expires(State#q{backing_queue_state = BQS})); + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(StatsTimer, + fun() -> emit_stats(State1) end), + noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -266,14 +270,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer, q = Q}) -> State#q{stats_timer = rabbit_event:ensure_stats_timer( StatsTimer, - fun() -> emit_stats(State) end, fun() -> rabbit_amqqueue:emit_stats(Q) end)}. -stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> - State#q{stats_timer = rabbit_event:stop_stats_timer( - StatsTimer, - fun() -> emit_stats(State) end)}. - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -595,6 +593,7 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; + delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -613,6 +612,10 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; +prioritise_info(_Msg, _State) -> 0. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -782,6 +785,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, active_consumers = ActiveConsumers}) -> reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); +handle_call(delete_exclusive, _From, + State = #q{ backing_queue_state = BQS, + backing_queue = BQ, + q = #amqqueue{exclusive_owner = Owner} + }) when Owner =/= none -> + {stop, normal, {ok, BQ:len(BQS)}, State}; + +handle_call(delete_exclusive, _From, State) -> + reply({error, not_exclusive}, State); + handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), @@ -910,9 +923,12 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_cast(emit_stats, State) -> +handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> + %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - noreply(State). + State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + assert_invariant(State1), + {noreply, State1}. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -943,11 +959,14 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + stats_timer = StatsTimer}) -> BQS1 = BQ:handle_pre_hibernate(BQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_stats_timer( - stop_rate_timer(State#q{backing_queue_state = BQS2}))}. + rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end), + State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), + backing_queue_state = BQS2}, + {hibernate, stop_rate_timer(State1)}. |