diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 15:23:57 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-03 15:23:57 +0100 |
commit | aaf5bed2b13b9ac70a02249daa59f1e34f8dff0e (patch) | |
tree | fdcdf273144cee16351cdb4ff38b1398bd9ccae5 | |
parent | 5ae7d968ef83183ec4f36ebec657eb5f2141b6ff (diff) | |
parent | 97e24e2854c18dadcc9c48955cead1d01c734622 (diff) | |
download | rabbitmq-server-aaf5bed2b13b9ac70a02249daa59f1e34f8dff0e.tar.gz |
Merging default into bug 23027
-rw-r--r-- | codegen.py | 3 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 3 | ||||
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 5 | ||||
-rw-r--r-- | src/rabbit.erl | 7 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 27 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 63 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 173 | ||||
-rw-r--r-- | src/rabbit_event.erl | 131 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 50 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 19 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 65 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 105 | ||||
-rw-r--r-- | src/rabbit_tests_event_receiver.erl | 66 | ||||
-rw-r--r-- | src/rabbit_types.erl | 8 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 2 |
18 files changed, 647 insertions, 109 deletions
@@ -407,7 +407,8 @@ def genErl(spec): -spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()). -spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()). -spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]). --spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()). +-spec(decode_method_fields/2 :: + (amqp_method_name(), binary()) -> amqp_method_record() | rabbit_types:connection_exit()). -spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). -spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). -spec(encode_properties/1 :: (amqp_method_record()) -> binary()). diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 2cd28abb..48e19ff8 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -27,4 +27,5 @@ {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, - {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}. + {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 6364d60f..a2529e2d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -72,6 +72,8 @@ -record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). +-record(event, {type, props, timestamp}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). @@ -83,6 +85,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(STATS_INTERVAL, 5000). -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index f05bcb84..cecd666b 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -43,7 +43,8 @@ rabbit_types:binding()) -> 'ok'). -spec(remove_bindings/2 :: (rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), - rabbit_framing:amqp_table()) -> 'ok'). +-spec(assert_args_equivalence/2 :: + (rabbit_types:exchange(), rabbit_framing:amqp_table()) + -> 'ok' | rabbit_types:connection_exit()). -endif. diff --git a/src/rabbit.erl b/src/rabbit.erl index ada2c38e..1fab7e4d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -89,6 +89,13 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_event, + [{description, "statistics event handler"}, + {mfa, {rabbit_sup, start_restartable_child, + [gen_event, [{local, rabbit_event}]]}}, + {requires, external_infrastructure}, + {enables, kernel_ready}]}). + -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, {requires, external_infrastructure}]}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 3aaf5928..1dbda805 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -52,12 +52,18 @@ -type(password() :: binary()). -type(regexp() :: binary()). --spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()). --spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()). +-spec(check_login/2 :: + (binary(), binary()) -> rabbit_types:user() | + rabbit_types:channel_exit()). +-spec(user_pass_login/2 :: + (username(), password()) + -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_vhost_access/2 :: - (rabbit_types:user(), rabbit_types:vhost()) -> 'ok'). + (rabbit_types:user(), rabbit_types:vhost()) + -> 'ok' | rabbit_types:channel_exit()). -spec(check_resource_access/3 :: - (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok'). + (username(), rabbit_types:r(atom()), permission_atom()) + -> 'ok' | rabbit_types:channel_exit()). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -65,8 +71,10 @@ -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). --spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(add_vhost/1 :: + (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: + (rabbit_types:vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 870c119a..d4226331 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -41,6 +41,7 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -77,18 +78,24 @@ -spec(declare/5 :: (name(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) - -> {'new' | 'existing', rabbit_types:amqqueue()}). + -> {'new' | 'existing', rabbit_types:amqqueue()} | + rabbit_types:channel_exit()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found')). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). --spec(with_or_die/2 :: (name(), qfun(A)) -> A). +-spec(with_or_die/2 :: + (name(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(assert_equivalence/5 :: (rabbit_types:amqqueue(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) - -> 'ok' | no_return()). --spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok'). --spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A). + -> 'ok' | rabbit_types:channel_exit() | + rabbit_types:connection_exit()). +-spec(check_exclusive_access/2 :: + (rabbit_types:amqqueue(), pid()) + -> 'ok' | rabbit_types:channel_exit()). +-spec(with_exclusive_access_or_die/3 :: + (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]). @@ -107,6 +114,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). +-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -143,7 +151,9 @@ -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> rabbit_types:amqqueue() | 'not_found'). --spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')). +-spec(internal_delete/1 :: + (name()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). @@ -345,6 +355,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). +emit_stats(#amqqueue{pid = QPid}) -> + delegate_pcast(QPid, 7, emit_stats). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -442,7 +455,7 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ac5fb7f9..5e1b1f71 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -59,7 +59,8 @@ expires, sync_timer_ref, rate_timer_ref, - expiry_timer_ref + expiry_timer_ref, + stats_timer }). -record(consumer, {tag, ack_required}). @@ -74,13 +75,8 @@ txn, unsent_message_count}). --define(INFO_KEYS, - [name, - durable, - auto_delete, - arguments, - pid, - owner_pid, +-define(STATISTICS_KEYS, + [pid, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, @@ -91,6 +87,17 @@ backing_queue_status ]). +-define(CREATION_EVENT_KEYS, + [pid, + name, + durable, + auto_delete, + arguments, + owner_pid + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). + %%---------------------------------------------------------------------------- start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -114,7 +121,8 @@ init(Q) -> expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, - expiry_timer_ref = undefined}, hibernate, + expiry_timer_ref = undefined, + stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -155,6 +163,10 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), + rabbit_event:notify( + queue_created, + [{Item, i(Item, State)} || + Item <- ?CREATION_EVENT_KEYS]), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -173,6 +185,7 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -189,9 +202,10 @@ noreply(NewState) -> next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), + State2 = ensure_stats_timer(State1), case BQ:needs_idle_timeout(BQS)of - true -> {ensure_sync_timer(State1), 0}; - false -> {stop_sync_timer(State1), hibernate} + true -> {ensure_sync_timer(State2), 0}; + false -> {stop_sync_timer(State2), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> @@ -227,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. - + stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> @@ -249,6 +263,18 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. +ensure_stats_timer(State = #q{stats_timer = StatsTimer, + q = Q}) -> + State#q{stats_timer = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end, + fun() -> rabbit_amqqueue:emit_stats(Q) end)}. + +stop_stats_timer(State = #q{stats_timer = StatsTimer}) -> + State#q{stats_timer = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> emit_stats(State) end)}. + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -560,6 +586,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +emit_stats(State) -> + rabbit_event:notify(queue_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + %--------------------------------------------------------------------------- handle_call({init, Recover}, From, @@ -856,7 +886,11 @@ handle_cast(maybe_expire, State) -> true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) - end. + end; + +handle_cast(emit_stats, State) -> + emit_stats(State), + noreply(State). handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -893,4 +927,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), infinity), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. + {hibernate, stop_stats_timer( + stop_rate_timer(State#q{backing_queue_state = BQS2}))}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index c76c01ac..c1445a0c 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -48,11 +48,11 @@ ({ok, rabbit_router:routing_result(), [pid()]} | rabbit_types:error('not_found'))). --spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). +-spec(publish/1 :: + (rabbit_types:delivery()) -> publish_result()). -spec(delivery/4 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) - -> rabbit_types:delivery()). + rabbit_types:message()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9c2f9ee8..edbcb2ea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,6 +38,7 @@ -export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -46,22 +47,27 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid}). + consumer_mapping, blocking, queue_collector_pid, stats_timer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). --define(INFO_KEYS, +-define(STATISTICS_KEYS, [pid, - connection, - number, - user, - vhost, transactional, consumer_count, messages_unacknowledged, acks_uncommitted, prefetch_count]). +-define(CREATION_EVENT_KEYS, + [pid, + connection, + number, + user, + vhost]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -88,6 +94,7 @@ -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -endif. @@ -135,29 +142,39 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +emit_stats(Pid) -> + gen_server2:pcast(Pid, 7, emit_stats). + +flush(Pid) -> + gen_server2:call(Pid, flush). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - {ok, #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid}, - hibernate, + State = #ch{state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + stats_timer = rabbit_event:init_stats_timer()}, + rabbit_event:notify( + channel_created, + [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(info, _From, State) -> @@ -169,6 +186,9 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(_Request, _From, State) -> noreply(State). @@ -207,7 +227,17 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - noreply(State1#ch{next_tag = DeliveryTag + 1}). + {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + maybe_incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State), + noreply(State1#ch{next_tag = DeliveryTag + 1}); + +handle_cast(emit_stats, State) -> + internal_emit_stats(State), + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -216,11 +246,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), - {hibernate, State}. + {hibernate, stop_stats_timer(State)}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -238,9 +269,23 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +reply(Reply, NewState) -> + {reply, Reply, ensure_stats_timer(NewState), hibernate}. + +noreply(NewState) -> + {noreply, ensure_stats_timer(NewState), hibernate}. + +ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + ChPid = self(), + State#ch{stats_timer = rabbit_event:ensure_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end, + fun() -> emit_stats(ChPid) end)}. -noreply(NewState) -> {noreply, NewState, hibernate}. +stop_stats_timer(State = #ch{stats_timer = StatsTimer}) -> + State#ch{stats_timer = rabbit_event:stop_stats_timer( + StatsTimer, + fun() -> internal_emit_stats(State) end)}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -393,6 +438,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -403,7 +451,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(TxnKey, Acked), + QIncs = ack(TxnKey, Acked), + Participants = [QPid || {QPid, _} <- QIncs], + maybe_incr_stats(QIncs, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -426,11 +476,16 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + maybe_incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -909,7 +964,7 @@ ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [QPid | L] + [{QPid, length(MsgIds)} | L] end, [], UAQ). make_tx_id() -> rabbit_guid:guid(). @@ -1036,6 +1091,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]), rabbit_writer:shutdown(WriterPid), rabbit_limiter:shutdown(LimiterPid). @@ -1058,3 +1114,60 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). + +maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok + end. + +incr_stats({QPid, _} = QX, Inc, Measure) -> + maybe_monitor(QPid), + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + maybe_monitor(QPid), + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). + +maybe_monitor(QPid) -> + case get({monitoring, QPid}) of + undefined -> erlang:monitor(process, QPid), + put({monitoring, QPid}, true); + _ -> ok + end. + +update_measures(Type, QX, Inc, Measure) -> + Measures = case get({Type, QX}) of + undefined -> []; + D -> D + end, + Cur = case orddict:find(Measure, Measures) of + error -> 0; + {ok, C} -> C + end, + put({Type, QX}, + orddict:store(Measure, Cur + Inc, Measures)). + +internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> + CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + case rabbit_event:stats_level(StatsTimer) of + coarse -> + rabbit_event:notify(channel_stats, CoarseStats); + fine -> + FineStats = + [{channel_queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {channel_exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {channel_queue_exchange_stats, + [{QX, Stats} || + {{queue_exchange_stats, QX}, Stats} <- get()]}], + rabbit_event:notify(channel_stats, CoarseStats ++ FineStats) + end. + +erase_queue_stats(QPid) -> + erase({monitoring, QPid}), + erase({queue_stats, QPid}), + [erase({queue_exchange_stats, QX}) || + {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl new file mode 100644 index 00000000..07027cd5 --- /dev/null +++ b/src/rabbit_event.erl @@ -0,0 +1,131 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_event). + +-include("rabbit.hrl"). + +-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]). +-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]). +-export([stats_level/1]). +-export([notify/2]). + +%%---------------------------------------------------------------------------- + +-record(state, {level, timer}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([event_type/0, event_props/0, event_timestamp/0, event/0]). + +-type(event_type() :: atom()). +-type(event_props() :: term()). +-type(event_timestamp() :: + {non_neg_integer(), non_neg_integer(), non_neg_integer()}). + +-type(event() :: #event { + type :: event_type(), + props :: event_props(), + timestamp :: event_timestamp() + }). + +-type(level() :: 'none' | 'coarse' | 'fine'). + +-opaque(state() :: #state { + level :: level(), + timer :: atom() + }). + +-type(timer_fun() :: fun (() -> 'ok')). + +-spec(init_stats_timer/0 :: () -> state()). +-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()). +-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()). +-spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()). +-spec(reset_stats_timer_after/1 :: (state()) -> state()). +-spec(stats_level/1 :: (state()) -> level()). +-spec(notify/2 :: (event_type(), event_props()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +init_stats_timer() -> + {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), + #state{level = StatsLevel, timer = undefined}. + +ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) -> + State; +ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) -> + NowFun(), + {ok, TRef} = timer:apply_interval(?STATS_INTERVAL, + erlang, apply, [TimerFun, []]), + State#state{timer = TRef}; +ensure_stats_timer(State, _NowFun, _TimerFun) -> + State. + +stop_stats_timer(State = #state{level = none}, _NowFun) -> + State; +stop_stats_timer(State = #state{timer = undefined}, _NowFun) -> + State; +stop_stats_timer(State = #state{timer = TRef}, NowFun) -> + {ok, cancel} = timer:cancel(TRef), + NowFun(), + State#state{timer = undefined}. + +ensure_stats_timer_after(State = #state{level = none}, _TimerFun) -> + State; +ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) -> + {ok, TRef} = timer:apply_after(?STATS_INTERVAL, + erlang, apply, [TimerFun, []]), + State#state{timer = TRef}; +ensure_stats_timer_after(State, _TimerFun) -> + State. + +reset_stats_timer_after(State) -> + State#state{timer = undefined}. + +stats_level(#state{level = Level}) -> + Level. + +notify(Type, Props) -> + try + gen_event:notify(rabbit_event, #event{type = Type, + props = Props, + timestamp = os:timestamp()}) + catch error:badarg -> + %% badarg means rabbit_event is no longer registered. We never + %% unregister it so the great likelihood is that we're shutting + %% down the broker but some events were backed up. Ignore it. + ok + end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 49f87a22..af4eb1bd 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -71,18 +71,21 @@ -spec(declare/5 :: (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) -> rabbit_types:exchange()). --spec(check_type/1 :: (binary()) -> atom()). +-spec(check_type/1 :: + (binary()) -> atom() | rabbit_types:connection_exit()). -spec(assert_equivalence/5 :: (rabbit_types:exchange(), atom(), boolean(), boolean(), rabbit_framing:amqp_table()) - -> 'ok' | no_return()). + -> 'ok' | rabbit_types:connection_exit()). -spec(assert_args_equivalence/2 :: - (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> - 'ok' | no_return()). + (rabbit_types:exchange(), rabbit_framing:amqp_table()) + -> 'ok' | rabbit_types:connection_exit()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:exchange()) | rabbit_types:error('not_found')). --spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()). +-spec(lookup_or_die/1 :: + (name()) -> rabbit_types:exchange() | + rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]). @@ -96,8 +99,7 @@ -> {rabbit_router:routing_result(), [pid()]}). -spec(add_binding/5 :: (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) - -> bind_res()). + rabbit_framing:amqp_table(), inner_fun()) -> bind_res()). -spec(delete_binding/5 :: (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), rabbit_framing:amqp_table(), inner_fun()) @@ -107,9 +109,9 @@ -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), rabbit_framing:amqp_table()}]). -spec(delete_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> none())). + (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete_transient_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> none())). + (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | @@ -190,6 +192,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> TypeModule:create(X), + rabbit_event:notify( + exchange_created, + [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), X; {existing, X} -> X; Err -> Err @@ -197,12 +202,8 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - case rabbit_exchange_type_registry:lookup_module(T) of - {ok, Module} -> Module; - {error, not_found} -> rabbit_misc:protocol_error( - command_invalid, - "invalid exchange type '~s'", [T]) - end. + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + Module. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -211,8 +212,12 @@ check_type(TypeBin) -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); T -> - _Module = type_to_module(T), - T + case rabbit_exchange_type_registry:lookup_module(T) of + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]); + {ok, _Module} -> T + end end. assert_equivalence(X = #exchange{ durable = Durable, @@ -426,6 +431,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:write/3), + rabbit_event:notify( + binding_created, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}, + {routing_key, RoutingKey}, + {arguments, Arguments}]), {new, X, B}; [_R] -> {existing, X, B} @@ -458,6 +469,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> X#exchange.durable andalso Q#amqqueue.durable, fun mnesia:delete_object/3), + rabbit_event:notify( + binding_deleted, + [{exchange_name, ExchangeName}, + {queue_name, QueueName}]), {maybe_auto_delete(X), B}; {error, _} = E -> E @@ -576,6 +591,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), + rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 050b499f..3b665d38 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -79,30 +79,33 @@ -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). -type(resource_name() :: binary()). +-type(channel_or_connection_exit() + :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). --spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()). +-spec(die/1 :: + (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()). -spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary()) - -> no_return()). + -> rabbit_types:connection_exit()). -spec(amqp_error/4 :: (rabbit_framing:amqp_exception(), string(), [any()], rabbit_framing:amqp_method_name()) -> rabbit_types:amqp_error()). -spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()]) - -> no_return()). + -> channel_or_connection_exit()). -spec(protocol_error/4 :: (rabbit_framing:amqp_exception(), string(), [any()], - rabbit_framing:amqp_method_name()) - -> no_return()). --spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()). --spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()). + rabbit_framing:amqp_method_name()) -> channel_or_connection_exit()). +-spec(protocol_error/1 :: + (rabbit_types:amqp_error()) -> channel_or_connection_exit()). +-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()). -spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> - 'ok' | no_return()). + 'ok' | rabbit_types:connection_exit()). -spec(get_config/1 :: (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(get_config/2 :: (atom(), A) -> A). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index d6c15170..5cffaa44 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -43,6 +43,8 @@ -export([analyze_frame/3]). +-export([emit_stats/1]). + -import(gen_tcp). -import(fprof). -import(inet). @@ -58,12 +60,16 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_length, recv_ref, - connection_state, queue_collector, heartbeater}). + connection_state, queue_collector, heartbeater, stats_timer}). + +-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, + send_pend, state, channels]). + +-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, + protocol, user, vhost, timeout, frame_max, + client_properties]). --define(INFO_KEYS, - [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels, - protocol, user, vhost, timeout, frame_max, client_properties]). +-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). %% connection lifecycle %% @@ -157,6 +163,7 @@ -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). +-spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). @@ -198,6 +205,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +emit_stats(Pid) -> + gen_server:cast(Pid, emit_stats). + setup_profiling() -> Value = rabbit_misc:get_config(profiling_enabled, false), case Value of @@ -277,7 +287,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> recv_ref = none, connection_state = pre_init, queue_collector = Collector, - heartbeater = none}, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer()}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -297,7 +309,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), rabbit_queue_collector:shutdown(Collector), - rabbit_misc:unlink_and_capture_exit(Collector) + rabbit_misc:unlink_and_capture_exit(Collector), + rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -365,6 +378,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> catch Error -> {error, Error} end), mainloop(Parent, Deb, State); + {'$gen_cast', emit_stats} -> + internal_emit_stats(State), + mainloop(Parent, Deb, + State#v1{stats_timer = + rabbit_event:reset_stats_timer_after( + State#v1.stats_timer)}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); @@ -589,7 +608,8 @@ analyze_frame(_Type, _Body, _Protocol) -> handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), - {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1}; + {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of @@ -612,13 +632,22 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); +%% This is the protocol header for 0-9, which we can safely treat as +%% though it were 0-9-1. handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); -%% the 0-8 spec, confusingly, defines the version as 8-0 +%% This is what most clients send for 0-8. The 0-8 spec, confusingly, +%% defines the version as 8-0. handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% The 0-8 spec as on the AMQP web site actually has this as the +%% protocol header; some libraries e.g., py-amqplib, send it when they +%% want 0-8. +handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_version, A, B, C, D}); @@ -650,6 +679,12 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). +ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) -> + Self = self(), + State#v1{stats_timer = rabbit_event:ensure_stats_timer_after( + StatsTimer, + fun() -> emit_stats(Self) end)}. + %%-------------------------------------------------------------------------- handle_method0(MethodName, FieldsBin, @@ -724,8 +759,12 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - State#v1{connection_state = running, - connection = NewConnection}; + State1 = State#v1{connection_state = running, + connection = NewConnection}, + rabbit_event:notify( + connection_created, + [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), + State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); @@ -886,3 +925,7 @@ amqp_exception_explanation(Text, Expl) -> if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; true -> CompleteTextBin end. + +internal_emit_stats(State) -> + rabbit_event:notify(connection_stats, + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bd243de4..923c3403 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -67,6 +67,7 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_statistics(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -1031,16 +1032,108 @@ test_hooks() -> end, passed. -expect_normal_channel_termination(MRef, Ch) -> - receive {'DOWN', MRef, process, Ch, normal} -> ok - after 1000 -> throw(channel_failed_to_exit) +test_spawn(Receiver) -> + Me = self(), + Writer = spawn(fun () -> Receiver(Me) end), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, + <<"/">>, self()), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + MRef = erlang:monitor(process, Ch), + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +test_statistics_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + Pid ! Method, + test_statistics_receiver(Pid) end. -gobble_channel_exit() -> - receive {channel_exit, _, _} -> ok - after 1000 -> throw(channel_exit_not_received) +test_statistics_event_receiver(Pid) -> + receive + Foo -> + Pid ! Foo, + test_statistics_event_receiver(Pid) end. +test_statistics_receive_event(Ch, Matcher) -> + rabbit_channel:flush(Ch), + rabbit_channel:emit_stats(Ch), + test_statistics_receive_event1(Ch, Matcher). + +test_statistics_receive_event1(Ch, Matcher) -> + receive #event{type = channel_stats, props = Props} -> + case Matcher(Props) of + true -> Props; + _ -> test_statistics_receive_event1(Ch, Matcher) + end + after 1000 -> throw(failed_to_receive_event) + end. + +test_statistics() -> + application:set_env(rabbit, collect_statistics, fine), + + %% ATM this just tests the queue / exchange stats in channels. That's + %% by far the most complex code though. + + %% Set up a channel and queue + {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + rabbit_channel:do(Ch, #'queue.declare'{}), + QName = receive #'queue.declare_ok'{queue = Q0} -> + Q0 + after 1000 -> throw(failed_to_receive_queue_declare_ok) + end, + {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + QPid = Q#amqqueue.pid, + X = rabbit_misc:r(<<"/">>, exchange, <<"">>), + + rabbit_tests_event_receiver:start(self()), + + %% Check stats empty + Event = test_statistics_receive_event(Ch, fun (_) -> true end), + [] = proplists:get_value(channel_queue_stats, Event), + [] = proplists:get_value(channel_exchange_stats, Event), + [] = proplists:get_value(channel_queue_exchange_stats, Event), + + %% Publish and get a message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{}, <<"">>)), + rabbit_channel:do(Ch, #'basic.get'{queue = QName}), + + %% Check the stats reflect that + Event2 = test_statistics_receive_event( + Ch, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) > 0 + end), + [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2), + [{{QPid,X},[{publish,1}]}] = + proplists:get_value(channel_queue_exchange_stats, Event2), + + %% Check the stats remove stuff on queue deletion + rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), + Event3 = test_statistics_receive_event( + Ch, + fun (E) -> + length(proplists:get_value( + channel_queue_exchange_stats, E)) == 0 + end), + + [] = proplists:get_value(channel_queue_stats, Event3), + [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3), + [] = proplists:get_value(channel_queue_exchange_stats, Event3), + + rabbit_channel:shutdown(Ch), + rabbit_tests_event_receiver:stop(), + passed. + test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun (Pid) -> Pid ! {invoked, Self} end, diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl new file mode 100644 index 00000000..a92e3da7 --- /dev/null +++ b/src/rabbit_tests_event_receiver.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_tests_event_receiver). + +-export([start/1, stop/0]). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +start(Pid) -> + gen_event:add_handler(rabbit_event, ?MODULE, [Pid]). + +stop() -> + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([Pid]) -> + {ok, Pid}. + +handle_call(_Request, Pid) -> + {ok, not_understood, Pid}. + +handle_event(Event, Pid) -> + Pid ! Event, + {ok, Pid}. + +handle_info(_Info, Pid) -> + {ok, Pid}. + +terminate(_Arg, _Pid) -> + ok. + +code_change(_OldVsn, Pid, _Extra) -> + {ok, Pid}. + +%%---------------------------------------------------------------------------- diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 3aaf1917..a9313503 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -40,7 +40,11 @@ unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, - user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]). + user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1, + channel_exit/0, connection_exit/0]). + +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -133,7 +137,7 @@ -type(connection() :: pid()). --type(protocol() :: atom()). +-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1'). -type(user() :: #user{username :: rabbit_access_control:username(), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index bbc3a8c0..3cbc80f8 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -78,7 +78,7 @@ rabbit_types:ok(pid())). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). --spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_vm_limit/0 :: () -> non_neg_integer()). -spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). |