summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-11 17:14:25 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-11 17:14:25 +0100
commitac52bbd8546648702bfc8ce43245105da0f5635b (patch)
tree3dcc445a36cc54b0493047543abad1ce6603a1ce
parent9b1caeb7c72008ae2ce435a20b7960d053039eb0 (diff)
downloadrabbitmq-server-ac52bbd8546648702bfc8ce43245105da0f5635b.tar.gz
Version TTL and expiry messages, and ignore the wrong ones. Also stop timers and then start them again when re-initing.
-rw-r--r--src/rabbit_amqqueue_process.erl42
1 files changed, 29 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9c5b5fa2..c7d345d2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
dlx,
dlx_routing_key,
max_length,
+ args_policy_version,
status
}).
@@ -150,7 +151,8 @@ init_state(Q) ->
active_consumers = priority_queue:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
- status = running},
+ status = running,
+ args_policy_version = 0},
rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
@@ -263,11 +265,12 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-process_args_policy(State = #q{q = Q}) ->
+process_args_policy(State = #q{q = Q,
+ args_policy_version = N}) ->
lists:foldl(
fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
- end, State,
+ end, State#q{args_policy_version = N + 1},
[{<<"expires">>, fun res_min/2, fun init_exp/2},
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
@@ -286,11 +289,16 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
res_arg(_PolVal, ArgVal) -> ArgVal.
res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal).
+%% In both these we init with the undefined variant first to stop any
+%% existing timer, then start a new one which may fire after a
+%% different time.
init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined});
-init_exp(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
+init_exp(Expires, State) -> State1 = init_exp(undefined, State),
+ ensure_expiry_timer(State1#q{expires = Expires}).
init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined});
-init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> State1 = init_ttl(undefined, State),
+ drop_expired_msgs(State1#q{ttl = TTL}).
init_dlx(undefined, State) ->
State#q{dlx = undefined};
@@ -363,11 +371,12 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% configured period.
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
-ensure_expiry_timer(State = #q{expires = Expires}) ->
+ensure_expiry_timer(State = #q{expires = Expires,
+ args_policy_version = Version}) ->
case is_unused(State) of
true -> NewState = stop_expiry_timer(State),
rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
- Expires, maybe_expire);
+ Expires, {maybe_expire, Version});
false -> State
end.
@@ -375,12 +384,13 @@ stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
ensure_ttl_timer(undefined, State) ->
State;
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
+ args_policy_version = Version}) ->
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
+ TRef = erlang:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
@@ -1107,8 +1117,8 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;
update_ram_duration -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
+ {maybe_expire, _Version} -> 8;
+ {drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
_ -> 0
@@ -1451,17 +1461,23 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
{ok, Q} = rabbit_amqqueue:lookup(Name),
noreply(process_args_policy(State#q{q = Q})).
-handle_info(maybe_expire, State) ->
+handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
true -> stop(State);
false -> noreply(State#q{expiry_timer_ref = undefined})
end;
-handle_info(drop_expired, State) ->
+handle_info({maybe_expire, _Vsn}, State) ->
+ noreply(State);
+
+handle_info({drop_expired, Vsn}, State = #q{args_policy_version = Vsn}) ->
WasEmpty = is_empty(State),
State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
noreply(maybe_send_drained(WasEmpty, State1));
+handle_info({drop_expired, _Vsn}, State) ->
+ noreply(State);
+
handle_info(emit_stats, State) ->
emit_stats(State),
%% Don't call noreply/1, we don't want to set timers