diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-20 15:54:54 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-20 15:54:54 +0000 |
commit | 7f1a4cd62f261734e84a12631ac47a5d1d56f0fd (patch) | |
tree | bbb200cf72d2000db06f8f8e2b271de6e6058c1b | |
parent | 626e654987a95b8cfb0e85079899de4e0dee2d33 (diff) | |
download | rabbitmq-server-bug25305.tar.gz |
ack unroutable dead-lettered messages straight awaybug25305
thus plugging a leak
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f87f5777..d8b20335 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1274,18 +1274,24 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - noreply(lists:foldl( - fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMon}) -> - QPids = dead_letter_publish(Msg, Reason, X, - State1), - UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), - QMons = pmon:monitor_all(QPids, QMon), - State1#q{queue_monitors = QMons, - publish_seqno = SeqNo + 1, - unconfirmed = UC1} - end, State, Msgs)); + {AckImmediately, State2} = + lists:foldl( + fun({Msg, AckTag}, + {Acks, State1 = #q{publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMons}}) -> + case dead_letter_publish(Msg, Reason, X, State1) of + [] -> {[AckTag | Acks], State1}; + QPids -> UC1 = dtree:insert( + SeqNo, QPids, AckTag, UC), + QMons1 = pmon:monitor_all(QPids, QMons), + {Acks, + State1#q{publish_seqno = SeqNo + 1, + unconfirmed = UC1, + queue_monitors = QMons1}} + end + end, {[], State}, Msgs), + cleanup_after_confirm(AckImmediately, State2); {error, not_found} -> cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) end; |