diff options
-rw-r--r-- | src/rabbit_channel.erl | 102 |
1 files changed, 21 insertions, 81 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cac622f8..4a0e93be 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,8 +37,8 @@ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + stats_timer, confirm_enabled, publish_seqno, unconfirmed, + confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State), - record_confirms(MXs, State1). - -process_confirms(MsgSeqNos, QPid, Nack, State) -> - lists:foldl( - fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) -> - case gb_trees:lookup(MsgSeqNo, UMQ0) of - {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, - Acc, Nack); - none -> Acc - end - end, {[], State}, MsgSeqNos). - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, - {MXs, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}}, - Nack) -> - State1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> UQM1 = gb_trees:delete(QPid, UQM), - State#ch{unconfirmed_qm = UQM1}; - false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State#ch{unconfirmed_qm = UQM1} - end; - none -> - State - end, - Qs1 = gb_sets:del_element(QPid, Qs), - %% If QPid somehow died initiating a nack, clear the message from - %% internal data-structures. Also, cleanup empty entries. - case (Nack orelse gb_sets:is_empty(Qs1)) of - true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ), - {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}}; - false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), - {MXs, State1#ch{unconfirmed_mq = UMQ1}} - end. +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1152,22 +1115,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> true -> State end. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> - MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] - end, - %% We remove the MsgSeqNos from UQM before calling - %% process_confirms to prevent each MsgSeqNo being removed from - %% the set one by one which which would be inefficient - State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, - {Nack, SendFun} = - case rabbit_misc:is_abnormal_termination(Reason) of - true -> {true, fun send_nacks/2}; - false -> {false, fun record_confirms/2} - end, - {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), - SendFun(MXs, State2). +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(QPid, UC), + (case rabbit_misc:is_abnormal_termination(Reason) of + true -> fun send_nacks/2; + false -> fun record_confirms/2 + end)(MXs, State#ch{unconfirmed = UC1}). handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1392,21 +1345,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ} = State, - UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), - SingletonSet = gb_sets:singleton(MsgSeqNo), - lists:foldl( - fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) -> - case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), - State0#ch{unconfirmed_qm = UQM1}; - none -> - UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - State0#ch{unconfirmed_qm = UQM1} - end - end, State#ch{unconfirmed_mq = UMQ1}, QPids). + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, + State#ch.unconfirmed)}. send_nacks([], State) -> State; @@ -1444,11 +1384,11 @@ send_confirms(Cs, State) -> end, State). coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) -> + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), - CutOff = case gb_trees:is_empty(UMQ) of + CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; - false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo + false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of @@ -1462,8 +1402,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, maybe_complete_tx(State = #ch{tx_status = in_progress}) -> State; -maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) of +maybe_complete_tx(State = #ch{unconfirmed = UC}) -> + case dtree:is_empty(UC) of false -> State; true -> complete_tx(State#ch{confirmed = []}) end. @@ -1491,8 +1431,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> - gb_trees:size(UMQ); +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> + dtree:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> |