summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl130
1 files changed, 24 insertions, 106 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b016c4d2..3712a625 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -49,10 +49,6 @@
ttl_timer_ref,
ttl_timer_expiry,
senders,
- publish_seqno,
- unconfirmed,
- delayed_stop,
- queue_monitors,
dlx,
dlx_routing_key,
max_length,
@@ -151,9 +147,6 @@ init_state(Q) ->
has_had_consumers = false,
active_consumers = queue:new(),
senders = pmon:new(),
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
@@ -820,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
State1.
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
backing_queue_state = BQS,
backing_queue = BQ}) ->
QName = qname(State),
- {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
- Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, Reason,
- X, RK, SeqNo, QName) of
- [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
- QPids -> {AckImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS),
- {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
- {Res, State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}}.
-
-dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ {Res, Acks1, BQS1} =
+ Fun(fun (Msg, AckTag, Acks) ->
+ dead_letter_publish(Msg, Reason, X, RK, QName),
+ [AckTag | Acks]
+ end, [], BQS),
+ {_Guids, BQS2} = BQ:ack(Acks1, BQS1),
+ {Res, State#q{backing_queue_state = BQS2}}.
+
+dead_letter_publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(
- rabbit_amqqueue:lookup(Queues), Delivery),
- DeliveredQPids.
-
-handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
- unconfirmed = UC}) ->
- case pmon:is_monitored(QPid, QMons) of
- false -> noreply(State);
- true -> case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
- QNameS = rabbit_misc:rs(qname(State)),
- rabbit_log:warning("DLQ ~p for ~s died with "
- "~p unconfirmed messages~n",
- [QPid, QNameS, length(Lost)]);
- false -> ok
- end,
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = pmon:erase(QPid, QMons),
- unconfirmed = UC1})
- end.
+ rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
-stop(State) -> stop(undefined, noreply, State).
+stop(State) -> stop(noreply, State).
-stop(From, Reply, State = #q{unconfirmed = UC}) ->
- case {dtree:is_empty(UC), Reply} of
- {true, noreply} -> {stop, normal, State};
- {true, _} -> {stop, normal, Reply, State};
- {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
- end.
+stop(noreply, State) -> {stop, normal, State};
+stop(Reply, State) -> {stop, normal, Reply, State}.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
- unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case dtree:is_empty(UC) andalso DS =/= undefined of
- true -> case DS of
- {_, noreply} -> ok;
- {From, Reply} -> gen_server2:reply(From, Reply)
- end,
- {stop, normal, State1};
- false -> noreply(State1)
- end.
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
@@ -1073,9 +1017,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
_ -> 0
end.
-handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -1115,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) ->
gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, Delivered, State));
-handle_call({notify_down, ChPid}, From, State) ->
+handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
%% are no longer visible by the time we send a response to the
%% client. The queue is ultimately deleted in terminate/2; if we
%% return stop with a reply, terminate/2 will be called by
- %% gen_server2 *before* the reply is sent. FIXME: in case of a
- %% delayed stop the reply is sent earlier.
+ %% gen_server2 *before* the reply is sent.
case handle_ch_down(ChPid, State) of
{ok, State1} -> reply(ok, State1);
- {stop, State1} -> stop(From, ok, State1)
+ {stop, State1} -> stop(ok, State1)
end;
handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
@@ -1186,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
-handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
+handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
@@ -1215,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
- true -> stop(From, ok, State1)
+ true -> stop(ok, State1)
end
end;
@@ -1224,14 +1164,14 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
-handle_call({delete, IfUnused, IfEmpty}, From,
+handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
- true -> stop(From, {ok, BQ:len(BQS)}, State)
+ true -> stop({ok, BQ:len(BQS)}, State)
end;
handle_call(purge, _From, State = #q{backing_queue = BQ,
@@ -1286,19 +1226,6 @@ handle_call(force_event_refresh, _From,
end,
reply(ok, State).
-handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- State1 = case dtree:is_defined(QPid, UC1) of
- false -> QMons = State#q.queue_monitors,
- State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
- true -> State
- end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State1#q{unconfirmed = UC1});
-
-handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
@@ -1405,15 +1332,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
handle_cast(wake_up, State) ->
noreply(State).
-%% We need to not ignore this as we need to remove outstanding
-%% confirms due to queue death.
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason},
- State = #q{delayed_stop = DS}) when DS =/= undefined ->
- handle_queue_down(DownPid, Reason, State);
-
-handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
- noreply(State);
-
handle_info(maybe_expire, State) ->
case is_unused(State) of
true -> stop(State);
@@ -1442,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% unexpectedly.
stop(State);
-handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) ->
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> handle_queue_down(DownPid, Reason, State1);
+ {ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;