diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f87f5777..d8b20335 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1274,18 +1274,24 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - noreply(lists:foldl( - fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMon}) -> - QPids = dead_letter_publish(Msg, Reason, X, - State1), - UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), - QMons = pmon:monitor_all(QPids, QMon), - State1#q{queue_monitors = QMons, - publish_seqno = SeqNo + 1, - unconfirmed = UC1} - end, State, Msgs)); + {AckImmediately, State2} = + lists:foldl( + fun({Msg, AckTag}, + {Acks, State1 = #q{publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMons}}) -> + case dead_letter_publish(Msg, Reason, X, State1) of + [] -> {[AckTag | Acks], State1}; + QPids -> UC1 = dtree:insert( + SeqNo, QPids, AckTag, UC), + QMons1 = pmon:monitor_all(QPids, QMons), + {Acks, + State1#q{publish_seqno = SeqNo + 1, + unconfirmed = UC1, + queue_monitors = QMons1}} + end + end, {[], State}, Msgs), + cleanup_after_confirm(AckImmediately, State2); {error, not_found} -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; |