summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-26 17:03:43 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-26 17:03:43 +0100
commit6f48c9017d2b2fecb95e12bafa65c1314bd5ae56 (patch)
tree2be2553934ebd6b50df75066f4fac0df63d22be6
parent721174a7885f2697d890bf4b21cb4335b481f338 (diff)
parent08d20d9324eaca270458f31ab860725bc02a1143 (diff)
downloadrabbitmq-server-6f48c9017d2b2fecb95e12bafa65c1314bd5ae56.tar.gz
Merging bug 22584 into default
-rw-r--r--Makefile13
-rw-r--r--include/rabbit.hrl15
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf362
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rules1
-rw-r--r--packaging/macports/Portfile.in2
-rw-r--r--src/rabbit.erl28
-rw-r--r--src/rabbit_amqqueue.erl71
-rw-r--r--src/rabbit_amqqueue_process.erl61
-rw-r--r--src/rabbit_amqqueue_sup.erl5
-rw-r--r--src/rabbit_basic.erl29
-rw-r--r--src/rabbit_binary_generator.erl53
-rw-r--r--src/rabbit_channel.erl49
-rw-r--r--src/rabbit_dialyzer.erl6
-rw-r--r--src/rabbit_guid.erl14
-rw-r--r--src/rabbit_limiter.erl5
-rw-r--r--src/rabbit_misc.erl71
-rw-r--r--src/rabbit_mnesia.erl7
-rw-r--r--src/rabbit_persister.erl171
-rw-r--r--src/rabbit_router.erl10
-rw-r--r--src/rabbit_sup.erl7
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/worker_pool.erl28
-rw-r--r--src/worker_pool_worker.erl12
25 files changed, 741 insertions, 291 deletions
diff --git a/Makefile b/Makefile
index 4a90ef9f..2b08e071 100644
--- a/Makefile
+++ b/Makefile
@@ -164,7 +164,8 @@ stop-node:
COVER_DIR=.
start-cover: all
- echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
+ echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL)
+ echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
@@ -211,11 +212,11 @@ distclean: clean
# generated but empty if we fail
$(SOURCE_DIR)/%_usage.erl:
xsltproc --stringparam modulename "`basename $@ .erl`" \
- $(DOCS_DIR)/usage.xsl $< > $@.tmp && \
- sed -e s/\\\"/\\\\\\\"/g -e s/%QUOTE%/\\\"/g $@.tmp > $@.tmp2 && \
- fold -s $@.tmp2 > $@.tmp3 && \
- cp $@.tmp3 $@ && \
- rm $@.tmp $@.tmp2 $@.tmp3
+ $(DOCS_DIR)/usage.xsl $< > $@.tmp
+ sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2
+ fold -s $@.tmp2 > $@.tmp3
+ mv $@.tmp3 $@
+ rm $@.tmp $@.tmp2
# We rename the file before xmlto sees it since xmlto will use the name of
# the file to make internal links.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 38142491..a4abc1ff 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,7 +62,8 @@
-record(listener, {node, protocol, host, port}).
--record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(basic_message, {exchange_name, routing_key, content, guid,
+ is_persistent}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
@@ -83,9 +84,10 @@
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
-type(regexp() :: binary()).
+-type(file_path() :: string()).
%% this is really an abstract type, but dialyzer does not support them
--type(guid() :: any()).
+-type(guid() :: binary()).
-type(txn() :: guid()).
-type(pkey() :: guid()).
-type(r(Kind) ::
@@ -144,7 +146,8 @@
#basic_message{exchange_name :: exchange_name(),
routing_key :: routing_key(),
content :: content(),
- persistent_key :: maybe(pkey())}).
+ guid :: guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
@@ -154,7 +157,7 @@
message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
--type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
+-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}).
-type(listener() ::
#listener{node :: erlang_node(),
protocol :: atom(),
@@ -166,6 +169,7 @@
#amqp_error{name :: atom(),
explanation :: string(),
method :: atom()}).
+
-endif.
%%----------------------------------------------------------------------------
@@ -175,6 +179,9 @@
-define(MAX_WAIT, 16#ffffffff).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index c318a96c..b052d889 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -10,9 +10,10 @@ Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
+Source5: rabbitmq-server.ocf
URL: http://www.rabbitmq.com/
BuildArch: noarch
-BuildRequires: erlang, python-simplejson
+BuildRequires: erlang, python-simplejson, xmlto, libxslt
Requires: erlang, logrotate
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
@@ -29,6 +30,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version}
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
+%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -38,6 +40,7 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
+cp %{S:5} %{_rabbit_server_ocf}
make %{?_smp_mflags}
%install
@@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins
+install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
new file mode 100755
index 00000000..97c58ea2
--- /dev/null
+++ b/packaging/common/rabbitmq-server.ocf
@@ -0,0 +1,362 @@
+#!/bin/sh
+##
+## OCF Resource Agent compliant rabbitmq-server resource script.
+##
+
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2010 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2010 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+## OCF instance parameters
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
+
+#######################################################################
+# Initialization:
+
+. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+
+#######################################################################
+
+OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi"
+OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl"
+OCF_RESKEY_nodename_default="rabbit@localhost"
+OCF_RESKEY_log_base_default="/var/log/rabbitmq"
+: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}}
+: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}}
+: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}}
+: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
+
+meta_data() {
+ cat <<END
+<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="rabbitmq-server">
+<version>1.0</version>
+
+<longdesc lang="en">
+Resource agent for RabbitMQ-server
+</longdesc>
+
+<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc>
+
+<parameters>
+<parameter name="multi" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmq-multi script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmq-multi</shortdesc>
+<content type="string" default="${OCF_RESKEY_multi_default}" />
+</parameter>
+
+<parameter name="ctl" unique="0" required="0">
+<longdesc lang="en">
+The path to the rabbitmqctl script
+</longdesc>
+<shortdesc lang="en">Path to rabbitmqctl</shortdesc>
+<content type="string" default="${OCF_RESKEY_ctl_default}" />
+</parameter>
+
+<parameter name="nodename" unique="0" required="0">
+<longdesc lang="en">
+The node name for rabbitmq-server
+</longdesc>
+<shortdesc lang="en">Node name</shortdesc>
+<content type="string" default="${OCF_RESKEY_nodename_default}" />
+</parameter>
+
+<parameter name="ip" unique="0" required="0">
+<longdesc lang="en">
+The IP address for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Address</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="port" unique="0" required="0">
+<longdesc lang="en">
+The IP Port for rabbitmq-server to listen on
+</longdesc>
+<shortdesc lang="en">IP Port</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="cluster_config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the cluster config file
+</longdesc>
+<shortdesc lang="en">Cluster config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="config_file" unique="0" required="0">
+<longdesc lang="en">
+Location of the config file
+</longdesc>
+<shortdesc lang="en">Config file path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="log_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which logs will be created
+</longdesc>
+<shortdesc lang="en">Log base path</shortdesc>
+<content type="string" default="${OCF_RESKEY_log_base_default}" />
+</parameter>
+
+<parameter name="mnesia_base" unique="0" required="0">
+<longdesc lang="en">
+Location of the directory under which mnesia will store data
+</longdesc>
+<shortdesc lang="en">Mnesia base path</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+<parameter name="server_start_args" unique="0" required="0">
+<longdesc lang="en">
+Additional arguments provided to the server on startup
+</longdesc>
+<shortdesc lang="en">Server start arguments</shortdesc>
+<content type="string" default="" />
+</parameter>
+
+</parameters>
+
+<actions>
+<action name="start" timeout="600" />
+<action name="stop" timeout="120" />
+<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="validate-all" timeout="30" />
+<action name="meta-data" timeout="5" />
+</actions>
+</resource-agent>
+END
+}
+
+rabbit_usage() {
+ cat <<END
+usage: $0 {start|stop|monitor|validate-all|meta-data}
+
+Expects to have a fully populated OCF RA-compliant environment set.
+END
+}
+
+RABBITMQ_MULTI=$OCF_RESKEY_multi
+RABBITMQ_CTL=$OCF_RESKEY_ctl
+RABBITMQ_NODENAME=$OCF_RESKEY_nodename
+RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip
+RABBITMQ_NODE_PORT=$OCF_RESKEY_port
+RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file
+RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file
+RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base
+RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base
+RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args
+[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME"
+[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME
+
+export_vars() {
+ [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS
+ [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT
+ [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE
+ [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE
+ [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE
+ [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE
+ [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS
+}
+
+rabbit_validate_partial() {
+ if [ ! -x $RABBITMQ_MULTI ]; then
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -x $RABBITMQ_CTL ]; then
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ return $OCF_ERR_ARGS;
+ fi
+}
+
+rabbit_validate_full() {
+ if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ return $OCF_ERR_ARGS;
+ fi
+
+ rabbit_validate_partial
+
+ return $OCF_SUCCESS
+}
+
+rabbit_status() {
+ local rc
+ $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
+ rc=$?
+ case "$rc" in
+ 0)
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ return $OCF_ERR_GENERIC
+ esac
+}
+
+rabbit_start() {
+ local rc
+
+ rabbit_validate_full
+ rc=$?
+ if [ "$rc" != $OCF_SUCCESS ]; then
+ return $rc
+ fi
+
+ export_vars
+
+ $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to come up.
+ # Let the CRM/LRM time us out if required
+ start_wait=1
+ while [ $start_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_stop() {
+ local rc
+ $RABBITMQ_MULTI stop_all &
+ rc=$?
+
+ if [ "$rc" != 0 ]; then
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
+ fi
+
+ # Spin waiting for the server to shut down.
+ # Let the CRM/LRM time us out if required
+ stop_wait=1
+ while [ $stop_wait = 1 ]; do
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
+ break
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ return $OCF_ERR_GENERIC
+ fi
+ sleep 2
+ done
+
+ return $OCF_SUCCESS
+}
+
+rabbit_monitor() {
+ rabbit_status
+ return $?
+}
+
+case $__OCF_ACTION in
+ meta-data)
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
+ usage|help)
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
+esac
+
+rabbit_validate_partial || exit
+
+case $__OCF_ACTION in
+ start)
+ rabbit_start
+ ;;
+ stop)
+ rabbit_stop
+ ;;
+ monitor)
+ rabbit_monitor
+ ;;
+ validate-all)
+ exit $OCF_SUCCESS
+ ;;
+ *)
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
+esac
+
+exit $? \ No newline at end of file
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index d4e2cd17..479c3568 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
Standards-Version: 3.8.0
Package: rabbitmq-server
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 3799c438..45659602 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -21,3 +21,4 @@ install/rabbitmq-server::
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
+ install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e1f58212..153727be 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -23,7 +23,7 @@ checksums \
sha1 @sha1@ \
rmd160 @rmd160@
-depends_build port:erlang
+depends_build port:erlang port:xmlto port:libxslt
depends_run port:erlang
platform darwin 7 {
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b1204997..f2dce303 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,9 +91,10 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_amqqueue_sup,
- [{description, "queue supervisor"},
- {mfa, {rabbit_amqqueue, start, []}},
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_guid]}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -109,7 +110,6 @@
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_node_monitor]}},
{requires, kernel_ready},
- {requires, rabbit_amqqueue_sup},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
@@ -125,21 +125,15 @@
{mfa, {rabbit_exchange, recover, []}},
{requires, empty_db_check}]}).
--rabbit_boot_step({queue_recovery,
- [{description, "queue recovery"},
- {mfa, {rabbit_amqqueue, recover, []}},
- {requires, exchange_recovery}]}).
+-rabbit_boot_step({queue_sup_queue_recovery,
+ [{description, "queue supervisor and queue recovery"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {requires, empty_db_check}]}).
-rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
- {requires, queue_recovery}]}).
-
--rabbit_boot_step({guid_generator,
- [{description, "guid generator"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_guid]}},
- {requires, persister},
- {enables, routing_ready}]}).
+ [{mfa, {rabbit_sup, start_child,
+ [rabbit_persister]}},
+ {requires, queue_sup_queue_recovery}]}).
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ceec00fd..f6278836 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/4, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
@@ -41,7 +41,7 @@
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
+-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -63,7 +63,6 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
@@ -92,13 +91,13 @@
-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
--spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), msg()} | 'empty').
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/8 ::
(amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
boolean(), any()) ->
@@ -118,45 +117,47 @@
%%----------------------------------------------------------------------------
start() ->
+ DurableQueues = find_durable_queues(),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ _RealDurableQueues = recover_durable_queues(DurableQueues),
ok.
-recover() ->
- ok = recover_durable_queues(),
- ok.
-
-recover_durable_queues() ->
+find_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end).
+
+recover_durable_queues(DurableQueues) ->
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
Q = start_queue_process(RecoveredQ),
%% We need to catch the case where a client connected to
%% another node has deleted the queue (and possibly
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
+ fun () ->
+ case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ,
+ read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
+ true -> [Q | Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ end, [], DurableQueues).
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -201,7 +202,7 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok.
start_queue_process(Q) ->
- {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
+ {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -287,16 +288,16 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
-commit_all(QPids, Txn) ->
+commit_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
QPids).
-rollback_all(QPids, Txn) ->
+rollback_all(QPids, Txn, ChPid) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end,
QPids).
notify_down_all(QPids, ChPid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 19cb5c71..449e79ea 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,8 +36,6 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-export([start_link/1, info_keys/0]).
@@ -59,7 +57,7 @@
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
+-record(tx, {is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
-record(cr, {consumer_count,
@@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-persist_message(_Txn, _QName, #basic_message{persistent_key = none}) ->
+persist_message(_Txn, _QName, #basic_message{is_persistent = false}) ->
ok;
persist_message(Txn, QName, Message) ->
M = Message#basic_message{
@@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) ->
content = rabbit_binary_parser:clear_decoded_content(
Message#basic_message.content)},
persist_work(Txn, QName,
- [{publish, M, {QName, M#basic_message.persistent_key}}]).
+ [{publish, M, {QName, M#basic_message.guid}}]).
persist_delivery(_QName, _Message,
true) ->
ok;
-persist_delivery(_QName, #basic_message{persistent_key = none},
+persist_delivery(_QName, #basic_message{is_persistent = false},
_IsDelivered) ->
ok;
-persist_delivery(QName, #basic_message{persistent_key = PKey},
+persist_delivery(QName, #basic_message{guid = Guid},
_IsDelivered) ->
- persist_work(none, QName, [{deliver, {QName, PKey}}]).
+ persist_work(none, QName, [{deliver, {QName, Guid}}]).
persist_acks(Txn, QName, Messages) ->
persist_work(Txn, QName,
- [{ack, {QName, PKey}} ||
- #basic_message{persistent_key = PKey} <- Messages,
- PKey =/= none]).
+ [{ack, {QName, Guid}} || #basic_message{
+ guid = Guid, is_persistent = true} <- Messages]).
-persist_auto_ack(_QName, #basic_message{persistent_key = none}) ->
+persist_auto_ack(_QName, #basic_message{is_persistent = false}) ->
ok;
-persist_auto_ack(QName, #basic_message{persistent_key = PKey}) ->
+persist_auto_ack(QName, #basic_message{guid = Guid}) ->
%% auto-acks are always non-transactional
- rabbit_persister:dirty_work([{ack, {QName, PKey}}]).
+ rabbit_persister:dirty_work([{ack, {QName, Guid}}]).
persist_work(_Txn,_QName, []) ->
ok;
@@ -434,8 +431,7 @@ do_if_persistent(F, Txn, QName) ->
lookup_tx(Txn) ->
case get({txn, Txn}) of
- undefined -> #tx{ch_pid = none,
- is_persistent = false,
+ undefined -> #tx{is_persistent = false,
pending_messages = [],
pending_acks = []};
V -> V
@@ -464,26 +460,19 @@ is_tx_persistent(Txn) ->
record_pending_message(Txn, ChPid, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending],
- ch_pid = ChPid}).
+ store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
record_pending_acks(Txn, ChPid, MsgIds) ->
Tx = #tx{pending_acks = Pending} = lookup_tx(Txn),
record_current_channel_tx(ChPid, Txn),
- store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending],
- ch_pid = ChPid}).
-
-process_pending(Txn, State) ->
- #tx{ch_pid = ChPid,
- pending_messages = PendingMessages,
- pending_acks = PendingAcks} = lookup_tx(Txn),
- case lookup_ch(ChPid) of
- not_found -> ok;
- C = #cr{unacked_messages = UAM} ->
- {_Acked, Remaining} =
- collect_messages(lists:append(PendingAcks), UAM),
- store_ch_record(C#cr{unacked_messages = Remaining})
- end,
+ store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}).
+
+process_pending(Txn, ChPid, State) ->
+ #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} =
+ lookup_tx(Txn),
+ C = #cr{unacked_messages = UAM} = lookup_ch(ChPid),
+ {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM),
+ store_ch_record(C#cr{unacked_messages = Remaining}),
deliver_or_enqueue_n(lists:reverse(PendingMessages), State).
collect_messages(MsgIds, UAM) ->
@@ -592,12 +581,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
{Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
reply(Delivered, NewState);
-handle_call({commit, Txn}, From, State) ->
+handle_call({commit, Txn, ChPid}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
gen_server2:reply(From, ok),
- NewState = process_pending(Txn, State),
+ NewState = process_pending(Txn, ChPid, State),
erase_tx(Txn),
+ record_current_channel_tx(ChPid, none),
noreply(NewState);
handle_call({notify_down, ChPid}, _From, State) ->
@@ -779,9 +769,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
noreply(State)
end;
-handle_cast({rollback, Txn}, State) ->
+handle_cast({rollback, Txn, ChPid}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
+ record_current_channel_tx(ChPid, none),
noreply(State);
handle_cast({redeliver, Messages}, State) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 0f3a8664..dbd65780 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/0, start_child/1]).
-export([init/1]).
@@ -42,6 +42,9 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_child(Args) ->
+ supervisor:start_child(?SERVER, Args).
+
init([]) ->
{ok, {{simple_one_for_one, 10, 10},
[{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9ebb6e72..4ab7a2a0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -36,6 +36,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -48,7 +49,7 @@
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> message()).
+ binary()) -> (message() | {'error', any()})).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -57,6 +58,8 @@
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-spec(is_message_persistent/1 ::
+ (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
@@ -93,10 +96,17 @@ from_content(Content) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = build_content(Properties, BodyBin),
- persistent_key = none}.
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
properties(P = #'P_basic'{}) ->
P;
@@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 1d47d764..ed843735 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
maybe_encode_properties(ContentProperties, none) ->
rabbit_framing:encode_properties(ContentProperties).
-build_content_frames(FragmentsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if
- FrameMax == 0 ->
- none;
- true ->
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
end,
- build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt).
-
-build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) ->
- {SizeAcc, FragmentAcc};
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt)
- when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) ->
- <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment,
- build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev],
- BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc + size(Fragment),
- [create_frame(3, ChannelInt, Fragment) | FragmentAcc],
- FragmentsRev,
- BodyPayloadMax,
- ChannelInt).
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ case Size =< FragSizeRem of
+ true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3597fcd7..7d3cd722 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,9 +48,6 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking}).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(INFO_KEYS,
@@ -75,7 +72,7 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
@@ -386,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
- PersistentKey = case is_message_persistent(DecodedContent) of
- true -> rabbit_guid:guid();
- false -> none
- end,
+ IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
- persistent_key = PersistentKey},
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -933,7 +928,7 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> ok = notify_limiter(State#ch.limiter_pid,
State#ch.uncommitted_ack_q),
new_tx(State);
@@ -950,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
queue:len(UAQ),
queue:len(UAMQ)]),
case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey) of
+ TxnKey, self()) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> rabbit_misc:protocol_error(
@@ -966,14 +961,11 @@ fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
fun ({_DTag, _CTag,
{_QName, QPid, MsgId, _Redelivered, _Message}}, D) ->
- %% dict:append would be simpler and avoid the
- %% lists:reverse in handle_message({recover, true},
- %% ...). However, it is significantly slower when
- %% going beyond a few thousand elements.
- dict:update(QPid,
- fun (MsgIds) -> [MsgId | MsgIds] end,
- [MsgId],
- D)
+ %% dict:append would avoid the lists:reverse in
+ %% handle_message({recover, true}, ...). However, it
+ %% is significantly slower when going beyond a few
+ %% thousand elements.
+ rabbit_misc:dict_cons(QPid, MsgId, D)
end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
@@ -1019,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+is_message_persistent(Content) ->
+ case rabbit_basic:is_message_persistent(Content) of
+ {invalid, Other} ->
+ rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false;
+ IsPersistent when is_boolean(IsPersistent) ->
+ IsPersistent
end.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 078cf620..f19e8d02 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -38,9 +38,9 @@
-ifdef(use_specs).
--spec(create_basic_plt/1 :: (string()) -> 'ok').
--spec(add_to_plt/2 :: (string(), string()) -> 'ok').
--spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(create_basic_plt/1 :: (file_path()) -> 'ok').
+-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok').
-spec(halt_with_code/1 :: (atom()) -> no_return()).
-endif.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2fa531a7..1ae8f7da 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -67,7 +67,7 @@ update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
- {error, enoent} -> rabbit_persister:serial();
+ {error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
@@ -78,7 +78,7 @@ update_disk_serial() ->
end,
Serial.
-%% generate a guid that is monotonically increasing per process.
+%% generate a GUID.
%%
%% The id is only unique within a single cluster and as long as the
%% serial store hasn't been deleted.
@@ -92,20 +92,18 @@ guid() ->
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
+ %% us a GUID.
G = case get(guid) of
undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
0};
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
+%% generate a readable string representation of a GUID.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(
- erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 7d840861..878af029 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -249,10 +249,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
State#lim{queues = NewQueues}.
unlink_on_stopped(LimiterPid, stopped) ->
- true = unlink(LimiterPid),
- ok = receive {'EXIT', LimiterPid, _Reason} -> ok
- after 0 -> ok
- end,
+ ok = rabbit_misc:unlink_and_capture_exit(LimiterPid),
stopped;
unlink_on_stopped(_LimiterPid, Result) ->
Result.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81cecb38..2c180846 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,6 +43,7 @@
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
+-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -59,6 +60,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]).
-import(mnesia).
-import(lists).
@@ -96,9 +98,10 @@
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
+-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (string()) -> ok_or_error()).
--spec(report_cover/1 :: (string()) -> 'ok').
+-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
+-spec(report_cover/1 :: (file_path()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -119,20 +122,27 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
--spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
--spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (string(), string()) -> ok_or_error()).
+-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()).
+-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
--spec(ceil/1 :: (number()) -> number()).
+-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
+-spec(version_compare/3 :: (string(), string(),
+ ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()).
+-spec(recursive_delete/1 :: ([file_path()]) ->
+ 'ok' | {'error', {file_path(), any()}}).
+-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-endif.
@@ -220,6 +230,10 @@ enable_cover(Root) ->
_ -> ok
end.
+start_cover(NodesS) ->
+ {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ ok.
+
report_cover() ->
report_cover(".").
@@ -601,3 +615,46 @@ version_compare(A, B) ->
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+
+recursive_delete(Files) ->
+ lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files).
+
+recursive_delete1(Path) ->
+ case filelib:is_dir(Path) of
+ false -> case file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+dict_cons(Key, Value, Dict) ->
+ dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
+
+unlink_and_capture_exit(Pid) ->
+ unlink(Pid),
+ receive {'EXIT', Pid, _} -> ok
+ after 0 -> ok
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6ec3cf74..55a6761d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -48,7 +48,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
--spec(dir/0 :: () -> string()).
+-spec(dir/0 :: () -> file_path()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
@@ -424,9 +424,8 @@ reset(Force) ->
cannot_delete_schema)
end,
ok = delete_cluster_nodes_config(),
- %% remove persistet messages and any other garbage we find
- lists:foreach(fun file:delete/1,
- filelib:wildcard(dir() ++ "/*")),
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index dd987c21..a9e0cab9 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0]).
-include("rabbit.hrl").
@@ -49,7 +49,7 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -60,26 +60,25 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--type(qmsg() :: {amqqueue(), pkey()}).
+-type(pmsg() :: {queue_name(), pkey()}).
-type(work_item() ::
- {publish, message(), qmsg()} |
- {deliver, qmsg()} |
- {ack, qmsg()}).
+ {publish, message(), pmsg()} |
+ {deliver, pmsg()} |
+ {ack, pmsg()}).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
+-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
+-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-endif.
@@ -112,19 +111,16 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
%%--------------------------------------------------------------------
init(_Args) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -139,9 +135,7 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -149,12 +143,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -164,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -232,8 +223,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline,
%% "tied" is met.
log_work(CreateWorkUnit, MessageList,
State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
+ snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
fun(M = {publish, Message, QK = {_QName, PKey}}) ->
@@ -343,20 +333,20 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
@@ -380,14 +370,14 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
- transactions = Ts}),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
@@ -406,7 +396,10 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
- Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues),
+ Work = ets:foldl(
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
+ end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
rabbit_misc:upmap(
@@ -416,8 +409,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
fun ({QName, Requeues}) ->
requeue(QName, Requeues, Messages)
end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
+ NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
ets:delete_all_objects(Messages),
ets:delete_all_objects(Queues),
true = ets:insert(Messages, NewMessages),
@@ -425,19 +418,12 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
%% contains the mutated messages and queues tables
Snapshot.
-accumulate_requeues({{QName, PKey}, Delivered}, Acc) ->
- Requeue = {PKey, Delivered},
- dict:update(QName,
- fun (Requeues) -> [Requeue | Requeues] end,
- [Requeue],
- Acc).
-
requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ [{SeqId, QName, PKey, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -447,7 +433,7 @@ requeue(QName, Requeues, Messages) ->
%% per-channel basis, and channels are bound to specific
%% processes, sorting the list does provide the correct
%% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
+ [{Message, Delivered} || {_, _, _, Message, Delivered} <-
lists:sort(RequeueMessages)]),
RequeueMessages;
{error, not_found} ->
@@ -474,50 +460,55 @@ internal_integrate_messages(Items, Snapshot) ->
internal_integrate1({extend_transaction, Key, MessageList},
Snapshot = #psnapshot {transactions = Transactions}) ->
- NewTransactions =
- dict:update(Key,
- fun (MessageLists) -> [MessageList | MessageLists] end,
- [MessageList],
- Transactions),
- Snapshot#psnapshot{transactions = NewTransactions};
+ Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
+ Transactions)};
internal_integrate1({rollback_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions}) ->
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 884ea4ab..a449e19e 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -76,13 +76,9 @@ deliver(QPids, Delivery) ->
%% which then in turn delivers it to its queues.
deliver_per_node(
dict:to_list(
- lists:foldl(
- fun (QPid, D) ->
- dict:update(node(QPid),
- fun (QPids1) -> [QPid | QPids1] end,
- [QPid], D)
- end,
- dict:new(), QPids)),
+ lists:foldl(fun (QPid, D) ->
+ rabbit_misc:dict_cons(node(QPid), QPid, D)
+ end, dict:new(), QPids)),
Delivery).
deliver_per_node([{Node, QPids}], Delivery) when Node == node() ->
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 25715e6e..2c5e5112 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor).
--export([start_link/0, start_child/1, start_child/2,
+-export([start_link/0, start_child/1, start_child/2, start_child/3,
start_restartable_child/1, start_restartable_child/2]).
-export([init/1]).
@@ -49,8 +49,11 @@ start_child(Mod) ->
start_child(Mod, []).
start_child(Mod, Args) ->
+ start_child(Mod, Mod, Args).
+
+start_child(ChildId, Mod, Args) ->
{ok, _} = supervisor:start_child(?SERVER,
- {Mod, {Mod, start_link, Args},
+ {ChildId, {Mod, start_link, Args},
transient, ?MAX_WAIT, worker, [Mod]}),
ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 82f2d199..d645d183 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -625,8 +625,12 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
%% NB: this will log an inconsistent_database error, which is harmless
+ %% Turning cover on / off is OK even if we're not in general using cover,
+ %% it just turns the engine on / off, doesn't actually log anything.
+ cover:stop([SecondaryNode]),
true = disconnect_node(SecondaryNode),
pong = net_adm:ping(SecondaryNode),
+ cover:start([SecondaryNode]),
%% leaving a cluster as a ram node
ok = control_action(reset, []),
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 1ee958af..97e07545 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -40,12 +40,10 @@
%%
%% 1. Allow priorities (basically, change the pending queue to a
%% priority_queue).
-%%
-%% 2. Allow the submission to the pool_worker to be async.
-behaviour(gen_server2).
--export([start_link/0, submit/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -56,6 +54,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/1 ::
+ (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
-endif.
@@ -80,6 +80,9 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
+submit_async(Fun) ->
+ gen_server2:cast(?SERVER, {run_async, Fun}).
+
idle(WId) ->
gen_server2:cast(?SERVER, {idle, WId}).
@@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail,
pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
- {noreply, State #state { pending = queue:in(From, Pending) },
+ {noreply,
+ State #state { pending = queue:in({next_free, From}, Pending) },
hibernate};
{{value, WId}, Avail1} ->
{reply, get_worker_pid(WId), State #state { available = Avail1 },
@@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, From}, Pending1} ->
+ {{value, {next_free, From}}, Pending1} ->
gen_server2:reply(From, get_worker_pid(WId)),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = Avail,
+ pending = Pending }) ->
+ {noreply,
+ case queue:out(Avail) of
+ {empty, _Avail} ->
+ State #state { pending = queue:in({run_async, Fun}, Pending)};
+ {{value, WId}, Avail1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ State #state { available = Avail1 }
+ end, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 3bfcc2d9..d3a48119 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/1, submit/2, run/1]).
+-export([start_link/1, submit/2, submit_async/2, run/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -44,6 +44,8 @@
-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
+-spec(submit_async/2 ::
+ (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
-endif.
@@ -60,6 +62,9 @@ start_link(WId) ->
submit(Pid, Fun) ->
gen_server2:call(Pid, {submit, Fun}, infinity).
+submit_async(Pid, Fun) ->
+ gen_server2:cast(Pid, {submit_async, Fun}).
+
init([WId]) ->
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
@@ -74,6 +79,11 @@ handle_call({submit, Fun}, From, WId) ->
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
+handle_cast({submit_async, Fun}, WId) ->
+ run(Fun),
+ ok = worker_pool:idle(WId),
+ {noreply, WId, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.