diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-29 11:25:34 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-29 11:25:34 +0100 |
commit | aa2cbd3584de485a094a2b2ab394c5b660740d61 (patch) | |
tree | b16a6b325fbc182bc3cbe0c95763bc9a07c37943 | |
parent | 94e415335ba30ab4eb1854605bab2eb768ad4c26 (diff) | |
parent | c3da328ac578d3f3ddabba32863431c2201b28b1 (diff) | |
download | rabbitmq-server-aa2cbd3584de485a094a2b2ab394c5b660740d61.tar.gz |
Merging bug 22554 onto default
-rw-r--r-- | .hgignore | 4 | ||||
-rw-r--r-- | Makefile | 75 | ||||
-rw-r--r-- | generate_deps | 5 | ||||
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_control.erl | 5 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 5 | ||||
-rw-r--r-- | src/worker_pool.erl | 135 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 69 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 94 |
10 files changed, 374 insertions, 42 deletions
@@ -11,8 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^src/rabbit_framing\.erl$ -^src/rabbitmqctl_usage\.erl$ -^src/rabbitmqmulti_usage\.erl$ +^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ ^ebin/rabbit\.(app|rel|boot|script)$ @@ -28,4 +27,3 @@ syntax: regexp ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ -^docs/.*\.usage.erl$ @@ -10,14 +10,16 @@ DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include +DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl $(SOURCE_DIR)/rabbitmqmulti_usage.erl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ -MANPAGES=$(patsubst %.xml, %.gz, $(wildcard docs/*.[0-9].xml)) -WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard docs/*.[0-9].xml) docs/rabbitmq-service.xml) -USAGES=$(patsubst %.1.xml, %.usage.erl, $(wildcard docs/*.[0-9].xml)) +MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) +WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml +USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -60,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e ERL_EBIN=erl -noinput -pa $(EBIN_DIR) +define usage_xml_to_erl + $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1)))) +endef + +define usage_dep + $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl +endef + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -68,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl +$(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -78,12 +87,6 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -$(SOURCE_DIR)/rabbitmqctl_usage.erl: docs/rabbitmqctl.usage.erl - cp docs/rabbitmqctl.usage.erl $@ - -$(SOURCE_DIR)/rabbitmqmulti_usage.erl: docs/rabbitmq-multi.usage.erl - cp docs/rabbitmq-multi.usage.erl $@ - dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." @@ -107,8 +110,8 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl codegen.pyc - rm -f docs/*.[0-9].gz docs/*.usage.erl docs/*.man.xml docs/*.erl + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -184,7 +187,7 @@ srcdist: distclean cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) - cp -r docs $(TARGET_SRC_DIR) + cp -r $(DOCS_DIR) $(TARGET_SRC_DIR) chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) @@ -197,25 +200,25 @@ distclean: clean find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; # xmlto can not read from standard input, so we mess with a tmp file. -%.gz: %.xml docs/examples-to-end.xsl - xsltproc docs/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o docs --stringparam man.indent.verbatims=0 $<.tmp && \ - gzip -f docs/`basename $< .xml` +%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp -%.usage.erl: %.1.xml docs/usage.xsl - xsltproc --stringparam modulename "`basename $< .1.xml | tr -d -`_usage" \ - docs/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \ - fold -s > docs/`basename $< .1.xml`.usage.erl +$(SOURCE_DIR)/%_usage.erl: + xsltproc --stringparam modulename "`basename $@ .erl`" \ + $(DOCS_DIR)/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \ + fold -s > $@ # We rename the file before xmlto sees it since xmlto will use the name of # the file to make internal links. -%.man.xml: %.xml docs/html-to-website-xml.xsl +%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl cp $< `basename $< .xml`.xml && \ xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml cat `basename $< .xml`.html | \ - xsltproc --novalid docs/remove-namespaces.xsl - | \ - xsltproc --stringparam original `basename $<` docs/html-to-website-xml.xsl - | \ + xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \ + xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ xmllint --format - > $@ rm `basename $< .xml`.html @@ -237,7 +240,7 @@ install: all docs_all install_dirs done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ - for manpage in docs/*.$$section.gz; do \ + for manpage in $(DOCS_DIR)/*.$$section.gz; do \ cp $$manpage $(MAN_DIR)/man$$section; \ done; \ done @@ -246,4 +249,22 @@ install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin +$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) + +# Note that all targets which depend on clean must have clean in their +# name. Also any target that doesn't depend on clean should not have +# clean in its name, unless you know that you don't need any of the +# automatic dependency generation for that target (eg cleandb). + +# We want to load the dep file if *any* target *doesn't* contain +# "clean" - i.e. if removing all clean-like targets leaves something + +ifeq "$(MAKECMDGOALS)" "" +TESTABLEGOALS:=$(.DEFAULT_GOAL) +else +TESTABLEGOALS:=$(MAKECMDGOALS) +endif + +ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" -include $(DEPS_FILE) +endif diff --git a/generate_deps b/generate_deps index e16624d2..29587b5a 100644 --- a/generate_deps +++ b/generate_deps @@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok; (Path, Dep, ok) -> Module = filename:basename(Path, ".erl"), - ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ", + Path]), ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, ok, Dep), - file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + file:write(Hdl, ["\n"]) end, ok, Deps), ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), ok = file:sync(Hdl), diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede..b1204997 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,27 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). -rabbit_boot_step({rabbit_exchange_type_registry, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6f564b1d..f2f29169 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -46,6 +46,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(action/4 :: (atom(), erlang_node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -129,8 +130,8 @@ parse_args([], _) -> stop() -> ok. -usage() -> - rabbitmqctl_usage:usage(). +usage() -> + rabbit_ctl_usage:usage(). action(stop, Node, [], Inform) -> Inform("Stopping and halting node ~p", [Node]), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695..81cecb38 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index bc04357b..9ff2c5cb 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -42,6 +42,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -85,8 +86,8 @@ parse_args([Command | Args]) -> stop() -> ok. -usage() -> - rabbitmqmulti_usage:usage(). +usage() -> + rabbit_multi_usage:usage(). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 00000000..b883d4f0 --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). +%% +%% 2. Allow the submission to the pool_worker to be async. + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, State #state { pending = queue:in(From, Pending) }}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, From}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 } + end}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 00000000..4ded63a8 --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 00000000..fc3ce371 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |