summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl47
1 files changed, 25 insertions, 22 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bc68d2cd..e3aae572 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -820,15 +820,16 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State1 = case CurHdl of
undefined -> State;
_ -> State2 = internal_sync(State),
- file_handle_cache:close(CurHdl),
+ ok = file_handle_cache:close(CurHdl),
State2
end,
State3 = close_all_handles(State1),
- store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ ok = store_file_summary(FileSummaryEts, Dir),
+ [true = ets:delete(T) ||
+ T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
- {index_module, IndexModule}], Dir),
+ ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ {index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -881,13 +882,16 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, MsgIds} | NS]
end
end, [], CTM),
- case {Syncs, CGs} of
- {[], []} -> ok;
- _ -> file_handle_cache:sync(CurHdl)
- end,
+ ok = case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
+ end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs],
- State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }.
+ State2 = lists:foldl(
+ fun ({CRef, MsgIds}, StateN) ->
+ client_confirm(CRef, MsgIds, written, StateN)
+ end, State1, CGs),
+ State2 #msstate { on_sync = [] }.
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
@@ -1384,7 +1388,7 @@ recover_file_summary(false, _Dir) ->
recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> ok = file:delete(Path),
{true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1451,9 +1455,7 @@ scan_file_for_valid_messages(Dir, FileName) ->
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
fun scan_fun/2, []),
- %% if something really bad has happened,
- %% the close could fail, but ignore
- file_handle_cache:close(Hdl),
+ ok = file_handle_cache:close(Hdl),
Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
@@ -1889,32 +1891,33 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
force_recovery(BaseDir, Store) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
- file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ ok = file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
recover_crashed_compactions(BaseDir),
ok.
foreach_file(D, Fun, Files) ->
- [Fun(filename:join(D, File)) || File <- Files].
+ [ok = Fun(filename:join(D, File)) || File <- Files].
foreach_file(D1, D2, Fun, Files) ->
- [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+ [ok = Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
transform_dir(BaseDir, Store, TransformFun) ->
Dir = filename:join(BaseDir, atom_to_list(Store)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
- foreach_file(TmpDir, Dir, fun file:copy/2, FileList),
+ foreach_file(TmpDir, Dir, CopyFile, FileList),
foreach_file(TmpDir, fun file:delete/1, FileList),
ok = file:del_dir(TmpDir)
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
- rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
@@ -1927,6 +1930,6 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
{ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
- file_handle_cache:close(RefOld),
- file_handle_cache:close(RefNew),
+ ok = file_handle_cache:close(RefOld),
+ ok = file_handle_cache:close(RefNew),
ok.