summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-21 21:26:23 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-21 21:26:23 +0100
commit23054839976ab35667c18094cf553b776d8aa1de (patch)
treebdc666c52c9541434706a78bf6e2e01d91d7a25d
parente251316ee3ecff05d062432b89d2117eebde0fcd (diff)
downloadrabbitmq-server-23054839976ab35667c18094cf553b776d8aa1de.tar.gz
Rewrite the channel stats again to use ets. Not sure if this is any faster, maybe a bit.
-rw-r--r--src/rabbit_channel.erl89
1 files changed, 53 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ef33a64c..aff63b61 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- stats_timer_ref}).
+ stats_table, stats_timer_ref}).
-record(flow, {server, client, pending}).
@@ -189,6 +189,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none},
+ stats_table = ets:new(anon, [private]),
stats_timer_ref = undefined},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -245,7 +246,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
case AckRequired of
true -> deliver;
false -> deliver_no_ack
- end),
+ end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
@@ -280,7 +281,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
State1 = queue_blocked(QPid, State),
- erase_queue_stats(QPid),
+ erase_queue_stats(QPid, State),
{noreply, State1}.
handle_pre_hibernate(State) ->
@@ -485,7 +486,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish),
+ QPid <- DeliveredQPids]], publish, State),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -498,7 +499,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QIncs = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QIncs],
- incr_stats(QIncs, ack),
+ incr_stats(QIncs, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -530,7 +531,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
case NoAck of
true -> get_no_ack;
false -> get
- end),
+ end, State1),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -1185,19 +1186,19 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_stats(QXIncs, Measure) ->
- [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs].
+incr_stats(QXIncs, Measure, State) ->
+ [incr_stats(QX, Inc, Measure, State) || {QX, Inc} <- QXIncs].
-incr_stats({QPid, _} = QX, Inc, Measure) ->
+incr_stats({QPid, _} = QX, Inc, Measure, State) ->
maybe_monitor(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure);
+ update_measures(queue_exchange_stats, QX, Inc, Measure, State);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
maybe_monitor(QPid),
- update_measures(queue_stats, QPid, Inc, Measure);
+ update_measures(queue_stats, QPid, Inc, Measure, State);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
+incr_stats(X, Inc, Measure, State) ->
+ update_measures(exchange_stats, X, Inc, Measure, State).
maybe_monitor(QPid) ->
case get({monitoring, QPid}) of
@@ -1206,31 +1207,47 @@ maybe_monitor(QPid) ->
_ -> ok
end.
-update_measures(Type, QX, Inc, Measure) ->
- Measures = case get({Type, QX}) of
- undefined -> [];
- D -> D
- end,
- Cur = case orddict:find(Measure, Measures) of
- error -> 0;
- {ok, C} -> C
- end,
- put({Type, QX},
- orddict:store(Measure, Cur + Inc, Measures)).
+update_measures(Type, QX, Inc, Measure, #ch{stats_table = Table}) ->
+ try
+ ets:update_counter(Table, {Type, QX, Measure}, Inc)
+ catch error:badarg ->
+ ets:insert_new(Table, {{Type, QX, Measure}, Inc})
+ end.
-internal_emit_stats(State) ->
+internal_emit_stats(State = #ch{stats_table = Table}) ->
+ {QStats, XStats, QXStats} =
+ lists:foldl(
+ fun ({{Type, QX, Measure}, Val}, Stats) ->
+ store(Type, QX, Measure, Val, Stats)
+ end,
+ {[], [], []},
+ [T || T <- ets:tab2list(Table)]
+ ),
rabbit_event:notify(
channel_stats,
- [{queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {queue_exchange_stats,
- [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++
+ [{queue_stats, QStats},
+ {exchange_stats, XStats},
+ {queue_exchange_stats, QXStats}] ++
[{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
-erase_queue_stats(QPid) ->
+store(Type, QX, Measure, Val, Stats) ->
+ Index = case Type of
+ queue_stats -> 1;
+ exchange_stats -> 2;
+ queue_exchange_stats -> 3
+ end,
+ setelement(Index, Stats,
+ orddict:update(
+ QX,
+ fun (Measures) ->
+ orddict:store(Measure, Val, Measures)
+ end,
+ [{Measure, Val}], element(Index, Stats))).
+
+erase_queue_stats(QPid, #ch{stats_table = Table}) ->
erase({monitoring, QPid}),
- erase({queue_stats, QPid}),
- [erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0].
+ [ets:delete(Table, K) ||
+ {K, _V} <- ets:match(Table, {{queue_stats, QPid, '_'}, '_'})],
+ [ets:delete(Table, K) ||
+ {K, _V} <-
+ ets:match(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'})].