From a026764f8010dd49780cf9ca4b00fd0501a4a7a8 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 12 Aug 2012 12:13:52 +0100 Subject: schedule message expiry timer based on expiry time of queue head --- src/rabbit_amqqueue_process.erl | 42 +++++++++++++++++++++++------------------ src/rabbit_variable_queue.erl | 8 ++++---- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6bf290de..85c64fe6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -717,29 +718,34 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - BQS1 = case DLXFun of - undefined -> {undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - BQS2; - _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach( - fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, + {Props, BQS1} = + case DLXFun of + undefined -> + {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> + {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), - BQS2 - end, - ensure_ttl_timer(State#q{backing_queue_state = BQS1}). - -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) + {Next, BQS2} + end, + ensure_ttl_timer(case Props of + undefined -> undefined; + #message_properties{expiry = Next} -> Next + end, State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = erlang:send_after(TTL, self(), drop_expired), + false -> TRef = erlang:send_after((Expiry - now_micros()) div 1000, + self(), drop_expired), State#q{ttl_timer_ref = TRef} end; -ensure_ttl_timer(State) -> +ensure_ttl_timer(_Expiry, State) -> State. ack_if_no_dlx(AckTags, State = #q{dlx = undefined, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49213c95..bd606dfb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; - (S) -> {undefined, S} + End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; + (Next, S) -> {Next, undefined, S} end, case queue_out(State) of {empty, State1} -> - End(a(State1)); + End(undefined, a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> @@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - End(a(in_r(MsgStatus, State1))) + End(MsgProps, a(in_r(MsgStatus, State1))) end end. -- cgit v1.2.1