summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-31 13:25:42 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-31 13:25:42 +0000
commit61398037deb3e03dce203ff05f9fab3c6e05ae52 (patch)
treef644159b4fbfaba1afda5287caa11fdc00e1bfb4
parent65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd (diff)
downloadrabbitmq-server-61398037deb3e03dce203ff05f9fab3c6e05ae52.tar.gz
update exchange and queue_exchange stats on confirms
-rw-r--r--src/rabbit_channel.erl82
1 files changed, 52 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 08fcd768..697e3d7f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -50,7 +50,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, published_count, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -197,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
confirm_multiple = false,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ exchange_for_msg = dict:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
@@ -288,16 +289,19 @@ handle_cast(flush_multiple_acks,
{noreply, State#ch{held_confirms = gb_sets:new(),
confirm_tref = undefined}};
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNo, From},
+ State = #ch{exchange_for_msg = EFM}) ->
+ {ok, ExchangeName} = dict:find(MsgSeqNo, EFM),
+ {noreply, send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{queues_for_msg = QFM}) ->
+ State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) ->
State1 = dict:fold(
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> send_or_enqueue_ack(Msg, QPid, State0);
+ 0 -> {ok, ExchangeName} = dict:find(Msg, EFM),
+ send_or_enqueue_ack(Msg, QPid, ExchangeName, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -462,12 +466,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-send_or_enqueue_ack(undefined, _QPid, State) ->
+send_or_enqueue_ack(undefined, _QPid, _EN, State) ->
State;
-send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) ->
State;
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- maybe_incr_stats([{channel_stats, 1}], confirm, State),
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = false}) ->
+ maybe_incr_confirm_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
@@ -475,17 +480,26 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
WriterPid, #'basic.ack'{delivery_tag = MSN}),
State1
end, State);
-send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- maybe_incr_stats([{channel_stats, 1}], confirm, State),
+send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
+ State = #ch{confirm_multiple = true}) ->
+ maybe_incr_confirm_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
+maybe_incr_confirm_stats(QPid, ExchangeName, State) ->
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ case QPid of
+ undefined -> ok;
+ _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State)
+ end.
+
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
+ State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
%% clears references to MsgSeqNo and does ConfirmFun
case gb_sets:is_element(MsgSeqNo, UC) of
true ->
@@ -502,6 +516,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
State#ch{
queues_for_msg =
dict:erase(MsgSeqNo, QFM),
+ exchange_for_msg =
+ dict:erase(MsgSeqNo, EFM),
unconfirmed =
gb_sets:delete(MsgSeqNo, UC)});
_ -> State#ch{queues_for_msg =
@@ -557,6 +573,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
content = DecodedContent,
guid = rabbit_guid:guid(),
is_persistent = IsPersistent},
+ io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -565,7 +582,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> MsgSeqNo;
false -> undefined
end)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
+ IsPersistent,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1249,23 +1267,31 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
+process_routing_result(unroutable, _QPids, ExchangeName, _Persistent, MsgSeqNo,
+ Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State);
+process_routing_result(not_delivered, _QPids, ExchangeName, _Persistent, MsgSeqNo,
+ Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State);
+process_routing_result(routed, [], ExchangeName, _Persistent, MsgSeqNo,
+ _Msg, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State);
+process_routing_result(routed, _QPids, _EN, _Persistent, undefined,
+ _Msg, State) ->
State;
-process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, QPids, true, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
+process_routing_result(routed, _QPids, ExchangeName, false, MsgSeqNo,
+ _Msg, State) ->
+ send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State);
+process_routing_result(routed, QPids, ExchangeName, true, MsgSeqNo,
+ _Msg, State = #ch{queues_for_msg = QFM,
+ exchange_for_msg = EFM}) ->
+ EFM1 = dict:store(MsgSeqNo, ExchangeName, EFM),
+ io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, ExchangeName]),
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1335,8 +1361,6 @@ incr_stats({QPid, _} = QX, Inc, Measure) ->
incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
maybe_monitor(QPid),
update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(channel_stats, Inc, Measure) ->
- update_measures(channel_stats, self(), Inc, Measure);
incr_stats(X, Inc, Measure) ->
update_measures(exchange_stats, X, Inc, Measure).
@@ -1370,8 +1394,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
[{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
{channel_exchange_stats,
[{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_channel_stats,
- [Stats || {{channel_stats, _}, Stats} <- get()]},
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],