summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-17 12:54:57 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-17 12:54:57 +0100
commit64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf (patch)
tree257d1e863c0fb0cfcf671bf2998b1950e888287e
parent8ccf027aee330024c6c626d485798fc796a6dd6e (diff)
parentc9ecb104b823054dfda5333d2aa8f6f019882d1d (diff)
downloadrabbitmq-server-64d7eebf985987a10eaeaef3f8e8111b1f9cd5bf.tar.gz
merge bug23132 into default
-rw-r--r--src/file_handle_cache.erl241
1 files changed, 165 insertions, 76 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 8db5a794..19b2654f 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -148,6 +148,8 @@
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)).
+
%%----------------------------------------------------------------------------
-record(file,
@@ -175,8 +177,11 @@
-record(fhc_state,
{ elders,
limit,
- count,
- obtains,
+ open_count,
+ open_pending,
+ obtain_limit,
+ obtain_count,
+ obtain_pending,
callbacks,
client_mrefs,
timer_ref
@@ -308,7 +313,7 @@ append(Ref, Data) ->
Size1 = Size + iolist_size(Data),
Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
write_buffer_size = Size1 },
- case Limit /= infinity andalso Size1 > Limit of
+ case Limit =/= infinity andalso Size1 > Limit of
true -> {Result, Handle3} = write_buffer(Handle2),
{Result, [Handle3]};
false -> {ok, [Handle2]}
@@ -429,28 +434,22 @@ set_maximum_since_use(MaximumAge) ->
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
Age = timer:now_diff(Now, Then),
- case Hdl /= closed andalso Age >= MaximumAge of
- true -> {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- false;
- _ -> put_handle(Ref, Handle1),
- Rep
- end;
+ case Hdl =/= closed andalso Age >= MaximumAge of
+ true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
(_KeyValuePair, Rep) ->
Rep
- end, true, get()) of
- true -> age_tree_change(), ok;
- false -> ok
+ end, false, get()) of
+ false -> age_tree_change(), ok;
+ true -> ok
end.
release_on_death(Pid) when is_pid(Pid) ->
gen_server:cast(?SERVER, {release_on_death, Pid}).
obtain() ->
- gen_server:call(?SERVER, obtain, infinity).
+ gen_server:call(?SERVER, {obtain, self()}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -523,20 +522,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-with_age_tree(Fun) ->
- put(fhc_age_tree, Fun(case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end)).
+with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
+
+get_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_insert(Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- gen_server:cast(?SERVER, {open, self(), Oldest}),
- Tree1
- end).
+ Tree = get_age_tree(),
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ case gen_server:call(?SERVER, {open, self(), Oldest,
+ not gb_trees:is_empty(Tree)}, infinity) of
+ ok ->
+ put_age_tree(Tree1);
+ close ->
+ [soft_close(Ref1, Handle1) ||
+ {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
+ Hdl1 =/= closed],
+ age_tree_insert(Now, Ref)
+ end.
age_tree_update(Then, Now, Ref) ->
with_age_tree(
@@ -575,6 +584,8 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
new -> Mode;
reopen -> [read | Mode]
end,
+ Now = now(),
+ age_tree_insert(Now, Ref),
case file:open(Path, Mode1) of
{ok, Hdl} ->
WriteBufferSize =
@@ -583,7 +594,6 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
- Now = now(),
Handle = #handle { hdl = Hdl,
offset = 0,
trusted_offset = 0,
@@ -601,12 +611,21 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
{{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
Handle2 = Handle1 #handle { trusted_offset = Offset1 },
put({Ref, fhc_handle}, Handle2),
- age_tree_insert(Now, Ref),
{ok, Handle2};
{error, Reason} ->
+ age_tree_delete(Now),
{error, Reason}
end.
+soft_close(Ref, Handle) ->
+ {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ true;
+ _ -> put_handle(Ref, Handle1),
+ false
+ end.
+
soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
@@ -709,18 +728,60 @@ init([]) ->
_ ->
ulimit()
end,
- error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
- {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [], callbacks = dict:new(),
- client_mrefs = dict:new(), timer_ref = undefined }}.
-
-handle_call(obtain, From, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
- maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit /= infinity andalso Count1 >= Limit of
- true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }};
- false -> {reply, ok, State1}
+ ObtainLimit = case Limit of
+ infinity -> infinity;
+ _ -> ?OBTAIN_LIMIT(Limit)
+ end,
+ error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
+ [Limit, ObtainLimit]),
+ {ok, #fhc_state { elders = dict:new(),
+ limit = Limit,
+ open_count = 0,
+ open_pending = [],
+ obtain_limit = ObtainLimit,
+ obtain_count = 0,
+ obtain_pending = [],
+ callbacks = dict:new(),
+ client_mrefs = dict:new(),
+ timer_ref = undefined }}.
+
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
+ obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders })
+ when Limit =/= infinity andalso Count >= Limit ->
+ {noreply, State #fhc_state { obtain_pending = [From | Pending],
+ elders = dict:erase(Pid, Elders) }};
+handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
+ obtain_pending = Pending,
+ elders = Elders }) ->
+ case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of
+ {true, State1} ->
+ {noreply, State1 #fhc_state { obtain_count = Count,
+ obtain_pending = [From | Pending],
+ elders = dict:erase(Pid, Elders) }};
+ {false, State1} ->
+ {reply, ok, State1}
+ end;
+
+handle_call({open, Pid, EldestUnusedSince, CanClose}, From,
+ State = #fhc_state { open_count = Count,
+ open_pending = Pending,
+ elders = Elders }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ case maybe_reduce(
+ ensure_mref(Pid, State #fhc_state { open_count = Count + 1,
+ elders = Elders1 })) of
+ {true, State1} ->
+ State2 = State1 #fhc_state { open_count = Count },
+ case CanClose of
+ true -> {reply, close, State2};
+ false -> {noreply, State2 #fhc_state {
+ open_pending = [From | Pending],
+ elders = dict:erase(Pid, Elders1) }}
+ end;
+ {false, State1} ->
+ {reply, ok, State1}
end.
handle_cast({register_callback, Pid, MFA},
@@ -729,47 +790,41 @@ handle_cast({register_callback, Pid, MFA},
Pid, State #fhc_state {
callbacks = dict:store(Pid, MFA, Callbacks) })};
-handle_cast({open, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- {noreply, maybe_reduce(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count + 1 }))};
-
handle_cast({update, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders }) ->
+ #fhc_state { elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
{noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+ #fhc_state { elders = Elders, open_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
- {noreply, process_obtains(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count - 1 }))};
+ {noreply, process_pending(
+ ensure_mref(Pid, State #fhc_state { open_count = Count - 1,
+ elders = Elders1 }))};
handle_cast(check_counts, State) ->
- {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
+ {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }),
+ {noreply, State1};
handle_cast({release_on_death, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
{noreply, State}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { count = Count, callbacks = Callbacks,
+ #fhc_state { obtain_count = Count, callbacks = Callbacks,
client_mrefs = ClientMRefs, elders = Elders }) ->
- {noreply, process_obtains(
+ {noreply, process_pending(
case dict:find(Pid, ClientMRefs) of
{ok, MRef} -> State #fhc_state {
elders = dict:erase(Pid, Elders),
client_mrefs = dict:erase(Pid, ClientMRefs),
callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
+ _ -> State #fhc_state { obtain_count = Count - 1 }
end)}.
terminate(_Reason, State) ->
@@ -782,23 +837,56 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-process_obtains(State = #fhc_state { obtains = [] }) ->
+process_pending(State = #fhc_state { limit = infinity }) ->
State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count })
- when Limit /= infinity andalso Count >= Limit ->
- State;
-process_obtains(State = #fhc_state { limit = Limit, count = Count,
- obtains = Obtains }) ->
- ObtainsLen = length(Obtains),
- ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
- Take = ObtainsLen - ObtainableLen,
- {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
- [gen_server:reply(From, ok) || From <- ObtainableRev],
- State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
-
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
- callbacks = Callbacks, timer_ref = TRef })
- when Limit /= infinity andalso Count >= Limit ->
+process_pending(State) ->
+ process_obtain(process_open(State)).
+
+process_open(State = #fhc_state { limit = Limit,
+ open_pending = Pending,
+ open_count = OpenCount,
+ obtain_count = ObtainCount }) ->
+ {Pending1, Inc} =
+ process_pending(Pending, Limit - (ObtainCount + OpenCount)),
+ State #fhc_state { open_pending = Pending1,
+ open_count = OpenCount + Inc }.
+
+process_obtain(State = #fhc_state { limit = Limit,
+ obtain_pending = Pending,
+ obtain_limit = ObtainLimit,
+ obtain_count = ObtainCount,
+ open_count = OpenCount }) ->
+ Quota = lists:min([ObtainLimit - ObtainCount,
+ Limit - (ObtainCount + OpenCount)]),
+ {Pending1, Inc} = process_pending(Pending, Quota),
+ State #fhc_state { obtain_pending = Pending1,
+ obtain_count = ObtainCount + Inc }.
+
+process_pending([], _Quota) ->
+ {[], 0};
+process_pending(Pending, Quota) when Quota =< 0 ->
+ {Pending, 0};
+process_pending(Pending, Quota) ->
+ PendingLen = length(Pending),
+ SatisfiableLen = lists:min([PendingLen, Quota]),
+ Take = PendingLen - SatisfiableLen,
+ {PendingNew, SatisfiableRev} = lists:split(Take, Pending),
+ [gen_server:reply(From, ok) || From <- SatisfiableRev],
+ {PendingNew, SatisfiableLen}.
+
+maybe_reduce(State = #fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count = ObtainCount,
+ obtain_limit = ObtainLimit,
+ obtain_pending = ObtainPending,
+ elders = Elders,
+ callbacks = Callbacks,
+ timer_ref = TRef })
+ when Limit =/= infinity andalso
+ (((OpenCount + ObtainCount) > Limit) orelse
+ (OpenPending =/= []) orelse
+ (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
@@ -818,15 +906,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
end
end, Pids)
end,
+ AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit,
case TRef of
undefined -> {ok, TRef1} = timer:apply_after(
?FILE_HANDLES_CHECK_INTERVAL,
gen_server, cast, [?SERVER, check_counts]),
- State #fhc_state { timer_ref = TRef1 };
- _ -> State
+ {AboveLimit, State #fhc_state { timer_ref = TRef1 }};
+ _ -> {AboveLimit, State}
end;
maybe_reduce(State) ->
- State.
+ {false, State}.
%% For all unices, assume ulimit exists. Further googling suggests
%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n