diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-05-27 16:40:15 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-05-27 16:40:15 +0100 |
commit | 6258717370e33bfa3d31a0c420d3d32d435bd7cc (patch) | |
tree | 398026c8669db20e273b63e82b7592e9120118da | |
parent | 51fc0609e180ee83ec18956e3bcdc3f33f873e0d (diff) | |
parent | 615eb18470ed8b69fb6f94c8331c0e09559d263d (diff) | |
download | rabbitmq-server-6258717370e33bfa3d31a0c420d3d32d435bd7cc.tar.gz |
Merge bug22596 into default
55 files changed, 4654 insertions, 1211 deletions
@@ -56,7 +56,7 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json +AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -81,11 +81,11 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ +$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ @@ -164,7 +164,11 @@ stop-node: COVER_DIR=. start-cover: all - echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) + echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL) + echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) + +start-secondary-cover: all + echo "rabbit_misc:start_cover([\"hare\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index 496fcc1c..d9686ada 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -55,7 +55,7 @@ indented) <term> <cmdsynopsis> - <command>list_connections</command> + <command>list_connections</command> <arg choice="opt"> <replaceable>connectioninfoitem</replaceable> ... @@ -66,7 +66,7 @@ indented) However, while DocBook renders this sensibly for HTML, for some reason it doen't show anything inside <cmdsynopsis> at all for man pages. I think what we're doing is semantically correct so this is a bug in DocBook. The following - rules essentially do what DocBook does when <cmdsynopsis> is not inside a + rules essentially do what DocBook does when <cmdsynopsis> is not inside a <term>. --> diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index a35b8699..f2117e26 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -26,8 +26,8 @@ <xsl:choose> <xsl:when test="document($original)/refentry/refmeta/manvolnum"> <p> - This is the manual page for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. </p> <p> <a href="manpages.html">See a list of all manual pages</a>. @@ -35,13 +35,13 @@ </xsl:when> <xsl:otherwise> <p> - This is the documentation for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. </p> </xsl:otherwise> </xsl:choose> <p> - For more general documentation, please see the + For more general documentation, please see the <a href="admin-guide.html">administrator's guide</a>. </p> diff --git a/docs/rabbitmq-activate-plugins.1.xml b/docs/rabbitmq-activate-plugins.1.xml index ef81c201..5f831634 100644 --- a/docs/rabbitmq-activate-plugins.1.xml +++ b/docs/rabbitmq-activate-plugins.1.xml @@ -24,7 +24,7 @@ <command>rabbitmq-activate-plugins</command> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-deactivate-plugins.1.xml b/docs/rabbitmq-deactivate-plugins.1.xml index eacd014b..bbf1207e 100644 --- a/docs/rabbitmq-deactivate-plugins.1.xml +++ b/docs/rabbitmq-deactivate-plugins.1.xml @@ -24,7 +24,7 @@ <command>rabbitmq-deactivate-plugins</command> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml index b3862fdf..6586890a 100644 --- a/docs/rabbitmq-multi.1.xml +++ b/docs/rabbitmq-multi.1.xml @@ -26,7 +26,7 @@ <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index 25c2aefb..921da4f1 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -25,7 +25,7 @@ <arg choice="opt">-detached</arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> @@ -72,7 +72,7 @@ be placed in this directory. <para> Defaults to rabbit. This can be useful if you want to run more than one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per -erlang-node-and-machine combination. See the +erlang-node-and-machine combination. See the <ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single machine guide</ulink> for details. </para> @@ -93,7 +93,7 @@ one network interface. <term>RABBITMQ_NODE_PORT</term> <listitem> <para> -Defaults to 5672. +Defaults to 5672. </para> </listitem> </varlistentry> diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index d59ed638..2b416e3e 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -24,7 +24,7 @@ <arg choice="opt">command</arg> </cmdsynopsis> </refsynopsisdiv> - + <refsect1> <title>Description</title> <para> @@ -34,14 +34,14 @@ scalable implementation of an AMQP broker. </para> <para> Running <command>rabbitmq-service</command> allows the RabbitMQ broker to be run as a -service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker -service can be started and stopped using the Windows® services -applet. +service on NT/2000/2003/XP/Vista® environments. The RabbitMQ broker +service can be started and stopped using the Windows® services +applet. </para> <para> -By default the service will run in the authentication context of the +By default the service will run in the authentication context of the local system account. It is therefore necessary to synchronise Erlang -cookies between the local system account (typically +cookies between the local system account (typically <filename>C:\WINDOWS\.erlang.cookie</filename> and the account that will be used to run <command>rabbitmqctl</command>. </para> @@ -87,7 +87,7 @@ deleted as a consequence and <command>rabbitmq-server</command> will remain oper <listitem> <para> Start the service. The service must have been correctly installed -beforehand. +beforehand. </para> </listitem> </varlistentry> @@ -96,7 +96,7 @@ beforehand. <term>stop</term> <listitem> <para> -Stop the service. The service must be running for this command to +Stop the service. The service must be running for this command to have any effect. </para> </listitem> @@ -154,7 +154,7 @@ This is the location of log and database directories. <para> Defaults to rabbit. This can be useful if you want to run more than one node per machine - <envar>RABBITMQ_NODENAME</envar> should be unique per -erlang-node-and-machine combination. See the +erlang-node-and-machine combination. See the <ulink url="http://www.rabbitmq.com/clustering.html#single-machine">clustering on a single machine guide</ulink> for details. </para> @@ -175,7 +175,7 @@ one network interface. <term>RABBITMQ_NODE_PORT</term> <listitem> <para> -Defaults to 5672. +Defaults to 5672. </para> </listitem> </varlistentry> @@ -208,11 +208,11 @@ for details. <term>RABBITMQ_CONSOLE_LOG</term> <listitem> <para> -Set this varable to <code>new</code> or <code>reuse</code> to have the console +Set this varable to <code>new</code> or <code>reuse</code> to have the console output from the server redirected to a file named <code>SERVICENAME</code>.debug in the application data directory of the user that installed the service. Under Vista this will be <filename>C:\Users\AppData\username\SERVICENAME</filename>. -Under previous versions of Windows this will be +Under previous versions of Windows this will be <filename>C:\Documents and Settings\username\Application Data\SERVICENAME</filename>. If <code>RABBITMQ_CONSOLE_LOG</code> is set to <code>new</code> then a new file will be created each time the service starts. If <code>RABBITMQ_CONSOLE_LOG</code> is diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml index 34f20f92..31de7164 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq.conf.5.xml @@ -18,7 +18,7 @@ <refname>rabbitmq.conf</refname> <refpurpose>default settings for RabbitMQ AMQP server</refpurpose> </refnamediv> - + <refsect1> <title>Description</title> <para> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 8043dfc7..a2038cf0 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1,19 +1,19 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> -<!-- +<!-- There is some extra magic in this document besides the usual DocBook semantics - to allow us to derive manpages, HTML and usage messages from the same source + to allow us to derive manpages, HTML and usage messages from the same source document. Examples need to be moved to the end for man pages. To this end, <para>s and - <screen>s with role="example" will be moved, and with role="example-prefix" + <screen>s with role="example" will be moved, and with role="example-prefix" will be removed. The usage messages are more involved. We have some magic in usage.xsl to pull out the command synopsis, global option and subcommand synopses. We also pull out <para>s with role="usage". - Finally we construct lists of possible values for subcommand options, if the + Finally we construct lists of possible values for subcommand options, if the subcommand's <varlistentry> has role="usage-has-option-list". The option which takes the values should be marked with role="usage-option-list". --> @@ -664,7 +664,7 @@ <para> The <command>queueinfoitem</command> parameter is used to indicate which queue information items to include in the results. The column order in the - results will match the order of the parameters. + results will match the order of the parameters. <command>queueinfoitem</command> can take any value from the list that follows: </para> @@ -715,28 +715,15 @@ <listitem><para>Number of messages delivered to clients but not yet acknowledged.</para></listitem> </varlistentry> <varlistentry> - <term>messages_uncommitted</term> - <listitem><para>Number of messages published in as yet uncommitted transactions</para></listitem> - </varlistentry> - <varlistentry> <term>messages</term> - <listitem><para>Sum of ready, unacknowledged and uncommitted messages + <listitem><para>Sum of ready and unacknowledged messages (queue depth).</para></listitem> </varlistentry> <varlistentry> - <term>acks_uncommitted</term> - <listitem><para>Number of acknowledgements received in as yet uncommitted - transactions.</para></listitem> - </varlistentry> - <varlistentry> <term>consumers</term> <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> - <term>transactions</term> - <listitem><para>Number of transactions.</para></listitem> - </varlistentry> - <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> @@ -768,7 +755,7 @@ <para> The <command>exchangeinfoitem</command> parameter is used to indicate which exchange information items to include in the results. The column order in the - results will match the order of the parameters. + results will match the order of the parameters. <command>exchangeinfoitem</command> can take any value from the list that follows: </para> @@ -797,7 +784,7 @@ </varlistentry> </variablelist> <para> - If no <command>exchangeinfoitem</command>s are specified then + If no <command>exchangeinfoitem</command>s are specified then exchange name and type are displayed. </para> <para role="example-prefix"> @@ -839,7 +826,7 @@ <para> The <command>connectioninfoitem</command> parameter is used to indicate which connection information items to include in the results. The - column order in the results will match the order of the parameters. + column order in the results will match the order of the parameters. <command>connectioninfoitem</command> can take any value from the list that follows: </para> @@ -945,7 +932,7 @@ The <command>channelinfoitem</command> parameter is used to indicate which channel information items to include in the results. The column order in the results will match the - order of the parameters. + order of the parameters. <command>channelinfoitem</command> can take any value from the list that follows: </para> diff --git a/docs/usage.xsl b/docs/usage.xsl index 72f8880a..a6cebd93 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -11,7 +11,7 @@ <xsl:output method="text" encoding="UTF-8" indent="no"/> -<xsl:strip-space elements="*"/> +<xsl:strip-space elements="*"/> <xsl:preserve-space elements="cmdsynopsis arg" /> <xsl:template match="/"> @@ -19,7 +19,7 @@ -module(<xsl:value-of select="$modulename" />). -export([usage/0]). usage() -> %QUOTE%Usage: -<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> +<xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> <xsl:text> </xsl:text> <xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg"> <xsl:apply-templates select="." /> @@ -28,7 +28,7 @@ usage() -> %QUOTE%Usage: <xsl:text> </xsl:text> -<!-- List options (any variable list in a section called "Options"). --> +<!-- List options (any variable list in a section called "Options"). --> <xsl:for-each select=".//*[title='Options']/variablelist"> <xsl:if test="position() = 1"> Options: </xsl:if> <xsl:for-each select="varlistentry"> @@ -40,13 +40,13 @@ usage() -> %QUOTE%Usage: </xsl:for-each> </xsl:for-each> -<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). --> +<!-- Any paragraphs which have been marked as role="usage" (principally for global flags). --> <xsl:text> </xsl:text> <xsl:for-each select=".//*[title='Options']//para[@role='usage']"> <xsl:value-of select="normalize-space(.)"/><xsl:text> </xsl:text> </xsl:for-each> -<!-- List commands (any first-level variable list in a section called "Commands"). --> +<!-- List commands (any first-level variable list in a section called "Commands"). --> <xsl:for-each select=".//*[title='Commands']/variablelist | .//*[title='Commands']/refsect2/variablelist"> <xsl:if test="position() = 1">Commands: </xsl:if> <xsl:for-each select="varlistentry"> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 3616fcbf..bdf407eb 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -18,6 +18,9 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, + {backing_queue_module, rabbit_invariable_queue}, + {persister_max_wrap_entries, 500}, + {persister_hibernate_after, 10000}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 38142491..0d75310b 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,8 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, pid}). +-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, + arguments, pid}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -62,7 +63,8 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -83,9 +85,10 @@ -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). -type(regexp() :: binary()). +-type(file_path() :: string()). %% this is really an abstract type, but dialyzer does not support them --type(guid() :: any()). +-type(guid() :: binary()). -type(txn() :: guid()). -type(pkey() :: guid()). -type(r(Kind) :: @@ -102,11 +105,12 @@ write :: regexp(), read :: regexp()}). -type(amqqueue() :: - #amqqueue{name :: queue_name(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: amqp_table(), - pid :: maybe(pid())}). + #amqqueue{name :: queue_name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: maybe(pid()), + arguments :: amqp_table(), + pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), type :: exchange_type(), @@ -144,7 +148,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), @@ -154,7 +159,7 @@ message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -166,15 +171,20 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). + -endif. %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(ERTS_MINIMUM, "5.6.3"). -define(MAX_WAIT, 16#ffffffff). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl new file mode 100644 index 00000000..1b536dfa --- /dev/null +++ b/include/rabbit_backing_queue_spec.hrl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-type(fetch_result() :: + %% Message, IsDelivered, AckTag, Remaining_Len + ('empty'|{basic_message(), boolean(), ack(), non_neg_integer()})). +-type(is_durable() :: boolean()). +-type(attempt_recovery() :: boolean()). +-type(purged_msg_count() :: non_neg_integer()). +-type(ack_required() :: boolean()). + +-spec(start/1 :: ([queue_name()]) -> 'ok'). +-spec(init/3 :: (queue_name(), is_durable(), attempt_recovery()) -> state()). +-spec(terminate/1 :: (state()) -> state()). +-spec(delete_and_terminate/1 :: (state()) -> state()). +-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). +-spec(publish/2 :: (basic_message(), state()) -> state()). +-spec(publish_delivered/3 :: + (ack_required(), basic_message(), state()) -> {ack(), state()}). +-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). +-spec(tx_publish/3 :: (txn(), basic_message(), state()) -> state()). +-spec(tx_ack/3 :: (txn(), [ack()], state()) -> state()). +-spec(tx_rollback/2 :: (txn(), state()) -> {[ack()], state()}). +-spec(tx_commit/3 :: (txn(), fun (() -> any()), state()) -> {[ack()], state()}). +-spec(requeue/2 :: ([ack()], state()) -> state()). +-spec(len/1 :: (state()) -> non_neg_integer()). +-spec(is_empty/1 :: (state()) -> boolean()). +-spec(set_ram_duration_target/2 :: + (('undefined' | 'infinity' | number()), state()) -> state()). +-spec(ram_duration/1 :: (state()) -> {number(), state()}). +-spec(needs_sync/1 :: (state()) -> boolean()). +-spec(sync/1 :: (state()) -> state()). +-spec(handle_pre_hibernate/1 :: (state()) -> state()). +-spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c318a96c..00066a15 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -10,10 +10,11 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-asroot-script-wrapper +Source5: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang, python-simplejson -Requires: erlang, logrotate +BuildRequires: erlang >= R12B-3, python-simplejson, xmlto, libxslt +Requires: erlang >= R12B-3, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server Requires(post): %%REQUIRES%% @@ -29,6 +30,7 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` +%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -38,6 +40,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} cp %{S:4} %{_rabbit_asroot_wrapper} +cp %{S:5} %{_rabbit_server_ocf} make %{?_smp_mflags} %install @@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins +install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -103,6 +107,12 @@ if [ $1 = 0 ]; then # Leave rabbitmq user and group fi +# Clean out plugin activation state, both on uninstall and upgrade +rm -rf %{_rabbit_erllibdir}/priv +for ext in rel script boot ; do + rm -f %{_rabbit_erllibdir}/ebin/rabbit.$ext +done + %files -f ../%{name}.files %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf new file mode 100755 index 00000000..97c58ea2 --- /dev/null +++ b/packaging/common/rabbitmq-server.ocf @@ -0,0 +1,362 @@ +#!/bin/sh +## +## OCF Resource Agent compliant rabbitmq-server resource script. +## + +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2010 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2010 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +## OCF instance parameters +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args + +####################################################################### +# Initialization: + +. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs + +####################################################################### + +OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi" +OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" +OCF_RESKEY_nodename_default="rabbit@localhost" +OCF_RESKEY_log_base_default="/var/log/rabbitmq" +: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}} +: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} +: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} +: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} + +meta_data() { + cat <<END +<?xml version="1.0"?> +<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> +<resource-agent name="rabbitmq-server"> +<version>1.0</version> + +<longdesc lang="en"> +Resource agent for RabbitMQ-server +</longdesc> + +<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc> + +<parameters> +<parameter name="multi" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmq-multi script +</longdesc> +<shortdesc lang="en">Path to rabbitmq-multi</shortdesc> +<content type="string" default="${OCF_RESKEY_multi_default}" /> +</parameter> + +<parameter name="ctl" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmqctl script +</longdesc> +<shortdesc lang="en">Path to rabbitmqctl</shortdesc> +<content type="string" default="${OCF_RESKEY_ctl_default}" /> +</parameter> + +<parameter name="nodename" unique="0" required="0"> +<longdesc lang="en"> +The node name for rabbitmq-server +</longdesc> +<shortdesc lang="en">Node name</shortdesc> +<content type="string" default="${OCF_RESKEY_nodename_default}" /> +</parameter> + +<parameter name="ip" unique="0" required="0"> +<longdesc lang="en"> +The IP address for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Address</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="port" unique="0" required="0"> +<longdesc lang="en"> +The IP Port for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Port</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="cluster_config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the cluster config file +</longdesc> +<shortdesc lang="en">Cluster config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the config file +</longdesc> +<shortdesc lang="en">Config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="log_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which logs will be created +</longdesc> +<shortdesc lang="en">Log base path</shortdesc> +<content type="string" default="${OCF_RESKEY_log_base_default}" /> +</parameter> + +<parameter name="mnesia_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which mnesia will store data +</longdesc> +<shortdesc lang="en">Mnesia base path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="server_start_args" unique="0" required="0"> +<longdesc lang="en"> +Additional arguments provided to the server on startup +</longdesc> +<shortdesc lang="en">Server start arguments</shortdesc> +<content type="string" default="" /> +</parameter> + +</parameters> + +<actions> +<action name="start" timeout="600" /> +<action name="stop" timeout="120" /> +<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="validate-all" timeout="30" /> +<action name="meta-data" timeout="5" /> +</actions> +</resource-agent> +END +} + +rabbit_usage() { + cat <<END +usage: $0 {start|stop|monitor|validate-all|meta-data} + +Expects to have a fully populated OCF RA-compliant environment set. +END +} + +RABBITMQ_MULTI=$OCF_RESKEY_multi +RABBITMQ_CTL=$OCF_RESKEY_ctl +RABBITMQ_NODENAME=$OCF_RESKEY_nodename +RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip +RABBITMQ_NODE_PORT=$OCF_RESKEY_port +RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file +RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file +RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base +RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base +RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" +[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME + +export_vars() { + [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS + [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT + [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE + [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE + [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE + [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE + [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS +} + +rabbit_validate_partial() { + if [ ! -x $RABBITMQ_MULTI ]; then + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi + + if [ ! -x $RABBITMQ_CTL ]; then + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi +} + +rabbit_validate_full() { + if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + rabbit_validate_partial + + return $OCF_SUCCESS +} + +rabbit_status() { + local rc + $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null + rc=$? + case "$rc" in + 0) + return $OCF_SUCCESS + ;; + 2) + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + return $OCF_ERR_GENERIC + esac +} + +rabbit_start() { + local rc + + rabbit_validate_full + rc=$? + if [ "$rc" != $OCF_SUCCESS ]; then + return $rc + fi + + export_vars + + $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc + fi + + # Spin waiting for the server to come up. + # Let the CRM/LRM time us out if required + start_wait=1 + while [ $start_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_stop() { + local rc + $RABBITMQ_MULTI stop_all & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc + fi + + # Spin waiting for the server to shut down. + # Let the CRM/LRM time us out if required + stop_wait=1 + while [ $stop_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 + break + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_monitor() { + rabbit_status + return $? +} + +case $__OCF_ACTION in + meta-data) + meta_data + exit $OCF_SUCCESS + ;; + usage|help) + rabbit_usage + exit $OCF_SUCCESS + ;; +esac + +rabbit_validate_partial || exit + +case $__OCF_ACTION in + start) + rabbit_start + ;; + stop) + rabbit_stop + ;; + monitor) + rabbit_monitor + ;; + validate-all) + exit $OCF_SUCCESS + ;; + *) + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; +esac + +exit $?
\ No newline at end of file diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index d4e2cd17..a44f49a0 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,12 +2,12 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson +Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang-base | erlang-base-hipe, erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} +Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/packaging/debs/Debian/debian/postrm.in b/packaging/debs/Debian/debian/postrm.in index bfcf1f53..5290de9b 100644 --- a/packaging/debs/Debian/debian/postrm.in +++ b/packaging/debs/Debian/debian/postrm.in @@ -18,6 +18,13 @@ set -e # for details, see http://www.debian.org/doc/debian-policy/ or # the debian-policy package +remove_plugin_traces() { + # Remove traces of plugins + rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins + for ext in rel script boot ; do + rm -f @RABBIT_LIB@/ebin/rabbit.$ext + done +} case "$1" in purge) @@ -34,11 +41,7 @@ case "$1" in if [ -d /etc/rabbitmq ]; then rm -r /etc/rabbitmq fi - # Remove traces of plugins - rm -rf @RABBIT_LIB@/priv @RABBIT_LIB@/plugins - for ext in rel script boot ; do - rm -f @RABBIT_LIB@/ebin/rabbit.$ext - done + remove_plugin_traces if getent passwd rabbitmq >/dev/null; then # Stop epmd if run by the rabbitmq user pkill -u rabbitmq epmd || : @@ -50,7 +53,11 @@ case "$1" in fi ;; - remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear) + remove|upgrade) + remove_plugin_traces + ;; + + failed-upgrade|abort-install|abort-upgrade|disappear) ;; *) diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 3799c438..19166514 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -13,7 +13,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/ install/rabbitmq-server:: mkdir -p $(DOCDIR) - rm $(RABBIT_LIB)LICENSE* + rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL* for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done @@ -21,3 +21,4 @@ install/rabbitmq-server:: install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm + install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index 0ef7dd5e..4ad4c30b 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -39,7 +39,7 @@ macports: dirs $(DEST)/Portfile $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ - tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) $(MACPORTS_USERHOST) ' \ d="/tmp/mkportindex.$$$$" ; \ mkdir $$d \ && cd $$d \ diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index e1f58212..153727be 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -23,7 +23,7 @@ checksums \ sha1 @sha1@ \ rmd160 @rmd160@ -depends_build port:erlang +depends_build port:erlang port:xmlto port:libxslt depends_run port:erlang platform darwin 7 { diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 638498c1..ccdfc401 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -31,7 +31,7 @@ ## NODENAME=rabbit -SERVER_ERL_ARGS="+K true +A30 \ +SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 28eb8ebb..57fe1328 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -145,6 +145,7 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -s rabbit ^
+W w ^
+A30 ^
++P 1048576 ^
-kernel inet_default_listen_options "[{nodelay, true}]" ^
-kernel inet_default_connect_options "[{nodelay, true}]" ^
!RABBITMQ_LISTEN_ARG! ^
diff --git a/src/delegate.erl b/src/delegate.erl new file mode 100644 index 00000000..98353453 --- /dev/null +++ b/src/delegate.erl @@ -0,0 +1,211 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate). + +-define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). + +-behaviour(gen_server2). + +-export([start_link/1, invoke_no_result/2, invoke/2, process_count/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). + +-spec(process_count/0 :: () -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(Hash) -> + gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). + +invoke(Pid, Fun) when is_pid(Pid) -> + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), + case Res of + {ok, Result, _} -> + Result; + {error, {Class, Reason, StackTrace}, _} -> + erlang:raise(Class, Reason, StackTrace) + end; + +invoke(Pids, Fun) when is_list(Pids) -> + lists:foldl( + fun({Status, Result, Pid}, {Good, Bad}) -> + case Status of + ok -> {[{Pid, Result}|Good], Bad}; + error -> {Good, [{Pid, Result}|Bad]} + end + end, + {[], []}, + invoke_per_node(split_delegate_per_node(Pids), Fun)). + +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), + ok; + +invoke_no_result(Pids, Fun) when is_list(Pids) -> + invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + ok. + +%%---------------------------------------------------------------------------- + +internal_call(Node, Thunk) when is_atom(Node) -> + gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). + +internal_cast(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). + +split_delegate_per_node(Pids) -> + LocalNode = node(), + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case Node of + LocalNode -> {[Pid|L], D}; + _ -> {L, orddict:append(Node, Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. + +invoke_per_node(NodePids, Fun) -> + lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). + +invoke_no_result_per_node(NodePids, Fun) -> + delegate_per_node(NodePids, Fun, fun internal_cast/2), + ok. + +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> + Self = self(), + %% Note that this is unsafe if the Fun requires reentrancy to the + %% local_server. I.e. if self() == local_server(Node) then we'll + %% block forever. + [gen_server2:cast( + local_server(Node), + {thunk, fun() -> + Self ! {result, + DelegateFun( + Node, fun() -> safe_invoke(Pids, Fun) end)} + end}) || {Node, Pids} <- NodePids], + [receive {result, Result} -> Result end || _ <- NodePids]. + +local_server(Node) -> + case get({delegate_local_server_name, Node}) of + undefined -> + Name = server(erlang:phash2({self(), Node}, process_count())), + put({delegate_local_server_name, Node}, Name), + Name; + Name -> Name + end. + +remote_server(Node) -> + case get({delegate_remote_server_name, Node}) of + undefined -> + case rpc:call(Node, delegate, process_count, []) of + {badrpc, _} -> + %% Have to return something, if we're just casting + %% then we don't want to blow up + server(1); + Count -> + Name = server(erlang:phash2({self(), Node}, Count)), + put({delegate_remote_server_name, Node}, Name), + Name + end; + Name -> Name + end. + +server(Hash) -> + list_to_atom("delegate_process_" ++ integer_to_list(Hash)). + +safe_invoke(Pids, Fun) when is_list(Pids) -> + [safe_invoke(Pid, Fun) || Pid <- Pids]; +safe_invoke(Pid, Fun) when is_pid(Pid) -> + try + {ok, Fun(Pid), Pid} + catch + Class:Reason -> + {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + end. + +process_count() -> + ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +%% We don't need a catch here; we always go via safe_invoke. A catch here would +%% be the wrong thing anyway since the Thunk can throw multiple errors. +handle_call({thunk, Thunk}, _From, State) -> + {reply, Thunk(), State, hibernate}. + +handle_cast({thunk, Thunk}, State) -> + Thunk(), + {noreply, State, hibernate}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl new file mode 100644 index 00000000..1c1d62a9 --- /dev/null +++ b/src/delegate_sup.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(delegate_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%---------------------------------------------------------------------------- + +init(_Args) -> + {ok, {{one_for_one, 10, 10}, + [{Hash, {delegate, start_link, [Hash]}, + transient, 16#ffffffff, worker, [delegate]} || + Hash <- lists:seq(0, delegate:process_count() - 1)]}}. + +%%---------------------------------------------------------------------------- diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl new file mode 100644 index 00000000..0f648dcd --- /dev/null +++ b/src/file_handle_cache.erl @@ -0,0 +1,862 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(file_handle_cache). + +%% A File Handle Cache +%% +%% This extends a subset of the functionality of the Erlang file +%% module. +%% +%% Some constraints +%% 1) This supports one writer, multiple readers per file. Nothing +%% else. +%% 2) Do not open the same file from different processes. Bad things +%% may happen. +%% 3) Writes are all appends. You cannot write to the middle of a +%% file, although you can truncate and then append if you want. +%% 4) Although there is a write buffer, there is no read buffer. Feel +%% free to use the read_ahead mode, but beware of the interaction +%% between that buffer and the write buffer. +%% +%% Some benefits +%% 1) You do not have to remember to call sync before close +%% 2) Buffering is much more flexible than with plain file module, and +%% you can control when the buffer gets flushed out. This means that +%% you can rely on reads-after-writes working, without having to call +%% the expensive sync. +%% 3) Unnecessary calls to position and sync get optimised out. +%% 4) You can find out what your 'real' offset is, and what your +%% 'virtual' offset is (i.e. where the hdl really is, and where it +%% would be after the write buffer is written out). +%% 5) You can find out what the offset was when you last sync'd. +%% +%% There is also a server component which serves to limit the number +%% of open file handles in a "soft" way - the server will never +%% prevent a client from opening a handle, but may immediately tell it +%% to close the handle. Thus you can set the limit to zero and it will +%% still all work correctly, it is just that effectively no caching +%% will take place. The operation of limiting is as follows: +%% +%% On open and close, the client sends messages to the server +%% informing it of opens and closes. This allows the server to keep +%% track of the number of open handles. The client also keeps a +%% gb_tree which is updated on every use of a file handle, mapping the +%% time at which the file handle was last used (timestamp) to the +%% handle. Thus the smallest key in this tree maps to the file handle +%% that has not been used for the longest amount of time. This +%% smallest key is included in the messages to the server. As such, +%% the server keeps track of when the least recently used file handle +%% was used *at the point of the most recent open or close* by each +%% client. +%% +%% Note that this data can go very out of date, by the client using +%% the least recently used handle. +%% +%% When the limit is reached, the server calculates the average age of +%% the last reported least recently used file handle of all the +%% clients. It then tells all the clients to close any handles not +%% used for longer than this average, by invoking the callback the +%% client registered. The client should receive this message and pass +%% it into set_maximum_since_use/1. However, it is highly possible +%% this age will be greater than the ages of all the handles the +%% client knows of because the client has used its file handles in the +%% mean time. Thus at this point the client reports to the server the +%% current timestamp at which its least recently used file handle was +%% last used. The server will check two seconds later that either it +%% is back under the limit, in which case all is well again, or if +%% not, it will calculate a new average age. Its data will be much +%% more recent now, and so it is very likely that when this is +%% communicated to the clients, the clients will close file handles. +%% +%% The advantage of this scheme is that there is only communication +%% from the client to the server on open, close, and when in the +%% process of trying to reduce file handle usage. There is no +%% communication from the client to the server on normal file handle +%% operations. This scheme forms a feed-back loop - the server does +%% not care which file handles are closed, just that some are, and it +%% checks this repeatedly when over the limit. Given the guarantees of +%% now(), even if there is just one file handle open, a limit of 1, +%% and one client, it is certain that when the client calculates the +%% age of the handle, it will be greater than when the server +%% calculated it, hence it should be closed. +%% +%% Handles which are closed as a result of the server are put into a +%% "soft-closed" state in which the handle is closed (data flushed out +%% and sync'd first) but the state is maintained. The handle will be +%% fully reopened again as soon as needed, thus users of this library +%% do not need to worry about their handles being closed by the server +%% - reopening them when necessary is handled transparently. +%% +%% The server also supports obtain and release_on_death. obtain/0 +%% blocks until a file descriptor is available. release_on_death/1 +%% takes a pid and monitors the pid, reducing the count by 1 when the +%% pid dies. Thus the assumption is that obtain/0 is called first, and +%% when that returns, release_on_death/1 is called with the pid who +%% "owns" the file descriptor. This is, for example, used to track the +%% use of file descriptors through network sockets. + +-behaviour(gen_server). + +-export([register_callback/3]). +-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, + last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, + flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). +-export([release_on_death/1, obtain/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(RESERVED_FOR_OTHERS, 100). +-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). +-define(FILE_HANDLES_LIMIT_OTHER, 1024). +-define(FILE_HANDLES_CHECK_INTERVAL, 2000). + +%%---------------------------------------------------------------------------- + +-record(file, + { reader_count, + has_writer + }). + +-record(handle, + { hdl, + offset, + trusted_offset, + is_dirty, + write_buffer_size, + write_buffer_size_limit, + write_buffer, + at_eof, + path, + mode, + options, + is_write, + is_read, + last_used_at + }). + +-record(fhc_state, + { elders, + limit, + count, + obtains, + callbacks, + client_mrefs, + timer_ref + }). + +%%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ref() :: any()). +-type(error() :: {'error', any()}). +-type(ok_or_error() :: ('ok' | error())). +-type(val_or_error(T) :: ({'ok', T} | error())). +-type(position() :: ('bof' | 'eof' | non_neg_integer() | + {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})). +-type(offset() :: non_neg_integer()). + +-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). +-spec(open/3 :: + (string(), [any()], + [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) -> + val_or_error(ref())). +-spec(close/1 :: (ref()) -> ok_or_error()). +-spec(read/2 :: (ref(), non_neg_integer()) -> + val_or_error([char()] | binary()) | 'eof'). +-spec(append/2 :: (ref(), iodata()) -> ok_or_error()). +-spec(sync/1 :: (ref()) -> ok_or_error()). +-spec(position/2 :: (ref(), position()) -> val_or_error(offset())). +-spec(truncate/1 :: (ref()) -> ok_or_error()). +-spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). +-spec(flush/1 :: (ref()) -> ok_or_error()). +-spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> + val_or_error(non_neg_integer())). +-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). +-spec(delete/1 :: (ref()) -> ok_or_error()). +-spec(clear/1 :: (ref()) -> ok_or_error()). +-spec(release_on_death/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). + +register_callback(M, F, A) + when is_atom(M) andalso is_atom(F) andalso is_list(A) -> + gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}). + +open(Path, Mode, Options) -> + Path1 = filename:absname(Path), + File1 = #file { reader_count = RCount, has_writer = HasWriter } = + case get({Path1, fhc_file}) of + File = #file {} -> File; + undefined -> #file { reader_count = 0, + has_writer = false } + end, + Mode1 = append_to_write(Mode), + IsWriter = is_writer(Mode1), + case IsWriter andalso HasWriter of + true -> {error, writer_exists}; + false -> Ref = make_ref(), + case open1(Path1, Mode1, Options, Ref, bof, new) of + {ok, _Handle} -> + RCount1 = case is_reader(Mode1) of + true -> RCount + 1; + false -> RCount + end, + HasWriter1 = HasWriter orelse IsWriter, + put({Path1, fhc_file}, + File1 #file { reader_count = RCount1, + has_writer = HasWriter1 }), + {ok, Ref}; + Error -> + Error + end + end. + +close(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> ok; + Handle -> case hard_close(Handle) of + ok -> ok; + {Error, Handle1} -> put_handle(Ref, Handle1), + Error + end + end. + +read(Ref, Count) -> + with_flushed_handles( + [Ref], + fun ([#handle { is_read = false }]) -> + {error, not_open_for_reading}; + ([Handle = #handle { hdl = Hdl, offset = Offset }]) -> + case file:read(Hdl, Count) of + {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data), + {Obj, + [Handle #handle { offset = Offset1 }]}; + eof -> {eof, [Handle #handle { at_eof = true }]}; + Error -> {Error, [Handle]} + end + end). + +append(Ref, Data) -> + with_handles( + [Ref], + fun ([#handle { is_write = false }]) -> + {error, not_open_for_writing}; + ([Handle]) -> + case maybe_seek(eof, Handle) of + {{ok, _Offset}, #handle { hdl = Hdl, offset = Offset, + write_buffer_size_limit = 0, + at_eof = true } = Handle1} -> + Offset1 = Offset + iolist_size(Data), + {file:write(Hdl, Data), + [Handle1 #handle { is_dirty = true, offset = Offset1 }]}; + {{ok, _Offset}, #handle { write_buffer = WriteBuffer, + write_buffer_size = Size, + write_buffer_size_limit = Limit, + at_eof = true } = Handle1} -> + WriteBuffer1 = [Data | WriteBuffer], + Size1 = Size + iolist_size(Data), + Handle2 = Handle1 #handle { write_buffer = WriteBuffer1, + write_buffer_size = Size1 }, + case Limit /= infinity andalso Size1 > Limit of + true -> {Result, Handle3} = write_buffer(Handle2), + {Result, [Handle3]}; + false -> {ok, [Handle2]} + end; + {{error, _} = Error, Handle1} -> + {Error, [Handle1]} + end + end). + +sync(Ref) -> + with_flushed_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> + ok; + ([Handle = #handle { hdl = Hdl, offset = Offset, + is_dirty = true, write_buffer = [] }]) -> + case file:sync(Hdl) of + ok -> {ok, [Handle #handle { trusted_offset = Offset, + is_dirty = false }]}; + Error -> {Error, [Handle]} + end + end). + +position(Ref, NewOffset) -> + with_flushed_handles( + [Ref], + fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle), + {Result, [Handle1]} + end). + +truncate(Ref) -> + with_flushed_handles( + [Ref], + fun ([Handle1 = #handle { hdl = Hdl, offset = Offset, + trusted_offset = TOffset }]) -> + case file:truncate(Hdl) of + ok -> TOffset1 = lists:min([Offset, TOffset]), + {ok, [Handle1 #handle { trusted_offset = TOffset1, + at_eof = true }]}; + Error -> {Error, [Handle1]} + end + end). + +last_sync_offset(Ref) -> + with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) -> + {ok, TOffset} + end). + +current_virtual_offset(Ref) -> + with_handles([Ref], fun ([#handle { at_eof = true, is_write = true, + offset = Offset, + write_buffer_size = Size }]) -> + {ok, Offset + Size}; + ([#handle { offset = Offset }]) -> + {ok, Offset} + end). + +current_raw_offset(Ref) -> + with_handles([Ref], fun ([Handle]) -> {ok, Handle #handle.offset} end). + +flush(Ref) -> + with_flushed_handles([Ref], fun ([Handle]) -> {ok, [Handle]} end). + +copy(Src, Dest, Count) -> + with_flushed_handles( + [Src, Dest], + fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset }, + DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }] + ) -> + case file:copy(SHdl, DHdl, Count) of + {ok, Count1} = Result1 -> + {Result1, + [SHandle #handle { offset = SOffset + Count1 }, + DHandle #handle { offset = DOffset + Count1 }]}; + Error -> + {Error, [SHandle, DHandle]} + end; + (_Handles) -> + {error, incorrect_handle_modes} + end). + +delete(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> + ok; + Handle = #handle { path = Path } -> + case hard_close(Handle #handle { is_dirty = false, + write_buffer = [] }) of + ok -> file:delete(Path); + {Error, Handle1} -> put_handle(Ref, Handle1), + Error + end + end. + +clear(Ref) -> + with_handles( + [Ref], + fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) -> + ok; + ([Handle]) -> + case maybe_seek(bof, Handle #handle { write_buffer = [], + write_buffer_size = 0 }) of + {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> + case file:truncate(Hdl) of + ok -> {ok, [Handle1 #handle {trusted_offset = 0, + at_eof = true }]}; + Error -> {Error, [Handle1]} + end; + {{error, _} = Error, Handle1} -> + {Error, [Handle1]} + end + end). + +set_maximum_since_use(MaximumAge) -> + Now = now(), + case lists:foldl( + fun ({{Ref, fhc_handle}, + Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> + Age = timer:now_diff(Now, Then), + case Hdl /= closed andalso Age >= MaximumAge of + true -> {Res, Handle1} = soft_close(Handle), + case Res of + ok -> put({Ref, fhc_handle}, Handle1), + false; + _ -> put_handle(Ref, Handle1), + Rep + end; + false -> Rep + end; + (_KeyValuePair, Rep) -> + Rep + end, true, get()) of + true -> age_tree_change(), ok; + false -> ok + end. + +release_on_death(Pid) when is_pid(Pid) -> + gen_server:cast(?SERVER, {release_on_death, Pid}). + +obtain() -> + gen_server:call(?SERVER, obtain, infinity). + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- + +is_reader(Mode) -> lists:member(read, Mode). + +is_writer(Mode) -> lists:member(write, Mode). + +append_to_write(Mode) -> + case lists:member(append, Mode) of + true -> [write | Mode -- [append, write]]; + false -> Mode + end. + +with_handles(Refs, Fun) -> + ResHandles = lists:foldl( + fun (Ref, {ok, HandlesAcc}) -> + case get_or_reopen(Ref) of + {ok, Handle} -> {ok, [Handle | HandlesAcc]}; + Error -> Error + end; + (_Ref, Error) -> + Error + end, {ok, []}, Refs), + case ResHandles of + {ok, Handles} -> + case Fun(lists:reverse(Handles)) of + {Result, Handles1} when is_list(Handles1) -> + lists:zipwith(fun put_handle/2, Refs, Handles1), + Result; + Result -> + Result + end; + Error -> + Error + end. + +with_flushed_handles(Refs, Fun) -> + with_handles( + Refs, + fun (Handles) -> + case lists:foldl( + fun (Handle, {ok, HandlesAcc}) -> + {Res, Handle1} = write_buffer(Handle), + {Res, [Handle1 | HandlesAcc]}; + (Handle, {Error, HandlesAcc}) -> + {Error, [Handle | HandlesAcc]} + end, {ok, []}, Handles) of + {ok, Handles1} -> + Fun(lists:reverse(Handles1)); + {Error, Handles1} -> + {Error, lists:reverse(Handles1)} + end + end). + +get_or_reopen(Ref) -> + case get({Ref, fhc_handle}) of + undefined -> + {error, not_open, Ref}; + #handle { hdl = closed, offset = Offset, + path = Path, mode = Mode, options = Options } -> + open1(Path, Mode, Options, Ref, Offset, reopen); + Handle -> + {ok, Handle} + end. + +put_handle(Ref, Handle = #handle { last_used_at = Then }) -> + Now = now(), + age_tree_update(Then, Now, Ref), + put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). + +with_age_tree(Fun) -> + put(fhc_age_tree, Fun(case get(fhc_age_tree) of + undefined -> gb_trees:empty(); + AgeTree -> AgeTree + end)). + +age_tree_insert(Now, Ref) -> + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:insert(Now, Ref, Tree), + {Oldest, _Ref} = gb_trees:smallest(Tree1), + gen_server:cast(?SERVER, {open, self(), Oldest}), + Tree1 + end). + +age_tree_update(Then, Now, Ref) -> + with_age_tree( + fun (Tree) -> + gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree)) + end). + +age_tree_delete(Then) -> + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:delete_any(Then, Tree), + Oldest = case gb_trees:is_empty(Tree1) of + true -> + undefined; + false -> + {Oldest1, _Ref} = gb_trees:smallest(Tree1), + Oldest1 + end, + gen_server:cast(?SERVER, {close, self(), Oldest}), + Tree1 + end). + +age_tree_change() -> + with_age_tree( + fun (Tree) -> + case gb_trees:is_empty(Tree) of + true -> Tree; + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + gen_server:cast(?SERVER, {update, self(), Oldest}) + end, + Tree + end). + +open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> + Mode1 = case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end, + case file:open(Path, Mode1) of + {ok, Hdl} -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Now = now(), + Handle = #handle { hdl = Hdl, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = Now }, + {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + age_tree_insert(Now, Ref), + {ok, Handle2}; + {error, Reason} -> + {error, Reason} + end. + +soft_close(Handle = #handle { hdl = closed }) -> + {ok, Handle}; +soft_close(Handle) -> + case write_buffer(Handle) of + {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + last_used_at = Then } = Handle1 } -> + ok = case IsDirty of + true -> file:sync(Hdl); + false -> ok + end, + ok = file:close(Hdl), + age_tree_delete(Then), + {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, + is_dirty = false }}; + {_Error, _Handle} = Result -> + Result + end. + +hard_close(Handle) -> + case soft_close(Handle) of + {ok, #handle { path = Path, + is_read = IsReader, is_write = IsWriter }} -> + #file { reader_count = RCount, has_writer = HasWriter } = File = + get({Path, fhc_file}), + RCount1 = case IsReader of + true -> RCount - 1; + false -> RCount + end, + HasWriter1 = HasWriter andalso not IsWriter, + case RCount1 =:= 0 andalso not HasWriter1 of + true -> erase({Path, fhc_file}); + false -> put({Path, fhc_file}, + File #file { reader_count = RCount1, + has_writer = HasWriter1 }) + end, + ok; + {_Error, _Handle} = Result -> + Result + end. + +maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset, + at_eof = AtEoF }) -> + {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset), + case (case NeedsSeek of + true -> file:position(Hdl, NewOffset); + false -> {ok, Offset} + end) of + {ok, Offset1} = Result -> + {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }}; + {error, _} = Error -> + {Error, Handle} + end. + +needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false}; +needs_seek( AtEoF, _CurOffset, {cur, 0}) -> {AtEoF, false}; +needs_seek( true, _CurOffset, eof ) -> {true , false}; +needs_seek( true, _CurOffset, {eof, 0}) -> {true , false}; +needs_seek( false, _CurOffset, eof ) -> {true , true }; +needs_seek( false, _CurOffset, {eof, 0}) -> {true , true }; +needs_seek( AtEoF, 0, bof ) -> {AtEoF, false}; +needs_seek( AtEoF, 0, {bof, 0}) -> {AtEoF, false}; +needs_seek( AtEoF, CurOffset, CurOffset) -> {AtEoF, false}; +needs_seek( true, CurOffset, {bof, DesiredOffset}) + when DesiredOffset >= CurOffset -> + {true, true}; +needs_seek( true, _CurOffset, {cur, DesiredOffset}) + when DesiredOffset > 0 -> + {true, true}; +needs_seek( true, CurOffset, DesiredOffset) %% same as {bof, DO} + when is_integer(DesiredOffset) andalso DesiredOffset >= CurOffset -> + {true, true}; +%% because we can't really track size, we could well end up at EoF and not know +needs_seek(_AtEoF, _CurOffset, _DesiredOffset) -> + {false, true}. + +write_buffer(Handle = #handle { write_buffer = [] }) -> + {ok, Handle}; +write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, + write_buffer = WriteBuffer, + write_buffer_size = DataSize, + at_eof = true }) -> + case file:write(Hdl, lists:reverse(WriteBuffer)) of + ok -> + Offset1 = Offset + DataSize, + {ok, Handle #handle { offset = Offset1, is_dirty = true, + write_buffer = [], write_buffer_size = 0 }}; + {error, _} = Error -> + {Error, Handle} + end. + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([]) -> + Limit = case application:get_env(file_handles_high_watermark) of + {ok, Watermark} when (is_integer(Watermark) andalso + Watermark > 0) -> + Watermark; + _ -> + ulimit() + end, + error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), + {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, + obtains = [], callbacks = dict:new(), + client_mrefs = dict:new(), timer_ref = undefined }}. + +handle_call(obtain, From, State = #fhc_state { count = Count }) -> + State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = + maybe_reduce(State #fhc_state { count = Count + 1 }), + case Limit /= infinity andalso Count1 >= Limit of + true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], + count = Count1 - 1 }}; + false -> {reply, ok, State1} + end. + +handle_cast({register_callback, Pid, MFA}, + State = #fhc_state { callbacks = Callbacks }) -> + {noreply, ensure_mref( + Pid, State #fhc_state { + callbacks = dict:store(Pid, MFA, Callbacks) })}; + +handle_cast({open, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders, count = Count }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + {noreply, maybe_reduce( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count + 1 }))}; + +handle_cast({update, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders }) -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + %% don't call maybe_reduce from here otherwise we can create a + %% storm of messages + {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + +handle_cast({close, Pid, EldestUnusedSince}, State = + #fhc_state { elders = Elders, count = Count }) -> + Elders1 = case EldestUnusedSince of + undefined -> dict:erase(Pid, Elders); + _ -> dict:store(Pid, EldestUnusedSince, Elders) + end, + {noreply, process_obtains( + ensure_mref(Pid, State #fhc_state { elders = Elders1, + count = Count - 1 }))}; + +handle_cast(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; + +handle_cast({release_on_death, Pid}, State) -> + _MRef = erlang:monitor(process, Pid), + {noreply, State}. + +handle_info({'DOWN', MRef, process, Pid, _Reason}, State = + #fhc_state { count = Count, callbacks = Callbacks, + client_mrefs = ClientMRefs, elders = Elders }) -> + {noreply, process_obtains( + case dict:find(Pid, ClientMRefs) of + {ok, MRef} -> State #fhc_state { + elders = dict:erase(Pid, Elders), + client_mrefs = dict:erase(Pid, ClientMRefs), + callbacks = dict:erase(Pid, Callbacks) }; + _ -> State #fhc_state { count = Count - 1 } + end)}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% server helpers +%%---------------------------------------------------------------------------- + +process_obtains(State = #fhc_state { obtains = [] }) -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count }) + when Limit /= infinity andalso Count >= Limit -> + State; +process_obtains(State = #fhc_state { limit = Limit, count = Count, + obtains = Obtains }) -> + ObtainsLen = length(Obtains), + ObtainableLen = lists:min([ObtainsLen, Limit - Count]), + Take = ObtainsLen - ObtainableLen, + {ObtainsNew, ObtainableRev} = lists:split(Take, Obtains), + [gen_server:reply(From, ok) || From <- ObtainableRev], + State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. + +maybe_reduce(State = #fhc_state { limit = Limit, count = Count, elders = Elders, + callbacks = Callbacks, timer_ref = TRef }) + when Limit /= infinity andalso Count >= Limit -> + Now = now(), + {Pids, Sum, ClientCount} = + dict:fold(fun (_Pid, undefined, Accs) -> + Accs; + (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> + {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end, {[], 0, 0}, Elders), + case Pids of + [] -> ok; + _ -> AverageAge = Sum / ClientCount, + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> ok; + {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) + end + end, Pids) + end, + case TRef of + undefined -> {ok, TRef1} = timer:apply_after( + ?FILE_HANDLES_CHECK_INTERVAL, + gen_server, cast, [?SERVER, check_counts]), + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end; +maybe_reduce(State) -> + State. + +%% Googling around suggests that Windows has a limit somewhere around +%% 16M, eg +%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx +%% For everything else, assume ulimit exists. Further googling +%% suggests that BSDs (incl OS X), solaris and linux all agree that +%% ulimit -n is file handles +ulimit() -> + case os:type() of + {win32, _OsName} -> + ?FILE_HANDLES_LIMIT_WINDOWS; + {unix, _OsName} -> + %% Under Linux, Solaris and FreeBSD, ulimit is a shell + %% builtin, not a command. In OS X, it's a command. + %% Fortunately, os:cmd invokes the cmd in a shell env, so + %% we're safe in all cases. + case os:cmd("ulimit -n") of + "unlimited" -> + infinity; + String = [C|_] when $0 =< C andalso C =< $9 -> + Num = list_to_integer( + lists:takewhile( + fun (D) -> $0 =< D andalso D =< $9 end, String)) - + ?RESERVED_FOR_OTHERS, + lists:max([1, Num]); + _ -> + %% probably a variant of + %% "/bin/sh: line 1: ulimit: command not found\n" + ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + end; + _ -> + ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + end. + +ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> + case dict:find(Pid, ClientMRefs) of + {ok, _MRef} -> State; + error -> MRef = erlang:monitor(process, Pid), + State #fhc_state { + client_mrefs = dict:store(Pid, MRef, ClientMRefs) } + end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index c33582e3..5b899cdb 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -443,7 +443,7 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> name({local,Name}) -> Name; name({global,Name}) -> Name; %% name(Pid) when is_pid(Pid) -> Pid; -%% when R11 goes away, drop the line beneath and uncomment the line above +%% when R12 goes away, drop the line beneath and uncomment the line above name(Name) -> Name. unregister_name({local,Name}) -> @@ -607,9 +607,9 @@ process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg - (Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + sys:handle_system_msg( + Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. diff --git a/src/rabbit.erl b/src/rabbit.erl index b1204997..c389178a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -47,12 +47,19 @@ [{description, "codec correctness check"}, {mfa, {rabbit_binary_generator, check_empty_content_body_frame_size, - []}}]}). + []}}, + {enables, external_infrastructure}]}). -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, {enables, external_infrastructure}]}). +-rabbit_boot_step({file_handle_cache, + [{description, "file handle cache server"}, + {mfa, {rabbit_sup, start_restartable_child, + [file_handle_cache]}}, + {enables, worker_pool}]}). + -rabbit_boot_step({worker_pool, [{description, "worker pool"}, {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, @@ -65,21 +72,21 @@ [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type_registry]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_log]}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}, - {requires, external_infrastructure}]}). + {requires, external_infrastructure}, + {enables, kernel_ready}]}). -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}, @@ -91,16 +98,24 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, - {requires, kernel_ready}, +-rabbit_boot_step({rabbit_memory_monitor, + [{description, "memory monitor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_memory_monitor]}}, + {requires, rabbit_alarm}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_router, - [{description, "cluster router"}, +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, {mfa, {rabbit_sup, start_restartable_child, - [rabbit_router]}}, + [rabbit_guid]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + +-rabbit_boot_step({delegate_sup, + [{description, "cluster delegate"}, + {mfa, {rabbit_sup, start_child, + [delegate_sup]}}, {requires, kernel_ready}, {enables, core_initialized}]}). @@ -109,45 +124,39 @@ {mfa, {rabbit_sup, start_restartable_child, [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, - [{description, "core initialized"}]}). + [{description, "core initialized"}, + {requires, kernel_ready}]}). -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, - {requires, core_initialized}]}). + {requires, core_initialized}, + {enables, routing_ready}]}). -rabbit_boot_step({exchange_recovery, [{description, "exchange recovery"}, {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}]}). - --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). - --rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). + {requires, empty_db_check}, + {enables, routing_ready}]}). --rabbit_boot_step({guid_generator, - [{description, "guid generator"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_guid]}}, - {requires, persister}, +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}, {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, - [{description, "message delivery logic ready"}]}). + [{description, "message delivery logic ready"}, + {requires, core_initialized}]}). -rabbit_boot_step({log_relay, [{description, "error log relay"}, {mfa, {rabbit_error_logger, boot, []}}, - {requires, routing_ready}]}). + {requires, routing_ready}, + {enables, networking}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, @@ -232,14 +241,18 @@ rotate_logs(BinarySuffix) -> %%-------------------------------------------------------------------- start(normal, []) -> - {ok, SupPid} = rabbit_sup:start_link(), + case erts_version_check() of + ok -> + {ok, SupPid} = rabbit_sup:start_link(), - print_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], - io:format("~nbroker running~n"), - - {ok, SupPid}. + print_banner(), + [ok = run_boot_step(Step) || Step <- boot_steps()], + io:format("~nbroker running~n"), + {ok, SupPid}; + Error -> + Error + end. stop(_State) -> terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), @@ -252,6 +265,14 @@ stop(_State) -> %%--------------------------------------------------------------------------- +erts_version_check() -> + FoundVer = erlang:system_info(version), + case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of + true -> ok; + false -> {error, {erlang_version_too_old, + {found, FoundVer}, {required, ?ERTS_MINIMUM}}} + end. + boot_error(Format, Args) -> io:format("BOOT ERROR: " ++ Format, Args), error_logger:error_msg(Format, Args), @@ -278,6 +299,18 @@ run_boot_step({StepName, Attributes}) -> ok end. +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + boot_steps() -> AllApps = [App || {App, _, _} <- application:loaded_applications()], Modules = lists:usort( @@ -289,7 +322,7 @@ boot_steps() -> lists:flatmap(fun (Module) -> [{StepName, Attributes} || {rabbit_boot_step, [{StepName, Attributes}]} - <- Module:module_info(attributes)] + <- module_attributes(Module)] end, Modules), sort_boot_steps(UnsortedSteps). @@ -401,8 +434,9 @@ print_banner() -> {"cookie hash", rabbit_misc:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}], - DescrLen = lists:max([length(K) || {K, _V} <- Settings]), + {"database dir", rabbit_mnesia:dir()}, + {"erlang version", erlang:system_info(version)}], + DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), io:nl(). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ceec00fd..483b5a93 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,17 +31,19 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1]). +-export([start/0, declare/5, delete/3, purge/1]). +-export([internal_declare/2, internal_delete/1, + maybe_run_queue_via_backing_queue/2, + update_ram_duration/1, set_ram_duration_target/2, + set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). --export([claim_queue/2]). --export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -63,9 +65,8 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> - amqqueue()). +-spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), + maybe(pid())) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -89,27 +90,28 @@ {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). -spec(deliver/2 :: (pid(), delivery()) -> boolean()). --spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). --spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), + {'ok', non_neg_integer(), qmsg()} | 'empty'). +-spec(basic_consume/7 :: + (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> - 'ok' | {'error', 'queue_owned_by_another_connection' | - 'exclusive_consume_unavailable'}). + 'ok' | {'error', 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). +-spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(update_ram_duration/1 :: (pid()) -> 'ok'). +-spec(set_ram_duration_target/2 :: (pid(), number()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -118,80 +120,66 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), + {ok, BQ} = application:get_env(backing_queue_module), + ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. - -recover_durable_queues() -> +find_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) - end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. - -declare(QueueName, Durable, AutoDelete, Args) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). + +recover_durable_queues(DurableQueues) -> + Qs = [start_queue_process(Q) || Q <- DurableQueues], + [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. + +declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, + exclusive_owner = Owner, pid = none}), - internal_declare(Q, true). - -internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> - case mnesia:read( - {rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - case WantDefaultBinding of - true -> add_default_binding(Q); - false -> ok - end, - Q; - [_] -> not_found %% existing Q on stopped node - end; - [ExistingQ] -> - ExistingQ - end - end) of - not_found -> exit(Q#amqqueue.pid, shutdown), - rabbit_misc:not_found(QueueName); - Q -> Q; - ExistingQ -> exit(Q#amqqueue.pid, shutdown), - ExistingQ + case gen_server2:call(Q#amqqueue.pid, {init, false}) of + not_found -> rabbit_misc:not_found(QueueName); + Q1 -> Q1 end. +internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case Recover of + true -> + ok = store_queue(Q), + Q; + false -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> + case mnesia:read({rabbit_durable_queue, + QueueName}) of + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), + Q; + [_] -> not_found %% Q exists on stopped node + end; + [ExistingQ] -> + ExistingQ + end + end + end). + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q, write), ok = mnesia:write(rabbit_queue, Q, write), @@ -201,7 +189,7 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -234,10 +222,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info, infinity). + delegate_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of + case delegate_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -247,7 +235,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, consumers, infinity). + delegate_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -256,15 +244,15 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -278,29 +266,24 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. -redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). - requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + delegate_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> - safe_pmap_ok( +commit_all(QPids, Txn, ChPid) -> + safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> - safe_pmap_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, - QPids). +rollback_all(QPids, Txn, ChPid) -> + delegate:invoke_no_result( + QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end). notify_down_all(QPids, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, @@ -308,38 +291,41 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). - -claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) + end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - infinity). + delegate_call(QPid, {basic_consume, NoAck, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). + delegate_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {unblock, ChPid}). + delegate_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, - QPids). + delegate:invoke_no_result( + QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). + +internal_delete1(QueueName) -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName). internal_delete(QueueName) -> case @@ -347,13 +333,7 @@ internal_delete(QueueName) -> fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - %% we want to execute some things, as - %% decided by rabbit_exchange, after the - %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName) + [_] -> internal_delete1(QueueName) end end) of Err = {error, _} -> Err; @@ -362,6 +342,19 @@ internal_delete(QueueName) -> ok end. +maybe_run_queue_via_backing_queue(QPid, Fun) -> + gen_server2:pcall(QPid, 7, {maybe_run_queue_via_backing_queue, Fun}, + infinity). + +update_ram_duration(QPid) -> + gen_server2:pcast(QPid, 8, update_ram_duration). + +set_ram_duration_target(QPid, Duration) -> + gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + +set_maximum_since_use(QPid, Age) -> + gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( @@ -385,17 +378,28 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_pmap_ok(H, F, L) -> - case [R || R <- rabbit_misc:upmap( - fun (V) -> - try - rabbit_misc:with_exit_handler( - fun () -> H(V) end, - fun () -> F(V) end) - catch Class:Reason -> {Class, Reason} - end - end, L), - R =/= ok] of - [] -> ok; - Errors -> {error, Errors} +safe_delegate_call_ok(H, F, Pids) -> + {_, Bad} = delegate:invoke(Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end), + case Bad of + [] -> ok; + _ -> {error, Bad} end. + +delegate_call(Pid, Msg, Timeout) -> + delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end). + +delegate_pcall(Pid, Pri, Msg, Timeout) -> + delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). + +delegate_cast(Pid, Msg) -> + delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + +delegate_pcast(Pid, Pri, Msg) -> + delegate:invoke_no_result(Pid, + fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19cb5c71..8bd6e68b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -35,13 +35,14 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). +-define(UNSENT_MESSAGE_LIMIT, 100). +-define(SYNC_INTERVAL, 5). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -export([start_link/1, info_keys/0]). --export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, + handle_info/2, handle_pre_hibernate/1]). -import(queue). -import(erlang). @@ -49,24 +50,24 @@ % Queue's state -record(q, {q, - owner, exclusive_consumer, has_had_consumers, - next_msg_id, - message_buffer, + backing_queue, + backing_queue_state, active_consumers, - blocked_consumers}). + blocked_consumers, + sync_timer_ref, + rate_timer_ref + }). -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). - %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, - unacked_messages, + acktags, is_limit_active, txn, unsent_message_count}). @@ -82,49 +83,127 @@ exclusive_consumer_tag, messages_ready, messages_unacknowledged, - messages_uncommitted, messages, - acks_uncommitted, consumers, - transactions, - memory]). + memory, + backing_queue_status + ]). %%---------------------------------------------------------------------------- start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. - + %%---------------------------------------------------------------------------- init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - {ok, #q{q = Q, - owner = none, + process_flag(trap_exit, true), + {ok, BQ} = application:get_env(backing_queue_module), + + {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - next_msg_id = 1, - message_buffer = queue:new(), + backing_queue = BQ, + backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + sync_timer_ref = undefined, + rate_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, State) -> +terminate(shutdown, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> + terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); +terminate(_Reason, State = #q{backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? - QName = qname(State), - lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, - all_tx()), - ok = purge_message_buffer(QName, State#q.message_buffer), - ok = rabbit_amqqueue:internal_delete(QName). + terminate_shutdown(fun (BQS) -> + BQS1 = BQ:delete_and_terminate(BQS), + %% don't care if the internal delete + %% doesn't return 'ok'. + rabbit_amqqueue:internal_delete(qname(State)), + BQS1 + end, State). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +terminate_shutdown(Fun, State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + stop_sync_timer(stop_rate_timer(State)), + case BQS of + undefined -> State; + _ -> ok = rabbit_memory_monitor:deregister(self()), + BQS1 = lists:foldl( + fun (#cr{txn = none}, BQSN) -> + BQSN; + (#cr{txn = Txn}, BQSN) -> + {_AckTags, BQSN1} = + BQ:tx_rollback(Txn, BQSN), + BQSN1 + end, BQS, all_ch_record()), + State1#q{backing_queue_state = Fun(BQS1)} + end. -noreply(NewState) -> {noreply, NewState, hibernate}. +reply(Reply, NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {reply, Reply, NewState1, Timeout}. + +noreply(NewState) -> + assert_invariant(NewState), + {NewState1, Timeout} = next_state(NewState), + {noreply, NewState1, Timeout}. + +next_state(State) -> + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + ensure_rate_timer(State), + case BQ:needs_sync(BQS)of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. + +ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, + rabbit_amqqueue, maybe_run_queue_via_backing_queue, + [self(), fun (BQS) -> BQ:sync(BQS) end]), + State#q{sync_timer_ref = TRef}; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> + State; +stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{sync_timer_ref = undefined}. + +ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State#q{rate_timer_ref = TRef}; +ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> + State; +stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> + State#q{rate_timer_ref = undefined}; +stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{rate_timer_ref = undefined}. + +assert_invariant(#q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS}) -> + true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -140,7 +219,7 @@ ch_record(ChPid) -> C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, - unacked_messages = dict:new(), + acktags = sets:new(), is_limit_active = false, txn = none, unsent_message_count = 0}, @@ -171,29 +250,33 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_immediately(Message, IsDelivered, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, + State = #q{q = #amqqueue{name = QName}, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + acktags = ChAckTags} = ch_record(ChPid), + IsMsgReady = PredFun(FunAcc, State), + case (IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of true -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, IsDelivered, Message}), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element( + AckTag, ChAckTags); + false -> ChAckTags + end, NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, + acktags = ChAckTags1}, store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of @@ -207,88 +290,85 @@ deliver_immediately(Message, IsDelivered, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers, - next_msg_id = NextId + 1}}; - false -> + State2 = State1#q{ + active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}, + deliver_msgs_to_consumers(Funs, FunAcc1, State2); + %% if IsMsgReady then we've hit the limiter + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, BlockedConsumers), - deliver_immediately( - Message, IsDelivered, + deliver_msgs_to_consumers( + Funs, FunAcc, State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) - end; - {empty, _} -> - {not_offered, State} - end. - -run_message_queue(State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(MessageBuffer, State). - -run_message_queue(MessageBuffer, State) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - case deliver_immediately(Message, IsDelivered, State) of - {offered, true, NewState} -> - persist_delivery(qname(State), Message, IsDelivered), - run_message_queue(BufferTail, NewState); - {offered, false, NewState} -> - persist_auto_ack(qname(State), Message), - run_message_queue(BufferTail, NewState); - {not_offered, NewState} -> - NewState#q{message_buffer = MessageBuffer} + blocked_consumers = NewBlockedConsumers}); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc, State} end; {empty, _} -> - State#q{message_buffer = MessageBuffer} + {FunAcc, State} end. -attempt_delivery(none, _ChPid, Message, State) -> - case deliver_immediately(Message, false, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - persist_message(none, qname(State), Message), - persist_delivery(qname(State), Message, false), - {true, State1}; - {not_offered, State1} -> - {false, State1} - end; -attempt_delivery(Txn, ChPid, Message, State) -> - persist_message(Txn, qname(State), Message), - record_pending_message(Txn, ChPid, Message), - {true, State}. +deliver_from_queue_pred(IsEmpty, _State) -> + not IsEmpty. + +deliver_from_queue_deliver(AckRequired, false, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + {{Message, IsDelivered, AckTag}, 0 == Remaining, + State #q { backing_queue_state = BQS1 }}. + +run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Funs = {fun deliver_from_queue_pred/2, + fun deliver_from_queue_deliver/3}, + IsEmpty = BQ:is_empty(BQS), + {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), + State1. + +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Message, BQS), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS1}} + end, + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + record_current_channel_tx(ChPid, Txn), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State) -> +deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> - persist_message(Txn, qname(State), Message), - NewMB = queue:in({Message, false}, NewState#q.message_buffer), - {false, NewState#q{message_buffer = NewMB}} + %% Txn is none and no unblocked channels with consumers + BQS = BQ:publish(Message, State #q.backing_queue_state), + {false, NewState#q{backing_queue_state = BQS}} end. -deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> - run_message_queue(queue:join(MessageBuffer, queue:from_list(Messages)), - State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter( - fun ({CP, #consumer{tag = CT}}) -> - (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(Queue))). + queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= ConsumerTag) + end, Queue). remove_consumers(ChPid, Queue) -> - %% TODO: replace this with queue:filter/2 once we move to R12 - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue))). + queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). move_consumers(ChPid, From, To) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, @@ -323,7 +403,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - unacked_messages = UAM} -> + acktags = ChAckTags} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), State1 = State#q{ @@ -337,15 +417,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> case Txn of - none -> ok; - _ -> ok = rollback_work(Txn, qname(State1)), - erase_tx(Txn) - end, - {ok, deliver_or_enqueue_n( - [{Message, true} || - {_MsgId, Message} <- dict:to_list(UAM)], - State1)} + false -> State2 = case Txn of + none -> State1; + _ -> rollback_transaction(Txn, ChPid, + State1) + end, + {ok, requeue_and_run(sets:to_list(ChAckTags), State2)} end end. @@ -354,10 +431,6 @@ cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> cancel_holder(_ChPid, _ConsumerTag, Holder) -> Holder. -check_queue_owner(none, _) -> ok; -check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; -check_queue_owner({_, _}, _) -> mismatch. - check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; check_exclusive_access(none, false, _State) -> @@ -376,134 +449,30 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> - ok; -persist_message(Txn, QName, Message) -> - M = Message#basic_message{ - %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore - content = rabbit_binary_parser:clear_decoded_content( - Message#basic_message.content)}, - persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). - -persist_delivery(_QName, _Message, - true) -> - ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, - _IsDelivered) -> - ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, - _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). - -persist_acks(Txn, QName, Messages) -> - persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). - -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> - ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> - %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). - -commit_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName). - -rollback_work(Txn, QName) -> - do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName). - -%% optimisation: don't do unnecessary work -%% it would be nice if this was handled by the persister -do_if_persistent(F, Txn, QName) -> - case is_tx_persistent(Txn) of - false -> ok; - true -> ok = F({Txn, QName}) - end. - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - is_persistent = false, - pending_messages = [], - pending_acks = []}; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -all_tx_record() -> - [T || {{txn, _}, T} <- get()]. - -all_tx() -> - [Txn || {{txn, Txn}, _} <- get()]. - -mark_tx_persistent(Txn) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{is_persistent = true}). - -is_tx_persistent(Txn) -> - #tx{is_persistent = Res} = lookup_tx(Txn), - Res. - -record_pending_message(Txn, ChPid, Message) -> - Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). - -record_pending_acks(Txn, ChPid, MsgIds) -> - Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, - deliver_or_enqueue_n(lists:reverse(PendingMessages), State). - -collect_messages(MsgIds, UAM) -> - lists:mapfoldl( - fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, - UAM, MsgIds). - -purge_message_buffer(QName, MessageBuffer) -> - Messages = - [[Message || {Message, _IsDelivered} <- - queue:to_list(MessageBuffer)] | - lists:map( - fun (#cr{unacked_messages = UAM}) -> - [Message || {_MsgId, Message} <- dict:to_list(UAM)] - end, - all_ch_record())], - %% the simplest, though certainly not the most obvious or - %% efficient, way to purge messages from the persister is to - %% artifically ack them. - persist_acks(none, QName, lists:append(Messages)). +maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + %% ChPid must be known here because of the participant management + %% by the channel. + C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1}. + +rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here (would also require ChPid) + record_current_channel_tx(ChPid, none), + State#q{backing_queue_state = BQS1}. + +subtract_acks(A, B) when is_list(B) -> + lists:foldl(fun sets:del_element/2, A, B). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -513,10 +482,10 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; i(pid, _) -> self(); -i(owner_pid, #q{owner = none}) -> +i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; -i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) -> - ReaderPid; +i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> + ExclusiveOwner; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -525,33 +494,72 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) -> ConsumerTag; -i(messages_ready, #q{message_buffer = MessageBuffer}) -> - queue:len(MessageBuffer); +i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:len(BQS); i(messages_unacknowledged, _) -> - lists:sum([dict:size(UAM) || - #cr{unacked_messages = UAM} <- all_ch_record()]); -i(messages_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_messages = Pending} <- all_tx_record()]); + lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); -i(acks_uncommitted, _) -> - lists:sum([length(Pending) || - #tx{pending_acks = Pending} <- all_tx_record()]); + messages_unacknowledged]]); i(consumers, State) -> queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); -i(transactions, _) -> - length(all_tx_record()); i(memory, _) -> {memory, M} = process_info(self(), memory), M; +i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> + BQ:status(BQS); i(Item, _) -> throw({bad_argument, Item}). %--------------------------------------------------------------------------- +handle_call({init, Recover}, From, + State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable, + exclusive_owner = ExclusiveOwner}, + backing_queue = BQ, backing_queue_state = undefined}) -> + Declare = + fun() -> + case rabbit_amqqueue:internal_declare(Q, Recover) of + not_found -> + {stop, normal, not_found, State}; + Q -> + gen_server2:reply(From, Q), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, + [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + noreply( + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}); + Q1 -> + {stop, normal, Q1, State} + end + end, + + case ExclusiveOwner of + none -> + Declare(); + Owner -> + case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of + true -> + erlang:monitor(process, Owner), + Declare(); + _ -> + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", + [QName]) + end, + %% Rely on terminate to delete the queue. + {stop, normal, not_found, + State#q{backing_queue_state = + BQ:init(QName, IsDurable, Recover)}} + end + end; + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -592,13 +600,9 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> - ok = commit_work(Txn, qname(State)), - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), - erase_tx(Txn), - noreply(NewState); +handle_call({commit, Txn, ChPid}, From, State) -> + NewState = commit_transaction(Txn, From, ChPid, State), + noreply(run_message_queue(NewState)); handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -613,73 +617,59 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, - next_msg_id = NextId, - message_buffer = MessageBuffer}) -> - case queue:out(MessageBuffer) of - {{value, {Message, IsDelivered}}, BufferTail} -> - AckRequired = not(NoAck), + backing_queue_state = BQS, backing_queue = BQ}) -> + AckRequired = not NoAck, + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of - true -> - persist_delivery(QName, Message, IsDelivered), - C = #cr{unacked_messages = UAM} = ch_record(ChPid), - NewUAM = dict:store(NextId, Message, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}); - false -> - persist_auto_ack(QName, Message) + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( + C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + false -> ok end, - Msg = {QName, self(), NextId, IsDelivered, Message}, - reply({ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, - next_msg_id = NextId + 1}); - {empty, _} -> - reply(empty, State) + Msg = {QName, self(), AckTag, IsDelivered, Message}, + reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder}) -> - case check_queue_owner(Owner, ReaderPid) of - mismatch -> - reply({error, queue_owned_by_another_connection}, State); + _From, State = #q{exclusive_consumer = ExistingHolder}) -> + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of + in_use -> + reply({error, exclusive_consume_unavailable}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume, - State) of - in_use -> - reply({error, exclusive_consume_unavailable}, State); - ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, - ack_required = not(NoAck)}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), - case ConsumerCount of - 0 -> ok = rabbit_limiter:register(LimiterPid, self()); - _ -> ok - end, - ExclusiveConsumer = case ExclusiveConsume of - true -> {ChPid, ConsumerTag}; - false -> ExistingHolder - end, - State1 = State#q{has_had_consumers = true, - exclusive_consumer = ExclusiveConsumer}, - ok = maybe_send_reply(ChPid, OkMsg), - State2 = - case is_ch_blocked(C) of - true -> State1#q{ - blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) - end, - reply(ok, State2) - end + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not NoAck}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), + ok = case ConsumerCount of + 0 -> rabbit_limiter:register(LimiterPid, self()); + _ -> ok + end, + ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + State1 = State#q{has_had_consumers = true, + exclusive_consumer = ExclusiveConsumer}, + ok = maybe_send_reply(ChPid, OkMsg), + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_message_queue( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -712,14 +702,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - message_buffer = MessageBuffer, + backing_queue = BQ, + backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - Length = queue:len(MessageBuffer), - reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); + reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q{message_buffer = MessageBuffer}) -> - IsEmpty = queue:is_empty(MessageBuffer), + State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> + IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> @@ -727,77 +717,51 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, queue:len(MessageBuffer)}, State} + {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> - ok = purge_message_buffer(qname(State), MessageBuffer), - reply({ok, queue:len(MessageBuffer)}, - State#q{message_buffer = queue:new()}); +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Count, BQS1} = BQ:purge(BQS), + reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({claim_queue, ReaderPid}, _From, - State = #q{owner = Owner, exclusive_consumer = Holder}) -> - case Owner of - none -> - case check_exclusive_access(Holder, true, State) of - in_use -> - %% FIXME: Is this really the right answer? What if - %% an active consumer's reader is actually the - %% claiming pid? Should that be allowed? In order - %% to check, we'd need to hold not just the ch - %% pid for each consumer, but also its reader - %% pid... - reply(locked, State); - ok -> - MonitorRef = erlang:monitor(process, ReaderPid), - reply(ok, State#q{owner = {ReaderPid, MonitorRef}}) - end; - {ReaderPid, _MonitorRef} -> - reply(ok, State); - _ -> - reply(locked, State) - end. +handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); -handle_cast({ack, Txn, MsgIds, ChPid}, State) -> +handle_cast({ack, Txn, AckTags, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM} -> - {Acked, Remaining} = collect_messages(MsgIds, UAM), - persist_acks(Txn, qname(State), Acked), - case Txn of - none -> - store_ch_record(C#cr{unacked_messages = Remaining}); - _ -> - record_pending_acks(Txn, ChPid, MsgIds) - end, - noreply(State) + C = #cr{acktags = ChAckTags} -> + {C1, BQS1} = + case Txn of + none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), + {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; + _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + end, + store_ch_record(C1), + noreply(State #q { backing_queue_state = BQS1 }) end; -handle_cast({rollback, Txn}, State) -> - ok = rollback_work(Txn, qname(State)), - erase_tx(Txn), - noreply(State); - -handle_cast({redeliver, Messages}, State) -> - noreply(deliver_or_enqueue_n(Messages, State)); +handle_cast({rollback, Txn, ChPid}, State) -> + noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, MsgIds, ChPid}, State) -> +handle_cast({requeue, AckTags, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), noreply(State); - C = #cr{unacked_messages = UAM} -> - {Messages, NewUAM} = collect_messages(MsgIds, UAM), - store_ch_record(C#cr{unacked_messages = NewUAM}), - noreply(deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)) + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) end; handle_cast({unblock, ChPid}, State) -> @@ -830,27 +794,59 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), + noreply(State); + +handle_cast(update_ram_duration, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State#q{rate_timer_ref = just_measured, + backing_queue_state = BQS2}); + +handle_cast({set_ram_duration_target, Duration}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State#q{backing_queue_state = BQS1}); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), noreply(State). -handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, - State = #q{owner = {DownPid, MonitorRef}}) -> - %% We know here that there are no consumers on this queue that are - %% owned by other pids than the one that just went down, so since - %% exclusive in some sense implies autodelete, we delete the queue - %% here. The other way of implementing the "exclusive implies - %% autodelete" feature is to actually set autodelete when an - %% exclusive declaration is seen, but this has the problem that - %% the python tests rely on the queue not going away after a - %% basic.cancel when the queue was declared exclusive and - %% nonautodelete. - NewState = State#q{owner = none}, - {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> + %% Exclusively owned queues must disappear with their owner. In + %% the case of clean shutdown we delete the queue synchronously in + %% the reader - although not required by the spec this seems to + %% match what people expect (see bug 21824). However we need this + %% monitor-and-async- delete in case the connection goes away + %% unexpectedly. + {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, NewState} -> noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end; +handle_info(timeout, State = #q{backing_queue = BQ}) -> + noreply(maybe_run_queue_via_backing_queue( + fun (BQS) -> BQ:sync(BQS) end, State)); + +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. + +handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> + {hibernate, State}; +handle_pre_hibernate(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:handle_pre_hibernate(BQS), + %% no activity for a while == 0 egress and ingress rates + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), infinity), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + {hibernate, stop_rate_timer(State#q{backing_queue_state = BQS2})}. diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0f3a8664..97d6cef9 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -31,18 +31,23 @@ -module(rabbit_amqqueue_sup). --behaviour(supervisor). +-behaviour(supervisor2). --export([start_link/0]). +-export([start_link/0, start_child/1]). -export([init/1]). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_child(Args) -> + supervisor2:start_child(?SERVER, Args). init([]) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, - temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}. + temporary, ?MAX_WAIT, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl new file mode 100644 index 00000000..2dba00ad --- /dev/null +++ b/src/rabbit_backing_queue.erl @@ -0,0 +1,133 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_backing_queue). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% Called on startup with a list of durable queue names. The + %% queues aren't being started at this point, but this call + %% allows the backing queue to perform any checking necessary for + %% the consistency of those queues, or initialise any other + %% shared resources. + {start, 1}, + + %% Initialise the backing queue and its state. + {init, 3}, + + %% Called on queue shutdown when queue isn't being deleted. + {terminate, 1}, + + %% Called when the queue is terminating and needs to delete all + %% its content. + {delete_and_terminate, 1}, + + %% Remove all messages in the queue, but not messages which have + %% been fetched and are pending acks. + {purge, 1}, + + %% Publish a message. + {publish, 2}, + + %% Called for messages which have already been passed straight + %% out to a client. The queue will be empty for these calls + %% (i.e. saves the round trip through the backing queue). + {publish_delivered, 3}, + + %% Produce the next message. + {fetch, 2}, + + %% Acktags supplied are for messages which can now be forgotten + %% about. + {ack, 2}, + + %% A publish, but in the context of a transaction. + {tx_publish, 3}, + + %% Acks, but in the context of a transaction. + {tx_ack, 3}, + + %% Undo anything which has been done in the context of the + %% specified transaction. + {tx_rollback, 2}, + + %% Commit a transaction. The Fun passed in must be called once + %% the messages have really been commited. This CPS permits the + %% possibility of commit coalescing. + {tx_commit, 3}, + + %% Reinsert messages into the queue which have already been + %% delivered and were pending acknowledgement. + {requeue, 2}, + + %% How long is my queue? + {len, 1}, + + %% Is my queue empty? + {is_empty, 1}, + + %% For the next three functions, the assumption is that you're + %% monitoring something like the ingress and egress rates of the + %% queue. The RAM duration is thus the length of time represented + %% by the messages held in RAM given the current rates. If you + %% want to ignore all of this stuff, then do so, and return 0 in + %% ram_duration/1. + + %% The target is to have no more messages in RAM than indicated + %% by the duration and the current queue rates. + {set_ram_duration_target, 2}, + + %% Optionally recalculate the duration internally (likely to be + %% just update your internal rates), and report how many seconds + %% the messages in RAM represent given the current rates of the + %% queue. + {ram_duration, 1}, + + %% Should 'sync' be called as soon as the queue process can + %% manage (either on an empty mailbox, or when a timer fires)? + {needs_sync, 1}, + + %% Called (eventually) after needs_sync returns 'true'. Note this + %% may be called more than once for each 'true' returned from + %% needs_sync. + {sync, 1}, + + %% Called immediately before the queue hibernates. + {handle_pre_hibernate, 1}, + + %% Exists for debugging purposes, to be able to expose state via + %% rabbitmqctl list_queues backing_queue_status + {status, 1} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72..4ab7a2a0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 1d47d764..27a1275a 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -95,33 +95,37 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin) maybe_encode_properties(ContentProperties, none) -> rabbit_framing:encode_properties(ContentProperties). -build_content_frames(FragmentsRev, FrameMax, ChannelInt) -> - BodyPayloadMax = if - FrameMax == 0 -> - none; - true -> +build_content_frames(FragsRev, FrameMax, ChannelInt) -> + BodyPayloadMax = if FrameMax == 0 -> + iolist_size(FragsRev); + true -> FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE end, - build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt). - -build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) -> - {SizeAcc, FragmentAcc}; -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) - when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) -> - <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment, - build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev], - BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc + size(Fragment), - [create_frame(3, ChannelInt, Fragment) | FragmentAcc], - FragmentsRev, - BodyPayloadMax, - ChannelInt). + build_content_frames(0, [], BodyPayloadMax, [], + lists:reverse(FragsRev), BodyPayloadMax, ChannelInt). + +build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [], + [], _BodyPayloadMax, _ChannelInt) -> + {SizeAcc, lists:reverse(FramesAcc)}; +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + Frags, BodyPayloadMax, ChannelInt) + when FragSizeRem == 0 orelse Frags == [] -> + Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)), + FrameSize = BodyPayloadMax - FragSizeRem, + build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc], + BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt); +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + [Frag | Frags], BodyPayloadMax, ChannelInt) -> + Size = size(Frag), + {NewFragSizeRem, NewFragAcc, NewFrags} = + if Size == 0 -> {FragSizeRem, FragAcc, Frags}; + Size =< FragSizeRem -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; + true -> <<Head:FragSizeRem/binary, Tail/binary>> = + Frag, + {0, [Head | FragAcc], [Tail | Frags]} + end, + build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc, + NewFrags, BodyPayloadMax, ChannelInt). build_heartbeat_frame() -> create_frame(?FRAME_HEARTBEAT, 0, <<>>). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3597fcd7..50cb5f20 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -46,10 +46,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking}). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). + consumer_mapping, blocking, queue_collector_pid}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -69,13 +66,13 @@ -ifdef(use_specs). --spec(start_link/5 :: - (channel_number(), pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/6 :: + (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). @@ -89,10 +86,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> {ok, Pid} = gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost], []), + Username, VHost, CollectorPid], []), Pid. do(Pid, Method) -> @@ -138,10 +135,9 @@ info_all(Items) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, @@ -157,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), - blocking = dict:new()}, + blocking = dict:new(), + queue_collector_pid = CollectorPid}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -302,6 +299,26 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +with_exclusive_access_or_die(QName, ReaderPid, F) -> + case rabbit_amqqueue:with_or_die( + QName, fun(Q) -> case Q of + #amqqueue{exclusive_owner = none} -> + F(Q); + #amqqueue{exclusive_owner = ReaderPid} -> + F(Q); + _ -> + {error, wrong_exclusive_owner} + end + end) of + {error, wrong_exclusive_owner} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QName)]); + Else -> + Else + end. + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -356,6 +373,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> + rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {reply, #'channel.open_ok'{}, State#ch{state = running}}; handle_method(#'channel.open'{}, _, _State) -> @@ -386,14 +404,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -488,11 +504,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -502,14 +518,6 @@ handle_method(#'basic.consume'{queue = QueueNameBin, dict:store(ActualConsumerTag, QueueName, ConsumerMapping)}}; - {error, queue_owned_by_another_connection} -> - %% The spec is silent on which exception to use - %% here. This seems reasonable? - %% FIXME: check this - - rabbit_misc:protocol_error( - resource_locked, "~s owned by another connection", - [rabbit_misc:rs(QueueName)]); {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -679,34 +687,48 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, - _, State = #ch { virtual_host = VHostPath, - reader_pid = ReaderPid }) -> - %% FIXME: atomic create&claim + nowait = NoWait, + arguments = Args}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid, + queue_collector_pid = CollectorPid}) -> + Owner = case ExclusiveDeclare of + true -> ReaderPid; + false -> none + end, + %% We use this in both branches, because queue_declare may yet return an + %% existing queue. Finish = - fun (Q) -> - if ExclusiveDeclare -> - case rabbit_amqqueue:claim_queue(Q, ReaderPid) of - locked -> - %% AMQP 0-8 doesn't say which - %% exception to use, so we mimic QPid - %% here. - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]); - ok -> ok - end; - true -> - ok - end, - Q + fun(Q = #amqqueue{name = QueueName}) -> + case Q of + %% "equivalent" rule. NB: we don't pay attention to + %% anything in the arguments table, so for the sake of the + %% "equivalent" rule, all tables of arguments are + %% semantically equivalant. + #amqqueue{exclusive_owner = Owner} -> + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue( + CollectorPid, Q) + end, + Q; + %% exclusivity trumps non-equivalence arbitrarily + #amqqueue{} -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]) + end end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), @@ -718,34 +740,32 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configure_permitted(QueueName, State), - Finish(rabbit_amqqueue:declare(QueueName, - Durable, AutoDelete, Args)); - Other = #amqqueue{name = QueueName} -> - check_configure_permitted(QueueName, State), + Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner)); + #amqqueue{} = Other -> Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, - nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + nowait = NoWait}, + _, State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, - nowait = NoWait - }, - _, State) -> + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -778,11 +798,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State) -> + _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -933,7 +953,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -949,13 +969,10 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of - ok -> NewUAMQ = queue:join(UAQ, UAMQ), - new_tx(State#ch{unacked_message_q = NewUAMQ}); - {error, Errors} -> rabbit_misc:protocol_error( - internal_error, "rollback failed: ~w", [Errors]) - end. + ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey, self()), + NewUAMQ = queue:join(UAQ, UAMQ), + new_tx(State#ch{unacked_message_q = NewUAMQ}). rollback_and_notify(State = #ch{transaction_id = none}) -> notify_queues(State); @@ -966,14 +983,11 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -1019,16 +1033,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620..f19e8d02 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 1cfba00e..8f41392f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -341,16 +341,7 @@ delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). contains(Table, MatchHead) -> - try - continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - case mnesia:match_object(Table, MatchHead, read) of - [] -> false; - [_|_] -> true - end - end. + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). continue('$end_of_table') -> false; continue({[_|_], _}) -> true; @@ -382,7 +373,7 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> - case mnesia:read(rabbit_route, B) of + case mnesia:read({rabbit_route, B}) of [] -> sync_binding(B, Q#amqqueue.durable, fun mnesia:write/3), diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2fa531a7..1ae8f7da 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -67,7 +67,7 @@ update_disk_serial() -> Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), Serial = case rabbit_misc:read_term_file(Filename) of {ok, [Num]} -> Num; - {error, enoent} -> rabbit_persister:serial(); + {error, enoent} -> 0; {error, Reason} -> throw({error, {cannot_read_serial_file, Filename, Reason}}) end, @@ -78,7 +78,7 @@ update_disk_serial() -> end, Serial. -%% generate a guid that is monotonically increasing per process. +%% generate a GUID. %% %% The id is only unique within a single cluster and as long as the %% serial store hasn't been deleted. @@ -92,20 +92,18 @@ guid() -> %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give - %% us a GUID that is monotonically increasing per process. + %% us a GUID. G = case get(guid) of undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, 0}; {S, I} -> {S, I+1} end, put(guid, G), - G. + erlang:md5(term_to_binary(G)). -%% generate a readable string representation of a guid. Note that any -%% monotonicity of the guid is not preserved in the encoding. +%% generate a readable string representation of a GUID. string_guid(Prefix) -> - Prefix ++ "-" ++ base64:encode_to_string( - erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ base64:encode_to_string(guid()). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl new file mode 100644 index 00000000..a7ca20c8 --- /dev/null +++ b/src/rabbit_invariable_queue.erl @@ -0,0 +1,276 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_invariable_queue). + +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, + publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, + tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, + set_ram_duration_target/2, ram_duration/1, needs_sync/1, sync/1, + handle_pre_hibernate/1, status/1]). + +-export([start/1]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(iv_state, { queue, qname, durable, len, pending_ack }). +-record(tx, { pending_messages, pending_acks, is_persistent }). + +-ifdef(use_specs). + +-type(ack() :: guid() | 'blank_ack'). +-type(state() :: #iv_state { queue :: queue(), + qname :: queue_name(), + len :: non_neg_integer(), + pending_ack :: dict() + }). +-include("rabbit_backing_queue_spec.hrl"). + +-endif. + +start(DurableQueues) -> + ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). + +init(QName, IsDurable, Recover) -> + Q = queue:from_list(case IsDurable andalso Recover of + true -> rabbit_persister:queue_content(QName); + false -> [] + end), + #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = queue:len(Q), + pending_ack = dict:new() }. + +terminate(State) -> + State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. + +delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), + {_PLen, State1} = purge(State), + terminate(State1). + +purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + %% We do not purge messages pending acks. + {AckTags, PA} = + rabbit_misc:queue_fold( + fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + Acc; + ({Msg = #basic_message { guid = Guid }, IsDelivered}, + {AckTagsN, PAN}) -> + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), + {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} + end, {[], dict:new()}, Q), + ok = persist_acks(QName, IsDurable, none, AckTags, PA), + {Len, State #iv_state { len = 0, queue = queue:new() }}. + +publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, + len = Len }) -> + ok = persist_message(QName, IsDurable, none, Msg), + State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. + +publish_delivered(false, _Msg, State) -> + {blank_ack, State}; +publish_delivered(true, Msg = #basic_message { guid = Guid }, + State = #iv_state { qname = QName, durable = IsDurable, + len = 0, pending_ack = PA }) -> + ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_delivery(QName, IsDurable, false, Msg), + {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. + +fetch(_AckRequired, State = #iv_state { len = 0 }) -> + {empty, State}; +fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, + durable = IsDurable, + pending_ack = PA }) -> + {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = + queue:out(Q), + Len1 = Len - 1, + ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), + PA1 = dict:store(Guid, Msg, PA), + {AckTag, PA2} = case AckRequired of + true -> {Guid, PA1}; + false -> ok = persist_acks(QName, IsDurable, none, + [Guid], PA1), + {blank_ack, PA} + end, + {{Msg, IsDelivered, AckTag, Len1}, + State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. + +ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + ok = persist_acks(QName, IsDurable, none, AckTags, PA), + PA1 = remove_acks(AckTags, PA), + State #iv_state { pending_ack = PA1 }. + +tx_publish(Txn, Msg, State = #iv_state { qname = QName, + durable = IsDurable }) -> + Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + ok = persist_message(QName, IsDurable, Txn, Msg), + State. + +tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, + pending_ack = PA }) -> + Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), + store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), + ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), + State. + +tx_rollback(Txn, State = #iv_state { qname = QName }) -> + #tx { pending_acks = AckTags } = lookup_tx(Txn), + ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, + Txn, QName), + erase_tx(Txn), + {lists:flatten(AckTags), State}. + +tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, + queue = Q, len = Len }) -> + #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), + ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, + Txn, QName), + erase_tx(Txn), + Fun(), + AckTags1 = lists:flatten(AckTags), + PA1 = remove_acks(AckTags1, PA), + {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) -> + {queue:in({Msg, false}, QN), LenN + 1} + end, {Q, Len}, PubsRev), + {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. + +requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, + len = Len }) -> + %% We don't need to touch the persister here - the persister will + %% already have these messages published and delivered as + %% necessary. The complication is that the persister's seq_id will + %% now be wrong, given the position of these messages in our queue + %% here. However, the persister's seq_id is only used for sorting + %% on startup, and requeue is silent as to where the requeued + %% messages should appear, thus the persister is permitted to sort + %% based on seq_id, even though it'll likely give a different + %% order to the last known state of our queue, prior to shutdown. + {Q1, Len1} = lists:foldl( + fun (Guid, {QN, LenN}) -> + {ok, Msg = #basic_message {}} = dict:find(Guid, PA), + {queue:in({Msg, true}, QN), LenN + 1} + end, {Q, Len}, AckTags), + PA1 = remove_acks(AckTags, PA), + State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. + +len(#iv_state { len = Len }) -> Len. + +is_empty(State) -> 0 == len(State). + +set_ram_duration_target(_DurationTarget, State) -> State. + +ram_duration(State) -> {0, State}. + +needs_sync(_State) -> false. + +sync(State) -> State. + +handle_pre_hibernate(State) -> State. + +status(_State) -> []. + +%%---------------------------------------------------------------------------- + +remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). + +%%---------------------------------------------------------------------------- + +lookup_tx(Txn) -> + case get({txn, Txn}) of + undefined -> #tx { pending_messages = [], + pending_acks = [], + is_persistent = false }; + V -> V + end. + +store_tx(Txn, Tx) -> + put({txn, Txn}, Tx). + +erase_tx(Txn) -> + erase({txn, Txn}). + +mark_tx_persistent(Txn) -> + store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }). + +is_tx_persistent(Txn) -> + (lookup_tx(Txn)) #tx.is_persistent. + +do_if_persistent(F, Txn, QName) -> + ok = case is_tx_persistent(Txn) of + false -> ok; + true -> F({Txn, QName}) + end. + +%%---------------------------------------------------------------------------- + +persist_message(QName, true, Txn, Msg = #basic_message { + is_persistent = true }) -> + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties, + %% rebuild from properties_bin on restore + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + persist_work(Txn, QName, + [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg) -> + ok. + +persist_delivery(QName, true, false, #basic_message { is_persistent = true, + guid = Guid }) -> + persist_work(none, QName, [{deliver, {QName, Guid}}]); +persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> + ok. + +persist_acks(QName, true, Txn, AckTags, PA) -> + persist_work(Txn, QName, + [{ack, {QName, Guid}} || Guid <- AckTags, + begin + {ok, Msg} = dict:find(Guid, PA), + Msg #basic_message.is_persistent + end]); +persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> + ok. + +persist_work(_Txn,_QName, []) -> + ok; +persist_work(none, _QName, WorkList) -> + rabbit_persister:dirty_work(WorkList); +persist_work(Txn, QName, WorkList) -> + mark_tx_persistent(Txn), + rabbit_persister:extend_transaction({Txn, QName}, WorkList). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7d840861..878af029 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -249,10 +249,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> State#lim{queues = NewQueues}. unlink_on_stopped(LimiterPid, stopped) -> - true = unlink(LimiterPid), - ok = receive {'EXIT', LimiterPid, _Reason} -> ok - after 0 -> ok - end, + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), stopped; unlink_on_stopped(_LimiterPid, Result) -> Result. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl new file mode 100644 index 00000000..91e97ffe --- /dev/null +++ b/src/rabbit_memory_monitor.erl @@ -0,0 +1,293 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + + +%% This module handles the node-wide memory statistics. +%% It receives statistics from all queues, counts the desired +%% queue length (in seconds), and sends this information back to +%% queues. + +-module(rabbit_memory_monitor). + +-behaviour(gen_server2). + +-export([start_link/0, update/0, register/2, deregister/1, + report_ram_duration/2, stop/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(process, {pid, reported, sent, callback, monitor}). + +-record(state, {timer, %% 'internal_update' timer + queue_durations, %% ets #process + queue_duration_sum, %% sum of all queue_durations + queue_duration_count, %% number of elements in sum + memory_limit, %% how much memory we intend to use + desired_duration %% the desired queue duration + }). + +-define(SERVER, ?MODULE). +-define(DEFAULT_UPDATE_INTERVAL, 2500). +-define(TABLE_NAME, ?MODULE). + +%% Because we have a feedback loop here, we need to ensure that we +%% have some space for when the queues don't quite respond as fast as +%% we would like, or when there is buffering going on in other parts +%% of the system. In short, we aim to stay some distance away from +%% when the memory alarms will go off, which cause channel.flow. +%% Note that all other Thresholds are relative to this scaling. +-define(MEMORY_LIMIT_SCALING, 0.4). + +-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this + +%% If all queues are pushed to disk (duration 0), then the sum of +%% their reported lengths will be 0. If memory then becomes available, +%% unless we manually intervene, the sum will remain 0, and the queues +%% will never get a non-zero duration. Thus when the mem use is < +%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT. +-define(SUM_INC_THRESHOLD, 0.95). +-define(SUM_INC_AMOUNT, 1.0). + +%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. +-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). + +-define(EPSILON, 0.000001). %% less than this and we clamp to 0 + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). +-spec(update/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). +-spec(deregister/1 :: (pid()) -> 'ok'). +-spec(report_ram_duration/2 :: (pid(), float() | 'infinity') -> number()). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +update() -> + gen_server2:cast(?SERVER, update). + +register(Pid, MFA = {_M, _F, _A}) -> + gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). + +deregister(Pid) -> + gen_server2:cast(?SERVER, {deregister, Pid}). + +report_ram_duration(Pid, QueueDuration) -> + gen_server2:call(?SERVER, + {report_ram_duration, Pid, QueueDuration}, infinity). + +stop() -> + gen_server2:cast(?SERVER, stop). + +%%---------------------------------------------------------------------------- +%% Gen_server callbacks +%%---------------------------------------------------------------------------- + +init([]) -> + MemoryLimit = trunc(?MEMORY_LIMIT_SCALING * + (try + vm_memory_monitor:get_memory_limit() + catch + exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM + end)), + + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, + ?SERVER, update, []), + + Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), + + {ok, internal_update( + #state { timer = TRef, + queue_durations = Ets, + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + desired_duration = infinity })}. + +handle_call({report_ram_duration, Pid, QueueDuration}, From, + State = #state { queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations, + desired_duration = SendDuration }) -> + + [Proc = #process { reported = PrevQueueDuration }] = + ets:lookup(Durations, Pid), + + gen_server2:reply(From, SendDuration), + + {Sum1, Count1} = + case {PrevQueueDuration, QueueDuration} of + {infinity, infinity} -> {Sum, Count}; + {infinity, _} -> {Sum + QueueDuration, Count + 1}; + {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; + {_, _} -> {Sum - PrevQueueDuration + QueueDuration, + Count} + end, + true = ets:insert(Durations, Proc #process { reported = QueueDuration, + sent = SendDuration }), + {noreply, State #state { queue_duration_sum = zero_clamp(Sum1), + queue_duration_count = Count1 }}; + +handle_call({register, Pid, MFA}, _From, + State = #state { queue_durations = Durations }) -> + MRef = erlang:monitor(process, Pid), + true = ets:insert(Durations, #process { pid = Pid, reported = infinity, + sent = infinity, callback = MFA, + monitor = MRef }), + {reply, ok, State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast({deregister, Pid}, State) -> + {noreply, internal_deregister(Pid, true, State)}; + +handle_cast(stop, State) -> + {stop, normal, State}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + {noreply, internal_deregister(Pid, false, State)}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state { timer = TRef }) -> + timer:cancel(TRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- + +zero_clamp(Sum) -> + case Sum < ?EPSILON of + true -> 0.0; + false -> Sum + end. + +internal_deregister(Pid, Demonitor, + State = #state { queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations }) -> + case ets:lookup(Durations, Pid) of + [] -> State; + [#process { reported = PrevQueueDuration, monitor = MRef }] -> + true = case Demonitor of + true -> erlang:demonitor(MRef); + false -> true + end, + {Sum1, Count1} = + case PrevQueueDuration of + infinity -> {Sum, Count}; + _ -> {zero_clamp(Sum - PrevQueueDuration), + Count - 1} + end, + true = ets:delete(Durations, Pid), + State #state { queue_duration_sum = Sum1, + queue_duration_count = Count1 } + end. + +internal_update(State = #state { memory_limit = Limit, + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count }) -> + MemoryRatio = erlang:memory(total) / Limit, + DesiredDurationAvg1 = + case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of + true -> + infinity; + false -> + Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of + true -> Sum + ?SUM_INC_AMOUNT; + false -> Sum + end, + (Sum1 / Count) / MemoryRatio + end, + State1 = State #state { desired_duration = DesiredDurationAvg1 }, + + %% only inform queues immediately if the desired duration has + %% decreased + case DesiredDurationAvg1 == infinity orelse + (DesiredDurationAvg /= infinity andalso + DesiredDurationAvg1 >= DesiredDurationAvg) of + true -> + ok; + false -> + true = + ets:foldl( + fun (Proc = #process { reported = QueueDuration, + sent = PrevSendDuration, + callback = {M, F, A} }, true) -> + case (case {QueueDuration, PrevSendDuration} of + {infinity, infinity} -> + true; + {infinity, D} -> + DesiredDurationAvg1 < D; + {D, infinity} -> + DesiredDurationAvg1 < D; + {D1, D2} -> + DesiredDurationAvg1 < + lists:min([D1,D2]) + end) of + true -> + ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), + ets:insert( + Durations, + Proc #process {sent = DesiredDurationAvg1}); + false -> + true + end + end, true, Durations) + end, + State1. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81cecb38..9a911ab1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,6 +43,7 @@ -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). +-export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -59,6 +60,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -96,9 +98,10 @@ undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). +-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -119,20 +122,27 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -220,6 +230,10 @@ enable_cover(Root) -> _ -> ok end. +start_cover(NodesS) -> + {ok, _} = cover:start([makenode(N) || N <- NodesS]), + ok. + report_cover() -> report_cover("."). @@ -523,51 +537,25 @@ pid_to_string(Pid) when is_pid(Pid) -> %% inverse of above string_to_pid(Str) -> - ErrorFun = fun () -> throw({error, {invalid_pid_syntax, Str}}) end, - %% TODO: simplify this code by using the 're' module, once we drop - %% support for R11 - %% - %% 1) sanity check + Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. - case regexp:first_match(Str, "^<.*\\.[0-9]+\\.[0-9]+>\$") of - {match, _, _} -> - %% 2) strip <> - Str1 = string:substr(Str, 2, string:len(Str) - 2), - %% 3) extract three constituent parts, taking care to - %% handle dots in the node part (hence the reverse and concat) - [SerStr, IdStr | Rest] = lists:reverse(string:tokens(Str1, ".")), - NodeStr = lists:concat(lists:reverse(Rest)), - %% 4) construct a triple term from the three parts - TripleStr = lists:flatten(io_lib:format("{~s,~s,~s}.", - [NodeStr, IdStr, SerStr])), - %% 5) parse the triple - Tokens = case erl_scan:string(TripleStr) of - {ok, Tokens1, _} -> Tokens1; - {error, _, _} -> ErrorFun() - end, - Term = case erl_parse:parse_term(Tokens) of - {ok, Term1} -> Term1; - {error, _} -> ErrorFun() - end, - {Node, Id, Ser} = - case Term of - {Node1, Id1, Ser1} when is_atom(Node1) andalso - is_integer(Id1) andalso - is_integer(Ser1) -> - Term; - _ -> - ErrorFun() - end, - %% 6) turn the triple into a pid - see pid_to_string - <<131,NodeEnc/binary>> = term_to_binary(Node), + case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", + [{capture,all_but_first,list}]) of + {match, [NodeStr, IdStr, SerStr]} -> + %% the NodeStr atom might be quoted, so we have to parse + %% it rather than doing a simple list_to_atom + NodeAtom = case erl_scan:string(NodeStr) of + {ok, [{atom, _, X}], _} -> X; + {error, _, _} -> throw(Err) + end, + <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), + Id = list_to_integer(IdStr), + Ser = list_to_integer(SerStr), binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); nomatch -> - ErrorFun(); - Error -> - %% invalid regexp - shouldn't happen - throw(Error) - end. + throw(Err) + end. version_compare(A, B, lte) -> case version_compare(A, B) of @@ -584,20 +572,67 @@ version_compare(A, B, gte) -> version_compare(A, B, Result) -> Result =:= version_compare(A, B). -version_compare([], []) -> +version_compare(A, A) -> eq; -version_compare([], _ ) -> +version_compare([], [$0 | B]) -> + version_compare([], dropdot(B)); +version_compare([], _) -> lt; %% 2.3 < 2.3.1 -version_compare(_ , []) -> +version_compare([$0 | A], []) -> + version_compare(dropdot(A), []); +version_compare(_, []) -> gt; %% 2.3.1 > 2.3 version_compare(A, B) -> {AStr, ATl} = lists:splitwith(fun (X) -> X =/= $. end, A), {BStr, BTl} = lists:splitwith(fun (X) -> X =/= $. end, B), ANum = list_to_integer(AStr), BNum = list_to_integer(BStr), - if ANum =:= BNum -> ATl1 = lists:dropwhile(fun (X) -> X =:= $. end, ATl), - BTl1 = lists:dropwhile(fun (X) -> X =:= $. end, BTl), - version_compare(ATl1, BTl1); + if ANum =:= BNum -> version_compare(dropdot(ATl), dropdot(BTl)); ANum < BNum -> lt; ANum > BNum -> gt end. + +dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A). + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74..55a6761d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 7978573d..c3d0b7b7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -51,6 +51,7 @@ binary, {packet, raw}, % no packaging {reuseaddr, true}, % allow rebind without waiting + {backlog, 128}, % use the maximum listen(2) backlog value %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. %% {delay_send, true}, {exit_on_close, false} diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 019d2a26..3cd42e47 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -33,14 +33,14 @@ -behaviour(gen_server). --export([start_link/0]). +-export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). + force_snapshot/0, queue_content/1]). -include("rabbit.hrl"). @@ -49,48 +49,44 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). --define(HIBERNATE_AFTER, 10000). - --define(MAX_WRAP_ENTRIES, 500). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, - snapshot}). + pending_logs, pending_replies, snapshot}). %% two tables for efficient persistency %% one maps a key to a message %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: ([queue_name()]) -> + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). +-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). +-spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link(DurableQueues) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), @@ -116,19 +112,19 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). -serial() -> - gen_server:call(?SERVER, serial, infinity). +queue_content(QName) -> + gen_server:call(?SERVER, {queue_content, QName}, infinity). %%-------------------------------------------------------------------- -init(_Args) -> +init([DurableQueues]) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), + Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, [ordered_set]), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -143,9 +139,8 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, + {Res, NewSnapshot} = + internal_load_snapshot(LogHandle, DurableQueues, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -153,12 +148,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], + pending_replies = [], + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -168,9 +163,13 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); +handle_call({queue_content, QName}, _From, + State = #pstate{snapshot = #psnapshot{messages = Messages, + queues = Queues}}) -> + MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), D} || + {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -185,9 +184,7 @@ handle_cast(_Msg, State) -> handle_info(timeout, State = #pstate{deadline = infinity}) -> State1 = flush(true, State), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State1, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); + {noreply, State1, hibernate}; handle_info(timeout, State) -> do_noreply(flush(State)); handle_info(_Info, State) -> @@ -236,8 +233,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline, %% "tied" is met. log_work(CreateWorkUnit, MessageList, State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> + snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( fun(M = {publish, Message, QK = {_QName, PKey}}) -> @@ -282,12 +278,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) -> maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, log_handle = LH, - snapshot = Snapshot}) - when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; -maybe_take_snapshot(_Force, State) -> - State. + snapshot = Snapshot}) -> + {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries), + if + Force orelse EntryCount >= MaxWrapEntries -> + ok = take_snapshot(LH, Snapshot), + State#pstate{entry_count = 0}; + true -> + State + end. later_ms(DeltaMilliSec) -> {MegaSec, Sec, MicroSec} = now(), @@ -304,7 +303,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) -> ExistingDeadline. compute_timeout(infinity) -> - ?HIBERNATE_AFTER; + {ok, HibernateAfter} = application:get_env(persister_hibernate_after), + HibernateAfter; compute_timeout(Deadline) -> DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, if @@ -343,56 +343,64 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues), + prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), + InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. -prune_table(Tab, Keys) -> +prune_table(Tab, Pred) -> true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Keys, ets:first(Tab)), + ok = prune_table(Tab, Pred, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). -prune_table(_Tab, _Keys, '$end_of_table') -> ok; -prune_table(Tab, Keys, Key) -> - case sets:is_element(Key, Keys) of +prune_table(_Tab, _Pred, '$end_of_table') -> ok; +prune_table(Tab, Pred, Key) -> + case Pred(Key) of true -> ok; false -> ets:delete(Tab, Key) end, - prune_table(Tab, Keys, ets:next(Tab, Key)). + prune_table(Tab, Pred, ets:next(Tab, Key)). internal_load_snapshot(LogHandle, + DurableQueues, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ - serial = Serial, - transactions = Ts}), - Snapshot2 = requeue_messages(Snapshot1), + transactions = Ts, + next_seq_id = NextSeqId}), + %% Remove all entries for queues that no longer exist. + %% Note that the 'messages' table is pruned when the next + %% snapshot is taken. + DurableQueuesSet = sets:from_list(DurableQueues), + prune_table(Snapshot1#psnapshot.queues, + fun ({QName, _PKey}) -> + sets:is_element(QName, DurableQueuesSet) + end), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so %% any uncompleted transactions will have been aborted. - {ok, Snapshot2#psnapshot{transactions = dict:new()}}; + {ok, Snapshot1#psnapshot{transactions = dict:new()}}; {error, Reason} -> {{error, Reason}, Snapshot} end. @@ -404,56 +412,6 @@ check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> check_version(_Other) -> {error, unrecognised_persister_log_format}. -requeue_messages(Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), - %% unstable parallel map, because order doesn't matter - L = lists:append( - rabbit_misc:upmap( - %% we do as much work as possible in spawned worker - %% processes, but we need to make sure the ets:inserts are - %% performed in self() - fun ({QName, Requeues}) -> - requeue(QName, Requeues, Messages) - end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], - ets:delete_all_objects(Messages), - ets:delete_all_objects(Queues), - true = ets:insert(Messages, NewMessages), - true = ets:insert(Queues, NewQueues), - %% contains the mutated messages and queues tables - Snapshot. - -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - -requeue(QName, Requeues, Messages) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, - {_, Message} <- ets:lookup(Messages, PKey)], - rabbit_amqqueue:redeliver( - QPid, - %% Messages published by the same process receive - %% persistence keys that are monotonically - %% increasing. Since message ordering is defined on a - %% per-channel basis, and channels are bound to specific - %% processes, sorting the list does provide the correct - %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- - lists:sort(RequeueMessages)]), - RequeueMessages; - {error, not_found} -> - [] - end. - replay([], LogHandle, K, Snapshot) -> case disk_log:chunk(LogHandle, K) of {K1, Items} -> @@ -474,50 +432,55 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 274981ef..ef3c5cc2 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -108,6 +108,7 @@ start() -> WApp == stdlib; WApp == kernel; WApp == sasl; + WApp == crypto; WApp == os_mon -> false; _ -> true end]), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5cf519b7..73a58f13 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -52,10 +52,12 @@ -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). +-define(SILENT_CLOSE_DELAY, 3). %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state}). +-record(v1, {sock, connection, callback, recv_ref, connection_state, + queue_collector}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -233,6 +235,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), + {ok, Collector} = rabbit_reader_queue_collector:start_link(), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -244,7 +247,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> client_properties = none}, callback = uninitialized_callback, recv_ref = none, - connection_state = pre_init}, + connection_state = pre_init, + queue_collector = Collector}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -262,7 +266,9 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue) + teardown_profiling(ProfilingValue), + rabbit_reader_queue_collector:shutdown(Collector), + rabbit_misc:unlink_and_capture_exit(Collector) end, done. @@ -425,11 +431,17 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State = #v1{connection_state = closing}) -> +maybe_close(State = #v1{connection_state = closing, + queue_collector = Collector}) -> case all_channels() of - [] -> ok = send_on_channel0( - State#v1.sock, #'connection.close_ok'{}), - close_connection(State); + [] -> + %% Spec says "Exclusive queues may only be accessed by the current + %% connection, and are deleted when that connection closes." + %% This does not strictly imply synchrony, but in practice it seems + %% to be what people assume. + rabbit_reader_queue_collector:delete_all(Collector), + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + close_connection(State); _ -> State end; maybe_close(State) -> @@ -575,7 +587,11 @@ handle_method0(MethodName, FieldsBin, State) -> end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); - Other -> throw({channel0_error, Other, CompleteReason}) + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + Other -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, Other, CompleteReason}) end end. @@ -722,15 +738,16 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +send_to_new_channel(Channel, AnalyzedFrame, + State = #v1{queue_collector = Collector}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl new file mode 100644 index 00000000..841549e9 --- /dev/null +++ b/src/rabbit_reader_queue_collector.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_reader_queue_collector). + +-behaviour(gen_server). + +-export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {exclusive_queues}). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()}). +-spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok'). +-spec(delete_all/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +register_exclusive_queue(CollectorPid, Q) -> + gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity). + +delete_all(CollectorPid) -> + gen_server:call(CollectorPid, delete_all, infinity). + +shutdown(CollectorPid) -> + gen_server:call(CollectorPid, shutdown, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{exclusive_queues = dict:new()}}. + +%%-------------------------------------------------------------------------- + +handle_call({register_exclusive_queue, Q}, _From, + State = #state{exclusive_queues = Queues}) -> + MonitorRef = erlang:monitor(process, Q#amqqueue.pid), + {reply, ok, + State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, _From, + State = #state{exclusive_queues = ExclusiveQueues}) -> + [rabbit_misc:with_exit_handler( + fun() -> ok end, + fun() -> + erlang:demonitor(MonitorRef), + rabbit_amqqueue:delete(Q, false, false) + end) + || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)], + {reply, ok, State}; + +handle_call(shutdown, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, + State = #state{exclusive_queues = ExclusiveQueues}) -> + {noreply, State#state{exclusive_queues = + dict:erase(MonitorRef, ExclusiveQueues)}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab..03979d6c 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,104 +33,40 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --behaviour(gen_server2). - --export([start_link/0, - deliver/2, +-export([deliver/2, match_bindings/2, match_routing_key/2]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% cross-node routing optimisation is disabled because of bug 19758. --define(BUG19758, true). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - --ifdef(BUG19758). - -deliver(QPids, Delivery) -> - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)). - --else. +deliver(QPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + delegate:invoke_no_result( + QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + {routed, QPids}; deliver(QPids, Delivery) -> - %% we reduce inter-node traffic by grouping the qpids by node and - %% only delivering one copy of the message to each node involved, - %% which then in turn delivers it to its queues. - deliver_per_node( - dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), - Delivery). - -deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> - %% optimisation - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - run_bindings(QPids, Delivery)); -deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver in run_bindings below will deliver the - %% message to the queue process asynchronously, and return true, - %% which means all the QPids will always be returned. It is - %% therefore safe to use a fire-and-forget cast here and return - %% the QPids - the semantics is preserved. This scales much better - %% than the non-immediate case below. - {routed, - lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Delivery) -> - R = rabbit_misc:upmap( - fun ({Node, QPids}) -> - try gen_server2:call({?SERVER, Node}, - {deliver, QPids, Delivery}, - infinity) - catch - _Class:_Reason -> - %% TODO: figure out what to log (and do!) here - {false, []} - end - end, - NodeQPids), - {Routed, Handled} = - lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) -> - {Routed or RoutedAcc, - %% we do the concatenation below, which - %% should be faster - [Handled | HandledAcc]} - end, - {false, []}, - R), + {Success, _} = + delegate:invoke(QPids, + fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, lists:append(Handled)}). - --endif. + {Routed, Handled}). %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange @@ -141,20 +77,7 @@ match_bindings(Name, Match) -> mnesia:table(rabbit_route), ExchangeName == Name, Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). + lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). match_routing_key(Name, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, @@ -174,44 +97,8 @@ lookup_qpids(Queues) -> %%-------------------------------------------------------------------- -init([]) -> - {ok, no_state}. - -handle_call({deliver, QPids, Delivery}, From, State) -> - spawn( - fun () -> - R = run_bindings(QPids, Delivery), - gen_server2:reply(From, R) - end), - {noreply, State}. - -handle_cast({deliver, QPids, Delivery}, State) -> - %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Delivery), - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -run_bindings(QPids, Delivery) -> - lists:foldl( - fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(QPid, Delivery) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', _Reason} -> {Routed, Handled} - end - end, - {false, []}, - QPids). +fold_deliveries({Pid, true},{_, Handled}) -> {true, [Pid|Handled]}; +fold_deliveries({_, false},{_, Handled}) -> {true, Handled}. %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) check_delivery(true, _ , {false, []}) -> {unroutable, []}; diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 25715e6e..2c5e5112 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_restartable_child/1, start_restartable_child/2]). -export([init/1]). @@ -49,8 +49,11 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, + {ChildId, {Mod, start_link, Args}, transient, ?MAX_WAIT, worker, [Mod]}), ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 82f2d199..fa0ce2db 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -61,7 +61,32 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), - passed = test_hooks(), + passed = maybe_run_cluster_dependent_tests(), + passed. + + +maybe_run_cluster_dependent_tests() -> + SecondaryNode = rabbit_misc:makenode("hare"), + + case net_adm:ping(SecondaryNode) of + pong -> passed = run_cluster_dependent_tests(SecondaryNode); + pang -> io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end, + passed. + +run_cluster_dependent_tests(SecondaryNode) -> + SecondaryNodeS = atom_to_list(SecondaryNode), + + ok = control_action(stop_app, []), + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + + io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), + passed = test_delegates_async(SecondaryNode), + passed = test_delegates_sync(SecondaryNode), + passed. test_priority_queue() -> @@ -625,8 +650,12 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% NB: this will log an inconsistent_database error, which is harmless + %% Turning cover on / off is OK even if we're not in general using cover, + %% it just turns the engine on / off, doesn't actually log anything. + cover:stop([SecondaryNode]), true = disconnect_node(SecondaryNode), pong = net_adm:ping(SecondaryNode), + cover:start([SecondaryNode]), %% leaving a cluster as a ram node ok = control_action(reset, []), @@ -717,17 +746,16 @@ test_user_management() -> passed. test_server_status() -> - %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, []) || + false, false, [], none) || Name <- [<<"foo">>, <<"bar">>]], - ok = rabbit_amqqueue:claim_queue(Q, self()), - ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), %% list queues @@ -811,6 +839,93 @@ test_hooks() -> end, passed. +test_delegates_async(SecondaryNode) -> + Self = self(), + Sender = fun(Pid) -> Pid ! {invoked, Self} end, + + Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), + + ok = delegate:invoke_no_result(spawn(Responder), Sender), + ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), + await_response(2), + + LocalPids = spawn_responders(node(), Responder, 10), + RemotePids = spawn_responders(SecondaryNode, Responder, 10), + ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender), + await_response(20), + + passed. + +make_responder(FMsg) -> make_responder(FMsg, timeout). +make_responder(FMsg, Throw) -> + fun() -> + receive Msg -> FMsg(Msg) + after 1000 -> throw(Throw) + end + end. + +spawn_responders(Node, Responder, Count) -> + [spawn(Node, Responder) || _ <- lists:seq(1, Count)]. + +await_response(0) -> + ok; +await_response(Count) -> + receive + response -> ok, + await_response(Count - 1) + after 1000 -> + io:format("Async reply not received~n"), + throw(timeout) + end. + +must_exit(Fun) -> + try + Fun(), + throw(exit_not_thrown) + catch + exit:_ -> ok + end. + +test_delegates_sync(SecondaryNode) -> + Sender = fun(Pid) -> gen_server:call(Pid, invoked) end, + BadSender = fun(_Pid) -> exit(exception) end, + + Responder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server:reply(From, response) + end), + + BadResponder = make_responder(fun({'$gen_call', From, invoked}) -> + gen_server:reply(From, response) + end, bad_responder_died), + + response = delegate:invoke(spawn(Responder), Sender), + response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), + + must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end), + must_exit(fun() -> + delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), + + LocalGoodPids = spawn_responders(node(), Responder, 2), + RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), + LocalBadPids = spawn_responders(node(), BadResponder, 2), + RemoteBadPids = spawn_responders(SecondaryNode, BadResponder, 2), + + {GoodRes, []} = delegate:invoke(LocalGoodPids ++ RemoteGoodPids, Sender), + true = lists:all(fun ({_, response}) -> true end, GoodRes), + GoodResPids = [Pid || {Pid, _} <- GoodRes], + + Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), + Good = ordsets:from_list(GoodResPids), + + {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), + true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), + BadResPids = [Pid || {Pid, _} <- BadRes], + + Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), + Bad = ordsets:from_list(BadResPids), + + passed. + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). diff --git a/src/supervisor2.erl b/src/supervisor2.erl new file mode 100644 index 00000000..55753512 --- /dev/null +++ b/src/supervisor2.erl @@ -0,0 +1,917 @@ +%% This file is a copy of supervisor.erl from the R13B-3 Erlang/OTP +%% distribution, with the following modifications: +%% +%% 1) the module name is supervisor2 +%% +%% 2) there is a new strategy called +%% simple_one_for_one_terminate. This is exactly the same as for +%% simple_one_for_one, except that children *are* explicitly +%% terminated as per the shutdown component of the child_spec. +%% +%% All modifications are (C) 2010 LShift Ltd. +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(supervisor2). + +-behaviour(gen_server). + +%% External exports +-export([start_link/2,start_link/3, + start_child/2, restart_child/2, + delete_child/2, terminate_child/2, + which_children/1, + check_childspecs/1]). + +-export([behaviour_info/1]). + +%% Internal exports +-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). +-export([handle_cast/2]). + +-define(DICT, dict). + +-record(state, {name, + strategy, + children = [], + dynamics = ?DICT:new(), + intensity, + period, + restarts = [], + module, + args}). + +-record(child, {pid = undefined, % pid is undefined when child is not running + name, + mfa, + restart_type, + shutdown, + child_type, + modules = []}). + +-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse + State#state.strategy =:= simple_one_for_one_terminate). +-define(is_terminate_simple(State), + State#state.strategy =:= simple_one_for_one_terminate). + +behaviour_info(callbacks) -> + [{init,1}]; +behaviour_info(_Other) -> + undefined. + +%%% --------------------------------------------------- +%%% This is a general process supervisor built upon gen_server.erl. +%%% Servers/processes should/could also be built using gen_server.erl. +%%% SupName = {local, atom()} | {global, atom()}. +%%% --------------------------------------------------- +start_link(Mod, Args) -> + gen_server:start_link(?MODULE, {self, Mod, Args}, []). + +start_link(SupName, Mod, Args) -> + gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). + +%%% --------------------------------------------------- +%%% Interface functions. +%%% --------------------------------------------------- +start_child(Supervisor, ChildSpec) -> + call(Supervisor, {start_child, ChildSpec}). + +restart_child(Supervisor, Name) -> + call(Supervisor, {restart_child, Name}). + +delete_child(Supervisor, Name) -> + call(Supervisor, {delete_child, Name}). + +%%----------------------------------------------------------------- +%% Func: terminate_child/2 +%% Returns: ok | {error, Reason} +%% Note that the child is *always* terminated in some +%% way (maybe killed). +%%----------------------------------------------------------------- +terminate_child(Supervisor, Name) -> + call(Supervisor, {terminate_child, Name}). + +which_children(Supervisor) -> + call(Supervisor, which_children). + +call(Supervisor, Req) -> + gen_server:call(Supervisor, Req, infinity). + +check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> + case check_startspec(ChildSpecs) of + {ok, _} -> ok; + Error -> {error, Error} + end; +check_childspecs(X) -> {error, {badarg, X}}. + +%%% --------------------------------------------------- +%%% +%%% Initialize the supervisor. +%%% +%%% --------------------------------------------------- +init({SupName, Mod, Args}) -> + process_flag(trap_exit, true), + case Mod:init(Args) of + {ok, {SupFlags, StartSpec}} -> + case init_state(SupName, SupFlags, Mod, Args) of + {ok, State} when ?is_simple(State) -> + init_dynamic(State, StartSpec); + {ok, State} -> + init_children(State, StartSpec); + Error -> + {stop, {supervisor_data, Error}} + end; + ignore -> + ignore; + Error -> + {stop, {bad_return, {Mod, init, Error}}} + end. + +init_children(State, StartSpec) -> + SupName = State#state.name, + case check_startspec(StartSpec) of + {ok, Children} -> + case start_children(Children, SupName) of + {ok, NChildren} -> + {ok, State#state{children = NChildren}}; + {error, NChildren} -> + terminate_children(NChildren, SupName), + {stop, shutdown} + end; + Error -> + {stop, {start_spec, Error}} + end. + +init_dynamic(State, [StartSpec]) -> + case check_startspec([StartSpec]) of + {ok, Children} -> + {ok, State#state{children = Children}}; + Error -> + {stop, {start_spec, Error}} + end; +init_dynamic(_State, StartSpec) -> + {stop, {bad_start_spec, StartSpec}}. + +%%----------------------------------------------------------------- +%% Func: start_children/2 +%% Args: Children = [#child] in start order +%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Purpose: Start all children. The new list contains #child's +%% with pids. +%% Returns: {ok, NChildren} | {error, NChildren} +%% NChildren = [#child] in termination order (reversed +%% start order) +%%----------------------------------------------------------------- +start_children(Children, SupName) -> start_children(Children, [], SupName). + +start_children([Child|Chs], NChildren, SupName) -> + case do_start_child(SupName, Child) of + {ok, Pid} -> + start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); + {ok, Pid, _Extra} -> + start_children(Chs, [Child#child{pid = Pid}|NChildren], SupName); + {error, Reason} -> + report_error(start_error, Reason, Child, SupName), + {error, lists:reverse(Chs) ++ [Child | NChildren]} + end; +start_children([], NChildren, _SupName) -> + {ok, NChildren}. + +do_start_child(SupName, Child) -> + #child{mfa = {M, F, A}} = Child, + case catch apply(M, F, A) of + {ok, Pid} when is_pid(Pid) -> + NChild = Child#child{pid = Pid}, + report_progress(NChild, SupName), + {ok, Pid}; + {ok, Pid, Extra} when is_pid(Pid) -> + NChild = Child#child{pid = Pid}, + report_progress(NChild, SupName), + {ok, Pid, Extra}; + ignore -> + {ok, undefined}; + {error, What} -> {error, What}; + What -> {error, What} + end. + +do_start_child_i(M, F, A) -> + case catch apply(M, F, A) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {ok, Pid, Extra} when is_pid(Pid) -> + {ok, Pid, Extra}; + ignore -> + {ok, undefined}; + {error, Error} -> + {error, Error}; + What -> + {error, What} + end. + + +%%% --------------------------------------------------- +%%% +%%% Callback functions. +%%% +%%% --------------------------------------------------- +handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> + #child{mfa = {M, F, A}} = hd(State#state.children), + Args = A ++ EArgs, + case do_start_child_i(M, F, Args) of + {ok, Pid} -> + NState = State#state{dynamics = + ?DICT:store(Pid, Args, State#state.dynamics)}, + {reply, {ok, Pid}, NState}; + {ok, Pid, Extra} -> + NState = State#state{dynamics = + ?DICT:store(Pid, Args, State#state.dynamics)}, + {reply, {ok, Pid, Extra}, NState}; + What -> + {reply, What, State} + end; + +%%% The requests terminate_child, delete_child and restart_child are +%%% invalid for simple_one_for_one and simple_one_for_one_terminate +%%% supervisors. +handle_call({_Req, _Data}, _From, State) when ?is_simple(State) -> + {reply, {error, State#state.strategy}, State}; + +handle_call({start_child, ChildSpec}, _From, State) -> + case check_childspec(ChildSpec) of + {ok, Child} -> + {Resp, NState} = handle_start_child(Child, State), + {reply, Resp, NState}; + What -> + {reply, {error, What}, State} + end; + +handle_call({restart_child, Name}, _From, State) -> + case get_child(Name, State) of + {value, Child} when Child#child.pid =:= undefined -> + case do_start_child(State#state.name, Child) of + {ok, Pid} -> + NState = replace_child(Child#child{pid = Pid}, State), + {reply, {ok, Pid}, NState}; + {ok, Pid, Extra} -> + NState = replace_child(Child#child{pid = Pid}, State), + {reply, {ok, Pid, Extra}, NState}; + Error -> + {reply, Error, State} + end; + {value, _} -> + {reply, {error, running}, State}; + _ -> + {reply, {error, not_found}, State} + end; + +handle_call({delete_child, Name}, _From, State) -> + case get_child(Name, State) of + {value, Child} when Child#child.pid =:= undefined -> + NState = remove_child(Child, State), + {reply, ok, NState}; + {value, _} -> + {reply, {error, running}, State}; + _ -> + {reply, {error, not_found}, State} + end; + +handle_call({terminate_child, Name}, _From, State) -> + case get_child(Name, State) of + {value, Child} -> + NChild = do_terminate(Child, State#state.name), + {reply, ok, replace_child(NChild, State)}; + _ -> + {reply, {error, not_found}, State} + end; + +handle_call(which_children, _From, State) when ?is_simple(State) -> + [#child{child_type = CT, modules = Mods}] = State#state.children, + Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end, + ?DICT:to_list(State#state.dynamics)), + {reply, Reply, State}; + +handle_call(which_children, _From, State) -> + Resp = + lists:map(fun(#child{pid = Pid, name = Name, + child_type = ChildType, modules = Mods}) -> + {Name, Pid, ChildType, Mods} + end, + State#state.children), + {reply, Resp, State}. + + +%%% Hopefully cause a function-clause as there is no API function +%%% that utilizes cast. +handle_cast(null, State) -> + error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", + []), + + {noreply, State}. + +%% +%% Take care of terminated children. +%% +handle_info({'EXIT', Pid, Reason}, State) -> + case restart_child(Pid, Reason, State) of + {ok, State1} -> + {noreply, State1}; + {shutdown, State1} -> + {stop, shutdown, State1} + end; + +handle_info(Msg, State) -> + error_logger:error_msg("Supervisor received unexpected message: ~p~n", + [Msg]), + {noreply, State}. +%% +%% Terminate this server. +%% +terminate(_Reason, State) when ?is_terminate_simple(State) -> + terminate_simple_children( + hd(State#state.children), State#state.dynamics, State#state.name), + ok; +terminate(_Reason, State) -> + terminate_children(State#state.children, State#state.name), + ok. + +%% +%% Change code for the supervisor. +%% Call the new call-back module and fetch the new start specification. +%% Combine the new spec. with the old. If the new start spec. is +%% not valid the code change will not succeed. +%% Use the old Args as argument to Module:init/1. +%% NOTE: This requires that the init function of the call-back module +%% does not have any side effects. +%% +code_change(_, State, _) -> + case (State#state.module):init(State#state.args) of + {ok, {SupFlags, StartSpec}} -> + case catch check_flags(SupFlags) of + ok -> + {Strategy, MaxIntensity, Period} = SupFlags, + update_childspec(State#state{strategy = Strategy, + intensity = MaxIntensity, + period = Period}, + StartSpec); + Error -> + {error, Error} + end; + ignore -> + {ok, State}; + Error -> + Error + end. + +check_flags({Strategy, MaxIntensity, Period}) -> + validStrategy(Strategy), + validIntensity(MaxIntensity), + validPeriod(Period), + ok; +check_flags(What) -> + {bad_flags, What}. + +update_childspec(State, StartSpec) when ?is_simple(State) -> + case check_startspec(StartSpec) of + {ok, [Child]} -> + {ok, State#state{children = [Child]}}; + Error -> + {error, Error} + end; + +update_childspec(State, StartSpec) -> + case check_startspec(StartSpec) of + {ok, Children} -> + OldC = State#state.children, % In reverse start order ! + NewC = update_childspec1(OldC, Children, []), + {ok, State#state{children = NewC}}; + Error -> + {error, Error} + end. + +update_childspec1([Child|OldC], Children, KeepOld) -> + case update_chsp(Child, Children) of + {ok,NewChildren} -> + update_childspec1(OldC, NewChildren, KeepOld); + false -> + update_childspec1(OldC, Children, [Child|KeepOld]) + end; +update_childspec1([], Children, KeepOld) -> + % Return them in (keeped) reverse start order. + lists:reverse(Children ++ KeepOld). + +update_chsp(OldCh, Children) -> + case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> + Ch#child{pid = OldCh#child.pid}; + (Ch) -> + Ch + end, + Children) of + Children -> + false; % OldCh not found in new spec. + NewC -> + {ok, NewC} + end. + +%%% --------------------------------------------------- +%%% Start a new child. +%%% --------------------------------------------------- + +handle_start_child(Child, State) -> + case get_child(Child#child.name, State) of + false -> + case do_start_child(State#state.name, Child) of + {ok, Pid} -> + Children = State#state.children, + {{ok, Pid}, + State#state{children = + [Child#child{pid = Pid}|Children]}}; + {ok, Pid, Extra} -> + Children = State#state.children, + {{ok, Pid, Extra}, + State#state{children = + [Child#child{pid = Pid}|Children]}}; + {error, What} -> + {{error, {What, Child}}, State} + end; + {value, OldChild} when OldChild#child.pid =/= undefined -> + {{error, {already_started, OldChild#child.pid}}, State}; + {value, _OldChild} -> + {{error, already_present}, State} + end. + +%%% --------------------------------------------------- +%%% Restart. A process has terminated. +%%% Returns: {ok, #state} | {shutdown, #state} +%%% --------------------------------------------------- + +restart_child(Pid, Reason, State) when ?is_simple(State) -> + case ?DICT:find(Pid, State#state.dynamics) of + {ok, Args} -> + [Child] = State#state.children, + RestartType = Child#child.restart_type, + {M, F, _} = Child#child.mfa, + NChild = Child#child{pid = Pid, mfa = {M, F, Args}}, + do_restart(RestartType, Reason, NChild, State); + error -> + {ok, State} + end; +restart_child(Pid, Reason, State) -> + Children = State#state.children, + case lists:keysearch(Pid, #child.pid, Children) of + {value, Child} -> + RestartType = Child#child.restart_type, + do_restart(RestartType, Reason, Child, State); + _ -> + {ok, State} + end. + +do_restart(permanent, Reason, Child, State) -> + report_error(child_terminated, Reason, Child, State#state.name), + restart(Child, State); +do_restart(_, normal, Child, State) -> + NState = state_del_child(Child, State), + {ok, NState}; +do_restart(_, shutdown, Child, State) -> + NState = state_del_child(Child, State), + {ok, NState}; +do_restart(transient, Reason, Child, State) -> + report_error(child_terminated, Reason, Child, State#state.name), + restart(Child, State); +do_restart(temporary, Reason, Child, State) -> + report_error(child_terminated, Reason, Child, State#state.name), + NState = state_del_child(Child, State), + {ok, NState}. + +restart(Child, State) -> + case add_restart(State) of + {ok, NState} -> + restart(NState#state.strategy, Child, NState); + {terminate, NState} -> + report_error(shutdown, reached_max_restart_intensity, + Child, State#state.name), + {shutdown, remove_child(Child, NState)} + end. + +restart(Strategy, Child, State) + when Strategy =:= simple_one_for_one orelse + Strategy =:= simple_one_for_one_terminate -> + #child{mfa = {M, F, A}} = Child, + Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), + case do_start_child_i(M, F, A) of + {ok, Pid} -> + NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, + {ok, NState}; + {ok, Pid, _Extra} -> + NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, + {ok, NState}; + {error, Error} -> + report_error(start_error, Error, Child, State#state.name), + restart(Child, State) + end; +restart(one_for_one, Child, State) -> + case do_start_child(State#state.name, Child) of + {ok, Pid} -> + NState = replace_child(Child#child{pid = Pid}, State), + {ok, NState}; + {ok, Pid, _Extra} -> + NState = replace_child(Child#child{pid = Pid}, State), + {ok, NState}; + {error, Reason} -> + report_error(start_error, Reason, Child, State#state.name), + restart(Child, State) + end; +restart(rest_for_one, Child, State) -> + {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children), + ChAfter2 = terminate_children(ChAfter, State#state.name), + case start_children(ChAfter2, State#state.name) of + {ok, ChAfter3} -> + {ok, State#state{children = ChAfter3 ++ ChBefore}}; + {error, ChAfter3} -> + restart(Child, State#state{children = ChAfter3 ++ ChBefore}) + end; +restart(one_for_all, Child, State) -> + Children1 = del_child(Child#child.pid, State#state.children), + Children2 = terminate_children(Children1, State#state.name), + case start_children(Children2, State#state.name) of + {ok, NChs} -> + {ok, State#state{children = NChs}}; + {error, NChs} -> + restart(Child, State#state{children = NChs}) + end. + +%%----------------------------------------------------------------- +%% Func: terminate_children/2 +%% Args: Children = [#child] in termination order +%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} +%% Returns: NChildren = [#child] in +%% startup order (reversed termination order) +%%----------------------------------------------------------------- +terminate_children(Children, SupName) -> + terminate_children(Children, SupName, []). + +terminate_children([Child | Children], SupName, Res) -> + NChild = do_terminate(Child, SupName), + terminate_children(Children, SupName, [NChild | Res]); +terminate_children([], _SupName, Res) -> + Res. + +terminate_simple_children(Child, Dynamics, SupName) -> + dict:fold(fun (Pid, _Args, _Any) -> + do_terminate(Child#child{pid = Pid}, SupName) + end, ok, Dynamics), + ok. + +do_terminate(Child, SupName) when Child#child.pid =/= undefined -> + case shutdown(Child#child.pid, + Child#child.shutdown) of + ok -> + Child#child{pid = undefined}; + {error, OtherReason} -> + report_error(shutdown_error, OtherReason, Child, SupName), + Child#child{pid = undefined} + end; +do_terminate(Child, _SupName) -> + Child. + +%%----------------------------------------------------------------- +%% Shutdowns a child. We must check the EXIT value +%% of the child, because it might have died with another reason than +%% the wanted. In that case we want to report the error. We put a +%% monitor on the child an check for the 'DOWN' message instead of +%% checking for the 'EXIT' message, because if we check the 'EXIT' +%% message a "naughty" child, who does unlink(Sup), could hang the +%% supervisor. +%% Returns: ok | {error, OtherReason} (this should be reported) +%%----------------------------------------------------------------- +shutdown(Pid, brutal_kill) -> + + case monitor_child(Pid) of + ok -> + exit(Pid, kill), + receive + {'DOWN', _MRef, process, Pid, killed} -> + ok; + {'DOWN', _MRef, process, Pid, OtherReason} -> + {error, OtherReason} + end; + {error, Reason} -> + {error, Reason} + end; + +shutdown(Pid, Time) -> + + case monitor_child(Pid) of + ok -> + exit(Pid, shutdown), %% Try to shutdown gracefully + receive + {'DOWN', _MRef, process, Pid, shutdown} -> + ok; + {'DOWN', _MRef, process, Pid, OtherReason} -> + {error, OtherReason} + after Time -> + exit(Pid, kill), %% Force termination. + receive + {'DOWN', _MRef, process, Pid, OtherReason} -> + {error, OtherReason} + end + end; + {error, Reason} -> + {error, Reason} + end. + +%% Help function to shutdown/2 switches from link to monitor approach +monitor_child(Pid) -> + + %% Do the monitor operation first so that if the child dies + %% before the monitoring is done causing a 'DOWN'-message with + %% reason noproc, we will get the real reason in the 'EXIT'-message + %% unless a naughty child has already done unlink... + erlang:monitor(process, Pid), + unlink(Pid), + + receive + %% If the child dies before the unlik we must empty + %% the mail-box of the 'EXIT'-message and the 'DOWN'-message. + {'EXIT', Pid, Reason} -> + receive + {'DOWN', _, process, Pid, _} -> + {error, Reason} + end + after 0 -> + %% If a naughty child did unlink and the child dies before + %% monitor the result will be that shutdown/2 receives a + %% 'DOWN'-message with reason noproc. + %% If the child should die after the unlink there + %% will be a 'DOWN'-message with a correct reason + %% that will be handled in shutdown/2. + ok + end. + + +%%----------------------------------------------------------------- +%% Child/State manipulating functions. +%%----------------------------------------------------------------- +state_del_child(#child{pid = Pid}, State) when ?is_simple(State) -> + NDynamics = ?DICT:erase(Pid, State#state.dynamics), + State#state{dynamics = NDynamics}; +state_del_child(Child, State) -> + NChildren = del_child(Child#child.name, State#state.children), + State#state{children = NChildren}. + +del_child(Name, [Ch|Chs]) when Ch#child.name =:= Name -> + [Ch#child{pid = undefined} | Chs]; +del_child(Pid, [Ch|Chs]) when Ch#child.pid =:= Pid -> + [Ch#child{pid = undefined} | Chs]; +del_child(Name, [Ch|Chs]) -> + [Ch|del_child(Name, Chs)]; +del_child(_, []) -> + []. + +%% Chs = [S4, S3, Ch, S1, S0] +%% Ret: {[S4, S3, Ch], [S1, S0]} +split_child(Name, Chs) -> + split_child(Name, Chs, []). + +split_child(Name, [Ch|Chs], After) when Ch#child.name =:= Name -> + {lists:reverse([Ch#child{pid = undefined} | After]), Chs}; +split_child(Pid, [Ch|Chs], After) when Ch#child.pid =:= Pid -> + {lists:reverse([Ch#child{pid = undefined} | After]), Chs}; +split_child(Name, [Ch|Chs], After) -> + split_child(Name, Chs, [Ch | After]); +split_child(_, [], After) -> + {lists:reverse(After), []}. + +get_child(Name, State) -> + lists:keysearch(Name, #child.name, State#state.children). +replace_child(Child, State) -> + Chs = do_replace_child(Child, State#state.children), + State#state{children = Chs}. + +do_replace_child(Child, [Ch|Chs]) when Ch#child.name =:= Child#child.name -> + [Child | Chs]; +do_replace_child(Child, [Ch|Chs]) -> + [Ch|do_replace_child(Child, Chs)]. + +remove_child(Child, State) -> + Chs = lists:keydelete(Child#child.name, #child.name, State#state.children), + State#state{children = Chs}. + +%%----------------------------------------------------------------- +%% Func: init_state/4 +%% Args: SupName = {local, atom()} | {global, atom()} | self +%% Type = {Strategy, MaxIntensity, Period} +%% Strategy = one_for_one | one_for_all | simple_one_for_one | +%% rest_for_one +%% MaxIntensity = integer() +%% Period = integer() +%% Mod :== atom() +%% Arsg :== term() +%% Purpose: Check that Type is of correct type (!) +%% Returns: {ok, #state} | Error +%%----------------------------------------------------------------- +init_state(SupName, Type, Mod, Args) -> + case catch init_state1(SupName, Type, Mod, Args) of + {ok, State} -> + {ok, State}; + Error -> + Error + end. + +init_state1(SupName, {Strategy, MaxIntensity, Period}, Mod, Args) -> + validStrategy(Strategy), + validIntensity(MaxIntensity), + validPeriod(Period), + {ok, #state{name = supname(SupName,Mod), + strategy = Strategy, + intensity = MaxIntensity, + period = Period, + module = Mod, + args = Args}}; +init_state1(_SupName, Type, _, _) -> + {invalid_type, Type}. + +validStrategy(simple_one_for_one_terminate) -> true; +validStrategy(simple_one_for_one) -> true; +validStrategy(one_for_one) -> true; +validStrategy(one_for_all) -> true; +validStrategy(rest_for_one) -> true; +validStrategy(What) -> throw({invalid_strategy, What}). + +validIntensity(Max) when is_integer(Max), + Max >= 0 -> true; +validIntensity(What) -> throw({invalid_intensity, What}). + +validPeriod(Period) when is_integer(Period), + Period > 0 -> true; +validPeriod(What) -> throw({invalid_period, What}). + +supname(self,Mod) -> {self(),Mod}; +supname(N,_) -> N. + +%%% ------------------------------------------------------ +%%% Check that the children start specification is valid. +%%% Shall be a six (6) tuple +%%% {Name, Func, RestartType, Shutdown, ChildType, Modules} +%%% where Name is an atom +%%% Func is {Mod, Fun, Args} == {atom, atom, list} +%%% RestartType is permanent | temporary | transient +%%% Shutdown = integer() | infinity | brutal_kill +%%% ChildType = supervisor | worker +%%% Modules = [atom()] | dynamic +%%% Returns: {ok, [#child]} | Error +%%% ------------------------------------------------------ + +check_startspec(Children) -> check_startspec(Children, []). + +check_startspec([ChildSpec|T], Res) -> + case check_childspec(ChildSpec) of + {ok, Child} -> + case lists:keymember(Child#child.name, #child.name, Res) of + true -> {duplicate_child_name, Child#child.name}; + false -> check_startspec(T, [Child | Res]) + end; + Error -> Error + end; +check_startspec([], Res) -> + {ok, lists:reverse(Res)}. + +check_childspec({Name, Func, RestartType, Shutdown, ChildType, Mods}) -> + catch check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods); +check_childspec(X) -> {invalid_child_spec, X}. + +check_childspec(Name, Func, RestartType, Shutdown, ChildType, Mods) -> + validName(Name), + validFunc(Func), + validRestartType(RestartType), + validChildType(ChildType), + validShutdown(Shutdown, ChildType), + validMods(Mods), + {ok, #child{name = Name, mfa = Func, restart_type = RestartType, + shutdown = Shutdown, child_type = ChildType, modules = Mods}}. + +validChildType(supervisor) -> true; +validChildType(worker) -> true; +validChildType(What) -> throw({invalid_child_type, What}). + +validName(_Name) -> true. + +validFunc({M, F, A}) when is_atom(M), + is_atom(F), + is_list(A) -> true; +validFunc(Func) -> throw({invalid_mfa, Func}). + +validRestartType(permanent) -> true; +validRestartType(temporary) -> true; +validRestartType(transient) -> true; +validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}). + +validShutdown(Shutdown, _) + when is_integer(Shutdown), Shutdown > 0 -> true; +validShutdown(infinity, supervisor) -> true; +validShutdown(brutal_kill, _) -> true; +validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). + +validMods(dynamic) -> true; +validMods(Mods) when is_list(Mods) -> + lists:foreach(fun(Mod) -> + if + is_atom(Mod) -> ok; + true -> throw({invalid_module, Mod}) + end + end, + Mods); +validMods(Mods) -> throw({invalid_modules, Mods}). + +%%% ------------------------------------------------------ +%%% Add a new restart and calculate if the max restart +%%% intensity has been reached (in that case the supervisor +%%% shall terminate). +%%% All restarts accured inside the period amount of seconds +%%% are kept in the #state.restarts list. +%%% Returns: {ok, State'} | {terminate, State'} +%%% ------------------------------------------------------ + +add_restart(State) -> + I = State#state.intensity, + P = State#state.period, + R = State#state.restarts, + Now = erlang:now(), + R1 = add_restart([Now|R], Now, P), + State1 = State#state{restarts = R1}, + case length(R1) of + CurI when CurI =< I -> + {ok, State1}; + _ -> + {terminate, State1} + end. + +add_restart([R|Restarts], Now, Period) -> + case inPeriod(R, Now, Period) of + true -> + [R|add_restart(Restarts, Now, Period)]; + _ -> + [] + end; +add_restart([], _, _) -> + []. + +inPeriod(Time, Now, Period) -> + case difference(Time, Now) of + T when T > Period -> + false; + _ -> + true + end. + +%% +%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored) +%% Calculate the time elapsed in seconds between two timestamps. +%% If MegaSecs is equal just subtract Secs. +%% Else calculate the Mega difference and add the Secs difference, +%% note that Secs difference can be negative, e.g. +%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs. +%% +difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM -> + ((CurM - TimeM) * 1000000) + (CurS - TimeS); +difference({_, TimeS, _}, {_, CurS, _}) -> + CurS - TimeS. + +%%% ------------------------------------------------------ +%%% Error and progress reporting. +%%% ------------------------------------------------------ + +report_error(Error, Reason, Child, SupName) -> + ErrorMsg = [{supervisor, SupName}, + {errorContext, Error}, + {reason, Reason}, + {offender, extract_child(Child)}], + error_logger:error_report(supervisor_report, ErrorMsg). + + +extract_child(Child) -> + [{pid, Child#child.pid}, + {name, Child#child.name}, + {mfa, Child#child.mfa}, + {restart_type, Child#child.restart_type}, + {shutdown, Child#child.shutdown}, + {child_type, Child#child.child_type}]. + +report_progress(Child, SupName) -> + Progress = [{supervisor, SupName}, + {started, extract_child(Child)}], + error_logger:info_report(progress, Progress). diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 68efc27f..cc4982c9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -75,8 +75,15 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", [inet_parse:ntoa(Address), Port, inet_parse:ntoa(PeerAddress), PeerPort]), + %% In the event that somebody floods us with connections we can spew + %% the above message at error_logger faster than it can keep up. + %% So error_logger's mailbox grows unbounded until we eat all the + %% memory available and crash. So here's a meaningless synchronous call + %% to the underlying gen_event mechanism - when it returns the mailbox + %% is drained. + gen_event:which_handlers(error_logger), %% handle - apply(M, F, A ++ [Sock]) + file_handle_cache:release_on_death(apply(M, F, A ++ [Sock])) catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -104,6 +111,7 @@ code_change(_OldVsn, State, _Extra) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). accept(State = #state{sock=LSock}) -> + ok = file_handle_cache:obtain(), case prim_inet:async_accept(LSock, -1) of {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 1ee958af..97e07545 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -40,12 +40,10 @@ %% %% 1. Allow priorities (basically, change the pending queue to a %% priority_queue). -%% -%% 2. Allow the submission to the pool_worker to be async. -behaviour(gen_server2). --export([start_link/0, submit/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,6 +54,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -80,6 +80,9 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + idle(WId) -> gen_server2:cast(?SERVER, {idle, WId}). @@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail, pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> - {noreply, State #state { pending = queue:in(From, Pending) }, + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, hibernate}; {{value, WId}, Avail1} -> {reply, get_worker_pid(WId), State #state { available = Avail1 }, @@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, From}, Pending1} -> + {{value, {next_free, From}}, Pending1} -> gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 3bfcc2d9..57901fd5 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -33,7 +33,9 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, run/1]). +-export([start_link/1, submit/2, submit_async/2, run/1]). + +-export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,6 +46,11 @@ -spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; + ({atom(), atom(), [any()]}) -> any()). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. @@ -60,7 +67,22 @@ start_link(WId) -> submit(Pid, Fun) -> gen_server2:call(Pid, {submit, Fun}, infinity). +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + +set_maximum_since_use(Pid, Age) -> + gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). + +%%---------------------------------------------------------------------------- + init([WId]) -> + ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, + [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), {ok, WId, hibernate, @@ -74,6 +96,15 @@ handle_call({submit, Fun}, From, WId) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_cast({set_maximum_since_use, Age}, WId) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, WId, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. @@ -85,10 +116,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -run({M, F, A}) -> - apply(M, F, A); -run(Fun) -> - Fun(). |