summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-28 11:53:44 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-28 11:53:44 +0100
commit366d57a2a812c02222daf3f7e99f18f7132754bd (patch)
tree49afd7ca13ee71e5febfa12192fd4ae8353ad45a
parentc6581a20f35a3c47a4eca60842aba39c3c26d4bd (diff)
downloadrabbitmq-server-366d57a2a812c02222daf3f7e99f18f7132754bd.tar.gz
Renaming and minor refactor
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 11 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b98e1801..81aa3f0b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -111,6 +111,7 @@ init(Q) ->
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
+ expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined}, hibernate,
@@ -137,7 +138,7 @@ code_change(_OldVsn, State, _Extra) ->
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {long, Expires} -> start_expiry_timer(State, Expires);
+ {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
undefined -> State
end.
@@ -233,18 +234,15 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{expiry_timer_ref = undefined}.
-start_expiry_timer(State = #q{expires = undefined}) ->
+ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
-start_expiry_timer(State = #q{expires = Expires}) ->
+ensure_expiry_timer(State = #q{expires = Expires}) ->
?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]),
NewState = stop_expiry_timer(State),
{ok, TRef} = timer:apply_after(
Expires, rabbit_amqqueue, maybe_expire, [self()]),
NewState#q{expiry_timer_ref = TRef}.
-start_expiry_timer(State, Expires) ->
- start_expiry_timer(State#q{expires = Expires}).
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -638,7 +636,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
backing_queue_state = BQS, backing_queue = BQ}) ->
AckRequired = not NoAck,
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, start_expiry_timer(
+ {empty, BQS1} -> reply(empty, ensure_expiry_timer(
State#q{backing_queue_state = BQS1}));
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
@@ -649,7 +647,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg},
- start_expiry_timer(State#q{backing_queue_state = BQS1}))
+ ensure_expiry_timer(State#q{backing_queue_state = BQS1}))
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -689,7 +687,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
- reply(ok, start_expiry_timer(State2))
+ reply(ok, ensure_expiry_timer(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -716,7 +714,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ChPid, ConsumerTag,
State#q.blocked_consumers)},
case should_auto_delete(NewState) of
- false -> reply(ok, start_expiry_timer(NewState));
+ false -> reply(ok, ensure_expiry_timer(NewState));
true -> {stop, normal, ok, NewState}
end
end;
@@ -752,7 +750,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(start_expiry_timer(requeue_and_run(AckTags, State)))
+ noreply(ensure_expiry_timer(requeue_and_run(AckTags, State)))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
@@ -776,7 +774,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
store_ch_record(C1),
- noreply(start_expiry_timer(State#q{backing_queue_state = BQS1}))
+ noreply(ensure_expiry_timer(State#q{backing_queue_state = BQS1}))
end;
handle_cast({rollback, Txn, ChPid}, State) ->
@@ -836,7 +834,7 @@ handle_cast(maybe_expire, State) ->
case is_unused(State) of
true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
{stop, normal, State};
- false -> noreply(start_expiry_timer(State))
+ false -> noreply(ensure_expiry_timer(State))
end.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},