summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-16 16:08:17 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-16 16:08:17 +0100
commit7966fb26206fde37c4485750c91cadb656aa0010 (patch)
tree248b2f2855cfd207c38fcd78bfcf13a3ddfc6b1b
parentdfc9add60a8288e94bfaf99d3fd0a8086091f800 (diff)
downloadrabbitmq-server-7966fb26206fde37c4485750c91cadb656aa0010.tar.gz
Sockets have priority up to 90%. Processes have the full 100% but at lower priority.
-rw-r--r--src/file_handle_cache.erl133
-rw-r--r--src/rabbit_tests.erl2
2 files changed, 80 insertions, 55 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 957c1d1d..b6b9c3e4 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -179,11 +179,12 @@
-record(fhc_state,
{ elders,
- count,
limit,
- opens,
+ opens_count,
+ opens_pending,
obtains_limit,
- obtains,
+ obtains_count,
+ obtains_pending,
callbacks,
client_mrefs,
timer_ref
@@ -733,30 +734,47 @@ init([]) ->
ObtainsLimit = ?OBTAINS_LIMIT(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainsLimit]),
- {ok, #fhc_state { elders = dict:new(), count = 0, limit = Limit, opens = [],
- obtains_limit = ObtainsLimit, obtains = [],
+ {ok, #fhc_state { elders = dict:new(), limit = Limit, opens_count = 0,
+ opens_pending = [], obtains_limit = ObtainsLimit,
+ obtains_count = 0, obtains_pending = [],
callbacks = dict:new(), client_mrefs = dict:new(),
timer_ref = undefined }}.
-handle_call(obtain, From, State = #fhc_state { obtains_limit = Limit }) ->
- case over_limit(Limit, State) of
- {true, State1 = #fhc_state { obtains = Obtains }} ->
- {noreply, State1 #fhc_state { obtains = [From | Obtains] }};
- {false, State1} ->
+handle_call(obtain, From, State = #fhc_state { obtains_count = Count,
+ obtains_limit = Limit,
+ obtains_pending = Pending })
+ when Count >= Limit ->
+ {noreply, State #fhc_state { obtains_pending = [From | Pending] }};
+handle_call(obtain, From, State = #fhc_state { obtains_count = ObtainsCount,
+ obtains_pending = Pending,
+ limit = Limit }) ->
+ State1 = #fhc_state { opens_count = OpensCount1,
+ obtains_count = ObtainsCount1 } =
+ maybe_reduce(State #fhc_state { obtains_count = ObtainsCount + 1 }),
+ case Limit =/= infinity andalso OpensCount1 + ObtainsCount1 >= Limit of
+ true ->
+ {noreply, State1 #fhc_state { obtains_count = ObtainsCount1 - 1,
+ obtains_pending = [From | Pending] }};
+ false ->
{reply, ok, State1}
end;
-
handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State =
- #fhc_state { limit = Limit, elders = Elders }) ->
+ #fhc_state { limit = Limit, opens_count = OpensCount,
+ opens_pending = Pending, elders = Elders }) ->
Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
- case over_limit(Limit,
- ensure_mref(Pid, State #fhc_state { elders = Elders1 })) of
- {true, State1 = #fhc_state { opens = Opens }} ->
+ State1 = #fhc_state { opens_count = OpensCount1,
+ obtains_count = ObtainsCount1 } =
+ maybe_reduce(State #fhc_state { opens_count = OpensCount + 1,
+ elders = Elders1 }),
+ case Limit =/= infinity andalso OpensCount1 + ObtainsCount1 > Limit of
+ true ->
+ State2 = State1 #fhc_state { opens_count = OpensCount1 - 1 },
case CanClose of
- true -> {reply, close, State1};
- false -> {noreply, State1 #fhc_state { opens = [From | Opens] }}
+ true -> {reply, close, State2};
+ false -> {noreply, State2 #fhc_state { opens_pending =
+ [From | Pending] }}
end;
- {false, State1} ->
+ false ->
{reply, ok, State1}
end.
@@ -774,14 +792,14 @@ handle_cast({update, Pid, EldestUnusedSince}, State =
{noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
handle_cast({close, Pid, EldestUnusedSince}, State =
- #fhc_state { elders = Elders, count = Count }) ->
+ #fhc_state { elders = Elders, opens_count = Count }) ->
Elders1 = case EldestUnusedSince of
undefined -> dict:erase(Pid, Elders);
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
{noreply, process_pending(
- ensure_mref(Pid, State #fhc_state { elders = Elders1,
- count = Count - 1 }))};
+ ensure_mref(Pid, State #fhc_state { opens_count = Count - 1,
+ elders = Elders1 }))};
handle_cast(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
@@ -791,7 +809,7 @@ handle_cast({release_on_death, Pid}, State) ->
{noreply, State}.
handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
- #fhc_state { count = Count, callbacks = Callbacks,
+ #fhc_state { obtains_count = Count, callbacks = Callbacks,
client_mrefs = ClientMRefs, elders = Elders }) ->
{noreply, process_pending(
case dict:find(Pid, ClientMRefs) of
@@ -799,7 +817,7 @@ handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
elders = dict:erase(Pid, Elders),
client_mrefs = dict:erase(Pid, ClientMRefs),
callbacks = dict:erase(Pid, Callbacks) };
- _ -> State #fhc_state { count = Count - 1 }
+ _ -> State #fhc_state { obtains_count = Count - 1 }
end)}.
terminate(_Reason, State) ->
@@ -812,44 +830,51 @@ code_change(_OldVsn, State, _Extra) ->
%% server helpers
%%----------------------------------------------------------------------------
-over_limit(Limit, State = #fhc_state { count = Count }) ->
- State1 = #fhc_state { count = Count1 } =
- maybe_reduce(State #fhc_state { count = Count + 1 }),
- case Limit =/= infinity andalso Count1 >= Limit of
- true -> {true, State1 #fhc_state { count = Count1 - 1 }};
- false -> {false, State1}
- end.
-
+process_pending(State = #fhc_state { limit = infinity }) ->
+ State;
process_pending(State) ->
process_obtains(process_opens(State)).
-process_opens(State =
- #fhc_state { opens = Opens, limit = Limit, count = Count }) ->
- {Opens1, Count1} = process_pending(Opens, Limit, Count),
- State #fhc_state { opens = Opens1, count = Count1}.
-
-process_obtains(State = #fhc_state { obtains = Obtains, obtains_limit = Limit,
- count = Count }) ->
- {Obtains1, Count1} = process_pending(Obtains, Limit, Count),
- State #fhc_state { obtains = Obtains1, count = Count1 }.
-
-%% if limit is infinity then we'll never have any pendings, so will
-%% always hit the first head.
-process_pending([], _Limit, Count) ->
- {[], Count};
-process_pending(Pending, Limit, Count) when Count >= Limit ->
- {Pending, Count};
-process_pending(Pending, Limit, Count) ->
+process_opens(State = #fhc_state { opens_pending = Pending, limit = Limit,
+ opens_count = OpensCount,
+ obtains_count = ObtainsCount }) ->
+ {Pending1, Inc} =
+ process_pending(Pending, Limit - (ObtainsCount + OpensCount)),
+ State #fhc_state { opens_pending = Pending1,
+ opens_count = OpensCount + Inc }.
+
+process_obtains(State = #fhc_state { obtains_pending = Pending,
+ obtains_limit = ObtainsLimit,
+ obtains_count = ObtainsCount,
+ opens_count = OpensCount,
+ limit = Limit }) ->
+ Quota = lists:min([ObtainsLimit - ObtainsCount,
+ Limit - (ObtainsCount + OpensCount)]),
+ {Pending1, Inc} = process_pending(Pending, Quota),
+ State #fhc_state { obtains_pending = Pending1,
+ obtains_count = ObtainsCount + Inc }.
+
+process_pending([], _Quota) ->
+ {[], 0};
+process_pending(Pending, Quota) when 0 >= Quota ->
+ {Pending, 0};
+process_pending(Pending, Quota) ->
PendingLen = length(Pending),
- SatisfiableLen = lists:min([PendingLen, Limit - Count]),
+ SatisfiableLen = lists:min([PendingLen, Quota]),
Take = PendingLen - SatisfiableLen,
{PendingNew, SatisfiableRev} = lists:split(Take, Pending),
[gen_server:reply(From, ok) || From <- SatisfiableRev],
- {PendingNew, Count + SatisfiableLen}.
-
-maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
- callbacks = Callbacks, timer_ref = TRef })
- when Limit /= infinity andalso Count >= Limit ->
+ {PendingNew, SatisfiableLen}.
+
+maybe_reduce(State = #fhc_state { limit = Limit, opens_count = OpensCount,
+ obtains_count = ObtainsCount,
+ obtains_limit = ObtainsLimit,
+ obtains_pending = ObtainsPending,
+ elders = Elders, callbacks = Callbacks,
+ timer_ref = TRef })
+ when Limit /= infinity andalso
+ (((OpensCount + ObtainsCount) >= Limit) orelse
+ (ObtainsCount < ObtainsLimit andalso [] =/= ObtainsPending)) ->
Now = now(),
{Pids, Sum, ClientCount} =
dict:fold(fun (_Pid, undefined, Accs) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 66d07640..c07055af 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,7 +54,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
- application:set_env(rabbit, file_handles_high_watermark, 20, infinity),
+ application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
passed = test_backing_queue(),
passed = test_priority_queue(),
passed = test_bpqueue(),