diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 21:26:23 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 21:26:23 +0100 |
commit | 23054839976ab35667c18094cf553b776d8aa1de (patch) | |
tree | bdc666c52c9541434706a78bf6e2e01d91d7a25d | |
parent | e251316ee3ecff05d062432b89d2117eebde0fcd (diff) | |
download | rabbitmq-server-23054839976ab35667c18094cf553b776d8aa1de.tar.gz |
Rewrite the channel stats again to use ets. Not sure if this is any faster, maybe a bit.
-rw-r--r-- | src/rabbit_channel.erl | 89 |
1 files changed, 53 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef33a64c..aff63b61 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - stats_timer_ref}). + stats_table, stats_timer_ref}). -record(flow, {server, client, pending}). @@ -189,6 +189,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, pending = none}, + stats_table = ets:new(anon, [private]), stats_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -245,7 +246,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, case AckRequired of true -> deliver; false -> deliver_no_ack - end), + end, State), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> @@ -280,7 +281,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> State1 = queue_blocked(QPid, State), - erase_queue_stats(QPid), + erase_queue_stats(QPid, State), {noreply, State1}. handle_pre_hibernate(State) -> @@ -485,7 +486,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish), + QPid <- DeliveredQPids]], publish, State), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -498,7 +499,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QIncs = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QIncs], - incr_stats(QIncs, ack), + incr_stats(QIncs, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -530,7 +531,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, case NoAck of true -> get_no_ack; false -> get - end), + end, State1), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -1185,19 +1186,19 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_stats(QXIncs, Measure) -> - [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. +incr_stats(QXIncs, Measure, State) -> + [incr_stats(QX, Inc, Measure, State) || {QX, Inc} <- QXIncs]. -incr_stats({QPid, _} = QX, Inc, Measure) -> +incr_stats({QPid, _} = QX, Inc, Measure, State) -> maybe_monitor(QPid), - update_measures(queue_exchange_stats, QX, Inc, Measure); + update_measures(queue_exchange_stats, QX, Inc, Measure, State); -incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> +incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> maybe_monitor(QPid), - update_measures(queue_stats, QPid, Inc, Measure); + update_measures(queue_stats, QPid, Inc, Measure, State); -incr_stats(X, Inc, Measure) -> - update_measures(exchange_stats, X, Inc, Measure). +incr_stats(X, Inc, Measure, State) -> + update_measures(exchange_stats, X, Inc, Measure, State). maybe_monitor(QPid) -> case get({monitoring, QPid}) of @@ -1206,31 +1207,47 @@ maybe_monitor(QPid) -> _ -> ok end. -update_measures(Type, QX, Inc, Measure) -> - Measures = case get({Type, QX}) of - undefined -> []; - D -> D - end, - Cur = case orddict:find(Measure, Measures) of - error -> 0; - {ok, C} -> C - end, - put({Type, QX}, - orddict:store(Measure, Cur + Inc, Measures)). +update_measures(Type, QX, Inc, Measure, #ch{stats_table = Table}) -> + try + ets:update_counter(Table, {Type, QX, Measure}, Inc) + catch error:badarg -> + ets:insert_new(Table, {{Type, QX, Measure}, Inc}) + end. -internal_emit_stats(State) -> +internal_emit_stats(State = #ch{stats_table = Table}) -> + {QStats, XStats, QXStats} = + lists:foldl( + fun ({{Type, QX, Measure}, Val}, Stats) -> + store(Type, QX, Measure, Val, Stats) + end, + {[], [], []}, + [T || T <- ets:tab2list(Table)] + ), rabbit_event:notify( channel_stats, - [{queue_stats, - [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, - {exchange_stats, - [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {queue_exchange_stats, - [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++ + [{queue_stats, QStats}, + {exchange_stats, XStats}, + {queue_exchange_stats, QXStats}] ++ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). -erase_queue_stats(QPid) -> +store(Type, QX, Measure, Val, Stats) -> + Index = case Type of + queue_stats -> 1; + exchange_stats -> 2; + queue_exchange_stats -> 3 + end, + setelement(Index, Stats, + orddict:update( + QX, + fun (Measures) -> + orddict:store(Measure, Val, Measures) + end, + [{Measure, Val}], element(Index, Stats))). + +erase_queue_stats(QPid, #ch{stats_table = Table}) -> erase({monitoring, QPid}), - erase({queue_stats, QPid}), - [erase({queue_exchange_stats, QX}) || - {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0]. + [ets:delete(Table, K) || + {K, _V} <- ets:match(Table, {{queue_stats, QPid, '_'}, '_'})], + [ets:delete(Table, K) || + {K, _V} <- + ets:match(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'})]. |