-module(worker_pool).

%% Generic worker pool manager.
%%
%% Supports nested submission of jobs (nested jobs always run
%% immediately in current worker process).
%%
%% Possible future enhancements:
%%
%% 1. Allow priorities (basically, change the pending queue to a
%%    priority_queue).
%%
%% 2. Allow the submission to the pool_worker to be async.

-behaviour(gen_server2).

-export([start_link/0, submit/1, idle/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).

-endif.

%%----------------------------------------------------------------------------

-define(SERVER, ?MODULE).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).

-record(state, { available, pending }).

%%----------------------------------------------------------------------------

start_link() ->
    gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
                           [{timeout, infinity}]).

submit(Fun) ->
    case get(worker_pool_worker) of
        true -> worker_pool_worker:run(Fun);
        _    -> Pid = gen_server2:call(?SERVER, next_free, infinity),
                worker_pool_worker:submit(Pid, Fun)
    end.

idle(WId) ->
    gen_server2:cast(?SERVER, {idle, WId}).

%%----------------------------------------------------------------------------

init([]) ->
    {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

handle_call(next_free, From, State = #state { available = Avail,
                                               pending = Pending }) ->
    case queue:out(Avail) of
        {empty, _Avail} ->
            {noreply, State #state { pending = queue:in(From, Pending) }};
        {{value, WId}, Avail1} ->
            {reply, get_worker_pid(WId), State #state { available = Avail1 }}
    end;

handle_call(Msg, _From, State) ->
    {stop, {unexpected_call, Msg}, State}.

handle_cast({idle, WId}, State = #state { available = Avail,
                                          pending = Pending }) ->
    {noreply,
     case queue:out(Pending) of
         {empty, _Pending} ->
             State #state { available = queue:in(WId, Avail) };
         {{value, From}, Pending1} ->
             gen_server2:reply(From, get_worker_pid(WId)),
             State #state { pending = Pending1 }
     end};

handle_cast(Msg, State) ->
    {stop, {unexpected_cast, Msg}, State}.

handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(_Reason, State) ->
    State.

%%----------------------------------------------------------------------------

get_worker_pid(WId) ->
    [{WId, Pid, _Type, _Modules} | _] =
        lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) when Id =:= WId ->
                                false;
                            (_) ->
                                true
                        end, supervisor:which_children(worker_pool_sup)),
    Pid.