summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-04-17 13:51:52 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-04-17 13:51:52 +0100
commitdfb19cda6c6663ea3ed3cc00bfc9996016ccba69 (patch)
treed587bc4d1ac357a9dd167caed53ab4cb68862cf8
parent3a7232556affd21adba895ab378748bfddbe40a5 (diff)
parent4f92344718a60caf0b3d570a4b3ade1fb269d365 (diff)
downloadrabbitmq-server-dfb19cda6c6663ea3ed3cc00bfc9996016ccba69.tar.gz
Merge bug 26123
-rw-r--r--src/worker_pool.erl104
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl54
3 files changed, 72 insertions, 88 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 0f265e22..b1dba5a2 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,7 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,7 +42,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
--spec(idle/1 :: (any()) -> 'ok').
+-spec(ready/1 :: (pid()) -> 'ok').
+-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -56,9 +57,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
- [{timeout, infinity}]).
+start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
submit(Fun) ->
case get(worker_pool_worker) of
@@ -67,64 +67,65 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
-submit_async(Fun) ->
- gen_server2:cast(?SERVER, {run_async, Fun}).
+submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
-idle(WId) ->
- gen_server2:cast(?SERVER, {idle, WId}).
+ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+
+idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({next_free, CPid}, From, State = #state { available = Avail,
- pending = Pending }) ->
- case queue:out(Avail) of
- {empty, _Avail} ->
- {noreply,
- State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
- {{value, WId}, Avail1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 },
- hibernate}
- end;
+handle_call({next_free, CPid}, From, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
+ hibernate};
+handle_call({next_free, CPid}, _From, State = #state { available =
+ [WPid | Avail1] }) ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = Avail,
- pending = Pending }) ->
+handle_cast({ready, WPid}, State) ->
+ erlang:monitor(process, WPid),
+ handle_cast({idle, WPid}, State);
+
+handle_cast({idle, WPid}, State = #state { available = Avail,
+ pending = Pending }) ->
{noreply,
- case queue:out(Avail) of
- {empty, _Avail} ->
- State #state { pending = queue:in({run_async, Fun}, Pending)};
- {{value, WId}, Avail1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { available = Avail1 }
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = ordsets:add_element(WPid, Avail) };
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
+ hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ {noreply, State #state { available = Avail1 }, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _MRef, process, WPid, _Reason},
+ State = #state { available = Avail }) ->
+ {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
+ hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-get_worker_pid(WId) ->
- [{WId, Pid, _Type, _Modules} | _] =
- lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
- when Id =:= WId -> false;
- (_) -> true
- end,
- supervisor:which_children(worker_pool_sup)),
- Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 16c359a0..89d2ed46 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -49,5 +49,5 @@ init([WCount]) ->
{ok, {{one_for_one, 10, 10},
[{worker_pool, {worker_pool, start_link, []}, transient,
16#ffffffff, worker, [worker_pool]} |
- [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,
worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 43673cb2..beb95bc6 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -31,7 +31,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
@@ -45,12 +45,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--record(state, {id, next}).
-
%%----------------------------------------------------------------------------
-start_link(WId) ->
- gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
@@ -71,45 +69,43 @@ run(Fun) ->
%%----------------------------------------------------------------------------
-init([WId]) ->
+init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(WId),
+ ok = worker_pool:ready(self()),
put(worker_pool_worker, true),
- {ok, #state{id = WId}, hibernate,
+ {ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
- {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+handle_call({submit, Fun, CPid}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun}, hibernate};
-handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
- id = WId}) ->
+handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
- {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+ {noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
- id = WId}) ->
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
-handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
+handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
- ok = worker_pool:idle(WId),
- {noreply, State, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
-handle_info({'DOWN', MRef, process, CPid, _Reason},
- State = #state{id = WId,
- next = {from, CPid, MRef}}) ->
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}};
+handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
- {noreply, State};
+ {noreply, State, hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.