summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-14 10:53:01 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-14 10:53:01 +0000
commit10b506a3a574638c5a177456df22a951f0981513 (patch)
treee3481d5e94846ed19f27b4f0ca1eeb4657463744
parentf5ef17aac0a739db3519ade1f9a71bdc0e42bdc8 (diff)
downloadrabbitmq-server-10b506a3a574638c5a177456df22a951f0981513.tar.gz
don't do confirm stats book-keeping unless stats are enabled
-rw-r--r--src/rabbit_channel.erl66
1 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8c91e717..6d10e7ea 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -287,33 +287,33 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-%% Add this case for when stats are disabled
-%% handle_cast({confirm, MsgSeqNos, From}, State) ->
-%% {noreply, confirm(MsgSeqNos, From, State)}.
handle_cast({confirm, MsgSeqNos, From},
- State = #ch{exchange_for_msg = EFM}) ->
- EMs =
- lists:foldl(
- fun(MsgSeqNo, EMs) ->
- case dict:find(MsgSeqNo, EFM) of
- {ok, XName} ->
- [{XName, MsgSeqNo} | EMs];
- _ ->
- EMs
- end
- end, [], MsgSeqNos),
- State1 =
- case lists:usort(EMs) of
- [{XName, MsgSeqNo} | EMs1] ->
+ State= #ch{exchange_for_msg = EFM, stats_timer = StatsTimer}) ->
+ case rabbit_event:stats_level(StatsTimer) of
+ fine ->
+ EMs =
lists:foldl(
- fun({ExchangeName, MsgSeqNosE}, State0) ->
- confirm(MsgSeqNosE, From, ExchangeName, State0)
- end, State,
- group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]));
- [] ->
- State
- end,
- {noreply, State1}.
+ fun(MsgSeqNo, EMs) ->
+ case dict:find(MsgSeqNo, EFM) of
+ {ok, XName} -> [{XName, MsgSeqNo} | EMs];
+ _ -> EMs
+ end
+ end, [], MsgSeqNos),
+ {noreply,
+ case lists:usort(EMs) of
+ [{XName, MsgSeqNo} | EMs1] ->
+ lists:foldl(
+ fun({XName1, MsgSeqNosE}, State0) ->
+ confirm(MsgSeqNosE, From, XName1, State0)
+ end, State,
+ group_confirms_by_exchange(EMs1,
+ [{XName, [MsgSeqNo]}]));
+ [] ->
+ State
+ end};
+ _ ->
+ {noreply, confirm(MsgSeqNos, From, undefined, State)}
+ end.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) ->
@@ -1242,11 +1242,11 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, XName, MsgSeqNo, _, State) ->
- ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_route),
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
send_confirms([MsgSeqNo], XName, State);
-process_routing_result(not_delivered, _, XName, MsgSeqNo, _, State) ->
- ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_consumers),
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
send_confirms([MsgSeqNo], XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
send_confirms([MsgSeqNo], XName, State);
@@ -1258,7 +1258,11 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
[maybe_monitor(QPid) || QPid <- QPids],
State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids),
QFM),
- exchange_for_msg = dict:store(MsgSeqNo, XName, EFM),
+ exchange_for_msg = case XName of
+ undefined -> EFM;
+ _ -> dict:store(MsgSeqNo,
+ XName, EFM)
+ end,
unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1289,6 +1293,8 @@ send_confirms(Cs, XName,
[ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State).
+maybe_incr_confirm_exchange_stats_and_cleanup(_, undefined, State) ->
+ State;
maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State) ->
maybe_incr_stats([{XName, length(Cs)}], confirm, State),
lists:foldl(fun(MsgSeqNo, State0 = #ch{exchange_for_msg = EFM}) ->