diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 17:51:04 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-08 17:51:04 +0100 |
commit | e69eb1bd1765c36fa7d8f4519ae3c8616dd7e10c (patch) | |
tree | ae99c0a8e3ec6e87fd6b80fbaf034bd735b60002 | |
parent | a88163e4a07fbbdef2010c0308ce7d7ab9dd5c40 (diff) | |
download | rabbitmq-server-bug23233.tar.gz |
Remove all traces of contiguous_top and do necessary refactorings. Manually verified by various stressful invocations of McM that both branches of the case in combine_files are reached and work correctly. Code is substantially simpler.
bug23233
-rw-r--r-- | src/rabbit_msg_store.erl | 118 |
1 files changed, 44 insertions, 74 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a9c7db76..18810833 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -98,8 +98,7 @@ }). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked, readers}). + {file, valid_total_size, left, right, file_size, locked, readers}). %%---------------------------------------------------------------------------- @@ -159,8 +158,7 @@ %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: -%% {File, ValidTotalSize, ContiguousTop, Left, Right, -%% FileSize, Locked, Readers} +%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} %% %% The basic idea is that messages are appended to the current file up %% until that file becomes too big (> file_size_limit). At that point, @@ -176,9 +174,7 @@ %% %% As messages are removed from files, holes appear in these %% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. +%% data left in the file. This is needed for garbage collection. %% %% When we discover that a file is now empty, we delete it. When we %% discover that it can be combined with the useful data in either its @@ -224,9 +220,7 @@ %% above B (i.e. truncate to the limit of the good contiguous region %% at the start of the file), then write C and D on top and then write %% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). +%% good data at the bottom of files are not rewritten. 
%% %% +-------+ +-------+ +-------+ %% | X | | G | | G | @@ -628,20 +622,14 @@ handle_cast({write, Guid}, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, right = undefined, locked = false, file_size = FileSize }] = ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case CurOffset =:= ContiguousTop of - true -> ValidTotalSize1; - false -> ContiguousTop - end, true = ets:update_element( FileSummaryEts, CurFile, [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, noreply( @@ -902,8 +890,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = - index_lookup(Guid, State), + total_size = TotalSize } = index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -911,7 +898,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. 
ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of @@ -919,12 +905,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, add_to_pending_gc_completion({remove, Guid}, State); false -> ok = index_delete(Guid, State), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}]), + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; @@ -1271,16 +1256,17 @@ scan_file_for_valid_messages(Dir, FileName) -> %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []). +drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0). -find_contiguous_block_prefix([], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}; -find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, Guids) -> +drop_contiguous_block_prefix([], ExpectedOffset) -> + {ExpectedOffset, []}; +drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset, + total_size = TotalSize } | Tail], + ExpectedOffset) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}. 
+ drop_contiguous_block_prefix(Tail, ExpectedOffset1); +drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) -> + {ExpectedOffset, MsgsAfterGap}. build_index(true, _StartupFunState, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1356,9 +1342,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, {VMAcc, VTSAcc} end end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), {Right, FileSize1} = case Files of %% if it's the last file, we'll truncate to remove any @@ -1375,7 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, left = Left, right = Right, file_size = FileSize1, @@ -1403,7 +1385,6 @@ maybe_roll_to_new_file( true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, - contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, @@ -1530,7 +1511,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> true = ets:update_element( FileSummaryEts, DstFile, [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; false -> concurrent_readers @@ -1541,7 +1521,6 @@ combine_files(#file_summary { file = Source, left = Destination }, #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), @@ -1557,41 +1536,32 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% 
otherwise we just truncate straight away and copy over from Source - case DestinationContiguousTop =:= DestinationValid of - true -> - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); - false -> - {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset =/= DestinationContiguousTop -> - %% it cannot be that Offset =:= - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - end, DestinationWorkList), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl) + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + {DestinationContiguousTop, DestinationWorkListTail} = + drop_contiguous_block_prefix(DestinationWorkList), + case DestinationWorkListTail of + [] -> ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + _ -> Tmp = filename:rootname(DestinationName) ++ 
?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), + ok = copy_messages( + DestinationWorkListTail, DestinationContiguousTop, + DestinationValid, DestinationHdl, TmpHdl, Destination, + State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), |