summary refs log tree commit diff
diff options
context:
space:
mode:
author Matthew Sackman <matthew@rabbitmq.com> 2010-08-17 18:37:35 +0100
committer Matthew Sackman <matthew@rabbitmq.com> 2010-08-17 18:37:35 +0100
commit 85e0e51aa522f698dd4e4b813b35f5d4aefa50ac (patch)
tree 3a39a4687ad6de28e2d98539fc70e9a71320b0c3
parent ac55ed28110a5c51327bc62eae155071bb2fb466 (diff)
parent b40e8ecf8a3f04cff949dcdb0c273e1e84cb70cb (diff)
download rabbitmq-server-85e0e51aa522f698dd4e4b813b35f5d4aefa50ac.tar.gz
Merging default into bug 15930
-rw-r--r-- src/file_handle_cache.erl 287
-rw-r--r-- src/rabbit_msg_store.erl 234
-rw-r--r-- src/tcp_acceptor.erl 4
3 files changed, 277 insertions, 248 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 4eaccf49..fe4bdc03 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -130,17 +130,26 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([release_on_death/1, obtain/0]).
+-export([obtain/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
--define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
+
+%% Googling around suggests that Windows has a limit somewhere around
+%% 16M, eg
+%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
+%% however, it turns out that's only available through the win32
+%% API. Via the C Runtime, we have just 512:
+%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
+-define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -168,8 +177,11 @@
-record(fhc_state,
{ elders,
limit,
- count,
- obtains,
+ open_count,
+ open_pending,
+ obtain_limit,
+ obtain_count,
+ obtain_pending,
callbacks,
client_mrefs,
timer_ref
@@ -210,8 +222,7 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
--spec(release_on_death/1 :: (pid()) -> 'ok').
--spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (pid()) -> 'ok').
-endif.
@@ -301,7 +312,7 @@ append(Ref, Data) ->
Size1 = Size + iolist_size(Data),
Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
write_buffer_size = Size1 },
- case Limit /= infinity andalso Size1 > Limit of
+ case Limit =/= infinity andalso Size1 > Limit of
true -> {Result, Handle3} = write_buffer(Handle2),
{Result, [Handle3]};
false -> {ok, [Handle2]}
@@ -375,7 +386,8 @@ copy(Src, Dest, Count) ->
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
- DHandle #handle { offset = DOffset + Count1 }]};
+ DHandle #handle { offset = DOffset + Count1,
+ is_dirty = true }]};
Error ->
{Error, [SHandle, DHandle]}
end;
@@ -421,28 +433,19 @@ set_maximum_since_use(MaximumAge) ->
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
Age = timer:now_diff(Now, Then),
- case Hdl /= closed andalso Age >= MaximumAge of
- true -> {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- false;
- _ -> put_handle(Ref, Handle1),
- Rep
- end;
+ case Hdl =/= closed andalso Age >= MaximumAge of
+ true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
(_KeyValuePair, Rep) ->
Rep
- end, true, get()) of
- true -> age_tree_change(), ok;
- false -> ok
+ end, false, get()) of
+ false -> age_tree_change(), ok;
+ true -> ok
end.
-release_on_death(Pid) when is_pid(Pid) ->
- gen_server:cast(?SERVER, {release_on_death, Pid}).
-
-obtain() ->
- gen_server:call(?SERVER, obtain, infinity).
+obtain(Pid) ->
+ gen_server:call(?SERVER, {obtain, Pid}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -515,20 +518,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-with_age_tree(Fun) ->
- put(fhc_age_tree, Fun(case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end)).
+with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
+
+get_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_insert(Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- gen_server:cast(?SERVER, {open, self(), Oldest}),
- Tree1
- end).
+ Tree = get_age_tree(),
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ case gen_server:call(?SERVER, {open, self(), Oldest,
+ not gb_trees:is_empty(Tree)}, infinity) of
+ ok ->
+ put_age_tree(Tree1);
+ close ->
+ [soft_close(Ref1, Handle1) ||
+ {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
+ Hdl1 =/= closed],
+ age_tree_insert(Now, Ref)
+ end.
age_tree_update(Then, Now, Ref) ->
with_age_tree(
@@ -567,6 +580,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
new -> Mode;
reopen -> [read | Mode]
end,
+ Now = now(),
+ age_tree_insert(Now, Ref),
case file:open(Path, Mode1) of
{ok, Hdl} ->
WriteBufferSize =
@@ -575,7 +590,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
- Now = now(),
Handle = #handle { hdl = Hdl,
offset = 0,
trusted_offset = 0,
@@ -593,12 +607,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
{{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
Handle2 = Handle1 #handle { trusted_offset = Offset1 },
put({Ref, fhc_handle}, Handle2),
- age_tree_insert(Now, Ref),
{ok, Handle2};
{error, Reason} ->
+ age_tree_delete(Now),
{error, Reason}
end.
+soft_close(Ref, Handle) ->
+ {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ true;
+ _ -> put_handle(Ref, Handle1),
+ false
+ end.
+
soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
@@ -701,18 +724,63 @@ init([]) ->
_ ->
ulimit()
end,
- error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new(), timer_ref = undefined }}.
-
-handle_call(obtain, From, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
- maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit /= infinity andalso Count1 >= Limit of
- true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }};
- false -> {reply, ok, State1}
+ ObtainLimit = case Limit of
+ infinity -> infinity;
+ _ -> ?OBTAIN_LIMIT(Limit)
+ end,
+ error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
+ [Limit, ObtainLimit]),
+ {ok, #fhc_state { elders = dict:new(),
+ limit = Limit,
+ open_count = 0,
+ open_pending = [],
+ obtain_limit = ObtainLimit,
+ obtain_count = 0,
+ obtain_pending = [],
+ callbacks = dict:new(),
+ client_mrefs = dict:new(),
+ timer_ref = undefined }}.
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
+ when Limit =/= infinity andalso Count >= Limit ->
+ {noreply,
+ State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state {
+ obtain_count = Count,
+ obtain_pending = [{obtain, Pid, From} | Pending],
+ elders = dict:erase(Pid, Elders) }};
+ {false, State1} ->
+ _MRef = erlang:monitor(process, Pid),
+ {reply, ok, State1}
+ end;
+
+handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
+ State = #fhc_state { open_count = Count,
+ open_pending = Pending,
+ elders = Elders }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ case maybe_reduce(
+ ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
+ elders = Elders1 })) of
+ {true, State1} ->
+ State2 = State1 #fhc_state { open_count = Count },
+ case CanClose of
+ true -> {reply, close, State2};
+ false -> {noreply, State2 #fhc_state {
+ open_pending = [{open, From} | Pending],
+ elders = dict:erase(Pid, Elders1) }}
+ end;
+ {false, State1} ->
+ {reply, ok, State1}
end.
handle_cast({register_callback, Pid, MFA},
@@ -721,47 +789,37 @@ handle_cast({register_callback, Pid, MFA},
Pid, State #fhc_state {
callbacks = dict:store(Pid, MFA, Callbacks) })};
-handle_cast({open, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- {noreply, maybe_reduce(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count + 1 }))};
-
handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+ #fhc_state { elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
{noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+ #fhc_state { elders = Elders, open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count - 1 }))};
+ {noreply, process_pending(
+ ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
+ elders = Elders1 }))};
handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
-
-handle_cast({release_on_death, Pid}, State) ->
- _MRef = erlang:monitor(process, Pid),
- {noreply, State}.
+ {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
+ {noreply, State1}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { count = Count, callbacks = Callbacks,
+ #fhc_state { obtain_count = Count, callbacks = Callbacks,
client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_obtains(
+ {noreply, process_pending(
case dict:find(Pid, ClientMRefs) of
{ok, MRef} -> State #fhc_state {
elders = dict:erase(Pid, Elders),
client_mrefs = dict:erase(Pid, ClientMRefs),
callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
+ _ -> State #fhc_state { obtain_count = Count - 1 }
end)}.
terminate(_Reason, State) ->
@@ -774,23 +832,62 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-process_obtains(State = #fhc_state { obtains = [] }) ->
+process_pending(State = #fhc_state { limit = infinity }) ->
State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count })
- when Limit /= infinity andalso Count >= Limit ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count,
- obtains = Obtains }) ->
- ObtainsLen = length(Obtains),
- ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
- Take = ObtainsLen - ObtainableLen,
- {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
- [gen_server:reply(From, ok) || From <- ObtainableRev],
- State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
-
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
- callbacks = Callbacks, timer_ref = TRef })
- when Limit /= infinity andalso Count >= Limit ->
+process_pending(State) ->
+ process_obtain(process_open(State)).
+
+process_open(State = #fhc_state { limit = Limit,
+ open_pending = Pending,
+ open_count = OpenCount,
+ obtain_count = ObtainCount }) ->
+ {Pending1, Inc} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount)),
+ State #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
+
+process_obtain(State = #fhc_state { limit = Limit,
+ obtain_pending = Pending,
+ obtain_limit = ObtainLimit,
+ obtain_count = ObtainCount,
+ open_count = OpenCount }) ->
+ Quota = lists:min([ObtainLimit - ObtainCount,
+ Limit - (ObtainCount + OpenCount)]),
+ {Pending1, Inc} = process_pending(Pending, Quota),
+ State #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota) ->
+ {[], 0};
+process_pending(Pending, Quota) when Quota =< 0 ->
+ {Pending, 0};
+process_pending(Pending, Quota) ->
+ PendingLen = length(Pending),
+ SatisfiableLen = lists:min([PendingLen, Quota]),
+ Take = PendingLen - SatisfiableLen,
+ {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
+ [run_pending_item(Item) || Item <- SatisfiableRev],
+ {PendingNew, SatisfiableLen}.
+
+run_pending_item({open, From}) ->
+ gen_server:reply(From, ok);
+run_pending_item({obtain, Pid, From}) ->
+ _MRef = erlang:monitor(process, Pid),
+ gen_server:reply(From, ok).
+
+maybe_reduce(State = #fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ callbacks = Callbacks,
+ timer_ref = TRef })
+ when Limit =/= infinity andalso
+ (((OpenCount + ObtainCount) > Limit) orelse
+ (OpenPending =/= []) orelse
+ (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
@@ -810,26 +907,24 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
end
end, Pids)
end,
+ AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- State #fhc_state { timer_ref = TRef1 };
- _ -> State
+ {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
+ _ -> {AboveLimit, State}
end;
maybe_reduce(State) ->
- State.
+ {false, State}.
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% For everything else, assume ulimit exists. Further googling
-%% suggests that BSDs (incl OS X), solaris and linux all agree that
-%% ulimit -n is file handles
+%% For all unices, assume ulimit exists. Further googling suggests
+%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
+%% is file handles
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
+ ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c7ebfd97..5bc1f9d5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -495,7 +495,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
+init([Server, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
@@ -506,11 +506,32 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
{ok, IndexModule} = application:get_env(msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
- {AllCleanShutdown, IndexState, ClientRefs1} =
- recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server),
+ AttemptFileSummaryRecovery =
+ case ClientRefs of
+ undefined -> ok = rabbit_misc:recursive_delete([Dir]),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ false;
+ _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ recover_crashed_compactions(Dir)
+ end,
+ %% if we found crashed compactions we trust neither the
+ %% file_summary nor the location index. Note the file_summary is
+ %% left empty here if it can't be recovered.
{FileSummaryRecovered, FileSummaryEts} =
- recover_file_summary(AllCleanShutdown, Dir, Server),
+ recover_file_summary(AttemptFileSummaryRecovery, Dir),
+
+ {CleanShutdown, IndexState, ClientRefs1} =
+ recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
+ ClientRefs, Dir, Server),
+ %% CleanShutdown => msg location index and file_summary both
+ %% recovered correctly.
+ true = case {FileSummaryRecovered, CleanShutdown} of
+ {true, false} -> ets:delete_all_objects(FileSummaryEts);
+ _ -> true
+ end,
+ %% CleanShutdown <=> msg location index and file_summary both
+ %% recovered correctly.
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
@@ -537,26 +558,14 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
- successfully_recovered = AllCleanShutdown,
+ successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit
},
- ok = case AllCleanShutdown of
- true -> ok;
- false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State)
- end,
-
- FileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
- TmpFileNames =
- sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
-
- %% There should be no more tmp files now, so go ahead and load the
- %% whole lot
- Files = [filename_to_num(FileName) || FileName <- FileNames],
+ %% If we didn't recover the msg location index then we need to
+ %% rebuild it now.
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(FileSummaryRecovered, Files, State),
+ build_index(CleanShutdown, StartupFunState, State),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -1038,9 +1047,9 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-sort_file_names(FileNames) ->
+list_sorted_file_names(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
- FileNames).
+ filelib:wildcard("*" ++ Ext, Dir)).
%%----------------------------------------------------------------------------
%% message cache helper functions
@@ -1120,15 +1129,15 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, undefined, Dir, _Server) ->
- ok = rabbit_misc:recursive_delete([Dir]),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+ {false, IndexModule:new(Dir), sets:new()};
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
+ rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
{false, IndexModule:new(Dir), sets:new()};
-recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~w: " ++ ErrorMsg ++
- "~nrebuilding indices from scratch~n",
+ rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+ "rebuilding indices from scratch~n",
[Server | ErrorArgs]),
{false, IndexModule:new(Dir), sets:new()}
end,
@@ -1142,12 +1151,12 @@ recover_index_and_client_refs(IndexModule, ClientRefs, Dir, Server) ->
andalso IndexModule =:= RecIndexModule) of
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
- ClientRefs1 = sets:from_list(ClientRefs),
- {true, IndexState1, ClientRefs1};
+ {true, IndexState1,
+ sets:from_list(ClientRefs)};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
- false -> Fresh("recovery terms differ from present", [])
+ false -> Fresh("recovery terms differ from present", [])
end
end.
@@ -1168,7 +1177,7 @@ store_file_summary(Tid, Dir) ->
ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
[{extended_info, [object_count]}]).
-recover_file_summary(false, _Dir, _Server) ->
+recover_file_summary(false, _Dir) ->
%% TODO: the only reason for this to be an *ordered*_set is so
%% that a) maybe_compact can start a traversal from the eldest
%% file, and b) build_index in fast recovery mode can easily
@@ -1178,15 +1187,12 @@ recover_file_summary(false, _Dir, _Server) ->
%% ditching the latter would be neater.
{false, ets:new(rabbit_msg_store_file_summary,
[ordered_set, public, {keypos, #file_summary.file}])};
-recover_file_summary(true, Dir, Server) ->
+recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
- {ok, Tid} -> file:delete(Path),
+ {ok, Tid} -> file:delete(Path),
{true, Tid};
- {error, Error} -> rabbit_log:warning(
- "~w: failed to recover file summary: ~p~n"
- "rebuilding~n", [Server, Error]),
- recover_file_summary(false, Dir, Server)
+ {error, _Error} -> recover_file_summary(false, Dir)
end.
count_msg_refs(Gen, Seed, State) ->
@@ -1199,6 +1205,7 @@ count_msg_refs(Gen, Seed, State) ->
ok = case index_lookup(Guid, State) of
not_found ->
index_insert(#msg_location { guid = Guid,
+ file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
@@ -1213,7 +1220,9 @@ count_msg_refs(Gen, Seed, State) ->
count_msg_refs(Gen, Next, State)
end.
-recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
+recover_crashed_compactions(Dir) ->
+ FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1222,103 +1231,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
ok = recover_crashed_compaction(
Dir, TmpFileName, NonTmpRelatedFileName)
end, TmpFileNames),
- ok.
+ TmpFileNames == [].
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
- {ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
- {ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
- %% 1) It's possible that everything in the tmp file is also in the
- %% main file such that the main file is (prefix ++
- %% tmpfile). This means that compaction failed immediately
- %% prior to the final step of deleting the tmp file. Plan: just
- %% delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the
- %% main file but with holes throughout (or just somthing like
- %% main = (prefix ++ hole ++ tmpfile)). This means that
- %% compaction wrote out the tmp file successfully and then
- %% failed. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the
- %% main file but such that the main file does not end with tmp
- %% file (and there are valid messages in the suffix; main =
- %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
- %% means that compaction failed as we were writing out the tmp
- %% file. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which
- %% are not in the main file. This means that writing out the
- %% tmp file succeeded, but then we failed as we were copying
- %% them back over to the main file, after truncating the main
- %% file. As the main file has already been truncated, it should
- %% consist only of valid messages. Plan: Truncate the main file
- %% back to before any of the files in the tmp file and copy
- %% them over again
- TmpPath = form_filename(Dir, TmpFileName),
- case is_sublist(GuidsTmp, Guids) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpPath);
- false ->
- %% We're in case 4 above. We only care about the inital
- %% msgs in main file that are not in the tmp file. If
- %% there are no msgs in the tmp file then we would be in
- %% the 'true' branch of this case, so we know the
- %% lists:last call is safe.
- EldestTmpGuid = lists:last(GuidsTmp),
- {Guids1, UncorruptedMessages1}
- = case lists:splitwith(
- fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of
- {_Guids, []} -> %% no msgs from tmp in main
- {Guids, UncorruptedMessages};
- {Dropped, [EldestTmpGuid | Rest]} ->
- %% Msgs in Dropped are in tmp, so forget them.
- %% *cry*. Lists indexed from 1.
- {Rest, lists:sublist(UncorruptedMessages,
- 2 + length(Dropped),
- length(Rest))}
- end,
- %% The main file prefix should be contiguous
- {Top, Guids1} = find_contiguous_block_prefix(
- lists:reverse(UncorruptedMessages1)),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = is_disjoint(Guids1, GuidsTmp),
- %% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
- %% Wipe out any rubbish at the end of the file. Remember
- %% the head of the list will be the highest entry in the
- %% file.
- [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize,
- %% Extend the main file as big as necessary in a single
- %% move. If we run out of disk space, this truncate could
- %% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
- {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
- ok = file_handle_cache:close(MainHdl),
- ok = file_handle_cache:delete(TmpHdl),
-
- {ok, _MainMessages, GuidsMain} =
- scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName),
- %% check that everything in Guids1 is in GuidsMain
- true = is_sublist(Guids1, GuidsMain),
- %% check that everything in GuidsTmp is in GuidsMain
- true = is_sublist(GuidsTmp, GuidsMain)
- end,
+ %% Because a msg can legitimately appear multiple times in the
+ %% same file, identifying the contents of the tmp file and where
+ %% they came from is non-trivial. If we are recovering a crashed
+ %% compaction then we will be rebuilding the index, which can cope
+ %% with duplicates appearing. Thus the simplest and safest thing
+ %% to do is to append the contents of the tmp file to its main
+ %% file.
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_MODE),
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
+ ?READ_MODE ++ ?WRITE_MODE),
+ {ok, _End} = file_handle_cache:position(MainHdl, eof),
+ Size = filelib:file_size(form_filename(Dir, TmpFileName)),
+ {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
+ ok = file_handle_cache:close(MainHdl),
+ ok = file_handle_cache:delete(TmpHdl),
ok.
-is_sublist(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).
-
-is_disjoint(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
@@ -1332,10 +1264,6 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1350,8 +1278,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(true, _Files, State = #msstate {
- file_summary_ets = FileSummaryEts }) ->
+build_index(true, _StartupFunState,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
file_size = FileSize,
@@ -1363,12 +1291,17 @@ build_index(true, _Files, State = #msstate {
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
-build_index(false, Files, State) ->
+build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
+ State = #msstate { dir = Dir }) ->
+ ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
- case Files of
- [] -> build_index(Pid, undefined, [State #msstate.current_file], State);
- _ -> {Offset, State1} = build_index(Pid, undefined, Files, State),
- {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}
+ case [filename_to_num(FileName) ||
+ FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ [] -> build_index(Pid, undefined, [State #msstate.current_file],
+ State);
+ Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2,
+ State1, Files)}
end.
build_index(Gatherer, Left, [],
@@ -1409,14 +1342,14 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(Guid, State) of
- not_found ->
- {VMAcc, VTSAcc};
- StoreEntry ->
+ #msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
- {[Obj | VMAcc], VTSAcc + TotalSize}
+ {[Obj | VMAcc], VTSAcc + TotalSize};
+ _ ->
+ {VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
%% foldl reverses lists, find_contiguous_block_prefix needs
@@ -1671,9 +1604,10 @@ find_unremoved_messages_in_file(File,
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(fun ({Guid, TotalSize, _Offset}, Acc = {List, Size}) ->
+ lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
- #msg_location { file = File } = Entry ->
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index cc4982c9..11ce6fc5 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,6 +55,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(accept, State) ->
+ ok = file_handle_cache:obtain(self()),
accept(State);
handle_cast(_Msg, State) ->
@@ -83,7 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
%% is drained.
gen_event:which_handlers(error_logger),
%% handle
- file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
+ file_handle_cache:obtain(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -111,7 +112,6 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
- ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}