diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-08-10 17:17:34 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-08-10 17:17:34 +0100 |
commit | 70612671f27997b9275010c3bfab91fd70cf3156 (patch) | |
tree | fec07db10e64c807e981193bd959eb9b2f33d469 | |
parent | 26461122d432be23aabed9a9b67019ea652d2071 (diff) | |
download | rabbitmq-server-70612671f27997b9275010c3bfab91fd70cf3156.tar.gz |
Prevent possible deadlock when obtaining multiple filehandles
-rw-r--r-- | src/file_handle_cache.erl | 59 | ||||
-rw-r--r-- | src/rabbit_file.erl | 4 |
2 files changed, 36 insertions, 27 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 68c095d2..c3c5ed8e 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -120,12 +120,12 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain, release and transfer. obtain/0 +%% The server also supports obtain, release and transfer. obtain %% blocks until a file descriptor is available, at which point the -%% requesting process is considered to 'own' one more -%% descriptor. release/0 is the inverse operation and releases a -%% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain has a +%% requesting process is considered to 'own' more descriptor(s). +%% release is the inverse operation and releases previously obtained +%% descriptor(s). transfer transfers ownership of file descriptor(s) +%% between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -136,8 +136,8 @@ %% as sockets can do so in such a way that the overall number of open %% file descriptors is managed. %% -%% The callers of register_callback/3, obtain/0, and the argument of -%% transfer/1 are monitored, reducing the count of handles in use +%% The callers of register_callback/3, obtain, and the argument of +%% transfer are monitored, reducing the count of handles in use %% appropriately when the processes terminate. -behaviour(gen_server2). @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, +-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, + set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -251,8 +252,11 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(obtain/1 :: (non_neg_integer()) -> 'ok'). -spec(release/0 :: () -> 'ok'). +-spec(release/1 :: (non_neg_integer()) -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(transfer/2 :: (non_neg_integer(), pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain() -> +obtain() -> obtain(1). +release() -> release(1). +transfer(Pid) -> transfer(1, Pid). + +obtain(Count) when Count > 0 -> %% If the FHC isn't running, obtains succeed immediately. case whereis(?SERVER) of undefined -> ok; - _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity) + _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity) end. -release() -> - gen_server2:cast(?SERVER, {release, self()}). +release(Count) when Count > 0 -> + gen_server2:cast(?SERVER, {release, Count, self()}). -transfer(Pid) -> - gen_server2:cast(?SERVER, {transfer, self(), Pid}). +transfer(Count, Pid) when Count > 0 -> + gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). set_limit(Limit) -> gen_server2:call(?SERVER, {set_limit, Limit}, infinity). @@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) -> prioritise_cast(Msg, _State) -> case Msg of - {release, _} -> 5; + {release, _, _} -> 5; _ -> 0 end. @@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) -> +handle_call({obtain, N, Pid}, From, State = #fhc_state { + obtain_count = Count, + obtain_pending = Pending, + clients = Clients }) -> ok = track_client(Pid, Clients), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Item = #pending { kind = obtain, pid = Pid, requested = N, from = From }, Enqueue = fun () -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), @@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, case obtain_limit_reached(State) of true -> Enqueue(); false -> case needs_reduce(State #fhc_state { - obtain_count = Count + 1 }) of + obtain_count = Count + N }) of true -> reduce(Enqueue()); false -> adjust_alarm( State, run_pending_item(Item, State)) @@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages {noreply, State}; -handle_cast({release, Pid}, State) -> +handle_cast({release, N, Pid}, State) -> {noreply, adjust_alarm(State, process_pending( - update_counts(obtain, Pid, -1, State)))}; + update_counts(obtain, Pid, -N, State)))}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> @@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince}, {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, State)))}; -handle_cast({transfer, FromPid, ToPid}, State) -> +handle_cast({transfer, N, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( - update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}. + update_counts(obtain, ToPid, +N, + update_counts(obtain, FromPid, -N, State)))}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index a95f8f26..26f74796 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -105,9 +105,9 @@ with_fhc_handle(Fun) -> with_fhc_handle(1, Fun). with_fhc_handle(N, Fun) -> - [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], + ok = file_handle_cache:obtain(N), try Fun() - after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] + after ok = file_handle_cache:release(N) end. read_term_file(File) -> |