diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-02-17 19:16:57 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-02-17 19:16:57 +0000 |
commit | 8d63498af6e7ddc305c0bec040aca4ac17fd9663 (patch) | |
tree | c160d421039a19aba2fde639c75e3e50ee131153 | |
parent | f3139c0e04b8056f8cc152aa9795ad522c90e882 (diff) | |
parent | a79a11b7e9821ed06c77b1da9bed56e9f4d4b0e9 (diff) | |
download | rabbitmq-server-8d63498af6e7ddc305c0bec040aca4ac17fd9663.tar.gz |
merge bug23850 into default
-rw-r--r-- | src/rabbit_channel.erl | 114 |
1 files changed, 70 insertions, 44 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a6790b6c..4fd7e7e4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,8 +34,8 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, confirmed, - capabilities}). + confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, + confirmed, capabilities}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -175,7 +175,8 @@ init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_trees:empty(), + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), confirmed = [], capabilities = Capabilities}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -281,19 +282,22 @@ handle_info(timeout, State) -> noreply(State); handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), + State = #ch{unconfirmed_qm = UQM}) -> + MsgSeqNos = case gb_trees:lookup(QPid, UQM) of + {value, MsgSet} -> gb_sets:to_list(MsgSet); + none -> [] + end, + %% We remove the MsgSeqNos from UQM before calling + %% process_confirms to prevent each MsgSeqNo being removed from + %% the set one by one which which would be inefficient + State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, + {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). + State3 = (case Reason of + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 + end)(MXs, State2), + noreply(queue_blocked(QPid, State3)). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -479,13 +483,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QPid, Acc, _State) -> - Acc; -remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> - remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), - State). - record_confirm(undefined, _, State) -> State; record_confirm(MsgSeqNo, XName, State) -> @@ -498,25 +495,43 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = +confirm(MsgSeqNos, QPid, State) -> + {MXs, State1} = process_confirms(MsgSeqNos, QPid, State), + record_confirms(MXs, State1). + +process_confirms(MsgSeqNos, QPid, State = #ch{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM}) -> + {MXs, UMQ1, UQM1} = lists:foldl( - fun(MsgSeqNo, {_DMs, UC0} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) + fun(MsgSeqNo, {_DMs, UMQ0, _UQM} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UMQ0) of + {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, + State); + none -> Acc end - end, {[], UC}, MsgSeqNos), - record_confirms(MXs, State#ch{unconfirmed = UC1}). + end, {[], UMQ, UQM}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> - Qs1 = sets:del_element(QPid, Qs), +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, State) -> %% these confirms will be emitted even when a queue dies, but that %% should be fine, since the queue stats get erased immediately maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), - case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + UQM1 = case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> gb_trees:delete(QPid, UQM); + false -> gb_trees:update(QPid, MsgSeqNos1, UQM) + end; + none -> + UQM + end, + Qs1 = gb_sets:del_element(QPid, Qs), + case gb_sets:is_empty(Qs1) of + true -> + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + false -> + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1253,10 +1268,21 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed = UC} = State, - [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), - State#ch{unconfirmed = UC1}. + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), + SingletonSet = gb_sets:singleton(MsgSeqNo), + UQM1 = lists:foldl( + fun (QPid, UQM2) -> + maybe_monitor(QPid), + case gb_trees:lookup(QPid, UQM2) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), + gb_trees:update(QPid, MsgSeqNos1, UQM2); + none -> + gb_trees:insert(QPid, SingletonSet, UQM2) + end + end, UQM, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1292,11 +1318,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UC) of + CutOff = case gb_trees:is_empty(UMQ) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1323,8 +1349,8 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed = UC}) -> - gb_trees:size(UC); +i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> + gb_trees:size(UMQ); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); |