summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-05-27 16:40:15 +0100
committerEmile Joubert <emile@rabbitmq.com>2010-05-27 16:40:15 +0100
commit6258717370e33bfa3d31a0c420d3d32d435bd7cc (patch)
tree398026c8669db20e273b63e82b7592e9120118da
parent51fc0609e180ee83ec18956e3bcdc3f33f873e0d (diff)
parent615eb18470ed8b69fb6f94c8331c0e09559d263d (diff)
downloadrabbitmq-server-6258717370e33bfa3d31a0c420d3d32d435bd7cc.tar.gz
Merge bug22596 into default
-rw-r--r--Makefile16
-rw-r--r--docs/examples-to-end.xsl4
-rw-r--r--docs/html-to-website-xml.xsl10
-rw-r--r--docs/rabbitmq-activate-plugins.1.xml2
-rw-r--r--docs/rabbitmq-deactivate-plugins.1.xml2
-rw-r--r--docs/rabbitmq-multi.1.xml2
-rw-r--r--docs/rabbitmq-server.1.xml6
-rw-r--r--docs/rabbitmq-service.xml24
-rw-r--r--docs/rabbitmq.conf.5.xml2
-rw-r--r--docs/rabbitmqctl.1.xml33
-rw-r--r--docs/usage.xsl10
-rw-r--r--ebin/rabbit_app.in3
-rw-r--r--include/rabbit.hrl30
-rw-r--r--include/rabbit_backing_queue_spec.hrl63
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec14
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf362
-rw-r--r--packaging/debs/Debian/debian/control4
-rw-r--r--packaging/debs/Debian/debian/postrm.in19
-rw-r--r--packaging/debs/Debian/debian/rules3
-rw-r--r--packaging/macports/Makefile2
-rw-r--r--packaging/macports/Portfile.in2
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--src/delegate.erl211
-rw-r--r--src/delegate_sup.erl63
-rw-r--r--src/file_handle_cache.erl862
-rw-r--r--src/gen_server2.erl8
-rw-r--r--src/rabbit.erl120
-rw-r--r--src/rabbit_amqqueue.erl282
-rw-r--r--src/rabbit_amqqueue_process.erl792
-rw-r--r--src/rabbit_amqqueue_sup.erl15
-rw-r--r--src/rabbit_backing_queue.erl133
-rw-r--r--src/rabbit_basic.erl29
-rw-r--r--src/rabbit_binary_generator.erl54
-rw-r--r--src/rabbit_channel.erl209
-rw-r--r--src/rabbit_dialyzer.erl6
-rw-r--r--src/rabbit_exchange.erl13
-rw-r--r--src/rabbit_guid.erl14
-rw-r--r--src/rabbit_invariable_queue.erl276
-rw-r--r--src/rabbit_limiter.erl5
-rw-r--r--src/rabbit_memory_monitor.erl293
-rw-r--r--src/rabbit_misc.erl143
-rw-r--r--src/rabbit_mnesia.erl7
-rw-r--r--src/rabbit_networking.erl1
-rw-r--r--src/rabbit_persister.erl281
-rw-r--r--src/rabbit_plugin_activator.erl1
-rw-r--r--src/rabbit_reader.erl39
-rw-r--r--src/rabbit_reader_queue_collector.erl108
-rw-r--r--src/rabbit_router.erl155
-rw-r--r--src/rabbit_sup.erl7
-rw-r--r--src/rabbit_tests.erl127
-rw-r--r--src/supervisor2.erl917
-rw-r--r--src/tcp_acceptor.erl10
-rw-r--r--src/worker_pool.erl28
-rw-r--r--src/worker_pool_worker.erl40
55 files changed, 4654 insertions, 1211 deletions
diff --git a/Makefile b/Makefile
index 6cb086ab..982780c7 100644
--- a/Makefile
+++ b/Makefile
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME)
SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
-AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
+AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
@@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
$(EBIN_DIR)/%.beam:
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
-$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
+$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
-$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
+$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
@@ -164,7 +164,11 @@ stop-node:
COVER_DIR=.
start-cover: all
- echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+ echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL)
+ echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+
+start-secondary-cover: all
+ echo "rabbit_misc:start_cover([\"hare\"])." | $(ERL_CALL)
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl
index 496fcc1c..d9686ada 100644
--- a/docs/examples-to-end.xsl
+++ b/docs/examples-to-end.xsl
@@ -55,7 +55,7 @@ indented)
<term>
<cmdsynopsis>
- <command>list_connections</command>
+ <command>list_connections</command>
<arg choice="opt">
<replaceable>connectioninfoitem</replaceable>
...
@@ -66,7 +66,7 @@ indented)
However, while DocBook renders this sensibly for HTML, for some reason it
doen't show anything inside <cmdsynopsis> at all for man pages. I think what
we're doing is semantically correct so this is a bug in DocBook. The following
- rules essentially do what DocBook does when <cmdsynopsis> is not inside a
+ rules essentially do what DocBook does when <cmdsynopsis> is not inside a
<term>.
-->
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index a35b8699..f2117e26 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -26,8 +26,8 @@
<xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
<p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
</p>
<p>
<a href="manpages.html">See a list of all manual pages</a>.
@@ -35,13 +35,13 @@
</xsl:when>
<xsl:otherwise>
<p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
</p>
</xsl:otherwise>
</xsl:choose>
<p>
- For more general documentation, please see the
+ For more general documentation, please see the
<a href="admin-guide.html">administrator's guide</a>.
</p>
diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml
index ef81c201..5f831634 100644
--- a/docs/rabbitmq-activate-plugins.1.xml
+++ b/docs/rabbitmq-activate-plugins.1.xml
@@ -24,7 +24,7 @@
<command>rabbitmq-activate-plugins</command>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml
index eacd014b..bbf1207e 100644
--- a/docs/rabbitmq-deactivate-plugins.1.xml
+++ b/docs/rabbitmq-deactivate-plugins.1.xml
@@ -24,7 +24,7 @@
<command>rabbitmq-deactivate-plugins</command>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml
index b3862fdf..6586890a 100644
--- a/docs/rabbitmq-multi.1.xml
+++ b/docs/rabbitmq-multi.1.xml
@@ -26,7 +26,7 @@
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml
index 25c2aefb..921da4f1 100644
--- a/docs/rabbitmq-server.1.xml
+++ b/docs/rabbitmq-server.1.xml
@@ -25,7 +25,7 @@
<arg choice="opt">-detached</arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
@@ -72,7 +72,7 @@ be placed in this directory.
<para>
Defaults to rabbit. This can be useful if you want to run more than
one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
-erlang-node-and-machine combination. See the
+erlang-node-and-machine combination. See the
<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
machine guide</ulink> for details.
</para>
@@ -93,7 +93,7 @@ one network interface.
<term>RABBITMQ_NODE_PORT</term>
<listitem>
<para>
-Defaults to 5672.
+Defaults to 5672.
</para>
</listitem>
</varlistentry>
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index d59ed638..2b416e3e 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -24,7 +24,7 @@
<arg choice="opt">command</arg>
</cmdsynopsis>
</refsynopsisdiv>
-
+
<refsect1>
<title>Description</title>
<para>
@@ -34,14 +34,14 @@ scalable implementation of an AMQP broker.
</para>
<para>
Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a
-service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
-service can be started and stopped using the Windows® services
-applet.
+service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker
+service can be started and stopped using the Windows® services
+applet.
</para>
<para>
-By default the service will run in the authentication context of the
+By default the service will run in the authentication context of the
local system account. It is therefore necessary to synchronise Erlang
-cookies between the local system account (typically
+cookies between the local system account (typically
<filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to
run <command>rabbitmqctl</command>.
</para>
@@ -87,7 +87,7 @@ deleted as a consequence and <command>rabbitmq-server</command> will remain oper
<listitem>
<para>
Start the service. The service must have been correctly installed
-beforehand.
+beforehand.
</para>
</listitem>
</varlistentry>
@@ -96,7 +96,7 @@ beforehand.
<term>stop</term>
<listitem>
<para>
-Stop the service. The service must be running for this command to
+Stop the service. The service must be running for this command to
have any effect.
</para>
</listitem>
@@ -154,7 +154,7 @@ This is the location of log and database directories.
<para>
Defaults to rabbit. This can be useful if you want to run more than
one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per
-erlang-node-and-machine combination. See the
+erlang-node-and-machine combination. See the
<ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single
machine guide</ulink> for details.
</para>
@@ -175,7 +175,7 @@ one network interface.
<term>RABBITMQ_NODE_PORT</term>
<listitem>
<para>
-Defaults to 5672.
+Defaults to 5672.
</para>
</listitem>
</varlistentry>
@@ -208,11 +208,11 @@ for details.
<term>RABBITMQ_CONSOLE_LOG</term>
<listitem>
<para>
-Set this varable to <code>new</code> or <code>reuse</code> to have the console
+Set this varable to <code>new</code> or <code>reuse</code> to have the console
output from the server redirected to a file named <code>SERVICENAME</code>.debug
in the application data directory of the user that installed the service.
Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>.
-Under previous versions of Windows this will be
+Under previous versions of Windows this will be
<filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>.
If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be
created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is
diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml
index 34f20f92..31de7164 100644
--- a/docs/rabbitmq.conf.5.xml
+++ b/docs/rabbitmq.conf.5.xml
@@ -18,7 +18,7 @@
<refname>rabbitmq.conf</refname>
<refpurpose>default settings for RabbitMQ AMQP server</refpurpose>
</refnamediv>
-
+
<refsect1>
<title>Description</title>
<para>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 8043dfc7..a2038cf0 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1,19 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
-<!--
+<!--
There is some extra magic in this document besides the usual DocBook semantics
- to allow us to derive manpages, HTML and usage messages from the same source
+ to allow us to derive manpages, HTML and usage messages from the same source
document.
Examples need to be moved to the end for man pages. To this end, <para>s and
- <screen>s with role="example" will be moved, and with role="example-prefix"
+ <screen>s with role="example" will be moved, and with role="example-prefix"
will be removed.
The usage messages are more involved. We have some magic in usage.xsl to pull
out the command synopsis, global option and subcommand synopses. We also pull
out <para>s with role="usage".
- Finally we construct lists of possible values for subcommand options, if the
+ Finally we construct lists of possible values for subcommand options, if the
subcommand's <varlistentry> has role="usage-has-option-list". The option which
takes the values should be marked with role="usage-option-list".
-->
@@ -664,7 +664,7 @@
<para>
The <command>queueinfoitem</command> parameter is used to indicate which queue
information items to include in the results. The column order in the
- results will match the order of the parameters.
+ results will match the order of the parameters.
<command>queueinfoitem</command> can take any value from the list
that follows:
</para>
@@ -715,28 +715,15 @@
<listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem>
</varlistentry>
<varlistentry>
- <term>messages_uncommitted</term>
- <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem>
- </varlistentry>
- <varlistentry>
<term>messages</term>
- <listitem><para>Sum of ready, unacknowledged and uncommitted messages
+ <listitem><para>Sum of ready and unacknowledged messages
(queue depth).</para></listitem>
</varlistentry>
<varlistentry>
- <term>acks_uncommitted</term>
- <listitem><para>Number of acknowledgements received in as yet uncommitted
- transactions.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>consumers</term>
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
- <term>transactions</term>
- <listitem><para>Number of transactions.</para></listitem>
- </varlistentry>
- <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
@@ -768,7 +755,7 @@
<para>
The <command>exchangeinfoitem</command> parameter is used to indicate which
exchange information items to include in the results. The column order in the
- results will match the order of the parameters.
+ results will match the order of the parameters.
<command>exchangeinfoitem</command> can take any value from the list
that follows:
</para>
@@ -797,7 +784,7 @@
</varlistentry>
</variablelist>
<para>
- If no <command>exchangeinfoitem</command>s are specified then
+ If no <command>exchangeinfoitem</command>s are specified then
exchange name and type are displayed.
</para>
<para role="example-prefix">
@@ -839,7 +826,7 @@
<para>
The <command>connectioninfoitem</command> parameter is used to indicate
which connection information items to include in the results. The
- column order in the results will match the order of the parameters.
+ column order in the results will match the order of the parameters.
<command>connectioninfoitem</command> can take any value from the list
that follows:
</para>
@@ -945,7 +932,7 @@
The <command>channelinfoitem</command> parameter is used to
indicate which channel information items to include in the
results. The column order in the results will match the
- order of the parameters.
+ order of the parameters.
<command>channelinfoitem</command> can take any value from the list
that follows:
</para>
diff --git a/docs/usage.xsl b/docs/usage.xsl
index 72f8880a..a6cebd93 100644
--- a/docs/usage.xsl
+++ b/docs/usage.xsl
@@ -11,7 +11,7 @@
<xsl:output method="text"
encoding="UTF-8"
indent="no"/>
-<xsl:strip-space elements="*"/>
+<xsl:strip-space elements="*"/>
<xsl:preserve-space elements="cmdsynopsis arg" />
<xsl:template match="/">
@@ -19,7 +19,7 @@
-module(<xsl:value-of select="$modulename" />).
-export([usage/0]).
usage() -> %QUOTE%Usage:
-<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/>
+<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/>
<xsl:text> </xsl:text>
<xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg">
<xsl:apply-templates select="." />
@@ -28,7 +28,7 @@ usage() -> %QUOTE%Usage:
<xsl:text>&#10;</xsl:text>
-<!-- List options (any variable list in a section called "Options"). -->
+<!-- List options (any variable list in a section called "Options"). -->
<xsl:for-each select=".//*[title='Options']/variablelist">
<xsl:if test="position() = 1">&#10;Options:&#10;</xsl:if>
<xsl:for-each select="varlistentry">
@@ -40,13 +40,13 @@ usage() -> %QUOTE%Usage:
</xsl:for-each>
</xsl:for-each>
-<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). -->
+<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). -->
<xsl:text>&#10;</xsl:text>
<xsl:for-each select=".//*[title='Options']//para[@role='usage']">
<xsl:value-of select="normalize-space(.)"/><xsl:text>&#10;&#10;</xsl:text>
</xsl:for-each>
-<!-- List commands (any first-level variable list in a section called "Commands"). -->
+<!-- List commands (any first-level variable list in a section called "Commands"). -->
<xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist">
<xsl:if test="position() = 1">Commands:&#10;</xsl:if>
<xsl:for-each select="varlistentry">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 3616fcbf..bdf407eb 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -18,6 +18,9 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
+ {backing_queue_module, rabbit_invariable_queue},
+ {persister_max_wrap_entries, 500},
+ {persister_hibernate_after, 10000},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 38142491..0d75310b 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -51,7 +51,8 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, pid}).
+-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
+ arguments, pid}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -62,7 +63,8 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid,
+ is_persistent}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -83,9 +85,10 @@
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
-type(regexp() :: binary()).
+-type(file_path() :: string()).
%% this is really an abstract type, but dialyzer does not support them
--type(guid() :: any()).
+-type(guid() :: binary()).
-type(txn() :: guid()).
-type(pkey() :: guid()).
-type(r(Kind) ::
@@ -102,11 +105,12 @@
write :: regexp(),
read :: regexp()}).
-type(amqqueue() ::
- #amqqueue{name :: queue_name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: amqp_table(),
- pid :: maybe(pid())}).
+ #amqqueue{name :: queue_name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: maybe(pid()),
+ arguments :: amqp_table(),
+ pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
type :: exchange_type(),
@@ -144,7 +148,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
@@ -154,7 +159,7 @@
message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
+-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(listener() ::
#listener{node :: erlang_node(),
protocol :: atom(),
@@ -166,15 +171,20 @@
#amqp_error{name :: atom(),
explanation :: string(),
method :: atom()}).
+
-endif.
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
+-define(ERTS_MINIMUM, "5.6.3").
-define(MAX_WAIT, 16#ffffffff).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
new file mode 100644
index 00000000..1b536dfa
--- /dev/null
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-type(fetch_result() ::
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(ack_required() :: boolean()).
+
+-spec(start/1 :: ([queue_name()]) -> 'ok').
+-spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()).
+-spec(terminate/1 :: (state()) -> state()).
+-spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
+-spec(publish/2 :: (basic_message(), state()) -> state()).
+-spec(publish_delivered/3 ::
+ (ack_required(), basic_message(), state()) -> {ack(), state()}).
+-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()).
+-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()).
+-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(requeue/2 :: ([ack()], state()) -> state()).
+-spec(len/1 :: (state()) -> non_neg_integer()).
+-spec(is_empty/1 :: (state()) -> boolean()).
+-spec(set_ram_duration_target/2 ::
+ (('undefined' | 'infinity' | number()), state()) -> state()).
+-spec(ram_duration/1 :: (state()) -> {number(), state()}).
+-spec(needs_sync/1 :: (state()) -> boolean()).
+-spec(sync/1 :: (state()) -> state()).
+-spec(handle_pre_hibernate/1 :: (state()) -> state()).
+-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c318a96c..00066a15 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -10,10 +10,11 @@ Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
+Source5: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang, python-simplejson
-Requires: erlang, logrotate
+BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt
+Requires: erlang >= R12B-3, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
Requires(post): %%REQUIRES%%
@@ -29,6 +30,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
+%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -38,6 +40,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
+cp %{S:5} %{_rabbit_server_ocf}
make %{?_smp_mflags}
%install
@@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
+install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -103,6 +107,12 @@ if [ $1 = 0 ]; then
# Leave rabbitmq user and group
fi
+# Clean out plugin activation state, both on uninstall and upgrade
+rm -rf %{_rabbit_erllibdir}/priv
+for ext in rel script boot ; do
+ rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext
+done
+
%files -f ../%{name}.files
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
new file mode 100755
index 00000000..97c58ea2
--- /dev/null
+++ b/packaging/common/rabbitmq-server.ocf
@@ -0,0 +1,362 @@
+#!/bin/sh
+##
+## OCF Resource Agent compliant rabbitmq-server resource script.
+##
+
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2010 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+## OCF instance parameters
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
+
+#######################################################################
+# Initialization:
+
+. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+
+#######################################################################
+
+OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi"
+OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
+OCF_RESKEY_nodename_default="rabbit@localhost"
+OCF_RESKEY_log_base_default="/var/log/rabbitmq"
+: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}}
+: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
+: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
+: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
+
+meta_data() {
+ cat <<END
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="rabbitmq-server">
+<version>1.0</version>
+
+<longdesc lang="en">
+Resource agent for RabbitMQ-server
+</longdesc>
+
+<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc>
+
+<parameters>
+<parameter name="multi" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmq-multi script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmq-multi</shortdesc>
+<content type="string" default="${OCF_RESKEY_multi_default}" />
+</parameter>
+
+<parameter name="ctl" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmqctl script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmqctl</shortdesc>
+<content type="string" default="${OCF_RESKEY_ctl_default}" />
+</parameter>
+
+<parameter name="nodename" unique="0" required="0">
+<longdesc lang="en">
+The node name for rabbitmq-server
+</longdesc>
+<shortdesc lang="en">Node name</shortdesc>
+<content type="string" default="${OCF_RESKEY_nodename_default}" />
+</parameter>
+
+<parameter name="ip" unique="0" required="0">
+<longdesc lang="en">
+The IP address for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Address</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="port" unique="0" required="0">
+<longdesc lang="en">
+The IP Port for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Port</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="cluster_config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the cluster config file
+</longdesc>
+<shortdesc lang="en">Cluster config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the config file
+</longdesc>
+<shortdesc lang="en">Config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="log_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which logs will be created
+</longdesc>
+<shortdesc lang="en">Log base path</shortdesc>
+<content type="string" default="${OCF_RESKEY_log_base_default}" />
+</parameter>
+
+<parameter name="mnesia_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which mnesia will store data
+</longdesc>
+<shortdesc lang="en">Mnesia base path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="server_start_args" unique="0" required="0">
+<longdesc lang="en">
+Additional arguments provided to the server on startup
+</longdesc>
+<shortdesc lang="en">Server start arguments</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+</parameters>
+
+<actions>
+<action name="start" timeout="600" />
+<action name="stop" timeout="120" />
+<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="validate-all" timeout="30" />
+<action name="meta-data" timeout="5" />
+</actions>
+</resource-agent>
+END
+}
+
+rabbit_usage() {
+ cat <<END
+usage: $0 {start|stop|monitor|validate-all|meta-data}
+
+Expects to have a fully populated OCF RA-compliant environment set.
+END
+}
+
+RABBITMQ_MULTI=$OCF_RESKEY_multi
+RABBITMQ_CTL=$OCF_RESKEY_ctl
+RABBITMQ_NODENAME=$OCF_RESKEY_nodename
+RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip
+RABBITMQ_NODE_PORT=$OCF_RESKEY_port
+RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file
+RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file
+RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base
+RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base
+RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args
+[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME"
+[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME
+
+export_vars() {
+ [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS
+ [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT
+ [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE
+ [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE
+ [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE
+ [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE
+ [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS
+}
+
+rabbit_validate_partial() {
+ if [ ! -x $RABBITMQ_MULTI ]; then
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -x $RABBITMQ_CTL ]; then
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+}
+
+rabbit_validate_full() {
+ if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ rabbit_validate_partial
+
+ return $OCF_SUCCESS
+}
+
+rabbit_status() {
+ local rc
+ $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
+ rc=$?
+ case "$rc" in
+ 0)
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ return $OCF_ERR_GENERIC
+ esac
+}
+
+rabbit_start() {
+ local rc
+
+ rabbit_validate_full
+ rc=$?
+ if [ "$rc" != $OCF_SUCCESS ]; then
+ return $rc
+ fi
+
+ export_vars
+
+ $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to come up.
+ # Let the CRM/LRM time us out if required
+ start_wait=1
+ while [ $start_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_stop() {
+ local rc
+ $RABBITMQ_MULTI stop_all &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to shut down.
+ # Let the CRM/LRM time us out if required
+ stop_wait=1
+ while [ $stop_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
+ break
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_monitor() {
+ rabbit_status
+ return $?
+}
+
+case $__OCF_ACTION in
+ meta-data)
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
+ usage|help)
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
+esac
+
+rabbit_validate_partial || exit
+
+case $__OCF_ACTION in
+ start)
+ rabbit_start
+ ;;
+ stop)
+ rabbit_stop
+ ;;
+ monitor)
+ rabbit_monitor
+ ;;
+ validate-all)
+ exit $OCF_SUCCESS
+ ;;
+ *)
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
+esac
+
+exit $? \ No newline at end of file
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index d4e2cd17..a44f49a0 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,12 +2,12 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in
index bfcf1f53..5290de9b 100644
--- a/packaging/debs/Debian/debian/postrm.in
+++ b/packaging/debs/Debian/debian/postrm.in
@@ -18,6 +18,13 @@ set -e
# for details, see http://www.debian.org/doc/debian-policy/ or
# the debian-policy package
+remove_plugin_traces() {
+ # Remove traces of plugins
+ rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
+ for ext in rel script boot ; do
+ rm -f @RABBIT_LIB@/ebin/rabbit.$ext
+ done
+}
case "$1" in
purge)
@@ -34,11 +41,7 @@ case "$1" in
if [ -d /etc/rabbitmq ]; then
rm -r /etc/rabbitmq
fi
- # Remove traces of plugins
- rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins
- for ext in rel script boot ; do
- rm -f @RABBIT_LIB@/ebin/rabbit.$ext
- done
+ remove_plugin_traces
if getent passwd rabbitmq >/dev/null; then
# Stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
@@ -50,7 +53,11 @@ case "$1" in
fi
;;
- remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
+ remove|upgrade)
+ remove_plugin_traces
+ ;;
+
+ failed-upgrade|abort-install|abort-upgrade|disappear)
;;
*)
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 3799c438..19166514 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -13,7 +13,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/
install/rabbitmq-server::
mkdir -p $(DOCDIR)
- rm $(RABBIT_LIB)LICENSE*
+ rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL*
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
@@ -21,3 +21,4 @@ install/rabbitmq-server::
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
+ install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile
index 0ef7dd5e..4ad4c30b 100644
--- a/packaging/macports/Makefile
+++ b/packaging/macports/Makefile
@@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile
$(DEST)/files/rabbitmq-script-wrapper
cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files
if [ -n "$(MACPORTS_USERHOST)" ] ; then \
- tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \
+ tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \
d="/tmp/mkportindex.$$$$" ; \
mkdir $$d \
&& cd $$d \
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e1f58212..153727be 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -23,7 +23,7 @@ checksums \
sha1 @sha1@ \
rmd160 @rmd160@
-depends_build port:erlang
+depends_build port:erlang port:xmlto port:libxslt
depends_run port:erlang
platform darwin 7 {
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 638498c1..ccdfc401 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -31,7 +31,7 @@
##
NODENAME=rabbit
-SERVER_ERL_ARGS="+K true +A30 \
+SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 28eb8ebb..57fe1328 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -145,6 +145,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
diff --git a/src/delegate.erl b/src/delegate.erl
new file mode 100644
index 00000000..98353453
--- /dev/null
+++ b/src/delegate.erl
@@ -0,0 +1,211 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate).
+
+-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2).
+
+-behaviour(gen_server2).
+
+-export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+
+-spec(process_count/0 :: () -> non_neg_integer()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
+%%----------------------------------------------------------------------------
+
+start_link(Hash) ->
+ gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
+
+invoke(Pid, Fun) when is_pid(Pid) ->
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
+ case Res of
+ {ok, Result, _} ->
+ Result;
+ {error, {Class, Reason, StackTrace}, _} ->
+ erlang:raise(Class, Reason, StackTrace)
+ end;
+
+invoke(Pids, Fun) when is_list(Pids) ->
+ lists:foldl(
+ fun({Status, Result, Pid}, {Good, Bad}) ->
+ case Status of
+ ok -> {[{Pid, Result}|Good], Bad};
+ error -> {Good, [{Pid, Result}|Bad]}
+ end
+ end,
+ {[], []},
+ invoke_per_node(split_delegate_per_node(Pids), Fun)).
+
+invoke_no_result(Pid, Fun) when is_pid(Pid) ->
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
+ ok;
+
+invoke_no_result(Pids, Fun) when is_list(Pids) ->
+ invoke_no_result_per_node(split_delegate_per_node(Pids), Fun),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+internal_call(Node, Thunk) when is_atom(Node) ->
+ gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity).
+
+internal_cast(Node, Thunk) when is_atom(Node) ->
+ gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
+
+split_delegate_per_node(Pids) ->
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
+
+invoke_per_node(NodePids, Fun) ->
+ lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
+
+invoke_no_result_per_node(NodePids, Fun) ->
+ delegate_per_node(NodePids, Fun, fun internal_cast/2),
+ ok.
+
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
+ Self = self(),
+ %% Note that this is unsafe if the Fun requires reentrancy to the
+ %% local_server. I.e. if self() == local_server(Node) then we'll
+ %% block forever.
+ [gen_server2:cast(
+ local_server(Node),
+ {thunk, fun() ->
+ Self ! {result,
+ DelegateFun(
+ Node, fun() -> safe_invoke(Pids, Fun) end)}
+ end}) || {Node, Pids} <- NodePids],
+ [receive {result, Result} -> Result end || _ <- NodePids].
+
+local_server(Node) ->
+ case get({delegate_local_server_name, Node}) of
+ undefined ->
+ Name = server(erlang:phash2({self(), Node}, process_count())),
+ put({delegate_local_server_name, Node}, Name),
+ Name;
+ Name -> Name
+ end.
+
+remote_server(Node) ->
+ case get({delegate_remote_server_name, Node}) of
+ undefined ->
+ case rpc:call(Node, delegate, process_count, []) of
+ {badrpc, _} ->
+ %% Have to return something, if we're just casting
+ %% then we don't want to blow up
+ server(1);
+ Count ->
+ Name = server(erlang:phash2({self(), Node}, Count)),
+ put({delegate_remote_server_name, Node}, Name),
+ Name
+ end;
+ Name -> Name
+ end.
+
+server(Hash) ->
+ list_to_atom("delegate_process_" ++ integer_to_list(Hash)).
+
+safe_invoke(Pids, Fun) when is_list(Pids) ->
+ [safe_invoke(Pid, Fun) || Pid <- Pids];
+safe_invoke(Pid, Fun) when is_pid(Pid) ->
+ try
+ {ok, Fun(Pid), Pid}
+ catch
+ Class:Reason ->
+ {error, {Class, Reason, erlang:get_stacktrace()}, Pid}
+ end.
+
+process_count() ->
+ ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers).
+
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, no_state, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+%% We don't need a catch here; we always go via safe_invoke. A catch here would
+%% be the wrong thing anyway since the Thunk can throw multiple errors.
+handle_call({thunk, Thunk}, _From, State) ->
+ {reply, Thunk(), State, hibernate}.
+
+handle_cast({thunk, Thunk}, State) ->
+ Thunk(),
+ {noreply, State, hibernate}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
new file mode 100644
index 00000000..1c1d62a9
--- /dev/null
+++ b/src/delegate_sup.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(delegate_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init(_Args) ->
+ {ok, {{one_for_one, 10, 10},
+ [{Hash, {delegate, start_link, [Hash]},
+ transient, 16#ffffffff, worker, [delegate]} ||
+ Hash <- lists:seq(0, delegate:process_count() - 1)]}}.
+
+%%----------------------------------------------------------------------------
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
new file mode 100644
index 00000000..0f648dcd
--- /dev/null
+++ b/src/file_handle_cache.erl
@@ -0,0 +1,862 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(file_handle_cache).
+
+%% A File Handle Cache
+%%
+%% This extends a subset of the functionality of the Erlang file
+%% module.
+%%
+%% Some constraints
+%% 1) This supports one writer, multiple readers per file. Nothing
+%% else.
+%% 2) Do not open the same file from different processes. Bad things
+%% may happen.
+%% 3) Writes are all appends. You cannot write to the middle of a
+%% file, although you can truncate and then append if you want.
+%% 4) Although there is a write buffer, there is no read buffer. Feel
+%% free to use the read_ahead mode, but beware of the interaction
+%% between that buffer and the write buffer.
+%%
+%% Some benefits
+%% 1) You do not have to remember to call sync before close
+%% 2) Buffering is much more flexible than with plain file module, and
+%% you can control when the buffer gets flushed out. This means that
+%% you can rely on reads-after-writes working, without having to call
+%% the expensive sync.
+%% 3) Unnecessary calls to position and sync get optimised out.
+%% 4) You can find out what your 'real' offset is, and what your
+%% 'virtual' offset is (i.e. where the hdl really is, and where it
+%% would be after the write buffer is written out).
+%% 5) You can find out what the offset was when you last sync'd.
+%%
+%% There is also a server component which serves to limit the number
+%% of open file handles in a "soft" way - the server will never
+%% prevent a client from opening a handle, but may immediately tell it
+%% to close the handle. Thus you can set the limit to zero and it will
+%% still all work correctly, it is just that effectively no caching
+%% will take place. The operation of limiting is as follows:
+%%
+%% On open and close, the client sends messages to the server
+%% informing it of opens and closes. This allows the server to keep
+%% track of the number of open handles. The client also keeps a
+%% gb_tree which is updated on every use of a file handle, mapping the
+%% time at which the file handle was last used (timestamp) to the
+%% handle. Thus the smallest key in this tree maps to the file handle
+%% that has not been used for the longest amount of time. This
+%% smallest key is included in the messages to the server. As such,
+%% the server keeps track of when the least recently used file handle
+%% was used *at the point of the most recent open or close* by each
+%% client.
+%%
+%% Note that this data can go very out of date, by the client using
+%% the least recently used handle.
+%%
+%% When the limit is reached, the server calculates the average age of
+%% the last reported least recently used file handle of all the
+%% clients. It then tells all the clients to close any handles not
+%% used for longer than this average, by invoking the callback the
+%% client registered. The client should receive this message and pass
+%% it into set_maximum_since_use/1. However, it is highly possible
+%% this age will be greater than the ages of all the handles the
+%% client knows of because the client has used its file handles in the
+%% mean time. Thus at this point the client reports to the server the
+%% current timestamp at which its least recently used file handle was
+%% last used. The server will check two seconds later that either it
+%% is back under the limit, in which case all is well again, or if
+%% not, it will calculate a new average age. Its data will be much
+%% more recent now, and so it is very likely that when this is
+%% communicated to the clients, the clients will close file handles.
+%%
+%% The advantage of this scheme is that there is only communication
+%% from the client to the server on open, close, and when in the
+%% process of trying to reduce file handle usage. There is no
+%% communication from the client to the server on normal file handle
+%% operations. This scheme forms a feed-back loop - the server does
+%% not care which file handles are closed, just that some are, and it
+%% checks this repeatedly when over the limit. Given the guarantees of
+%% now(), even if there is just one file handle open, a limit of 1,
+%% and one client, it is certain that when the client calculates the
+%% age of the handle, it will be greater than when the server
+%% calculated it, hence it should be closed.
+%%
+%% Handles which are closed as a result of the server are put into a
+%% "soft-closed" state in which the handle is closed (data flushed out
+%% and sync'd first) but the state is maintained. The handle will be
+%% fully reopened again as soon as needed, thus users of this library
+%% do not need to worry about their handles being closed by the server
+%% - reopening them when necessary is handled transparently.
+%%
+%% The server also supports obtain and release_on_death. obtain/0
+%% blocks until a file descriptor is available. release_on_death/1
+%% takes a pid and monitors the pid, reducing the count by 1 when the
+%% pid dies. Thus the assumption is that obtain/0 is called first, and
+%% when that returns, release_on_death/1 is called with the pid who
+%% "owns" the file descriptor. This is, for example, used to track the
+%% use of file descriptors through network sockets.
+
+-behaviour(gen_server).
+
+-export([register_callback/3]).
+-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
+ last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
+ flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
+-export([release_on_death/1, obtain/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(RESERVED_FOR_OTHERS, 100).
+-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
+-define(FILE_HANDLES_LIMIT_OTHER, 1024).
+-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
+
+%%----------------------------------------------------------------------------
+
+-record(file,
+ { reader_count,
+ has_writer
+ }).
+
+-record(handle,
+ { hdl,
+ offset,
+ trusted_offset,
+ is_dirty,
+ write_buffer_size,
+ write_buffer_size_limit,
+ write_buffer,
+ at_eof,
+ path,
+ mode,
+ options,
+ is_write,
+ is_read,
+ last_used_at
+ }).
+
+-record(fhc_state,
+ { elders,
+ limit,
+ count,
+ obtains,
+ callbacks,
+ client_mrefs,
+ timer_ref
+ }).
+
+%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ref() :: any()).
+-type(error() :: {'error', any()}).
+-type(ok_or_error() :: ('ok' | error())).
+-type(val_or_error(T) :: ({'ok', T} | error())).
+-type(position() :: ('bof' | 'eof' | non_neg_integer() |
+ {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})).
+-type(offset() :: non_neg_integer()).
+
+-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(open/3 ::
+ (string(), [any()],
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) ->
+ val_or_error(ref())).
+-spec(close/1 :: (ref()) -> ok_or_error()).
+-spec(read/2 :: (ref(), non_neg_integer()) ->
+ val_or_error([char()] | binary()) | 'eof').
+-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
+-spec(sync/1 :: (ref()) -> ok_or_error()).
+-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
+-spec(truncate/1 :: (ref()) -> ok_or_error()).
+-spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())).
+-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())).
+-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
+-spec(flush/1 :: (ref()) -> ok_or_error()).
+-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
+ val_or_error(non_neg_integer())).
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
+-spec(delete/1 :: (ref()) -> ok_or_error()).
+-spec(clear/1 :: (ref()) -> ok_or_error()).
+-spec(release_on_death/1 :: (pid()) -> 'ok').
+-spec(obtain/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+
+register_callback(M, F, A)
+ when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
+ gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
+
+open(Path, Mode, Options) ->
+ Path1 = filename:absname(Path),
+ File1 = #file { reader_count = RCount, has_writer = HasWriter } =
+ case get({Path1, fhc_file}) of
+ File = #file {} -> File;
+ undefined -> #file { reader_count = 0,
+ has_writer = false }
+ end,
+ Mode1 = append_to_write(Mode),
+ IsWriter = is_writer(Mode1),
+ case IsWriter andalso HasWriter of
+ true -> {error, writer_exists};
+ false -> Ref = make_ref(),
+ case open1(Path1, Mode1, Options, Ref, bof, new) of
+ {ok, _Handle} ->
+ RCount1 = case is_reader(Mode1) of
+ true -> RCount + 1;
+ false -> RCount
+ end,
+ HasWriter1 = HasWriter orelse IsWriter,
+ put({Path1, fhc_file},
+ File1 #file { reader_count = RCount1,
+ has_writer = HasWriter1 }),
+ {ok, Ref};
+ Error ->
+ Error
+ end
+ end.
+
+close(Ref) ->
+ case erase({Ref, fhc_handle}) of
+ undefined -> ok;
+ Handle -> case hard_close(Handle) of
+ ok -> ok;
+ {Error, Handle1} -> put_handle(Ref, Handle1),
+ Error
+ end
+ end.
+
+read(Ref, Count) ->
+ with_flushed_handles(
+ [Ref],
+ fun ([#handle { is_read = false }]) ->
+ {error, not_open_for_reading};
+ ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
+ case file:read(Hdl, Count) of
+ {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
+ {Obj,
+ [Handle #handle { offset = Offset1 }]};
+ eof -> {eof, [Handle #handle { at_eof = true }]};
+ Error -> {Error, [Handle]}
+ end
+ end).
+
+append(Ref, Data) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { is_write = false }]) ->
+ {error, not_open_for_writing};
+ ([Handle]) ->
+ case maybe_seek(eof, Handle) of
+ {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset,
+ write_buffer_size_limit = 0,
+ at_eof = true } = Handle1} ->
+ Offset1 = Offset + iolist_size(Data),
+ {file:write(Hdl, Data),
+ [Handle1 #handle { is_dirty = true, offset = Offset1 }]};
+ {{ok, _Offset}, #handle { write_buffer = WriteBuffer,
+ write_buffer_size = Size,
+ write_buffer_size_limit = Limit,
+ at_eof = true } = Handle1} ->
+ WriteBuffer1 = [Data | WriteBuffer],
+ Size1 = Size + iolist_size(Data),
+ Handle2 = Handle1 #handle { write_buffer = WriteBuffer1,
+ write_buffer_size = Size1 },
+ case Limit /= infinity andalso Size1 > Limit of
+ true -> {Result, Handle3} = write_buffer(Handle2),
+ {Result, [Handle3]};
+ false -> {ok, [Handle2]}
+ end;
+ {{error, _} = Error, Handle1} ->
+ {Error, [Handle1]}
+ end
+ end).
+
+sync(Ref) ->
+ with_flushed_handles(
+ [Ref],
+ fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
+ ok;
+ ([Handle = #handle { hdl = Hdl, offset = Offset,
+ is_dirty = true, write_buffer = [] }]) ->
+ case file:sync(Hdl) of
+ ok -> {ok, [Handle #handle { trusted_offset = Offset,
+ is_dirty = false }]};
+ Error -> {Error, [Handle]}
+ end
+ end).
+
+position(Ref, NewOffset) ->
+ with_flushed_handles(
+ [Ref],
+ fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
+ {Result, [Handle1]}
+ end).
+
+truncate(Ref) ->
+ with_flushed_handles(
+ [Ref],
+ fun ([Handle1 = #handle { hdl = Hdl, offset = Offset,
+ trusted_offset = TOffset }]) ->
+ case file:truncate(Hdl) of
+ ok -> TOffset1 = lists:min([Offset, TOffset]),
+ {ok, [Handle1 #handle { trusted_offset = TOffset1,
+ at_eof = true }]};
+ Error -> {Error, [Handle1]}
+ end
+ end).
+
+last_sync_offset(Ref) ->
+ with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) ->
+ {ok, TOffset}
+ end).
+
+current_virtual_offset(Ref) ->
+ with_handles([Ref], fun ([#handle { at_eof = true, is_write = true,
+ offset = Offset,
+ write_buffer_size = Size }]) ->
+ {ok, Offset + Size};
+ ([#handle { offset = Offset }]) ->
+ {ok, Offset}
+ end).
+
+current_raw_offset(Ref) ->
+ with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end).
+
+flush(Ref) ->
+ with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end).
+
+copy(Src, Dest, Count) ->
+ with_flushed_handles(
+ [Src, Dest],
+ fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
+ DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
+ ) ->
+ case file:copy(SHdl, DHdl, Count) of
+ {ok, Count1} = Result1 ->
+ {Result1,
+ [SHandle #handle { offset = SOffset + Count1 },
+ DHandle #handle { offset = DOffset + Count1 }]};
+ Error ->
+ {Error, [SHandle, DHandle]}
+ end;
+ (_Handles) ->
+ {error, incorrect_handle_modes}
+ end).
+
+delete(Ref) ->
+ case erase({Ref, fhc_handle}) of
+ undefined ->
+ ok;
+ Handle = #handle { path = Path } ->
+ case hard_close(Handle #handle { is_dirty = false,
+ write_buffer = [] }) of
+ ok -> file:delete(Path);
+ {Error, Handle1} -> put_handle(Ref, Handle1),
+ Error
+ end
+ end.
+
+clear(Ref) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
+ ok;
+ ([Handle]) ->
+ case maybe_seek(bof, Handle #handle { write_buffer = [],
+ write_buffer_size = 0 }) of
+ {{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
+ case file:truncate(Hdl) of
+ ok -> {ok, [Handle1 #handle {trusted_offset = 0,
+ at_eof = true }]};
+ Error -> {Error, [Handle1]}
+ end;
+ {{error, _} = Error, Handle1} ->
+ {Error, [Handle1]}
+ end
+ end).
+
+set_maximum_since_use(MaximumAge) ->
+ Now = now(),
+ case lists:foldl(
+ fun ({{Ref, fhc_handle},
+ Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
+ Age = timer:now_diff(Now, Then),
+ case Hdl /= closed andalso Age >= MaximumAge of
+ true -> {Res, Handle1} = soft_close(Handle),
+ case Res of
+ ok -> put({Ref, fhc_handle}, Handle1),
+ false;
+ _ -> put_handle(Ref, Handle1),
+ Rep
+ end;
+ false -> Rep
+ end;
+ (_KeyValuePair, Rep) ->
+ Rep
+ end, true, get()) of
+ true -> age_tree_change(), ok;
+ false -> ok
+ end.
+
+release_on_death(Pid) when is_pid(Pid) ->
+ gen_server:cast(?SERVER, {release_on_death, Pid}).
+
+obtain() ->
+ gen_server:call(?SERVER, obtain, infinity).
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
+
+is_reader(Mode) -> lists:member(read, Mode).
+
+is_writer(Mode) -> lists:member(write, Mode).
+
+append_to_write(Mode) ->
+ case lists:member(append, Mode) of
+ true -> [write | Mode -- [append, write]];
+ false -> Mode
+ end.
+
+with_handles(Refs, Fun) ->
+ ResHandles = lists:foldl(
+ fun (Ref, {ok, HandlesAcc}) ->
+ case get_or_reopen(Ref) of
+ {ok, Handle} -> {ok, [Handle | HandlesAcc]};
+ Error -> Error
+ end;
+ (_Ref, Error) ->
+ Error
+ end, {ok, []}, Refs),
+ case ResHandles of
+ {ok, Handles} ->
+ case Fun(lists:reverse(Handles)) of
+ {Result, Handles1} when is_list(Handles1) ->
+ lists:zipwith(fun put_handle/2, Refs, Handles1),
+ Result;
+ Result ->
+ Result
+ end;
+ Error ->
+ Error
+ end.
+
+with_flushed_handles(Refs, Fun) ->
+ with_handles(
+ Refs,
+ fun (Handles) ->
+ case lists:foldl(
+ fun (Handle, {ok, HandlesAcc}) ->
+ {Res, Handle1} = write_buffer(Handle),
+ {Res, [Handle1 | HandlesAcc]};
+ (Handle, {Error, HandlesAcc}) ->
+ {Error, [Handle | HandlesAcc]}
+ end, {ok, []}, Handles) of
+ {ok, Handles1} ->
+ Fun(lists:reverse(Handles1));
+ {Error, Handles1} ->
+ {Error, lists:reverse(Handles1)}
+ end
+ end).
+
+get_or_reopen(Ref) ->
+ case get({Ref, fhc_handle}) of
+ undefined ->
+ {error, not_open, Ref};
+ #handle { hdl = closed, offset = Offset,
+ path = Path, mode = Mode, options = Options } ->
+ open1(Path, Mode, Options, Ref, Offset, reopen);
+ Handle ->
+ {ok, Handle}
+ end.
+
+put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
+ Now = now(),
+ age_tree_update(Then, Now, Ref),
+ put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
+
+with_age_tree(Fun) ->
+ put(fhc_age_tree, Fun(case get(fhc_age_tree) of
+ undefined -> gb_trees:empty();
+ AgeTree -> AgeTree
+ end)).
+
+age_tree_insert(Now, Ref) ->
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:insert(Now, Ref, Tree),
+ {Oldest, _Ref} = gb_trees:smallest(Tree1),
+ gen_server:cast(?SERVER, {open, self(), Oldest}),
+ Tree1
+ end).
+
+age_tree_update(Then, Now, Ref) ->
+ with_age_tree(
+ fun (Tree) ->
+ gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
+ end).
+
+age_tree_delete(Then) ->
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:delete_any(Then, Tree),
+ Oldest = case gb_trees:is_empty(Tree1) of
+ true ->
+ undefined;
+ false ->
+ {Oldest1, _Ref} = gb_trees:smallest(Tree1),
+ Oldest1
+ end,
+ gen_server:cast(?SERVER, {close, self(), Oldest}),
+ Tree1
+ end).
+
+age_tree_change() ->
+ with_age_tree(
+ fun (Tree) ->
+ case gb_trees:is_empty(Tree) of
+ true -> Tree;
+ false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ gen_server:cast(?SERVER, {update, self(), Oldest})
+ end,
+ Tree
+ end).
+
+open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
+ Mode1 = case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end,
+ case file:open(Path, Mode1) of
+ {ok, Hdl} ->
+ WriteBufferSize =
+ case proplists:get_value(write_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ infinity -> infinity;
+ N when is_integer(N) -> N
+ end,
+ Now = now(),
+ Handle = #handle { hdl = Hdl,
+ offset = 0,
+ trusted_offset = 0,
+ is_dirty = false,
+ write_buffer_size = 0,
+ write_buffer_size_limit = WriteBufferSize,
+ write_buffer = [],
+ at_eof = false,
+ path = Path,
+ mode = Mode,
+ options = Options,
+ is_write = is_writer(Mode),
+ is_read = is_reader(Mode),
+ last_used_at = Now },
+ {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle),
+ Handle2 = Handle1 #handle { trusted_offset = Offset1 },
+ put({Ref, fhc_handle}, Handle2),
+ age_tree_insert(Now, Ref),
+ {ok, Handle2};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+soft_close(Handle = #handle { hdl = closed }) ->
+ {ok, Handle};
+soft_close(Handle) ->
+ case write_buffer(Handle) of
+ {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty,
+ last_used_at = Then } = Handle1 } ->
+ ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ ok = file:close(Hdl),
+ age_tree_delete(Then),
+ {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset,
+ is_dirty = false }};
+ {_Error, _Handle} = Result ->
+ Result
+ end.
+
+hard_close(Handle) ->
+ case soft_close(Handle) of
+ {ok, #handle { path = Path,
+ is_read = IsReader, is_write = IsWriter }} ->
+ #file { reader_count = RCount, has_writer = HasWriter } = File =
+ get({Path, fhc_file}),
+ RCount1 = case IsReader of
+ true -> RCount - 1;
+ false -> RCount
+ end,
+ HasWriter1 = HasWriter andalso not IsWriter,
+ case RCount1 =:= 0 andalso not HasWriter1 of
+ true -> erase({Path, fhc_file});
+ false -> put({Path, fhc_file},
+ File #file { reader_count = RCount1,
+ has_writer = HasWriter1 })
+ end,
+ ok;
+ {_Error, _Handle} = Result ->
+ Result
+ end.
+
+maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
+ at_eof = AtEoF }) ->
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
+ case (case NeedsSeek of
+ true -> file:position(Hdl, NewOffset);
+ false -> {ok, Offset}
+ end) of
+ {ok, Offset1} = Result ->
+ {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
+ {error, _} = Error ->
+ {Error, Handle}
+ end.
+
+needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
+needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false};
+needs_seek( true, _CurOffset, eof ) -> {true , false};
+needs_seek( true, _CurOffset, {eof, 0}) -> {true , false};
+needs_seek( false, _CurOffset, eof ) -> {true , true };
+needs_seek( false, _CurOffset, {eof, 0}) -> {true , true };
+needs_seek( AtEoF, 0, bof ) -> {AtEoF, false};
+needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false};
+needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false};
+needs_seek( true, CurOffset, {bof, DesiredOffset})
+ when DesiredOffset >= CurOffset ->
+ {true, true};
+needs_seek( true, _CurOffset, {cur, DesiredOffset})
+ when DesiredOffset > 0 ->
+ {true, true};
+needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO}
+ when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset ->
+ {true, true};
+%% because we can't really track size, we could well end up at EoF and not know
+needs_seek(_AtEoF, _CurOffset, _DesiredOffset) ->
+ {false, true}.
+
+write_buffer(Handle = #handle { write_buffer = [] }) ->
+ {ok, Handle};
+write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
+ write_buffer = WriteBuffer,
+ write_buffer_size = DataSize,
+ at_eof = true }) ->
+ case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ ok ->
+ Offset1 = Offset + DataSize,
+ {ok, Handle #handle { offset = Offset1, is_dirty = true,
+ write_buffer = [], write_buffer_size = 0 }};
+ {error, _} = Error ->
+ {Error, Handle}
+ end.
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ Limit = case application:get_env(file_handles_high_watermark) of
+ {ok, Watermark} when (is_integer(Watermark) andalso
+ Watermark > 0) ->
+ Watermark;
+ _ ->
+ ulimit()
+ end,
+ error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
+ {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
+ obtains = [], callbacks = dict:new(),
+ client_mrefs = dict:new(), timer_ref = undefined }}.
+
+handle_call(obtain, From, State = #fhc_state { count = Count }) ->
+ State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
+ maybe_reduce(State #fhc_state { count = Count + 1 }),
+ case Limit /= infinity andalso Count1 >= Limit of
+ true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
+ count = Count1 - 1 }};
+ false -> {reply, ok, State1}
+ end.
+
+handle_cast({register_callback, Pid, MFA},
+ State = #fhc_state { callbacks = Callbacks }) ->
+ {noreply, ensure_mref(
+ Pid, State #fhc_state {
+ callbacks = dict:store(Pid, MFA, Callbacks) })};
+
+handle_cast({open, Pid, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders, count = Count }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ {noreply, maybe_reduce(
+ ensure_mref(Pid, State #fhc_state { elders = Elders1,
+ count = Count + 1 }))};
+
+handle_cast({update, Pid, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders }) ->
+ Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ %% don't call maybe_reduce from here otherwise we can create a
+ %% storm of messages
+ {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })};
+
+handle_cast({close, Pid, EldestUnusedSince}, State =
+ #fhc_state { elders = Elders, count = Count }) ->
+ Elders1 = case EldestUnusedSince of
+ undefined -> dict:erase(Pid, Elders);
+ _ -> dict:store(Pid, EldestUnusedSince, Elders)
+ end,
+ {noreply, process_obtains(
+ ensure_mref(Pid, State #fhc_state { elders = Elders1,
+ count = Count - 1 }))};
+
+handle_cast(check_counts, State) ->
+ {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
+
+handle_cast({release_on_death, Pid}, State) ->
+ _MRef = erlang:monitor(process, Pid),
+ {noreply, State}.
+
+handle_info({'DOWN', MRef, process, Pid, _Reason}, State =
+ #fhc_state { count = Count, callbacks = Callbacks,
+ client_mrefs = ClientMRefs, elders = Elders }) ->
+ {noreply, process_obtains(
+ case dict:find(Pid, ClientMRefs) of
+ {ok, MRef} -> State #fhc_state {
+ elders = dict:erase(Pid, Elders),
+ client_mrefs = dict:erase(Pid, ClientMRefs),
+ callbacks = dict:erase(Pid, Callbacks) };
+ _ -> State #fhc_state { count = Count - 1 }
+ end)}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+%% server helpers
+%%----------------------------------------------------------------------------
+
+process_obtains(State = #fhc_state { obtains = [] }) ->
+ State;
+process_obtains(State = #fhc_state { limit = Limit, count = Count })
+ when Limit /= infinity andalso Count >= Limit ->
+ State;
+process_obtains(State = #fhc_state { limit = Limit, count = Count,
+ obtains = Obtains }) ->
+ ObtainsLen = length(Obtains),
+ ObtainableLen = lists:min([ObtainsLen, Limit - Count]),
+ Take = ObtainsLen - ObtainableLen,
+ {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains),
+ [gen_server:reply(From, ok) || From <- ObtainableRev],
+ State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
+
+maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders,
+ callbacks = Callbacks, timer_ref = TRef })
+ when Limit /= infinity andalso Count >= Limit ->
+ Now = now(),
+ {Pids, Sum, ClientCount} =
+ dict:fold(fun (_Pid, undefined, Accs) ->
+ Accs;
+ (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) ->
+ {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest),
+ CountAcc + 1}
+ end, {[], 0, 0}, Elders),
+ case Pids of
+ [] -> ok;
+ _ -> AverageAge = Sum / ClientCount,
+ lists:foreach(
+ fun (Pid) ->
+ case dict:find(Pid, Callbacks) of
+ error -> ok;
+ {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge])
+ end
+ end, Pids)
+ end,
+ case TRef of
+ undefined -> {ok, TRef1} = timer:apply_after(
+ ?FILE_HANDLES_CHECK_INTERVAL,
+ gen_server, cast, [?SERVER, check_counts]),
+ State #fhc_state { timer_ref = TRef1 };
+ _ -> State
+ end;
+maybe_reduce(State) ->
+ State.
+
+%% Googling around suggests that Windows has a limit somewhere around
+%% 16M, eg
+%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
+%% For everything else, assume ulimit exists. Further googling
+%% suggests that BSDs (incl OS X), solaris and linux all agree that
+%% ulimit -n is file handles
+ulimit() ->
+ case os:type() of
+ {win32, _OsName} ->
+ ?FILE_HANDLES_LIMIT_WINDOWS;
+ {unix, _OsName} ->
+ %% Under Linux, Solaris and FreeBSD, ulimit is a shell
+ %% builtin, not a command. In OS X, it's a command.
+ %% Fortunately, os:cmd invokes the cmd in a shell env, so
+ %% we're safe in all cases.
+ case os:cmd("ulimit -n") of
+ "unlimited" ->
+ infinity;
+ String = [C|_] when $0 =< C andalso C =< $9 ->
+ Num = list_to_integer(
+ lists:takewhile(
+ fun (D) -> $0 =< D andalso D =< $9 end, String)) -
+ ?RESERVED_FOR_OTHERS,
+ lists:max([1, Num]);
+ _ ->
+ %% probably a variant of
+ %% "/bin/sh: line 1: ulimit: command not found\n"
+ ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ end;
+ _ ->
+ ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ end.
+
+ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) ->
+ case dict:find(Pid, ClientMRefs) of
+ {ok, _MRef} -> State;
+ error -> MRef = erlang:monitor(process, Pid),
+ State #fhc_state {
+ client_mrefs = dict:store(Pid, MRef, ClientMRefs) }
+ end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index c33582e3..5b899cdb 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -443,7 +443,7 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
name({local,Name}) -> Name;
name({global,Name}) -> Name;
%% name(Pid) when is_pid(Pid) -> Pid;
-%% when R11 goes away, drop the line beneath and uncomment the line above
+%% when R12 goes away, drop the line beneath and uncomment the line above
name(Name) -> Name.
unregister_name({local,Name}) ->
@@ -607,9 +607,9 @@ process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
Debug, Msg) ->
case Msg of
{system, From, Req} ->
- sys:handle_system_msg
- (Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, TimeoutState, Queue]);
+ sys:handle_system_msg(
+ Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, TimeoutState, Queue]);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b1204997..c389178a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -47,12 +47,19 @@
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
check_empty_content_body_frame_size,
- []}}]}).
+ []}},
+ {enables, external_infrastructure}]}).
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({file_handle_cache,
+ [{description, "file handle cache server"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [file_handle_cache]}},
+ {enables, worker_pool}]}).
+
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
{mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
@@ -65,21 +72,21 @@
[{description, "exchange type registry"},
{mfa, {rabbit_sup, start_child,
[rabbit_exchange_type_registry]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_log]}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({rabbit_hooks,
[{description, "internal event notification system"},
{mfa, {rabbit_hooks, start, []}},
- {enables, kernel_ready},
- {requires, external_infrastructure}]}).
+ {requires, external_infrastructure},
+ {enables, kernel_ready}]}).
-rabbit_boot_step({kernel_ready,
[{description, "kernel ready"},
@@ -91,16 +98,24 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_amqqueue_sup,
- [{description, "queue supervisor"},
- {mfa, {rabbit_amqqueue, start, []}},
- {requires, kernel_ready},
+-rabbit_boot_step({rabbit_memory_monitor,
+ [{description, "memory monitor"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_memory_monitor]}},
+ {requires, rabbit_alarm},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_router,
- [{description, "cluster router"},
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
{mfa, {rabbit_sup, start_restartable_child,
- [rabbit_router]}},
+ [rabbit_guid]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
+-rabbit_boot_step({delegate_sup,
+ [{description, "cluster delegate"},
+ {mfa, {rabbit_sup, start_child,
+ [delegate_sup]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -109,45 +124,39 @@
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_node_monitor]}},
{requires, kernel_ready},
- {requires, rabbit_amqqueue_sup},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
- [{description, "core initialized"}]}).
+ [{description, "core initialized"},
+ {requires, kernel_ready}]}).
-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
- {requires, core_initialized}]}).
+ {requires, core_initialized},
+ {enables, routing_ready}]}).
-rabbit_boot_step({exchange_recovery,
[{description, "exchange recovery"},
{mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check}]}).
-
--rabbit_boot_step({queue_recovery,
- [{description, "queue recovery"},
- {mfa, {rabbit_amqqueue, recover, []}},
- {requires, exchange_recovery}]}).
-
--rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
- {requires, queue_recovery}]}).
+ {requires, empty_db_check},
+ {enables, routing_ready}]}).
--rabbit_boot_step({guid_generator,
- [{description, "guid generator"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_guid]}},
- {requires, persister},
+-rabbit_boot_step({queue_sup_queue_recovery,
+ [{description, "queue supervisor and queue recovery"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {requires, empty_db_check},
{enables, routing_ready}]}).
-rabbit_boot_step({routing_ready,
- [{description, "message delivery logic ready"}]}).
+ [{description, "message delivery logic ready"},
+ {requires, core_initialized}]}).
-rabbit_boot_step({log_relay,
[{description, "error log relay"},
{mfa, {rabbit_error_logger, boot, []}},
- {requires, routing_ready}]}).
+ {requires, routing_ready},
+ {enables, networking}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -232,14 +241,18 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
start(normal, []) ->
- {ok, SupPid} = rabbit_sup:start_link(),
+ case erts_version_check() of
+ ok ->
+ {ok, SupPid} = rabbit_sup:start_link(),
- print_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
- io:format("~nbroker running~n"),
-
- {ok, SupPid}.
+ print_banner(),
+ [ok = run_boot_step(Step) || Step <- boot_steps()],
+ io:format("~nbroker running~n"),
+ {ok, SupPid};
+ Error ->
+ Error
+ end.
stop(_State) ->
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
@@ -252,6 +265,14 @@ stop(_State) ->
%%---------------------------------------------------------------------------
+erts_version_check() ->
+ FoundVer = erlang:system_info(version),
+ case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
+ true -> ok;
+ false -> {error, {erlang_version_too_old,
+ {found, FoundVer}, {required, ?ERTS_MINIMUM}}}
+ end.
+
boot_error(Format, Args) ->
io:format("BOOT ERROR: " ++ Format, Args),
error_logger:error_msg(Format, Args),
@@ -278,6 +299,18 @@ run_boot_step({StepName, Attributes}) ->
ok
end.
+module_attributes(Module) ->
+ case catch Module:module_info(attributes) of
+ {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+ io:format("WARNING: module ~p not found, so not scanned for boot steps.~n",
+ [Module]),
+ [];
+ {'EXIT', Reason} ->
+ exit(Reason);
+ V ->
+ V
+ end.
+
boot_steps() ->
AllApps = [App || {App, _, _} <- application:loaded_applications()],
Modules = lists:usort(
@@ -289,7 +322,7 @@ boot_steps() ->
lists:flatmap(fun (Module) ->
[{StepName, Attributes}
|| {rabbit_boot_step, [{StepName, Attributes}]}
- <- Module:module_info(attributes)]
+ <- module_attributes(Module)]
end, Modules),
sort_boot_steps(UnsortedSteps).
@@ -401,8 +434,9 @@ print_banner() ->
{"cookie hash", rabbit_misc:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()}],
- DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
+ {"database dir", rabbit_mnesia:dir()},
+ {"erlang version", erlang:system_info(version)}],
+ DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
io:nl().
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ceec00fd..483b5a93 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,17 +31,19 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1]).
+-export([start/0, declare/5, delete/3, purge/1]).
+-export([internal_declare/2, internal_delete/1,
+ maybe_run_queue_via_backing_queue/2,
+ update_ram_duration/1, set_ram_duration_target/2,
+ set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
+ stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
--export([claim_queue/2]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
+-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -63,9 +65,8 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
- amqqueue()).
+-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
+ maybe(pid())) -> amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -89,27 +90,28 @@
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
--spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
--spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
+-spec(basic_consume/7 ::
+ (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
- 'ok' | {'error', 'queue_owned_by_another_connection' |
- 'exclusive_consume_unavailable'}).
+ 'ok' | {'error', 'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
+-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
+-spec(update_ram_duration/1 :: (pid()) -> 'ok').
+-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok').
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -118,80 +120,66 @@
%%----------------------------------------------------------------------------
start() ->
+ DurableQueues = find_durable_queues(),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ _RealDurableQueues = recover_durable_queues(DurableQueues),
ok.
-recover() ->
- ok = recover_durable_queues(),
- ok.
-
-recover_durable_queues() ->
+find_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
- end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
-
-declare(QueueName, Durable, AutoDelete, Args) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end).
+
+recover_durable_queues(DurableQueues) ->
+ Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
+
+declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
+ exclusive_owner = Owner,
pid = none}),
- internal_declare(Q, true).
-
-internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] ->
- case mnesia:read(
- {rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- case WantDefaultBinding of
- true -> add_default_binding(Q);
- false -> ok
- end,
- Q;
- [_] -> not_found %% existing Q on stopped node
- end;
- [ExistingQ] ->
- ExistingQ
- end
- end) of
- not_found -> exit(Q#amqqueue.pid, shutdown),
- rabbit_misc:not_found(QueueName);
- Q -> Q;
- ExistingQ -> exit(Q#amqqueue.pid, shutdown),
- ExistingQ
+ case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ not_found -> rabbit_misc:not_found(QueueName);
+ Q1 -> Q1
end.
+internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case Recover of
+ true ->
+ ok = store_queue(Q),
+ Q;
+ false ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] ->
+ case mnesia:read({rabbit_durable_queue,
+ QueueName}) of
+ [] -> ok = store_queue(Q),
+ ok = add_default_binding(Q),
+ Q;
+ [_] -> not_found %% Q exists on stopped node
+ end;
+ [ExistingQ] ->
+ ExistingQ
+ end
+ end
+ end).
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q, write),
ok = mnesia:write(rabbit_queue, Q, write),
@@ -201,7 +189,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok.
start_queue_process(Q) ->
- {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
+ {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -234,10 +222,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, info, infinity).
+ delegate_pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -247,7 +235,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, consumers, infinity).
+ delegate_pcall(QPid, 9, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -256,15 +244,15 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
txn = Txn, sender = ChPid, message = Message}) ->
@@ -278,29 +266,24 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
true.
-redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
-
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
-commit_all(QPids, Txn) ->
- safe_pmap_ok(
+commit_all(QPids, Txn, ChPid) ->
+ safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
-rollback_all(QPids, Txn) ->
- safe_pmap_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
- QPids).
+rollback_all(QPids, Txn, ChPid) ->
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
notify_down_all(QPids, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
@@ -308,38 +291,41 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
- QPids).
-
-claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
+ end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ delegate_call(QPid, {basic_consume, NoAck, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+ delegate_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
- QPids).
+ delegate:invoke_no_result(
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+
+internal_delete1(QueueName) ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things, as
+ %% decided by rabbit_exchange, after the
+ %% transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName).
internal_delete(QueueName) ->
case
@@ -347,13 +333,7 @@ internal_delete(QueueName) ->
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- %% we want to execute some things, as
- %% decided by rabbit_exchange, after the
- %% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName)
+ [_] -> internal_delete1(QueueName)
end
end) of
Err = {error, _} -> Err;
@@ -362,6 +342,19 @@ internal_delete(QueueName) ->
ok
end.
+maybe_run_queue_via_backing_queue(QPid, Fun) ->
+ gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun},
+ infinity).
+
+update_ram_duration(QPid) ->
+ gen_server2:pcast(QPid, 8, update_ram_duration).
+
+set_ram_duration_target(QPid, Duration) ->
+ gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
@@ -385,17 +378,28 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_pmap_ok(H, F, L) ->
- case [R || R <- rabbit_misc:upmap(
- fun (V) ->
- try
- rabbit_misc:with_exit_handler(
- fun () -> H(V) end,
- fun () -> F(V) end)
- catch Class:Reason -> {Class, Reason}
- end
- end, L),
- R =/= ok] of
- [] -> ok;
- Errors -> {error, Errors}
+safe_delegate_call_ok(H, F, Pids) ->
+ {_, Bad} = delegate:invoke(Pids,
+ fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> H(Pid) end,
+ fun () -> F(Pid) end)
+ end),
+ case Bad of
+ [] -> ok;
+ _ -> {error, Bad}
end.
+
+delegate_call(Pid, Msg, Timeout) ->
+ delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end).
+
+delegate_pcall(Pid, Pri, Msg, Timeout) ->
+ delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
+
+delegate_cast(Pid, Msg) ->
+ delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+
+delegate_pcast(Pid, Pri, Msg) ->
+ delegate:invoke_no_result(Pid,
+ fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 19cb5c71..8bd6e68b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,13 +35,14 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
+-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(SYNC_INTERVAL, 5). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-export([start_link/1, info_keys/0]).
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1]).
-import(queue).
-import(erlang).
@@ -49,24 +50,24 @@
% Queue's state
-record(q, {q,
- owner,
exclusive_consumer,
has_had_consumers,
- next_msg_id,
- message_buffer,
+ backing_queue,
+ backing_queue_state,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ sync_timer_ref,
+ rate_timer_ref
+ }).
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
-
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
- unacked_messages,
+ acktags,
is_limit_active,
txn,
unsent_message_count}).
@@ -82,49 +83,127 @@
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
- messages_uncommitted,
messages,
- acks_uncommitted,
consumers,
- transactions,
- memory]).
+ memory,
+ backing_queue_status
+ ]).
%%----------------------------------------------------------------------------
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- {ok, #q{q = Q,
- owner = none,
+ process_flag(trap_exit, true),
+ {ok, BQ} = application:get_env(backing_queue_module),
+
+ {ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- next_msg_id = 1,
- message_buffer = queue:new(),
+ backing_queue = BQ,
+ backing_queue_state = undefined,
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, hibernate,
+ blocked_consumers = queue:new(),
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, State) ->
+terminate(shutdown, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
+terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
+terminate(_Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
- QName = qname(State),
- lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
- all_tx()),
- ok = purge_message_buffer(QName, State#q.message_buffer),
- ok = rabbit_amqqueue:internal_delete(QName).
+ terminate_shutdown(fun (BQS) ->
+ BQS1 = BQ:delete_and_terminate(BQS),
+ %% don't care if the internal delete
+ %% doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(qname(State)),
+ BQS1
+ end, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+terminate_shutdown(Fun, State) ->
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ stop_sync_timer(stop_rate_timer(State)),
+ case BQS of
+ undefined -> State;
+ _ -> ok = rabbit_memory_monitor:deregister(self()),
+ BQS1 = lists:foldl(
+ fun (#cr{txn = none}, BQSN) ->
+ BQSN;
+ (#cr{txn = Txn}, BQSN) ->
+ {_AckTags, BQSN1} =
+ BQ:tx_rollback(Txn, BQSN),
+ BQSN1
+ end, BQS, all_ch_record()),
+ State1#q{backing_queue_state = Fun(BQS1)}
+ end.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ assert_invariant(NewState),
+ {NewState1, Timeout} = next_state(NewState),
+ {reply, Reply, NewState1, Timeout}.
+
+noreply(NewState) ->
+ assert_invariant(NewState),
+ {NewState1, Timeout} = next_state(NewState),
+ {noreply, NewState1, Timeout}.
+
+next_state(State) ->
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ ensure_rate_timer(State),
+ case BQ:needs_sync(BQS)of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
+ end.
+
+ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL,
+ rabbit_amqqueue, maybe_run_queue_via_backing_queue,
+ [self(), fun (BQS) -> BQ:sync(BQS) end]),
+ State#q{sync_timer_ref = TRef};
+ensure_sync_timer(State) ->
+ State.
+
+stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
+ State;
+stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{sync_timer_ref = undefined}.
+
+ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State#q{rate_timer_ref = TRef};
+ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
+ State;
+stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
+ State#q{rate_timer_ref = undefined};
+stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{rate_timer_ref = undefined}.
+
+assert_invariant(#q{active_consumers = AC,
+ backing_queue = BQ, backing_queue_state = BQS}) ->
+ true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -140,7 +219,7 @@ ch_record(ChPid) ->
C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
- unacked_messages = dict:new(),
+ acktags = sets:new(),
is_limit_active = false,
txn = none,
unsent_message_count = 0},
@@ -171,29 +250,33 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_immediately(Message, IsDelivered,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+ State = #q{q = #amqqueue{name = QName},
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ acktags = ChAckTags} = ch_record(ChPid),
+ IsMsgReady = PredFun(FunAcc, State),
+ case (IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of
true ->
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, IsDelivered, Message}),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(
+ AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
+ acktags = ChAckTags1},
store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
@@ -207,88 +290,85 @@ deliver_immediately(Message, IsDelivered,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers,
- next_msg_id = NextId + 1}};
- false ->
+ State2 = State1#q{
+ active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers},
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ %% if IsMsgReady then we've hit the limiter
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
BlockedConsumers),
- deliver_immediately(
- Message, IsDelivered,
+ deliver_msgs_to_consumers(
+ Funs, FunAcc,
State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
- end;
- {empty, _} ->
- {not_offered, State}
- end.
-
-run_message_queue(State = #q{message_buffer = MessageBuffer}) ->
- run_message_queue(MessageBuffer, State).
-
-run_message_queue(MessageBuffer, State) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, IsDelivered}}, BufferTail} ->
- case deliver_immediately(Message, IsDelivered, State) of
- {offered, true, NewState} ->
- persist_delivery(qname(State), Message, IsDelivered),
- run_message_queue(BufferTail, NewState);
- {offered, false, NewState} ->
- persist_auto_ack(qname(State), Message),
- run_message_queue(BufferTail, NewState);
- {not_offered, NewState} ->
- NewState#q{message_buffer = MessageBuffer}
+ blocked_consumers = NewBlockedConsumers});
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc, State}
end;
{empty, _} ->
- State#q{message_buffer = MessageBuffer}
+ {FunAcc, State}
end.
-attempt_delivery(none, _ChPid, Message, State) ->
- case deliver_immediately(Message, false, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- persist_message(none, qname(State), Message),
- persist_delivery(qname(State), Message, false),
- {true, State1};
- {not_offered, State1} ->
- {false, State1}
- end;
-attempt_delivery(Txn, ChPid, Message, State) ->
- persist_message(Txn, qname(State), Message),
- record_pending_message(Txn, ChPid, Message),
- {true, State}.
+deliver_from_queue_pred(IsEmpty, _State) ->
+ not IsEmpty.
+
+deliver_from_queue_deliver(AckRequired, false,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining,
+ State #q { backing_queue_state = BQS1 }}.
+
+run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Funs = {fun deliver_from_queue_pred/2,
+ fun deliver_from_queue_deliver/3},
+ IsEmpty = BQ:is_empty(BQS),
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
+ State1.
+
+attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
+ {AckTag, BQS1} =
+ BQ:publish_delivered(AckRequired, Message, BQS),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS1}}
+ end,
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ record_current_channel_tx(ChPid, Txn),
+ {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, State) ->
+deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- persist_message(Txn, qname(State), Message),
- NewMB = queue:in({Message, false}, NewState#q.message_buffer),
- {false, NewState#q{message_buffer = NewMB}}
+ %% Txn is none and no unblocked channels with consumers
+ BQS = BQ:publish(Message, State #q.backing_queue_state),
+ {false, NewState#q{backing_queue_state = BQS}}
end.
-deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
- run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)),
- State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
remove_consumer(ChPid, ConsumerTag, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(
- fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(Queue))).
+ queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= ConsumerTag)
+ end, Queue).
remove_consumers(ChPid, Queue) ->
- %% TODO: replace this with queue:filter/2 once we move to R12
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue))).
+ queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
move_consumers(ChPid, From, To) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
@@ -323,7 +403,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
not_found ->
{ok, State};
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
- unacked_messages = UAM} ->
+ acktags = ChAckTags} ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
State1 = State#q{
@@ -337,15 +417,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
ChPid, State#q.blocked_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> case Txn of
- none -> ok;
- _ -> ok = rollback_work(Txn, qname(State1)),
- erase_tx(Txn)
- end,
- {ok, deliver_or_enqueue_n(
- [{Message, true} ||
- {_MsgId, Message} <- dict:to_list(UAM)],
- State1)}
+ false -> State2 = case Txn of
+ none -> State1;
+ _ -> rollback_transaction(Txn, ChPid,
+ State1)
+ end,
+ {ok, requeue_and_run(sets:to_list(ChAckTags), State2)}
end
end.
@@ -354,10 +431,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
Holder.
-check_queue_owner(none, _) -> ok;
-check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
-check_queue_owner({_, _}, _) -> mismatch.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -376,134 +449,30 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_message(Txn, QName, Message) ->
- M = Message#basic_message{
- %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore
- content = rabbit_binary_parser:clear_decoded_content(
- Message#basic_message.content)},
- persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
-
-persist_delivery(_QName, _Message,
- true) ->
- ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
- _IsDelivered) ->
- ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
- _IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
-
-persist_acks(Txn, QName, Messages) ->
- persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
-
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
- ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
- %% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
-
-commit_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName).
-
-rollback_work(Txn, QName) ->
- do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName).
-
-%% optimisation: don't do unnecessary work
-%% it would be nice if this was handled by the persister
-do_if_persistent(F, Txn, QName) ->
- case is_tx_persistent(Txn) of
- false -> ok;
- true -> ok = F({Txn, QName})
- end.
-
-lookup_tx(Txn) ->
- case get({txn, Txn}) of
- undefined -> #tx{ch_pid = none,
- is_persistent = false,
- pending_messages = [],
- pending_acks = []};
- V -> V
- end.
-
-store_tx(Txn, Tx) ->
- put({txn, Txn}, Tx).
-
-erase_tx(Txn) ->
- erase({txn, Txn}).
-
-all_tx_record() ->
- [T || {{txn, _}, T} <- get()].
-
-all_tx() ->
- [Txn || {{txn, Txn}, _} <- get()].
-
-mark_tx_persistent(Txn) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{is_persistent = true}).
-
-is_tx_persistent(Txn) ->
- #tx{is_persistent = Res} = lookup_tx(Txn),
- Res.
-
-record_pending_message(Txn, ChPid, Message) ->
- Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
-
-record_pending_acks(Txn, ChPid, MsgIds) ->
- Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
- record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
- ch_pid = ChPid}).
-
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
- deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
-
-collect_messages(MsgIds, UAM) ->
- lists:mapfoldl(
- fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end,
- UAM, MsgIds).
-
-purge_message_buffer(QName, MessageBuffer) ->
- Messages =
- [[Message || {Message, _IsDelivered} <-
- queue:to_list(MessageBuffer)] |
- lists:map(
- fun (#cr{unacked_messages = UAM}) ->
- [Message || {_MsgId, Message} <- dict:to_list(UAM)]
- end,
- all_ch_record())],
- %% the simplest, though certainly not the most obvious or
- %% efficient, way to purge messages from the persister is to
- %% artifically ack them.
- persist_acks(none, QName, lists:append(Messages)).
+maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+
+commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckTags, BQS1} =
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ %% ChPid must be known here because of the participant management
+ %% by the channel.
+ C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ State#q{backing_queue_state = BQS1}.
+
+rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ %% Iff we removed acktags from the channel record on ack+txn then
+ %% we would add them back in here (would also require ChPid)
+ record_current_channel_tx(ChPid, none),
+ State#q{backing_queue_state = BQS1}.
+
+subtract_acks(A, B) when is_list(B) ->
+ lists:foldl(fun sets:del_element/2, A, B).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -513,10 +482,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
-i(owner_pid, #q{owner = none}) ->
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
-i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
- ReaderPid;
+i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
+ ExclusiveOwner;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -525,33 +494,72 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
ConsumerTag;
-i(messages_ready, #q{message_buffer = MessageBuffer}) ->
- queue:len(MessageBuffer);
+i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([dict:size(UAM) ||
- #cr{unacked_messages = UAM} <- all_ch_record()]);
-i(messages_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_messages = Pending} <- all_tx_record()]);
+ lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
-i(acks_uncommitted, _) ->
- lists:sum([length(Pending) ||
- #tx{pending_acks = Pending} <- all_tx_record()]);
+ messages_unacknowledged]]);
i(consumers, State) ->
queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
-i(transactions, _) ->
- length(all_tx_record());
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
%---------------------------------------------------------------------------
+handle_call({init, Recover}, From,
+ State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable,
+ exclusive_owner = ExclusiveOwner},
+ backing_queue = BQ, backing_queue_state = undefined}) ->
+ Declare =
+ fun() ->
+ case rabbit_amqqueue:internal_declare(Q, Recover) of
+ not_found ->
+ {stop, normal, not_found, State};
+ Q ->
+ gen_server2:reply(From, Q),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use,
+ [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ noreply(
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)});
+ Q1 ->
+ {stop, normal, Q1, State}
+ end
+ end,
+
+ case ExclusiveOwner of
+ none ->
+ Declare();
+ Owner ->
+ case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
+ true ->
+ erlang:monitor(process, Owner),
+ Declare();
+ _ ->
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n",
+ [QName])
+ end,
+ %% Rely on terminate to delete the queue.
+ {stop, normal, not_found,
+ State#q{backing_queue_state =
+ BQ:init(QName, IsDurable, Recover)}}
+ end
+ end;
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -592,13 +600,9 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
{Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({commit, Txn}, From, State) ->
- ok = commit_work(Txn, qname(State)),
- %% optimisation: we reply straight away so the sender can continue
- gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
- erase_tx(Txn),
- noreply(NewState);
+handle_call({commit, Txn, ChPid}, From, State) ->
+ NewState = commit_transaction(Txn, From, ChPid, State),
+ noreply(run_message_queue(NewState));
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -613,73 +617,59 @@ handle_call({notify_down, ChPid}, _From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
- next_msg_id = NextId,
- message_buffer = MessageBuffer}) ->
- case queue:out(MessageBuffer) of
- {{value, {Message, IsDelivered}}, BufferTail} ->
- AckRequired = not(NoAck),
+ backing_queue_state = BQS, backing_queue = BQ}) ->
+ AckRequired = not NoAck,
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
- true ->
- persist_delivery(QName, Message, IsDelivered),
- C = #cr{unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = dict:store(NextId, Message, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM});
- false ->
- persist_auto_ack(QName, Message)
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
+ false -> ok
end,
- Msg = {QName, self(), NextId, IsDelivered, Message},
- reply({ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail,
- next_msg_id = NextId + 1});
- {empty, _} ->
- reply(empty, State)
+ Msg = {QName, self(), AckTag, IsDelivered, Message},
+ reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder}) ->
- case check_queue_owner(Owner, ReaderPid) of
- mismatch ->
- reply({error, queue_owned_by_another_connection}, State);
+ _From, State = #q{exclusive_consumer = ExistingHolder}) ->
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
+ in_use ->
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
- in_use ->
- reply({error, exclusive_consume_unavailable}, State);
- ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag,
- ack_required = not(NoAck)},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
- case ConsumerCount of
- 0 -> ok = rabbit_limiter:register(LimiterPid, self());
- _ -> ok
- end,
- ExclusiveConsumer = case ExclusiveConsume of
- true -> {ChPid, ConsumerTag};
- false -> ExistingHolder
- end,
- State1 = State#q{has_had_consumers = true,
- exclusive_consumer = ExclusiveConsumer},
- ok = maybe_send_reply(ChPid, OkMsg),
- State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
- end,
- reply(ok, State2)
- end
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not NoAck},
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
+ ok = case ConsumerCount of
+ 0 -> rabbit_limiter:register(LimiterPid, self());
+ _ -> ok
+ end,
+ ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
+ State1 = State#q{has_had_consumers = true,
+ exclusive_consumer = ExclusiveConsumer},
+ ok = maybe_send_reply(ChPid, OkMsg),
+ State2 =
+ case is_ch_blocked(C) of
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
+ false -> run_message_queue(
+ State1#q{
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
+ end,
+ reply(ok, State2)
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -712,14 +702,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- message_buffer = MessageBuffer,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- Length = queue:len(MessageBuffer),
- reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
+ reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q{message_buffer = MessageBuffer}) ->
- IsEmpty = queue:is_empty(MessageBuffer),
+ State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
@@ -727,77 +717,51 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IfUnused and not(IsUnused) ->
reply({error, in_use}, State);
true ->
- {stop, normal, {ok, queue:len(MessageBuffer)}, State}
+ {stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
- ok = purge_message_buffer(qname(State), MessageBuffer),
- reply({ok, queue:len(MessageBuffer)},
- State#q{message_buffer = queue:new()});
+handle_call(purge, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Count, BQS1} = BQ:purge(BQS),
+ reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({claim_queue, ReaderPid}, _From,
- State = #q{owner = Owner, exclusive_consumer = Holder}) ->
- case Owner of
- none ->
- case check_exclusive_access(Holder, true, State) of
- in_use ->
- %% FIXME: Is this really the right answer? What if
- %% an active consumer's reader is actually the
- %% claiming pid? Should that be allowed? In order
- %% to check, we'd need to hold not just the ch
- %% pid for each consumer, but also its reader
- %% pid...
- reply(locked, State);
- ok ->
- MonitorRef = erlang:monitor(process, ReaderPid),
- reply(ok, State#q{owner = {ReaderPid, MonitorRef}})
- end;
- {ReaderPid, _MonitorRef} ->
- reply(ok, State);
- _ ->
- reply(locked, State)
- end.
+handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
noreply(NewState);
-handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
+handle_cast({ack, Txn, AckTags, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {Acked, Remaining} = collect_messages(MsgIds, UAM),
- persist_acks(Txn, qname(State), Acked),
- case Txn of
- none ->
- store_ch_record(C#cr{unacked_messages = Remaining});
- _ ->
- record_pending_acks(Txn, ChPid, MsgIds)
- end,
- noreply(State)
+ C = #cr{acktags = ChAckTags} ->
+ {C1, BQS1} =
+ case Txn of
+ none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
+ _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ end,
+ store_ch_record(C1),
+ noreply(State #q { backing_queue_state = BQS1 })
end;
-handle_cast({rollback, Txn}, State) ->
- ok = rollback_work(Txn, qname(State)),
- erase_tx(Txn),
- noreply(State);
-
-handle_cast({redeliver, Messages}, State) ->
- noreply(deliver_or_enqueue_n(Messages, State));
+handle_cast({rollback, Txn, ChPid}, State) ->
+ noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, MsgIds, ChPid}, State) ->
+handle_cast({requeue, AckTags, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
noreply(State);
- C = #cr{unacked_messages = UAM} ->
- {Messages, NewUAM} = collect_messages(MsgIds, UAM),
- store_ch_record(C#cr{unacked_messages = NewUAM}),
- noreply(deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State))
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
end;
handle_cast({unblock, ChPid}, State) ->
@@ -830,27 +794,59 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
+ noreply(State);
+
+handle_cast(update_ram_duration, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State#q{rate_timer_ref = just_measured,
+ backing_queue_state = BQS2});
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State#q{backing_queue_state = BQS1});
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State).
-handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
- State = #q{owner = {DownPid, MonitorRef}}) ->
- %% We know here that there are no consumers on this queue that are
- %% owned by other pids than the one that just went down, so since
- %% exclusive in some sense implies autodelete, we delete the queue
- %% here. The other way of implementing the "exclusive implies
- %% autodelete" feature is to actually set autodelete when an
- %% exclusive declaration is seen, but this has the problem that
- %% the python tests rely on the queue not going away after a
- %% basic.cancel when the queue was declared exclusive and
- %% nonautodelete.
- NewState = State#q{owner = none},
- {stop, normal, NewState};
+handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
+ %% Exclusively owned queues must disappear with their owner. In
+ %% the case of clean shutdown we delete the queue synchronously in
+ %% the reader - although not required by the spec this seems to
+ %% match what people expect (see bug 21824). However we need this
+ %% monitor-and-async- delete in case the connection goes away
+ %% unexpectedly.
+ {stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, NewState} -> noreply(NewState);
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(timeout, State = #q{backing_queue = BQ}) ->
+ noreply(maybe_run_queue_via_backing_queue(
+ fun (BQS) -> BQ:sync(BQS) end, State));
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
+
+handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
+ {hibernate, State};
+handle_pre_hibernate(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:handle_pre_hibernate(BQS),
+ %% no activity for a while == 0 egress and ingress rates
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}.
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0f3a8664..97d6cef9 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -31,18 +31,23 @@
-module(rabbit_amqqueue_sup).
--behaviour(supervisor).
+-behaviour(supervisor2).
--export([start_link/0]).
+-export([start_link/0, start_child/1]).
-export([init/1]).
+-include("rabbit.hrl").
+
-define(SERVER, ?MODULE).
start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_child(Args) ->
+ supervisor2:start_child(?SERVER, Args).
init([]) ->
- {ok, {{simple_one_for_one, 10, 10},
+ {ok, {{simple_one_for_one_terminate, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
- temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}.
+ temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
new file mode 100644
index 00000000..2dba00ad
--- /dev/null
+++ b/src/rabbit_backing_queue.erl
@@ -0,0 +1,133 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_backing_queue).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% Called on startup with a list of durable queue names. The
+ %% queues aren't being started at this point, but this call
+ %% allows the backing queue to perform any checking necessary for
+ %% the consistency of those queues, or initialise any other
+ %% shared resources.
+ {start, 1},
+
+ %% Initialise the backing queue and its state.
+ {init, 3},
+
+ %% Called on queue shutdown when queue isn't being deleted.
+ {terminate, 1},
+
+ %% Called when the queue is terminating and needs to delete all
+ %% its content.
+ {delete_and_terminate, 1},
+
+ %% Remove all messages in the queue, but not messages which have
+ %% been fetched and are pending acks.
+ {purge, 1},
+
+ %% Publish a message.
+ {publish, 2},
+
+ %% Called for messages which have already been passed straight
+ %% out to a client. The queue will be empty for these calls
+ %% (i.e. saves the round trip through the backing queue).
+ {publish_delivered, 3},
+
+ %% Produce the next message.
+ {fetch, 2},
+
+ %% Acktags supplied are for messages which can now be forgotten
+ %% about.
+ {ack, 2},
+
+ %% A publish, but in the context of a transaction.
+ {tx_publish, 3},
+
+ %% Acks, but in the context of a transaction.
+ {tx_ack, 3},
+
+ %% Undo anything which has been done in the context of the
+ %% specified transaction.
+ {tx_rollback, 2},
+
+ %% Commit a transaction. The Fun passed in must be called once
+ %% the messages have really been commited. This CPS permits the
+ %% possibility of commit coalescing.
+ {tx_commit, 3},
+
+ %% Reinsert messages into the queue which have already been
+ %% delivered and were pending acknowledgement.
+ {requeue, 2},
+
+ %% How long is my queue?
+ {len, 1},
+
+ %% Is my queue empty?
+ {is_empty, 1},
+
+ %% For the next three functions, the assumption is that you're
+ %% monitoring something like the ingress and egress rates of the
+ %% queue. The RAM duration is thus the length of time represented
+ %% by the messages held in RAM given the current rates. If you
+ %% want to ignore all of this stuff, then do so, and return 0 in
+ %% ram_duration/1.
+
+ %% The target is to have no more messages in RAM than indicated
+ %% by the duration and the current queue rates.
+ {set_ram_duration_target, 2},
+
+ %% Optionally recalculate the duration internally (likely to be
+ %% just update your internal rates), and report how many seconds
+ %% the messages in RAM represent given the current rates of the
+ %% queue.
+ {ram_duration, 1},
+
+ %% Should 'sync' be called as soon as the queue process can
+ %% manage (either on an empty mailbox, or when a timer fires)?
+ {needs_sync, 1},
+
+ %% Called (eventually) after needs_sync returns 'true'. Note this
+ %% may be called more than once for each 'true' returned from
+ %% needs_sync.
+ {sync, 1},
+
+ %% Called immediately before the queue hibernates.
+ {handle_pre_hibernate, 1},
+
+ %% Exists for debugging purposes, to be able to expose state via
+ %% rabbitmqctl list_queues backing_queue_status
+ {status, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9ebb6e72..4ab7a2a0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -36,6 +36,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -48,7 +49,7 @@
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> message()).
+ binary()) -> (message() | {'error', any()})).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -57,6 +58,8 @@
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-spec(is_message_persistent/1 ::
+ (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
@@ -93,10 +96,17 @@ from_content(Content) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = build_content(Properties, BodyBin),
- persistent_key = none}.
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
properties(P = #'P_basic'{}) ->
P;
@@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 1d47d764..27a1275a 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -95,33 +95,37 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
maybe_encode_properties(ContentProperties, none) ->
rabbit_framing:encode_properties(ContentProperties).
-build_content_frames(FragmentsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if
- FrameMax == 0 ->
- none;
- true ->
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
end,
- build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt).
-
-build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) ->
- {SizeAcc, FragmentAcc};
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt)
- when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) ->
- <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment,
- build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev],
- BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc + size(Fragment),
- [create_frame(3, ChannelInt, Fragment) | FragmentAcc],
- FragmentsRev,
- BodyPayloadMax,
- ChannelInt).
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ if Size == 0 -> {FragSizeRem, FragAcc, Frags};
+ Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ true -> <<Head:FragSizeRem/binary, Tail/binary>> =
+ Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3597fcd7..50cb5f20 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -46,10 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -69,13 +66,13 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
@@ -89,10 +86,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -138,10 +135,9 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
@@ -157,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -302,6 +299,26 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ case rabbit_amqqueue:with_or_die(
+ QName, fun(Q) -> case Q of
+ #amqqueue{exclusive_owner = none} ->
+ F(Q);
+ #amqqueue{exclusive_owner = ReaderPid} ->
+ F(Q);
+ _ ->
+ {error, wrong_exclusive_owner}
+ end
+ end) of
+ {error, wrong_exclusive_owner} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QName)]);
+ Else ->
+ Else
+ end.
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -356,6 +373,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{reply, #'channel.open_ok'{}, State#ch{state = running}};
handle_method(#'channel.open'{}, _, _State) ->
@@ -386,14 +404,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
+ IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -488,11 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -502,14 +518,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
dict:store(ActualConsumerTag,
QueueName,
ConsumerMapping)}};
- {error, queue_owned_by_another_connection} ->
- %% The spec is silent on which exception to use
- %% here. This seems reasonable?
- %% FIXME: check this
-
- rabbit_misc:protocol_error(
- resource_locked, "~s owned by another connection",
- [rabbit_misc:rs(QueueName)]);
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -679,34 +687,48 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
- %% FIXME: atomic create&claim
+ nowait = NoWait,
+ arguments = Args},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid}) ->
+ Owner = case ExclusiveDeclare of
+ true -> ReaderPid;
+ false -> none
+ end,
+ %% We use this in both branches, because queue_declare may yet return an
+ %% existing queue.
Finish =
- fun (Q) ->
- if ExclusiveDeclare ->
- case rabbit_amqqueue:claim_queue(Q, ReaderPid) of
- locked ->
- %% AMQP 0-8 doesn't say which
- %% exception to use, so we mimic QPid
- %% here.
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)]);
- ok -> ok
- end;
- true ->
- ok
- end,
- Q
+ fun(Q = #amqqueue{name = QueueName}) ->
+ case Q of
+ %% "equivalent" rule. NB: we don't pay attention to
+ %% anything in the arguments table, so for the sake of the
+ %% "equivalent" rule, all tables of arguments are
+ %% semantically equivalant.
+ #amqqueue{exclusive_owner = Owner} ->
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Q)
+ end,
+ Q;
+ %% exclusivity trumps non-equivalence arbitrarily
+ #amqqueue{} ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)])
+ end
end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
@@ -718,34 +740,32 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- Finish(rabbit_amqqueue:declare(QueueName,
- Durable, AutoDelete, Args));
- Other = #amqqueue{name = QueueName} ->
- check_configure_permitted(QueueName, State),
+ Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner));
+ #amqqueue{} = Other ->
Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
- nowait = NoWait},
- _, State = #ch{ virtual_host = VHostPath }) ->
+ nowait = NoWait},
+ _, State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
- nowait = NoWait
- },
- _, State) ->
+ nowait = NoWait},
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -778,11 +798,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State) ->
+ _, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -933,7 +953,7 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> ok = notify_limiter(State#ch.limiter_pid,
State#ch.uncommitted_ack_q),
new_tx(State);
@@ -949,13 +969,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey) of
- ok -> NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ});
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "rollback failed: ~w", [Errors])
- end.
+ ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey, self()),
+ NewUAMQ = queue:join(UAQ, UAMQ),
+ new_tx(State#ch{unacked_message_q = NewUAMQ}).
rollback_and_notify(State = #ch{transaction_id = none}) ->
notify_queues(State);
@@ -966,14 +983,11 @@ fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
- %% dict:append would be simpler and avoid the
- %% lists:reverse in handle_message({recover, true},
- %% ...). However, it is significantly slower when
- %% going beyond a few thousand elements.
- dict:update(QPid,
- fun (MsgIds) -> [MsgId | MsgIds] end,
- [MsgId],
- D)
+ %% dict:append would avoid the lists:reverse in
+ %% handle_message({recover, true}, ...). However, it
+ %% is significantly slower when going beyond a few
+ %% thousand elements.
+ rabbit_misc:dict_cons(QPid, MsgId, D)
end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -1019,16 +1033,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+is_message_persistent(Content) ->
+ case rabbit_basic:is_message_persistent(Content) of
+ {invalid, Other} ->
+ rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false;
+ IsPersistent when is_boolean(IsPersistent) ->
+ IsPersistent
end.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 078cf620..f19e8d02 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -38,9 +38,9 @@
-ifdef(use_specs).
--spec(create_basic_plt/1 :: (string()) -> 'ok').
--spec(add_to_plt/2 :: (string(), string()) -> 'ok').
--spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(create_basic_plt/1 :: (file_path()) -> 'ok').
+-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok').
-spec(halt_with_code/1 :: (atom()) -> no_return()).
-endif.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 1cfba00e..8f41392f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -341,16 +341,7 @@ delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
contains(Table, MatchHead) ->
- try
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- case mnesia:match_object(Table, MatchHead, read) of
- [] -> false;
- [_|_] -> true
- end
- end.
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
@@ -382,7 +373,7 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- case mnesia:read(rabbit_route, B) of
+ case mnesia:read({rabbit_route, B}) of
[] ->
sync_binding(B, Q#amqqueue.durable,
fun mnesia:write/3),
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2fa531a7..1ae8f7da 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -67,7 +67,7 @@ update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
- {error, enoent} -> rabbit_persister:serial();
+ {error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
@@ -78,7 +78,7 @@ update_disk_serial() ->
end,
Serial.
-%% generate a guid that is monotonically increasing per process.
+%% generate a GUID.
%%
%% The id is only unique within a single cluster and as long as the
%% serial store hasn't been deleted.
@@ -92,20 +92,18 @@ guid() ->
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
+ %% us a GUID.
G = case get(guid) of
undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
0};
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
+%% generate a readable string representation of a GUID.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(
- erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
new file mode 100644
index 00000000..a7ca20c8
--- /dev/null
+++ b/src/rabbit_invariable_queue.erl
@@ -0,0 +1,276 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_invariable_queue).
+
+-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2,
+ publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3,
+ tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1,
+ set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1,
+ handle_pre_hibernate/1, status/1]).
+
+-export([start/1]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(iv_state, { queue, qname, durable, len, pending_ack }).
+-record(tx, { pending_messages, pending_acks, is_persistent }).
+
+-ifdef(use_specs).
+
+-type(ack() :: guid() | 'blank_ack').
+-type(state() :: #iv_state { queue :: queue(),
+ qname :: queue_name(),
+ len :: non_neg_integer(),
+ pending_ack :: dict()
+ }).
+-include("rabbit_backing_queue_spec.hrl").
+
+-endif.
+
+start(DurableQueues) ->
+ ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
+
+init(QName, IsDurable, Recover) ->
+ Q = queue:from_list(case IsDurable andalso Recover of
+ true -> rabbit_persister:queue_content(QName);
+ false -> []
+ end),
+ #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = queue:len(Q),
+ pending_ack = dict:new() }.
+
+terminate(State) ->
+ State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
+
+delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
+ {_PLen, State1} = purge(State),
+ terminate(State1).
+
+purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ %% We do not purge messages pending acks.
+ {AckTags, PA} =
+ rabbit_misc:queue_fold(
+ fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) ->
+ Acc;
+ ({Msg = #basic_message { guid = Guid }, IsDelivered},
+ {AckTagsN, PAN}) ->
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
+ {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)}
+ end, {[], dict:new()}, Q),
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
+ {Len, State #iv_state { len = 0, queue = queue:new() }}.
+
+publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
+ len = Len }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }.
+
+publish_delivered(false, _Msg, State) ->
+ {blank_ack, State};
+publish_delivered(true, Msg = #basic_message { guid = Guid },
+ State = #iv_state { qname = QName, durable = IsDurable,
+ len = 0, pending_ack = PA }) ->
+ ok = persist_message(QName, IsDurable, none, Msg),
+ ok = persist_delivery(QName, IsDurable, false, Msg),
+ {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}.
+
+fetch(_AckRequired, State = #iv_state { len = 0 }) ->
+ {empty, State};
+fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName,
+ durable = IsDurable,
+ pending_ack = PA }) ->
+ {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} =
+ queue:out(Q),
+ Len1 = Len - 1,
+ ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
+ PA1 = dict:store(Guid, Msg, PA),
+ {AckTag, PA2} = case AckRequired of
+ true -> {Guid, PA1};
+ false -> ok = persist_acks(QName, IsDurable, none,
+ [Guid], PA1),
+ {blank_ack, PA}
+ end,
+ {{Msg, IsDelivered, AckTag, Len1},
+ State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
+
+ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ ok = persist_acks(QName, IsDurable, none, AckTags, PA),
+ PA1 = remove_acks(AckTags, PA),
+ State #iv_state { pending_ack = PA1 }.
+
+tx_publish(Txn, Msg, State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
+ Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }),
+ ok = persist_message(QName, IsDurable, Txn, Msg),
+ State.
+
+tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
+ pending_ack = PA }) ->
+ Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
+ store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
+ ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
+ State.
+
+tx_rollback(Txn, State = #iv_state { qname = QName }) ->
+ #tx { pending_acks = AckTags } = lookup_tx(Txn),
+ ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1,
+ Txn, QName),
+ erase_tx(Txn),
+ {lists:flatten(AckTags), State}.
+
+tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA,
+ queue = Q, len = Len }) ->
+ #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
+ ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
+ Txn, QName),
+ erase_tx(Txn),
+ Fun(),
+ AckTags1 = lists:flatten(AckTags),
+ PA1 = remove_acks(AckTags1, PA),
+ {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) ->
+ {queue:in({Msg, false}, QN), LenN + 1}
+ end, {Q, Len}, PubsRev),
+ {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
+
+requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q,
+ len = Len }) ->
+ %% We don't need to touch the persister here - the persister will
+ %% already have these messages published and delivered as
+ %% necessary. The complication is that the persister's seq_id will
+ %% now be wrong, given the position of these messages in our queue
+ %% here. However, the persister's seq_id is only used for sorting
+ %% on startup, and requeue is silent as to where the requeued
+ %% messages should appear, thus the persister is permitted to sort
+ %% based on seq_id, even though it'll likely give a different
+ %% order to the last known state of our queue, prior to shutdown.
+ {Q1, Len1} = lists:foldl(
+ fun (Guid, {QN, LenN}) ->
+ {ok, Msg = #basic_message {}} = dict:find(Guid, PA),
+ {queue:in({Msg, true}, QN), LenN + 1}
+ end, {Q, Len}, AckTags),
+ PA1 = remove_acks(AckTags, PA),
+ State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
+
+len(#iv_state { len = Len }) -> Len.
+
+is_empty(State) -> 0 == len(State).
+
+set_ram_duration_target(_DurationTarget, State) -> State.
+
+ram_duration(State) -> {0, State}.
+
+needs_sync(_State) -> false.
+
+sync(State) -> State.
+
+handle_pre_hibernate(State) -> State.
+
+status(_State) -> [].
+
+%%----------------------------------------------------------------------------
+
+remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
+
+%%----------------------------------------------------------------------------
+
+lookup_tx(Txn) ->
+ case get({txn, Txn}) of
+ undefined -> #tx { pending_messages = [],
+ pending_acks = [],
+ is_persistent = false };
+ V -> V
+ end.
+
+store_tx(Txn, Tx) ->
+ put({txn, Txn}, Tx).
+
+erase_tx(Txn) ->
+ erase({txn, Txn}).
+
+mark_tx_persistent(Txn) ->
+ store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }).
+
+is_tx_persistent(Txn) ->
+ (lookup_tx(Txn)) #tx.is_persistent.
+
+do_if_persistent(F, Txn, QName) ->
+ ok = case is_tx_persistent(Txn) of
+ false -> ok;
+ true -> F({Txn, QName})
+ end.
+
+%%----------------------------------------------------------------------------
+
+persist_message(QName, true, Txn, Msg = #basic_message {
+ is_persistent = true }) ->
+ Msg1 = Msg #basic_message {
+ %% don't persist any recoverable decoded properties,
+ %% rebuild from properties_bin on restore
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)},
+ persist_work(Txn, QName,
+ [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]);
+persist_message(_QName, _IsDurable, _Txn, _Msg) ->
+ ok.
+
+persist_delivery(QName, true, false, #basic_message { is_persistent = true,
+ guid = Guid }) ->
+ persist_work(none, QName, [{deliver, {QName, Guid}}]);
+persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
+ ok.
+
+persist_acks(QName, true, Txn, AckTags, PA) ->
+ persist_work(Txn, QName,
+ [{ack, {QName, Guid}} || Guid <- AckTags,
+ begin
+ {ok, Msg} = dict:find(Guid, PA),
+ Msg #basic_message.is_persistent
+ end]);
+persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
+ ok.
+
+persist_work(_Txn,_QName, []) ->
+ ok;
+persist_work(none, _QName, WorkList) ->
+ rabbit_persister:dirty_work(WorkList);
+persist_work(Txn, QName, WorkList) ->
+ mark_tx_persistent(Txn),
+ rabbit_persister:extend_transaction({Txn, QName}, WorkList).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 7d840861..878af029 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -249,10 +249,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
State#lim{queues = NewQueues}.
unlink_on_stopped(LimiterPid, stopped) ->
- true = unlink(LimiterPid),
- ok = receive {'EXIT', LimiterPid, _Reason} -> ok
- after 0 -> ok
- end,
+ ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
stopped;
unlink_on_stopped(_LimiterPid, Result) ->
Result.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
new file mode 100644
index 00000000..91e97ffe
--- /dev/null
+++ b/src/rabbit_memory_monitor.erl
@@ -0,0 +1,293 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+
+%% This module handles the node-wide memory statistics.
+%% It receives statistics from all queues, counts the desired
+%% queue length (in seconds), and sends this information back to
+%% queues.
+
+-module(rabbit_memory_monitor).
+
+-behaviour(gen_server2).
+
+-export([start_link/0, update/0, register/2, deregister/1,
+ report_ram_duration/2, stop/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(process, {pid, reported, sent, callback, monitor}).
+
+-record(state, {timer, %% 'internal_update' timer
+ queue_durations, %% ets #process
+ queue_duration_sum, %% sum of all queue_durations
+ queue_duration_count, %% number of elements in sum
+ memory_limit, %% how much memory we intend to use
+ desired_duration %% the desired queue duration
+ }).
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_UPDATE_INTERVAL, 2500).
+-define(TABLE_NAME, ?MODULE).
+
+%% Because we have a feedback loop here, we need to ensure that we
+%% have some space for when the queues don't quite respond as fast as
+%% we would like, or when there is buffering going on in other parts
+%% of the system. In short, we aim to stay some distance away from
+%% when the memory alarms will go off, which cause channel.flow.
+%% Note that all other Thresholds are relative to this scaling.
+-define(MEMORY_LIMIT_SCALING, 0.4).
+
+-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
+
+%% If all queues are pushed to disk (duration 0), then the sum of
+%% their reported lengths will be 0. If memory then becomes available,
+%% unless we manually intervene, the sum will remain 0, and the queues
+%% will never get a non-zero duration. Thus when the mem use is <
+%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
+-define(SUM_INC_THRESHOLD, 0.95).
+-define(SUM_INC_AMOUNT, 1.0).
+
+%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
+-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+
+-define(EPSILON, 0.000001). %% less than this and we clamp to 0
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
+-spec(update/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
+-spec(deregister/1 :: (pid()) -> 'ok').
+-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+update() ->
+ gen_server2:cast(?SERVER, update).
+
+register(Pid, MFA = {_M, _F, _A}) ->
+ gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
+
+deregister(Pid) ->
+ gen_server2:cast(?SERVER, {deregister, Pid}).
+
+report_ram_duration(Pid, QueueDuration) ->
+ gen_server2:call(?SERVER,
+ {report_ram_duration, Pid, QueueDuration}, infinity).
+
+stop() ->
+ gen_server2:cast(?SERVER, stop).
+
+%%----------------------------------------------------------------------------
+%% Gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
+ (try
+ vm_memory_monitor:get_memory_limit()
+ catch
+ exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
+ end)),
+
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
+ ?SERVER, update, []),
+
+ Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
+
+ {ok, internal_update(
+ #state { timer = TRef,
+ queue_durations = Ets,
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ desired_duration = infinity })}.
+
+handle_call({report_ram_duration, Pid, QueueDuration}, From,
+ State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ desired_duration = SendDuration }) ->
+
+ [Proc = #process { reported = PrevQueueDuration }] =
+ ets:lookup(Durations, Pid),
+
+ gen_server2:reply(From, SendDuration),
+
+ {Sum1, Count1} =
+ case {PrevQueueDuration, QueueDuration} of
+ {infinity, infinity} -> {Sum, Count};
+ {infinity, _} -> {Sum + QueueDuration, Count + 1};
+ {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration,
+ Count}
+ end,
+ true = ets:insert(Durations, Proc #process { reported = QueueDuration,
+ sent = SendDuration }),
+ {noreply, State #state { queue_duration_sum = zero_clamp(Sum1),
+ queue_duration_count = Count1 }};
+
+handle_call({register, Pid, MFA}, _From,
+ State = #state { queue_durations = Durations }) ->
+ MRef = erlang:monitor(process, Pid),
+ true = ets:insert(Durations, #process { pid = Pid, reported = infinity,
+ sent = infinity, callback = MFA,
+ monitor = MRef }),
+ {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast({deregister, Pid}, State) ->
+ {noreply, internal_deregister(Pid, true, State)};
+
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
+ {noreply, internal_deregister(Pid, false, State)};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state { timer = TRef }) ->
+ timer:cancel(TRef),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
+
+zero_clamp(Sum) ->
+ case Sum < ?EPSILON of
+ true -> 0.0;
+ false -> Sum
+ end.
+
+internal_deregister(Pid, Demonitor,
+ State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations }) ->
+ case ets:lookup(Durations, Pid) of
+ [] -> State;
+ [#process { reported = PrevQueueDuration, monitor = MRef }] ->
+ true = case Demonitor of
+ true -> erlang:demonitor(MRef);
+ false -> true
+ end,
+ {Sum1, Count1} =
+ case PrevQueueDuration of
+ infinity -> {Sum, Count};
+ _ -> {zero_clamp(Sum - PrevQueueDuration),
+ Count - 1}
+ end,
+ true = ets:delete(Durations, Pid),
+ State #state { queue_duration_sum = Sum1,
+ queue_duration_count = Count1 }
+ end.
+
+internal_update(State = #state { memory_limit = Limit,
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count }) ->
+ MemoryRatio = erlang:memory(total) / Limit,
+ DesiredDurationAvg1 =
+ case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
+ true ->
+ infinity;
+ false ->
+ Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
+ true -> Sum + ?SUM_INC_AMOUNT;
+ false -> Sum
+ end,
+ (Sum1 / Count) / MemoryRatio
+ end,
+ State1 = State #state { desired_duration = DesiredDurationAvg1 },
+
+ %% only inform queues immediately if the desired duration has
+ %% decreased
+ case DesiredDurationAvg1 == infinity orelse
+ (DesiredDurationAvg /= infinity andalso
+ DesiredDurationAvg1 >= DesiredDurationAvg) of
+ true ->
+ ok;
+ false ->
+ true =
+ ets:foldl(
+ fun (Proc = #process { reported = QueueDuration,
+ sent = PrevSendDuration,
+ callback = {M, F, A} }, true) ->
+ case (case {QueueDuration, PrevSendDuration} of
+ {infinity, infinity} ->
+ true;
+ {infinity, D} ->
+ DesiredDurationAvg1 < D;
+ {D, infinity} ->
+ DesiredDurationAvg1 < D;
+ {D1, D2} ->
+ DesiredDurationAvg1 <
+ lists:min([D1,D2])
+ end) of
+ true ->
+ ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
+ ets:insert(
+ Durations,
+ Proc #process {sent = DesiredDurationAvg1});
+ false ->
+ true
+ end
+ end, true, Durations)
+ end,
+ State1.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81cecb38..9a911ab1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,6 +43,7 @@
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
+-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -59,6 +60,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]).
-import(mnesia).
-import(lists).
@@ -96,9 +98,10 @@
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
+-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (string()) -> ok_or_error()).
--spec(report_cover/1 :: (string()) -> 'ok').
+-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
+-spec(report_cover/1 :: (file_path()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -119,20 +122,27 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
--spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
--spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (string(), string()) -> ok_or_error()).
+-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()).
+-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
--spec(ceil/1 :: (number()) -> number()).
+-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
+-spec(version_compare/3 :: (string(), string(),
+ ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()).
+-spec(recursive_delete/1 :: ([file_path()]) ->
+ 'ok' | {'error', {file_path(), any()}}).
+-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-endif.
@@ -220,6 +230,10 @@ enable_cover(Root) ->
_ -> ok
end.
+start_cover(NodesS) ->
+ {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ ok.
+
report_cover() ->
report_cover(".").
@@ -523,51 +537,25 @@ pid_to_string(Pid) when is_pid(Pid) ->
%% inverse of above
string_to_pid(Str) ->
- ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end,
- %% TODO: simplify this code by using the 're' module, once we drop
- %% support for R11
- %%
- %% 1) sanity check
+ Err = {error, {invalid_pid_syntax, Str}},
%% The \ before the trailing $ is only there to keep emacs
%% font-lock from getting confused.
- case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of
- {match, _, _} ->
- %% 2) strip <>
- Str1 = string:substr(Str, 2, string:len(Str) - 2),
- %% 3) extract three constituent parts, taking care to
- %% handle dots in the node part (hence the reverse and concat)
- [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")),
- NodeStr = lists:concat(lists:reverse(Rest)),
- %% 4) construct a triple term from the three parts
- TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.",
- [NodeStr, IdStr, SerStr])),
- %% 5) parse the triple
- Tokens = case erl_scan:string(TripleStr) of
- {ok, Tokens1, _} -> Tokens1;
- {error, _, _} -> ErrorFun()
- end,
- Term = case erl_parse:parse_term(Tokens) of
- {ok, Term1} -> Term1;
- {error, _} -> ErrorFun()
- end,
- {Node, Id, Ser} =
- case Term of
- {Node1, Id1, Ser1} when is_atom(Node1) andalso
- is_integer(Id1) andalso
- is_integer(Ser1) ->
- Term;
- _ ->
- ErrorFun()
- end,
- %% 6) turn the triple into a pid - see pid_to_string
- <<131,NodeEnc/binary>> = term_to_binary(Node),
+ case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$",
+ [{capture,all_but_first,list}]) of
+ {match, [NodeStr, IdStr, SerStr]} ->
+ %% the NodeStr atom might be quoted, so we have to parse
+ %% it rather than doing a simple list_to_atom
+ NodeAtom = case erl_scan:string(NodeStr) of
+ {ok, [{atom, _, X}], _} -> X;
+ {error, _, _} -> throw(Err)
+ end,
+ <<131,NodeEnc/binary>> = term_to_binary(NodeAtom),
+ Id = list_to_integer(IdStr),
+ Ser = list_to_integer(SerStr),
binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>);
nomatch ->
- ErrorFun();
- Error ->
- %% invalid regexp - shouldn't happen
- throw(Error)
- end.
+ throw(Err)
+ end.
version_compare(A, B, lte) ->
case version_compare(A, B) of
@@ -584,20 +572,67 @@ version_compare(A, B, gte) ->
version_compare(A, B, Result) ->
Result =:= version_compare(A, B).
-version_compare([], []) ->
+version_compare(A, A) ->
eq;
-version_compare([], _ ) ->
+version_compare([], [$0 | B]) ->
+ version_compare([], dropdot(B));
+version_compare([], _) ->
lt; %% 2.3 < 2.3.1
-version_compare(_ , []) ->
+version_compare([$0 | A], []) ->
+ version_compare(dropdot(A), []);
+version_compare(_, []) ->
gt; %% 2.3.1 > 2.3
version_compare(A, B) ->
{AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A),
{BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B),
ANum = list_to_integer(AStr),
BNum = list_to_integer(BStr),
- if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl),
- BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl),
- version_compare(ATl1, BTl1);
+ if ANum =:= BNum -> version_compare(dropdot(ATl), dropdot(BTl));
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+
+dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
+
+recursive_delete(Files) ->
+ lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files).
+
+recursive_delete1(Path) ->
+ case filelib:is_dir(Path) of
+ false -> case file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+dict_cons(Key, Value, Dict) ->
+ dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
+
+unlink_and_capture_exit(Pid) ->
+ unlink(Pid),
+ receive {'EXIT', Pid, _} -> ok
+ after 0 -> ok
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6ec3cf74..55a6761d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -48,7 +48,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
--spec(dir/0 :: () -> string()).
+-spec(dir/0 :: () -> file_path()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
@@ -424,9 +424,8 @@ reset(Force) ->
cannot_delete_schema)
end,
ok = delete_cluster_nodes_config(),
- %% remove persistet messages and any other garbage we find
- lists:foreach(fun file:delete/1,
- filelib:wildcard(dir() ++ "/*")),
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 7978573d..c3d0b7b7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -51,6 +51,7 @@
binary,
{packet, raw}, % no packaging
{reuseaddr, true}, % allow rebind without waiting
+ {backlog, 128}, % use the maximum listen(2) backlog value
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
%% {delay_send, true},
{exit_on_close, false}
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 019d2a26..3cd42e47 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -33,14 +33,14 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0, queue_content/1]).
-include("rabbit.hrl").
@@ -49,48 +49,44 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies,
- snapshot}).
+ pending_logs, pending_replies, snapshot}).
%% two tables for efficient persistency
%% one maps a key to a message
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--type(qmsg() :: {amqqueue(), pkey()}).
+-type(pmsg() :: {queue_name(), pkey()}).
-type(work_item() ::
- {publish, message(), qmsg()} |
- {deliver, qmsg()} |
- {ack, qmsg()}).
+ {publish, message(), pmsg()} |
+ {deliver, pmsg()} |
+ {ack, pmsg()}).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 :: ([queue_name()]) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
+-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
+-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
+-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link(DurableQueues) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
@@ -116,19 +112,19 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
+queue_content(QName) ->
+ gen_server:call(?SERVER, {queue_content, QName}, infinity).
%%--------------------------------------------------------------------
-init(_Args) ->
+init([DurableQueues]) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, [ordered_set]),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -143,9 +139,8 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} =
+ internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -153,12 +148,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
+ pending_replies = [],
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -168,9 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
+handle_call({queue_content, QName}, _From,
+ State = #pstate{snapshot = #psnapshot{messages = Messages,
+ queues = Queues}}) ->
+ MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}],
+ do_reply([{ets:lookup_element(Messages, K, 2), D} ||
+ {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))],
+ State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -185,9 +184,7 @@ handle_cast(_Msg, State) ->
handle_info(timeout, State = #pstate{deadline = infinity}) ->
State1 = flush(true, State),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State1, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
+ {noreply, State1, hibernate};
handle_info(timeout, State) ->
do_noreply(flush(State));
handle_info(_Info, State) ->
@@ -236,8 +233,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline,
%% "tied" is met.
log_work(CreateWorkUnit, MessageList,
State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
+ snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
fun(M = {publish, Message, QK = {_QName, PKey}}) ->
@@ -282,12 +278,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
+ snapshot = Snapshot}) ->
+ {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
+ if
+ Force orelse EntryCount >= MaxWrapEntries ->
+ ok = take_snapshot(LH, Snapshot),
+ State#pstate{entry_count = 0};
+ true ->
+ State
+ end.
later_ms(DeltaMilliSec) ->
{MegaSec, Sec, MicroSec} = now(),
@@ -304,7 +303,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
+ {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
+ HibernateAfter;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -343,56 +343,64 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
- prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
+ sets:add_element(PKey, S)
+ end, sets:new(), Queues),
+ prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
-prune_table(Tab, Keys) ->
+prune_table(Tab, Pred) ->
true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Keys, ets:first(Tab)),
+ ok = prune_table(Tab, Pred, ets:first(Tab)),
true = ets:safe_fixtable(Tab, false).
-prune_table(_Tab, _Keys, '$end_of_table') -> ok;
-prune_table(Tab, Keys, Key) ->
- case sets:is_element(Key, Keys) of
+prune_table(_Tab, _Pred, '$end_of_table') -> ok;
+prune_table(Tab, Pred, Key) ->
+ case Pred(Key) of
true -> ok;
false -> ets:delete(Tab, Key)
end,
- prune_table(Tab, Keys, ets:next(Tab, Key)).
+ prune_table(Tab, Pred, ets:next(Tab, Key)).
internal_load_snapshot(LogHandle,
+ DurableQueues,
Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
- transactions = Ts}),
- Snapshot2 = requeue_messages(Snapshot1),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
+ %% Remove all entries for queues that no longer exist.
+ %% Note that the 'messages' table is pruned when the next
+ %% snapshot is taken.
+ DurableQueuesSet = sets:from_list(DurableQueues),
+ prune_table(Snapshot1#psnapshot.queues,
+ fun ({QName, _PKey}) ->
+ sets:is_element(QName, DurableQueuesSet)
+ end),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
%% any uncompleted transactions will have been aborted.
- {ok, Snapshot2#psnapshot{transactions = dict:new()}};
+ {ok, Snapshot1#psnapshot{transactions = dict:new()}};
{error, Reason} -> {{error, Reason}, Snapshot}
end.
@@ -404,56 +412,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
check_version(_Other) ->
{error, unrecognised_persister_log_format}.
-requeue_messages(Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues),
- %% unstable parallel map, because order doesn't matter
- L = lists:append(
- rabbit_misc:upmap(
- %% we do as much work as possible in spawned worker
- %% processes, but we need to make sure the ets:inserts are
- %% performed in self()
- fun ({QName, Requeues}) ->
- requeue(QName, Requeues, Messages)
- end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
- ets:delete_all_objects(Messages),
- ets:delete_all_objects(Queues),
- true = ets:insert(Messages, NewMessages),
- true = ets:insert(Queues, NewQueues),
- %% contains the mutated messages and queues tables
- Snapshot.
-
-accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
- Requeue = {PKey, Delivered},
- dict:update(QName,
- fun (Requeues) -> [Requeue | Requeues] end,
- [Requeue],
- Acc).
-
-requeue(QName, Requeues, Messages) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} ->
- RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
- {_, Message} <- ets:lookup(Messages, PKey)],
- rabbit_amqqueue:redeliver(
- QPid,
- %% Messages published by the same process receive
- %% persistence keys that are monotonically
- %% increasing. Since message ordering is defined on a
- %% per-channel basis, and channels are bound to specific
- %% processes, sorting the list does provide the correct
- %% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
- lists:sort(RequeueMessages)]),
- RequeueMessages;
- {error, not_found} ->
- []
- end.
-
replay([], LogHandle, K, Snapshot) ->
case disk_log:chunk(LogHandle, K) of
{K1, Items} ->
@@ -474,50 +432,55 @@ internal_integrate_messages(Items, Snapshot) ->
internal_integrate1({extend_transaction, Key, MessageList},
Snapshot = #psnapshot {transactions = Transactions}) ->
- NewTransactions =
- dict:update(Key,
- fun (MessageLists) -> [MessageList | MessageLists] end,
- [MessageList],
- Transactions),
- Snapshot#psnapshot{transactions = NewTransactions};
+ Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
+ Transactions)};
internal_integrate1({rollback_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions}) ->
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 274981ef..ef3c5cc2 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -108,6 +108,7 @@ start() ->
WApp == stdlib;
WApp == kernel;
WApp == sasl;
+ WApp == crypto;
WApp == os_mon -> false;
_ -> true
end]),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5cf519b7..73a58f13 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -52,10 +52,12 @@
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
+-define(SILENT_CLOSE_DELAY, 3).
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -233,6 +235,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_reader_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -244,7 +247,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -262,7 +266,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -425,11 +431,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_reader_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -575,7 +587,11 @@ handle_method0(MethodName, FieldsBin, State) ->
end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
- Other -> throw({channel0_error, Other, CompleteReason})
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, Other, CompleteReason})
end
end.
@@ -722,15 +738,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl
new file mode 100644
index 00000000..841549e9
--- /dev/null
+++ b/src/rabbit_reader_queue_collector.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_reader_queue_collector).
+
+-behaviour(gen_server).
+
+-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {exclusive_queues}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> {'ok', pid()}).
+-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok').
+-spec(delete_all/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+register_exclusive_queue(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+
+delete_all(CollectorPid) ->
+ gen_server:call(CollectorPid, delete_all, infinity).
+
+shutdown(CollectorPid) ->
+ gen_server:call(CollectorPid, shutdown, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{exclusive_queues = dict:new()}}.
+
+%%--------------------------------------------------------------------------
+
+handle_call({register_exclusive_queue, Q}, _From,
+ State = #state{exclusive_queues = Queues}) ->
+ MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+ {reply, ok,
+ State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+
+handle_call(delete_all, _From,
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ [rabbit_misc:with_exit_handler(
+ fun() -> ok end,
+ fun() ->
+ erlang:demonitor(MonitorRef),
+ rabbit_amqqueue:delete(Q, false, false)
+ end)
+ || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ {reply, ok, State};
+
+handle_call(shutdown, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
+ State = #state{exclusive_queues = ExclusiveQueues}) ->
+ {noreply, State#state{exclusive_queues =
+ dict:erase(MonitorRef, ExclusiveQueues)}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 884ea4ab..03979d6c 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,104 +33,40 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--behaviour(gen_server2).
-
--export([start_link/0,
- deliver/2,
+-export([deliver/2,
match_bindings/2,
match_routing_key/2]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
-%% cross-node routing optimisation is disabled because of bug 19758.
--define(BUG19758, true).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
--ifdef(BUG19758).
-
-deliver(QPids, Delivery) ->
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery)).
-
--else.
+deliver(QPids, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ delegate:invoke_no_result(
+ QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {routed, QPids};
deliver(QPids, Delivery) ->
- %% we reduce inter-node traffic by grouping the qpids by node and
- %% only delivering one copy of the message to each node involved,
- %% which then in turn delivers it to its queues.
- deliver_per_node(
- dict:to_list(
- lists:foldl(
- fun (QPid, D) ->
- dict:update(node(QPid),
- fun (QPids1) -> [QPid | QPids1] end,
- [QPid], D)
- end,
- dict:new(), QPids)),
- Delivery).
-
-deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
- %% optimisation
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- run_bindings(QPids, Delivery));
-deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver in run_bindings below will deliver the
- %% message to the queue process asynchronously, and return true,
- %% which means all the QPids will always be returned. It is
- %% therefore safe to use a fire-and-forget cast here and return
- %% the QPids - the semantics is preserved. This scales much better
- %% than the non-immediate case below.
- {routed,
- lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}),
- QPids
- end,
- NodeQPids)};
-deliver_per_node(NodeQPids, Delivery) ->
- R = rabbit_misc:upmap(
- fun ({Node, QPids}) ->
- try gen_server2:call({?SERVER, Node},
- {deliver, QPids, Delivery},
- infinity)
- catch
- _Class:_Reason ->
- %% TODO: figure out what to log (and do!) here
- {false, []}
- end
- end,
- NodeQPids),
- {Routed, Handled} =
- lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) ->
- {Routed or RoutedAcc,
- %% we do the concatenation below, which
- %% should be faster
- [Handled | HandledAcc]}
- end,
- {false, []},
- R),
+ {Success, _} =
+ delegate:invoke(QPids,
+ fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
+ {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, lists:append(Handled)}).
-
--endif.
+ {Routed, Handled}).
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
@@ -141,20 +77,7 @@ match_bindings(Name, Match) ->
mnesia:table(rabbit_route),
ExchangeName == Name,
Match(Binding)]),
- lookup_qpids(
- try
- mnesia:async_dirty(fun qlc:e/1, [Query])
- catch exit:{aborted, {badarg, _}} ->
- %% work around OTP-7025, which was fixed in R12B-1, by
- %% falling back on a less efficient method
- [QName || #route{binding = Binding = #binding{
- queue_name = QName}} <-
- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = Name,
- _ = '_'}}),
- Match(Binding)]
- end).
+ lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
match_routing_key(Name, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
@@ -174,44 +97,8 @@ lookup_qpids(Queues) ->
%%--------------------------------------------------------------------
-init([]) ->
- {ok, no_state}.
-
-handle_call({deliver, QPids, Delivery}, From, State) ->
- spawn(
- fun () ->
- R = run_bindings(QPids, Delivery),
- gen_server2:reply(From, R)
- end),
- {noreply, State}.
-
-handle_cast({deliver, QPids, Delivery}, State) ->
- %% in order to preserve message ordering we must not spawn here
- run_bindings(QPids, Delivery),
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-run_bindings(QPids, Delivery) ->
- lists:foldl(
- fun (QPid, {Routed, Handled}) ->
- case catch rabbit_amqqueue:deliver(QPid, Delivery) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', _Reason} -> {Routed, Handled}
- end
- end,
- {false, []},
- QPids).
+fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]};
+fold_deliveries({_, false},{_, Handled}) -> {true, Handled}.
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
check_delivery(true, _ , {false, []}) -> {unroutable, []};
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 25715e6e..2c5e5112 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2,
+-export([start_link/0, start_child/1, start_child/2, start_child/3,
start_restartable_child/1, start_restartable_child/2]).
-export([init/1]).
@@ -49,8 +49,11 @@ start_child(Mod) ->
start_child(Mod, []).
start_child(Mod, Args) ->
+ start_child(Mod, Mod, Args).
+
+start_child(ChildId, Mod, Args) ->
{ok, _} = supervisor:start_child(?SERVER,
- {Mod, {Mod, start_link, Args},
+ {ChildId, {Mod, start_link, Args},
transient, ?MAX_WAIT, worker, [Mod]}),
ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 82f2d199..fa0ce2db 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -61,7 +61,32 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
- passed = test_hooks(),
+ passed = maybe_run_cluster_dependent_tests(),
+ passed.
+
+
+maybe_run_cluster_dependent_tests() ->
+ SecondaryNode = rabbit_misc:makenode("hare"),
+
+ case net_adm:ping(SecondaryNode) of
+ pong -> passed = run_cluster_dependent_tests(SecondaryNode);
+ pang -> io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end,
+ passed.
+
+run_cluster_dependent_tests(SecondaryNode) ->
+ SecondaryNodeS = atom_to_list(SecondaryNode),
+
+ ok = control_action(stop_app, []),
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
+ ok = control_action(start_app, []),
+
+ io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
+ passed = test_delegates_async(SecondaryNode),
+ passed = test_delegates_sync(SecondaryNode),
+
passed.
test_priority_queue() ->
@@ -625,8 +650,12 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
%% NB: this will log an inconsistent_database error, which is harmless
+ %% Turning cover on / off is OK even if we're not in general using cover,
+ %% it just turns the engine on / off, doesn't actually log anything.
+ cover:stop([SecondaryNode]),
true = disconnect_node(SecondaryNode),
pong = net_adm:ping(SecondaryNode),
+ cover:start([SecondaryNode]),
%% leaving a cluster as a ram node
ok = control_action(reset, []),
@@ -717,17 +746,16 @@ test_user_management() ->
passed.
test_server_status() ->
-
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
- false, false, []) ||
+ false, false, [], none) ||
Name <- [<<"foo">>, <<"bar">>]],
- ok = rabbit_amqqueue:claim_queue(Q, self()),
- ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined,
<<"ctag">>, true, undefined),
%% list queues
@@ -811,6 +839,93 @@ test_hooks() ->
end,
passed.
+test_delegates_async(SecondaryNode) ->
+ Self = self(),
+ Sender = fun(Pid) -> Pid ! {invoked, Self} end,
+
+ Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end),
+
+ ok = delegate:invoke_no_result(spawn(Responder), Sender),
+ ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
+ await_response(2),
+
+ LocalPids = spawn_responders(node(), Responder, 10),
+ RemotePids = spawn_responders(SecondaryNode, Responder, 10),
+ ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
+ await_response(20),
+
+ passed.
+
+make_responder(FMsg) -> make_responder(FMsg, timeout).
+make_responder(FMsg, Throw) ->
+ fun() ->
+ receive Msg -> FMsg(Msg)
+ after 1000 -> throw(Throw)
+ end
+ end.
+
+spawn_responders(Node, Responder, Count) ->
+ [spawn(Node, Responder) || _ <- lists:seq(1, Count)].
+
+await_response(0) ->
+ ok;
+await_response(Count) ->
+ receive
+ response -> ok,
+ await_response(Count - 1)
+ after 1000 ->
+ io:format("Async reply not received~n"),
+ throw(timeout)
+ end.
+
+must_exit(Fun) ->
+ try
+ Fun(),
+ throw(exit_not_thrown)
+ catch
+ exit:_ -> ok
+ end.
+
+test_delegates_sync(SecondaryNode) ->
+ Sender = fun(Pid) -> gen_server:call(Pid, invoked) end,
+ BadSender = fun(_Pid) -> exit(exception) end,
+
+ Responder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end),
+
+ BadResponder = make_responder(fun({'$gen_call', From, invoked}) ->
+ gen_server:reply(From, response)
+ end, bad_responder_died),
+
+ response = delegate:invoke(spawn(Responder), Sender),
+ response = delegate:invoke(spawn(SecondaryNode, Responder), Sender),
+
+ must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end),
+ must_exit(fun() ->
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
+
+ LocalGoodPids = spawn_responders(node(), Responder, 2),
+ RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
+ LocalBadPids = spawn_responders(node(), BadResponder, 2),
+ RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2),
+
+ {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender),
+ true = lists:all(fun ({_, response}) -> true end, GoodRes),
+ GoodResPids = [Pid || {Pid, _} <- GoodRes],
+
+ Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
+ Good = ordsets:from_list(GoodResPids),
+
+ {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
+ true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
+ BadResPids = [Pid || {Pid, _} <- BadRes],
+
+ Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
+ Bad = ordsets:from_list(BadResPids),
+
+ passed.
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
new file mode 100644
index 00000000..55753512
--- /dev/null
+++ b/src/supervisor2.erl
@@ -0,0 +1,917 @@
+%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is supervisor2
+%%
+%% 2) there is a new strategy called
+%% simple_one_for_one_terminate. This is exactly the same as for
+%% simple_one_for_one, except that children *are* explicitly
+%% terminated as per the shutdown component of the child_spec.
+%%
+%% All modifications are (C) 2010 LShift Ltd.
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(supervisor2).
+
+-behaviour(gen_server).
+
+%% External exports
+-export([start_link/2,start_link/3,
+ start_child/2, restart_child/2,
+ delete_child/2, terminate_child/2,
+ which_children/1,
+ check_childspecs/1]).
+
+-export([behaviour_info/1]).
+
+%% Internal exports
+-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
+-export([handle_cast/2]).
+
+-define(DICT, dict).
+
+-record(state, {name,
+ strategy,
+ children = [],
+ dynamics = ?DICT:new(),
+ intensity,
+ period,
+ restarts = [],
+ module,
+ args}).
+
+-record(child, {pid = undefined, % pid is undefined when child is not running
+ name,
+ mfa,
+ restart_type,
+ shutdown,
+ child_type,
+ modules = []}).
+
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
+ State#state.strategy =:= simple_one_for_one_terminate).
+-define(is_terminate_simple(State),
+ State#state.strategy =:= simple_one_for_one_terminate).
+
+behaviour_info(callbacks) ->
+ [{init,1}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% ---------------------------------------------------
+%%% This is a general process supervisor built upon gen_server.erl.
+%%% Servers/processes should/could also be built using gen_server.erl.
+%%% SupName = {local, atom()} | {global, atom()}.
+%%% ---------------------------------------------------
+start_link(Mod, Args) ->
+ gen_server:start_link(?MODULE, {self, Mod, Args}, []).
+
+start_link(SupName, Mod, Args) ->
+ gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
+
+%%% ---------------------------------------------------
+%%% Interface functions.
+%%% ---------------------------------------------------
+start_child(Supervisor, ChildSpec) ->
+ call(Supervisor, {start_child, ChildSpec}).
+
+restart_child(Supervisor, Name) ->
+ call(Supervisor, {restart_child, Name}).
+
+delete_child(Supervisor, Name) ->
+ call(Supervisor, {delete_child, Name}).
+
+%%-----------------------------------------------------------------
+%% Func: terminate_child/2
+%% Returns: ok | {error, Reason}
+%% Note that the child is *always* terminated in some
+%% way (maybe killed).
+%%-----------------------------------------------------------------
+terminate_child(Supervisor, Name) ->
+ call(Supervisor, {terminate_child, Name}).
+
+which_children(Supervisor) ->
+ call(Supervisor, which_children).
+
+call(Supervisor, Req) ->
+ gen_server:call(Supervisor, Req, infinity).
+
+check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
+ case check_startspec(ChildSpecs) of
+ {ok, _} -> ok;
+ Error -> {error, Error}
+ end;
+check_childspecs(X) -> {error, {badarg, X}}.
+
+%%% ---------------------------------------------------
+%%%
+%%% Initialize the supervisor.
+%%%
+%%% ---------------------------------------------------
+init({SupName, Mod, Args}) ->
+ process_flag(trap_exit, true),
+ case Mod:init(Args) of
+ {ok, {SupFlags, StartSpec}} ->
+ case init_state(SupName, SupFlags, Mod, Args) of
+ {ok, State} when ?is_simple(State) ->
+ init_dynamic(State, StartSpec);
+ {ok, State} ->
+ init_children(State, StartSpec);
+ Error ->
+ {stop, {supervisor_data, Error}}
+ end;
+ ignore ->
+ ignore;
+ Error ->
+ {stop, {bad_return, {Mod, init, Error}}}
+ end.
+
+init_children(State, StartSpec) ->
+ SupName = State#state.name,
+ case check_startspec(StartSpec) of
+ {ok, Children} ->
+ case start_children(Children, SupName) of
+ {ok, NChildren} ->
+ {ok, State#state{children = NChildren}};
+ {error, NChildren} ->
+ terminate_children(NChildren, SupName),
+ {stop, shutdown}
+ end;
+ Error ->
+ {stop, {start_spec, Error}}
+ end.
+
+init_dynamic(State, [StartSpec]) ->
+ case check_startspec([StartSpec]) of
+ {ok, Children} ->
+ {ok, State#state{children = Children}};
+ Error ->
+ {stop, {start_spec, Error}}
+ end;
+init_dynamic(_State, StartSpec) ->
+ {stop, {bad_start_spec, StartSpec}}.
+
+%%-----------------------------------------------------------------
+%% Func: start_children/2
+%% Args: Children = [#child] in start order
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Purpose: Start all children. The new list contains #child's
+%% with pids.
+%% Returns: {ok, NChildren} | {error, NChildren}
+%% NChildren = [#child] in termination order (reversed
+%% start order)
+%%-----------------------------------------------------------------
+start_children(Children, SupName) -> start_children(Children, [], SupName).
+
+start_children([Child|Chs], NChildren, SupName) ->
+ case do_start_child(SupName, Child) of
+ {ok, Pid} ->
+ start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
+ {ok, Pid, _Extra} ->
+ start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName);
+ {error, Reason} ->
+ report_error(start_error, Reason, Child, SupName),
+ {error, lists:reverse(Chs) ++ [Child | NChildren]}
+ end;
+start_children([], NChildren, _SupName) ->
+ {ok, NChildren}.
+
+do_start_child(SupName, Child) ->
+ #child{mfa = {M, F, A}} = Child,
+ case catch apply(M, F, A) of
+ {ok, Pid} when is_pid(Pid) ->
+ NChild = Child#child{pid = Pid},
+ report_progress(NChild, SupName),
+ {ok, Pid};
+ {ok, Pid, Extra} when is_pid(Pid) ->
+ NChild = Child#child{pid = Pid},
+ report_progress(NChild, SupName),
+ {ok, Pid, Extra};
+ ignore ->
+ {ok, undefined};
+ {error, What} -> {error, What};
+ What -> {error, What}
+ end.
+
+do_start_child_i(M, F, A) ->
+ case catch apply(M, F, A) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {ok, Pid, Extra} when is_pid(Pid) ->
+ {ok, Pid, Extra};
+ ignore ->
+ {ok, undefined};
+ {error, Error} ->
+ {error, Error};
+ What ->
+ {error, What}
+ end.
+
+
+%%% ---------------------------------------------------
+%%%
+%%% Callback functions.
+%%%
+%%% ---------------------------------------------------
+handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
+ #child{mfa = {M, F, A}} = hd(State#state.children),
+ Args = A ++ EArgs,
+ case do_start_child_i(M, F, Args) of
+ {ok, Pid} ->
+ NState = State#state{dynamics =
+ ?DICT:store(Pid, Args, State#state.dynamics)},
+ {reply, {ok, Pid}, NState};
+ {ok, Pid, Extra} ->
+ NState = State#state{dynamics =
+ ?DICT:store(Pid, Args, State#state.dynamics)},
+ {reply, {ok, Pid, Extra}, NState};
+ What ->
+ {reply, What, State}
+ end;
+
+%%% The requests terminate_child, delete_child and restart_child are
+%%% invalid for simple_one_for_one and simple_one_for_one_terminate
+%%% supervisors.
+handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
+ {reply, {error, State#state.strategy}, State};
+
+handle_call({start_child, ChildSpec}, _From, State) ->
+ case check_childspec(ChildSpec) of
+ {ok, Child} ->
+ {Resp, NState} = handle_start_child(Child, State),
+ {reply, Resp, NState};
+ What ->
+ {reply, {error, What}, State}
+ end;
+
+handle_call({restart_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} when Child#child.pid =:= undefined ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {reply, {ok, Pid}, NState};
+ {ok, Pid, Extra} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {reply, {ok, Pid, Extra}, NState};
+ Error ->
+ {reply, Error, State}
+ end;
+ {value, _} ->
+ {reply, {error, running}, State};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({delete_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} when Child#child.pid =:= undefined ->
+ NState = remove_child(Child, State),
+ {reply, ok, NState};
+ {value, _} ->
+ {reply, {error, running}, State};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call({terminate_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} ->
+ NChild = do_terminate(Child, State#state.name),
+ {reply, ok, replace_child(NChild, State)};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+handle_call(which_children, _From, State) when ?is_simple(State) ->
+ [#child{child_type = CT, modules = Mods}] = State#state.children,
+ Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end,
+ ?DICT:to_list(State#state.dynamics)),
+ {reply, Reply, State};
+
+handle_call(which_children, _From, State) ->
+ Resp =
+ lists:map(fun(#child{pid = Pid, name = Name,
+ child_type = ChildType, modules = Mods}) ->
+ {Name, Pid, ChildType, Mods}
+ end,
+ State#state.children),
+ {reply, Resp, State}.
+
+
+%%% Hopefully cause a function-clause as there is no API function
+%%% that utilizes cast.
+handle_cast(null, State) ->
+ error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
+ []),
+
+ {noreply, State}.
+
+%%
+%% Take care of terminated children.
+%%
+handle_info({'EXIT', Pid, Reason}, State) ->
+ case restart_child(Pid, Reason, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {shutdown, State1} ->
+ {stop, shutdown, State1}
+ end;
+
+handle_info(Msg, State) ->
+ error_logger:error_msg("Supervisor received unexpected message: ~p~n",
+ [Msg]),
+ {noreply, State}.
+%%
+%% Terminate this server.
+%%
+terminate(_Reason, State) when ?is_terminate_simple(State) ->
+ terminate_simple_children(
+ hd(State#state.children), State#state.dynamics, State#state.name),
+ ok;
+terminate(_Reason, State) ->
+ terminate_children(State#state.children, State#state.name),
+ ok.
+
+%%
+%% Change code for the supervisor.
+%% Call the new call-back module and fetch the new start specification.
+%% Combine the new spec. with the old. If the new start spec. is
+%% not valid the code change will not succeed.
+%% Use the old Args as argument to Module:init/1.
+%% NOTE: This requires that the init function of the call-back module
+%% does not have any side effects.
+%%
+code_change(_, State, _) ->
+ case (State#state.module):init(State#state.args) of
+ {ok, {SupFlags, StartSpec}} ->
+ case catch check_flags(SupFlags) of
+ ok ->
+ {Strategy, MaxIntensity, Period} = SupFlags,
+ update_childspec(State#state{strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period},
+ StartSpec);
+ Error ->
+ {error, Error}
+ end;
+ ignore ->
+ {ok, State};
+ Error ->
+ Error
+ end.
+
+check_flags({Strategy, MaxIntensity, Period}) ->
+ validStrategy(Strategy),
+ validIntensity(MaxIntensity),
+ validPeriod(Period),
+ ok;
+check_flags(What) ->
+ {bad_flags, What}.
+
+update_childspec(State, StartSpec) when ?is_simple(State) ->
+ case check_startspec(StartSpec) of
+ {ok, [Child]} ->
+ {ok, State#state{children = [Child]}};
+ Error ->
+ {error, Error}
+ end;
+
+update_childspec(State, StartSpec) ->
+ case check_startspec(StartSpec) of
+ {ok, Children} ->
+ OldC = State#state.children, % In reverse start order !
+ NewC = update_childspec1(OldC, Children, []),
+ {ok, State#state{children = NewC}};
+ Error ->
+ {error, Error}
+ end.
+
+update_childspec1([Child|OldC], Children, KeepOld) ->
+ case update_chsp(Child, Children) of
+ {ok,NewChildren} ->
+ update_childspec1(OldC, NewChildren, KeepOld);
+ false ->
+ update_childspec1(OldC, Children, [Child|KeepOld])
+ end;
+update_childspec1([], Children, KeepOld) ->
+ % Return them in (keeped) reverse start order.
+ lists:reverse(Children ++ KeepOld).
+
+update_chsp(OldCh, Children) ->
+ case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name ->
+ Ch#child{pid = OldCh#child.pid};
+ (Ch) ->
+ Ch
+ end,
+ Children) of
+ Children ->
+ false; % OldCh not found in new spec.
+ NewC ->
+ {ok, NewC}
+ end.
+
+%%% ---------------------------------------------------
+%%% Start a new child.
+%%% ---------------------------------------------------
+
+handle_start_child(Child, State) ->
+ case get_child(Child#child.name, State) of
+ false ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ Children = State#state.children,
+ {{ok, Pid},
+ State#state{children =
+ [Child#child{pid = Pid}|Children]}};
+ {ok, Pid, Extra} ->
+ Children = State#state.children,
+ {{ok, Pid, Extra},
+ State#state{children =
+ [Child#child{pid = Pid}|Children]}};
+ {error, What} ->
+ {{error, {What, Child}}, State}
+ end;
+ {value, OldChild} when OldChild#child.pid =/= undefined ->
+ {{error, {already_started, OldChild#child.pid}}, State};
+ {value, _OldChild} ->
+ {{error, already_present}, State}
+ end.
+
+%%% ---------------------------------------------------
+%%% Restart. A process has terminated.
+%%% Returns: {ok, #state} | {shutdown, #state}
+%%% ---------------------------------------------------
+
+restart_child(Pid, Reason, State) when ?is_simple(State) ->
+ case ?DICT:find(Pid, State#state.dynamics) of
+ {ok, Args} ->
+ [Child] = State#state.children,
+ RestartType = Child#child.restart_type,
+ {M, F, _} = Child#child.mfa,
+ NChild = Child#child{pid = Pid, mfa = {M, F, Args}},
+ do_restart(RestartType, Reason, NChild, State);
+ error ->
+ {ok, State}
+ end;
+restart_child(Pid, Reason, State) ->
+ Children = State#state.children,
+ case lists:keysearch(Pid, #child.pid, Children) of
+ {value, Child} ->
+ RestartType = Child#child.restart_type,
+ do_restart(RestartType, Reason, Child, State);
+ _ ->
+ {ok, State}
+ end.
+
+do_restart(permanent, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ restart(Child, State);
+do_restart(_, normal, Child, State) ->
+ NState = state_del_child(Child, State),
+ {ok, NState};
+do_restart(_, shutdown, Child, State) ->
+ NState = state_del_child(Child, State),
+ {ok, NState};
+do_restart(transient, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ restart(Child, State);
+do_restart(temporary, Reason, Child, State) ->
+ report_error(child_terminated, Reason, Child, State#state.name),
+ NState = state_del_child(Child, State),
+ {ok, NState}.
+
+restart(Child, State) ->
+ case add_restart(State) of
+ {ok, NState} ->
+ restart(NState#state.strategy, Child, NState);
+ {terminate, NState} ->
+ report_error(shutdown, reached_max_restart_intensity,
+ Child, State#state.name),
+ {shutdown, remove_child(Child, NState)}
+ end.
+
+restart(Strategy, Child, State)
+ when Strategy =:= simple_one_for_one orelse
+ Strategy =:= simple_one_for_one_terminate ->
+ #child{mfa = {M, F, A}} = Child,
+ Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
+ case do_start_child_i(M, F, A) of
+ {ok, Pid} ->
+ NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
+ {ok, NState};
+ {ok, Pid, _Extra} ->
+ NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
+ {ok, NState};
+ {error, Error} ->
+ report_error(start_error, Error, Child, State#state.name),
+ restart(Child, State)
+ end;
+restart(one_for_one, Child, State) ->
+ case do_start_child(State#state.name, Child) of
+ {ok, Pid} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {ok, NState};
+ {ok, Pid, _Extra} ->
+ NState = replace_child(Child#child{pid = Pid}, State),
+ {ok, NState};
+ {error, Reason} ->
+ report_error(start_error, Reason, Child, State#state.name),
+ restart(Child, State)
+ end;
+restart(rest_for_one, Child, State) ->
+ {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
+ ChAfter2 = terminate_children(ChAfter, State#state.name),
+ case start_children(ChAfter2, State#state.name) of
+ {ok, ChAfter3} ->
+ {ok, State#state{children = ChAfter3 ++ ChBefore}};
+ {error, ChAfter3} ->
+ restart(Child, State#state{children = ChAfter3 ++ ChBefore})
+ end;
+restart(one_for_all, Child, State) ->
+ Children1 = del_child(Child#child.pid, State#state.children),
+ Children2 = terminate_children(Children1, State#state.name),
+ case start_children(Children2, State#state.name) of
+ {ok, NChs} ->
+ {ok, State#state{children = NChs}};
+ {error, NChs} ->
+ restart(Child, State#state{children = NChs})
+ end.
+
+%%-----------------------------------------------------------------
+%% Func: terminate_children/2
+%% Args: Children = [#child] in termination order
+%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
+%% Returns: NChildren = [#child] in
+%% startup order (reversed termination order)
+%%-----------------------------------------------------------------
+terminate_children(Children, SupName) ->
+ terminate_children(Children, SupName, []).
+
+terminate_children([Child | Children], SupName, Res) ->
+ NChild = do_terminate(Child, SupName),
+ terminate_children(Children, SupName, [NChild | Res]);
+terminate_children([], _SupName, Res) ->
+ Res.
+
+terminate_simple_children(Child, Dynamics, SupName) ->
+ dict:fold(fun (Pid, _Args, _Any) ->
+ do_terminate(Child#child{pid = Pid}, SupName)
+ end, ok, Dynamics),
+ ok.
+
+do_terminate(Child, SupName) when Child#child.pid =/= undefined ->
+ case shutdown(Child#child.pid,
+ Child#child.shutdown) of
+ ok ->
+ Child#child{pid = undefined};
+ {error, OtherReason} ->
+ report_error(shutdown_error, OtherReason, Child, SupName),
+ Child#child{pid = undefined}
+ end;
+do_terminate(Child, _SupName) ->
+ Child.
+
+%%-----------------------------------------------------------------
+%% Shutdowns a child. We must check the EXIT value
+%% of the child, because it might have died with another reason than
+%% the wanted. In that case we want to report the error. We put a
+%% monitor on the child an check for the 'DOWN' message instead of
+%% checking for the 'EXIT' message, because if we check the 'EXIT'
+%% message a "naughty" child, who does unlink(Sup), could hang the
+%% supervisor.
+%% Returns: ok | {error, OtherReason} (this should be reported)
+%%-----------------------------------------------------------------
+shutdown(Pid, brutal_kill) ->
+
+ case monitor_child(Pid) of
+ ok ->
+ exit(Pid, kill),
+ receive
+ {'DOWN', _MRef, process, Pid, killed} ->
+ ok;
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end;
+
+shutdown(Pid, Time) ->
+
+ case monitor_child(Pid) of
+ ok ->
+ exit(Pid, shutdown), %% Try to shutdown gracefully
+ receive
+ {'DOWN', _MRef, process, Pid, shutdown} ->
+ ok;
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ after Time ->
+ exit(Pid, kill), %% Force termination.
+ receive
+ {'DOWN', _MRef, process, Pid, OtherReason} ->
+ {error, OtherReason}
+ end
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+%% Help function to shutdown/2 switches from link to monitor approach
+monitor_child(Pid) ->
+
+ %% Do the monitor operation first so that if the child dies
+ %% before the monitoring is done causing a 'DOWN'-message with
+ %% reason noproc, we will get the real reason in the 'EXIT'-message
+ %% unless a naughty child has already done unlink...
+ erlang:monitor(process, Pid),
+ unlink(Pid),
+
+ receive
+ %% If the child dies before the unlik we must empty
+ %% the mail-box of the 'EXIT'-message and the 'DOWN'-message.
+ {'EXIT', Pid, Reason} ->
+ receive
+ {'DOWN', _, process, Pid, _} ->
+ {error, Reason}
+ end
+ after 0 ->
+ %% If a naughty child did unlink and the child dies before
+ %% monitor the result will be that shutdown/2 receives a
+ %% 'DOWN'-message with reason noproc.
+ %% If the child should die after the unlink there
+ %% will be a 'DOWN'-message with a correct reason
+ %% that will be handled in shutdown/2.
+ ok
+ end.
+
+
+%%-----------------------------------------------------------------
+%% Child/State manipulating functions.
+%%-----------------------------------------------------------------
+state_del_child(#child{pid = Pid}, State) when ?is_simple(State) ->
+ NDynamics = ?DICT:erase(Pid, State#state.dynamics),
+ State#state{dynamics = NDynamics};
+state_del_child(Child, State) ->
+ NChildren = del_child(Child#child.name, State#state.children),
+ State#state{children = NChildren}.
+
+del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name ->
+ [Ch#child{pid = undefined} | Chs];
+del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid ->
+ [Ch#child{pid = undefined} | Chs];
+del_child(Name, [Ch|Chs]) ->
+ [Ch|del_child(Name, Chs)];
+del_child(_, []) ->
+ [].
+
+%% Chs = [S4, S3, Ch, S1, S0]
+%% Ret: {[S4, S3, Ch], [S1, S0]}
+split_child(Name, Chs) ->
+ split_child(Name, Chs, []).
+
+split_child(Name, [Ch|Chs], After) when Ch#child.name =:= Name ->
+ {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
+split_child(Pid, [Ch|Chs], After) when Ch#child.pid =:= Pid ->
+ {lists:reverse([Ch#child{pid = undefined} | After]), Chs};
+split_child(Name, [Ch|Chs], After) ->
+ split_child(Name, Chs, [Ch | After]);
+split_child(_, [], After) ->
+ {lists:reverse(After), []}.
+
+get_child(Name, State) ->
+ lists:keysearch(Name, #child.name, State#state.children).
+replace_child(Child, State) ->
+ Chs = do_replace_child(Child, State#state.children),
+ State#state{children = Chs}.
+
+do_replace_child(Child, [Ch|Chs]) when Ch#child.name =:= Child#child.name ->
+ [Child | Chs];
+do_replace_child(Child, [Ch|Chs]) ->
+ [Ch|do_replace_child(Child, Chs)].
+
+remove_child(Child, State) ->
+ Chs = lists:keydelete(Child#child.name, #child.name, State#state.children),
+ State#state{children = Chs}.
+
+%%-----------------------------------------------------------------
+%% Func: init_state/4
+%% Args: SupName = {local, atom()} | {global, atom()} | self
+%% Type = {Strategy, MaxIntensity, Period}
+%% Strategy = one_for_one | one_for_all | simple_one_for_one |
+%% rest_for_one
+%% MaxIntensity = integer()
+%% Period = integer()
+%% Mod :== atom()
+%% Arsg :== term()
+%% Purpose: Check that Type is of correct type (!)
+%% Returns: {ok, #state} | Error
+%%-----------------------------------------------------------------
+init_state(SupName, Type, Mod, Args) ->
+ case catch init_state1(SupName, Type, Mod, Args) of
+ {ok, State} ->
+ {ok, State};
+ Error ->
+ Error
+ end.
+
+init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) ->
+ validStrategy(Strategy),
+ validIntensity(MaxIntensity),
+ validPeriod(Period),
+ {ok, #state{name = supname(SupName,Mod),
+ strategy = Strategy,
+ intensity = MaxIntensity,
+ period = Period,
+ module = Mod,
+ args = Args}};
+init_state1(_SupName, Type, _, _) ->
+ {invalid_type, Type}.
+
+validStrategy(simple_one_for_one_terminate) -> true;
+validStrategy(simple_one_for_one) -> true;
+validStrategy(one_for_one) -> true;
+validStrategy(one_for_all) -> true;
+validStrategy(rest_for_one) -> true;
+validStrategy(What) -> throw({invalid_strategy, What}).
+
+validIntensity(Max) when is_integer(Max),
+ Max >= 0 -> true;
+validIntensity(What) -> throw({invalid_intensity, What}).
+
+validPeriod(Period) when is_integer(Period),
+ Period > 0 -> true;
+validPeriod(What) -> throw({invalid_period, What}).
+
+supname(self,Mod) -> {self(),Mod};
+supname(N,_) -> N.
+
+%%% ------------------------------------------------------
+%%% Check that the children start specification is valid.
+%%% Shall be a six (6) tuple
+%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
+%%% where Name is an atom
+%%% Func is {Mod, Fun, Args} == {atom, atom, list}
+%%% RestartType is permanent | temporary | transient
+%%% Shutdown = integer() | infinity | brutal_kill
+%%% ChildType = supervisor | worker
+%%% Modules = [atom()] | dynamic
+%%% Returns: {ok, [#child]} | Error
+%%% ------------------------------------------------------
+
+check_startspec(Children) -> check_startspec(Children, []).
+
+check_startspec([ChildSpec|T], Res) ->
+ case check_childspec(ChildSpec) of
+ {ok, Child} ->
+ case lists:keymember(Child#child.name, #child.name, Res) of
+ true -> {duplicate_child_name, Child#child.name};
+ false -> check_startspec(T, [Child | Res])
+ end;
+ Error -> Error
+ end;
+check_startspec([], Res) ->
+ {ok, lists:reverse(Res)}.
+
+check_childspec({Name, Func, RestartType, Shutdown, ChildType, Mods}) ->
+ catch check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods);
+check_childspec(X) -> {invalid_child_spec, X}.
+
+check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) ->
+ validName(Name),
+ validFunc(Func),
+ validRestartType(RestartType),
+ validChildType(ChildType),
+ validShutdown(Shutdown, ChildType),
+ validMods(Mods),
+ {ok, #child{name = Name, mfa = Func, restart_type = RestartType,
+ shutdown = Shutdown, child_type = ChildType, modules = Mods}}.
+
+validChildType(supervisor) -> true;
+validChildType(worker) -> true;
+validChildType(What) -> throw({invalid_child_type, What}).
+
+validName(_Name) -> true.
+
+validFunc({M, F, A}) when is_atom(M),
+ is_atom(F),
+ is_list(A) -> true;
+validFunc(Func) -> throw({invalid_mfa, Func}).
+
+validRestartType(permanent) -> true;
+validRestartType(temporary) -> true;
+validRestartType(transient) -> true;
+validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}).
+
+validShutdown(Shutdown, _)
+ when is_integer(Shutdown), Shutdown > 0 -> true;
+validShutdown(infinity, supervisor) -> true;
+validShutdown(brutal_kill, _) -> true;
+validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}).
+
+validMods(dynamic) -> true;
+validMods(Mods) when is_list(Mods) ->
+ lists:foreach(fun(Mod) ->
+ if
+ is_atom(Mod) -> ok;
+ true -> throw({invalid_module, Mod})
+ end
+ end,
+ Mods);
+validMods(Mods) -> throw({invalid_modules, Mods}).
+
+%%% ------------------------------------------------------
+%%% Add a new restart and calculate if the max restart
+%%% intensity has been reached (in that case the supervisor
+%%% shall terminate).
+%%% All restarts accured inside the period amount of seconds
+%%% are kept in the #state.restarts list.
+%%% Returns: {ok, State'} | {terminate, State'}
+%%% ------------------------------------------------------
+
+add_restart(State) ->
+ I = State#state.intensity,
+ P = State#state.period,
+ R = State#state.restarts,
+ Now = erlang:now(),
+ R1 = add_restart([Now|R], Now, P),
+ State1 = State#state{restarts = R1},
+ case length(R1) of
+ CurI when CurI =< I ->
+ {ok, State1};
+ _ ->
+ {terminate, State1}
+ end.
+
+add_restart([R|Restarts], Now, Period) ->
+ case inPeriod(R, Now, Period) of
+ true ->
+ [R|add_restart(Restarts, Now, Period)];
+ _ ->
+ []
+ end;
+add_restart([], _, _) ->
+ [].
+
+inPeriod(Time, Now, Period) ->
+ case difference(Time, Now) of
+ T when T > Period ->
+ false;
+ _ ->
+ true
+ end.
+
+%%
+%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored)
+%% Calculate the time elapsed in seconds between two timestamps.
+%% If MegaSecs is equal just subtract Secs.
+%% Else calculate the Mega difference and add the Secs difference,
+%% note that Secs difference can be negative, e.g.
+%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs.
+%%
+difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM ->
+ ((CurM - TimeM) * 1000000) + (CurS - TimeS);
+difference({_, TimeS, _}, {_, CurS, _}) ->
+ CurS - TimeS.
+
+%%% ------------------------------------------------------
+%%% Error and progress reporting.
+%%% ------------------------------------------------------
+
+report_error(Error, Reason, Child, SupName) ->
+ ErrorMsg = [{supervisor, SupName},
+ {errorContext, Error},
+ {reason, Reason},
+ {offender, extract_child(Child)}],
+ error_logger:error_report(supervisor_report, ErrorMsg).
+
+
+extract_child(Child) ->
+ [{pid, Child#child.pid},
+ {name, Child#child.name},
+ {mfa, Child#child.mfa},
+ {restart_type, Child#child.restart_type},
+ {shutdown, Child#child.shutdown},
+ {child_type, Child#child.child_type}].
+
+report_progress(Child, SupName) ->
+ Progress = [{supervisor, SupName},
+ {started, extract_child(Child)}],
+ error_logger:info_report(progress, Progress).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 68efc27f..cc4982c9 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -75,8 +75,15 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% In the event that somebody floods us with connections we can spew
+ %% the above message at error_logger faster than it can keep up.
+ %% So error_logger's mailbox grows unbounded until we eat all the
+ %% memory available and crash. So here's a meaningless synchronous call
+ %% to the underlying gen_event mechanism - when it returns the mailbox
+ %% is drained.
+ gen_event:which_handlers(error_logger),
%% handle
- apply(M, F, A ++ [Sock])
+ file_handle_cache:release_on_death(apply(M, F, A ++ [Sock]))
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
@@ -104,6 +111,7 @@ code_change(_OldVsn, State, _Extra) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
accept(State = #state{sock=LSock}) ->
+ ok = file_handle_cache:obtain(),
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 1ee958af..97e07545 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -40,12 +40,10 @@
%%
%% 1. Allow priorities (basically, change the pending queue to a
%% priority_queue).
-%%
-%% 2. Allow the submission to the pool_worker to be async.
-behaviour(gen_server2).
--export([start_link/0, submit/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -56,6 +54,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/1 ::
+ (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
-endif.
@@ -80,6 +80,9 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
+submit_async(Fun) ->
+ gen_server2:cast(?SERVER, {run_async, Fun}).
+
idle(WId) ->
gen_server2:cast(?SERVER, {idle, WId}).
@@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail,
pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
- {noreply, State #state { pending = queue:in(From, Pending) },
+ {noreply,
+ State #state { pending = queue:in({next_free, From}, Pending) },
hibernate};
{{value, WId}, Avail1} ->
{reply, get_worker_pid(WId), State #state { available = Avail1 },
@@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, From}, Pending1} ->
+ {{value, {next_free, From}}, Pending1} ->
gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply,
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ State #state { pending = queue:in({run_async, Fun}, Pending)};
+ {{value, WId}, Avail1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ State #state { available = Avail1 }
+ end, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 3bfcc2d9..57901fd5 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -33,7 +33,9 @@
-behaviour(gen_server2).
--export([start_link/1, submit/2, run/1]).
+-export([start_link/1, submit/2, submit_async/2, run/1]).
+
+-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,6 +46,11 @@
-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/2 ::
+ (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A;
+ ({atom(), atom(), [any()]}) -> any()).
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.
@@ -60,7 +67,22 @@ start_link(WId) ->
submit(Pid, Fun) ->
gen_server2:call(Pid, {submit, Fun}, infinity).
+submit_async(Pid, Fun) ->
+ gen_server2:cast(Pid, {submit_async, Fun}).
+
+set_maximum_since_use(Pid, Age) ->
+ gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+
+run({M, F, A}) ->
+ apply(M, F, A);
+run(Fun) ->
+ Fun().
+
+%%----------------------------------------------------------------------------
+
init([WId]) ->
+ ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
+ [self()]),
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
{ok, WId, hibernate,
@@ -74,6 +96,15 @@ handle_call({submit, Fun}, From, WId) ->
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
+handle_cast({submit_async, Fun}, WId) ->
+ run(Fun),
+ ok = worker_pool:idle(WId),
+ {noreply, WId, hibernate};
+
+handle_cast({set_maximum_since_use, Age}, WId) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ {noreply, WId, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -85,10 +116,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-run({M, F, A}) ->
- apply(M, F, A);
-run(Fun) ->
- Fun().