diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 16:17:01 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-21 16:17:01 +0100 |
commit | f1ffcf3fc686d596180f486150a0ff39a94202e8 (patch) | |
tree | 64843407ec05f9f317677dad919b4e21404fc27e | |
parent | ea28306cba88cbc4df6552840f27bfee217ff38a (diff) | |
download | rabbitmq-server-f1ffcf3fc686d596180f486150a0ff39a94202e8.tar.gz |
Store queue/exchange stats in the process dictionary since that's rather a lot faster than dict:
-rw-r--r-- | src/rabbit_channel.erl | 115 |
1 files changed, 55 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 21d2b3de..7fe29d3e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -242,12 +242,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, - State2 = incr_stats([{QPid, 1}], - case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - noreply(State2#ch{next_tag = DeliveryTag + 1}); + incr_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end), + noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> noreply(State); @@ -281,8 +281,8 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> State1 = queue_blocked(QPid, State), - State2 = erase_queue_stats(QPid, State1), - {noreply, State2}. + erase_queue_stats(QPid), + {noreply, State1}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -485,12 +485,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, - State1 = incr_stats( - [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish, - State), + incr_stats([{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish), {noreply, case TxnKey of - none -> State1; - _ -> add_tx_participants(DeliveredQPids, State1) + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -498,18 +496,18 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - QsCounts = ack(TxnKey, Acked), - Participants = [QPid || {QPid, _} <- QsCounts], - State1 = incr_stats(QsCounts, ack, State), + QsIncs = ack(TxnKey, Acked), + Participants = [QPid || {QPid, _} <- QsIncs], + incr_stats(QsIncs, ack), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), - State1#ch{unacked_message_q = Remaining}; + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( Participants, - State1#ch{unacked_message_q = Remaining, - uncommitted_ack_q = NewUAQ}) + State#ch{unacked_message_q = Remaining, + uncommitted_ack_q = NewUAQ}) end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -528,11 +526,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), - State2 = incr_stats([{QPid, 1}], - case NoAck of - true -> get_no_ack; - false -> get - end, State1), + incr_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -541,7 +539,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State2#ch{next_tag = DeliveryTag + 1}}; + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -1187,42 +1185,39 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) -> - Stats1 = lists:foldl( - fun ({QX, Inc}, Stats0) -> - QPid = case QX of - {Q, _X} -> Q; - Q -> Q - end, - case dict:is_key(QPid, Stats0) of - false -> erlang:monitor(process, QPid); - _ -> ok - end, - dict:update(QX, - fun(D) -> - Count = case orddict:find(Item, D) of - error -> 0; - {ok, C} -> C - end, - orddict:store(Item, Count + Inc, D) - end, - [], - Stats0) - end, Stats, QXCounts), - State#ch{queue_exchange_stats = Stats1}. - -internal_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats}) -> +incr_stats(QXIncs, Item) -> + [incr_stats(QX, Inc, Item) || {QX, Inc} <- QXIncs]. + +incr_stats(QX, Inc, Item) -> + QPid = case QX of + {Q, _X} -> Q; + Q -> Q + end, + case get({monitoring, QPid}) of + undefined -> erlang:monitor(process, QPid), + put({monitoring, QPid}, true); + _ -> ok + end, + Measures = case get({queue_exchange_stats, QX}) of + undefined -> []; + D -> D + end, + Cur = case orddict:find(Item, Measures) of + error -> 0; + {ok, C} -> C + end, + put({queue_exchange_stats, QX}, orddict:store(Item, Cur + Inc, Measures)). + +internal_emit_stats(State) -> rabbit_event:notify( channel_stats, - [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} | + [{queue_exchange_stats, + [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]} | [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]). -erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) -> - Stats1 = dict:erase(QPid, Stats), - Stats2 = dict:filter(fun (K, _V) -> - case K of - {QPid, _} -> false; - _ -> true - end - end, Stats1), - State#ch{queue_exchange_stats = Stats2}. +erase_queue_stats(QPid) -> + erase({monitoring, QPid}), + erase({queue_exchange_stats, QPid}). + %% TODO make this work + %% [erase({queue_exchange_stats, QX}) || + %% {{queue_exchange_stats, QX}, _} <- get(), {QPid, _} == QX]. |