diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-28 15:06:51 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-28 15:06:51 +0100 |
commit | b7ad7e9c73f4688783d155acd66a9cd92b292291 (patch) | |
tree | ea3821c6d75f5d86f3e33cc7d3af45bd3a0fe1ba | |
parent | 366d57a2a812c02222daf3f7e99f18f7132754bd (diff) | |
download | rabbitmq-server-bug21922.tar.gz |
Only start the expiry timer if no consumers, and only reset on ch_down, cancel and getbug21922
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 |
1 files changed, 19 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 81aa3f0b..67f0fcf5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -234,14 +234,20 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{expiry_timer_ref = undefined}. +%% We only wish to expire where there are no consumers *and* when +%% basic.get hasn't been called for the configured period. ensure_expiry_timer(State = #q{expires = undefined}) -> State; ensure_expiry_timer(State = #q{expires = Expires}) -> - ?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]), - NewState = stop_expiry_timer(State), - {ok, TRef} = timer:apply_after( - Expires, rabbit_amqqueue, maybe_expire, [self()]), - NewState#q{expiry_timer_ref = TRef}. + case is_unused(State) of + true -> + NewState = stop_expiry_timer(State), + {ok, TRef} = timer:apply_after( + Expires, rabbit_amqqueue, maybe_expire, [self()]), + NewState#q{expiry_timer_ref = TRef}; + false -> + State + end. assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -464,7 +470,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> _ -> rollback_transaction(Txn, ChPid, State1) end, - {ok, requeue_and_run(sets:to_list(ChAckTags), State2)} + {ok, requeue_and_run(sets:to_list(ChAckTags), + ensure_expiry_timer(State2))} end end. @@ -635,9 +642,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, backing_queue_state = BQS, backing_queue = BQ}) -> AckRequired = not NoAck, + State1 = ensure_expiry_timer(State), case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, ensure_expiry_timer( - State#q{backing_queue_state = BQS1})); + {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), @@ -646,8 +653,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, - ensure_expiry_timer(State#q{backing_queue_state = BQS1})) + reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -687,7 +693,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, - reply(ok, ensure_expiry_timer(State2)) + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -750,7 +756,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(ensure_expiry_timer(requeue_and_run(AckTags, State))) + noreply(requeue_and_run(AckTags, State)) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> @@ -774,7 +780,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, store_ch_record(C1), - noreply(ensure_expiry_timer(State#q{backing_queue_state = BQS1})) + noreply(State#q{backing_queue_state = BQS1}) end; handle_cast({rollback, Txn, ChPid}, State) -> |