diff options
Diffstat (limited to 'src/worker_pool.erl')
-rw-r--r-- | src/worker_pool.erl | 104 |
1 files changed, 47 insertions, 57 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 0f265e22..b1dba5a2 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,7 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -42,7 +42,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). --spec(idle/1 :: (any()) -> 'ok'). +-spec(ready/1 :: (pid()) -> 'ok'). +-spec(idle/1 :: (pid()) -> 'ok'). -endif. @@ -56,9 +57,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], - [{timeout, infinity}]). +start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). submit(Fun) -> case get(worker_pool_worker) of @@ -67,64 +67,65 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. -submit_async(Fun) -> - gen_server2:cast(?SERVER, {run_async, Fun}). +submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). -idle(WId) -> - gen_server2:cast(?SERVER, {idle, WId}). +ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}). + +idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). %%---------------------------------------------------------------------------- init([]) -> - {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({next_free, CPid}, From, State = #state { available = Avail, - pending = Pending }) -> - case queue:out(Avail) of - {empty, _Avail} -> - {noreply, - State#state{pending = queue:in({next_free, From, CPid}, Pending)}, - hibernate}; - {{value, WId}, Avail1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - {reply, WPid, State #state { available = Avail1 }, - hibernate} - end; +handle_call({next_free, CPid}, From, State = #state { available = [], + pending = Pending }) -> + {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, + hibernate}; +handle_call({next_free, CPid}, _From, State = #state { available = + [WPid | Avail1] }) -> + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({idle, WId}, State = #state { available = Avail, - pending = Pending }) -> - {noreply, case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From, CPid}}, Pending1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - gen_server2:reply(From, WPid), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { pending = Pending1 } - end, hibernate}; - -handle_cast({run_async, Fun}, State = #state { available = Avail, - pending = Pending }) -> +handle_cast({ready, WPid}, State) -> + erlang:monitor(process, WPid), + handle_cast({idle, WPid}, State); + +handle_cast({idle, WPid}, State = #state { available = Avail, + pending = Pending }) -> {noreply, - case queue:out(Avail) of - {empty, _Avail} -> - State #state { pending = queue:in({run_async, Fun}, Pending)}; - {{value, WId}, Avail1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { available = Avail1 } + case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = ordsets:add_element(WPid, Avail) }; + {{value, {next_free, From, CPid}}, Pending1} -> + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(WPid, Fun), + State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [], + pending = Pending }) -> + {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, + hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> + worker_pool_worker:submit_async(WPid, Fun), + {noreply, State #state { available = Avail1 }, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', _MRef, process, WPid, _Reason}, + State = #state { available = Avail }) -> + {noreply, State #state { available = ordsets:del_element(WPid, Avail) }, + hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -get_worker_pid(WId) -> - [{WId, Pid, _Type, _Modules} | _] = - lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) - when Id =:= WId -> false; - (_) -> true - end, - supervisor:which_children(worker_pool_sup)), - Pid. |