summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl34
1 files changed, 30 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2fb60e96..808e1117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,6 +38,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(STATISTICS_UPDATE_INTERVAL, 5000).
-export([start_link/1, info_keys/0]).
@@ -57,7 +58,8 @@
active_consumers,
blocked_consumers,
sync_timer_ref,
- rate_timer_ref
+ rate_timer_ref,
+ last_statistics_update
}).
-record(consumer, {tag, ack_required}).
@@ -110,7 +112,8 @@ init(Q) ->
active_consumers = queue:new(),
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
- rate_timer_ref = undefined}, hibernate,
+ rate_timer_ref = undefined,
+ last_statistics_update = 0}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -179,9 +182,10 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
+ State2 = maybe_emit_stats(State1),
case BQ:needs_sync(BQS)of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ true -> {ensure_sync_timer(State2), 0};
+ false -> {stop_sync_timer(State2), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
@@ -530,6 +534,28 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+maybe_emit_stats(State = #q{last_statistics_update = LastUpdate}) ->
+ {MegaSecs, Secs, MicroSecs} = os:timestamp(),
+ Now = MegaSecs * 1000000 + Secs * 1000 + MicroSecs / 1000,
+ case Now - LastUpdate > ?STATISTICS_UPDATE_INTERVAL of
+ true ->
+ S = {queue_stats, #event_queue_stats{
+ q_pid = self(),
+ messages_ready = i(messages_ready, State),
+ messages_unacknowledged = i(messages_unacknowledged, State),
+ consumers = i(consumers, State),
+ memory = i(memory, State),
+ exclusive_consumer_tag = i(exclusive_consumer_tag, State),
+ exclusive_consumer_pid = i(exclusive_consumer_pid, State),
+ backing_queue_status = i(backing_queue_status, State)
+ }},
+ rabbit_event:notify(S),
+ State#q{last_statistics_update = Now};
+ _ ->
+ State
+ end.
+%---------------------------------------------------------------------------
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);