diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-28 11:53:44 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-28 11:53:44 +0100 |
commit | 366d57a2a812c02222daf3f7e99f18f7132754bd (patch) | |
tree | 49afd7ca13ee71e5febfa12192fd4ae8353ad45a | |
parent | c6581a20f35a3c47a4eca60842aba39c3c26d4bd (diff) | |
download | rabbitmq-server-366d57a2a812c02222daf3f7e99f18f7132754bd.tar.gz |
Renaming and minor refactor
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 24 |
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b98e1801..81aa3f0b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -111,6 +111,7 @@ init(Q) -> backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), + expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined}, hibernate, @@ -137,7 +138,7 @@ code_change(_OldVsn, State, _Extra) -> init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {long, Expires} -> start_expiry_timer(State, Expires); + {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. @@ -233,18 +234,15 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{expiry_timer_ref = undefined}. -start_expiry_timer(State = #q{expires = undefined}) -> +ensure_expiry_timer(State = #q{expires = undefined}) -> State; -start_expiry_timer(State = #q{expires = Expires}) -> +ensure_expiry_timer(State = #q{expires = Expires}) -> ?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]), NewState = stop_expiry_timer(State), {ok, TRef} = timer:apply_after( Expires, rabbit_amqqueue, maybe_expire, [self()]), NewState#q{expiry_timer_ref = TRef}. -start_expiry_timer(State, Expires) -> - start_expiry_timer(State#q{expires = Expires}). - assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -638,7 +636,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, backing_queue_state = BQS, backing_queue = BQ}) -> AckRequired = not NoAck, case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, start_expiry_timer( + {empty, BQS1} -> reply(empty, ensure_expiry_timer( State#q{backing_queue_state = BQS1})); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of @@ -649,7 +647,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, end, Msg = {QName, self(), AckTag, IsDelivered, Message}, reply({ok, Remaining, Msg}, - start_expiry_timer(State#q{backing_queue_state = BQS1})) + ensure_expiry_timer(State#q{backing_queue_state = BQS1})) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -689,7 +687,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, - reply(ok, start_expiry_timer(State2)) + reply(ok, ensure_expiry_timer(State2)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -716,7 +714,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ChPid, ConsumerTag, State#q.blocked_consumers)}, case should_auto_delete(NewState) of - false -> reply(ok, start_expiry_timer(NewState)); + false -> reply(ok, ensure_expiry_timer(NewState)); true -> {stop, normal, ok, NewState} end end; @@ -752,7 +750,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(start_expiry_timer(requeue_and_run(AckTags, State))) + noreply(ensure_expiry_timer(requeue_and_run(AckTags, State))) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> @@ -776,7 +774,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, store_ch_record(C1), - noreply(start_expiry_timer(State#q{backing_queue_state = BQS1})) + noreply(ensure_expiry_timer(State#q{backing_queue_state = BQS1})) end; handle_cast({rollback, Txn, ChPid}, State) -> @@ -836,7 +834,7 @@ handle_cast(maybe_expire, State) -> case is_unused(State) of true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), {stop, normal, State}; - false -> noreply(start_expiry_timer(State)) + false -> noreply(ensure_expiry_timer(State)) end. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, |