diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-13 22:28:28 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-13 22:28:28 +0000 |
commit | 86104054ea93ba9efcab2981e9546586b1eb527c (patch) | |
tree | c94ec79c441bd3e8429e3946a6f4429b36b06fe5 | |
parent | 2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff) | |
download | rabbitmq-server-86104054ea93ba9efcab2981e9546586b1eb527c.tar.gz |
combine 'unconfirmed' and queues_for_msgs
and permit coalescing of confirms on queue 'DOWN'
-rw-r--r-- | src/rabbit_channel.erl | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7b5f096b..94f0b562 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -186,8 +186,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_sets:new(), - queues_for_msg = dict:new()}, + unconfirmed = gb_trees:empty()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -287,16 +286,14 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> - State1 = dict:fold( - fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> - Qs = sets:del_element(QPid, QPids), - case sets:size(Qs) of - 0 -> confirm([Msg], QPid, State0); - _ -> State0#ch{queues_for_msg = - dict:store(Msg, Qs, QFM0)} - end - end, State, QFM), + State = #ch{unconfirmed = UC}) -> + %% TODO: this does a complete scan and rebuild of the tree, which + %% is quite efficient. To do better we'd need to maintain a + %% secondary mapping, from QPids to MsgSeqNos. + {MsgSeqNos, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], gb_trees:empty()}), + State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -471,30 +468,36 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +remove_queue_unconfirmed(none, _QPid, Res) -> + Res; +remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, {MsgSeqNos, UC}) -> + Qs1 = sets:del_element(QPid, Qs), + remove_queue_unconfirmed( + gb_trees:next(Next), QPid, + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | MsgSeqNos], UC}; + _ -> {MsgSeqNos, gb_trees:insert(MsgSeqNo, Qs1, UC)} + end). + confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {DoneMessages, State1} = +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {DoneMessages, UC2} = lists:foldl( - fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, - queues_for_msg = QFM0}}) -> - case gb_sets:is_element(MsgSeqNo, UC0) of - false -> {DMs, State0}; - true -> Qs1 = sets:del_element( - QPid, dict:fetch(MsgSeqNo, QFM0)), - case sets:size(Qs1) of - 0 -> {[MsgSeqNo | DMs], - State0#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM0), - unconfirmed = - gb_sets:delete(MsgSeqNo, UC0)}}; - _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), - {DMs, State0#ch{queues_for_msg = QFM1}} - end + fun(MsgSeqNo, {DMs, UC0}) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> + {DMs, UC0}; + {value, Qs} -> + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | DMs], + gb_trees:delete(MsgSeqNo, UC0)}; + _ -> {DMs, gb_trees:update(MsgSeqNo, Qs1, UC0)} + end end - end, {[], State}, MsgSeqNos), - send_confirms(DoneMessages, State1). + end, {[], UC}, MsgSeqNos), + send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1215,10 +1218,10 @@ process_routing_result(routed, [], MsgSeqNo, _, State) -> process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, State) -> - #ch{queues_for_msg = QFM, unconfirmed = UC} = State, + #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), - unconfirmed = gb_sets:add(MsgSeqNo, UC)}. + UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1232,9 +1235,9 @@ send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> State; send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), - CutOff = case gb_sets:is_empty(UC) of + CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> gb_sets:smallest(UC) + false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of |