summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-29 12:18:04 +0100
committerMatthew Sackman <matthew@lshift.net>2010-03-29 12:18:04 +0100
commit5409a68783e6e909fe8ed4dcc018a23654758179 (patch)
treee02863467d4fcaca06cf7057da8045f34baa6843
parenta502d7b954bff8a8c0e3773b5285294fc3d7e57a (diff)
parentfc1d45b378e1dcbcc7dec9cba55e2e9f0159fe04 (diff)
downloadrabbitmq-server-5409a68783e6e909fe8ed4dcc018a23654758179.tar.gz
Merging bug 22559 onto default
-rw-r--r--.hgignore4
-rw-r--r--Makefile59
-rw-r--r--docs/examples-to-end.xsl13
-rw-r--r--docs/rabbitmq.conf.5.xml12
-rw-r--r--docs/usage.xsl4
-rw-r--r--generate_deps5
-rw-r--r--src/rabbit.erl22
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_multi.erl2
-rw-r--r--src/worker_pool.erl135
-rw-r--r--src/worker_pool_sup.erl69
-rw-r--r--src/worker_pool_worker.erl94
13 files changed, 374 insertions, 49 deletions
diff --git a/.hgignore b/.hgignore
index 75ccfb73..caaa3ace 100644
--- a/.hgignore
+++ b/.hgignore
@@ -11,8 +11,7 @@ syntax: regexp
^dist/
^include/rabbit_framing\.hrl$
^src/rabbit_framing\.erl$
-^src/rabbitmqctl_usage\.erl$
-^src/rabbitmqmulti_usage\.erl$
+^src/.*\_usage.erl$
^rabbit\.plt$
^basic.plt$
^ebin/rabbit\.(app|rel|boot|script)$
@@ -28,4 +27,3 @@ syntax: regexp
^docs/.*\.[15]\.gz$
^docs/.*\.man\.xml$
-^docs/.*\.usage.erl$
diff --git a/Makefile b/Makefile
index c93d8085..c946f92b 100644
--- a/Makefile
+++ b/Makefile
@@ -10,14 +10,16 @@ DEPS_FILE=deps.mk
SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
+DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
-SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl $(SOURCE_DIR)/rabbitmqmulti_usage.erl
+SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
-MANPAGES=$(patsubst %.xml, %.gz, $(wildcard docs/*.[0-9].xml))
-WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard docs/*.[0-9].xml) docs/rabbitmq-service.xml)
-USAGES=$(patsubst %.1.xml, %.usage.erl, $(wildcard docs/*.[0-9].xml))
+MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
+WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
+USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
+USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -60,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
ERL_EBIN=erl -noinput -pa $(EBIN_DIR)
+define usage_xml_to_erl
+ $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1))))
+endef
+
+define usage_dep
+ $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
+endef
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -68,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl
+$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
@@ -78,12 +87,6 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
-$(SOURCE_DIR)/rabbitmqctl_usage.erl: docs/rabbitmqctl.usage.erl
- cp docs/rabbitmqctl.usage.erl $@
-
-$(SOURCE_DIR)/rabbitmqmulti_usage.erl: docs/rabbitmq-multi.usage.erl
- cp docs/rabbitmq-multi.usage.erl $@
-
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
"rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
@@ -107,8 +110,8 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl codegen.pyc
- rm -f docs/*.[0-9].gz docs/*.usage.erl docs/*.man.xml docs/*.erl
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
@@ -184,7 +187,7 @@ srcdist: distclean
cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
- cp -r docs $(TARGET_SRC_DIR)
+ cp -r $(DOCS_DIR) $(TARGET_SRC_DIR)
chmod 0755 $(TARGET_SRC_DIR)/scripts/*
(cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME))
@@ -197,25 +200,25 @@ distclean: clean
find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \;
# xmlto can not read from standard input, so we mess with a tmp file.
-%.gz: %.xml docs/examples-to-end.xsl
- xsltproc docs/examples-to-end.xsl $< > $<.tmp && \
- xmlto man -o docs $<.tmp && \
- gzip -f docs/`basename $< .xml`
+%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
+ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
+ gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
-%.usage.erl: %.1.xml docs/usage.xsl
- xsltproc --stringparam modulename "`basename $< .1.xml | tr -d -`_usage" \
- docs/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \
- fold -s > docs/`basename $< .1.xml`.usage.erl
+$(SOURCE_DIR)/%_usage.erl:
+ xsltproc --stringparam modulename "`basename $@ .erl`" \
+ $(DOCS_DIR)/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \
+ fold -s > $@
# We rename the file before xmlto sees it since xmlto will use the name of
# the file to make internal links.
-%.man.xml: %.xml docs/html-to-website-xml.xsl
+%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl
cp $< `basename $< .xml`.xml && \
xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml
cat `basename $< .xml`.html | \
- xsltproc --novalid docs/remove-namespaces.xsl - | \
- xsltproc --stringparam original `basename $<` docs/html-to-website-xml.xsl - | \
+ xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \
+ xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \
xmllint --format - > $@
rm `basename $< .xml`.html
@@ -237,7 +240,7 @@ install: all docs_all install_dirs
done
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
- for manpage in docs/*.$$section.gz; do \
+ for manpage in $(DOCS_DIR)/*.$$section.gz; do \
cp $$manpage $(MAN_DIR)/man$$section; \
done; \
done
@@ -246,6 +249,8 @@ install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(TARGET_DIR)/sbin
+$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
+
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
# clean in its name, unless you know that you don't need any of the
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index b63ffcb3..496fcc1c 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -8,7 +8,7 @@
<xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" />
-<!-- Don't copy exmaples through in place -->
+<!-- Don't copy examples through in place -->
<xsl:template match="*[@role='example-prefix']"/>
<xsl:template match="*[@role='example']"/>
@@ -25,6 +25,7 @@
</xsl:for-each>
<refsect1>
<title>Examples</title>
+<xsl:if test="//screen[@role='example']">
<variablelist>
<xsl:for-each select="//screen[@role='example']">
<varlistentry>
@@ -35,6 +36,16 @@
</varlistentry>
</xsl:for-each>
</variablelist>
+</xsl:if>
+<!--
+We need to handle multiline examples separately, since not using a
+variablelist leads to slightly less nice formatting (the explanation doesn't get
+indented)
+-->
+<xsl:for-each select="//screen[@role='example-multiline']">
+<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen>
+<xsl:copy-of select="following-sibling::para[@role='example']"/>
+</xsl:for-each>
</refsect1>
</refentry>
</xsl:template>
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml
index dcb1e49c..34f20f92 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq.conf.5.xml
@@ -9,7 +9,7 @@
</refentryinfo>
<refmeta>
- <refentrytitle>/etc/rabbitmq/rabbitmq.conf</refentrytitle>
+ <refentrytitle>rabbitmq.conf</refentrytitle>
<manvolnum>5</manvolnum>
<refmiscinfo class="manual">RabbitMQ Server</refmiscinfo>
</refmeta>
@@ -59,11 +59,11 @@ environment variable names, with the <envar>RABBITMQ_</envar> prefix removed:
<filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">
- # I am a complete /etc/rabbitmq/rabbitmq.conf file.
- # Comment lines start with a hash character.
- # This is a /bin/sh script file - use ordinary envt var syntax
- NODENAME=hare
+ <screen role="example-multiline">
+# I am a complete /etc/rabbitmq/rabbitmq.conf file.
+# Comment lines start with a hash character.
+# This is a /bin/sh script file - use ordinary envt var syntax
+NODENAME=hare
</screen>
<para role="example">
This is an example of a complete
diff --git a/docs/usage.xsl b/docs/usage.xsl
index f2e4c822..a2917401 100644
--- a/docs/usage.xsl
+++ b/docs/usage.xsl
@@ -12,7 +12,7 @@
encoding="UTF-8"
indent="no"/>
<xsl:strip-space elements="*"/>
-<xsl:preserve-space elements="term" />
+<xsl:preserve-space elements="cmdsynopsis arg" />
<xsl:template match="/">
<!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit!
@@ -74,7 +74,7 @@ usage() -> %QUOTE%Usage:
<!-- Don't show anything else in command usage -->
<xsl:template match="text()" mode="command-usage"/>
-<xsl:template match="option">[<xsl:apply-templates/>]</xsl:template>
+<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template>
<xsl:template match="replaceable">&lt;<xsl:value-of select="."/>&gt;</xsl:template>
</xsl:stylesheet>
diff --git a/generate_deps b/generate_deps
index e16624d2..29587b5a 100644
--- a/generate_deps
+++ b/generate_deps
@@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) ->
ok;
(Path, Dep, ok) ->
Module = filename:basename(Path, ".erl"),
- ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]),
+ ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ",
+ Path]),
ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end,
ok, Dep),
- file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"])
+ file:write(Hdl, ["\n"])
end, ok, Deps),
ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]),
ok = file:sync(Hdl),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 700acede..b1204997 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -51,27 +51,39 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
- {enables, kernel_ready}]}).
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({worker_pool,
+ [{description, "worker pool"},
+ {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {enables, external_infrastructure}]}).
+
+-rabbit_boot_step({external_infrastructure,
+ [{description, "external infrastructure ready"}]}).
-rabbit_boot_step({rabbit_exchange_type_registry,
[{description, "exchange type registry"},
{mfa, {rabbit_sup, start_child,
[rabbit_exchange_type_registry]}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_log]}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready}]}).
+ {enables, kernel_ready},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({kernel_ready,
- [{description, "kernel ready"}]}).
+ [{description, "kernel ready"},
+ {requires, external_infrastructure}]}).
-rabbit_boot_step({rabbit_alarm,
[{description, "alarm handler"},
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 264192ff..d1834b3b 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -131,7 +131,7 @@ stop() ->
ok.
usage() ->
- io:format("~s", [rabbitmqctl_usage:usage()]),
+ io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
action(stop, Node, [], Inform) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9abc1695..81cecb38 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -307,7 +307,7 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case mnesia:sync_transaction(TxFun) of
+ case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 2ff39118..336f74bf 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -87,7 +87,7 @@ stop() ->
ok.
usage() ->
- io:format("~s", [rabbitmqmulti_usage:usage()]),
+ io:format("~s", [rabbit_multi_usage:usage()]),
halt(1).
action(start_all, [NodeCount], RpcTimeout) ->
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
new file mode 100644
index 00000000..b883d4f0
--- /dev/null
+++ b/src/worker_pool.erl
@@ -0,0 +1,135 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool).
+
+%% Generic worker pool manager.
+%%
+%% Supports nested submission of jobs (nested jobs always run
+%% immediately in current worker process).
+%%
+%% Possible future enhancements:
+%%
+%% 1. Allow priorities (basically, change the pending queue to a
+%% priority_queue).
+%%
+%% 2. Allow the submission to the pool_worker to be async.
+
+-behaviour(gen_server2).
+
+-export([start_link/0, submit/1, idle/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+-record(state, { available, pending }).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
+
+submit(Fun) ->
+ case get(worker_pool_worker) of
+ true -> worker_pool_worker:run(Fun);
+ _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ worker_pool_worker:submit(Pid, Fun)
+ end.
+
+idle(WId) ->
+ gen_server2:cast(?SERVER, {idle, WId}).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(next_free, From, State = #state { available = Avail,
+ pending = Pending }) ->
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ {noreply, State #state { pending = queue:in(From, Pending) }};
+ {{value, WId}, Avail1} ->
+ {reply, get_worker_pid(WId), State #state { available = Avail1 }}
+ end;
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast({idle, WId}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply, case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = queue:in(WId, Avail) };
+ {{value, From}, Pending1} ->
+ gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 }
+ end};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+get_worker_pid(WId) ->
+ [{WId, Pid, _Type, _Modules} | _] =
+ lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
+ when Id =:= WId -> false;
+ (_) -> true
+ end,
+ supervisor:which_children(worker_pool_sup)),
+ Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
new file mode 100644
index 00000000..4ded63a8
--- /dev/null
+++ b/src/worker_pool_sup.erl
@@ -0,0 +1,69 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0, start_link/1]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ start_link(erlang:system_info(schedulers)).
+
+start_link(WCount) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]).
+
+%%----------------------------------------------------------------------------
+
+init([WCount]) ->
+ {ok, {{one_for_one, 10, 10},
+ [{worker_pool, {worker_pool, start_link, []}, transient,
+ 16#ffffffff, worker, [worker_pool]} |
+ [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
new file mode 100644
index 00000000..fc3ce371
--- /dev/null
+++ b/src/worker_pool_worker.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(worker_pool_worker).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, submit/2, run/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link(WId) ->
+ gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+
+submit(Pid, Fun) ->
+ gen_server2:call(Pid, {submit, Fun}, infinity).
+
+init([WId]) ->
+ ok = worker_pool:idle(WId),
+ put(worker_pool_worker, true),
+ {ok, WId, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({submit, Fun}, From, WId) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, WId};
+
+handle_call(Msg, _From, State) ->
+ {stop, {unexpected_call, Msg}, State}.
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_cast, Msg}, State}.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+%%----------------------------------------------------------------------------
+
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().