diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 11:51:55 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-18 11:51:55 +0000 |
commit | 83ebec1bcf772c48c786e354a0c03db16d664706 (patch) | |
tree | 2732b04377575964d2015becfbd4edfe6245f3a6 | |
parent | 0645b79f2db3d705513eda532a5923350452ba27 (diff) | |
download | rabbitmq-server-83ebec1bcf772c48c786e354a0c03db16d664706.tar.gz |
merge duplicate code
-rw-r--r-- | src/rabbit_channel.erl | 57 |
1 files changed, 24 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 083a1313..90fe230d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -288,7 +288,7 @@ handle_cast({confirm, MsgSeqNos, From}, State= #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)}; - _ -> {noreply, confirm(MsgSeqNos, From, undefined, State)} + _ -> {noreply, nogroup_confirm(MsgSeqNos, From, State)} end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, @@ -299,15 +299,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {EMs, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}), - State1 = case lists:usort(EMs) of - [] -> State; - [{XName, MsgSeqNo} | EMs1] -> - EMs2 = group_confirms_by_exchange(EMs1, - [{XName, [MsgSeqNo]}]), - lists:foldl(fun({XName1, MsgSeqNos}, State0) -> - send_confirms(MsgSeqNos, XName1, State0) - end, State#ch{unconfirmed = UC1}, EMs2) - end, + State1 = confirm_grouped(EMs, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -490,19 +482,17 @@ remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) -> group_and_confirm([], _QPid, State) -> State; -group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - EMs = lists:foldl( - fun(MsgSeqNo, EMs) -> - case gb_trees:lookup(MsgSeqNo, UC) of - {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs]; - none -> EMs - end - end, [], MsgSeqNos), +group_and_confirm(MsgSeqNos, QPid, State) -> + {EMs, UC1} = + take_from_unconfirmed(MsgSeqNos, QPid, State), + confirm_grouped(EMs, State#ch{unconfirmed=UC1}). + +confirm_grouped(EMs, State) -> case lists:usort(EMs) of [{XName, MsgSeqNo} | EMs1] -> lists:foldl( fun({XName1, MsgSeqNosE}, State0) -> - confirm(MsgSeqNosE, QPid, XName1, State0) + send_confirms(MsgSeqNosE, XName1, State0) end, State, group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); [] -> @@ -516,21 +506,23 @@ group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) -> group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]). -confirm([], _QPid, _XName, State) -> +nogroup_confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) -> - {{EMs, UC1}, UniqueSeqNos} = - lists:foldl( - fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> {Acc, USN}; - {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc), - USN + 1} - end - end, {{[], UC}, 0}, MsgSeqNos), +nogroup_confirm(MsgSeqNos, QPid, State) -> + {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State), DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs], - maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), - send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}). + send_confirms(DoneMessages, undefined, State#ch{unconfirmed = UC1}). + +take_from_unconfirmed(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + lists:foldl( + fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> Acc; + {value, {_, XName} = QX} -> + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), + remove_qmsg(MsgSeqNo, QPid, QX, Acc) + end + end, {[], UC}, MsgSeqNos). remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) -> %% remove QPid from MsgSeqNo's mapping @@ -1380,7 +1372,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], - io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. |