summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-16 12:13:56 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-16 12:13:56 +0100
commit7c559387acdfcdc51eced753696b6c6a2d678662 (patch)
treef815b9c941f8ed48c84c4721bda98f43d71d4c24
parent2d9da1b6e17f128c72267ab24057a197cba3827b (diff)
downloadrabbitmq-server-7c559387acdfcdc51eced753696b6c6a2d678662.tar.gz
Allow the fhc server to request the client closes file handles before granting an open - thus prevents deadlocks
-rw-r--r--src/file_handle_cache.erl83
1 files changed, 52 insertions, 31 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f026c83f..e79fbb96 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -429,21 +429,15 @@ set_maximum_since_use(MaximumAge) ->
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
Age = timer:now_diff(Now, Then),
- case Hdl /= closed andalso Age >= MaximumAge of
- true -> {Res, Handle1} = soft_close(Handle),
- case Res of
- ok -> put({Ref, fhc_handle}, Handle1),
- false;
- _ -> put_handle(Ref, Handle1),
- Rep
- end;
+ case Hdl =/= closed andalso Age >= MaximumAge of
+ true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
(_KeyValuePair, Rep) ->
Rep
- end, true, get()) of
- true -> age_tree_change(), ok;
- false -> ok
+ end, false, get()) of
+ false -> age_tree_change(), ok;
+ true -> ok
end.
release_on_death(Pid) when is_pid(Pid) ->
@@ -523,20 +517,30 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-with_age_tree(Fun) ->
- put(fhc_age_tree, Fun(case get(fhc_age_tree) of
- undefined -> gb_trees:empty();
- AgeTree -> AgeTree
- end)).
+with_age_tree(Fun) -> put_age_tree(Fun(get_age_tree())).
+
+get_age_tree() ->
+ case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end.
+
+put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_insert(Now, Ref) ->
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:insert(Now, Ref, Tree),
- {Oldest, _Ref} = gb_trees:smallest(Tree1),
- gen_server:call(?SERVER, {open, self(), Oldest}, infinity),
- Tree1
- end).
+ Tree = get_age_tree(),
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ case gen_server:call(?SERVER, {open, self(), Oldest,
+ 0 < gb_trees:size(Tree)}, infinity) of
+ ok ->
+ put_age_tree(Tree1);
+ close ->
+ [soft_close(Ref1, Handle1) ||
+ {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(),
+ Hdl1 =/= closed],
+ age_tree_insert(Now, Ref)
+ end.
age_tree_update(Then, Now, Ref) ->
with_age_tree(
@@ -608,6 +612,15 @@ open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
{error, Reason}
end.
+soft_close(Ref, Handle) ->
+ {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ true;
+ _ -> put_handle(Ref, Handle1),
+ false
+ end.
+
soft_close(Handle = #handle { hdl = closed }) ->
{ok, Handle};
soft_close(Handle) ->
@@ -716,12 +729,13 @@ init([]) ->
client_mrefs = dict:new(), timer_ref = undefined }}.
handle_call(obtain, From, State) ->
- add_obtains(From, State);
+ add_obtains(From, false, State);
-handle_call({open, Pid, EldestUnusedSince}, From, State =
- #fhc_state { elders = Elders }) ->
+handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State =
+ #fhc_state { elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- add_obtains(From, ensure_mref(Pid, State #fhc_state { elders = Elders1 })).
+ add_obtains(From, CanClose,
+ ensure_mref(Pid, State #fhc_state { elders = Elders1 })).
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { callbacks = Callbacks }) ->
@@ -775,13 +789,20 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-add_obtains(From, State = #fhc_state { count = Count }) ->
+add_obtains(From, CanClose, State = #fhc_state { count = Count }) ->
State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
maybe_reduce(State #fhc_state { count = Count + 1 }),
case Limit /= infinity andalso Count1 >= Limit of
- true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
- count = Count1 - 1 }};
- false -> {reply, ok, State1}
+ true ->
+ case CanClose of
+ true ->
+ {reply, close, State1 #fhc_state { count = Count1 - 1 }};
+ false ->
+ {noreply, State1 #fhc_state { obtains = [From | Obtains],
+ count = Count1 - 1 }}
+ end;
+ false ->
+ {reply, ok, State1}
end.
process_obtains(State = #fhc_state { obtains = [] }) ->