summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-01 16:22:57 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-01 16:22:57 +0000
commit37ef7fd3b3621fe7c91a67c325a4678cb579aeca (patch)
tree3fe0536a726514173b9db0837a1a9c3526946657
parent3eeb54407bc7efe970e401d81fc1ab5f8a512c8e (diff)
downloadrabbitmq-server-37ef7fd3b3621fe7c91a67c325a4678cb579aeca.tar.gz
don't send expired messages to self() during bulk expiry
since they all end up in the mailbox, consuming memory
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 35 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 461b5a6d..057d9391 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,23 +716,46 @@ drop_expired_messages(State = #q{dlx = DLX,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} =
+ {Props, State1} =
case DLX of
- undefined -> BQ:dropwhile(ExpirePred, BQS);
- _ -> DLXFun = dead_letter_fun(expired),
- {Next, ok, BQS2} =
- BQ:fetchwhile(
- ExpirePred,
- fun (Msg, _IsDelivered, AckTag, Acc) ->
- DLXFun(Msg, AckTag),
- Acc
- end, ok, BQS),
- {Next, BQS2}
+ undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}};
+ _ -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} ->
+ drop_expired_messages(ExpirePred, X, State);
+ {error, not_found} ->
+ {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}}
+ end
end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
+ end, State1).
+
+drop_expired_messages(ExpirePred, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Next, {ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ BQ:fetchwhile(
+ ExpirePred,
+ fun (Msg, _IsDelivered, AckTag, {ConfirmImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, expired, X, RK, SeqNo, QName) of
+ [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons};
+ QPids -> {ConfirmImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1),
+ {Next, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
ensure_ttl_timer(undefined, State) ->
State;