diff options
author | Tim Fox <tim@rabbitmq.com> | 2011-02-16 17:01:43 +0000 |
---|---|---|
committer | Tim Fox <tim@rabbitmq.com> | 2011-02-16 17:01:43 +0000 |
commit | 0b2b3854a9f2a7480560b5200cf87d797e57119b (patch) | |
tree | 73b4778cdd36f357af8079f76f38130d4cd20f82 | |
parent | 5855919d941d049d56a372d1763112302fbece34 (diff) | |
download | rabbitmq-server-0b2b3854a9f2a7480560b5200cf87d797e57119b.tar.gz |
Applied Matthias recommendations from comment#2 of bugzilla with the exception of using gb_sets throughout (will contact Alexandru first)
-rw-r--r-- | src/rabbit_channel.erl | 86 |
1 files changed, 46 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f49dbd93..f69a49b1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,8 +34,8 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed, - queue_unconfirmed}). + confirm_enabled, publish_seqno, unconfirmed_mq, confirmed, + unconfirmed_qm}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -174,9 +174,9 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty(), + unconfirmed_mq = gb_trees:empty(), confirmed = [], - queue_unconfirmed = dict:new()}, + unconfirmed_qm = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -280,12 +280,16 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{queue_unconfirmed = QU}) -> - MsgSeqNos = case dict:find(QPid, QU) of - {ok, MsgSet} -> gb_sets:to_list(MsgSet); + State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case dict:find(QPid, UQM) of + {ok, MsgSet} -> gb_sets:to_list(MsgSet); error -> [] end, - {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + %% We remove the MsgSeqNos from UQM before calling process_confirms to + %% prevent each MsgSeqNo being removed from the set one by one which + %% which would be inefficient + NewState = State#ch{unconfirmed_qm = dict:erase(QPid, UQM)}, + {MXs, State1} = process_confirms(MsgSeqNos, QPid, NewState), erase_queue_stats(QPid), State2 = (case Reason of normal -> fun record_confirms/2; @@ -493,35 +497,35 @@ confirm(MsgSeqNos, QPid, State) -> {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), record_confirms(MXs, State1). -process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed = UC, queue_unconfirmed = QU}) -> - {MXs, UC1, QU1} = +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> + {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0, _QU} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) - end - end, {[], UC, QU}, MsgSeqNos), - {MXs, State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}}. - -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, QU}, State) -> + fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + none -> Acc; + {value, XQ} -> confirm_msg(MsgSeqNo, QPid, XQ, Acc, State) + end + end, {[], UMQ, UQM}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. + +confirm_msg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC, UQM}, State) -> Qs1 = sets:del_element(QPid, Qs), %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - - case dict:find(QPid, QU) of - {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs), - case gb_sets:is_empty(Msgs1) of - true -> QU1 = dict:erase(QPid, QU); - false -> QU1 = dict:store(QPid, Msgs1, QU) - end; - _ -> QU1 = QU - end, - + UQM1 = + case dict:find(QPid, UQM) of + {ok, Msgs} -> Msgs1 = gb_sets:delete(MsgSeqNo, Msgs), + case gb_sets:is_empty(Msgs1) of + true -> dict:erase(QPid, UQM); + false -> dict:store(QPid, Msgs1, UQM) + end; + error -> UQM + end, case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), QU1}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), QU1} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC), UQM1}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1258,14 +1262,16 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC, queue_unconfirmed = QU} = State, - UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - QU1 = lists:foldl(fun (QPid, QU2) -> + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + UMQ1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UMQ), + SingletonSet = gb_sets:singleton(MsgSeqNo), + UQM1 = lists:foldl(fun (QPid, UQM2) -> maybe_monitor(QPid), - dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo, Msgs) end, - gb_sets:singleton(MsgSeqNo), QU2) - end, QU, QPids), - State#ch{unconfirmed = UC1, queue_unconfirmed = QU1}. + dict:update(QPid, fun (Msgs)-> gb_sets:add(MsgSeqNo, + Msgs) end, + SingletonSet, UQM2) + end, UQM, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1301,7 +1307,7 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; @@ -1332,7 +1338,7 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> +i(messages_unconfirmed, #ch{unconfirmed_mq = UC}) -> gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> |