summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-21 10:32:13 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-21 10:32:13 +0100
commita4d652ec66431782a6907609b7a87fd2aa77fca5 (patch)
tree81867bb8a46a0e91dee9d2ca2d600f7765118092
parent0cb2eb7b8754fac0078e8fb8db32f9f893c262a1 (diff)
downloadrabbitmq-server-a4d652ec66431782a6907609b7a87fd2aa77fca5.tar.gz
Store (channel, queue, exchange) stats instead of (channel, exchange). Unify the two stats dictionaries in the channel.
-rw-r--r--src/rabbit_channel.erl72
1 files changed, 38 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b1a67640..2ff337f8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, flow,
- exchange_stats, queue_stats, last_stats_update}).
+ queue_exchange_stats, last_stats_update}).
-record(flow, {server, client, pending}).
@@ -184,8 +184,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
pending = none},
- exchange_stats = dict:new(),
- queue_stats = dict:new(),
+ queue_exchange_stats = dict:new(),
last_stats_update = {0,0,0}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -238,11 +237,11 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
- State2 = incr_queue_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
+ State2 = incr_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
noreply(State2#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
@@ -273,7 +272,7 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
State1 = queue_blocked(QPid, State),
- State2 = erase_stats(QPid, State1),
+ State2 = erase_queue_stats(QPid, State1),
{noreply, State2}.
handle_pre_hibernate(State) ->
@@ -461,7 +460,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
- State1 = incr_exchange_stats([{ExchangeName, 1}], publish, State),
+ State1 = incr_stats(
+ [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish,
+ State),
{noreply, case TxnKey of
none -> State1;
_ -> add_tx_participants(DeliveredQPids, State1)
@@ -474,7 +475,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QsCounts = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QsCounts],
- State1 = incr_queue_stats(QsCounts, ack, State),
+ State1 = incr_stats(QsCounts, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
State1#ch{unacked_message_q = Remaining};
@@ -502,11 +503,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
- State2 = incr_queue_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
+ State2 = incr_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -1161,18 +1162,16 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_exchange_stats(XCounts, Item, State = #ch{exchange_stats = Stats}) ->
- State#ch{exchange_stats = incr_stats(XCounts, Item, Stats)}.
-
-incr_queue_stats(QCounts, Item, State = #ch{queue_stats = Stats}) ->
- State#ch{queue_stats = incr_stats(QCounts, Item, Stats)}.
-
-incr_stats(QXCounts, Item, Stats) ->
+incr_stats(QXCounts, Item, State = #ch{queue_exchange_stats = Stats}) ->
Stats1 = lists:foldl(
fun ({QX, Inc}, Stats0) ->
- case is_pid(QX) andalso not dict:is_key(QX, Stats) of
- true -> erlang:monitor(process, QX);
- _ -> ok
+ QPid = case QX of
+ {Q, _X} -> Q;
+ Q -> Q
+ end,
+ case dict:is_key(QPid, Stats) of
+ false -> erlang:monitor(process, QPid);
+ _ -> ok
end,
dict:update(QX,
fun(D) ->
@@ -1185,23 +1184,28 @@ incr_stats(QXCounts, Item, Stats) ->
[],
Stats0)
end, Stats, QXCounts),
- Stats1.
+ State#ch{queue_exchange_stats = Stats1}.
-maybe_emit_stats(State = #ch{exchange_stats = ExchangeStats,
- queue_stats = QueueStats,
+maybe_emit_stats(State = #ch{queue_exchange_stats = QueueExchangeStats,
last_stats_update = LastUpdate}) ->
Now = os:timestamp(),
case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
true ->
rabbit_event:notify(
channel_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS] ++
- [{per_exchange_stats, dict:to_list(ExchangeStats)},
- {per_queue_stats, dict:to_list(QueueStats)}]),
+ [{queue_exchange_stats, dict:to_list(QueueExchangeStats)} |
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]),
State#ch{last_stats_update = Now};
_ ->
State
end.
-erase_stats(QPid, State = #ch{queue_stats = QueueStats}) ->
- State#ch{queue_stats = dict:erase(QPid, QueueStats)}.
+erase_queue_stats(QPid, State = #ch{queue_exchange_stats = Stats}) ->
+ Stats1 = dict:erase(QPid, Stats),
+ Stats2 = dict:filter(fun (K, _V) ->
+ case K of
+ {QPid, _} -> false;
+ _ -> true
+ end
+ end, Stats1),
+ State#ch{queue_exchange_stats = Stats2}.