diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-18 13:27:42 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-18 13:27:42 +0100 |
commit | 61405ee6e1a2a02189f77ceddebfc471d917956c (patch) | |
tree | ded5d760a4fba08275f573c5ed10ae0129ea4360 | |
parent | dcc60acba1f22da0497534f5227677c7cb4b8228 (diff) | |
download | rabbitmq-server-61405ee6e1a2a02189f77ceddebfc471d917956c.tar.gz |
well, I think it works, but it's now much much slower.
-rw-r--r-- | src/rabbit_disk_queue.erl | 171 |
1 files changed, 100 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 3370ef84..1e2226bb 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -44,7 +44,7 @@ dump_queue/1, delete_non_durable_queues/1, auto_ack_next_message/1 ]). --export([length/1]). +-export([length/1, filesync/0]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -68,6 +68,8 @@ -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(SYNC_INTERVAL, 5). %% milliseconds + -record(dqstate, {msg_location_dets, %% where are messages? msg_location_ets, %% as above, but for ets version @@ -82,7 +84,9 @@ %% since the last fsync? file_size_limit, %% how big can our files get? read_file_handles, %% file handles for reading (LRU) - read_file_handles_limit %% how many file handles can we open? + read_file_handles_limit, %% how many file handles can we open? + on_sync_functions, %% list of functions to run on sync (reversed) + timer_ref %% TRef for our interval timer }). %% The components: @@ -260,6 +264,7 @@ -spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). -spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(filesync/0 :: () -> 'ok'). -endif. @@ -322,14 +327,17 @@ stop_and_obliterate() -> gen_server2:call(?SERVER, stop_vaporise, infinity). to_disk_only_mode() -> - gen_server2:pcall(?SERVER, 10, to_disk_only_mode, infinity). + gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity). to_ram_disk_mode() -> - gen_server2:pcall(?SERVER, 10, to_ram_disk_mode, infinity). + gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity). length(Q) -> gen_server2:call(?SERVER, {length, Q}, infinity). +filesync() -> + gen_server2:pcast(?SERVER, 10, filesync). + %% ---- GEN-SERVER INTERNAL API ---- init([FileSizeLimit, ReadFileHandlesLimit]) -> @@ -368,6 +376,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + {ok, TRef} = timer:apply_interval(?SYNC_INTERVAL, ?MODULE, filesync, []), + InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, @@ -384,7 +394,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> current_dirty = false, file_size_limit = FileSizeLimit, read_file_handles = {dict:new(), gb_trees:empty()}, - read_file_handles_limit = ReadFileHandlesLimit + read_file_handles_limit = ReadFileHandlesLimit, + on_sync_functions = [], + timer_ref = TRef }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -406,20 +418,20 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> handle_call({publish, Q, MsgId, MsgBody}, _From, State) -> {ok, MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, true, State), - {reply, MsgSeqId, State1}; + reply(MsgSeqId, State1); handle_call({deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, true, false, State), - {reply, Result, State1}; + reply(Result, State1); handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, false, State), - {reply, Result, State1}; -handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> + reply(Result, State1); +handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}), - {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), - {reply, ok, State1}; + {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State), + noreply(State1); handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), - {reply, Count, State1}; + reply(Count, State1); handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State) -> @@ -436,7 +448,7 @@ handle_call(stop_vaporise, _From, State) -> %% gen_server now calls terminate, which then calls shutdown handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) -> - {reply, ok, State}; + reply(ok, State); handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk, msg_location_dets = MsgLocationDets, @@ -446,10 +458,10 @@ handle_call(to_disk_only_mode, _From, disc_only_copies), ok = dets:from_ets(MsgLocationDets, MsgLocationEts), true = ets:delete_all_objects(MsgLocationEts), - {reply, ok, State #dqstate { operation_mode = disk_only }}; + reply(ok, State #dqstate { operation_mode = disk_only }); handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) -> - {reply, ok, State}; + reply(ok, State); handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only, msg_location_dets = MsgLocationDets, @@ -459,46 +471,50 @@ handle_call(to_ram_disk_mode, _From, disc_copies), true = ets:from_dets(MsgLocationEts, MsgLocationDets), ok = dets:delete_all_objects(MsgLocationDets), - {reply, ok, State #dqstate { operation_mode = ram_disk }}; + reply(ok, State #dqstate { operation_mode = ram_disk }); handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), - {reply, Length, State}; + reply(Length, State); handle_call({dump_queue, Q}, _From, State) -> {Result, State1} = internal_dump_queue(Q, State), - {reply, Result, State1}; + reply(Result, State1); handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), - {reply, ok, State1}. + reply(ok, State1). handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, _MsgSeqId, State1} = internal_publish(Q, MsgId, next, MsgBody, false, State), - {noreply, State1}; + noreply(State1); handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), - {noreply, State1}; + noreply(State1); handle_cast({auto_ack_next_message, Q}, State) -> {ok, State1} = internal_auto_ack(Q, State), - {noreply, State1}; + noreply(State1); handle_cast({tx_publish, MsgId, MsgBody}, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), - {noreply, State1}; + noreply(State1); handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), - {noreply, State1}; + noreply(State1); handle_cast({requeue, Q, MsgSeqIds}, State) -> MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}), {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), - {noreply, State1}; + noreply(State1); handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), - {noreply, State1}; + noreply(State1); handle_cast({delete_queue, Q}, State) -> {ok, State1} = internal_delete_queue(Q, State), - {noreply, State1}. + noreply(State1); +handle_cast(filesync, State) -> + noreply(sync_current_file_handle(State)). handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; +handle_info(timeout, State = #dqstate { current_dirty = true }) -> + noreply(sync_current_file_handle(State)); handle_info(_Info, State) -> {noreply, State}. @@ -508,16 +524,18 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, current_file_handle = FileHdl, - read_file_handles = {ReadHdls, _ReadHdlsAge} + read_file_handles = {ReadHdls, _ReadHdlsAge}, + timer_ref = TRef }) -> %% deliberately ignoring return codes here + timer:cancel(TRef), dets:close(MsgLocationDets), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), true = ets:delete_all_objects(MsgLocationEts), case FileHdl of undefined -> ok; - _ -> file:sync(FileHdl), + _ -> sync_current_file_handle(State), file:close(FileHdl) end, dict:fold(fun (_File, Hdl, _Acc) -> @@ -525,13 +543,25 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, end, ok, ReadHdls), State #dqstate { current_file_handle = undefined, current_dirty = false, - read_file_handles = {dict:new(), gb_trees:empty()}}. + read_file_handles = {dict:new(), gb_trees:empty()}, + timer_ref = undefined + }. code_change(_OldVsn, State, _Extra) -> {ok, State}. %% ---- UTILITY FUNCTIONS ---- +noreply(NewState = #dqstate { current_dirty = true }) -> + {noreply, NewState, 0}; +noreply(NewState) -> + {noreply, NewState, infinity}. + +reply(Reply, NewState = #dqstate { current_dirty = true }) -> + {reply, Reply, NewState, 0}; +reply(Reply, NewState) -> + {reply, Reply, NewState, infinity}. + form_filename(Name) -> filename:join(base_directory(), Name). @@ -613,14 +643,12 @@ get_read_handle(File, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit, current_file_name = CurName, - current_file_handle = CurHdl, current_dirty = IsDirty }) -> - IsDirty1 = if CurName =:= File andalso IsDirty -> - file:sync(CurHdl), - false; - true -> IsDirty - end, + State1 = if CurName =:= File andalso IsDirty -> + sync_current_file_handle(State); + true -> State + end, Now = now(), {FileHdl, ReadHdls1, ReadHdlsAge1} = case dict:find(File, ReadHdls) of @@ -644,9 +672,8 @@ get_read_handle(File, State = end, ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1), ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3}, - current_dirty = IsDirty1 - }}. + {FileHdl, + State1 #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge3} }}. adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> ExpectedSeqId; @@ -676,6 +703,17 @@ sequence_lookup(Sequences, Q) -> {ReadSeqId, WriteSeqId, Length} end. +sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, + current_dirty = IsDirty, + on_sync_functions = Funcs + }) -> + ok = case IsDirty of + true -> file:sync(CurHdl); + false -> ok + end, + lists:map(fun (Fun) -> Fun() end, lists:reverse(Funcs)), + State #dqstate { current_dirty = false, on_sync_functions = [] }. + %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, FakeDeliver, @@ -818,12 +856,10 @@ internal_tx_publish(MsgId, MsgBody, end. %% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) -internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, - State = #dqstate { current_file_handle = CurHdl, - current_file_name = CurName, - current_dirty = IsDirty, - sequences = Sequences - }) -> +internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, + State = #dqstate { sequences = Sequences, + on_sync_functions = SyncFuncs + }) -> {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of [] -> {[], undefined, undefined, undefined}; @@ -835,7 +871,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, { zip_with_tail(PubMsgSeqIds, {last, {next, next}}), InitWriteSeqId, InitReadSeqId1, InitLength} end, - {atomic, {Sync, WriteSeqId, State1}} = + {atomic, {WriteSeqId, State1}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), @@ -844,11 +880,11 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, %% it's been published, which is clearly %% nonsense. I.e. in commit, do not do things in an %% order which _could_not_ have happened. - {Sync1, WriteSeqId1} = + WriteSeqId1 = lists:foldl( fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, - {Acc, ExpectedSeqId}) -> - [{MsgId, _RefCount, File, _Offset, + ExpectedSeqId) -> + [{MsgId, _RefCount, _File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId1 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), @@ -863,23 +899,21 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, next_seq_id = NextSeqId1 }, write), - {Acc orelse (CurName =:= File), NextSeqId1} - end, {false, PubAcc}, PubList), - + NextSeqId1 + end, PubAcc, PubList), {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), - {Sync1, WriteSeqId1, State2} + {WriteSeqId1, State2} end), true = case PubList of [] -> true; _ -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)}) end, - IsDirty1 = if IsDirty andalso Sync -> - ok = file:sync(CurHdl), - false; - true -> IsDirty - end, - {ok, State1 #dqstate { current_dirty = IsDirty1 }}. + {ok, + State1 #dqstate { on_sync_functions = [fun() -> + gen_server2:reply(From, ok) + end | SyncFuncs]} + }. %% SeqId can be 'next' internal_publish(Q, MsgId, SeqId, MsgBody, IsDelivered, State) -> @@ -1051,14 +1085,10 @@ maybe_roll_to_new_file(Offset, current_file_name = CurName, current_file_handle = CurHdl, current_file_num = CurNum, - current_dirty = IsDirty, file_summary = FileSummary } ) when Offset >= FileSizeLimit -> - ok = case IsDirty of - true -> file:sync(CurHdl); - false -> ok - end, + State1 = sync_current_file_handle(State), ok = file:close(CurHdl), NextNum = CurNum + 1, NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION, @@ -1067,13 +1097,12 @@ maybe_roll_to_new_file(Offset, ok = preallocate(NextHdl, FileSizeLimit, 0), true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}), - State1 = State #dqstate { current_file_name = NextName, - current_file_handle = NextHdl, - current_file_num = NextNum, - current_offset = 0, - current_dirty = false - }, - {ok, compact(sets:from_list([CurName]), State1)}; + State2 = State1 #dqstate { current_file_name = NextName, + current_file_handle = NextHdl, + current_file_num = NextNum, + current_offset = 0 + }, + {ok, compact(sets:from_list([CurName]), State2)}; maybe_roll_to_new_file(_, State) -> {ok, State}. |