diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-02-17 14:38:16 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-02-17 14:38:16 +0000 |
commit | ec8d4bd1be3a59ae5319c4ea4933d415b53530b4 (patch) | |
tree | b4935f1bc921954a14a2986aaf5b42272297bec8 | |
parent | 821285f6a75b1bf46ad32ccc04feacc8fff4e049 (diff) | |
download | rabbitmq-server-ec8d4bd1be3a59ae5319c4ea4933d415b53530b4.tar.gz |
cosmetic
-rw-r--r-- | src/rabbit_channel.erl | 77 |
1 files changed, 39 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cb681661..ab570f2c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -285,11 +285,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] + none -> [] end, - %% We remove the MsgSeqNos from UQM before calling process_confirms to - %% prevent each MsgSeqNo being removed from the set one by one which - %% which would be inefficient + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), @@ -504,33 +504,34 @@ process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, {MXs, UMQ1, UQM1} = lists:foldl( fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - none -> Acc; - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, - State) - end + case gb_trees:lookup(MsgSeqNo, UMQ0) of + none -> Acc; + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, + State) + end end, {[], UMQ, UQM}, MsgSeqNos), {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - UQM1 = - case gb_trees:lookup(QPid, UQM) of - {value, Msgs} -> - Msgs1 = gb_sets:delete(MsgSeqNo, Msgs), - case gb_sets:is_empty(Msgs1) of - true -> gb_trees:delete(QPid, UQM); - false -> gb_trees:update(QPid, Msgs1, UQM) - end; - none -> UQM - end, + UQM1 = case gb_trees:lookup(QPid, UQM) of + {value, Msgs} -> + Msgs1 = gb_sets:delete(MsgSeqNo, Msgs), + case gb_sets:is_empty(Msgs1) of + true -> gb_trees:delete(QPid, UQM); + false -> gb_trees:update(QPid, Msgs1, UQM) + end; + none -> + UQM + end, Qs1 = gb_sets:del_element(QPid, Qs), case gb_sets:is_empty(Qs1) of - true -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), - UQM1}; - false -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} + true -> + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + false -> + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1270,16 +1271,16 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), SingletonSet = gb_sets:singleton(MsgSeqNo), - UQM1 = lists:foldl(fun (QPid, UQM2) -> - maybe_monitor(QPid), - case gb_trees:lookup(QPid, UQM2) of - {value, Msgs} -> - Msgs1 = gb_sets:insert(MsgSeqNo, Msgs), - gb_trees:update(QPid, Msgs1, UQM2); - none -> gb_trees:insert(QPid, SingletonSet, - UQM2) - end - end, UQM, QPids), + UQM1 = lists:foldl(fun (QPid, UQM2) -> + maybe_monitor(QPid), + case gb_trees:lookup(QPid, UQM2) of + {value, Msgs} -> + Msgs1 = gb_sets:insert(MsgSeqNo, Msgs), + gb_trees:update(QPid, Msgs1, UQM2); + none -> + gb_trees:insert(QPid, SingletonSet, UQM2) + end + end, UQM, QPids), State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1316,11 +1317,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UC) of + CutOff = case gb_trees:is_empty(UMQ) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1347,8 +1348,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UC}) -> - gb_trees:size(UC); +i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> + gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |