summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-09 11:10:56 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-09 11:10:56 +0000
commit534051bae26aa38e7770351546173034c9c2d7d6 (patch)
tree9789399654931e478ff0b464cbcfc1ea6157183a
parent101814b527d93876831751a0c9542ac48879b49e (diff)
downloadrabbitmq-server-534051bae26aa38e7770351546173034c9c2d7d6.tar.gz
Update dead-lettering due to queue length limit
-rw-r--r--src/rabbit_amqqueue_process.erl22
1 files changed, 18 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2506ff91..81db5491 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -563,10 +563,18 @@ maybe_drop_head(State = #q{max_length = MaxLen,
backing_queue = BQ,
backing_queue_state = BQS}) ->
case BQ:len(BQS) >= MaxLen of
- true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
- (dead_letter_fun(maxlen))([{Msg, AckTag}]),
- State#q{backing_queue_state = BQS1};
- false -> State
+ true ->
+ with_dlx(State#q.dlx,
+ fun (X) ->
+ {ok, State1} = dead_letter_maxlen_msgs(X, State),
+ State1
+ end,
+ fun () ->
+ {_, BQS1} = BQ:drop(false, BQS),
+ State#q{backing_queue_state = BQS1}
+ end);
+ false ->
+ State
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
@@ -746,6 +754,12 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1),
+ {ok, DLFun(Msg, AckTag, Acc), BQS2}
+ end, maxlen, X, State).
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,