summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl54
1 files changed, 19 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2b0fe17e..435edc07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -21,8 +21,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(BASE_MESSAGE_PROPERTIES,
#message_properties{expiry = undefined, needs_confirming = false}).
@@ -226,37 +224,27 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
false -> {stop_sync_timer(State1), hibernate}
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+stop_sync_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_sync_timer(
+ fun sync_timer_getter/1, fun sync_timer_setter/2, State).
+
+sync_timer_getter(State) -> State#q.sync_timer_ref.
+sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}.
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
- State.
+ rabbit_amqqueue_process_utils:ensure_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#q{rate_timer_ref = undefined}.
+stop_rate_timer(State) ->
+ rabbit_amqqueue_process_utils:stop_rate_timer(
+ fun rate_timer_getter/1, fun rate_timer_setter/2, State).
+
+rate_timer_getter(State) -> State#q.rate_timer_ref.
+rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}.
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
@@ -1160,15 +1148,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- BQS3 = BQ:handle_pre_hibernate(BQS2),
+ BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ backing_queue_state = BQS1},
{hibernate, stop_rate_timer(State1)}.