diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 13:51:52 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 13:51:52 +0100 |
commit | dfb19cda6c6663ea3ed3cc00bfc9996016ccba69 (patch) | |
tree | d587bc4d1ac357a9dd167caed53ab4cb68862cf8 | |
parent | 3a7232556affd21adba895ab378748bfddbe40a5 (diff) | |
parent | 4f92344718a60caf0b3d570a4b3ade1fb269d365 (diff) | |
download | rabbitmq-server-dfb19cda6c6663ea3ed3cc00bfc9996016ccba69.tar.gz |
Merge bug 26123
-rw-r--r-- | src/worker_pool.erl | 104 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 2 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 54 |
3 files changed, 72 insertions, 88 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 0f265e22..b1dba5a2 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,7 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -42,7 +42,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). --spec(idle/1 :: (any()) -> 'ok'). +-spec(ready/1 :: (pid()) -> 'ok'). +-spec(idle/1 :: (pid()) -> 'ok'). -endif. @@ -56,9 +57,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], - [{timeout, infinity}]). +start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). submit(Fun) -> case get(worker_pool_worker) of @@ -67,64 +67,65 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. -submit_async(Fun) -> - gen_server2:cast(?SERVER, {run_async, Fun}). +submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). -idle(WId) -> - gen_server2:cast(?SERVER, {idle, WId}). +ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}). + +idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). %%---------------------------------------------------------------------------- init([]) -> - {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({next_free, CPid}, From, State = #state { available = Avail, - pending = Pending }) -> - case queue:out(Avail) of - {empty, _Avail} -> - {noreply, - State#state{pending = queue:in({next_free, From, CPid}, Pending)}, - hibernate}; - {{value, WId}, Avail1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - {reply, WPid, State #state { available = Avail1 }, - hibernate} - end; +handle_call({next_free, CPid}, From, State = #state { available = [], + pending = Pending }) -> + {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, + hibernate}; +handle_call({next_free, CPid}, _From, State = #state { available = + [WPid | Avail1] }) -> + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({idle, WId}, State = #state { available = Avail, - pending = Pending }) -> - {noreply, case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From, CPid}}, Pending1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - gen_server2:reply(From, WPid), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { pending = Pending1 } - end, hibernate}; - -handle_cast({run_async, Fun}, State = #state { available = Avail, - pending = Pending }) -> +handle_cast({ready, WPid}, State) -> + erlang:monitor(process, WPid), + handle_cast({idle, WPid}, State); + +handle_cast({idle, WPid}, State = #state { available = Avail, + pending = Pending }) -> {noreply, - case queue:out(Avail) of - {empty, _Avail} -> - State #state { pending = queue:in({run_async, Fun}, Pending)}; - {{value, WId}, Avail1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { available = Avail1 } + case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = ordsets:add_element(WPid, Avail) }; + {{value, {next_free, From, CPid}}, Pending1} -> + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(WPid, Fun), + State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [], + pending = Pending }) -> + {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, + hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> + worker_pool_worker:submit_async(WPid, Fun), + {noreply, State #state { available = Avail1 }, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', _MRef, process, WPid, _Reason}, + State = #state { available = Avail }) -> + {noreply, State #state { available = ordsets:del_element(WPid, Avail) }, + hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -get_worker_pid(WId) -> - [{WId, Pid, _Type, _Modules} | _] = - lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) - when Id =:= WId -> false; - (_) -> true - end, - supervisor:which_children(worker_pool_sup)), - Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 16c359a0..89d2ed46 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -49,5 +49,5 @@ init([WCount]) -> {ok, {{one_for_one, 10, 10}, [{worker_pool, {worker_pool, start_link, []}, transient, 16#ffffffff, worker, [worker_pool]} | - [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff, worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 43673cb2..beb95bc6 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -31,7 +31,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). @@ -45,12 +45,10 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --record(state, {id, next}). - %%---------------------------------------------------------------------------- -start_link(WId) -> - gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). @@ -71,45 +69,43 @@ run(Fun) -> %%---------------------------------------------------------------------------- -init([WId]) -> +init([]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:idle(WId), + ok = worker_pool:ready(self()), put(worker_pool_worker, true), - {ok, #state{id = WId}, hibernate, + {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> - {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; +handle_call({submit, Fun, CPid}, From, undefined) -> + {noreply, {job, CPid, From, Fun}, hibernate}; -handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, - id = WId}) -> +handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> +handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), - {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, - id = WId}) -> +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; -handle_cast({submit_async, Fun}, State = #state{id = WId}) -> +handle_cast({submit_async, Fun}, undefined) -> run(Fun), - ok = worker_pool:idle(WId), - {noreply, State, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. -handle_info({'DOWN', MRef, process, CPid, _Reason}, - State = #state{id = WId, - next = {from, CPid, MRef}}) -> - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}}; +handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> - {noreply, State}; + {noreply, State, hibernate}; handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |