summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-08-12 12:13:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-08-12 12:13:52 +0100
commita026764f8010dd49780cf9ca4b00fd0501a4a7a8 (patch)
treefefdac68c223ec3cdbba93ffad58272558bae773
parentffcd9f37bda544fdfd03e5283165e8295ff55d6e (diff)
downloadrabbitmq-server-a026764f8010dd49780cf9ca4b00fd0501a4a7a8.tar.gz
schedule message expiry timer based on expiry time of queue head
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_variable_queue.erl8
2 files changed, 28 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6bf290de..85c64fe6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -717,29 +718,34 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- BQS1 = case DLXFun of
- undefined -> {undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- BQS2;
- _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
Msgs),
- BQS2
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Next} -> Next
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
+ false -> TRef = erlang:send_after((Expiry - now_micros()) div 1000,
+ self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c95..bd606dfb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.