diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 |
1 files changed, 30 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2fb60e96..808e1117 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,6 +38,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(STATISTICS_UPDATE_INTERVAL, 5000). -export([start_link/1, info_keys/0]). @@ -57,7 +58,8 @@ active_consumers, blocked_consumers, sync_timer_ref, - rate_timer_ref + rate_timer_ref, + last_statistics_update }). -record(consumer, {tag, ack_required}). @@ -110,7 +112,8 @@ init(Q) -> active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, - rate_timer_ref = undefined}, hibernate, + rate_timer_ref = undefined, + last_statistics_update = 0}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -179,9 +182,10 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), + State2 = maybe_emit_stats(State1), case BQ:needs_sync(BQS)of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + true -> {ensure_sync_timer(State2), 0}; + false -> {stop_sync_timer(State2), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> @@ -530,6 +534,28 @@ i(Item, _) -> %--------------------------------------------------------------------------- +maybe_emit_stats(State = #q{last_statistics_update = LastUpdate}) -> + {MegaSecs, Secs, MicroSecs} = os:timestamp(), + Now = MegaSecs * 1000000 + Secs * 1000 + MicroSecs / 1000, + case Now - LastUpdate > ?STATISTICS_UPDATE_INTERVAL of + true -> + S = {queue_stats, #event_queue_stats{ + q_pid = self(), + messages_ready = i(messages_ready, State), + messages_unacknowledged = i(messages_unacknowledged, State), + consumers = i(consumers, State), + memory = i(memory, State), + exclusive_consumer_tag = i(exclusive_consumer_tag, State), + exclusive_consumer_pid = i(exclusive_consumer_pid, State), + backing_queue_status = i(backing_queue_status, State) + }}, + rabbit_event:notify(S), + State#q{last_statistics_update = Now}; + _ -> + State + end. +%--------------------------------------------------------------------------- + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); |