diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-12-14 16:26:07 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-12-14 16:26:07 +0000 |
commit | dedee77a913c7b2f1f6d11e6f649a163797f12c4 (patch) | |
tree | d4fdfaaa3632fb2ca51a50574532dc4d1836f94b | |
parent | 01db5e5c77b2bccf0133ff17c836b269d5931562 (diff) | |
download | rabbitmq-server-dedee77a913c7b2f1f6d11e6f649a163797f12c4.tar.gz |
Maxdepth checking with confirms and DLX
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 43 |
1 files changed, 33 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b61df6d6..bcbdb0ad 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -568,7 +568,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, {false, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), @@ -579,27 +581,48 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2} -> - BQS1 = publish_max(Message, Props, Delivered, SenderPid, State2), + BQS1 = publish_max(Delivery, Props, Delivered, State2), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. -publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ, - backing_queue_state = BQS, - max_depth = undefined }) -> +publish_max(#delivery{message = Message, + sender = SenderPid}, + Props, Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + max_depth = undefined}) -> BQ:publish(Message, Props, Delivered, SenderPid, BQS); -publish_max(Message, Props, Delivered, SenderPid, #q{backing_queue = BQ, - backing_queue_state = BQS, - dlx = XName, - max_depth = MaxDepth }) -> +publish_max(#delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, + Props = #message_properties{needs_confirming = Confirm}, + Delivered, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + dlx = XName, + max_depth = MaxDepth }) -> {Depth, Len} = {BQ:depth(BQS), BQ:len(BQS)}, case {Depth >= MaxDepth, Len =:= 0} of {false, _} -> BQ:publish(Message, Props, Delivered, SenderPid, BQS); {true, true} -> + case XName of + undefined -> + ok; + _ -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> dead_letter_publish(Message, maxdepth, X, State); + {error, not_found} -> ok + end + end, + case Confirm of + true -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); + _ -> ok + end, BQS; {true, false} -> - {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(false, BQS), + {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), + (dead_letter_fun(maxdepth))([{Msg, AckTag}]), BQ:publish(Message, Props, Delivered, SenderPid, BQS1) end. |