summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-18 14:19:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-18 14:19:16 +0100
commitac39c8b78683bf04529f60daec89fb0d8281a15e (patch)
tree3963a454134a5dc26211ac94b121fbd5e4de11cd
parent132d3f05fd04ce55564b8b30855e109210760c75 (diff)
downloadrabbitmq-server-ac39c8b78683bf04529f60daec89fb0d8281a15e.tar.gz
stop the commit timer if we're no longer dirty. This means it should no longer be a repeat timer because once it's set were either going to receive the explicit sync call or we're going to timeout on message queue at which point we're no longer dirty and so we'll then cancel the timer....
-rw-r--r--src/rabbit_disk_queue.erl40
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c6076635..192995b2 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -376,8 +376,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
- {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []),
-
InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
@@ -395,8 +393,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
file_size_limit = FileSizeLimit,
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
- on_sync_froms = [],
- timer_ref = TRef
+ on_sync_froms = [],
+ timer_ref = undefined
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -528,11 +526,10 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge},
- timer_ref = TRef
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
}) ->
+ State1 = stop_commit_timer(State),
%% deliberately ignoring return codes here
- timer:cancel(TRef),
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -545,11 +542,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
end, ok, ReadHdls),
- State #dqstate { current_file_handle = undefined,
- current_dirty = false,
- read_file_handles = {dict:new(), gb_trees:empty()},
- timer_ref = undefined
- }.
+ State1 #dqstate { current_file_handle = undefined,
+ current_dirty = false,
+ read_file_handles = {dict:new(), gb_trees:empty()}
+ }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -557,14 +553,14 @@ code_change(_OldVsn, State, _Extra) ->
%% ---- UTILITY FUNCTIONS ----
noreply(NewState = #dqstate { current_dirty = true }) ->
- {noreply, NewState, 0};
+ {noreply, start_commit_timer(NewState), 0};
noreply(NewState) ->
- {noreply, NewState, infinity}.
+ {noreply, stop_commit_timer(NewState), infinity}.
reply(Reply, NewState = #dqstate { current_dirty = true }) ->
- {reply, Reply, NewState, 0};
+ {reply, Reply, start_commit_timer(NewState), 0};
reply(Reply, NewState) ->
- {reply, Reply, NewState, infinity}.
+ {reply, Reply, stop_commit_timer(NewState), infinity}.
form_filename(Name) ->
filename:join(base_directory(), Name).
@@ -707,6 +703,18 @@ sequence_lookup(Sequences, Q) ->
{ReadSeqId, WriteSeqId, Length}
end.
+start_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
+ State #dqstate { timer_ref = TRef };
+start_commit_timer(State) ->
+ State.
+
+stop_commit_timer(State = #dqstate { timer_ref = undefined }) ->
+ State;
+stop_commit_timer(State = #dqstate { timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { timer_ref = undefined }.
+
sync_current_file_handle(State = #dqstate { current_dirty = false,
on_sync_froms = [] }) ->
State;