summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-12-14 16:26:07 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-12-14 16:26:07 +0000
commitdedee77a913c7b2f1f6d11e6f649a163797f12c4 (patch)
treed4fdfaaa3632fb2ca51a50574532dc4d1836f94b
parent01db5e5c77b2bccf0133ff17c836b269d5931562 (diff)
downloadrabbitmq-server-dedee77a913c7b2f1f6d11e6f649a163797f12c4.tar.gz
Maxdepth checking with confirms and DLX
-rw-r--r--src/rabbit_amqqueue_process.erl43
1 files changed, 33 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b61df6d6..bcbdb0ad 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -568,7 +568,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
@@ -579,27 +581,48 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2} ->
- BQS1 = publish_max(Message, Props, Delivered, SenderPid, State2),
+ BQS1 = publish_max(Delivery, Props, Delivered, State2),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
-publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_depth = undefined }) ->
+publish_max(#delivery{message = Message,
+ sender = SenderPid},
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ max_depth = undefined}) ->
BQ:publish(Message, Props, Delivered, SenderPid, BQS);
-publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ,
- backing_queue_state = BQS,
- dlx = XName,
- max_depth = MaxDepth }) ->
+publish_max(#delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid},
+ Props = #message_properties{needs_confirming = Confirm},
+ Delivered,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ dlx = XName,
+ max_depth = MaxDepth }) ->
{Depth, Len} = {BQ:depth(BQS), BQ:len(BQS)},
case {Depth >= MaxDepth, Len =:= 0} of
{false, _} ->
BQ:publish(Message, Props, Delivered, SenderPid, BQS);
{true, true} ->
+ case XName of
+ undefined ->
+ ok;
+ _ ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} -> dead_letter_publish(Message, maxdepth, X, State);
+ {error, not_found} -> ok
+ end
+ end,
+ case Confirm of
+ true -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]);
+ _ -> ok
+ end,
BQS;
{true, false} ->
- {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(false, BQS),
+ {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
+ (dead_letter_fun(maxdepth))([{Msg, AckTag}]),
BQ:publish(Message, Props, Delivered, SenderPid, BQS1)
end.