diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-05-16 14:53:56 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-05-16 14:53:56 +0100 |
commit | f31ed236b4dc90b1ab75aa158f63b4585c649398 (patch) | |
tree | ddcaa2b78dab326cfda40265b24855fcf46f7239 | |
parent | db054fbd6cfa1af1d32258aaa849a9698fa3a1e8 (diff) | |
download | rabbitmq-server-f31ed236b4dc90b1ab75aa158f63b4585c649398.tar.gz |
Add a file variant to fhc:obtain() and friends so that rabbit_file can keep working even when we have hit the socket limit.
-rw-r--r-- | src/file_handle_cache.erl | 262 | ||||
-rw-r--r-- | src/rabbit_file.erl | 4 |
2 files changed, 157 insertions, 109 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 71645a3c..7005a929 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, +-export([obtain/0, obtain/1, obtain/2, + release/0, release/1, release/2, transfer/1, transfer/2, transfer/3, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -192,9 +193,11 @@ limit, open_count, open_pending, - obtain_limit, - obtain_count, - obtain_pending, + obtain_limit, %%socket + obtain_count_socket, + obtain_count_file, + obtain_pending_socket, + obtain_pending_file, clients, timer_ref, alarm_set, @@ -205,7 +208,8 @@ { pid, callback, opened, - obtained, + obtained_socket, + obtained_file, blocked, pending_closes }). @@ -230,6 +234,7 @@ {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})). -type(offset() :: non_neg_integer()). +-type(obtain_type() :: 'file' | 'socket'). -spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: @@ -253,10 +258,13 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). -spec(obtain/1 :: (non_neg_integer()) -> 'ok'). +-spec(obtain/2 :: (non_neg_integer(), obtain_type()) -> 'ok'). -spec(release/0 :: () -> 'ok'). -spec(release/1 :: (non_neg_integer()) -> 'ok'). +-spec(release/2 :: (non_neg_integer(), obtain_type()) -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). -spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(transfer/3 :: (pid(), non_neg_integer(), obtain_type()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -493,18 +501,23 @@ obtain() -> obtain(1). release() -> release(1). transfer(Pid) -> transfer(Pid, 1). -obtain(Count) when Count > 0 -> +obtain(Count) -> obtain(Count, socket). +release(Count) -> release(Count, socket). +transfer(Pid, Count) -> transfer(Pid, Count, socket). + +obtain(Count, Type) when Count > 0 -> %% If the FHC isn't running, obtains succeed immediately. case whereis(?SERVER) of undefined -> ok; - _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity) + _ -> gen_server2:call( + ?SERVER, {obtain, Count, Type, self()}, infinity) end. -release(Count) when Count > 0 -> - gen_server2:cast(?SERVER, {release, Count, self()}). +release(Count, Type) when Count > 0 -> + gen_server2:cast(?SERVER, {release, Count, Type, self()}). -transfer(Pid, Count) when Count > 0 -> - gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). +transfer(Pid, Count, Type) when Count > 0 -> + gen_server2:cast(?SERVER, {transfer, Count, Type, self(), Pid}). set_limit(Limit) -> gen_server2:call(?SERVER, {set_limit, Limit}, infinity). @@ -810,12 +823,16 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(total_limit, #fhc_state{limit = Limit}) -> Limit; -i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2; -i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; -i(sockets_used, #fhc_state{obtain_count = Count}) -> Count; +i(total_limit, #fhc_state{limit = Limit}) -> Limit; +i(total_used, State) -> used(State); +i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count; i(Item, _) -> throw({bad_argument, Item}). +used(#fhc_state{open_count = C1, + obtain_count_socket = C2, + obtain_count_file = C3}) -> C1 + C2 + C3. + %%---------------------------------------------------------------------------- %% gen_server2 callbacks %%---------------------------------------------------------------------------- @@ -836,21 +853,23 @@ init([AlarmSet, AlarmClear]) -> [Limit, ObtainLimit]), Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]), - {ok, #fhc_state { elders = Elders, - limit = Limit, - open_count = 0, - open_pending = pending_new(), - obtain_limit = ObtainLimit, - obtain_count = 0, - obtain_pending = pending_new(), - clients = Clients, - timer_ref = undefined, - alarm_set = AlarmSet, - alarm_clear = AlarmClear }}. + {ok, #fhc_state { elders = Elders, + limit = Limit, + open_count = 0, + open_pending = pending_new(), + obtain_limit = ObtainLimit, + obtain_count_file = 0, + obtain_pending_file = pending_new(), + obtain_count_socket = 0, + obtain_pending_socket = pending_new(), + clients = Clients, + timer_ref = undefined, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }}. prioritise_cast(Msg, _Len, _State) -> case Msg of - {release, _, _} -> 5; + {release, _, _, _} -> 5; _ -> 0 end. @@ -883,23 +902,24 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State)} end; -handle_call({obtain, N, Pid}, From, State = #fhc_state { - obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) -> +handle_call({obtain, N, Type, Pid}, From, + State = #fhc_state { clients = Clients }) -> + Count = obtain_state(Type, count, State), + Pending = obtain_state(Type, pending, State), ok = track_client(Pid, Clients), - Item = #pending { kind = obtain, pid = Pid, requested = N, from = From }, + Item = #pending { kind = {obtain, Type}, pid = Pid, + requested = N, from = From }, Enqueue = fun () -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - State #fhc_state { - obtain_pending = pending_in(Item, Pending) } + set_obtain_state(Type, pending, + pending_in(Item, Pending), State) end, {noreply, - case obtain_limit_reached(State) of + case obtain_limit_reached(Type, State) of true -> Enqueue(); - false -> case needs_reduce(State #fhc_state { - obtain_count = Count + N }) of + false -> case needs_reduce( + set_obtain_state(Type, count, Count + 1, State)) of true -> reduce(Enqueue()); false -> adjust_alarm( State, run_pending_item(Item, State)) @@ -934,9 +954,9 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages {noreply, State}; -handle_cast({release, N, Pid}, State) -> - {noreply, adjust_alarm(State, process_pending( - update_counts(obtain, Pid, -N, State)))}; +handle_cast({release, N, Type, Pid}, State) -> + State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)), + {noreply, adjust_alarm(State, State1)}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> @@ -948,35 +968,41 @@ handle_cast({close, Pid, EldestUnusedSince}, {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, State)))}; -handle_cast({transfer, N, FromPid, ToPid}, State) -> +handle_cast({transfer, N, Type, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( - update_counts(obtain, ToPid, +N, - update_counts(obtain, FromPid, -N, State)))}. + update_counts({obtain, Type}, ToPid, +N, + update_counts({obtain, Type}, FromPid, -N, + State)))}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { elders = Elders, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_pending = ObtainPending, - clients = Clients }) -> - [#cstate { opened = Opened, obtained = Obtained }] = + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count_file = ObtainCountF, + obtain_count_socket = ObtainCountS, + obtain_pending_file = ObtainPendingF, + obtain_pending_socket = ObtainPendingS, + clients = Clients }) -> + [#cstate { opened = Opened, + obtained_file = ObtainedFile, + obtained_socket = ObtainedSocket}] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), true = ets:delete(Elders, Pid), - FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - {noreply, adjust_alarm( - State, - process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(FilterFun, OpenPending), - obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending) }))}. + Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, + State1 = process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(Fun, OpenPending), + obtain_count_file = ObtainCountF - ObtainedFile, + obtain_count_socket = ObtainCountS - ObtainedSocket, + obtain_pending_file = filter_pending(Fun, ObtainPendingF), + obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }), + {noreply, adjust_alarm(State, State1)}. terminate(_Reason, State = #fhc_state { clients = Clients, elders = Elders }) -> @@ -1039,10 +1065,23 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of OLimit -> OLimit end. -obtain_limit_reached(#fhc_state { obtain_limit = Limit, - obtain_count = Count}) -> +obtain_limit_reached(socket, State) -> obtain_limit_reached(State); +obtain_limit_reached(file, State) -> needs_reduce(State). + +obtain_limit_reached(#fhc_state{obtain_limit = Limit, + obtain_count_socket = Count}) -> Limit =/= infinity andalso Count >= Limit. +obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N; +obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N; +obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N; +obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N. + +set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N}; +set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N}; +set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N}; +set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}. + adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, alarm_clear = AlarmClear }, NewState) -> case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of @@ -1055,25 +1094,24 @@ adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_open(process_obtain(State)). + process_open(process_obtain(socket, process_obtain(file, State))). process_open(State = #fhc_state { limit = Limit, - open_pending = Pending, - open_count = OpenCount, - obtain_count = ObtainCount }) -> - {Pending1, State1} = - process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + open_pending = Pending}) -> + {Pending1, State1} = process_pending(Pending, Limit - used(State), State), State1 #fhc_state { open_pending = Pending1 }. -process_obtain(State = #fhc_state { limit = Limit, - obtain_pending = Pending, - obtain_limit = ObtainLimit, - obtain_count = ObtainCount, - open_count = OpenCount }) -> - Quota = lists:min([ObtainLimit - ObtainCount, - Limit - (ObtainCount + OpenCount)]), +process_obtain(Type, State = #fhc_state { limit = Limit, + obtain_limit = ObtainLimit }) -> + ObtainCount = obtain_state(Type, count, State), + Pending = obtain_state(Type, pending, State), + Quota = case Type of + file -> Limit - (used(State)); + socket -> lists:min([ObtainLimit - ObtainCount, + Limit - (used(State))]) + end, {Pending1, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1 }. + set_obtain_state(Type, pending, Pending1, State1). process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, State}; @@ -1099,19 +1137,25 @@ run_pending_item(#pending { kind = Kind, update_counts(Kind, Pid, Requested, State). update_counts(Kind, Pid, Delta, - State = #fhc_state { open_count = OpenCount, - obtain_count = ObtainCount, - clients = Clients }) -> - {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), - State #fhc_state { open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta }. + State = #fhc_state { open_count = OpenCount, + obtain_count_file = ObtainCountF, + obtain_count_socket = ObtainCountS, + clients = Clients }) -> + {OpenDelta, ObtainDeltaF, ObtainDeltaS} = + update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count_file = ObtainCountF + ObtainDeltaF, + obtain_count_socket = ObtainCountS + ObtainDeltaS }. update_counts1(open, Pid, Delta, Clients) -> ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), - {Delta, 0}; -update_counts1(obtain, Pid, Delta, Clients) -> - ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), - {0, Delta}. + {Delta, 0, 0}; +update_counts1({obtain, file}, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}), + {0, Delta, 0}; +update_counts1({obtain, socket}, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}), + {0, 0, Delta}. maybe_reduce(State) -> case needs_reduce(State) of @@ -1119,23 +1163,25 @@ maybe_reduce(State) -> false -> State end. -needs_reduce(#fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending }) -> +needs_reduce(State = #fhc_state { limit = Limit, + open_pending = OpenPending, + obtain_limit = ObtainLimit, + obtain_count_socket = ObtainCountS, + obtain_pending_file = ObtainPendingF, + obtain_pending_socket = ObtainPendingS }) -> Limit =/= infinity - andalso ((OpenCount + ObtainCount > Limit) + andalso ((used(State) > Limit) orelse (not pending_is_empty(OpenPending)) - orelse (ObtainCount < ObtainLimit - andalso not pending_is_empty(ObtainPending))). - -reduce(State = #fhc_state { open_pending = OpenPending, - obtain_pending = ObtainPending, - elders = Elders, - clients = Clients, - timer_ref = TRef }) -> + orelse (not pending_is_empty(ObtainPendingF)) + orelse (ObtainCountS < ObtainLimit + andalso not pending_is_empty(ObtainPendingS))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending_file = ObtainPendingFile, + obtain_pending_socket = ObtainPendingSocket, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), {CStates, Sum, ClientCount} = ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> @@ -1159,7 +1205,8 @@ reduce(State = #fhc_state { open_pending = OpenPending, _ -> notify_age0(Clients, CStates, pending_count(OpenPending) + - pending_count(ObtainPending)) + pending_count(ObtainPendingFile) + + pending_count(ObtainPendingSocket)) end end, case TRef of @@ -1196,12 +1243,13 @@ notify(Clients, Required, [#cstate{ pid = Pid, notify(Clients, Required - Opened, Notifications). track_client(Pid, Clients) -> - case ets:insert_new(Clients, #cstate { pid = Pid, - callback = undefined, - opened = 0, - obtained = 0, - blocked = false, - pending_closes = 0 }) of + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained_file = 0, + obtained_socket = 0, + blocked = false, + pending_closes = 0 }) of true -> _MRef = erlang:monitor(process, Pid), ok; false -> ok diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index d71818c8..d6f0f632 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -110,9 +110,9 @@ with_fhc_handle(Fun) -> with_fhc_handle(1, Fun). with_fhc_handle(N, Fun) -> - ok = file_handle_cache:obtain(N), + ok = file_handle_cache:obtain(N, file), try Fun() - after ok = file_handle_cache:release(N) + after ok = file_handle_cache:release(N, file) end. read_term_file(File) -> |