author    Matthias Radestock <matthias@rabbitmq.com>  2010-08-16 07:14:32 +0100
committer Matthias Radestock <matthias@rabbitmq.com>  2010-08-16 07:14:32 +0100
commit    8ccf027aee330024c6c626d485798fc796a6dd6e (patch)
tree      579d282d4002d011c771c2e8d476c4d2fb08d221
parent    85a9de64b873e6a9d7df2047fbc0ce3143309485 (diff)
parent    1d8e8a4d11ef2b7b397e366de389b6f514e46914 (diff)
download  rabbitmq-server-8ccf027aee330024c6c626d485798fc796a6dd6e.tar.gz
bug23129
-rw-r--r--  src/file_handle_cache.erl |   3
-rw-r--r--  src/rabbit_msg_store.erl  | 234
2 files changed, 86 insertions(+), 151 deletions(-)
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 5b348580..8db5a794 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -382,7 +382,8 @@ copy(Src, Dest, Count) ->
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
- DHandle #handle { offset = DOffset + Count1 }]};
+ DHandle #handle { offset = DOffset + Count1,
+ is_dirty = true }]};
Error ->
{Error, [SHandle, DHandle]}
end;
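The one-line change above matters because the handle cache only fsyncs handles it believes to be dirty; copy/3 previously advanced the destination offset without marking the handle, so a later sync could skip the copied bytes. A minimal sketch of that pattern, assuming a simplified #handle {} record rather than the module's real one:

-record(handle, { fd, offset = 0, is_dirty = false }).

%% Only handles marked dirty get an fsync; a copy that advances the
%% offset without setting is_dirty would leave its bytes unflushed.
maybe_sync(#handle { is_dirty = false } = Handle) ->
    {ok, Handle};
maybe_sync(#handle { fd = Fd } = Handle) ->
    ok = file:sync(Fd),
    {ok, Handle #handle { is_dirty = false }}.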
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c7ebfd97..5bc1f9d5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -495,7 +495,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
+init([Server, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
@@ -506,11 +506,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
- {AllCleanShutdown, IndexState, ClientRefs1} =
- recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server),
+ AttemptFileSummaryRecovery =
+ case ClientRefs of
+ undefined -> ok = rabbit_misc:recursive_delete([Dir]),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ false;
+ _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ recover_crashed_compactions(Dir)
+ end,
+ %% if we found crashed compactions we trust neither the
+ %% file_summary nor the location index. Note the file_summary is
+ %% left empty here if it can't be recovered.
{FileSummaryRecovered, FileSummaryEts} =
- recover_file_summary(AllCleanShutdown, Dir, Server),
+ recover_file_summary(AttemptFileSummaryRecovery, Dir),
+
+ {CleanShutdown, IndexState, ClientRefs1} =
+ recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
+ ClientRefs, Dir, Server),
+ %% CleanShutdown => msg location index and file_summary both
+ %% recovered correctly.
+ true = case {FileSummaryRecovered, CleanShutdown} of
+ {true, false} -> ets:delete_all_objects(FileSummaryEts);
+ _ -> true
+ end,
+ %% CleanShutdown <=> msg location index and file_summary both
+ %% recovered correctly.
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
@@ -537,26 +558,14 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
- successfully_recovered = AllCleanShutdown,
+ successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit
},
- ok = case AllCleanShutdown of
- true -> ok;
- false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State)
- end,
-
- FileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
-
- %% There should be no more tmp files now, so go ahead and load the
- %% whole lot
- Files = [filename_to_num(FileName) || FileName <- FileNames],
+ %% If we didn't recover the msg location index then we need to
+ %% rebuild it now.
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(FileSummaryRecovered, Files, State),
+ build_index(CleanShutdown, StartupFunState, State),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -1038,9 +1047,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-sort_file_names(FileNames) ->
+list_sorted_file_names(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
- FileNames).
+ filelib:wildcard("*" ++ Ext, Dir)).
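Sorting on the integer rootname matters because store files are named by number (shown here with a ".rdq"-style extension for illustration); a lexical sort would order "10" before "2". A quick shell check of the comparison used above:

1> lists:sort(fun (A, B) ->
       list_to_integer(filename:rootname(A)) <
           list_to_integer(filename:rootname(B))
   end, ["10.rdq", "2.rdq", "0.rdq"]).
["0.rdq","2.rdq","10.rdq"]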
%%----------------------------------------------------------------------------
%% message cache helper functions
@@ -1120,15 +1129,15 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) ->
- ok = rabbit_misc:recursive_delete([Dir]),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+ {false, IndexModule:new(Dir), sets:new()};
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
+ rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
{false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~w: " ++ ErrorMsg ++
- "~nrebuilding indices from scratch~n",
+ rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+ "rebuilding indices from scratch~n",
[Server | ErrorArgs]),
{false, IndexModule:new(Dir), sets:new()}
end,
@@ -1142,12 +1151,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
- ClientRefs1 = sets:from_list(ClientRefs),
- {true, IndexState1, ClientRefs1};
+ {true, IndexState1,
+ sets:from_list(ClientRefs)};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
- false -> Fresh("recovery terms differ from present", [])
+ false -> Fresh("recovery terms differ from present", [])
end
end.
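For reference, the case expression this hunk lives in compares the recovery terms saved at clean shutdown with the store's current configuration. A hedged restatement of that condition as a standalone function (hypothetical names):

%% Trust the on-disk index only if the client refs recorded at
%% shutdown match the clients now attaching, and the index module
%% has not changed underneath the on-disk data.
terms_match(RecClientRefs, RecIndexModule, ClientRefs, IndexModule) ->
    lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
        andalso IndexModule =:= RecIndexModule.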
@@ -1168,7 +1177,7 @@ store_file_summary(Tid, Dir) ->
ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
[{extended_info, [object_count]}]).
-recover_file_summary(false, _Dir, _Server) ->
+recover_file_summary(false, _Dir) ->
%% TODO: the only reason for this to be an *ordered*_set is so
%% that a) maybe_compact can start a traversal from the eldest
%% file, and b) build_index in fast recovery mode can easily
@@ -1178,15 +1187,12 @@ recover_file_summary(false, _Dir, _Server) ->
%% ditching the latter would be neater.
{false, ets:new(rabbit_msg_store_file_summary,
[ordered_set, public, {keypos, #file_summary.file}])};
-recover_file_summary(true, Dir, Server) ->
+recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> file:delete(Path),
{true, Tid};
- {error, Error} -> rabbit_log:warning(
- "~w: failed to recover file summary: ~p~n"
- "rebuilding~n", [Server, Error]),
- recover_file_summary(false, Dir, Server)
+ {error, _Error} -> recover_file_summary(false, Dir)
end.
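The file summary is persisted with ets:tab2file/3 (see store_file_summary above) and reloaded with ets:file2tab/1; deleting the dump immediately after a successful load ensures a stale summary cannot be picked up by a later unclean restart. A self-contained round trip, with a hypothetical table and path:

summary_round_trip() ->
    Path = "/tmp/file_summary.ets", %% hypothetical path
    T = ets:new(example_file_summary, [ordered_set, public]),
    true = ets:insert(T, {1, example_entry}),
    %% extended_info records the object count in the dump, so a
    %% truncated file can be detected rather than loaded silently.
    ok = ets:tab2file(T, Path, [{extended_info, [object_count]}]),
    {ok, T2} = ets:file2tab(Path),
    [{1, example_entry}] = ets:lookup(T2, 1),
    ok = file:delete(Path).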
count_msg_refs(Gen, Seed, State) ->
@@ -1199,6 +1205,7 @@ count_msg_refs(Gen, Seed, State) ->
ok = case index_lookup(Guid, State) of
not_found ->
index_insert(#msg_location { guid = Guid,
+ file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
@@ -1213,7 +1220,9 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.
-recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
+recover_crashed_compactions(Dir) ->
+ FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1222,103 +1231,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
ok = recover_crashed_compaction(
Dir, TmpFileName, NonTmpRelatedFileName)
end, TmpFileNames),
- ok.
+ TmpFileNames == [].
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
- {ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
- {ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
- %% 1) It's possible that everything in the tmp file is also in the
- %% main file such that the main file is (prefix ++
- %% tmpfile). This means that compaction failed immediately
- %% prior to the final step of deleting the tmp file. Plan: just
- %% delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the
- %% main file but with holes throughout (or just something like
- %% main = (prefix ++ hole ++ tmpfile)). This means that
- %% compaction wrote out the tmp file successfully and then
- %% failed. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the
- %% main file but such that the main file does not end with tmp
- %% file (and there are valid messages in the suffix; main =
- %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
- %% means that compaction failed as we were writing out the tmp
- %% file. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which
- %% are not in the main file. This means that writing out the
- %% tmp file succeeded, but then we failed as we were copying
- %% them back over to the main file, after truncating the main
- %% file. As the main file has already been truncated, it should
- %% consist only of valid messages. Plan: Truncate the main file
- %% back to before any of the files in the tmp file and copy
- %% them over again
- TmpPath = form_filename(Dir, TmpFileName),
- case is_sublist(GuidsTmp, Guids) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpPath);
- false ->
- %% We're in case 4 above. We only care about the initial
- %% msgs in main file that are not in the tmp file. If
- %% there are no msgs in the tmp file then we would be in
- %% the 'true' branch of this case, so we know the
- %% lists:last call is safe.
- EldestTmpGuid = lists:last(GuidsTmp),
- {Guids1, UncorruptedMessages1}
- = case lists:splitwith(
- fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of
- {_Guids, []} -> %% no msgs from tmp in main
- {Guids, UncorruptedMessages};
- {Dropped, [EldestTmpGuid | Rest]} ->
- %% Msgs in Dropped are in tmp, so forget them.
- %% *cry*. Lists indexed from 1.
- {Rest, lists:sublist(UncorruptedMessages,
- 2 + length(Dropped),
- length(Rest))}
- end,
- %% The main file prefix should be contiguous
- {Top, Guids1} = find_contiguous_block_prefix(
- lists:reverse(UncorruptedMessages1)),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = is_disjoint(Guids1, GuidsTmp),
- %% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
- %% Wipe out any rubbish at the end of the file. Remember
- %% the head of the list will be the highest entry in the
- %% file.
- [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize,
- %% Extend the main file as big as necessary in a single
- %% move. If we run out of disk space, this truncate could
- %% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
- {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
- ok = file_handle_cache:close(MainHdl),
- ok = file_handle_cache:delete(TmpHdl),
-
- {ok, _MainMessages, GuidsMain} =
- scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName),
- %% check that everything in Guids1 is in GuidsMain
- true = is_sublist(Guids1, GuidsMain),
- %% check that everything in GuidsTmp is in GuidsMain
- true = is_sublist(GuidsTmp, GuidsMain)
- end,
+ %% Because a msg can legitimately appear multiple times in the
+ %% same file, identifying the contents of the tmp file and where
+ %% they came from is non-trivial. If we are recovering a crashed
+ %% compaction then we will be rebuilding the index, which can cope
+ %% with duplicates appearing. Thus the simplest and safest thing
+ %% to do is to append the contents of the tmp file to its main
+ %% file.
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
+ ?READ_MODE ++ ?WRITE_MODE),
+ {ok, _End} = file_handle_cache:position(MainHdl, eof),
+ Size = filelib:file_size(form_filename(Dir, TmpFileName)),
+ {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
+ ok = file_handle_cache:close(MainHdl),
+ ok = file_handle_cache:delete(TmpHdl),
ok.
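The replacement is deliberately simple: append, close, delete. As a hedged illustration of the same sequence with plain file(3) calls (file names hypothetical; the real code stays behind file_handle_cache so it participates in fd limits and write coalescing):

recover_by_append_sketch(TmpPath, MainPath) ->
    {ok, Tmp}  = file:open(TmpPath,  [read, binary, raw]),
    {ok, Main} = file:open(MainPath, [read, write, binary, raw]),
    {ok, _Eof} = file:position(Main, eof), %% append; never truncate
    {ok, _N}   = file:copy(Tmp, Main),     %% copy the whole tmp file
    ok = file:close(Tmp),
    ok = file:close(Main),
    ok = file:delete(TmpPath).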
-is_sublist(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).
-
-is_disjoint(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
@@ -1332,10 +1264,6 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
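A hedged sketch of that contiguity walk, using the same {Guid, TotalSize, Offset} triples but simplified names:

%% Keep taking messages while each starts exactly where the previous
%% one ended; stop at the first gap, returning the contiguous byte
%% count and the guids covered (youngest first).
prefix_sketch(MsgsAscending) -> prefix_sketch(MsgsAscending, 0, []).

prefix_sketch([{Guid, TotalSize, Offset} | Rest], Offset, Guids) ->
    prefix_sketch(Rest, Offset + TotalSize, [Guid | Guids]);
prefix_sketch(_GapOrEmpty, ExpectedOffset, Guids) ->
    {ExpectedOffset, Guids}.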
@@ -1350,8 +1278,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(true, _Files, State = #msstate {
- file_summary_ets = FileSummaryEts }) ->
+build_index(true, _StartupFunState,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
file_size = FileSize,
@@ -1363,12 +1291,17 @@ build_index(true, _Files, State = #msstate {
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
-build_index(false, Files, State) ->
+build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
+ State = #msstate { dir = Dir }) ->
+ ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
- case Files of
- [] -> build_index(Pid, undefined, [State #msstate.current_file], State);
- _ -> {Offset, State1} = build_index(Pid, undefined, Files, State),
- {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}
+ case [filename_to_num(FileName) ||
+ FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ [] -> build_index(Pid, undefined, [State #msstate.current_file],
+ State);
+ Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2,
+ State1, Files)}
end.
build_index(Gatherer, Left, [],
@@ -1409,14 +1342,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(Guid, State) of
- not_found ->
- {VMAcc, VTSAcc};
- StoreEntry ->
+ #msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
- {[Obj | VMAcc], VTSAcc + TotalSize}
+ {[Obj | VMAcc], VTSAcc + TotalSize};
+ _ ->
+ {VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
%% foldl reverses lists, find_contiguous_block_prefix needs
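This first-location-wins rule during rebuild is what makes the append-on-recovery strategy safe: count_msg_refs pre-creates each referenced guid with file = undefined, the first scanned copy claims the entry, and any duplicate bytes appended by crash recovery are ignored (and eventually reclaimed by compaction). A simplified, hypothetical model using a dict in place of the real index:

record_location(Guid, File, Offset, Locations) ->
    case dict:is_key(Guid, Locations) of
        false -> dict:store(Guid, {File, Offset}, Locations);
        true  -> Locations %% duplicate sighting: keep the first
    end.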
@@ -1671,9 +1604,10 @@ find_unremoved_messages_in_file(File,
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
+ lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
- #msg_location { file = File } = Entry ->
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc