diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-23 15:28:03 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-23 15:28:03 +0000 |
commit | d48a90fcd8ec97649df488644f051c99339e29a6 (patch) | |
tree | 7246a3f2cc933ab5904f94d1dcccbba47006c576 /src | |
parent | b23697669c2d9bb527f9580e0e14c9d5ed15698a (diff) | |
download | rabbitmq-server-d48a90fcd8ec97649df488644f051c99339e29a6.tar.gz |
Created worker pool system, and pushed the rabbit_misc:execute_mnesia_txn through it.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
-rw-r--r-- | src/worker_pool.erl | 135 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 69 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 94 |
5 files changed, 316 insertions, 6 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede..b1204997 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,27 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). -rabbit_boot_step({rabbit_exchange_type_registry, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695..81cecb38 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 00000000..8e2c4774 --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). +%% +%% 2. Allow the submission to the pool_worker to be async. + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, State #state { pending = queue:in(From, Pending) }}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, From}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 } + end}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 00000000..4ded63a8 --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 00000000..fc3ce371 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |