diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 86 |
1 files changed, 41 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe1ddba0..46f6674b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -118,19 +118,19 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + State = #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = dict:new()}, + {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, none -> ok; _ -> erlang:monitor(process, Owner) end, - State = requeue_and_run( - AckTags, - process_args( - #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = RateTRef, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = MTC})), + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = MTC}, + State1 = requeue_and_run(AckTags, process_args( + rabbit_event:init_stats_timer( + State, #q.stats_timer))), lists:foldl( fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, - State, Deliveries). + State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, - State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, - stats_timer = StatsTimer}) -> +declare(Recover, From, State = #q{q = Q, + backing_queue = BQ, + backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -199,7 +198,7 @@ declare(Recover, From, State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(StatsTimer, + rabbit_event:if_enabled(State1, #q.stats_timer, fun() -> emit_stats(State1) end), noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} @@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. -ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = #amqqueue{pid = QPid}}) -> - State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, QPid, emit_stats)}. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) -> handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer), assert_invariant(State1), {noreply, State1, hibernate}; @@ -1167,18 +1164,17 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - stats_timer = StatsTimer}) -> + backing_queue_state = BQS}) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( - StatsTimer, + State, #q.stats_timer, fun () -> emit_stats(State, [{idle_since, now()}]) end), - State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3}, + State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, + #q.stats_timer), {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). |