summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl119
1 files changed, 40 insertions, 79 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 41fc173b..afece3f3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -51,8 +51,7 @@
ttl,
ttl_timer_ref,
publish_seqno,
- unconfirmed_mq,
- unconfirmed_qm,
+ unconfirmed,
delayed_stop,
queue_monitors,
dlx,
@@ -138,8 +137,7 @@ init(Q) ->
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -164,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
expiry_timer_ref = undefined,
ttl = undefined,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
@@ -742,11 +739,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed_mq = UMQ,
- dlx = DLX,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
{ok, _, QPids} =
rabbit_basic:publish(
rabbit_basic:delivery(
@@ -755,20 +750,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> State3 =
- lists:foldl(
- fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
- UQM1 = rabbit_misc:gb_trees_set_insert(
- QPid, MsgSeqNo, UQM),
- State0#q{unconfirmed_qm = UQM1}
- end, State2, QPids),
- noreply(State3#q{
- unconfirmed_mq =
- gb_trees:insert(
- MsgSeqNo, {gb_sets:from_list(QPids),
- AckTag}, UMQ)})
+ [] -> cleanup_after_confirm([AckTag], State2);
+ _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
+ noreply(State2#q{unconfirmed = UC1})
end.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
@@ -787,64 +771,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed_qm = UQM}) ->
+ unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
error ->
noreply(State);
{ok, _} ->
rabbit_log:info("DLQ ~p (for ~s) died~n",
[QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
- end
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ case (MsgSeqNoAckTags =/= [] andalso
+ rabbit_misc:is_abnormal_termination(Reason)) of
+ true -> rabbit_log:warning("Dead queue lost ~p messages~n",
+ [length(MsgSeqNoAckTags)]);
+ false -> ok
+ end,
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = dict:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
-handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckTags1, UMQ3} =
- lists:foldl(
- fun (MsgSeqNo, {AckTags, UMQ1}) ->
- {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
- QPids1 = gb_sets:delete(QPid, QPids),
- case gb_sets:is_empty(QPids1) of
- true -> {[AckTag | AckTags],
- gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {AckTags, gb_trees:update(
- MsgSeqNo, {QPids1, AckTag}, UMQ1)}
- end
- end, {[], UMQ}, MsgSeqNos),
- {_Guids, BQS1} = BQ:ack(AckTags1, BQS),
- MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
- gb_sets:from_list(MsgSeqNos)),
- State1 = case gb_sets:is_empty(MsgSeqNos1) of
- false -> State#q{
- unconfirmed_qm =
- gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> demonitor_queue(
- QPid, State#q{
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)})
- end,
- cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS1}).
-
stop_later(Reason, State) ->
stop_later(Reason, undefined, noreply, State).
-stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
- case {gb_trees:is_empty(UMQ), Reply} of
+stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) ->
+ case {dtree:is_empty(UC), Reply} of
{true, noreply} ->
{stop, Reason, State};
{true, _} ->
@@ -853,16 +804,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) ->
noreply(State#q{delayed_stop = {Reason, {From, Reply}}})
end.
-cleanup_after_confirm(State = #q{delayed_stop = DS,
- unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) andalso DS =/= undefined of
+cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+ unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1 = State#q{backing_queue_state = BQS1},
+ case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
{_, {_, noreply}} -> ok;
{_, {From, Reply}} -> gen_server2:reply(From, Reply)
end,
{Reason, _} = DS,
- {stop, Reason, State};
- false -> noreply(State)
+ {stop, Reason, State1};
+ false -> noreply(State1)
end.
already_been_here(_Delivery, #q{dlx = undefined}) ->
@@ -1243,8 +1198,14 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State) ->
- handle_confirm(MsgSeqNos, QPid, State);
+handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
+ {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ State1 = case dtree:is_defined(QPid, UC1) of
+ false -> demonitor_queue(QPid, State);
+ true -> State
+ end,
+ cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);