summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-08-10 17:17:34 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-08-10 17:17:34 +0100
commit70612671f27997b9275010c3bfab91fd70cf3156 (patch)
treefec07db10e64c807e981193bd959eb9b2f33d469
parent26461122d432be23aabed9a9b67019ea652d2071 (diff)
downloadrabbitmq-server-70612671f27997b9275010c3bfab91fd70cf3156.tar.gz
Prevent possible deadlock when obtaining multiple filehandles
-rw-r--r--src/file_handle_cache.erl59
-rw-r--r--src/rabbit_file.erl4
2 files changed, 36 insertions, 27 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 68c095d2..c3c5ed8e 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,12 +120,12 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain, release and transfer. obtain/0
+%% The server also supports obtain, release and transfer. obtain
%% blocks until a file descriptor is available, at which point the
-%% requesting process is considered to 'own' one more
-%% descriptor. release/0 is the inverse operation and releases a
-%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain has a
+%% requesting process is considered to 'own' more descriptor(s).
+%% release is the inverse operation and releases previously obtained
+%% descriptor(s). transfer transfers ownership of file descriptor(s)
+%% between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -136,8 +136,8 @@
%% as sockets can do so in such a way that the overall number of open
%% file descriptors is managed.
%%
-%% The callers of register_callback/3, obtain/0, and the argument of
-%% transfer/1 are monitored, reducing the count of handles in use
+%% The callers of register_callback/3, obtain, and the argument of
+%% transfer are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
-behaviour(gen_server2).
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
+ set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -251,8 +252,11 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (non_neg_integer()) -> 'ok').
-spec(release/0 :: () -> 'ok').
+-spec(release/1 :: (non_neg_integer()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(transfer/2 :: (non_neg_integer(), pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain() ->
+obtain() -> obtain(1).
+release() -> release(1).
+transfer(Pid) -> transfer(1, Pid).
+
+obtain(Count) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
end.
-release() ->
- gen_server2:cast(?SERVER, {release, self()}).
+release(Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, self()}).
-transfer(Pid) ->
- gen_server2:cast(?SERVER, {transfer, self(), Pid}).
+transfer(Count, Pid) when Count > 0 ->
+ gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
set_limit(Limit) ->
gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
@@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {release, _} -> 5;
+ {release, _, _} -> 5;
_ -> 0
end.
@@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Pid}, From, State = #fhc_state {
+ obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients }) ->
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
@@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
case obtain_limit_reached(State) of
true -> Enqueue();
false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + 1 }) of
+ obtain_count = Count + N }) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, Pid}, State) ->
+handle_cast({release, N, Pid}, State) ->
{noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -1, State)))};
+ update_counts(obtain, Pid, -N, State)))};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince},
{noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1, State)))};
-handle_cast({transfer, FromPid, ToPid}, State) ->
+handle_cast({transfer, N, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))}.
+ update_counts(obtain, ToPid, +N,
+ update_counts(obtain, FromPid, -N, State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index a95f8f26..26f74796 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -105,9 +105,9 @@ with_fhc_handle(Fun) ->
with_fhc_handle(1, Fun).
with_fhc_handle(N, Fun) ->
- [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
+ ok = file_handle_cache:obtain(N),
try Fun()
- after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
+ after ok = file_handle_cache:release(N)
end.
read_term_file(File) ->