summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-20 15:54:54 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-20 15:54:54 +0000
commit7f1a4cd62f261734e84a12631ac47a5d1d56f0fd (patch)
treebbb200cf72d2000db06f8f8e2b271de6e6058c1b
parent626e654987a95b8cfb0e85079899de4e0dee2d33 (diff)
downloadrabbitmq-server-bug25305.tar.gz
ack unroutable dead-lettered messages straight awaybug25305
thus plugging a leak
-rw-r--r--src/rabbit_amqqueue_process.erl30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f87f5777..d8b20335 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1274,18 +1274,24 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
case rabbit_exchange:lookup(XName) of
{ok, X} ->
- noreply(lists:foldl(
- fun({Msg, AckTag}, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMon}) ->
- QPids = dead_letter_publish(Msg, Reason, X,
- State1),
- UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons = pmon:monitor_all(QPids, QMon),
- State1#q{queue_monitors = QMons,
- publish_seqno = SeqNo + 1,
- unconfirmed = UC1}
- end, State, Msgs));
+ {AckImmediately, State2} =
+ lists:foldl(
+ fun({Msg, AckTag},
+ {Acks, State1 = #q{publish_seqno = SeqNo,
+ unconfirmed = UC,
+ queue_monitors = QMons}}) ->
+ case dead_letter_publish(Msg, Reason, X, State1) of
+ [] -> {[AckTag | Acks], State1};
+ QPids -> UC1 = dtree:insert(
+ SeqNo, QPids, AckTag, UC),
+ QMons1 = pmon:monitor_all(QPids, QMons),
+ {Acks,
+ State1#q{publish_seqno = SeqNo + 1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1}}
+ end
+ end, {[], State}, Msgs),
+ cleanup_after_confirm(AckImmediately, State2);
{error, not_found} ->
cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
end;