diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-31 13:25:42 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-31 13:25:42 +0000 |
commit | 61398037deb3e03dce203ff05f9fab3c6e05ae52 (patch) | |
tree | f644159b4fbfaba1afda5287caa11fdc00e1bfb4 | |
parent | 65efb0a3887f32fb77c3b6f60d0dfe7d51b6e1fd (diff) | |
download | rabbitmq-server-61398037deb3e03dce203ff05f9fab3c6e05ae52.tar.gz |
update exchange and queue_exchange stats on confirms
-rw-r--r-- | src/rabbit_channel.erl | 82 |
1 files changed, 52 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 08fcd768..697e3d7f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, published_count, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -197,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, confirm_multiple = false, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), + exchange_for_msg = dict:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, @@ -288,16 +289,19 @@ handle_cast(flush_multiple_acks, {noreply, State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined}}; -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNo, From}, + State = #ch{exchange_for_msg = EFM}) -> + {ok, ExchangeName} = dict:find(MsgSeqNo, EFM), + {noreply, send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> + State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> State1 = dict:fold( fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> {ok, ExchangeName} = dict:find(Msg, EFM), + send_or_enqueue_ack(Msg, QPid, ExchangeName, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -462,12 +466,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> +send_or_enqueue_ack(undefined, _QPid, _EN, State) -> State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - maybe_incr_stats([{channel_stats, 1}], confirm, State), +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = false}) -> + maybe_incr_confirm_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> @@ -475,17 +480,26 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> WriterPid, #'basic.ack'{delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - maybe_incr_stats([{channel_stats, 1}], confirm, State), +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = true}) -> + maybe_incr_confirm_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). +maybe_incr_confirm_stats(QPid, ExchangeName, State) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State), + case QPid of + undefined -> ok; + _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State) + end. + do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> + State = #ch{unconfirmed = UC, + queues_for_msg = QFM, + exchange_for_msg = EFM}) -> %% clears references to MsgSeqNo and does ConfirmFun case gb_sets:is_element(MsgSeqNo, UC) of true -> @@ -502,6 +516,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, State#ch{ queues_for_msg = dict:erase(MsgSeqNo, QFM), + exchange_for_msg = + dict:erase(MsgSeqNo, EFM), unconfirmed = gb_sets:delete(MsgSeqNo, UC)}); _ -> State#ch{queues_for_msg = @@ -557,6 +573,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, guid = rabbit_guid:guid(), is_persistent = IsPersistent}, + io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -565,7 +582,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> MsgSeqNo; false -> undefined end)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, + IsPersistent, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1249,23 +1267,31 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> +process_routing_result(unroutable, _QPids, ExchangeName, _Persistent, MsgSeqNo, + Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(not_delivered, _QPids, ExchangeName, _Persistent, MsgSeqNo, + Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, [], _, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, _, _, undefined, _, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, [], ExchangeName, _Persistent, MsgSeqNo, + _Msg, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, _QPids, _EN, _Persistent, undefined, + _Msg, State) -> State; -process_routing_result(routed, _, false, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, QPids, true, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> +process_routing_result(routed, _QPids, ExchangeName, false, MsgSeqNo, + _Msg, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, QPids, ExchangeName, true, MsgSeqNo, + _Msg, State = #ch{queues_for_msg = QFM, + exchange_for_msg = EFM}) -> + EFM1 = dict:store(MsgSeqNo, ExchangeName, EFM), + io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, ExchangeName]), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1335,8 +1361,6 @@ incr_stats({QPid, _} = QX, Inc, Measure) -> incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(channel_stats, Inc, Measure) -> - update_measures(channel_stats, self(), Inc, Measure); incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). @@ -1370,8 +1394,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, {channel_exchange_stats, [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_channel_stats, - [Stats || {{channel_stats, _}, Stats} <- get()]}, {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], |