summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 11:51:55 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-18 11:51:55 +0000
commit83ebec1bcf772c48c786e354a0c03db16d664706 (patch)
tree2732b04377575964d2015becfbd4edfe6245f3a6
parent0645b79f2db3d705513eda532a5923350452ba27 (diff)
downloadrabbitmq-server-83ebec1bcf772c48c786e354a0c03db16d664706.tar.gz
merge duplicate code
-rw-r--r--src/rabbit_channel.erl57
1 files changed, 24 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 083a1313..90fe230d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -288,7 +288,7 @@ handle_cast({confirm, MsgSeqNos, From},
State= #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)};
- _ -> {noreply, confirm(MsgSeqNos, From, undefined, State)}
+ _ -> {noreply, nogroup_confirm(MsgSeqNos, From, State)}
end.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
@@ -299,15 +299,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
{EMs, UC1} = remove_queue_unconfirmed(
gb_trees:next(gb_trees:iterator(UC)), QPid,
{[], UC}),
- State1 = case lists:usort(EMs) of
- [] -> State;
- [{XName, MsgSeqNo} | EMs1] ->
- EMs2 = group_confirms_by_exchange(EMs1,
- [{XName, [MsgSeqNo]}]),
- lists:foldl(fun({XName1, MsgSeqNos}, State0) ->
- send_confirms(MsgSeqNos, XName1, State0)
- end, State#ch{unconfirmed = UC1}, EMs2)
- end,
+ State1 = confirm_grouped(EMs, State#ch{unconfirmed = UC1}),
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
@@ -490,19 +482,17 @@ remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) ->
group_and_confirm([], _QPid, State) ->
State;
-group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- EMs = lists:foldl(
- fun(MsgSeqNo, EMs) ->
- case gb_trees:lookup(MsgSeqNo, UC) of
- {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs];
- none -> EMs
- end
- end, [], MsgSeqNos),
+group_and_confirm(MsgSeqNos, QPid, State) ->
+ {EMs, UC1} =
+ take_from_unconfirmed(MsgSeqNos, QPid, State),
+ confirm_grouped(EMs, State#ch{unconfirmed=UC1}).
+
+confirm_grouped(EMs, State) ->
case lists:usort(EMs) of
[{XName, MsgSeqNo} | EMs1] ->
lists:foldl(
fun({XName1, MsgSeqNosE}, State0) ->
- confirm(MsgSeqNosE, QPid, XName1, State0)
+ send_confirms(MsgSeqNosE, XName1, State0)
end, State,
group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]));
[] ->
@@ -516,21 +506,23 @@ group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) ->
group_confirms_by_exchange([{E, Msg1} | EMs], Acc) ->
group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]).
-confirm([], _QPid, _XName, State) ->
+nogroup_confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) ->
- {{EMs, UC1}, UniqueSeqNos} =
- lists:foldl(
- fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> {Acc, USN};
- {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc),
- USN + 1}
- end
- end, {{[], UC}, 0}, MsgSeqNos),
+nogroup_confirm(MsgSeqNos, QPid, State) ->
+ {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State),
DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs],
- maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State),
- send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}).
+ send_confirms(DoneMessages, undefined, State#ch{unconfirmed = UC1}).
+
+take_from_unconfirmed(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ lists:foldl(
+ fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UC0) of
+ none -> Acc;
+ {value, {_, XName} = QX} ->
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
+ remove_qmsg(MsgSeqNo, QPid, QX, Acc)
+ end
+ end, {[], UC}, MsgSeqNos).
remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) ->
%% remove QPid from MsgSeqNo's mapping
@@ -1380,7 +1372,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
- io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]),
rabbit_event:notify(channel_stats,
Extra ++ CoarseStats ++ FineStats)
end.