summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-02-17 14:38:16 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-02-17 14:38:16 +0000
commitec8d4bd1be3a59ae5319c4ea4933d415b53530b4 (patch)
treeb4935f1bc921954a14a2986aaf5b42272297bec8
parent821285f6a75b1bf46ad32ccc04feacc8fff4e049 (diff)
downloadrabbitmq-server-ec8d4bd1be3a59ae5319c4ea4933d415b53530b4.tar.gz
cosmetic
-rw-r--r--src/rabbit_channel.erl77
1 files changed, 39 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cb681661..ab570f2c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -285,11 +285,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
+ none -> []
end,
- %% We remove the MsgSeqNos from UQM before calling process_confirms to
- %% prevent each MsgSeqNo being removed from the set one by one which
- %% which would be inefficient
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
@@ -504,33 +504,34 @@ process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ,
{MXs, UMQ1, UQM1} =
lists:foldl(
fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- none -> Acc;
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
- State)
- end
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ none -> Acc;
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc,
+ State)
+ end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) ->
%% these confirms will be emitted even when a queue dies, but that
%% should be fine, since the queue stats get erased immediately
maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- UQM1 =
- case gb_trees:lookup(QPid, UQM) of
- {value, Msgs} ->
- Msgs1 = gb_sets:delete(MsgSeqNo, Msgs),
- case gb_sets:is_empty(Msgs1) of
- true -> gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, Msgs1, UQM)
- end;
- none -> UQM
- end,
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, Msgs} ->
+ Msgs1 = gb_sets:delete(MsgSeqNo, Msgs),
+ case gb_sets:is_empty(Msgs1) of
+ true -> gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, Msgs1, UQM)
+ end;
+ none ->
+ UQM
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
case gb_sets:is_empty(Qs1) of
- true -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ),
- UQM1};
- false -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ true ->
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ false ->
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1270,16 +1271,16 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(fun (QPid, UQM2) ->
- maybe_monitor(QPid),
- case gb_trees:lookup(QPid, UQM2) of
- {value, Msgs} ->
- Msgs1 = gb_sets:insert(MsgSeqNo, Msgs),
- gb_trees:update(QPid, Msgs1, UQM2);
- none -> gb_trees:insert(QPid, SingletonSet,
- UQM2)
- end
- end, UQM, QPids),
+ UQM1 = lists:foldl(fun (QPid, UQM2) ->
+ maybe_monitor(QPid),
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, Msgs} ->
+ Msgs1 = gb_sets:insert(MsgSeqNo, Msgs),
+ gb_trees:update(QPid, Msgs1, UQM2);
+ none ->
+ gb_trees:insert(QPid, SingletonSet, UQM2)
+ end
+ end, UQM, QPids),
State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1316,11 +1317,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UMQ) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1347,8 +1348,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UC}) ->
- gb_trees:size(UC);
+i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
+ gb_trees:size(UMQ);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);