author     Matthew Sackman <matthew@lshift.net>  2009-12-17 13:08:01 +0000
committer  Matthew Sackman <matthew@lshift.net>  2009-12-17 13:08:01 +0000
commit     91d1eb55d90cbf16f089ed75de63a77c95b601c3 (patch)
tree       976e6ee6ab8765f30db88cd108e79e18c8f87d3d
parent     d7424357ff5d05d3307ff491c761befdf18ad95c (diff)
Reworked the GC of the msg_store so that it scans the files themselves for their content, rather than doing a select on ets. This bounds the time GC can take (the ets table could hold many billions of other entries), and also makes it simpler to make the msg_location index pluggable (=> toke). Also reduced the msg file size from 256MB to 16MB: tests show that although max write speed drops (more fsyncs and file closes), the GC is much faster. This may go back up a bit when lazy+background GC arrives.
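
For context, a minimal sketch (hypothetical module, not the real rabbit_msg_store code) contrasting the two GC worklist strategies described above; the #msg_location{} record is simplified, and Scan stands in for scan_file_for_valid_messages/2:

-module(gc_sketch).
-export([old_worklist/2, new_worklist/3]).

-record(msg_location, {msg_id, file, offset, total_size}).

%% Old approach: a match over the whole ets table. Cost grows with the
%% total number of entries in the table, not with the size of the file
%% being collected.
old_worklist(Tid, File) ->
    lists:sort(fun (#msg_location { offset = OffA },
                    #msg_location { offset = OffB }) ->
                       OffA < OffB
               end,
               ets:match_object(Tid, #msg_location { file = File, _ = '_' })).

%% New approach: scan the file itself, then do one point lookup in the
%% index per message found. Cost is bounded by the file size (now capped
%% at 16MB), and the index only needs lookups, which keeps it pluggable.
%% Scan(File) is assumed to yield {MsgId, TotalSize, Offset} tuples with
%% end-of-file messages first; foldl reverses that, so the result comes
%% out in ascending offset order.
new_worklist(Tid, File, Scan) when is_function(Scan, 1) ->
    lists:foldl(
      fun ({MsgId, _TotalSize, _Offset}, Acc) ->
              case ets:lookup(Tid, MsgId) of
                  [Entry = #msg_location { file = File }] -> [Entry | Acc];
                  _ -> Acc
              end
      end, [], Scan(File)).

(The table is assumed to be created with ets:new(msg_locations, [set, {keypos, #msg_location.msg_id}]).)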
-rw-r--r--  src/rabbit_msg_store.erl | 50
-rw-r--r--  src/rabbit_tests.erl     |  3
2 files changed, 34 insertions(+), 19 deletions(-)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b9bffef6..0702cf36 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -43,7 +43,7 @@
-define(SERVER, ?MODULE).
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
@@ -622,13 +622,6 @@ index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
true = ets:delete(MsgLocations, Key),
ok.
-index_search_by_file(File, #msstate { msg_locations = MsgLocations }) ->
- lists:sort(fun (#msg_location { offset = OffA },
- #msg_location { offset = OffB }) ->
- OffA < OffB
- end, ets:match_object(MsgLocations,
- #msg_location { file = File, _ = '_' })).
-
index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
MatchHead = #msg_location { file = File, _ = '_' },
ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
@@ -798,8 +791,7 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- CurFile = State #msstate.current_file,
- build_index(undefined, [CurFile], [], State);
+ build_index(undefined, [State #msstate.current_file], [], State);
build_index(Files, State) ->
build_index(undefined, Files, [], State).
@@ -990,8 +982,6 @@ combine_files(#file_summary { file = Source,
ok = truncate_and_extend_file(DestinationHdl,
DestinationValid, ExpectedSize);
true ->
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
Worklist =
lists:dropwhile(
fun (#msg_location { offset = Offset })
@@ -1005,7 +995,9 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end, index_search_by_file(Destination, State1)),
+ end, find_unremoved_messages_in_file(Destination, State1)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State1),
@@ -1024,7 +1016,7 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = index_search_by_file(Source, State1),
+ SourceWorkList = find_unremoved_messages_in_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
@@ -1033,6 +1025,19 @@ combine_files(#file_summary { file = Source,
ok = file:delete(form_filename(Dir, SourceName)),
State1.
+find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case index_lookup(MsgId, State) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
@@ -1065,11 +1070,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} = file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
ok.
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
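
A note on the copy_messages hunk above: with an empty WorkList the foldl never rebinds the block bounds, so BlockStart1 and BlockEnd1 remain undefined and the previously unconditional BlockEnd1 - BlockStart1 would crash with badarith; hence the new case on WorkList. A toy sketch (hypothetical, not rabbit code) of the failure mode the guard prevents:

-module(guard_sketch).
-export([last_block/1]).

%% Folding over an empty worklist leaves the block bounds at their
%% initial 'undefined', so subtracting them is a badarith error; the
%% guard returns ok without touching them.
last_block(WorkList) ->
    {_Off, BlockStart, BlockEnd} =
        lists:foldl(fun (Offset, {_, _, _}) ->
                            {Offset, Offset, Offset + 10}
                    end, {0, undefined, undefined}, WorkList),
    case WorkList of
        [] -> ok;                      %% nothing was copied
        _  -> BlockEnd - BlockStart    %% bounds are integers here
    end.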
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc81ea18..fe782049 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1005,6 +1005,9 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = msg_store_read(MsgIds1stHalf),
ok = rabbit_msg_store:remove(MsgIds1stHalf),
+ %% restart empty
+ ok = stop_msg_store(),
+ ok = start_msg_store_empty(), %% now safe to reuse msg_ids
%% push a lot of msgs in...
BigCount = 100000,
MsgIdsBig = lists:seq(1, BigCount),