diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-28 16:44:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-28 16:44:43 +0100 |
commit | 6f40aaa66049587e34aa47864738865020b94ac4 (patch) | |
tree | 256bbaf7f6fa11e31aa838ba4c5821386eeeb365 | |
parent | 538c8e830c672afbf1e12d5b1c3b95fd2f9c49bf (diff) | |
parent | 00d2a4fbe30effa0c130de7446284169b890a12e (diff) | |
download | rabbitmq-server-6f40aaa66049587e34aa47864738865020b94ac4.tar.gz |
Merge default into bug21387
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 74 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 145 | ||||
-rw-r--r-- | src/rabbit_event.erl | 50 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 55 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 100 | ||||
-rw-r--r-- | src/rabbit_tests_event_receiver.erl | 66 |
11 files changed, 482 insertions, 42 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 2cd28abb..48e19ff8 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -27,4 +27,5 @@ {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}. + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 3fd52568..5df9e690 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,6 +70,8 @@ -record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). +-record(event, {type, props, timestamp}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). @@ -80,6 +82,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(STATS_INTERVAL, 5000). -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). diff --git a/src/rabbit.erl b/src/rabbit.erl index 18045b94..88d1a318 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -89,6 +89,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_event, + [{description, "statistics event handler"}, + {mfa, {rabbit_sup, start_restartable_child, + [gen_event, [{local, rabbit_event}]]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, {requires, external_infrastructure}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6bf2f6db..cff553d5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,6 +41,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -107,6 +108,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). +-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -344,6 +346,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +emit_stats(#amqqueue{pid = QPid}) -> + delegate_pcast(QPid, 7, emit_stats). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -438,7 +443,7 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 67f0fcf5..78697841 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -59,7 +59,9 @@ expires, sync_timer_ref, rate_timer_ref, - expiry_timer_ref + expiry_timer_ref, + stats_timer_ref, + stats_level }). -record(consumer, {tag, ack_required}). @@ -74,13 +76,8 @@ txn, unsent_message_count}). --define(INFO_KEYS, - [name, - durable, - auto_delete, - arguments, - pid, - owner_pid, +-define(STATISTICS_KEYS, + [pid, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -91,6 +88,16 @@ backing_queue_status ]). +-define(CREATION_EVENT_KEYS, + [name, + durable, + auto_delete, + arguments, + owner_pid + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS). + %%---------------------------------------------------------------------------- start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -103,6 +110,7 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, @@ -114,7 +122,9 @@ init(Q) -> expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, - expiry_timer_ref = undefined}, hibernate, + expiry_timer_ref = undefined, + stats_timer_ref = undefined, + stats_level = StatsLevel}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -155,6 +165,10 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + rabbit_event:notify( + queue_created, + [{Item, i(Item, State)} || + Item <- [pid|?CREATION_EVENT_KEYS]]), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -173,6 +187,7 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -189,9 +204,10 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), + State2 = ensure_stats_timer(State1), case BQ:needs_idle_timeout(BQS)of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + true -> {ensure_sync_timer(State2), 0}; + false -> {stop_sync_timer(State2), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> @@ -227,7 +243,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. - + stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> @@ -249,6 +265,27 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. +ensure_stats_timer(State = #q{stats_level = none}) -> + State; +ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) -> + {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, + rabbit_amqqueue, emit_stats, [Q]), + emit_stats(State), + State#q{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + +stop_stats_timer(State = #q{stats_level = none}) -> + State; +stop_stats_timer(State = #q{stats_timer_ref = undefined}) -> + emit_stats(State), + State; +stop_stats_timer(State = #q{stats_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + emit_stats(State), + State#q{stats_timer_ref = undefined}. + + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -560,6 +597,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +emit_stats(State) -> + rabbit_event:notify(queue_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + %--------------------------------------------------------------------------- handle_call({init, Recover}, From, @@ -841,7 +882,11 @@ handle_cast(maybe_expire, State) -> true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) - end. + end; + +handle_cast(emit_stats, State) -> + emit_stats(State), + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -878,4 +923,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. + {hibernate, stop_stats_timer( + stop_rate_timer(State#q{backing_queue_state = BQS2}))}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dafc3075..7d45cec9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,6 +38,7 @@ -export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([emit_stats/1]). -export([flow_timeout/2]). @@ -48,25 +49,29 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, flow}). + consumer_mapping, blocking, queue_collector_pid, flow, + stats_timer_ref, stats_level}). -record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds --define(INFO_KEYS, +-define(STATISTICS_KEYS, [pid, - connection, - number, - user, - vhost, transactional, consumer_count, messages_unacknowledged, acks_uncommitted, prefetch_count]). +-define(INFO_KEYS, + [connection, + number, + user, + vhost] + ++ ?STATISTICS_KEYS). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -96,6 +101,7 @@ -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. @@ -151,12 +157,21 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +emit_stats(Pid) -> + gen_server2:pcast(Pid, 7, emit_stats). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), + rabbit_event:notify(channel_created, [{pid, self()}, + {connection_pid, ReaderPid}, + {channel, Channel}, + {user, Username}, + {vhost, VHost}]), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -174,7 +189,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, - pending = none}}, + pending = none}, + stats_timer_ref = undefined, + stats_level = StatsLevel}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -225,6 +242,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + maybe_incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> @@ -245,7 +268,11 @@ handle_cast({flow_timeout, Ref}, "timeout waiting for channel.flow_ok{active=~w}", [not Flow], none), State)}; handle_cast({flow_timeout, _Ref}, State) -> - {noreply, State}. + {noreply, State}; + +handle_cast(emit_stats, State) -> + internal_emit_stats(State), + noreply(State). handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -254,11 +281,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {hibernate, State}. + {hibernate, stop_stats_timer(State)}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -276,9 +304,31 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +reply(Reply, NewState) -> + {reply, Reply, ensure_stats_timer(NewState), hibernate}. -noreply(NewState) -> {noreply, NewState, hibernate}. +noreply(NewState) -> + {noreply, ensure_stats_timer(NewState), hibernate}. + +ensure_stats_timer(State = #ch{stats_level = none}) -> + State; +ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) -> + internal_emit_stats(State), + {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, + rabbit_channel, emit_stats, [self()]), + State#ch{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + +stop_stats_timer(State = #ch{stats_level = none}) -> + State; +stop_stats_timer(State = #ch{stats_timer_ref = undefined}) -> + internal_emit_stats(State), + State; +stop_stats_timer(State = #ch{stats_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + internal_emit_stats(State), + State#ch{stats_timer_ref = undefined}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -437,6 +487,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -447,7 +500,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(TxnKey, Acked), + QIncs = ack(TxnKey, Acked), + Participants = [QPid || {QPid, _} <- QIncs], + maybe_incr_stats(QIncs, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -470,11 +525,16 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + maybe_incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -978,7 +1038,7 @@ ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [QPid | L] + [{QPid, length(MsgIds)} | L] end, [], UAQ). make_tx_id() -> rabbit_guid:guid(). @@ -1105,6 +1165,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]), rabbit_writer:shutdown(WriterPid), rabbit_limiter:shutdown(LimiterPid). @@ -1127,3 +1188,59 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). + +maybe_incr_stats(_QXIncs, _Measure, #ch{stats_level = none}) -> + ok; +maybe_incr_stats(QXIncs, Measure, _State) -> + [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. + +incr_stats({QPid, _} = QX, Inc, Measure) -> + maybe_monitor(QPid), + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + maybe_monitor(QPid), + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). + +maybe_monitor(QPid) -> + case get({monitoring, QPid}) of + undefined -> erlang:monitor(process, QPid), + put({monitoring, QPid}, true); + _ -> ok + end. + +update_measures(Type, QX, Inc, Measure) -> + Measures = case get({Type, QX}) of + undefined -> []; + D -> D + end, + Cur = case orddict:find(Measure, Measures) of + error -> 0; + {ok, C} -> C + end, + put({Type, QX}, + orddict:store(Measure, Cur + Inc, Measures)). + +internal_emit_stats(State = #ch{stats_level = Level}) -> + CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + case Level of + coarse -> + rabbit_event:notify(channel_stats, CoarseStats); + fine -> + FineStats = + [{channel_queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {channel_exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) + end. + +erase_queue_stats(QPid) -> + erase({monitoring, QPid}), + erase({queue_stats, QPid}), + [erase({queue_exchange_stats, QX}) || + {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0]. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl new file mode 100644 index 00000000..72a9f8ba --- /dev/null +++ b/src/rabbit_event.erl @@ -0,0 +1,50 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_event). + +-include("rabbit.hrl"). + +-export([notify/2]). + +%%---------------------------------------------------------------------------- + +notify(Type, Props) -> + try + gen_event:notify(rabbit_event, #event{type = Type, + props = Props, + timestamp = os:timestamp()}) + catch error:badarg -> + %% badarg means rabbit_event is no longer registered. We never + %% unregister it so the great likelihood is that we're shutting + %% down the broker but some events were backed up. Ignore it. + ok + end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7f7622b2..1beb38a8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -190,6 +190,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> TypeModule:create(X), + rabbit_event:notify( + exchange_created, + [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), X; {existing, X} -> X; Err -> Err @@ -427,6 +430,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:write/3), + rabbit_event:notify( + binding_created, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}, + {routing_key, RoutingKey}, + {arguments, Arguments}]), {new, X, B}; [_R] -> {existing, X, B} @@ -459,6 +468,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:delete_object/3), + rabbit_event:notify( + binding_deleted, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}]), {maybe_auto_delete(X), B}; {error, _} = E -> E @@ -577,6 +590,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), + rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5514c82..a2d92d2b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -43,6 +43,8 @@ -export([analyze_frame/2]). +-export([emit_stats/1]). + -import(gen_tcp). -import(fprof). -import(inet). @@ -58,12 +60,16 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector}). + queue_collector, stats_timer_ref, stats_level}). + +-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, + send_pend, state, channels]). --define(INFO_KEYS, - [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). +-define(CREATION_EVENT_KEYS, [address, port, peer_address, peer_port, + user, vhost, timeout, frame_max, + client_properties]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS). %% connection lifecycle %% @@ -141,6 +147,7 @@ -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). @@ -181,6 +188,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +emit_stats(Pid) -> + gen_server:cast(Pid, emit_stats). + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -241,6 +251,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> handshake_timeout), ProfilingValue = setup_profiling(), {ok, Collector} = rabbit_queue_collector:start_link(), + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -253,7 +264,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, - queue_collector = Collector}, + queue_collector = Collector, + stats_timer_ref = undefined, + stats_level = StatsLevel}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -273,7 +286,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), rabbit_queue_collector:shutdown(Collector), - rabbit_misc:unlink_and_capture_exit(Collector) + rabbit_misc:unlink_and_capture_exit(Collector), + rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -339,6 +353,9 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> catch Error -> {error, Error} end), mainloop(Parent, Deb, State); + {'$gen_cast', emit_stats} -> + internal_emit_stats(State), + mainloop(Parent, Deb, State#v1{stats_timer_ref = undefined}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -531,7 +548,8 @@ analyze_frame(_Type, _Body) -> handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), - {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1}; + {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of @@ -585,6 +603,15 @@ check_version(ClientVersion, ServerVersion) -> (ClientMajor == ServerMajor andalso ClientMinor >= ServerMinor). +ensure_stats_timer(State = #v1{stats_level = none}) -> + State; +ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + rabbit_reader, emit_stats, [self()]), + State#v1{stats_timer_ref = TRef}; +ensure_stats_timer(State) -> + State. + %%-------------------------------------------------------------------------- handle_method0(MethodName, FieldsBin, State) -> @@ -659,8 +686,12 @@ handle_method0(#'connection.open'{virtual_host = VHostPath, ok = send_on_channel0( Sock, #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; + State1 = State#v1{connection_state = running, + connection = NewConnection}, + rabbit_event:notify( + connection_created, + [{Item, i(Item, State1)} || Item <- [pid|?CREATION_EVENT_KEYS]]), + State1; true -> %% FIXME: 'host' is supposed to only contain one %% address; but which one do we pick? This is @@ -847,3 +878,7 @@ amqp_exception_explanation(Text, Expl) -> if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; true -> CompleteTextBin end. + +internal_emit_stats(State) -> + rabbit_event:notify(connection_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0b92682a..e19989a0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -67,6 +67,7 @@ all_tests() -> passed = test_app_management(), passed = test_log_management_during_startup(), passed = test_memory_pressure(), + passed = test_statistics(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -1065,9 +1066,12 @@ test_memory_pressure_sync(Ch, Writer) -> end. test_memory_pressure_spawn() -> + test_spawn(fun test_memory_pressure_receiver/1). + +test_spawn(Receiver) -> Me = self(), - Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + Writer = spawn(fun () -> Receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), MRef = erlang:monitor(process, Ch), @@ -1161,6 +1165,98 @@ test_memory_pressure() -> passed. +test_statistics_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_statistics_receiver(Pid) + end. + +test_statistics_event_receiver(Pid) -> + receive + Foo -> + Pid ! Foo, + test_statistics_event_receiver(Pid) + end. + +test_statistics_receive_event(Ch, Retries, Matcher) -> + rabbit_channel:emit_stats(Ch), + receive #event{type = channel_stats, props = Props} -> + case Matcher(Props) of + true -> + Props; + _ -> + case Retries of + 0 -> throw(failed_to_receive_matching_event); + _ -> timer:sleep(10), + test_statistics_receive_event(Ch, Retries - 1, + Matcher) + end + end + after 1000 -> throw(failed_to_receive_event) + end. + +test_statistics() -> + application:set_env(rabbit, collect_statistics, fine), + + %% ATM this just tests the queue / exchange stats in channels. That's + %% by far the most complex code though. + + %% Set up a channel and queue + {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{}), + QName = receive #'queue.declare_ok'{queue = Q0} -> + Q0 + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + QPid = Q#amqqueue.pid, + X = rabbit_misc:r(<<"/">>, exchange, <<"">>), + + rabbit_tests_event_receiver:start(self()), + + %% Check stats empty + Event = test_statistics_receive_event(Ch, 10, fun (_) -> true end), + [] = proplists:get_value(channel_queue_stats, Event), + [] = proplists:get_value(channel_exchange_stats, Event), + [] = proplists:get_value(channel_queue_exchange_stats, Event), + + %% Publish and get a message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{}, <<"">>)), + rabbit_channel:do(Ch, #'basic.get'{queue = QName}), + + %% Check the stats reflect that + Event2 = test_statistics_receive_event( + Ch, 10, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) > 0 + end), + [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), + [{{QPid,X},[{publish,1}]}] = + proplists:get_value(channel_queue_exchange_stats, Event2), + + %% Check the stats remove stuff on queue deletion + rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), + Event3 = test_statistics_receive_event( + Ch, 10, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) == 0 + end), + + [] = proplists:get_value(channel_queue_stats, Event3), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3), + [] = proplists:get_value(channel_queue_exchange_stats, Event3), + + rabbit_tests_event_receiver:stop(), + passed. + test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun (Pid) -> Pid ! {invoked, Self} end, diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl new file mode 100644 index 00000000..a92e3da7 --- /dev/null +++ b/src/rabbit_tests_event_receiver.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_tests_event_receiver). + +-export([start/1, stop/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +start(Pid) -> + gen_event:add_handler(rabbit_event, ?MODULE, [Pid]). + +stop() -> + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([Pid]) -> + {ok, Pid}. + +handle_call(_Request, Pid) -> + {ok, not_understood, Pid}. + +handle_event(Event, Pid) -> + Pid ! Event, + {ok, Pid}. + +handle_info(_Info, Pid) -> + {ok, Pid}. + +terminate(_Arg, _Pid) -> + ok. + +code_change(_OldVsn, Pid, _Extra) -> + {ok, Pid}. + +%%---------------------------------------------------------------------------- |