diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 21:41:29 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 21:41:29 +0000 |
commit | de7441ce679272f8f7200b768c00cf1e06996823 (patch) | |
tree | 9fe983ab16aeebaa68ee23e9f8b0cb8f9cda4838 | |
parent | 2ae774b2f225b08ff9b1240d70f0d62c948e3362 (diff) | |
download | rabbitmq-server-de7441ce679272f8f7200b768c00cf1e06996823.tar.gz |
don't send dead-lettered messages to self() during 'reject' handling
since they all end up in the mailbox, consuming memory
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 69 |
1 files changed, 33 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b908361c..8e20f4e1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -757,6 +757,29 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{dlx_routing_key = RK, queue_monitors = QMons1, backing_queue_state = BQS2}}. +dead_letter_rejected_msgs(AckTags, X, State = #q{dlx_routing_key = RK, + publish_seqno = SeqNo0, + unconfirmed = UC0, + queue_monitors = QMons0, + backing_queue_state = BQS, + backing_queue = BQ}) -> + QName = qname(State), + {{ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} = + BQ:ackfold( + fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) -> + case dead_letter_publish(Msg, rejected, X, RK, SeqNo, QName) of + [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons}; + QPids -> {ConfirmImm, SeqNo + 1, + dtree:insert(SeqNo, QPids, AckTag, UC), + pmon:monitor_all(QPids, QMons)} + end + end, {[], SeqNo0, UC0, QMons0}, BQS, AckTags), + {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1), + State#q{publish_seqno = SeqNo1, + unconfirmed = UC1, + queue_monitors = QMons1, + backing_queue_state = BQS2}. + ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> @@ -776,11 +799,6 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ensure_ttl_timer(_Expiry, State) -> State. -dead_letter_fun(Reason) -> - fun(Msg, AckTag) -> - gen_server2:cast(self(), {dead_letter, Msg, AckTag, Reason}) - end. - dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), @@ -1216,17 +1234,16 @@ handle_cast({reject, AckTags, true, ChPid}, State) -> handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> noreply(ack(AckTags, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State) -> - DLXFun = dead_letter_fun(rejected), - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {ok, BQS1} = BQ:ackfold( - fun (M, A, ok) -> DLXFun([{M, A}]) end, - ok, BQS, AckTags), - State1#q{backing_queue_state = BQS1} - end)); +handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = DLX}) -> + noreply(case rabbit_exchange:lookup(DLX) of + {ok, X} -> subtract_acks( + ChPid, AckTags, State, + fun (State1) -> + dead_letter_rejected_msgs( + AckTags, X, State1) + end); + {error, not_found} -> ack(AckTags, ChPid, State) + end); handle_cast(delete_immediately, State) -> stop(State); @@ -1272,26 +1289,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({dead_letter, Msg, AckTag, Reason}, - State = #q{dlx = XName, - dlx_routing_key = RK, - publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMons}) -> - case rabbit_exchange:lookup(XName) of - {ok, X} -> - case dead_letter_publish(Msg, Reason, X, RK, SeqNo, qname(State)) of - [] -> cleanup_after_confirm([AckTag], State); - QPids -> UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), - QMons1 = pmon:monitor_all(QPids, QMons), - State#q{publish_seqno = SeqNo + 1, - unconfirmed = UC1, - queue_monitors = QMons1} - end; - {error, not_found} -> - cleanup_after_confirm([AckTag], State) - end; - handle_cast(start_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq |