diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 10:59:31 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 10:59:31 +0000 |
commit | 0645b79f2db3d705513eda532a5923350452ba27 (patch) | |
tree | 56620c7001d29b252dfb2a8818dde7b4d2ea59ac | |
parent | c756d9590074d4f27f26cb0a91c85147a7e7ab12 (diff) | |
download | rabbitmq-server-0645b79f2db3d705513eda532a5923350452ba27.tar.gz |
stats for confirms work again
-rw-r--r-- | src/rabbit_channel.erl | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f2b74dd1..083a1313 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -301,12 +301,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {[], UC}), State1 = case lists:usort(EMs) of [] -> State; - [{XName, [MsgSeqNo]} | EMs1] -> + [{XName, MsgSeqNo} | EMs1] -> EMs2 = group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]), - lists:fold(fun({XName1, MsgSeqNos}, State0) -> - send_confirms(MsgSeqNos, XName1, State0) - end, State#ch{unconfirmed = UC1}, EMs2) + lists:foldl(fun({XName1, MsgSeqNos}, State0) -> + send_confirms(MsgSeqNos, XName1, State0) + end, State#ch{unconfirmed = UC1}, EMs2) end, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -494,7 +494,7 @@ group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> EMs = lists:foldl( fun(MsgSeqNo, EMs) -> case gb_trees:lookup(MsgSeqNo, UC) of - {value, {_, XName}} -> [{MsgSeqNo, XName} | EMs]; + {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs]; none -> EMs end end, [], MsgSeqNos), @@ -519,23 +519,25 @@ group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> confirm([], _QPid, _XName, State) -> State; confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) -> - {{DoneMessages, UC1}, UniqueSeqNos} = + {{EMs, UC1}, UniqueSeqNos} = lists:foldl( fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) -> case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; + none -> {Acc, USN}; {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc), USN + 1} end end, {{[], UC}, 0}, MsgSeqNos), + DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs], maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) -> + %% remove QPid from MsgSeqNo's mapping Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of - 0 -> {[{XName, MsgSeqNo} | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} + 0 -> {[{XName, MsgSeqNo} | XMs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {XMs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1272,7 +1274,8 @@ send_confirms([], _, State) -> send_confirms([MsgSeqNo], XName, State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), - maybe_incr_stats([{XName, 1}], confirm, State); + maybe_incr_stats([{XName, 1}], confirm, State), + State; send_confirms(Cs, XName, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), @@ -1288,7 +1291,8 @@ send_confirms(Cs, XName, multiple = true}) end, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], - maybe_incr_stats([{XName, length(Cs)}], confirm, State). + maybe_incr_stats([{XName, length(Cs)}], confirm, State), + State. send_confirm(undefined, _WriterPid) -> ok; @@ -1376,6 +1380,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], + io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. |