diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:45:20 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:45:20 +0100 |
commit | f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (patch) | |
tree | 8824fe82b4cf05f16e6a8ef4c9975b1a13c58527 | |
parent | bc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (diff) | |
download | rabbitmq-server-f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71.tar.gz |
track workers by Pid instead of name
-rw-r--r-- | src/worker_pool.erl | 40 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 2 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 54 |
3 files changed, 37 insertions, 59 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 268c703f..db8c4e96 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -42,7 +42,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). --spec(idle/1 :: (any()) -> 'ok'). +-spec(idle/1 :: (pid()) -> 'ok'). -endif. @@ -56,9 +56,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], - [{timeout, infinity}]). +start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). submit(Fun) -> case get(worker_pool_worker) of @@ -67,11 +66,9 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. -submit_async(Fun) -> - gen_server2:cast(?SERVER, {run_async, Fun}). +submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). -idle(WId) -> - gen_server2:cast(?SERVER, {idle, WId}). +idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). %%---------------------------------------------------------------------------- @@ -84,27 +81,25 @@ handle_call({next_free, CPid}, From, State = #state { available = [], {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, hibernate}; handle_call({next_free, CPid}, _From, State = #state { available = - [WId | Avail1] }) -> - WPid = get_worker_pid(WId), + [WPid | Avail1] }) -> worker_pool_worker:next_job_from(WPid, CPid), {reply, WPid, State #state { available = Avail1 }, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({idle, WId}, State = #state { available = Avail, - pending = Pending }) -> +handle_cast({idle, WPid}, State = #state { available = Avail, + pending = Pending }) -> {noreply, case queue:out(Pending) of {empty, _Pending} -> - State #state { available = ordsets:add_element(WId, Avail) }; + State #state { available = ordsets:add_element(WPid, Avail) }; {{value, {next_free, From, CPid}}, Pending1} -> - WPid = get_worker_pid(WId), worker_pool_worker:next_job_from(WPid, CPid), gen_server2:reply(From, WPid), State #state { pending = Pending1 }; {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + worker_pool_worker:submit_async(WPid, Fun), State #state { pending = Pending1 } end, hibernate}; @@ -112,8 +107,8 @@ handle_cast({run_async, Fun}, State = #state { available = [], pending = Pending }) -> {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, hibernate}; -handle_cast({run_async, Fun}, State = #state { available = [WId | Avail1] }) -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), +handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> + worker_pool_worker:submit_async(WPid, Fun), {noreply, State #state { available = Avail1 }, hibernate}; handle_cast(Msg, State) -> @@ -127,14 +122,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -get_worker_pid(WId) -> - [{WId, Pid, _Type, _Modules} | _] = - lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) - when Id =:= WId -> false; - (_) -> true - end, - supervisor:which_children(worker_pool_sup)), - Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 16c359a0..89d2ed46 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -49,5 +49,5 @@ init([WCount]) -> {ok, {{one_for_one, 10, 10}, [{worker_pool, {worker_pool, start_link, []}, transient, 16#ffffffff, worker, [worker_pool]} | - [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff, worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 43673cb2..ef6f115a 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -31,7 +31,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). @@ -45,12 +45,10 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --record(state, {id, next}). - %%---------------------------------------------------------------------------- -start_link(WId) -> - gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). @@ -71,45 +69,43 @@ run(Fun) -> %%---------------------------------------------------------------------------- -init([WId]) -> +init([]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:idle(WId), + ok = worker_pool:idle(self()), put(worker_pool_worker, true), - {ok, #state{id = WId}, hibernate, + {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> - {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; +handle_call({submit, Fun, CPid}, From, undefined) -> + {noreply, {job, CPid, From, Fun}, hibernate}; -handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, - id = WId}) -> +handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> +handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), - {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, - id = WId}) -> +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; -handle_cast({submit_async, Fun}, State = #state{id = WId}) -> +handle_cast({submit_async, Fun}, undefined) -> run(Fun), - ok = worker_pool:idle(WId), - {noreply, State, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. -handle_info({'DOWN', MRef, process, CPid, _Reason}, - State = #state{id = WId, - next = {from, CPid, MRef}}) -> - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}}; +handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> - {noreply, State}; + {noreply, State, hibernate}; handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |