summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-04 09:53:26 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-04 09:53:26 +0100
commit896ab1ca43156ca83f3a603cec273ef8d33630ea (patch)
treeec1552ebaa20b0aa47addfbf2684623f343c48c4
parentc59fbb37fda9c4e707bf91622af9a70e5c46932d (diff)
parent10bc8267622f8f07e744d2ae3c5281d6c19a2353 (diff)
downloadrabbitmq-server-896ab1ca43156ca83f3a603cec273ef8d33630ea.tar.gz
merged default into bug22964
-rw-r--r--codegen.py3
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl3
-rw-r--r--include/rabbit_exchange_type_spec.hrl5
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_access_control.erl16
-rw-r--r--src/rabbit_amqqueue.erl32
-rw-r--r--src/rabbit_amqqueue_process.erl63
-rw-r--r--src/rabbit_basic.erl8
-rw-r--r--src/rabbit_channel.erl176
-rw-r--r--src/rabbit_event.erl131
-rw-r--r--src/rabbit_exchange.erl48
-rw-r--r--src/rabbit_misc.erl16
-rw-r--r--src/rabbit_reader.erl54
-rw-r--r--src/rabbit_tests.erl100
-rw-r--r--src/rabbit_tests_event_receiver.erl66
-rw-r--r--src/rabbit_types.erl8
17 files changed, 639 insertions, 100 deletions
diff --git a/codegen.py b/codegen.py
index 230d785e..14229753 100644
--- a/codegen.py
+++ b/codegen.py
@@ -407,7 +407,8 @@ def genErl(spec):
-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]).
--spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()).
+-spec(decode_method_fields/2 ::
+ (amqp_method_name(), binary()) -> amqp_method_record() | rabbit_types:connection_exit()).
-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()).
-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()).
-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 2cd28abb..48e19ff8 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -27,4 +27,5 @@
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
- {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}]}]}.
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 0d3b1441..b9abd788 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -72,6 +72,8 @@
-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
+-record(event, {type, props, timestamp}).
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
@@ -83,6 +85,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(STATS_INTERVAL, 5000).
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index f05bcb84..cecd666b 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -43,7 +43,8 @@
rabbit_types:binding()) -> 'ok').
-spec(remove_bindings/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(assert_args_equivalence/2 :: (rabbit_types:exchange(),
- rabbit_framing:amqp_table()) -> 'ok').
+-spec(assert_args_equivalence/2 ::
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit()).
-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ada2c38e..1fab7e4d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -89,6 +89,13 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
+-rabbit_boot_step({rabbit_event,
+ [{description, "statistics event handler"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [gen_event, [{local, rabbit_event}]]}},
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
+
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
{requires, external_infrastructure}]}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 84496eb2..61160edd 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -53,12 +53,16 @@
-type(regexp() :: binary()).
-type(scope() :: binary()).
--spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()).
+-spec(check_login/2 ::
+ (binary(), binary()) -> rabbit_types:user() |
+ rabbit_types:channel_exit()).
-spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()).
-spec(check_vhost_access/2 ::
- (rabbit_types:user(), rabbit_types:vhost()) -> 'ok').
+ (rabbit_types:user(), rabbit_types:vhost())
+ -> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
- (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok').
+ (username(), rabbit_types:r(atom()), permission_atom())
+ -> 'ok' | rabbit_types:channel_exit()).
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -66,8 +70,10 @@
-spec(lookup_user/1 ::
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
--spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(add_vhost/1 ::
+ (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()).
+-spec(delete_vhost/1 ::
+ (rabbit_types:vhost()) -> 'ok' | rabbit_types:connection_exit()).
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 870c119a..f9f8e2ec 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,6 +41,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -77,18 +78,24 @@
-spec(declare/5 ::
(name(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> {'new' | 'existing', rabbit_types:amqqueue()}).
+ -> {'new' | 'existing', rabbit_types:amqqueue()} |
+ rabbit_types:channel_exit()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found')).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
--spec(with_or_die/2 :: (name(), qfun(A)) -> A).
+-spec(with_or_die/2 ::
+ (name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
(rabbit_types:amqqueue(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
- -> 'ok' | no_return()).
--spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
--spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
+ -> 'ok' | rabbit_types:channel_exit() |
+ rabbit_types:connection_exit()).
+-spec(check_exclusive_access/2 ::
+ (rabbit_types:amqqueue(), pid())
+ -> 'ok' | rabbit_types:channel_exit()).
+-spec(with_exclusive_access_or_die/3 ::
+ (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]).
@@ -107,6 +114,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
+-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -142,15 +150,18 @@
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
- -> rabbit_types:amqqueue() | 'not_found').
--spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')).
+ -> rabbit_types:amqqueue() | 'not_found' |
+ rabbit_types:connection_exit()).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(maybe_expire/1 :: (pid()) -> 'ok').
--spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(on_node_down/1 :: (node()) -> 'ok' | rabbit_types:connection_exit()).
-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
-endif.
@@ -345,6 +356,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+emit_stats(#amqqueue{pid = QPid}) ->
+ delegate_pcast(QPid, 7, emit_stats).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -442,7 +456,7 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
infinity).
update_ram_duration(QPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ac5fb7f9..5e1b1f71 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -59,7 +59,8 @@
expires,
sync_timer_ref,
rate_timer_ref,
- expiry_timer_ref
+ expiry_timer_ref,
+ stats_timer
}).
-record(consumer, {tag, ack_required}).
@@ -74,13 +75,8 @@
txn,
unsent_message_count}).
--define(INFO_KEYS,
- [name,
- durable,
- auto_delete,
- arguments,
- pid,
- owner_pid,
+-define(STATISTICS_KEYS,
+ [pid,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -91,6 +87,17 @@
backing_queue_status
]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ durable,
+ auto_delete,
+ arguments,
+ owner_pid
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -114,7 +121,8 @@ init(Q) ->
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
- expiry_timer_ref = undefined}, hibernate,
+ expiry_timer_ref = undefined,
+ stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -155,6 +163,10 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
+ rabbit_event:notify(
+ queue_created,
+ [{Item, i(Item, State)} ||
+ Item <- ?CREATION_EVENT_KEYS]),
noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -173,6 +185,7 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -189,9 +202,10 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
+ State2 = ensure_stats_timer(State1),
case BQ:needs_idle_timeout(BQS)of
- true -> {ensure_sync_timer(State1), 0};
- false -> {stop_sync_timer(State1), hibernate}
+ true -> {ensure_sync_timer(State2), 0};
+ false -> {stop_sync_timer(State2), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
@@ -227,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-
+
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
@@ -249,6 +263,18 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
+ensure_stats_timer(State = #q{stats_timer = StatsTimer,
+ q = Q}) ->
+ State#q{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end,
+ fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
+
+stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
+ State#q{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> emit_stats(State) end)}.
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -560,6 +586,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+emit_stats(State) ->
+ rabbit_event:notify(queue_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
@@ -856,7 +886,11 @@ handle_cast(maybe_expire, State) ->
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
- end.
+ end;
+
+handle_cast(emit_stats, State) ->
+ emit_stats(State),
+ noreply(State).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -893,4 +927,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
+ {hibernate, stop_stats_timer(
+ stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c76c01ac..b536fa79 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -48,7 +48,9 @@
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
--spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()).
+-spec(publish/1 ::
+ (rabbit_types:delivery()) -> publish_result() |
+ rabbit_types:connection_exit()).
-spec(delivery/4 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
rabbit_types:message())
@@ -62,12 +64,12 @@
-spec(publish/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
- -> publish_result()).
+ -> publish_result() | rabbit_types:connection_exit()).
-spec(publish/7 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
properties_input(), binary())
- -> publish_result()).
+ -> publish_result() | rabbit_types:connection_exit()).
-spec(build_content/2 ::
(rabbit_framing:amqp_property_record(), binary())
-> rabbit_types:content()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4ff361d..6ee6e209 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,6 +38,7 @@
-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([emit_stats/1, flush/1]).
-export([flow_timeout/2]).
@@ -48,25 +49,31 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
+ consumer_mapping, blocking, queue_collector_pid, flow,
+ stats_timer}).
-record(flow, {server, client, pending}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
--define(INFO_KEYS,
+-define(STATISTICS_KEYS,
[pid,
- connection,
- number,
- user,
- vhost,
transactional,
consumer_count,
messages_unacknowledged,
acks_uncommitted,
prefetch_count]).
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ connection,
+ number,
+ user,
+ vhost]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -96,6 +103,7 @@
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-endif.
@@ -149,31 +157,41 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+emit_stats(Pid) ->
+ gen_server2:pcast(Pid, 7, emit_stats).
+
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
- {ok, #ch{state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- flow = #flow{server = true, client = true,
- pending = none}},
- hibernate,
+ State = #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none},
+ stats_timer = rabbit_event:init_stats_timer()},
+ rabbit_event:notify(
+ channel_created,
+ [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(info, _From, State) ->
@@ -185,6 +203,9 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -223,6 +244,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+ maybe_incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
@@ -243,6 +270,10 @@ handle_cast({flow_timeout, Ref},
"timeout waiting for channel.flow_ok{active=~w}",
[not Flow], none), State)};
handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State};
+
+handle_cast(emit_stats, State) ->
+ internal_emit_stats(State),
{noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
@@ -252,11 +283,12 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State)}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {hibernate, State}.
+ {hibernate, stop_stats_timer(State)}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -274,9 +306,23 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ {reply, Reply, ensure_stats_timer(NewState), hibernate}.
+
+noreply(NewState) ->
+ {noreply, ensure_stats_timer(NewState), hibernate}.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ ChPid = self(),
+ State#ch{stats_timer = rabbit_event:ensure_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end,
+ fun() -> emit_stats(ChPid) end)}.
+
+stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
+ State#ch{stats_timer = rabbit_event:stop_stats_timer(
+ StatsTimer,
+ fun() -> internal_emit_stats(State) end)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -436,6 +482,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -446,7 +495,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(TxnKey, Acked),
+ QIncs = ack(TxnKey, Acked),
+ Participants = [QPid || {QPid, _} <- QIncs],
+ maybe_incr_stats(QIncs, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -469,11 +520,16 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ maybe_incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -988,7 +1044,7 @@ ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [QPid | L]
+ [{QPid, length(MsgIds)} | L]
end, [], UAQ).
make_tx_id() -> rabbit_guid:guid().
@@ -1115,6 +1171,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]),
rabbit_writer:shutdown(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
@@ -1137,3 +1194,60 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
+
+maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
+ end.
+
+incr_stats({QPid, _} = QX, Inc, Measure) ->
+ maybe_monitor(QPid),
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ maybe_monitor(QPid),
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
+
+maybe_monitor(QPid) ->
+ case get({monitoring, QPid}) of
+ undefined -> erlang:monitor(process, QPid),
+ put({monitoring, QPid}, true);
+ _ -> ok
+ end.
+
+update_measures(Type, QX, Inc, Measure) ->
+ Measures = case get({Type, QX}) of
+ undefined -> [];
+ D -> D
+ end,
+ Cur = case orddict:find(Measure, Measures) of
+ error -> 0;
+ {ok, C} -> C
+ end,
+ put({Type, QX},
+ orddict:store(Measure, Cur + Inc, Measures)).
+
+internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
+ CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ case rabbit_event:stats_level(StatsTimer) of
+ coarse ->
+ rabbit_event:notify(channel_stats, CoarseStats);
+ fine ->
+ FineStats =
+ [{channel_queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, CoarseStats ++ FineStats)
+ end.
+
+erase_queue_stats(QPid) ->
+ erase({monitoring, QPid}),
+ erase({queue_stats, QPid}),
+ [erase({queue_exchange_stats, QX}) ||
+ {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
new file mode 100644
index 00000000..07027cd5
--- /dev/null
+++ b/src/rabbit_event.erl
@@ -0,0 +1,131 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_event).
+
+-include("rabbit.hrl").
+
+-export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
+-export([stats_level/1]).
+-export([notify/2]).
+
+%%----------------------------------------------------------------------------
+
+-record(state, {level, timer}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([event_type/0, event_props/0, event_timestamp/0, event/0]).
+
+-type(event_type() :: atom()).
+-type(event_props() :: term()).
+-type(event_timestamp() ::
+ {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
+
+-type(event() :: #event {
+ type :: event_type(),
+ props :: event_props(),
+ timestamp :: event_timestamp()
+ }).
+
+-type(level() :: 'none' | 'coarse' | 'fine').
+
+-opaque(state() :: #state {
+ level :: level(),
+ timer :: atom()
+ }).
+
+-type(timer_fun() :: fun (() -> 'ok')).
+
+-spec(init_stats_timer/0 :: () -> state()).
+-spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
+-spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
+-spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(stats_level/1 :: (state()) -> level()).
+-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+init_stats_timer() ->
+ {ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
+ #state{level = StatsLevel, timer = undefined}.
+
+ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ State;
+ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
+ NowFun(),
+ {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ State.
+
+stop_stats_timer(State = #state{level = none}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+ State;
+stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+ {ok, cancel} = timer:cancel(TRef),
+ NowFun(),
+ State#state{timer = undefined}.
+
+ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
+ State;
+ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [TimerFun, []]),
+ State#state{timer = TRef};
+ensure_stats_timer_after(State, _TimerFun) ->
+ State.
+
+reset_stats_timer_after(State) ->
+ State#state{timer = undefined}.
+
+stats_level(#state{level = Level}) ->
+ Level.
+
+notify(Type, Props) ->
+ try
+ gen_event:notify(rabbit_event, #event{type = Type,
+ props = Props,
+ timestamp = os:timestamp()})
+ catch error:badarg ->
+ %% badarg means rabbit_event is no longer registered. We never
+ %% unregister it so the great likelihood is that we're shutting
+ %% down the broker but some events were backed up. Ignore it.
+ ok
+ end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 49f87a22..75154388 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -67,22 +67,25 @@
fun((rabbit_types:exchange(), queue()) ->
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok' | rabbit_types:connection_exit()).
-spec(declare/5 ::
(name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
- -> rabbit_types:exchange()).
--spec(check_type/1 :: (binary()) -> atom()).
+ -> rabbit_types:exchange() | rabbit_types:connection_exit()).
+-spec(check_type/1 ::
+ (binary()) -> atom() | rabbit_types:connection_exit()).
-spec(assert_equivalence/5 ::
(rabbit_types:exchange(), atom(), boolean(), boolean(),
rabbit_framing:amqp_table())
- -> 'ok' | no_return()).
+ -> 'ok' | rabbit_types:connection_exit()).
-spec(assert_args_equivalence/2 ::
- (rabbit_types:exchange(), rabbit_framing:amqp_table()) ->
- 'ok' | no_return()).
+ (rabbit_types:exchange(), rabbit_framing:amqp_table())
+ -> 'ok' | rabbit_types:connection_exit()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:exchange()) |
rabbit_types:error('not_found')).
--spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()).
+-spec(lookup_or_die/1 ::
+ (name()) -> rabbit_types:exchange() |
+ rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]).
@@ -93,27 +96,32 @@
-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+ -> {rabbit_router:routing_result(), [pid()]} |
+ rabbit_types:connection_exit()).
-spec(add_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table(), inner_fun())
- -> bind_res()).
+ -> bind_res() | rabbit_types:connection_exit()).
-spec(delete_binding/5 ::
(name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
+ -> bind_res() | rabbit_types:error('binding_not_found') |
+ rabbit_types:connection_exit()).
-spec(list_bindings/1 ::
(rabbit_types:vhost())
-> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
rabbit_framing:amqp_table()}]).
-spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> none())).
+ (rabbit_amqqueue:name())
+ -> fun (() -> none()) | rabbit_types:connection_exit()).
-spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> none())).
+ (rabbit_amqqueue:name())
+ -> fun (() -> none()) | rabbit_types:connection_exit()).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
- rabbit_types:error('in_use')).
+ rabbit_types:error('in_use') |
+ rabbit_types:connection_exit()).
-spec(list_queue_bindings/1 ::
(rabbit_amqqueue:name())
-> [{name(), rabbit_router:routing_key(),
@@ -190,6 +198,9 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} -> TypeModule:create(X),
+ rabbit_event:notify(
+ exchange_created,
+ [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
X;
{existing, X} -> X;
Err -> Err
@@ -426,6 +437,12 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:write/3),
+ rabbit_event:notify(
+ binding_created,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName},
+ {routing_key, RoutingKey},
+ {arguments, Arguments}]),
{new, X, B};
[_R] ->
{existing, X, B}
@@ -458,6 +475,10 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
X#exchange.durable andalso
Q#amqqueue.durable,
fun mnesia:delete_object/3),
+ rabbit_event:notify(
+ binding_deleted,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName}]),
{maybe_auto_delete(X), B};
{error, _} = E ->
E
@@ -576,6 +597,7 @@ unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1b492646..4827506b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -86,25 +86,29 @@
-> rabbit_framing:amqp_method_name()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
--spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()).
+-spec(die/1 ::
+ (rabbit_framing:amqp_exception())
+ -> rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
- -> no_return()).
+ -> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
(rabbit_framing:amqp_exception(), string(), [any()],
rabbit_framing:amqp_method_name())
-> rabbit_types:amqp_error()).
-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()])
- -> no_return()).
+ -> rabbit_types:channel_exit() |
+ rabbit_types:connection_exit()).
-spec(protocol_error/4 ::
(rabbit_framing:amqp_exception(), string(), [any()],
rabbit_framing:amqp_method_name())
- -> no_return()).
+ -> rabbit_types:channel_exit() |
+ rabbit_types:connection_exit()).
-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
--spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
+-spec(not_found/1 :: (rabbit_types:r(atom())) -> rabbit_types:channel_exit()).
-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
rabbit_framing:amqp_table(),
rabbit_types:r(any()), [binary()]) ->
- 'ok' | no_return()).
+ 'ok' | rabbit_types:connection_exit()).
-spec(get_config/1 ::
(atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(get_config/2 :: (atom(), A) -> A).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a8b2ae54..17443926 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -43,6 +43,8 @@
-export([analyze_frame/3]).
+-export([emit_stats/1]).
+
-import(gen_tcp).
-import(fprof).
-import(inet).
@@ -58,12 +60,16 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+ queue_collector, stats_timer}).
+
+-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
+ send_pend, state, channels]).
--define(INFO_KEYS,
- [pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
- protocol, user, vhost, timeout, frame_max, client_properties]).
+-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port,
+ protocol, user, vhost, timeout, frame_max,
+ client_properties]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%% connection lifecycle
%%
@@ -141,6 +147,7 @@
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
@@ -181,6 +188,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+emit_stats(Pid) ->
+ gen_server:cast(Pid, emit_stats).
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -254,7 +264,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector},
+ queue_collector = Collector,
+ stats_timer =
+ rabbit_event:init_stats_timer()},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -274,7 +286,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
rabbit_queue_collector:shutdown(Collector),
- rabbit_misc:unlink_and_capture_exit(Collector)
+ rabbit_misc:unlink_and_capture_exit(Collector),
+ rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
@@ -340,6 +353,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
catch Error -> {error, Error}
end),
mainloop(Parent, Deb, State);
+ {'$gen_cast', emit_stats} ->
+ internal_emit_stats(State),
+ mainloop(Parent, Deb,
+ State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -539,7 +558,8 @@ analyze_frame(_Type, _Body, _Protocol) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
+ {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
case PayloadAndMarker of
@@ -609,6 +629,12 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ Self = self(),
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ StatsTimer,
+ fun() -> emit_stats(Self) end)}.
+
%%--------------------------------------------------------------------------
handle_method0(MethodName, FieldsBin,
@@ -679,8 +705,12 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State#v1{connection_state = running,
- connection = NewConnection};
+ State1 = State#v1{connection_state = running,
+ connection = NewConnection},
+ rabbit_event:notify(
+ connection_created,
+ [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ State1;
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -842,3 +872,7 @@ amqp_exception_explanation(Text, Expl) ->
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
end.
+
+internal_emit_stats(State) ->
+ rabbit_event:notify(connection_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e6163d45..947fcd08 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -68,6 +68,7 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_memory_pressure(),
+ passed = test_statistics(),
passed = test_option_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
@@ -1101,10 +1102,13 @@ test_memory_pressure_sync(Ch, Writer) ->
end.
test_memory_pressure_spawn() ->
+ test_spawn(fun test_memory_pressure_receiver/1).
+
+test_spawn(Receiver) ->
Me = self(),
- Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- <<"user">>, <<"/">>, self()),
+ Writer = spawn(fun () -> Receiver(Me) end),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>,
+ <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
MRef = erlang:monitor(process, Ch),
receive #'channel.open_ok'{} -> ok
@@ -1197,6 +1201,96 @@ test_memory_pressure() ->
passed.
+test_statistics_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_statistics_receiver(Pid)
+ end.
+
+test_statistics_event_receiver(Pid) ->
+ receive
+ Foo ->
+ Pid ! Foo,
+ test_statistics_event_receiver(Pid)
+ end.
+
+test_statistics_receive_event(Ch, Matcher) ->
+ rabbit_channel:flush(Ch),
+ rabbit_channel:emit_stats(Ch),
+ test_statistics_receive_event1(Ch, Matcher).
+
+test_statistics_receive_event1(Ch, Matcher) ->
+ receive #event{type = channel_stats, props = Props} ->
+ case Matcher(Props) of
+ true -> Props;
+ _ -> test_statistics_receive_event1(Ch, Matcher)
+ end
+ after 1000 -> throw(failed_to_receive_event)
+ end.
+
+test_statistics() ->
+ application:set_env(rabbit, collect_statistics, fine),
+
+ %% ATM this just tests the queue / exchange stats in channels. That's
+ %% by far the most complex code though.
+
+ %% Set up a channel and queue
+ {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} ->
+ Q0
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q#amqqueue.pid,
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ rabbit_tests_event_receiver:start(self()),
+
+ %% Check stats empty
+ Event = test_statistics_receive_event(Ch, fun (_) -> true end),
+ [] = proplists:get_value(channel_queue_stats, Event),
+ [] = proplists:get_value(channel_exchange_stats, Event),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event),
+
+ %% Publish and get a message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
+
+ %% Check the stats reflect that
+ Event2 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) > 0
+ end),
+ [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
+ [{{QPid,X},[{publish,1}]}] =
+ proplists:get_value(channel_queue_exchange_stats, Event2),
+
+ %% Check the stats remove stuff on queue deletion
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ Event3 = test_statistics_receive_event(
+ Ch,
+ fun (E) ->
+ length(proplists:get_value(
+ channel_queue_exchange_stats, E)) == 0
+ end),
+
+ [] = proplists:get_value(channel_queue_stats, Event3),
+ [{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event3),
+ [] = proplists:get_value(channel_queue_exchange_stats, Event3),
+
+ rabbit_channel:shutdown(Ch),
+ rabbit_tests_event_receiver:stop(),
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
Sender = fun (Pid) -> Pid ! {invoked, Self} end,
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
new file mode 100644
index 00000000..a92e3da7
--- /dev/null
+++ b/src/rabbit_tests_event_receiver.erl
@@ -0,0 +1,66 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_tests_event_receiver).
+
+-export([start/1, stop/0]).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+start(Pid) ->
+ gen_event:add_handler(rabbit_event, ?MODULE, [Pid]).
+
+stop() ->
+ gen_event:delete_handler(rabbit_event, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([Pid]) ->
+ {ok, Pid}.
+
+handle_call(_Request, Pid) ->
+ {ok, not_understood, Pid}.
+
+handle_event(Event, Pid) ->
+ Pid ! Event,
+ {ok, Pid}.
+
+handle_info(_Info, Pid) ->
+ {ok, Pid}.
+
+terminate(_Arg, _Pid) ->
+ ok.
+
+code_change(_OldVsn, Pid, _Extra) ->
+ {ok, Pid}.
+
+%%----------------------------------------------------------------------------
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3aaf1917..a9313503 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -40,7 +40,11 @@
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
- user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+ user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1,
+ channel_exit/0, connection_exit/0]).
+
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -133,7 +137,7 @@
-type(connection() :: pid()).
--type(protocol() :: atom()).
+-type(protocol() :: 'rabbit_framing_amqp_0_8' | 'rabbit_framing_amqp_0_9_1').
-type(user() ::
#user{username :: rabbit_access_control:username(),