diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 18:37:35 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-17 18:37:35 +0100 |
commit | 85e0e51aa522f698dd4e4b813b35f5d4aefa50ac (patch) | |
tree | 3a39a4687ad6de28e2d98539fc70e9a71320b0c3 | |
parent | ac55ed28110a5c51327bc62eae155071bb2fb466 (diff) | |
parent | b40e8ecf8a3f04cff949dcdb0c273e1e84cb70cb (diff) | |
download | rabbitmq-server-85e0e51aa522f698dd4e4b813b35f5d4aefa50ac.tar.gz |
Merging default into bug 15930
-rw-r--r-- | src/file_handle_cache.erl | 287 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 234 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 4 |
3 files changed, 277 insertions, 248 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 4eaccf49..fe4bdc03 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,17 +130,26 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([release_on_death/1, obtain/0]). +-export([obtain/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). --define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). + +%% Googling around suggests that Windows has a limit somewhere around +%% 16M, eg +%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx +%% however, it turns out that's only available through the win32 +%% API. Via the C Runtime, we have just 512: +%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx +-define(FILE_HANDLES_LIMIT_WINDOWS, 512). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). + %%---------------------------------------------------------------------------- -record(file, @@ -168,8 +177,11 @@ -record(fhc_state, { elders, limit, - count, - obtains, + open_count, + open_pending, + obtain_limit, + obtain_count, + obtain_pending, callbacks, client_mrefs, timer_ref @@ -210,8 +222,7 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(release_on_death/1 :: (pid()) -> 'ok'). --spec(obtain/0 :: () -> 'ok'). +-spec(obtain/1 :: (pid()) -> 'ok'). -endif. @@ -301,7 +312,7 @@ append(Ref, Data) -> Size1 = Size + iolist_size(Data), Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, write_buffer_size = Size1 }, - case Limit /= infinity andalso Size1 > Limit of + case Limit =/= infinity andalso Size1 > Limit of true -> {Result, Handle3} = write_buffer(Handle2), {Result, [Handle3]}; false -> {ok, [Handle2]} @@ -375,7 +386,8 @@ copy(Src, Dest, Count) -> {ok, Count1} = Result1 -> {Result1, [SHandle #handle { offset = SOffset + Count1 }, - DHandle #handle { offset = DOffset + Count1 }]}; + DHandle #handle { offset = DOffset + Count1, + is_dirty = true }]}; Error -> {Error, [SHandle, DHandle]} end; @@ -421,28 +433,19 @@ set_maximum_since_use(MaximumAge) -> fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> Age = timer:now_diff(Now, Then), - case Hdl /= closed andalso Age >= MaximumAge of - true -> {Res, Handle1} = soft_close(Handle), - case Res of - ok -> put({Ref, fhc_handle}, Handle1), - false; - _ -> put_handle(Ref, Handle1), - Rep - end; + case Hdl =/= closed andalso Age >= MaximumAge of + true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; (_KeyValuePair, Rep) -> Rep - end, true, get()) of - true -> age_tree_change(), ok; - false -> ok + end, false, get()) of + false -> age_tree_change(), ok; + true -> ok end. -release_on_death(Pid) when is_pid(Pid) -> - gen_server:cast(?SERVER, {release_on_death, Pid}). - -obtain() -> - gen_server:call(?SERVER, obtain, infinity). +obtain(Pid) -> + gen_server:call(?SERVER, {obtain, Pid}, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -515,20 +518,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) -> age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -with_age_tree(Fun) -> - put(fhc_age_tree, Fun(case get(fhc_age_tree) of - undefined -> gb_trees:empty(); - AgeTree -> AgeTree - end)). +with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())). + +get_age_tree() -> + case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end. + +put_age_tree(Tree) -> put(fhc_age_tree, Tree). age_tree_insert(Now, Ref) -> - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - gen_server:cast(?SERVER, {open, self(), Oldest}), - Tree1 - end). + Tree = get_age_tree(), + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + case gen_server:call(?SERVER, {open, self(), Oldest, + not gb_trees:is_empty(Tree)}, infinity) of + ok -> + put_age_tree(Tree1); + close -> + [soft_close(Ref1, Handle1) || + {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), + Hdl1 =/= closed], + age_tree_insert(Now, Ref) + end. age_tree_update(Then, Now, Ref) -> with_age_tree( @@ -567,6 +580,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> new -> Mode; reopen -> [read | Mode] end, + Now = now(), + age_tree_insert(Now, Ref), case file:open(Path, Mode1) of {ok, Hdl} -> WriteBufferSize = @@ -575,7 +590,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> infinity -> infinity; N when is_integer(N) -> N end, - Now = now(), Handle = #handle { hdl = Hdl, offset = 0, trusted_offset = 0, @@ -593,12 +607,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), Handle2 = Handle1 #handle { trusted_offset = Offset1 }, put({Ref, fhc_handle}, Handle2), - age_tree_insert(Now, Ref), {ok, Handle2}; {error, Reason} -> + age_tree_delete(Now), {error, Reason} end. +soft_close(Ref, Handle) -> + {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + true; + _ -> put_handle(Ref, Handle1), + false + end. + soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> @@ -701,18 +724,63 @@ init([]) -> _ -> ulimit() end, - error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), - {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [], callbacks = dict:new(), - client_mrefs = dict:new(), timer_ref = undefined }}. - -handle_call(obtain, From, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = - maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit /= infinity andalso Count1 >= Limit of - true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], - count = Count1 - 1 }}; - false -> {reply, ok, State1} + ObtainLimit = case Limit of + infinity -> infinity; + _ -> ?OBTAIN_LIMIT(Limit) + end, + error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", + [Limit, ObtainLimit]), + {ok, #fhc_state { elders = dict:new(), + limit = Limit, + open_count = 0, + open_pending = [], + obtain_limit = ObtainLimit, + obtain_count = 0, + obtain_pending = [], + callbacks = dict:new(), + client_mrefs = dict:new(), + timer_ref = undefined }}. + +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, + obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) + when Limit =/= infinity andalso Count >= Limit -> + {noreply, + State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + obtain_pending = Pending, + elders = Elders }) -> + case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of + {true, State1} -> + {noreply, State1 #fhc_state { + obtain_count = Count, + obtain_pending = [{obtain, Pid, From} | Pending], + elders = dict:erase(Pid, Elders) }}; + {false, State1} -> + _MRef = erlang:monitor(process, Pid), + {reply, ok, State1} + end; + +handle_call({open, Pid, EldestUnusedSince, CanClose}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + case maybe_reduce( + ensure_mref(Pid, State #fhc_state { open_count = Count + 1, + elders = Elders1 })) of + {true, State1} -> + State2 = State1 #fhc_state { open_count = Count }, + case CanClose of + true -> {reply, close, State2}; + false -> {noreply, State2 #fhc_state { + open_pending = [{open, From} | Pending], + elders = dict:erase(Pid, Elders1) }} + end; + {false, State1} -> + {reply, ok, State1} end. handle_cast({register_callback, Pid, MFA}, @@ -721,47 +789,37 @@ handle_cast({register_callback, Pid, MFA}, Pid, State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) })}; -handle_cast({open, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - {noreply, maybe_reduce( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count + 1 }))}; - handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + #fhc_state { elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> + #fhc_state { elders = Elders, open_count = Count }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - {noreply, process_obtains( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count - 1 }))}; + {noreply, process_pending( + ensure_mref(Pid, State #fhc_state { open_count = Count - 1, + elders = Elders1 }))}; handle_cast(check_counts, State) -> - {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; - -handle_cast({release_on_death, Pid}, State) -> - _MRef = erlang:monitor(process, Pid), - {noreply, State}. + {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), + {noreply, State1}. handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { count = Count, callbacks = Callbacks, + #fhc_state { obtain_count = Count, callbacks = Callbacks, client_mrefs = ClientMRefs, elders = Elders }) -> - {noreply, process_obtains( + {noreply, process_pending( case dict:find(Pid, ClientMRefs) of {ok, MRef} -> State #fhc_state { elders = dict:erase(Pid, Elders), client_mrefs = dict:erase(Pid, ClientMRefs), callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } + _ -> State #fhc_state { obtain_count = Count - 1 } end)}. terminate(_Reason, State) -> @@ -774,23 +832,62 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- -process_obtains(State = #fhc_state { obtains = [] }) -> +process_pending(State = #fhc_state { limit = infinity }) -> State; -process_obtains(State = #fhc_state { limit = Limit, count = Count }) - when Limit /= infinity andalso Count >= Limit -> - State; -process_obtains(State = #fhc_state { limit = Limit, count = Count, - obtains = Obtains }) -> - ObtainsLen = length(Obtains), - ObtainableLen = lists:min([ObtainsLen, Limit - Count]), - Take = ObtainsLen - ObtainableLen, - {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), - [gen_server:reply(From, ok) || From <- ObtainableRev], - State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. - -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, - callbacks = Callbacks, timer_ref = TRef }) - when Limit /= infinity andalso Count >= Limit -> +process_pending(State) -> + process_obtain(process_open(State)). + +process_open(State = #fhc_state { limit = Limit, + open_pending = Pending, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Pending1, Inc} = + process_pending(Pending, Limit - (ObtainCount + OpenCount)), + State #fhc_state { open_pending = Pending1, + open_count = OpenCount + Inc }. + +process_obtain(State = #fhc_state { limit = Limit, + obtain_pending = Pending, + obtain_limit = ObtainLimit, + obtain_count = ObtainCount, + open_count = OpenCount }) -> + Quota = lists:min([ObtainLimit - ObtainCount, + Limit - (ObtainCount + OpenCount)]), + {Pending1, Inc} = process_pending(Pending, Quota), + State #fhc_state { obtain_pending = Pending1, + obtain_count = ObtainCount + Inc }. + +process_pending([], _Quota) -> + {[], 0}; +process_pending(Pending, Quota) when Quota =< 0 -> + {Pending, 0}; +process_pending(Pending, Quota) -> + PendingLen = length(Pending), + SatisfiableLen = lists:min([PendingLen, Quota]), + Take = PendingLen - SatisfiableLen, + {PendingNew, SatisfiableRev} = lists:split(Take, Pending), + [run_pending_item(Item) || Item <- SatisfiableRev], + {PendingNew, SatisfiableLen}. + +run_pending_item({open, From}) -> + gen_server:reply(From, ok); +run_pending_item({obtain, Pid, From}) -> + _MRef = erlang:monitor(process, Pid), + gen_server:reply(From, ok). + +maybe_reduce(State = #fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending, + elders = Elders, + callbacks = Callbacks, + timer_ref = TRef }) + when Limit =/= infinity andalso + (((OpenCount + ObtainCount) > Limit) orelse + (OpenPending =/= []) orelse + (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> @@ -810,26 +907,24 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, end end, Pids) end, + AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State #fhc_state { timer_ref = TRef1 }; - _ -> State + {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; + _ -> {AboveLimit, State} end; maybe_reduce(State) -> - State. + {false, State}. -%% Googling around suggests that Windows has a limit somewhere around -%% 16M, eg -%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx -%% For everything else, assume ulimit exists. Further googling -%% suggests that BSDs (incl OS X), solaris and linux all agree that -%% ulimit -n is file handles +%% For all unices, assume ulimit exists. Further googling suggests +%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n +%% is file handles ulimit() -> case os:type() of {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS; + ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS; {unix, _OsName} -> %% Under Linux, Solaris and FreeBSD, ulimit is a shell %% builtin, not a command. In OS X, it's a command. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c7ebfd97..5bc1f9d5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -495,7 +495,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> +init([Server, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, @@ -506,11 +506,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> {ok, IndexModule} = application:get_env(msg_store_index_module), rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), - {AllCleanShutdown, IndexState, ClientRefs1} = - recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server), + AttemptFileSummaryRecovery = + case ClientRefs of + undefined -> ok = rabbit_misc:recursive_delete([Dir]), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + false; + _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + recover_crashed_compactions(Dir) + end, + %% if we found crashed compactions we trust neither the + %% file_summary nor the location index. Note the file_summary is + %% left empty here if it can't be recovered. {FileSummaryRecovered, FileSummaryEts} = - recover_file_summary(AllCleanShutdown, Dir, Server), + recover_file_summary(AttemptFileSummaryRecovery, Dir), + + {CleanShutdown, IndexState, ClientRefs1} = + recover_index_and_client_refs(IndexModule, FileSummaryRecovered, + ClientRefs, Dir, Server), + %% CleanShutdown => msg location index and file_summary both + %% recovered correctly. + true = case {FileSummaryRecovered, CleanShutdown} of + {true, false} -> ets:delete_all_objects(FileSummaryEts); + _ -> true + end, + %% CleanShutdown <=> msg location index and file_summary both + %% recovered correctly. DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, @@ -537,26 +558,14 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, - successfully_recovered = AllCleanShutdown, + successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit }, - ok = case AllCleanShutdown of - true -> ok; - false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) - end, - - FileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), - TmpFileNames = - sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), - - %% There should be no more tmp files now, so go ahead and load the - %% whole lot - Files = [filename_to_num(FileName) || FileName <- FileNames], + %% If we didn't recover the msg location index then we need to + %% rebuild it now. {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(FileSummaryRecovered, Files, State), + build_index(CleanShutdown, StartupFunState, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -1038,9 +1047,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -sort_file_names(FileNames) -> +list_sorted_file_names(Dir, Ext) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, - FileNames). + filelib:wildcard("*" ++ Ext, Dir)). %%---------------------------------------------------------------------------- %% message cache helper functions @@ -1120,15 +1129,15 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) -> - ok = rabbit_misc:recursive_delete([Dir]), - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> + {false, IndexModule:new(Dir), sets:new()}; +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> + rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), {false, IndexModule:new(Dir), sets:new()}; -recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~w: " ++ ErrorMsg ++ - "~nrebuilding indices from scratch~n", + rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" + "rebuilding indices from scratch~n", [Server | ErrorArgs]), {false, IndexModule:new(Dir), sets:new()} end, @@ -1142,12 +1151,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) -> andalso IndexModule =:= RecIndexModule) of true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> - ClientRefs1 = sets:from_list(ClientRefs), - {true, IndexState1, ClientRefs1}; + {true, IndexState1, + sets:from_list(ClientRefs)}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; - false -> Fresh("recovery terms differ from present", []) + false -> Fresh("recovery terms differ from present", []) end end. @@ -1168,7 +1177,7 @@ store_file_summary(Tid, Dir) -> ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME), [{extended_info, [object_count]}]). -recover_file_summary(false, _Dir, _Server) -> +recover_file_summary(false, _Dir) -> %% TODO: the only reason for this to be an *ordered*_set is so %% that a) maybe_compact can start a traversal from the eldest %% file, and b) build_index in fast recovery mode can easily @@ -1178,15 +1187,12 @@ recover_file_summary(false, _Dir, _Server) -> %% ditching the latter would be neater. {false, ets:new(rabbit_msg_store_file_summary, [ordered_set, public, {keypos, #file_summary.file}])}; -recover_file_summary(true, Dir, Server) -> +recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of - {ok, Tid} -> file:delete(Path), + {ok, Tid} -> file:delete(Path), {true, Tid}; - {error, Error} -> rabbit_log:warning( - "~w: failed to recover file summary: ~p~n" - "rebuilding~n", [Server, Error]), - recover_file_summary(false, Dir, Server) + {error, _Error} -> recover_file_summary(false, Dir) end. count_msg_refs(Gen, Seed, State) -> @@ -1199,6 +1205,7 @@ count_msg_refs(Gen, Seed, State) -> ok = case index_lookup(Guid, State) of not_found -> index_insert(#msg_location { guid = Guid, + file = undefined, ref_count = Delta }, State); #msg_location { ref_count = RefCount } = StoreEntry -> @@ -1213,7 +1220,9 @@ count_msg_refs(Gen, Seed, State) -> count_msg_refs(Gen, Next, State) end. -recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> +recover_crashed_compactions(Dir) -> + FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION), + TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP), lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = @@ -1222,103 +1231,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> ok = recover_crashed_compaction( Dir, TmpFileName, NonTmpRelatedFileName) end, TmpFileNames), - ok. + TmpFileNames == []. recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> - {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName), - {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), - %% 1) It's possible that everything in the tmp file is also in the - %% main file such that the main file is (prefix ++ - %% tmpfile). This means that compaction failed immediately - %% prior to the final step of deleting the tmp file. Plan: just - %% delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the - %% main file but with holes throughout (or just somthing like - %% main = (prefix ++ hole ++ tmpfile)). This means that - %% compaction wrote out the tmp file successfully and then - %% failed. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the - %% main file but such that the main file does not end with tmp - %% file (and there are valid messages in the suffix; main = - %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This - %% means that compaction failed as we were writing out the tmp - %% file. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which - %% are not in the main file. This means that writing out the - %% tmp file succeeded, but then we failed as we were copying - %% them back over to the main file, after truncating the main - %% file. As the main file has already been truncated, it should - %% consist only of valid messages. Plan: Truncate the main file - %% back to before any of the files in the tmp file and copy - %% them over again - TmpPath = form_filename(Dir, TmpFileName), - case is_sublist(GuidsTmp, Guids) of - true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpPath); - false -> - %% We're in case 4 above. We only care about the inital - %% msgs in main file that are not in the tmp file. If - %% there are no msgs in the tmp file then we would be in - %% the 'true' branch of this case, so we know the - %% lists:last call is safe. - EldestTmpGuid = lists:last(GuidsTmp), - {Guids1, UncorruptedMessages1} - = case lists:splitwith( - fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of - {_Guids, []} -> %% no msgs from tmp in main - {Guids, UncorruptedMessages}; - {Dropped, [EldestTmpGuid | Rest]} -> - %% Msgs in Dropped are in tmp, so forget them. - %% *cry*. Lists indexed from 1. - {Rest, lists:sublist(UncorruptedMessages, - 2 + length(Dropped), - length(Rest))} - end, - %% The main file prefix should be contiguous - {Top, Guids1} = find_contiguous_block_prefix( - lists:reverse(UncorruptedMessages1)), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = is_disjoint(Guids1, GuidsTmp), - %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), - %% Wipe out any rubbish at the end of the file. Remember - %% the head of the list will be the highest entry in the - %% file. - [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize, - %% Extend the main file as big as necessary in a single - %% move. If we run out of disk space, this truncate could - %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), - {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), - ok = file_handle_cache:close(MainHdl), - ok = file_handle_cache:delete(TmpHdl), - - {ok, _MainMessages, GuidsMain} = - scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName), - %% check that everything in Guids1 is in GuidsMain - true = is_sublist(Guids1, GuidsMain), - %% check that everything in GuidsTmp is in GuidsMain - true = is_sublist(GuidsTmp, GuidsMain) - end, + %% Because a msg can legitimately appear multiple times in the + %% same file, identifying the contents of the tmp file and where + %% they came from is non-trivial. If we are recovering a crashed + %% compaction then we will be rebuilding the index, which can cope + %% with duplicates appearing. Thus the simplest and safest thing + %% to do is to append the contents of the tmp file to its main + %% file. + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE), + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, + ?READ_MODE ++ ?WRITE_MODE), + {ok, _End} = file_handle_cache:position(MainHdl, eof), + Size = filelib:file_size(form_filename(Dir, TmpFileName)), + {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size), + ok = file_handle_cache:close(MainHdl), + ok = file_handle_cache:delete(TmpHdl), ok. -is_sublist(SmallerL, BiggerL) -> - lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL). - -is_disjoint(SmallerL, BiggerL) -> - lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). - scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( @@ -1332,10 +1264,6 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_file_for_valid_messages_and_guids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1350,8 +1278,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, _Files, State = #msstate { - file_summary_ets = FileSummaryEts }) -> +build_index(true, _StartupFunState, + State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, file_size = FileSize, @@ -1363,12 +1291,17 @@ build_index(true, _Files, State = #msstate { sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(false, Files, State) -> +build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, + State = #msstate { dir = Dir }) -> + ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), {ok, Pid} = gatherer:start_link(), - case Files of - [] -> build_index(Pid, undefined, [State #msstate.current_file], State); - _ -> {Offset, State1} = build_index(Pid, undefined, Files, State), - {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} + case [filename_to_num(FileName) || + FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of + [] -> build_index(Pid, undefined, [State #msstate.current_file], + State); + Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, + State1, Files)} end. build_index(Gatherer, Left, [], @@ -1409,14 +1342,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, lists:foldl( fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case index_lookup(Guid, State) of - not_found -> - {VMAcc, VTSAcc}; - StoreEntry -> + #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, total_size = TotalSize }, State), - {[Obj | VMAcc], VTSAcc + TotalSize} + {[Obj | VMAcc], VTSAcc + TotalSize}; + _ -> + {VMAcc, VTSAcc} end end, {[], 0}, Messages), %% foldl reverses lists, find_contiguous_block_prefix needs @@ -1671,9 +1604,10 @@ find_unremoved_messages_in_file(File, {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) -> + lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of - #msg_location { file = File } = Entry -> + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> {[ Entry | List ], TotalSize + Size}; _ -> Acc diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index cc4982c9..11ce6fc5 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,6 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> + ok = file_handle_cache:obtain(self()), accept(State); handle_cast(_Msg, State) -> @@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) + file_handle_cache:obtain(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> - ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} |