diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-26 13:12:12 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-26 13:12:12 +0100 |
commit | 4322cc5fa45c0726db7811726713fab62e0e1a99 (patch) | |
tree | ee373f92a07da6abc2bb12be42395865e97386f2 | |
parent | a8b9e4a4633ed2d5f0809c19f9bbb174e3050835 (diff) | |
download | rabbitmq-server-4322cc5fa45c0726db7811726713fab62e0e1a99.tar.gz |
Revert to the prior, process dictionary-based way of storing statistics.
-rw-r--r-- | src/rabbit_channel.erl | 90 |
1 files changed, 38 insertions, 52 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ac52a7f1..d18329ff 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - stats_table, stats_timer_ref}). + stats_timer_ref}). -record(flow, {server, client, pending}). @@ -189,7 +189,6 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, pending = none}, - stats_table = ets:new(anon, [private]), stats_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -246,7 +245,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, case AckRequired of true -> deliver; false -> deliver_no_ack - end, State), + end), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> @@ -281,7 +280,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> State1 = queue_blocked(QPid, State), - erase_queue_stats(QPid, State), + erase_queue_stats(QPid), {noreply, State1}. handle_pre_hibernate(State) -> @@ -486,7 +485,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -499,7 +498,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QIncs = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QIncs], - incr_stats(QIncs, ack, State), + incr_stats(QIncs, ack), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State#ch{unacked_message_q = Remaining}; @@ -531,7 +530,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, case NoAck of true -> get_no_ack; false -> get - end, State1), + end), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -1186,19 +1185,19 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_stats(QXIncs, Measure, State) -> - [incr_stats(QX, Inc, Measure, State) || {QX, Inc} <- QXIncs]. +incr_stats(QXIncs, Measure) -> + [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. -incr_stats({QPid, _} = QX, Inc, Measure, State) -> +incr_stats({QPid, _} = QX, Inc, Measure) -> maybe_monitor(QPid), - update_measures(queue_exchange_stats, QX, Inc, Measure, State); + update_measures(queue_exchange_stats, QX, Inc, Measure); -incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> maybe_monitor(QPid), - update_measures(queue_stats, QPid, Inc, Measure, State); + update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(X, Inc, Measure, State) -> - update_measures(exchange_stats, X, Inc, Measure, State). +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). maybe_monitor(QPid) -> case get({monitoring, QPid}) of @@ -1207,44 +1206,31 @@ maybe_monitor(QPid) -> _ -> ok end. -update_measures(Type, QX, Inc, Measure, #ch{stats_table = Table}) -> - try - ets:update_counter(Table, {Type, QX, Measure}, Inc) - catch error:badarg -> - ets:insert_new(Table, {{Type, QX, Measure}, Inc}) - end. - -internal_emit_stats(State = #ch{stats_table = Table}) -> - {QStats, XStats, QXStats} = - lists:foldl( - fun ({{Type, QX, Measure}, Val}, Stats) -> - store(Type, QX, Measure, Val, Stats) +update_measures(Type, QX, Inc, Measure) -> + Measures = case get({Type, QX}) of + undefined -> []; + D -> D + end, + Cur = case orddict:find(Measure, Measures) of + error -> 0; + {ok, C} -> C end, - {[], [], []}, - [T || T <- ets:tab2list(Table)] - ), + put({Type, QX}, + orddict:store(Measure, Cur + Inc, Measures)). + +internal_emit_stats(State) -> rabbit_event:notify( channel_stats, - [{queue_stats, QStats}, - {exchange_stats, XStats}, - {queue_exchange_stats, QXStats}] ++ - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). - -store(Type, QX, Measure, Val, Stats) -> - Index = case Type of - queue_stats -> 1; - exchange_stats -> 2; - queue_exchange_stats -> 3 - end, - setelement(Index, Stats, - orddict:update( - QX, - fun (Measures) -> - orddict:store(Measure, Val, Measures) - end, - [{Measure, Val}], element(Index, Stats))). - -erase_queue_stats(QPid, #ch{stats_table = Table}) -> + [{queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {queue_exchange_stats, + [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++ + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + +erase_queue_stats(QPid) -> erase({monitoring, QPid}), - ets:match_delete(Table, {{queue_stats, QPid, '_'}, '_'}), - ets:match_delete(Table, {{queue_exchange_stats, {QPid, '_'}, '_'}, '_'}). + erase({queue_stats, QPid}), + [erase({queue_exchange_stats, QX}) || + {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0]. |