diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 10:32:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 10:32:13 +0100 |
commit | a4d652ec66431782a6907609b7a87fd2aa77fca5 (patch) | |
tree | 81867bb8a46a0e91dee9d2ca2d600f7765118092 | |
parent | 0cb2eb7b8754fac0078e8fb8db32f9f893c262a1 (diff) | |
download | rabbitmq-server-a4d652ec66431782a6907609b7a87fd2aa77fca5.tar.gz |
Store (channel, queue, exchange) stats instead of (channel, exchange). Unify the two stats dictionaries in the channel.
-rw-r--r-- | src/rabbit_channel.erl | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b1a67640..2ff337f8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, flow, - exchange_stats, queue_stats, last_stats_update}). + queue_exchange_stats, last_stats_update}). -record(flow, {server, client, pending}). @@ -184,8 +184,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, pending = none}, - exchange_stats = dict:new(), - queue_stats = dict:new(), + queue_exchange_stats = dict:new(), last_stats_update = {0,0,0}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -238,11 +237,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, - State2 = incr_queue_stats([{QPid, 1}], - case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), + State2 = incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), noreply(State2#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> @@ -273,7 +272,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> State1 = queue_blocked(QPid, State), - State2 = erase_stats(QPid, State1), + State2 = erase_queue_stats(QPid, State1), {noreply, State2}. handle_pre_hibernate(State) -> @@ -461,7 +460,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, - State1 = incr_exchange_stats([{ExchangeName, 1}], publish, State), + State1 = incr_stats( + [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish, + State), {noreply, case TxnKey of none -> State1; _ -> add_tx_participants(DeliveredQPids, State1) @@ -474,7 +475,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QsCounts = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QsCounts], - State1 = incr_queue_stats(QsCounts, ack, State), + State1 = incr_stats(QsCounts, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), State1#ch{unacked_message_q = Remaining}; @@ -502,11 +503,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), - State2 = incr_queue_stats([{QPid, 1}], - case NoAck of - true -> get_no_ack; - false -> get - end, State1), + State2 = incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State1), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -1161,18 +1162,16 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_exchange_stats(XCounts, Item, State = #ch{exchange_stats = Stats}) -> - State#ch{exchange_stats = incr_stats(XCounts, Item, Stats)}. - -incr_queue_stats(QCounts, Item, State = #ch{queue_stats = Stats}) -> - State#ch{queue_stats = incr_stats(QCounts, Item, Stats)}. - -incr_stats(QXCounts, Item, Stats) -> +incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) -> Stats1 = lists:foldl( fun ({QX, Inc}, Stats0) -> - case is_pid(QX) andalso not dict:is_key(QX, Stats) of - true -> erlang:monitor(process, QX); - _ -> ok + QPid = case QX of + {Q, _X} -> Q; + Q -> Q + end, + case dict:is_key(QPid, Stats) of + false -> erlang:monitor(process, QPid); + _ -> ok end, dict:update(QX, fun(D) -> @@ -1185,23 +1184,28 @@ incr_stats(QXCounts, Item, Stats) -> [], Stats0) end, Stats, QXCounts), - Stats1. + State#ch{queue_exchange_stats = Stats1}. -maybe_emit_stats(State = #ch{exchange_stats = ExchangeStats, - queue_stats = QueueStats, +maybe_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats, last_stats_update = LastUpdate}) -> Now = os:timestamp(), case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of true -> rabbit_event:notify( channel_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS] ++ - [{per_exchange_stats, dict:to_list(ExchangeStats)}, - {per_queue_stats, dict:to_list(QueueStats)}]), + [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} | + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]), State#ch{last_stats_update = Now}; _ -> State end. -erase_stats(QPid, State = #ch{queue_stats = QueueStats}) -> - State#ch{queue_stats = dict:erase(QPid, QueueStats)}. +erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) -> + Stats1 = dict:erase(QPid, Stats), + Stats2 = dict:filter(fun (K, _V) -> + case K of + {QPid, _} -> false; + _ -> true + end + end, Stats1), + State#ch{queue_exchange_stats = Stats2}. |