summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-08 17:51:04 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-08 17:51:04 +0100
commite69eb1bd1765c36fa7d8f4519ae3c8616dd7e10c (patch)
treeae99c0a8e3ec6e87fd6b80fbaf034bd735b60002
parenta88163e4a07fbbdef2010c0308ce7d7ab9dd5c40 (diff)
downloadrabbitmq-server-bug23233.tar.gz
Remove all traces of contiguous_top and do necessary refactorings. Manually verified by various stressful invocations of McM that both branches of the case in combine_files are reached and work correctly. Code is substantially simplerbug23233
-rw-r--r--src/rabbit_msg_store.erl118
1 files changed, 44 insertions, 74 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a9c7db76..18810833 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -98,8 +98,7 @@
}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size,
- locked, readers}).
+ {file, valid_total_size, left, right, file_size, locked, readers}).
%%----------------------------------------------------------------------------
@@ -159,8 +158,7 @@
%% {Guid, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
-%% {File, ValidTotalSize, ContiguousTop, Left, Right,
-%% FileSize, Locked, Readers}
+%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
%%
%% The basic idea is that messages are appended to the current file up
%% until that file becomes too big (> file_size_limit). At that point,
@@ -176,9 +174,7 @@
%%
%% As messages are removed from files, holes appear in these
%% files. The field ValidTotalSize contains the total amount of useful
-%% data left in the file, whilst ContiguousTop contains the amount of
-%% valid data right at the start of each file. These are needed for
-%% garbage collection.
+%% data left in the file. This is needed for garbage collection.
%%
%% When we discover that a file is now empty, we delete it. When we
%% discover that it can be combined with the useful data in either its
@@ -224,9 +220,7 @@
%% above B (i.e. truncate to the limit of the good contiguous region
%% at the start of the file), then write C and D on top and then write
%% E, F and G from the right file on top. Thus contiguous blocks of
-%% good data at the bottom of files are not rewritten (yes, this is
-%% the data the size of which is tracked by the ContiguousTop
-%% variable. Judicious use of a mirror is required).
+%% good data at the bottom of files are not rewritten.
%%
%% +-------+ +-------+ +-------+
%% | X | | G | | G |
@@ -628,20 +622,14 @@ handle_cast({write, Guid},
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
right = undefined,
locked = false,
file_size = FileSize }] =
ets:lookup(FileSummaryEts, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = case CurOffset =:= ContiguousTop of
- true -> ValidTotalSize1;
- false -> ContiguousTop
- end,
true = ets:update_element(
FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1},
{#file_summary.file_size, FileSize + TotalSize}]),
NextOffset = CurOffset + TotalSize,
noreply(
@@ -902,8 +890,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize } =
- index_lookup(Guid, State),
+ total_size = TotalSize } = index_lookup(Guid, State),
case RefCount of
1 ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
@@ -911,7 +898,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
%% msg.
ok = remove_cache_entry(DedupCacheEts, Guid),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
@@ -919,12 +905,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
add_to_pending_gc_completion({remove, Guid}, State);
false ->
ok = index_delete(Guid, State),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:update_element(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1}]),
+ true =
+ ets:update_element(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, ValidTotalSize1}]),
State1 = delete_file_if_empty(File, State),
State1 #msstate { sum_valid_data = SumValid - TotalSize }
end;
@@ -1271,16 +1256,17 @@ scan_file_for_valid_messages(Dir, FileName) ->
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
-find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
+drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0).
-find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids};
-find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
- ExpectedOffset, Guids) ->
+drop_contiguous_block_prefix([], ExpectedOffset) ->
+ {ExpectedOffset, []};
+drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset,
+ total_size = TotalSize } | Tail],
+ ExpectedOffset) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
- find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]);
-find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids}.
+ drop_contiguous_block_prefix(Tail, ExpectedOffset1);
+drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) ->
+ {ExpectedOffset, MsgsAfterGap}.
build_index(true, _StartupFunState,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
@@ -1356,9 +1342,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
- %% foldl reverses lists, find_contiguous_block_prefix needs
- %% msgs eldest first, so, ValidMessages is the right way round
- {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
{Right, FileSize1} =
case Files of
%% if it's the last file, we'll truncate to remove any
@@ -1375,7 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
left = Left,
right = Right,
file_size = FileSize1,
@@ -1403,7 +1385,6 @@ maybe_roll_to_new_file(
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
- contiguous_top = 0,
left = CurFile,
right = undefined,
file_size = 0,
@@ -1530,7 +1511,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
true = ets:update_element(
FileSummaryEts, DstFile,
[{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
SrcFileSize + DstFileSize - TotalValidData;
false -> concurrent_readers
@@ -1541,7 +1521,6 @@ combine_files(#file_summary { file = Source,
left = Destination },
#file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
@@ -1557,41 +1536,32 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- case DestinationContiguousTop =:= DestinationValid of
- true ->
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
- false ->
- {DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset =/= DestinationContiguousTop ->
- %% it cannot be that Offset =:=
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- end, DestinationWorkList),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:delete(TmpHdl)
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
+ {DestinationContiguousTop, DestinationWorkListTail} =
+ drop_contiguous_block_prefix(DestinationWorkList),
+ case DestinationWorkListTail of
+ [] -> ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize);
+ _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
+ ok = copy_messages(
+ DestinationWorkListTail, DestinationContiguousTop,
+ DestinationValid, DestinationHdl, TmpHdl, Destination,
+ State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage
+ %% from Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:delete(TmpHdl)
end,
{SourceWorkList, SourceValid} =
find_unremoved_messages_in_file(Source, State),