diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 16:08:17 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 16:08:17 +0100 |
commit | 7966fb26206fde37c4485750c91cadb656aa0010 (patch) | |
tree | 248b2f2855cfd207c38fcd78bfcf13a3ddfc6b1b | |
parent | dfc9add60a8288e94bfaf99d3fd0a8086091f800 (diff) | |
download | rabbitmq-server-7966fb26206fde37c4485750c91cadb656aa0010.tar.gz |
Sockets have priority up to 90%. Processes have the full 100% but at lower priority.
-rw-r--r-- | src/file_handle_cache.erl | 133 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
2 files changed, 80 insertions, 55 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 957c1d1d..b6b9c3e4 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -179,11 +179,12 @@ -record(fhc_state, { elders, - count, limit, - opens, + opens_count, + opens_pending, obtains_limit, - obtains, + obtains_count, + obtains_pending, callbacks, client_mrefs, timer_ref @@ -733,30 +734,47 @@ init([]) -> ObtainsLimit = ?OBTAINS_LIMIT(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainsLimit]), - {ok, #fhc_state { elders = dict:new(), count = 0, limit = Limit, opens = [], - obtains_limit = ObtainsLimit, obtains = [], + {ok, #fhc_state { elders = dict:new(), limit = Limit, opens_count = 0, + opens_pending = [], obtains_limit = ObtainsLimit, + obtains_count = 0, obtains_pending = [], callbacks = dict:new(), client_mrefs = dict:new(), timer_ref = undefined }}. -handle_call(obtain, From, State = #fhc_state { obtains_limit = Limit }) -> - case over_limit(Limit, State) of - {true, State1 = #fhc_state { obtains = Obtains }} -> - {noreply, State1 #fhc_state { obtains = [From | Obtains] }}; - {false, State1} -> +handle_call(obtain, From, State = #fhc_state { obtains_count = Count, + obtains_limit = Limit, + obtains_pending = Pending }) + when Count >= Limit -> + {noreply, State #fhc_state { obtains_pending = [From | Pending] }}; +handle_call(obtain, From, State = #fhc_state { obtains_count = ObtainsCount, + obtains_pending = Pending, + limit = Limit }) -> + State1 = #fhc_state { opens_count = OpensCount1, + obtains_count = ObtainsCount1 } = + maybe_reduce(State #fhc_state { obtains_count = ObtainsCount + 1 }), + case Limit =/= infinity andalso OpensCount1 + ObtainsCount1 >= Limit of + true -> + {noreply, State1 #fhc_state { obtains_count = ObtainsCount1 - 1, + obtains_pending = [From | Pending] }}; + false -> {reply, ok, State1} end; - handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = - #fhc_state { limit = Limit, elders = Elders }) -> + #fhc_state { limit = Limit, opens_count = OpensCount, + opens_pending = Pending, elders = Elders }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case over_limit(Limit, - ensure_mref(Pid, State #fhc_state { elders = Elders1 })) of - {true, State1 = #fhc_state { opens = Opens }} -> + State1 = #fhc_state { opens_count = OpensCount1, + obtains_count = ObtainsCount1 } = + maybe_reduce(State #fhc_state { opens_count = OpensCount + 1, + elders = Elders1 }), + case Limit =/= infinity andalso OpensCount1 + ObtainsCount1 > Limit of + true -> + State2 = State1 #fhc_state { opens_count = OpensCount1 - 1 }, case CanClose of - true -> {reply, close, State1}; - false -> {noreply, State1 #fhc_state { opens = [From | Opens] }} + true -> {reply, close, State2}; + false -> {noreply, State2 #fhc_state { opens_pending = + [From | Pending] }} end; - {false, State1} -> + false -> {reply, ok, State1} end. @@ -774,14 +792,14 @@ handle_cast({update, Pid, EldestUnusedSince}, State = {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, count = Count }) -> + #fhc_state { elders = Elders, opens_count = Count }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, {noreply, process_pending( - ensure_mref(Pid, State #fhc_state { elders = Elders1, - count = Count - 1 }))}; + ensure_mref(Pid, State #fhc_state { opens_count = Count - 1, + elders = Elders1 }))}; handle_cast(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; @@ -791,7 +809,7 @@ handle_cast({release_on_death, Pid}, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { count = Count, callbacks = Callbacks, + #fhc_state { obtains_count = Count, callbacks = Callbacks, client_mrefs = ClientMRefs, elders = Elders }) -> {noreply, process_pending( case dict:find(Pid, ClientMRefs) of @@ -799,7 +817,7 @@ handle_info({'DOWN', MRef, process, Pid, _Reason}, State = elders = dict:erase(Pid, Elders), client_mrefs = dict:erase(Pid, ClientMRefs), callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { count = Count - 1 } + _ -> State #fhc_state { obtains_count = Count - 1 } end)}. terminate(_Reason, State) -> @@ -812,44 +830,51 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- -over_limit(Limit, State = #fhc_state { count = Count }) -> - State1 = #fhc_state { count = Count1 } = - maybe_reduce(State #fhc_state { count = Count + 1 }), - case Limit =/= infinity andalso Count1 >= Limit of - true -> {true, State1 #fhc_state { count = Count1 - 1 }}; - false -> {false, State1} - end. - +process_pending(State = #fhc_state { limit = infinity }) -> + State; process_pending(State) -> process_obtains(process_opens(State)). -process_opens(State = - #fhc_state { opens = Opens, limit = Limit, count = Count }) -> - {Opens1, Count1} = process_pending(Opens, Limit, Count), - State #fhc_state { opens = Opens1, count = Count1}. - -process_obtains(State = #fhc_state { obtains = Obtains, obtains_limit = Limit, - count = Count }) -> - {Obtains1, Count1} = process_pending(Obtains, Limit, Count), - State #fhc_state { obtains = Obtains1, count = Count1 }. - -%% if limit is infinity then we'll never have any pendings, so will -%% always hit the first head. -process_pending([], _Limit, Count) -> - {[], Count}; -process_pending(Pending, Limit, Count) when Count >= Limit -> - {Pending, Count}; -process_pending(Pending, Limit, Count) -> +process_opens(State = #fhc_state { opens_pending = Pending, limit = Limit, + opens_count = OpensCount, + obtains_count = ObtainsCount }) -> + {Pending1, Inc} = + process_pending(Pending, Limit - (ObtainsCount + OpensCount)), + State #fhc_state { opens_pending = Pending1, + opens_count = OpensCount + Inc }. + +process_obtains(State = #fhc_state { obtains_pending = Pending, + obtains_limit = ObtainsLimit, + obtains_count = ObtainsCount, + opens_count = OpensCount, + limit = Limit }) -> + Quota = lists:min([ObtainsLimit - ObtainsCount, + Limit - (ObtainsCount + OpensCount)]), + {Pending1, Inc} = process_pending(Pending, Quota), + State #fhc_state { obtains_pending = Pending1, + obtains_count = ObtainsCount + Inc }. + +process_pending([], _Quota) -> + {[], 0}; +process_pending(Pending, Quota) when 0 >= Quota -> + {Pending, 0}; +process_pending(Pending, Quota) -> PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Limit - Count]), + SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), [gen_server:reply(From, ok) || From <- SatisfiableRev], - {PendingNew, Count + SatisfiableLen}. - -maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, - callbacks = Callbacks, timer_ref = TRef }) - when Limit /= infinity andalso Count >= Limit -> + {PendingNew, SatisfiableLen}. + +maybe_reduce(State = #fhc_state { limit = Limit, opens_count = OpensCount, + obtains_count = ObtainsCount, + obtains_limit = ObtainsLimit, + obtains_pending = ObtainsPending, + elders = Elders, callbacks = Callbacks, + timer_ref = TRef }) + when Limit /= infinity andalso + (((OpensCount + ObtainsCount) >= Limit) orelse + (ObtainsCount < ObtainsLimit andalso [] =/= ObtainsPending)) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 66d07640..c07055af 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,7 +54,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> - application:set_env(rabbit, file_handles_high_watermark, 20, infinity), + application:set_env(rabbit, file_handles_high_watermark, 10, infinity), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), |