author     Matthew Sackman <matthew@lshift.net>   2009-12-17 13:08:01 +0000
committer  Matthew Sackman <matthew@lshift.net>   2009-12-17 13:08:01 +0000
commit     91d1eb55d90cbf16f089ed75de63a77c95b601c3
tree       976e6ee6ab8765f30db88cd108e79e18c8f87d3d
parent     d7424357ff5d05d3307ff491c761befdf18ad95c
download   rabbitmq-server-91d1eb55d90cbf16f089ed75de63a77c95b601c3.tar.gz
Reworked the GC of msg_store so that it scans the files themselves for their content, rather than doing a select on ets. This bounds the time GC can take (the ets table could hold many billions of entries for other files), and also makes it simpler to make the msg_location index pluggable (=> toke). Also reduced the msg file size limit from 256MB to 16MB: tests show that although maximum write speed drops (more fsyncs and file closes), GC is much faster. The limit may go back up a bit when lazy + background GC arrives.
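The key cost shift is in how the GC worklist for a file is built. A minimal sketch of the before/after, assuming a five-element {msg_location, MsgId, File, Offset, TotalSize} tuple layout and the module name gc_sketch (both illustrative, not the patch's real shapes):

-module(gc_sketch).
-export([old_worklist/2, new_worklist/3]).

%% Before: match against the whole index. ets:match_object/2 walks every
%% entry in the table, so the cost grows with the total number of
%% messages in the store, not with the size of the file being GC'd.
old_worklist(File, Tid) ->
    lists:keysort(4, ets:match_object(Tid, {msg_location, '_', File, '_', '_'})).

%% After: re-read the one (now 16MB) file to recover the msg ids it
%% holds, then do one keyed lookup per id, keeping only entries whose
%% index entry still points at this file. Cost is bounded by the file
%% size, however large the index grows.
new_worklist(File, Tid, ScannedMsgIds) ->
    [Loc || MsgId <- ScannedMsgIds,
            [Loc] <- [ets:lookup(Tid, MsgId)],
            element(3, Loc) =:= File].

Because the new path touches the index only through point lookups, the index no longer has to be ets at all, which is what opens the door to the pluggable (toke) backend mentioned above.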
-rw-r--r--   src/rabbit_msg_store.erl | 50
-rw-r--r--   src/rabbit_tests.erl     |  3

2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b9bffef6..0702cf36 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,7 +43,7 @@
 
 -define(SERVER, ?MODULE).
 
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
 -define(SYNC_INTERVAL, 5). %% milliseconds
 -define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
 
@@ -622,13 +622,6 @@ index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
     true = ets:delete(MsgLocations, Key),
     ok.
 
-index_search_by_file(File, #msstate { msg_locations = MsgLocations }) ->
-    lists:sort(fun (#msg_location { offset = OffA },
-                    #msg_location { offset = OffB }) ->
-                       OffA < OffB
-               end, ets:match_object(MsgLocations,
-                                     #msg_location { file = File, _ = '_' })).
-
 index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
     MatchHead = #msg_location { file = File, _ = '_' },
     ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
@@ -798,8 +791,7 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
     {ExpectedOffset, MsgIds}.
 
 build_index([], State) ->
-    CurFile = State #msstate.current_file,
-    build_index(undefined, [CurFile], [], State);
+    build_index(undefined, [State #msstate.current_file], [], State);
 build_index(Files, State) ->
     build_index(undefined, Files, [], State).
 
@@ -990,8 +982,6 @@ combine_files(#file_summary { file = Source,
            ok = truncate_and_extend_file(DestinationHdl, DestinationValid,
                                          ExpectedSize);
       true ->
-          Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
-          {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
           Worklist = lists:dropwhile(
                        fun (#msg_location { offset = Offset })
                              when Offset /= DestinationContiguousTop ->
@@ -1005,7 +995,9 @@ combine_files(#file_summary { file = Source,
                          %% that the list should be naturally sorted
                          %% as we require, however, we need to
                          %% enforce it anyway
-                       end, index_search_by_file(Destination, State1)),
+                       end, find_unremoved_messages_in_file(Destination, State1)),
+          Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+          {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
           ok = copy_messages(
                  Worklist, DestinationContiguousTop, DestinationValid,
                  DestinationHdl, TmpHdl, Destination, State1),
@@ -1024,7 +1016,7 @@ combine_files(#file_summary { file = Source,
                   ok = file_handle_cache:close(TmpHdl),
                   ok = file:delete(form_filename(Dir, Tmp))
           end,
-          SourceWorkList = index_search_by_file(Source, State1),
+          SourceWorkList = find_unremoved_messages_in_file(Source, State1),
          ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
                             SourceHdl, DestinationHdl, Destination, State1),
          %% tidy up
@@ -1033,6 +1025,19 @@ combine_files(#file_summary { file = Source,
     ok = file:delete(form_filename(Dir, SourceName)),
     State1.
 
+find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+    %% Msgs here will be end-of-file at start-of-list
+    {ok, Messages, _FileSize} =
+        scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+    %% foldl will reverse so will end up with msgs in ascending offset order
+    lists:foldl(
+      fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+              case index_lookup(MsgId, State) of
+                  Entry = #msg_location { file = File } -> [ Entry | Acc ];
+                  _ -> Acc
+              end
+      end, [], Messages).
+
 copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
               Destination, State) ->
     {FinalOffset, BlockStart1, BlockEnd1} =
@@ -1065,11 +1070,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
                         {NextOffset, Offset, Offset + TotalSize}
                 end
         end, {InitOffset, undefined, undefined}, WorkList),
-    %% do the last remaining block
-    BSize1 = BlockEnd1 - BlockStart1,
-    {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1),
-    {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
-    ok = file_handle_cache:sync(DestinationHdl),
+    case WorkList of
+        [] ->
+            ok;
+        _ ->
+            %% do the last remaining block
+            BSize1 = BlockEnd1 - BlockStart1,
+            {ok, BlockStart1} =
+                file_handle_cache:position(SourceHdl, BlockStart1),
+            {ok, BSize1} =
+                file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+            ok = file_handle_cache:sync(DestinationHdl)
+    end,
     ok.
 
 delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc81ea18..fe782049 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1005,6 +1005,9 @@ test_msg_store() ->
     %% this should force some sort of sync internally otherwise misread
     ok = msg_store_read(MsgIds1stHalf),
     ok = rabbit_msg_store:remove(MsgIds1stHalf),
+    %% restart empty
+    ok = stop_msg_store(),
+    ok = start_msg_store_empty(), %% now safe to reuse msg_ids
     %% push a lot of msgs in...
     BigCount = 100000,
     MsgIdsBig = lists:seq(1, BigCount),
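A note on the new case WorkList of guard in copy_messages: with an empty worklist the foldl returns its initial {InitOffset, undefined, undefined} accumulator untouched, so the previously unconditional final-block copy would have evaluated undefined - undefined and crashed with badarith. A quick shell check (illustrative; exact formatting varies by OTP release):

1> undefined - undefined.
** exception error: an error occurred when evaluating an arithmetic expression
     in operator  -/2
        called as undefined - undefined

A worklist can plausibly come back empty under the new scheme, since it is now built by scanning the file: every message physically present in the source file may already have been removed from the index.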