diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-14 10:53:01 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-14 10:53:01 +0000 |
commit | 10b506a3a574638c5a177456df22a951f0981513 (patch) | |
tree | e3481d5e94846ed19f27b4f0ca1eeb4657463744 | |
parent | f5ef17aac0a739db3519ade1f9a71bdc0e42bdc8 (diff) | |
download | rabbitmq-server-10b506a3a574638c5a177456df22a951f0981513.tar.gz |
don't do confirm stats book-keeping unless stats are enabled
-rw-r--r-- | src/rabbit_channel.erl | 66 |
1 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8c91e717..6d10e7ea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -287,33 +287,33 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -%% Add this case for when stats are disabled -%% handle_cast({confirm, MsgSeqNos, From}, State) -> -%% {noreply, confirm(MsgSeqNos, From, State)}. handle_cast({confirm, MsgSeqNos, From}, - State = #ch{exchange_for_msg = EFM}) -> - EMs = - lists:foldl( - fun(MsgSeqNo, EMs) -> - case dict:find(MsgSeqNo, EFM) of - {ok, XName} -> - [{XName, MsgSeqNo} | EMs]; - _ -> - EMs - end - end, [], MsgSeqNos), - State1 = - case lists:usort(EMs) of - [{XName, MsgSeqNo} | EMs1] -> + State= #ch{exchange_for_msg = EFM, stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> + EMs = lists:foldl( - fun({ExchangeName, MsgSeqNosE}, State0) -> - confirm(MsgSeqNosE, From, ExchangeName, State0) - end, State, - group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); - [] -> - State - end, - {noreply, State1}. + fun(MsgSeqNo, EMs) -> + case dict:find(MsgSeqNo, EFM) of + {ok, XName} -> [{XName, MsgSeqNo} | EMs]; + _ -> EMs + end + end, [], MsgSeqNos), + {noreply, + case lists:usort(EMs) of + [{XName, MsgSeqNo} | EMs1] -> + lists:foldl( + fun({XName1, MsgSeqNosE}, State0) -> + confirm(MsgSeqNosE, From, XName1, State0) + end, State, + group_confirms_by_exchange(EMs1, + [{XName, [MsgSeqNo]}])); + [] -> + State + end}; + _ -> + {noreply, confirm(MsgSeqNos, From, undefined, State)} + end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> @@ -1242,11 +1242,11 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, XName, MsgSeqNo, _, State) -> - ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_route), +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), send_confirms([MsgSeqNo], XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, _, State) -> - ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_consumers), +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), send_confirms([MsgSeqNo], XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> send_confirms([MsgSeqNo], XName, State); @@ -1258,7 +1258,11 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> [maybe_monitor(QPid) || QPid <- QPids], State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), - exchange_for_msg = dict:store(MsgSeqNo, XName, EFM), + exchange_for_msg = case XName of + undefined -> EFM; + _ -> dict:store(MsgSeqNo, + XName, EFM) + end, unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1289,6 +1293,8 @@ send_confirms(Cs, XName, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State). +maybe_incr_confirm_exchange_stats_and_cleanup(_, undefined, State) -> + State; maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State) -> maybe_incr_stats([{XName, length(Cs)}], confirm, State), lists:foldl(fun(MsgSeqNo, State0 = #ch{exchange_for_msg = EFM}) -> |