diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-11 17:14:25 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-11 17:14:25 +0100 |
commit | ac52bbd8546648702bfc8ce43245105da0f5635b (patch) | |
tree | 3dcc445a36cc54b0493047543abad1ce6603a1ce | |
parent | 9b1caeb7c72008ae2ce435a20b7960d053039eb0 (diff) | |
download | rabbitmq-server-ac52bbd8546648702bfc8ce43245105da0f5635b.tar.gz |
Version TTL and expiry messages, and ignore the wrong ones. Also stop timers and then start them again when re-initing.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 |
1 files changed, 29 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9c5b5fa2..c7d345d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ dlx, dlx_routing_key, max_length, + args_policy_version, status }). @@ -150,7 +151,8 @@ init_state(Q) -> active_consumers = priority_queue:new(), senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), - status = running}, + status = running, + args_policy_version = 0}, rabbit_event:init_stats_timer(State, #q.stats_timer). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -263,11 +265,12 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -process_args_policy(State = #q{q = Q}) -> +process_args_policy(State = #q{q = Q, + args_policy_version = N}) -> lists:foldl( fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) - end, State, + end, State#q{args_policy_version = N + 1}, [{<<"expires">>, fun res_min/2, fun init_exp/2}, {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, @@ -286,11 +289,16 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> res_arg(_PolVal, ArgVal) -> ArgVal. res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal). +%% In both these we init with the undefined variant first to stop any +%% existing timer, then start a new one which may fire after a +%% different time. init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined}); -init_exp(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). +init_exp(Expires, State) -> State1 = init_exp(undefined, State), + ensure_expiry_timer(State1#q{expires = Expires}). init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined}); -init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}). +init_ttl(TTL, State) -> State1 = init_ttl(undefined, State), + drop_expired_msgs(State1#q{ttl = TTL}). init_dlx(undefined, State) -> State#q{dlx = undefined}; @@ -363,11 +371,12 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref). %% configured period. ensure_expiry_timer(State = #q{expires = undefined}) -> State; -ensure_expiry_timer(State = #q{expires = Expires}) -> +ensure_expiry_timer(State = #q{expires = Expires, + args_policy_version = Version}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref, - Expires, maybe_expire); + Expires, {maybe_expire, Version}); false -> State end. @@ -375,12 +384,13 @@ stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref). ensure_ttl_timer(undefined, State) -> State; -ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, + args_policy_version = Version}) -> After = (case Expiry - now_micros() of V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), drop_expired), + TRef = erlang:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) @@ -1107,8 +1117,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; - maybe_expire -> 8; - drop_expired -> 8; + {maybe_expire, _Version} -> 8; + {drop_expired, _Version} -> 8; emit_stats -> 7; sync_timeout -> 6; _ -> 0 @@ -1451,17 +1461,23 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> {ok, Q} = rabbit_amqqueue:lookup(Name), noreply(process_args_policy(State#q{q = Q})). -handle_info(maybe_expire, State) -> +handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) -> case is_unused(State) of true -> stop(State); false -> noreply(State#q{expiry_timer_ref = undefined}) end; -handle_info(drop_expired, State) -> +handle_info({maybe_expire, _Vsn}, State) -> + noreply(State); + +handle_info({drop_expired, Vsn}, State = #q{args_policy_version = Vsn}) -> WasEmpty = is_empty(State), State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), noreply(maybe_send_drained(WasEmpty, State1)); +handle_info({drop_expired, _Vsn}, State) -> + noreply(State); + handle_info(emit_stats, State) -> emit_stats(State), %% Don't call noreply/1, we don't want to set timers |