summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-21 16:17:01 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-21 16:17:01 +0100
commitf1ffcf3fc686d596180f486150a0ff39a94202e8 (patch)
tree64843407ec05f9f317677dad919b4e21404fc27e
parentea28306cba88cbc4df6552840f27bfee217ff38a (diff)
downloadrabbitmq-server-f1ffcf3fc686d596180f486150a0ff39a94202e8.tar.gz
Store queue/exchange stats in the process dictionary since that's rather a lot faster than dict:
-rw-r--r--src/rabbit_channel.erl115
1 files changed, 55 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 21d2b3de..7fe29d3e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -242,12 +242,12 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
- State2 = incr_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- noreply(State2#ch{next_tag = DeliveryTag + 1});
+ incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end),
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
noreply(State);
@@ -281,8 +281,8 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
State1 = queue_blocked(QPid, State),
- State2 = erase_queue_stats(QPid, State1),
- {noreply, State2}.
+ erase_queue_stats(QPid),
+ {noreply, State1}.
handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
@@ -485,12 +485,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
- State1 = incr_stats(
- [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish,
- State),
+ incr_stats([{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish),
{noreply, case TxnKey of
- none -> State1;
- _ -> add_tx_participants(DeliveredQPids, State1)
+ none -> State;
+ _ -> add_tx_participants(DeliveredQPids, State)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -498,18 +496,18 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QsCounts = ack(TxnKey, Acked),
- Participants = [QPid || {QPid, _} <- QsCounts],
- State1 = incr_stats(QsCounts, ack, State),
+ QsIncs = ack(TxnKey, Acked),
+ Participants = [QPid || {QPid, _} <- QsIncs],
+ incr_stats(QsIncs, ack),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
- State1#ch{unacked_message_q = Remaining};
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
Participants,
- State1#ch{unacked_message_q = Remaining,
- uncommitted_ack_q = NewUAQ})
+ State#ch{unacked_message_q = Remaining,
+ uncommitted_ack_q = NewUAQ})
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -528,11 +526,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
- State2 = incr_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
+ incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -541,7 +539,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State2#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -1187,42 +1185,39 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) ->
- Stats1 = lists:foldl(
- fun ({QX, Inc}, Stats0) ->
- QPid = case QX of
- {Q, _X} -> Q;
- Q -> Q
- end,
- case dict:is_key(QPid, Stats0) of
- false -> erlang:monitor(process, QPid);
- _ -> ok
- end,
- dict:update(QX,
- fun(D) ->
- Count = case orddict:find(Item, D) of
- error -> 0;
- {ok, C} -> C
- end,
- orddict:store(Item, Count + Inc, D)
- end,
- [],
- Stats0)
- end, Stats, QXCounts),
- State#ch{queue_exchange_stats = Stats1}.
-
-internal_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats}) ->
+incr_stats(QXIncs, Item) ->
+ [incr_stats(QX, Inc, Item) || {QX, Inc} <- QXIncs].
+
+incr_stats(QX, Inc, Item) ->
+ QPid = case QX of
+ {Q, _X} -> Q;
+ Q -> Q
+ end,
+ case get({monitoring, QPid}) of
+ undefined -> erlang:monitor(process, QPid),
+ put({monitoring, QPid}, true);
+ _ -> ok
+ end,
+ Measures = case get({queue_exchange_stats, QX}) of
+ undefined -> [];
+ D -> D
+ end,
+ Cur = case orddict:find(Item, Measures) of
+ error -> 0;
+ {ok, C} -> C
+ end,
+ put({queue_exchange_stats, QX}, orddict:store(Item, Cur + Inc, Measures)).
+
+internal_emit_stats(State) ->
rabbit_event:notify(
channel_stats,
- [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} |
+ [{queue_exchange_stats,
+ [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]} |
[{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]).
-erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) ->
- Stats1 = dict:erase(QPid, Stats),
- Stats2 = dict:filter(fun (K, _V) ->
- case K of
- {QPid, _} -> false;
- _ -> true
- end
- end, Stats1),
- State#ch{queue_exchange_stats = Stats2}.
+erase_queue_stats(QPid) ->
+ erase({monitoring, QPid}),
+ erase({queue_exchange_stats, QPid}).
+ %% TODO make this work
+ %% [erase({queue_exchange_stats, QX}) ||
+ %% {{queue_exchange_stats, QX}, _} <- get(), {QPid, _} == QX].