diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-16 14:52:26 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-16 14:52:26 +0100 |
commit | 17960c865edad12d5b44ae65a3384bbce534f81c (patch) | |
tree | 834978fc929a41b537b8edded241d45e279bb311 | |
parent | 77d9f83cc2e8e6afea0a42d0953d3adfaf2769e1 (diff) | |
download | rabbitmq-server-17960c865edad12d5b44ae65a3384bbce534f81c.tar.gz |
Get queues to emit events for statistics.
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
-rw-r--r-- | src/rabbit_event.erl | 41 |
4 files changed, 82 insertions, 4 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 3fd52568..b991aa7d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,6 +70,10 @@ -record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). +-record(event_queue_stats, {q_pid, messages_ready, messages_unacknowledged, + consumers, memory, exclusive_consumer_pid, + exclusive_consumer_tag, backing_queue_status}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/src/rabbit.erl b/src/rabbit.erl index 18045b94..88d1a318 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -89,6 +89,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_event, + [{description, "statistics event handler"}, + {mfa, {rabbit_sup, start_restartable_child, + [gen_event, [{local, rabbit_event}]]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, {requires, external_infrastructure}]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2fb60e96..808e1117 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,6 +38,7 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(STATISTICS_UPDATE_INTERVAL, 5000). -export([start_link/1, info_keys/0]). @@ -57,7 +58,8 @@ active_consumers, blocked_consumers, sync_timer_ref, - rate_timer_ref + rate_timer_ref, + last_statistics_update }). -record(consumer, {tag, ack_required}). @@ -110,7 +112,8 @@ init(Q) -> active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, - rate_timer_ref = undefined}, hibernate, + rate_timer_ref = undefined, + last_statistics_update = 0}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -179,9 +182,10 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), + State2 = maybe_emit_stats(State1), case BQ:needs_sync(BQS)of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + true -> {ensure_sync_timer(State2), 0}; + false -> {stop_sync_timer(State2), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> @@ -530,6 +534,28 @@ i(Item, _) -> %--------------------------------------------------------------------------- +maybe_emit_stats(State = #q{last_statistics_update = LastUpdate}) -> + {MegaSecs, Secs, MicroSecs} = os:timestamp(), + Now = MegaSecs * 1000000 + Secs * 1000 + MicroSecs / 1000, + case Now - LastUpdate > ?STATISTICS_UPDATE_INTERVAL of + true -> + S = {queue_stats, #event_queue_stats{ + q_pid = self(), + messages_ready = i(messages_ready, State), + messages_unacknowledged = i(messages_unacknowledged, State), + consumers = i(consumers, State), + memory = i(memory, State), + exclusive_consumer_tag = i(exclusive_consumer_tag, State), + exclusive_consumer_pid = i(exclusive_consumer_pid, State), + backing_queue_status = i(backing_queue_status, State) + }}, + rabbit_event:notify(S), + State#q{last_statistics_update = Now}; + _ -> + State + end. +%--------------------------------------------------------------------------- + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl new file mode 100644 index 00000000..618b8dfc --- /dev/null +++ b/src/rabbit_event.erl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_event). + +-include("rabbit.hrl"). + +-export([notify/1]). + +%%---------------------------------------------------------------------------- + +notify(Event) -> + gen_event:notify(rabbit_event, Event). |