diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-12 12:13:52 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-12 12:13:52 +0100 |
commit | a026764f8010dd49780cf9ca4b00fd0501a4a7a8 (patch) | |
tree | fefdac68c223ec3cdbba93ffad58272558bae773 | |
parent | ffcd9f37bda544fdfd03e5283165e8295ff55d6e (diff) | |
download | rabbitmq-server-a026764f8010dd49780cf9ca4b00fd0501a4a7a8.tar.gz |
schedule message expiry timer based on expiry time of queue head
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
2 files changed, 28 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6bf290de..85c64fe6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -717,29 +718,34 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - BQS1 = case DLXFun of - undefined -> {undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - BQS2; - _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach( - fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, + {Props, BQS1} = + case DLXFun of + undefined -> + {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> + {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), - BQS2 - end, - ensure_ttl_timer(State#q{backing_queue_state = BQS1}). - -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) + {Next, BQS2} + end, + ensure_ttl_timer(case Props of + undefined -> undefined; + #message_properties{expiry = Next} -> Next + end, State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = erlang:send_after(TTL, self(), drop_expired), + false -> TRef = erlang:send_after((Expiry - now_micros()) div 1000, + self(), drop_expired), State#q{ttl_timer_ref = TRef} end; -ensure_ttl_timer(State) -> +ensure_ttl_timer(_Expiry, State) -> State. ack_if_no_dlx(AckTags, State = #q{dlx = undefined, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49213c95..bd606dfb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; - (S) -> {undefined, S} + End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; + (Next, S) -> {Next, undefined, S} end, case queue_out(State) of {empty, State1} -> - End(a(State1)); + End(undefined, a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> @@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - End(a(in_r(MsgStatus, State1))) + End(MsgProps, a(in_r(MsgStatus, State1))) end end. |