diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-05 16:28:25 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-05 16:28:25 +0000 |
commit | 3761e87b40caaf58962655130bb5eea3f9d20860 (patch) | |
tree | c40c2a8eedc55028f8920176cffc20bd219078bd | |
parent | a38e4cfeb5e1657210c8badcccef54d55f48f22b (diff) | |
download | rabbitmq-server-3761e87b40caaf58962655130bb5eea3f9d20860.tar.gz |
send expired messages to self() one at a time
which simplifies the code a fair bit and also means we don't build up
a potentially huge intermediate data structure.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 69 |
1 files changed, 31 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ffa2a1a..2bb476fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -724,26 +724,24 @@ drop_expired_messages(State = #q{dlx = DLX, backing_queue = BQ }) -> Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = case DLX of - undefined -> BQ:dropwhile(ExpirePred, BQS); - _ -> {Next, Msgs, BQS2} = - BQ:fetchwhile(ExpirePred, - fun accumulate_msgs/4, - [], BQS), - case Msgs of - [] -> ok; - _ -> (dead_letter_fun(expired))( - lists:reverse(Msgs)) - end, - {Next, BQS2} - end, + {Props, BQS1} = + case DLX of + undefined -> BQ:dropwhile(ExpirePred, BQS); + _ -> DLXFun = dead_letter_fun(expired), + {Next, ok, BQS2} = + BQ:fetchwhile( + ExpirePred, + fun (Msg, _IsDelivered, AckTag, Acc) -> + DLXFun(Msg, AckTag), + Acc + end, ok, BQS), + {Next, BQS2} + end, ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. - ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> @@ -764,7 +762,9 @@ ensure_ttl_timer(_Expiry, State) -> State. dead_letter_fun(Reason) -> - fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end. + fun(Msg, AckTag) -> + gen_server2:cast(self(), {dead_letter, Msg, AckTag, Reason}) + end. dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> DLMsg = make_dead_letter_msg(Reason, Msg, State), @@ -1213,8 +1213,7 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, - BQS, AckTags), + BQS1 = BQ:foreach_ack(DLXFun, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); @@ -1262,29 +1261,23 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> +handle_cast({dead_letter, Msg, AckTag, Reason}, + State = #q{dlx = XName, + publish_seqno = SeqNo, + unconfirmed = UC, + queue_monitors = QMons}) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - {AckImmediately, State2} = - lists:foldl( - fun({Msg, AckTag}, - {Acks, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMons}}) -> - case dead_letter_publish(Msg, Reason, X, State1) of - [] -> {[AckTag | Acks], State1}; - QPids -> UC1 = dtree:insert( - SeqNo, QPids, AckTag, UC), - QMons1 = pmon:monitor_all(QPids, QMons), - {Acks, - State1#q{publish_seqno = SeqNo + 1, - unconfirmed = UC1, - queue_monitors = QMons1}} - end - end, {[], State}, Msgs), - cleanup_after_confirm(AckImmediately, State2); + case dead_letter_publish(Msg, Reason, X, State) of + [] -> cleanup_after_confirm([AckTag], State); + QPids -> UC1 = dtree:insert(SeqNo, QPids, AckTag, UC), + QMons1 = pmon:monitor_all(QPids, QMons), + State#q{publish_seqno = SeqNo + 1, + unconfirmed = UC1, + queue_monitors = QMons1} + end; {error, not_found} -> - cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) + cleanup_after_confirm([AckTag], State) end; handle_cast(start_mirroring, State = #q{backing_queue = BQ, |