summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl102
1 files changed, 21 insertions, 81 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cac622f8..4a0e93be 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,8 +37,8 @@
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed,
+ confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -201,8 +201,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -548,45 +547,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
- record_confirms(MXs, State1).
-
-process_confirms(MsgSeqNos, QPid, Nack, State) ->
- lists:foldl(
- fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], State}, MsgSeqNos).
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}},
- Nack) ->
- State1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM1 = gb_trees:delete(QPid, UQM),
- State#ch{unconfirmed_qm = UQM1};
- false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State#ch{unconfirmed_qm = UQM1}
- end;
- none ->
- State
- end,
- Qs1 = gb_sets:del_element(QPid, Qs),
- %% If QPid somehow died initiating a nack, clear the message from
- %% internal data-structures. Also, cleanup empty entries.
- case (Nack orelse gb_sets:is_empty(Qs1)) of
- true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
- {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
- false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
- {MXs, State1#ch{unconfirmed_mq = UMQ1}}
- end.
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1152,22 +1115,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
true -> State
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} =
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
- false -> {false, fun record_confirms/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- SendFun(MXs, State2).
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(QPid, UC),
+ (case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> fun send_nacks/2;
+ false -> fun record_confirms/2
+ end)(MXs, State#ch{unconfirmed = UC1}).
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
@@ -1392,21 +1345,8 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ} = State,
- UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
- SingletonSet = gb_sets:singleton(MsgSeqNo),
- lists:foldl(
- fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State0#ch{unconfirmed_qm = UQM1};
- none ->
- UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- State0#ch{unconfirmed_qm = UQM1}
- end
- end, State#ch{unconfirmed_mq = UMQ1}, QPids).
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
@@ -1444,11 +1384,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UMQ) of
+ CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1462,8 +1402,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
State;
-maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) of
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -1491,8 +1431,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
- gb_trees:size(UMQ);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->