diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-29 12:18:04 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-29 12:18:04 +0100 |
commit | 5409a68783e6e909fe8ed4dcc018a23654758179 (patch) | |
tree | e02863467d4fcaca06cf7057da8045f34baa6843 | |
parent | a502d7b954bff8a8c0e3773b5285294fc3d7e57a (diff) | |
parent | fc1d45b378e1dcbcc7dec9cba55e2e9f0159fe04 (diff) | |
download | rabbitmq-server-5409a68783e6e909fe8ed4dcc018a23654758179.tar.gz |
Merging bug 22559 onto default
-rw-r--r-- | .hgignore | 4 | ||||
-rw-r--r-- | Makefile | 59 | ||||
-rw-r--r-- | docs/examples-to-end.xsl | 13 | ||||
-rw-r--r-- | docs/rabbitmq.conf.5.xml | 12 | ||||
-rw-r--r-- | docs/usage.xsl | 4 | ||||
-rw-r--r-- | generate_deps | 5 | ||||
-rw-r--r-- | src/rabbit.erl | 22 | ||||
-rw-r--r-- | src/rabbit_control.erl | 2 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 2 | ||||
-rw-r--r-- | src/worker_pool.erl | 135 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 69 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 94 |
13 files changed, 374 insertions, 49 deletions
@@ -11,8 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^src/rabbit_framing\.erl$ -^src/rabbitmqctl_usage\.erl$ -^src/rabbitmqmulti_usage\.erl$ +^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ ^ebin/rabbit\.(app|rel|boot|script)$ @@ -28,4 +27,3 @@ syntax: regexp ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ -^docs/.*\.usage.erl$ @@ -10,14 +10,16 @@ DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include +DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl $(SOURCE_DIR)/rabbitmqmulti_usage.erl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ -MANPAGES=$(patsubst %.xml, %.gz, $(wildcard docs/*.[0-9].xml)) -WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard docs/*.[0-9].xml) docs/rabbitmq-service.xml) -USAGES=$(patsubst %.1.xml, %.usage.erl, $(wildcard docs/*.[0-9].xml)) +MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) +WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml +USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -60,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e ERL_EBIN=erl -noinput -pa $(EBIN_DIR) +define usage_xml_to_erl + $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1)))) +endef + +define usage_dep + $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl +endef + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -68,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl +$(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -78,12 +87,6 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -$(SOURCE_DIR)/rabbitmqctl_usage.erl: docs/rabbitmqctl.usage.erl - cp docs/rabbitmqctl.usage.erl $@ - -$(SOURCE_DIR)/rabbitmqmulti_usage.erl: docs/rabbitmq-multi.usage.erl - cp docs/rabbitmq-multi.usage.erl $@ - dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." @@ -107,8 +110,8 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl codegen.pyc - rm -f docs/*.[0-9].gz docs/*.usage.erl docs/*.man.xml docs/*.erl + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -184,7 +187,7 @@ srcdist: distclean cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) - cp -r docs $(TARGET_SRC_DIR) + cp -r $(DOCS_DIR) $(TARGET_SRC_DIR) chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) @@ -197,25 +200,25 @@ distclean: clean find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; # xmlto can not read from standard input, so we mess with a tmp file. -%.gz: %.xml docs/examples-to-end.xsl - xsltproc docs/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o docs $<.tmp && \ - gzip -f docs/`basename $< .xml` +%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp -%.usage.erl: %.1.xml docs/usage.xsl - xsltproc --stringparam modulename "`basename $< .1.xml | tr -d -`_usage" \ - docs/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \ - fold -s > docs/`basename $< .1.xml`.usage.erl +$(SOURCE_DIR)/%_usage.erl: + xsltproc --stringparam modulename "`basename $@ .erl`" \ + $(DOCS_DIR)/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \ + fold -s > $@ # We rename the file before xmlto sees it since xmlto will use the name of # the file to make internal links. -%.man.xml: %.xml docs/html-to-website-xml.xsl +%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl cp $< `basename $< .xml`.xml && \ xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml cat `basename $< .xml`.html | \ - xsltproc --novalid docs/remove-namespaces.xsl - | \ - xsltproc --stringparam original `basename $<` docs/html-to-website-xml.xsl - | \ + xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \ + xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ xmllint --format - > $@ rm `basename $< .xml`.html @@ -237,7 +240,7 @@ install: all docs_all install_dirs done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ - for manpage in docs/*.$$section.gz; do \ + for manpage in $(DOCS_DIR)/*.$$section.gz; do \ cp $$manpage $(MAN_DIR)/man$$section; \ done; \ done @@ -246,6 +249,8 @@ install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin +$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) + # Note that all targets which depend on clean must have clean in their # name. Also any target that doesn't depend on clean should not have # clean in its name, unless you know that you don't need any of the diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index b63ffcb3..496fcc1c 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -8,7 +8,7 @@ <xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> -<!-- Don't copy exmaples through in place --> +<!-- Don't copy examples through in place --> <xsl:template match="*[@role='example-prefix']"/> <xsl:template match="*[@role='example']"/> @@ -25,6 +25,7 @@ </xsl:for-each> <refsect1> <title>Examples</title> +<xsl:if test="//screen[@role='example']"> <variablelist> <xsl:for-each select="//screen[@role='example']"> <varlistentry> @@ -35,6 +36,16 @@ </varlistentry> </xsl:for-each> </variablelist> +</xsl:if> +<!-- +We need to handle multiline examples separately, since not using a +variablelist leads to slightly less nice formatting (the explanation doesn't get +indented) +--> +<xsl:for-each select="//screen[@role='example-multiline']"> +<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen> +<xsl:copy-of select="following-sibling::para[@role='example']"/> +</xsl:for-each> </refsect1> </refentry> </xsl:template> diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml index dcb1e49c..34f20f92 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq.conf.5.xml @@ -9,7 +9,7 @@ </refentryinfo> <refmeta> - <refentrytitle>/etc/rabbitmq/rabbitmq.conf</refentrytitle> + <refentrytitle>rabbitmq.conf</refentrytitle> <manvolnum>5</manvolnum> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> </refmeta> @@ -59,11 +59,11 @@ environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: <filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. </para> <para role="example-prefix">For example:</para> - <screen role="example"> - # I am a complete /etc/rabbitmq/rabbitmq.conf file. - # Comment lines start with a hash character. - # This is a /bin/sh script file - use ordinary envt var syntax - NODENAME=hare + <screen role="example-multiline"> +# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# Comment lines start with a hash character. +# This is a /bin/sh script file - use ordinary envt var syntax +NODENAME=hare </screen> <para role="example"> This is an example of a complete diff --git a/docs/usage.xsl b/docs/usage.xsl index f2e4c822..a2917401 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -12,7 +12,7 @@ encoding="UTF-8" indent="no"/> <xsl:strip-space elements="*"/> -<xsl:preserve-space elements="term" /> +<xsl:preserve-space elements="cmdsynopsis arg" /> <xsl:template match="/"> <!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit! @@ -74,7 +74,7 @@ usage() -> %QUOTE%Usage: <!-- Don't show anything else in command usage --> <xsl:template match="text()" mode="command-usage"/> -<xsl:template match="option">[<xsl:apply-templates/>]</xsl:template> +<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template> <xsl:template match="replaceable"><<xsl:value-of select="."/>></xsl:template> </xsl:stylesheet> diff --git a/generate_deps b/generate_deps index e16624d2..29587b5a 100644 --- a/generate_deps +++ b/generate_deps @@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok; (Path, Dep, ok) -> Module = filename:basename(Path, ".erl"), - ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ", + Path]), ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, ok, Dep), - file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + file:write(Hdl, ["\n"]) end, ok, Deps), ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), ok = file:sync(Hdl), diff --git a/src/rabbit.erl b/src/rabbit.erl index 700acede..b1204997 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,27 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). -rabbit_boot_step({rabbit_exchange_type_registry, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 264192ff..d1834b3b 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -131,7 +131,7 @@ stop() -> ok. usage() -> - io:format("~s", [rabbitmqctl_usage:usage()]), + io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). action(stop, Node, [], Inform) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9abc1695..81cecb38 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 2ff39118..336f74bf 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -87,7 +87,7 @@ stop() -> ok. usage() -> - io:format("~s", [rabbitmqmulti_usage:usage()]), + io:format("~s", [rabbit_multi_usage:usage()]), halt(1). action(start_all, [NodeCount], RpcTimeout) -> diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 00000000..b883d4f0 --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,135 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). +%% +%% 2. Allow the submission to the pool_worker to be async. + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, State #state { pending = queue:in(From, Pending) }}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, From}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 } + end}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 00000000..4ded63a8 --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 00000000..fc3ce371 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |