summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:45:20 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:45:20 +0100
commitf6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (patch)
tree8824fe82b4cf05f16e6a8ef4c9975b1a13c58527
parentbc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (diff)
downloadrabbitmq-server-f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71.tar.gz
track workers by Pid instead of name
-rw-r--r--src/worker_pool.erl40
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl54
3 files changed, 37 insertions, 59 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 268c703f..db8c4e96 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -42,7 +42,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
--spec(idle/1 :: (any()) -> 'ok').
+-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -56,9 +56,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
- [{timeout, infinity}]).
+start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
submit(Fun) ->
case get(worker_pool_worker) of
@@ -67,11 +66,9 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
-submit_async(Fun) ->
- gen_server2:cast(?SERVER, {run_async, Fun}).
+submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
-idle(WId) ->
- gen_server2:cast(?SERVER, {idle, WId}).
+idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
@@ -84,27 +81,25 @@ handle_call({next_free, CPid}, From, State = #state { available = [],
{noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
hibernate};
handle_call({next_free, CPid}, _From, State = #state { available =
- [WId | Avail1] }) ->
- WPid = get_worker_pid(WId),
+ [WPid | Avail1] }) ->
worker_pool_worker:next_job_from(WPid, CPid),
{reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
+handle_cast({idle, WPid}, State = #state { available = Avail,
+ pending = Pending }) ->
{noreply,
case queue:out(Pending) of
{empty, _Pending} ->
- State #state { available = ordsets:add_element(WId, Avail) };
+ State #state { available = ordsets:add_element(WPid, Avail) };
{{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
worker_pool_worker:next_job_from(WPid, CPid),
gen_server2:reply(From, WPid),
State #state { pending = Pending1 };
{{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ worker_pool_worker:submit_async(WPid, Fun),
State #state { pending = Pending1 }
end, hibernate};
@@ -112,8 +107,8 @@ handle_cast({run_async, Fun}, State = #state { available = [],
pending = Pending }) ->
{noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
hibernate};
-handle_cast({run_async, Fun}, State = #state { available = [WId | Avail1] }) ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
+ worker_pool_worker:submit_async(WPid, Fun),
{noreply, State #state { available = Avail1 }, hibernate};
handle_cast(Msg, State) ->
@@ -127,14 +122,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-get_worker_pid(WId) ->
- [{WId, Pid, _Type, _Modules} | _] =
- lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
- when Id =:= WId -> false;
- (_) -> true
- end,
- supervisor:which_children(worker_pool_sup)),
- Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 16c359a0..89d2ed46 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -49,5 +49,5 @@ init([WCount]) ->
{ok, {{one_for_one, 10, 10},
[{worker_pool, {worker_pool, start_link, []}, transient,
16#ffffffff, worker, [worker_pool]} |
- [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,
worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 43673cb2..ef6f115a 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -31,7 +31,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
@@ -45,12 +45,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--record(state, {id, next}).
-
%%----------------------------------------------------------------------------
-start_link(WId) ->
- gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
@@ -71,45 +69,43 @@ run(Fun) ->
%%----------------------------------------------------------------------------
-init([WId]) ->
+init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(WId),
+ ok = worker_pool:idle(self()),
put(worker_pool_worker, true),
- {ok, #state{id = WId}, hibernate,
+ {ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
- {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+handle_call({submit, Fun, CPid}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun}, hibernate};
-handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
- id = WId}) ->
+handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
- {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+ {noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
- id = WId}) ->
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
-handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
+handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
- ok = worker_pool:idle(WId),
- {noreply, State, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
-handle_info({'DOWN', MRef, process, CPid, _Reason},
- State = #state{id = WId,
- next = {from, CPid, MRef}}) ->
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}};
+handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
- {noreply, State};
+ {noreply, State, hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.