summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-21 14:15:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-21 14:15:31 +0100
commitea28306cba88cbc4df6552840f27bfee217ff38a (patch)
treec8a6925f43361708dad0697deb5af3b594982506
parentce04648364101332f298ffda7558496990d465b4 (diff)
downloadrabbitmq-server-ea28306cba88cbc4df6552840f27bfee217ff38a.tar.gz
Invoking os:timestamp and timer:now_diff all the time is too expensive, use timers instead.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl47
-rw-r--r--src/rabbit_channel.erl55
-rw-r--r--src/rabbit_reader.erl41
5 files changed, 100 insertions, 53 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index ac7e3851..5df9e690 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -82,7 +82,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds
+-define(STATS_INTERVAL, 5000).
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f1b52768..7bc85f2e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -41,6 +41,7 @@
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
@@ -104,6 +105,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
+-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -305,6 +307,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+emit_stats(#amqqueue{pid = QPid}) ->
+ delegate_cast(QPid, emit_stats).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -449,6 +454,9 @@ safe_delegate_call_ok(H, F, Pids) ->
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_cast(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
+
delegate_pcall(Pid, Pri, Msg, Timeout) ->
delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78f3de97..e7d4817d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -58,7 +58,7 @@
blocked_consumers,
sync_timer_ref,
rate_timer_ref,
- last_stats_update
+ stats_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -117,7 +117,7 @@ init(Q) ->
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
rate_timer_ref = undefined,
- last_stats_update = {0,0,0}}, hibernate,
+ stats_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -191,7 +191,7 @@ noreply(NewState) ->
next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
- State2 = maybe_emit_stats(State1),
+ State2 = ensure_stats_timer(State1),
case BQ:needs_sync(BQS)of
true -> {ensure_sync_timer(State2), 0};
false -> {stop_sync_timer(State2), hibernate}
@@ -231,6 +231,23 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
+ensure_stats_timer(State = #q{stats_timer_ref = undefined, q = Q}) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ rabbit_amqqueue, emit_stats,
+ [Q]),
+ State#q{stats_timer_ref = TRef};
+ensure_stats_timer(State) ->
+ State.
+
+stop_stats_timer(State = #q{stats_timer_ref = undefined}) ->
+ emit_stats(State),
+ State;
+stop_stats_timer(State = #q{stats_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ emit_stats(State),
+ State#q{stats_timer_ref = undefined}.
+
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -541,19 +558,10 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-%---------------------------------------------------------------------------
+emit_stats(State) ->
+ rabbit_event:notify(queue_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
-maybe_emit_stats(State = #q{last_stats_update = LastUpdate}) ->
- Now = os:timestamp(),
- case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
- true ->
- rabbit_event:notify(
- queue_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]),
- State#q{last_stats_update = Now};
- _ ->
- State
- end.
%---------------------------------------------------------------------------
handle_call({init, Recover}, From,
@@ -829,7 +837,11 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast(emit_stats, State) ->
+ emit_stats(State),
+ noreply(State#q{stats_timer_ref = undefined}).
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -866,4 +878,5 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
+ {hibernate, stop_stats_timer(
+ stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c878617..21d2b3de 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,6 +38,7 @@
-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([emit_stats/1]).
-export([flow_timeout/2]).
@@ -49,7 +50,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- queue_exchange_stats, last_stats_update}).
+ queue_exchange_stats, stats_timer_ref}).
-record(flow, {server, client, pending}).
@@ -100,6 +101,7 @@
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-endif.
@@ -155,6 +157,9 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+emit_stats(Pid) ->
+ gen_server2:cast(Pid, emit_stats).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
@@ -185,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
flow = #flow{server = true, client = true,
pending = none},
queue_exchange_stats = dict:new(),
- last_stats_update = {0,0,0}},
+ stats_timer_ref = undefined},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -262,7 +267,11 @@ handle_cast({flow_timeout, Ref},
"timeout waiting for channel.flow_ok{active=~w}",
[not Flow], none), State)};
handle_cast({flow_timeout, _Ref}, State) ->
- {noreply, State}.
+ {noreply, State};
+
+handle_cast(emit_stats, State) ->
+ internal_emit_stats(State),
+ noreply(State#ch{stats_timer_ref = undefined}).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -277,7 +286,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
- {hibernate, State}.
+ {hibernate, stop_stats_timer(State)}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -296,13 +305,29 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
reply(Reply, NewState) ->
- NewState1 = maybe_emit_stats(NewState),
+ NewState1 = ensure_stats_timer(NewState),
{reply, Reply, NewState1, hibernate}.
noreply(NewState) ->
- NewState1 = maybe_emit_stats(NewState),
+ NewState1 = ensure_stats_timer(NewState),
{noreply, NewState1, hibernate}.
+ensure_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ rabbit_channel, emit_stats,
+ [self()]),
+ State#ch{stats_timer_ref = TRef};
+ensure_stats_timer(State) ->
+ State.
+
+stop_stats_timer(State = #ch{stats_timer_ref = undefined}) ->
+ internal_emit_stats(State),
+ State;
+stop_stats_timer(State = #ch{stats_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ internal_emit_stats(State),
+ State#ch{stats_timer_ref = undefined}.
+
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1186,19 +1211,11 @@ incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) ->
end, Stats, QXCounts),
State#ch{queue_exchange_stats = Stats1}.
-maybe_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats,
- last_stats_update = LastUpdate}) ->
- Now = os:timestamp(),
- case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
- true ->
- rabbit_event:notify(
- channel_stats,
- [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} |
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]),
- State#ch{last_stats_update = Now};
- _ ->
- State
- end.
+internal_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats}) ->
+ rabbit_event:notify(
+ channel_stats,
+ [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} |
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]).
erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) ->
Stats1 = dict:erase(QPid, Stats),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4be191ae..1143794e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -43,6 +43,8 @@
-export([analyze_frame/2]).
+-export([emit_stats/1]).
+
-import(gen_tcp).
-import(fprof).
-import(inet).
@@ -58,7 +60,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector, last_stats_update}).
+ queue_collector, stats_timer_ref}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -144,6 +146,7 @@
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
@@ -184,6 +187,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+emit_stats(Pid) ->
+ gen_server2:cast(Pid, emit_stats).
+
setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
@@ -257,7 +263,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
recv_ref = none,
connection_state = pre_init,
queue_collector = Collector,
- last_stats_update = {0,0,0}},
+ stats_timer_ref = undefined},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -282,9 +288,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
end,
done.
-mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
- State = maybe_emit_stats(State_),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
@@ -345,6 +350,9 @@ mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) ->
catch Error -> {error, Error}
end),
mainloop(Parent, Deb, State);
+ {'$gen_cast', emit_stats} ->
+ internal_emit_stats(State),
+ mainloop(Parent, Deb, State#v1{stats_timer_ref = undefined});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -537,7 +545,8 @@ analyze_frame(_Type, _Body) ->
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
%%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]),
- {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1};
+ {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) ->
case PayloadAndMarker of
@@ -591,6 +600,14 @@ check_version(ClientVersion, ServerVersion) ->
(ClientMajor == ServerMajor andalso
ClientMinor >= ServerMinor).
+ensure_stats_timer(State = #v1{stats_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ rabbit_reader, emit_stats,
+ [self()]),
+ State#v1{stats_timer_ref = TRef};
+ensure_stats_timer(State) ->
+ State.
+
%%--------------------------------------------------------------------------
handle_method0(MethodName, FieldsBin, State) ->
@@ -857,14 +874,6 @@ amqp_exception_explanation(Text, Expl) ->
true -> CompleteTextBin
end.
-maybe_emit_stats(State = #v1{last_stats_update = LastUpdate}) ->
- Now = os:timestamp(),
- case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
- true ->
- rabbit_event:notify(
- connection_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]),
- State#v1{last_stats_update = Now};
- _ ->
- State
- end.
+internal_emit_stats(State) ->
+ rabbit_event:notify(connection_stats,
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).