diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-18 14:19:16 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-18 14:19:16 +0100 |
commit | ac39c8b78683bf04529f60daec89fb0d8281a15e (patch) | |
tree | 3963a454134a5dc26211ac94b121fbd5e4de11cd | |
parent | 132d3f05fd04ce55564b8b30855e109210760c75 (diff) | |
download | rabbitmq-server-ac39c8b78683bf04529f60daec89fb0d8281a15e.tar.gz |
stop the commit timer if we're no longer dirty. This means it should no longer be a repeat timer because once it's set were either going to receive the explicit sync call or we're going to timeout on message queue at which point we're no longer dirty and so we'll then cancel the timer....
-rw-r--r-- | src/rabbit_disk_queue.erl | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c6076635..192995b2 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -376,8 +376,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), - {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []), - InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, @@ -395,8 +393,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, - on_sync_froms = [], - timer_ref = TRef + on_sync_froms = [], + timer_ref = undefined }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -528,11 +526,10 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge}, - timer_ref = TRef + read_file_handles = {ReadHdls, _ReadHdlsAge} }) -> + State1 = stop_commit_timer(State), %% deliberately ignoring return codes here - timer:cancel(TRef), dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), @@ -545,11 +542,10 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, dict:fold(fun (_File, Hdl, _Acc) -> file:close(Hdl) end, ok, ReadHdls), - State #dqstate { current_file_handle = undefined, - current_dirty = false, - read_file_handles = {dict:new(), gb_trees:empty()}, - timer_ref = undefined - }. + State1 #dqstate { current_file_handle = undefined, + current_dirty = false, + read_file_handles = {dict:new(), gb_trees:empty()} + }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -557,14 +553,14 @@ code_change(_OldVsn, State, _Extra) -> %% ---- UTILITY FUNCTIONS ---- noreply(NewState = #dqstate { current_dirty = true }) -> - {noreply, NewState, 0}; + {noreply, start_commit_timer(NewState), 0}; noreply(NewState) -> - {noreply, NewState, infinity}. + {noreply, stop_commit_timer(NewState), infinity}. reply(Reply, NewState = #dqstate { current_dirty = true }) -> - {reply, Reply, NewState, 0}; + {reply, Reply, start_commit_timer(NewState), 0}; reply(Reply, NewState) -> - {reply, Reply, NewState, infinity}. + {reply, Reply, stop_commit_timer(NewState), infinity}. form_filename(Name) -> filename:join(base_directory(), Name). @@ -707,6 +703,18 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. +start_commit_timer(State = #dqstate { timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), + State #dqstate { timer_ref = TRef }; +start_commit_timer(State) -> + State. + +stop_commit_timer(State = #dqstate { timer_ref = undefined }) -> + State; +stop_commit_timer(State = #dqstate { timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { timer_ref = undefined }. + sync_current_file_handle(State = #dqstate { current_dirty = false, on_sync_froms = [] }) -> State; |