summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-16 14:52:26 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-16 14:52:26 +0100
commit17960c865edad12d5b44ae65a3384bbce534f81c (patch)
tree834978fc929a41b537b8edded241d45e279bb311
parent77d9f83cc2e8e6afea0a42d0953d3adfaf2769e1 (diff)
downloadrabbitmq-server-17960c865edad12d5b44ae65a3384bbce534f81c.tar.gz
Get queues to emit events for statistics.
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_event.erl41
4 files changed, 82 insertions, 4 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 3fd52568..b991aa7d 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,6 +70,10 @@
-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
+-record(event_queue_stats, {q_pid, messages_ready, messages_unacknowledged,
+ consumers, memory, exclusive_consumer_pid,
+ exclusive_consumer_tag, backing_queue_status}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 18045b94..88d1a318 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -89,6 +89,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_event,
+ [{description, "statistics event handler"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [gen_event, [{local, rabbit_event}]]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
{requires, external_infrastructure}]}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2fb60e96..808e1117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,6 +38,7 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(STATISTICS_UPDATE_INTERVAL, 5000).
-export([start_link/1, info_keys/0]).
@@ -57,7 +58,8 @@
active_consumers,
blocked_consumers,
sync_timer_ref,
- rate_timer_ref
+ rate_timer_ref,
+ last_statistics_update
}).
-record(consumer, {tag, ack_required}).
@@ -110,7 +112,8 @@ init(Q) ->
active_consumers = queue:new(),
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
- rate_timer_ref = undefined}, hibernate,
+ rate_timer_ref = undefined,
+ last_statistics_update = 0}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -179,9 +182,10 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
+ State2 = maybe_emit_stats(State1),
case BQ:needs_sync(BQS)of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ true -> {ensure_sync_timer(State2), 0};
+ false -> {stop_sync_timer(State2), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
@@ -530,6 +534,28 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+maybe_emit_stats(State = #q{last_statistics_update = LastUpdate}) ->
+ {MegaSecs, Secs, MicroSecs} = os:timestamp(),
+ Now = MegaSecs * 1000000 + Secs * 1000 + MicroSecs / 1000,
+ case Now - LastUpdate > ?STATISTICS_UPDATE_INTERVAL of
+ true ->
+ S = {queue_stats, #event_queue_stats{
+ q_pid = self(),
+ messages_ready = i(messages_ready, State),
+ messages_unacknowledged = i(messages_unacknowledged, State),
+ consumers = i(consumers, State),
+ memory = i(memory, State),
+ exclusive_consumer_tag = i(exclusive_consumer_tag, State),
+ exclusive_consumer_pid = i(exclusive_consumer_pid, State),
+ backing_queue_status = i(backing_queue_status, State)
+ }},
+ rabbit_event:notify(S),
+ State#q{last_statistics_update = Now};
+ _ ->
+ State
+ end.
+%---------------------------------------------------------------------------
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
new file mode 100644
index 00000000..618b8dfc
--- /dev/null
+++ b/src/rabbit_event.erl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_event).
+
+-include("rabbit.hrl").
+
+-export([notify/1]).
+
+%%----------------------------------------------------------------------------
+
+notify(Event) ->
+ gen_event:notify(rabbit_event, Event).