diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 16:22:57 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 16:22:57 +0000 |
commit | 37ef7fd3b3621fe7c91a67c325a4678cb579aeca (patch) | |
tree | 3fe0536a726514173b9db0837a1a9c3526946657 | |
parent | 3eeb54407bc7efe970e401d81fc1ab5f8a512c8e (diff) | |
download | rabbitmq-server-37ef7fd3b3621fe7c91a67c325a4678cb579aeca.tar.gz |
don't send expired messages to self() during bulk expiry
since they all end up in the mailbox, consuming memory
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 |
1 files changed, 35 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 461b5a6d..057d9391 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,23 +716,46 @@ drop_expired_messages(State = #q{dlx = DLX, backing_queue = BQ }) -> Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = + {Props, State1} = case DLX of - undefined -> BQ:dropwhile(ExpirePred, BQS); - _ -> DLXFun = dead_letter_fun(expired), - {Next, ok, BQS2} = - BQ:fetchwhile( - ExpirePred, - fun (Msg, _IsDelivered, AckTag, Acc) -> - DLXFun(Msg, AckTag), - Acc - end, ok, BQS), - {Next, BQS2} + undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), + {Next, State#q{backing_queue_state = BQS1}}; + _ -> case rabbit_exchange:lookup(DLX) of + {ok, X} -> + drop_expired_messages(ExpirePred, X, State); + {error, not_found} -> + {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), + {Next, State#q{backing_queue_state = BQS1}} + end end, ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp - end, State#q{backing_queue_state = BQS1}). + end, State1). + +drop_expired_messages(ExpirePred, X, State = #q{dlx_routing_key = RK, + publish_seqno = SeqNo0, + unconfirmed = UC0, + queue_monitors = QMons0, + backing_queue_state = BQS, + backing_queue = BQ}) -> + QName = qname(State), + {Next, {ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} = + BQ:fetchwhile( + ExpirePred, + fun (Msg, _IsDelivered, AckTag, {ConfirmImm, SeqNo, UC, QMons}) -> + case dead_letter_publish(Msg, expired, X, RK, SeqNo, QName) of + [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons}; + QPids -> {ConfirmImm, SeqNo + 1, + dtree:insert(SeqNo, QPids, AckTag, UC), + pmon:monitor_all(QPids, QMons)} + end + end, {[], SeqNo0, UC0, QMons0}, BQS), + {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1), + {Next, State#q{publish_seqno = SeqNo1, + unconfirmed = UC1, + queue_monitors = QMons1, + backing_queue_state = BQS2}}. ensure_ttl_timer(undefined, State) -> State; |