summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-26 13:12:12 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-26 13:12:12 +0100
commit4322cc5fa45c0726db7811726713fab62e0e1a99 (patch)
treeee373f92a07da6abc2bb12be42395865e97386f2
parenta8b9e4a4633ed2d5f0809c19f9bbb174e3050835 (diff)
downloadrabbitmq-server-4322cc5fa45c0726db7811726713fab62e0e1a99.tar.gz
Revert to the prior, process dictionary-based way of storing statistics.
-rw-r--r--src/rabbit_channel.erl90
1 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ac52a7f1..d18329ff 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- stats_table, stats_timer_ref}).
+ stats_timer_ref}).
-record(flow, {server, client, pending}).
@@ -189,7 +189,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none},
- stats_table = ets:new(anon, [private]),
stats_timer_ref = undefined},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -246,7 +245,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
case AckRequired of
true -> deliver;
false -> deliver_no_ack
- end, State),
+ end),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
@@ -281,7 +280,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
State1 = queue_blocked(QPid, State),
- erase_queue_stats(QPid, State),
+ erase_queue_stats(QPid),
{noreply, State1}.
handle_pre_hibernate(State) ->
@@ -486,7 +485,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State),
+ QPid <- DeliveredQPids]], publish),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -499,7 +498,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QIncs = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QIncs],
- incr_stats(QIncs, ack, State),
+ incr_stats(QIncs, ack),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State#ch{unacked_message_q = Remaining};
@@ -531,7 +530,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
case NoAck of
true -> get_no_ack;
false -> get
- end, State1),
+ end),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -1186,19 +1185,19 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_stats(QXIncs, Measure, State) ->
- [incr_stats(QX, Inc, Measure, State) || {QX, Inc} <- QXIncs].
+incr_stats(QXIncs, Measure) ->
+ [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs].
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
+incr_stats({QPid, _} = QX, Inc, Measure) ->
maybe_monitor(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure, State);
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
maybe_monitor(QPid),
- update_measures(queue_stats, QPid, Inc, Measure, State);
+ update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure, State).
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
maybe_monitor(QPid) ->
case get({monitoring, QPid}) of
@@ -1207,44 +1206,31 @@ maybe_monitor(QPid) ->
_ -> ok
end.
-update_measures(Type, QX, Inc, Measure, #ch{stats_table = Table}) ->
- try
- ets:update_counter(Table, {Type, QX, Measure}, Inc)
- catch error:badarg ->
- ets:insert_new(Table, {{Type, QX, Measure}, Inc})
- end.
-
-internal_emit_stats(State = #ch{stats_table = Table}) ->
- {QStats, XStats, QXStats} =
- lists:foldl(
- fun ({{Type, QX, Measure}, Val}, Stats) ->
- store(Type, QX, Measure, Val, Stats)
+update_measures(Type, QX, Inc, Measure) ->
+ Measures = case get({Type, QX}) of
+ undefined -> [];
+ D -> D
+ end,
+ Cur = case orddict:find(Measure, Measures) of
+ error -> 0;
+ {ok, C} -> C
end,
- {[], [], []},
- [T || T <- ets:tab2list(Table)]
- ),
+ put({Type, QX},
+ orddict:store(Measure, Cur + Inc, Measures)).
+
+internal_emit_stats(State) ->
rabbit_event:notify(
channel_stats,
- [{queue_stats, QStats},
- {exchange_stats, XStats},
- {queue_exchange_stats, QXStats}] ++
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
-
-store(Type, QX, Measure, Val, Stats) ->
- Index = case Type of
- queue_stats -> 1;
- exchange_stats -> 2;
- queue_exchange_stats -> 3
- end,
- setelement(Index, Stats,
- orddict:update(
- QX,
- fun (Measures) ->
- orddict:store(Measure, Val, Measures)
- end,
- [{Measure, Val}], element(Index, Stats))).
-
-erase_queue_stats(QPid, #ch{stats_table = Table}) ->
+ [{queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {queue_exchange_stats,
+ [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+
+erase_queue_stats(QPid) ->
erase({monitoring, QPid}),
- ets:match_delete(Table, {{queue_stats, QPid, '_'}, '_'}),
- ets:match_delete(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'}).
+ erase({queue_stats, QPid}),
+ [erase({queue_exchange_stats, QX}) ||
+ {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0].