diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-13 11:46:27 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-13 11:46:27 +0100 |
commit | ebdb4847e0ddde94e4ec5c649cb8e8d6a81b06f3 (patch) | |
tree | 092ed930f227852221233857fff113df49093c03 | |
parent | 35dc9ebfc09e93d1095fac179e46313c61207a76 (diff) | |
download | rabbitmq-server-ebdb4847e0ddde94e4ec5c649cb8e8d6a81b06f3.tar.gz |
prompt expiry of requeued messages
which may require moving the timer expiry forward
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0250902f..283eba7c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + ttl_timer_expiry, senders, publish_seqno, unconfirmed, @@ -734,19 +735,23 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) - when TTL =/= undefined -> - case BQ:is_empty(BQS) of - true -> State; - false -> After = (case Expiry - now_micros() of - V when V > 0 -> V + 999; %% always fire later - _ -> 0 - end) div 1000, - TRef = erlang:send_after(After, self(), drop_expired), - State#q{ttl_timer_ref = TRef} +ensure_ttl_timer(undefined, State) -> + State; +ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> + State; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> + After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), + State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, + ttl_timer_expiry = TExpiry}) + when Expiry < TExpiry -> + case erlang:cancel_timer(TRef) of + false -> State; + _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; ensure_ttl_timer(_Expiry, State) -> State. |