summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-28 15:06:51 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-28 15:06:51 +0100
commitb7ad7e9c73f4688783d155acd66a9cd92b292291 (patch)
treeea3821c6d75f5d86f3e33cc7d3af45bd3a0fe1ba
parent366d57a2a812c02222daf3f7e99f18f7132754bd (diff)
downloadrabbitmq-server-bug21922.tar.gz
Only start the expiry timer if no consumers, and only reset on ch_down, cancel and getbug21922
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 19 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 81aa3f0b..67f0fcf5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -234,14 +234,20 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{expiry_timer_ref = undefined}.
+%% We only wish to expire where there are no consumers *and* when
+%% basic.get hasn't been called for the configured period.
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires}) ->
- ?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]),
- NewState = stop_expiry_timer(State),
- {ok, TRef} = timer:apply_after(
- Expires, rabbit_amqqueue, maybe_expire, [self()]),
- NewState#q{expiry_timer_ref = TRef}.
+ case is_unused(State) of
+ true ->
+ NewState = stop_expiry_timer(State),
+ {ok, TRef} = timer:apply_after(
+ Expires, rabbit_amqqueue, maybe_expire, [self()]),
+ NewState#q{expiry_timer_ref = TRef};
+ false ->
+ State
+ end.
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -464,7 +470,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
_ -> rollback_transaction(Txn, ChPid,
State1)
end,
- {ok, requeue_and_run(sets:to_list(ChAckTags), State2)}
+ {ok, requeue_and_run(sets:to_list(ChAckTags),
+ ensure_expiry_timer(State2))}
end
end.
@@ -635,9 +642,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
backing_queue_state = BQS, backing_queue = BQ}) ->
AckRequired = not NoAck,
+ State1 = ensure_expiry_timer(State),
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, ensure_expiry_timer(
- State#q{backing_queue_state = BQS1}));
+ {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
@@ -646,8 +653,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg},
- ensure_expiry_timer(State#q{backing_queue_state = BQS1}))
+ reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -687,7 +693,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
- reply(ok, ensure_expiry_timer(State2))
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -750,7 +756,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(ensure_expiry_timer(requeue_and_run(AckTags, State)))
+ noreply(requeue_and_run(AckTags, State))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
@@ -774,7 +780,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
store_ch_record(C1),
- noreply(ensure_expiry_timer(State#q{backing_queue_state = BQS1}))
+ noreply(State#q{backing_queue_state = BQS1})
end;
handle_cast({rollback, Txn, ChPid}, State) ->