summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-16 14:53:56 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-16 14:53:56 +0100
commitf31ed236b4dc90b1ab75aa158f63b4585c649398 (patch)
treeddcaa2b78dab326cfda40265b24855fcf46f7239
parentdb054fbd6cfa1af1d32258aaa849a9698fa3a1e8 (diff)
downloadrabbitmq-server-f31ed236b4dc90b1ab75aa158f63b4585c649398.tar.gz
Add a file variant to fhc:obtain() and friends so that rabbit_file can keep working even when we have hit the socket limit.
-rw-r--r--src/file_handle_cache.erl262
-rw-r--r--src/rabbit_file.erl4
2 files changed, 157 insertions, 109 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 71645a3c..7005a929 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
+-export([obtain/0, obtain/1, obtain/2,
+ release/0, release/1, release/2, transfer/1, transfer/2, transfer/3,
set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -192,9 +193,11 @@
limit,
open_count,
open_pending,
- obtain_limit,
- obtain_count,
- obtain_pending,
+ obtain_limit, %%socket
+ obtain_count_socket,
+ obtain_count_file,
+ obtain_pending_socket,
+ obtain_pending_file,
clients,
timer_ref,
alarm_set,
@@ -205,7 +208,8 @@
{ pid,
callback,
opened,
- obtained,
+ obtained_socket,
+ obtained_file,
blocked,
pending_closes
}).
@@ -230,6 +234,7 @@
{('bof' |'eof'), non_neg_integer()} |
{'cur', integer()})).
-type(offset() :: non_neg_integer()).
+-type(obtain_type() :: 'file' | 'socket').
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
@@ -253,10 +258,13 @@
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
-spec(obtain/1 :: (non_neg_integer()) -> 'ok').
+-spec(obtain/2 :: (non_neg_integer(), obtain_type()) -> 'ok').
-spec(release/0 :: () -> 'ok').
-spec(release/1 :: (non_neg_integer()) -> 'ok').
+-spec(release/2 :: (non_neg_integer(), obtain_type()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(transfer/3 :: (pid(), non_neg_integer(), obtain_type()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -493,18 +501,23 @@ obtain() -> obtain(1).
release() -> release(1).
transfer(Pid) -> transfer(Pid, 1).
-obtain(Count) when Count > 0 ->
+obtain(Count) -> obtain(Count, socket).
+release(Count) -> release(Count, socket).
+transfer(Pid, Count) -> transfer(Pid, Count, socket).
+
+obtain(Count, Type) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
+ _ -> gen_server2:call(
+ ?SERVER, {obtain, Count, Type, self()}, infinity)
end.
-release(Count) when Count > 0 ->
- gen_server2:cast(?SERVER, {release, Count, self()}).
+release(Count, Type) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, Type, self()}).
-transfer(Pid, Count) when Count > 0 ->
- gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
+transfer(Pid, Count, Type) when Count > 0 ->
+ gen_server2:cast(?SERVER, {transfer, Count, Type, self(), Pid}).
set_limit(Limit) ->
gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
@@ -810,12 +823,16 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(total_limit, #fhc_state{limit = Limit}) -> Limit;
-i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
-i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
-i(sockets_used, #fhc_state{obtain_count = Count}) -> Count;
+i(total_limit, #fhc_state{limit = Limit}) -> Limit;
+i(total_used, State) -> used(State);
+i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count;
i(Item, _) -> throw({bad_argument, Item}).
+used(#fhc_state{open_count = C1,
+ obtain_count_socket = C2,
+ obtain_count_file = C3}) -> C1 + C2 + C3.
+
%%----------------------------------------------------------------------------
%% gen_server2 callbacks
%%----------------------------------------------------------------------------
@@ -836,21 +853,23 @@ init([AlarmSet, AlarmClear]) ->
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
- {ok, #fhc_state { elders = Elders,
- limit = Limit,
- open_count = 0,
- open_pending = pending_new(),
- obtain_limit = ObtainLimit,
- obtain_count = 0,
- obtain_pending = pending_new(),
- clients = Clients,
- timer_ref = undefined,
- alarm_set = AlarmSet,
- alarm_clear = AlarmClear }}.
+ {ok, #fhc_state { elders = Elders,
+ limit = Limit,
+ open_count = 0,
+ open_pending = pending_new(),
+ obtain_limit = ObtainLimit,
+ obtain_count_file = 0,
+ obtain_pending_file = pending_new(),
+ obtain_count_socket = 0,
+ obtain_pending_socket = pending_new(),
+ clients = Clients,
+ timer_ref = undefined,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }}.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
- {release, _, _} -> 5;
+ {release, _, _, _} -> 5;
_ -> 0
end.
@@ -883,23 +902,24 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, N, Pid}, From, State = #fhc_state {
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Type, Pid}, From,
+ State = #fhc_state { clients = Clients }) ->
+ Count = obtain_state(Type, count, State),
+ Pending = obtain_state(Type, pending, State),
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
+ Item = #pending { kind = {obtain, Type}, pid = Pid,
+ requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
- State #fhc_state {
- obtain_pending = pending_in(Item, Pending) }
+ set_obtain_state(Type, pending,
+ pending_in(Item, Pending), State)
end,
{noreply,
- case obtain_limit_reached(State) of
+ case obtain_limit_reached(Type, State) of
true -> Enqueue();
- false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + N }) of
+ false -> case needs_reduce(
+ set_obtain_state(Type, count, Count + 1, State)) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -934,9 +954,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, N, Pid}, State) ->
- {noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -N, State)))};
+handle_cast({release, N, Type, Pid}, State) ->
+ State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)),
+ {noreply, adjust_alarm(State, State1)};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -948,35 +968,41 @@ handle_cast({close, Pid, EldestUnusedSince},
{noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1, State)))};
-handle_cast({transfer, N, FromPid, ToPid}, State) ->
+handle_cast({transfer, N, Type, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +N,
- update_counts(obtain, FromPid, -N, State)))}.
+ update_counts({obtain, Type}, ToPid, +N,
+ update_counts({obtain, Type}, FromPid, -N,
+ State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #fhc_state { elders = Elders,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_pending = ObtainPending,
- clients = Clients }) ->
- [#cstate { opened = Opened, obtained = Obtained }] =
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count_file = ObtainCountF,
+ obtain_count_socket = ObtainCountS,
+ obtain_pending_file = ObtainPendingF,
+ obtain_pending_socket = ObtainPendingS,
+ clients = Clients }) ->
+ [#cstate { opened = Opened,
+ obtained_file = ObtainedFile,
+ obtained_socket = ObtainedSocket}] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
true = ets:delete(Elders, Pid),
- FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, adjust_alarm(
- State,
- process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
+ Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
+ State1 = process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(Fun, OpenPending),
+ obtain_count_file = ObtainCountF - ObtainedFile,
+ obtain_count_socket = ObtainCountS - ObtainedSocket,
+ obtain_pending_file = filter_pending(Fun, ObtainPendingF),
+ obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }),
+ {noreply, adjust_alarm(State, State1)}.
terminate(_Reason, State = #fhc_state { clients = Clients,
elders = Elders }) ->
@@ -1039,10 +1065,23 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
-obtain_limit_reached(#fhc_state { obtain_limit = Limit,
- obtain_count = Count}) ->
+obtain_limit_reached(socket, State) -> obtain_limit_reached(State);
+obtain_limit_reached(file, State) -> needs_reduce(State).
+
+obtain_limit_reached(#fhc_state{obtain_limit = Limit,
+ obtain_count_socket = Count}) ->
Limit =/= infinity andalso Count >= Limit.
+obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N;
+obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N;
+obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N;
+obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N.
+
+set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N};
+set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N};
+set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N};
+set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}.
+
adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
alarm_clear = AlarmClear }, NewState) ->
case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
@@ -1055,25 +1094,24 @@ adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
- process_open(process_obtain(State)).
+ process_open(process_obtain(socket, process_obtain(file, State))).
process_open(State = #fhc_state { limit = Limit,
- open_pending = Pending,
- open_count = OpenCount,
- obtain_count = ObtainCount }) ->
- {Pending1, State1} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ open_pending = Pending}) ->
+ {Pending1, State1} = process_pending(Pending, Limit - used(State), State),
State1 #fhc_state { open_pending = Pending1 }.
-process_obtain(State = #fhc_state { limit = Limit,
- obtain_pending = Pending,
- obtain_limit = ObtainLimit,
- obtain_count = ObtainCount,
- open_count = OpenCount }) ->
- Quota = lists:min([ObtainLimit - ObtainCount,
- Limit - (ObtainCount + OpenCount)]),
+process_obtain(Type, State = #fhc_state { limit = Limit,
+ obtain_limit = ObtainLimit }) ->
+ ObtainCount = obtain_state(Type, count, State),
+ Pending = obtain_state(Type, pending, State),
+ Quota = case Type of
+ file -> Limit - (used(State));
+ socket -> lists:min([ObtainLimit - ObtainCount,
+ Limit - (used(State))])
+ end,
{Pending1, State1} = process_pending(Pending, Quota, State),
- State1 #fhc_state { obtain_pending = Pending1 }.
+ set_obtain_state(Type, pending, Pending1, State1).
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, State};
@@ -1099,19 +1137,25 @@ run_pending_item(#pending { kind = Kind,
update_counts(Kind, Pid, Requested, State).
update_counts(Kind, Pid, Delta,
- State = #fhc_state { open_count = OpenCount,
- obtain_count = ObtainCount,
- clients = Clients }) ->
- {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
- State #fhc_state { open_count = OpenCount + OpenDelta,
- obtain_count = ObtainCount + ObtainDelta }.
+ State = #fhc_state { open_count = OpenCount,
+ obtain_count_file = ObtainCountF,
+ obtain_count_socket = ObtainCountS,
+ clients = Clients }) ->
+ {OpenDelta, ObtainDeltaF, ObtainDeltaS} =
+ update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count_file = ObtainCountF + ObtainDeltaF,
+ obtain_count_socket = ObtainCountS + ObtainDeltaS }.
update_counts1(open, Pid, Delta, Clients) ->
ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
- {Delta, 0};
-update_counts1(obtain, Pid, Delta, Clients) ->
- ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
- {0, Delta}.
+ {Delta, 0, 0};
+update_counts1({obtain, file}, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
+ {0, Delta, 0};
+update_counts1({obtain, socket}, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
+ {0, 0, Delta}.
maybe_reduce(State) ->
case needs_reduce(State) of
@@ -1119,23 +1163,25 @@ maybe_reduce(State) ->
false -> State
end.
-needs_reduce(#fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_limit = ObtainLimit,
- obtain_pending = ObtainPending }) ->
+needs_reduce(State = #fhc_state { limit = Limit,
+ open_pending = OpenPending,
+ obtain_limit = ObtainLimit,
+ obtain_count_socket = ObtainCountS,
+ obtain_pending_file = ObtainPendingF,
+ obtain_pending_socket = ObtainPendingS }) ->
Limit =/= infinity
- andalso ((OpenCount + ObtainCount > Limit)
+ andalso ((used(State) > Limit)
orelse (not pending_is_empty(OpenPending))
- orelse (ObtainCount < ObtainLimit
- andalso not pending_is_empty(ObtainPending))).
-
-reduce(State = #fhc_state { open_pending = OpenPending,
- obtain_pending = ObtainPending,
- elders = Elders,
- clients = Clients,
- timer_ref = TRef }) ->
+ orelse (not pending_is_empty(ObtainPendingF))
+ orelse (ObtainCountS < ObtainLimit
+ andalso not pending_is_empty(ObtainPendingS))).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending_file = ObtainPendingFile,
+ obtain_pending_socket = ObtainPendingSocket,
+ elders = Elders,
+ clients = Clients,
+ timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
@@ -1159,7 +1205,8 @@ reduce(State = #fhc_state { open_pending = OpenPending,
_ ->
notify_age0(Clients, CStates,
pending_count(OpenPending) +
- pending_count(ObtainPending))
+ pending_count(ObtainPendingFile) +
+ pending_count(ObtainPendingSocket))
end
end,
case TRef of
@@ -1196,12 +1243,13 @@ notify(Clients, Required, [#cstate{ pid = Pid,
notify(Clients, Required - Opened, Notifications).
track_client(Pid, Clients) ->
- case ets:insert_new(Clients, #cstate { pid = Pid,
- callback = undefined,
- opened = 0,
- obtained = 0,
- blocked = false,
- pending_closes = 0 }) of
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained_file = 0,
+ obtained_socket = 0,
+ blocked = false,
+ pending_closes = 0 }) of
true -> _MRef = erlang:monitor(process, Pid),
ok;
false -> ok
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index d71818c8..d6f0f632 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -110,9 +110,9 @@ with_fhc_handle(Fun) ->
with_fhc_handle(1, Fun).
with_fhc_handle(N, Fun) ->
- ok = file_handle_cache:obtain(N),
+ ok = file_handle_cache:obtain(N, file),
try Fun()
- after ok = file_handle_cache:release(N)
+ after ok = file_handle_cache:release(N, file)
end.
read_term_file(File) ->