summaryrefslogtreecommitdiff
path: root/src/worker_pool.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-09 17:12:37 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-09 17:12:37 +0100
commit2db1402bf3bf6b1ffe80aa655eb24c0bbd5e4d7f (patch)
treece41763ee4f81ca5e5c7a3808c197562ed0020a2 /src/worker_pool.erl
parente367a2ecb372af75b234ecd0c2b601dc1f6d7575 (diff)
downloadrabbitmq-server-2db1402bf3bf6b1ffe80aa655eb24c0bbd5e4d7f.tar.gz
add worker_pook:submit_async/1
This was on the todo list and got cherry-picked from the bug21673 branch.
Diffstat (limited to 'src/worker_pool.erl')
-rw-r--r--src/worker_pool.erl28
1 files changed, 23 insertions, 5 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 1ee958af..97e07545 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -40,12 +40,10 @@
%%
%% 1. Allow priorities (basically, change the pending queue to a
%% priority_queue).
-%%
-%% 2. Allow the submission to the pool_worker to be async.
-behaviour(gen_server2).
--export([start_link/0, submit/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -56,6 +54,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/1 ::
+ (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
-endif.
@@ -80,6 +80,9 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
+submit_async(Fun) ->
+ gen_server2:cast(?SERVER, {run_async, Fun}).
+
idle(WId) ->
gen_server2:cast(?SERVER, {idle, WId}).
@@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail,
pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
- {noreply, State #state { pending = queue:in(From, Pending) },
+ {noreply,
+ State #state { pending = queue:in({next_free, From}, Pending) },
hibernate};
{{value, WId}, Avail1} ->
{reply, get_worker_pid(WId), State #state { available = Avail1 },
@@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, From}, Pending1} ->
+ {{value, {next_free, From}}, Pending1} ->
gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply,
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ State #state { pending = queue:in({run_async, Fun}, Pending)};
+ {{value, WId}, Avail1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ State #state { available = Avail1 }
+ end, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.