summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-15 13:55:00 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-15 13:55:00 +0100
commitc5ed6b0741a24142869d02acc87c33ab52fc33cf (patch)
treea655fe41f23e22c128b5bce29d346bef756f2207
parent5b5740227ebe2b6304117f65f14ebb31dfbff981 (diff)
downloadrabbitmq-server-c5ed6b0741a24142869d02acc87c33ab52fc33cf.tar.gz
Refactorings and improvents to msg_store startup
-rw-r--r--src/rabbit_msg_store.erl68
1 files changed, 39 insertions, 29 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index ccf8c189..242c1fb8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -506,12 +506,15 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
- FileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
FoundCrashedCompactions =
- recover_crashed_compactions(Dir, FileNames, TmpFileNames),
+ case ClientRefs of
+ undefined -> %% we're going to wipe everything anyway
+ false;
+ _ ->
+ FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
+ recover_crashed_compactions(Dir, FileNames, TmpFileNames)
+ end,
%% if we found crashed compactions we trust neither the
%% file_summary nor the location index. Note the file_summary is
@@ -572,9 +575,8 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
%% recover_index_and_client_refs could have wiped out all files,
%% so need to look again at what files we have to scan
- FileNames1 =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- Files = [filename_to_num(FileName) || FileName <- FileNames1],
+ Files = [filename_to_num(FileName) ||
+ FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
{Offset, State1 = #msstate { current_file = CurFile }} =
build_index(AllCleanShutdown, Files, State),
@@ -974,6 +976,7 @@ safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
%% file helper functions
%%----------------------------------------------------------------------------
+
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
[{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
@@ -1058,6 +1061,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
+list_sorted_file_names(Dir, Ext) ->
+ sort_file_names(filelib:wildcard("*" ++ Ext, Dir)).
+
sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
FileNames).
@@ -1145,10 +1151,8 @@ recover_index_and_client_refs(IndexModule, _AttemptRecovery, undefined, Dir,
ok = rabbit_misc:recursive_delete([Dir]),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, _Server) ->
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- {false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
+recover_index_and_client_refs(IndexModule, AttemptRecovery, ClientRefs, Dir,
+ Server) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("~w: " ++ ErrorMsg ++
@@ -1156,23 +1160,29 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
[Server | ErrorArgs]),
{false, IndexModule:new(Dir), sets:new()}
end,
- case read_recovery_terms(Dir) of
- {false, Error} ->
- Fresh("failed to read recovery terms: ~p", [Error]);
- {true, Terms} ->
- RecClientRefs = proplists:get_value(client_refs, Terms, []),
- RecIndexModule = proplists:get_value(index_module, Terms),
- case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
- andalso IndexModule =:= RecIndexModule) of
- true -> case IndexModule:recover(Dir) of
- {ok, IndexState1} ->
- ClientRefs1 = sets:from_list(ClientRefs),
- {true, IndexState1, ClientRefs1};
- {error, Error} ->
- Fresh("failed to recover index: ~p", [Error])
- end;
- false -> Fresh("recovery terms differ from present", [])
- end
+ case AttemptRecovery of
+ true ->
+ case read_recovery_terms(Dir) of
+ {false, Error} ->
+ Fresh("failed to read recovery terms: ~p", [Error]);
+ {true, Terms} ->
+ RecClientRefs = proplists:get_value(client_refs, Terms, []),
+ RecIndexModule = proplists:get_value(index_module, Terms),
+ case (lists:sort(ClientRefs) =:= lists:sort(RecClientRefs)
+ andalso IndexModule =:= RecIndexModule) of
+ true -> case IndexModule:recover(Dir) of
+ {ok, IndexState1} ->
+ {true, IndexState1,
+ sets:from_list(ClientRefs)};
+ {error, Error} ->
+ Fresh("failed to recover index: ~p",
+ [Error])
+ end;
+ false -> Fresh("recovery terms differ from present", [])
+ end
+ end;
+ false ->
+ Fresh("", [])
end.
store_recovery_terms(Terms, Dir) ->