diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 130 |
1 files changed, 24 insertions, 106 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b016c4d2..3712a625 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,10 +49,6 @@ ttl_timer_ref, ttl_timer_expiry, senders, - publish_seqno, - unconfirmed, - delayed_stop, - queue_monitors, dlx, dlx_routing_key, max_length, @@ -151,9 +147,6 @@ init_state(Q) -> has_had_consumers = false, active_consumers = queue:new(), senders = pmon:new(), - publish_seqno = 1, - unconfirmed = dtree:empty(), - queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty(), status = running}, rabbit_event:init_stats_timer(State, #q.stats_timer). @@ -820,80 +813,31 @@ dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> State1. dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, backing_queue_state = BQS, backing_queue = BQ}) -> QName = qname(State), - {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = - Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, Reason, - X, RK, SeqNo, QName) of - [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; - QPids -> {AckImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS), - {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), - {Res, State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}}. - -dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> + {Res, Acks1, BQS1} = + Fun(fun (Msg, AckTag, Acks) -> + dead_letter_publish(Msg, Reason, X, RK, QName), + [AckTag | Acks] + end, [], BQS), + {_Guids, BQS2} = BQ:ack(Acks1, BQS1), + {Res, State#q{backing_queue_state = BQS2}}. + +dead_letter_publish(Msg, Reason, X, RK, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, DeliveredQPids} = rabbit_amqqueue:deliver( - rabbit_amqqueue:lookup(Queues), Delivery), - DeliveredQPids. - -handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed = UC}) -> - case pmon:is_monitored(QPid, QMons) of - false -> noreply(State); - true -> case rabbit_misc:is_abnormal_exit(Reason) of - true -> {Lost, _UC1} = dtree:take_all(QPid, UC), - QNameS = rabbit_misc:rs(qname(State)), - rabbit_log:warning("DLQ ~p for ~s died with " - "~p unconfirmed messages~n", - [QPid, QNameS, length(Lost)]); - false -> ok - end, - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = pmon:erase(QPid, QMons), - unconfirmed = UC1}) - end. + rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), + ok. -stop(State) -> stop(undefined, noreply, State). +stop(State) -> stop(noreply, State). -stop(From, Reply, State = #q{unconfirmed = UC}) -> - case {dtree:is_empty(UC), Reply} of - {true, noreply} -> {stop, normal, State}; - {true, _} -> {stop, normal, Reply, State}; - {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) - end. +stop(noreply, State) -> {stop, normal, State}; +stop(Reply, State) -> {stop, normal, Reply, State}. -cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, - unconfirmed = UC, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case dtree:is_empty(UC) andalso DS =/= undefined of - true -> case DS of - {_, noreply} -> ok; - {From, Reply} -> gen_server2:reply(From, Reply) - end, - {stop, normal, State1}; - false -> noreply(State1) - end. detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -1073,9 +1017,6 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. -handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -1115,16 +1056,15 @@ handle_call({deliver, Delivery, Delivered}, From, State) -> gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, Delivered, State)); -handle_call({notify_down, ChPid}, From, State) -> +handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by - %% gen_server2 *before* the reply is sent. FIXME: in case of a - %% delayed stop the reply is sent earlier. + %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of {ok, State1} -> reply(ok, State1); - {stop, State1} -> stop(From, ok, State1) + {stop, State1} -> stop(ok, State1) end; handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, @@ -1186,7 +1126,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -1215,7 +1155,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> stop(From, ok, State1) + true -> stop(ok, State1) end end; @@ -1224,14 +1164,14 @@ handle_call(stat, _From, State) -> ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, From, +handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> stop(From, {ok, BQ:len(BQS)}, State) + true -> stop({ok, BQ:len(BQS)}, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1286,19 +1226,6 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> - {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), - State1 = case dtree:is_defined(QPid, UC1) of - false -> QMons = State#q.queue_monitors, - State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; - true -> State - end, - cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State1#q{unconfirmed = UC1}); - -handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); @@ -1405,15 +1332,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, handle_cast(wake_up, State) -> noreply(State). -%% We need to not ignore this as we need to remove outstanding -%% confirms due to queue death. -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, - State = #q{delayed_stop = DS}) when DS =/= undefined -> - handle_queue_down(DownPid, Reason, State); - -handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> - noreply(State); - handle_info(maybe_expire, State) -> case is_unused(State) of true -> stop(State); @@ -1442,9 +1360,9 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% unexpectedly. stop(State); -handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {ok, State1} -> noreply(State1); {stop, State1} -> stop(State1) end; |