summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 10:59:31 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 10:59:31 +0000
commit0645b79f2db3d705513eda532a5923350452ba27 (patch)
tree56620c7001d29b252dfb2a8818dde7b4d2ea59ac
parentc756d9590074d4f27f26cb0a91c85147a7e7ab12 (diff)
downloadrabbitmq-server-0645b79f2db3d705513eda532a5923350452ba27.tar.gz
stats for confirms work again
-rw-r--r--src/rabbit_channel.erl29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f2b74dd1..083a1313 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -301,12 +301,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
{[], UC}),
State1 = case lists:usort(EMs) of
[] -> State;
- [{XName, [MsgSeqNo]} | EMs1] ->
+ [{XName, MsgSeqNo} | EMs1] ->
EMs2 = group_confirms_by_exchange(EMs1,
[{XName, [MsgSeqNo]}]),
- lists:fold(fun({XName1, MsgSeqNos}, State0) ->
- send_confirms(MsgSeqNos, XName1, State0)
- end, State#ch{unconfirmed = UC1}, EMs2)
+ lists:foldl(fun({XName1, MsgSeqNos}, State0) ->
+ send_confirms(MsgSeqNos, XName1, State0)
+ end, State#ch{unconfirmed = UC1}, EMs2)
end,
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
@@ -494,7 +494,7 @@ group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
EMs = lists:foldl(
fun(MsgSeqNo, EMs) ->
case gb_trees:lookup(MsgSeqNo, UC) of
- {value, {_, XName}} -> [{MsgSeqNo, XName} | EMs];
+ {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs];
none -> EMs
end
end, [], MsgSeqNos),
@@ -519,23 +519,25 @@ group_confirms_by_exchange([{E, Msg1} | EMs], Acc) ->
confirm([], _QPid, _XName, State) ->
State;
confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) ->
- {{DoneMessages, UC1}, UniqueSeqNos} =
+ {{EMs, UC1}, UniqueSeqNos} =
lists:foldl(
fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
+ none -> {Acc, USN};
{value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc),
USN + 1}
end
end, {{[], UC}, 0}, MsgSeqNos),
+ DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs],
maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State),
send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) ->
+ %% remove QPid from MsgSeqNo's mapping
Qs1 = sets:del_element(QPid, Qs),
case sets:size(Qs1) of
- 0 -> {[{XName, MsgSeqNo} | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)}
+ 0 -> {[{XName, MsgSeqNo} | XMs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {XMs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1272,7 +1274,8 @@ send_confirms([], _, State) ->
send_confirms([MsgSeqNo], XName,
State = #ch{writer_pid = WriterPid}) ->
send_confirm(MsgSeqNo, WriterPid),
- maybe_incr_stats([{XName, 1}], confirm, State);
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ State;
send_confirms(Cs, XName,
State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
@@ -1288,7 +1291,8 @@ send_confirms(Cs, XName,
multiple = true})
end,
[ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
- maybe_incr_stats([{XName, length(Cs)}], confirm, State).
+ maybe_incr_stats([{XName, length(Cs)}], confirm, State),
+ State.
send_confirm(undefined, _WriterPid) ->
ok;
@@ -1376,6 +1380,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
+ io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]),
rabbit_event:notify(channel_stats,
Extra ++ CoarseStats ++ FineStats)
end.