summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-08-13 11:46:27 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-08-13 11:46:27 +0100
commitebdb4847e0ddde94e4ec5c649cb8e8d6a81b06f3 (patch)
tree092ed930f227852221233857fff113df49093c03
parent35dc9ebfc09e93d1095fac179e46313c61207a76 (diff)
downloadrabbitmq-server-ebdb4847e0ddde94e4ec5c649cb8e8d6a81b06f3.tar.gz
prompt expiry of requeued messages
which may require moving the timer expiry forward
-rw-r--r--src/rabbit_amqqueue_process.erl31
1 files changed, 18 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0250902f..283eba7c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ ttl_timer_expiry,
senders,
publish_seqno,
unconfirmed,
@@ -734,19 +735,23 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
-ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
- when TTL =/= undefined ->
- case BQ:is_empty(BQS) of
- true -> State;
- false -> After = (case Expiry - now_micros() of
- V when V > 0 -> V + 999; %% always fire later
- _ -> 0
- end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
- State#q{ttl_timer_ref = TRef}
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
ensure_ttl_timer(_Expiry, State) ->
State.