diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-04-26 17:03:43 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-04-26 17:03:43 +0100 |
commit | 6f48c9017d2b2fecb95e12bafa65c1314bd5ae56 (patch) | |
tree | 2be2553934ebd6b50df75066f4fac0df63d22be6 | |
parent | 721174a7885f2697d890bf4b21cb4335b481f338 (diff) | |
parent | 08d20d9324eaca270458f31ab860725bc02a1143 (diff) | |
download | rabbitmq-server-6f48c9017d2b2fecb95e12bafa65c1314bd5ae56.tar.gz |
Merging bug 22584 into default
-rw-r--r-- | Makefile | 13 | ||||
-rw-r--r-- | include/rabbit.hrl | 15 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 6 | ||||
-rwxr-xr-x | packaging/common/rabbitmq-server.ocf | 362 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/control | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rules | 1 | ||||
-rw-r--r-- | packaging/macports/Portfile.in | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 28 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 71 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
-rw-r--r-- | src/rabbit_amqqueue_sup.erl | 5 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 29 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 53 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 49 | ||||
-rw-r--r-- | src/rabbit_dialyzer.erl | 6 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 14 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 5 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 71 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 7 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 171 | ||||
-rw-r--r-- | src/rabbit_router.erl | 10 | ||||
-rw-r--r-- | src/rabbit_sup.erl | 7 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
-rw-r--r-- | src/worker_pool.erl | 28 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 12 |
25 files changed, 741 insertions, 291 deletions
@@ -164,7 +164,8 @@ stop-node: COVER_DIR=. start-cover: all - echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) + echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL) + echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -211,11 +212,11 @@ distclean: clean # generated but empty if we fail $(SOURCE_DIR)/%_usage.erl: xsltproc --stringparam modulename "`basename $@ .erl`" \ - $(DOCS_DIR)/usage.xsl $< > $@.tmp && \ - sed -e s/\\\"/\\\\\\\"/g -e s/%QUOTE%/\\\"/g $@.tmp > $@.tmp2 && \ - fold -s $@.tmp2 > $@.tmp3 && \ - cp $@.tmp3 $@ && \ - rm $@.tmp $@.tmp2 $@.tmp3 + $(DOCS_DIR)/usage.xsl $< > $@.tmp + sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2 + fold -s $@.tmp2 > $@.tmp3 + mv $@.tmp3 $@ + rm $@.tmp $@.tmp2 # We rename the file before xmlto sees it since xmlto will use the name of # the file to make internal links. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 38142491..a4abc1ff 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,8 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -83,9 +84,10 @@ -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). -type(regexp() :: binary()). +-type(file_path() :: string()). %% this is really an abstract type, but dialyzer does not support them --type(guid() :: any()). +-type(guid() :: binary()). -type(txn() :: guid()). -type(pkey() :: guid()). -type(r(Kind) :: @@ -144,7 +146,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), @@ -154,7 +157,7 @@ message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -166,6 +169,7 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). + -endif. %%---------------------------------------------------------------------------- @@ -175,6 +179,9 @@ -define(MAX_WAIT, 16#ffffffff). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c318a96c..b052d889 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -10,9 +10,10 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-asroot-script-wrapper +Source5: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ BuildArch: noarch -BuildRequires: erlang, python-simplejson +BuildRequires: erlang, python-simplejson, xmlto, libxslt Requires: erlang, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root Summary: The RabbitMQ server @@ -29,6 +30,7 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` +%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -38,6 +40,7 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} cp %{S:4} %{_rabbit_asroot_wrapper} +cp %{S:5} %{_rabbit_server_ocf} make %{?_smp_mflags} %install @@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins +install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf new file mode 100755 index 00000000..97c58ea2 --- /dev/null +++ b/packaging/common/rabbitmq-server.ocf @@ -0,0 +1,362 @@ +#!/bin/sh +## +## OCF Resource Agent compliant rabbitmq-server resource script. +## + +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2010 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2010 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +## OCF instance parameters +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args + +####################################################################### +# Initialization: + +. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs + +####################################################################### + +OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi" +OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" +OCF_RESKEY_nodename_default="rabbit@localhost" +OCF_RESKEY_log_base_default="/var/log/rabbitmq" +: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}} +: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} +: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} +: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} + +meta_data() { + cat <<END +<?xml version="1.0"?> +<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> +<resource-agent name="rabbitmq-server"> +<version>1.0</version> + +<longdesc lang="en"> +Resource agent for RabbitMQ-server +</longdesc> + +<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc> + +<parameters> +<parameter name="multi" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmq-multi script +</longdesc> +<shortdesc lang="en">Path to rabbitmq-multi</shortdesc> +<content type="string" default="${OCF_RESKEY_multi_default}" /> +</parameter> + +<parameter name="ctl" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmqctl script +</longdesc> +<shortdesc lang="en">Path to rabbitmqctl</shortdesc> +<content type="string" default="${OCF_RESKEY_ctl_default}" /> +</parameter> + +<parameter name="nodename" unique="0" required="0"> +<longdesc lang="en"> +The node name for rabbitmq-server +</longdesc> +<shortdesc lang="en">Node name</shortdesc> +<content type="string" default="${OCF_RESKEY_nodename_default}" /> +</parameter> + +<parameter name="ip" unique="0" required="0"> +<longdesc lang="en"> +The IP address for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Address</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="port" unique="0" required="0"> +<longdesc lang="en"> +The IP Port for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Port</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="cluster_config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the cluster config file +</longdesc> +<shortdesc lang="en">Cluster config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the config file +</longdesc> +<shortdesc lang="en">Config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="log_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which logs will be created +</longdesc> +<shortdesc lang="en">Log base path</shortdesc> +<content type="string" default="${OCF_RESKEY_log_base_default}" /> +</parameter> + +<parameter name="mnesia_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which mnesia will store data +</longdesc> +<shortdesc lang="en">Mnesia base path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="server_start_args" unique="0" required="0"> +<longdesc lang="en"> +Additional arguments provided to the server on startup +</longdesc> +<shortdesc lang="en">Server start arguments</shortdesc> +<content type="string" default="" /> +</parameter> + +</parameters> + +<actions> +<action name="start" timeout="600" /> +<action name="stop" timeout="120" /> +<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="validate-all" timeout="30" /> +<action name="meta-data" timeout="5" /> +</actions> +</resource-agent> +END +} + +rabbit_usage() { + cat <<END +usage: $0 {start|stop|monitor|validate-all|meta-data} + +Expects to have a fully populated OCF RA-compliant environment set. +END +} + +RABBITMQ_MULTI=$OCF_RESKEY_multi +RABBITMQ_CTL=$OCF_RESKEY_ctl +RABBITMQ_NODENAME=$OCF_RESKEY_nodename +RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip +RABBITMQ_NODE_PORT=$OCF_RESKEY_port +RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file +RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file +RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base +RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base +RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" +[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME + +export_vars() { + [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS + [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT + [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE + [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE + [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE + [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE + [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS +} + +rabbit_validate_partial() { + if [ ! -x $RABBITMQ_MULTI ]; then + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi + + if [ ! -x $RABBITMQ_CTL ]; then + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi +} + +rabbit_validate_full() { + if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + rabbit_validate_partial + + return $OCF_SUCCESS +} + +rabbit_status() { + local rc + $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null + rc=$? + case "$rc" in + 0) + return $OCF_SUCCESS + ;; + 2) + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + return $OCF_ERR_GENERIC + esac +} + +rabbit_start() { + local rc + + rabbit_validate_full + rc=$? + if [ "$rc" != $OCF_SUCCESS ]; then + return $rc + fi + + export_vars + + $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc + fi + + # Spin waiting for the server to come up. + # Let the CRM/LRM time us out if required + start_wait=1 + while [ $start_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_stop() { + local rc + $RABBITMQ_MULTI stop_all & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc + fi + + # Spin waiting for the server to shut down. + # Let the CRM/LRM time us out if required + stop_wait=1 + while [ $stop_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 + break + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_monitor() { + rabbit_status + return $? +} + +case $__OCF_ACTION in + meta-data) + meta_data + exit $OCF_SUCCESS + ;; + usage|help) + rabbit_usage + exit $OCF_SUCCESS + ;; +esac + +rabbit_validate_partial || exit + +case $__OCF_ACTION in + start) + rabbit_start + ;; + stop) + rabbit_stop + ;; + monitor) + rabbit_monitor + ;; + validate-all) + exit $OCF_SUCCESS + ;; + *) + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; +esac + +exit $?
\ No newline at end of file diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index d4e2cd17..479c3568 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson +Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc Standards-Version: 3.8.0 Package: rabbitmq-server diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 3799c438..45659602 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -21,3 +21,4 @@ install/rabbitmq-server:: install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm + install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index e1f58212..153727be 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -23,7 +23,7 @@ checksums \ sha1 @sha1@ \ rmd160 @rmd160@ -depends_build port:erlang +depends_build port:erlang port:xmlto port:libxslt depends_run port:erlang platform darwin 7 { diff --git a/src/rabbit.erl b/src/rabbit.erl index b1204997..f2dce303 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -91,9 +91,10 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, +-rabbit_boot_step({guid_generator, + [{description, "guid generator"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_guid]}}, {requires, kernel_ready}, {enables, core_initialized}]}). @@ -109,7 +110,6 @@ {mfa, {rabbit_sup, start_restartable_child, [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -125,21 +125,15 @@ {mfa, {rabbit_exchange, recover, []}}, {requires, empty_db_check}]}). --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}]}). -rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). - --rabbit_boot_step({guid_generator, - [{description, "guid generator"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_guid]}}, - {requires, persister}, - {enables, routing_ready}]}). + [{mfa, {rabbit_sup, start_child, + [rabbit_persister]}}, + {requires, queue_sup_queue_recovery}]}). -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}]}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ceec00fd..f6278836 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -41,7 +41,7 @@ -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -63,7 +63,6 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -92,13 +91,13 @@ -spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> @@ -118,45 +117,47 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. - -recover_durable_queues() -> +find_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). + +recover_durable_queues(DurableQueues) -> + lists:foldl( + fun (RecoveredQ, Acc) -> Q = start_queue_process(RecoveredQ), %% We need to catch the case where a client connected to %% another node has deleted the queue (and possibly %% re-created it). case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end + fun () -> + case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, + read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + true -> [Q | Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -201,7 +202,7 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -287,16 +288,16 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> +commit_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> +rollback_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, QPids). notify_down_all(QPids, ChPid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19cb5c71..449e79ea 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -export([start_link/1, info_keys/0]). @@ -59,7 +57,7 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumer_count, @@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; @@ -434,8 +431,7 @@ do_if_persistent(F, Txn, QName) -> lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - is_persistent = false, + undefined -> #tx{is_persistent = false, pending_messages = [], pending_acks = []}; V -> V @@ -464,26 +460,19 @@ is_tx_persistent(Txn) -> record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). + +process_pending(Txn, ChPid, State) -> + #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = + lookup_tx(Txn), + C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), + {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), deliver_or_enqueue_n(lists:reverse(PendingMessages), State). collect_messages(MsgIds, UAM) -> @@ -592,12 +581,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> +handle_call({commit, Txn, ChPid}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), + NewState = process_pending(Txn, ChPid, State), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(NewState); handle_call({notify_down, ChPid}, _From, State) -> @@ -779,9 +769,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> noreply(State) end; -handle_cast({rollback, Txn}, State) -> +handle_cast({rollback, Txn, ChPid}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(State); handle_cast({redeliver, Messages}, State) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0f3a8664..dbd65780 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1]). -export([init/1]). @@ -42,6 +42,9 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Args) -> + supervisor:start_child(?SERVER, Args). + init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72..4ab7a2a0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 1d47d764..ed843735 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin) maybe_encode_properties(ContentProperties, none) -> rabbit_framing:encode_properties(ContentProperties). -build_content_frames(FragmentsRev, FrameMax, ChannelInt) -> - BodyPayloadMax = if - FrameMax == 0 -> - none; - true -> +build_content_frames(FragsRev, FrameMax, ChannelInt) -> + BodyPayloadMax = if FrameMax == 0 -> + iolist_size(FragsRev); + true -> FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE end, - build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt). - -build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) -> - {SizeAcc, FragmentAcc}; -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) - when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) -> - <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment, - build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev], - BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt); -build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], - BodyPayloadMax, ChannelInt) -> - build_content_frames(SizeAcc + size(Fragment), - [create_frame(3, ChannelInt, Fragment) | FragmentAcc], - FragmentsRev, - BodyPayloadMax, - ChannelInt). + build_content_frames(0, [], BodyPayloadMax, [], + lists:reverse(FragsRev), BodyPayloadMax, ChannelInt). + +build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [], + [], _BodyPayloadMax, _ChannelInt) -> + {SizeAcc, lists:reverse(FramesAcc)}; +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + Frags, BodyPayloadMax, ChannelInt) + when FragSizeRem == 0 orelse Frags == [] -> + Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)), + FrameSize = BodyPayloadMax - FragSizeRem, + build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc], + BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt); +build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc, + [Frag | Frags], BodyPayloadMax, ChannelInt) -> + Size = size(Frag), + {NewFragSizeRem, NewFragAcc, NewFrags} = + case Size =< FragSizeRem of + true -> {FragSizeRem - Size, [Frag | FragAcc], Frags}; + false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag, + {0, [Head | FragAcc], [Tail | Frags]} + end, + build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc, + NewFrags, BodyPayloadMax, ChannelInt). build_heartbeat_frame() -> create_frame(?FRAME_HEARTBEAT, 0, <<>>). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3597fcd7..7d3cd722 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,9 +48,6 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking}). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(INFO_KEYS, @@ -75,7 +72,7 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). @@ -386,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -933,7 +928,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -950,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey, queue:len(UAQ), queue:len(UAMQ)]), case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> rabbit_misc:protocol_error( @@ -966,14 +961,11 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -1019,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620..f19e8d02 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2fa531a7..1ae8f7da 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -67,7 +67,7 @@ update_disk_serial() -> Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), Serial = case rabbit_misc:read_term_file(Filename) of {ok, [Num]} -> Num; - {error, enoent} -> rabbit_persister:serial(); + {error, enoent} -> 0; {error, Reason} -> throw({error, {cannot_read_serial_file, Filename, Reason}}) end, @@ -78,7 +78,7 @@ update_disk_serial() -> end, Serial. -%% generate a guid that is monotonically increasing per process. +%% generate a GUID. %% %% The id is only unique within a single cluster and as long as the %% serial store hasn't been deleted. @@ -92,20 +92,18 @@ guid() -> %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give - %% us a GUID that is monotonically increasing per process. + %% us a GUID. G = case get(guid) of undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, 0}; {S, I} -> {S, I+1} end, put(guid, G), - G. + erlang:md5(term_to_binary(G)). -%% generate a readable string representation of a guid. Note that any -%% monotonicity of the guid is not preserved in the encoding. +%% generate a readable string representation of a GUID. string_guid(Prefix) -> - Prefix ++ "-" ++ base64:encode_to_string( - erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ base64:encode_to_string(guid()). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 7d840861..878af029 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -249,10 +249,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> State#lim{queues = NewQueues}. unlink_on_stopped(LimiterPid, stopped) -> - true = unlink(LimiterPid), - ok = receive {'EXIT', LimiterPid, _Reason} -> ok - after 0 -> ok - end, + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), stopped; unlink_on_stopped(_LimiterPid, Result) -> Result. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81cecb38..2c180846 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,6 +43,7 @@ -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). +-export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -59,6 +60,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -96,9 +98,10 @@ undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). +-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -119,20 +122,27 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -220,6 +230,10 @@ enable_cover(Root) -> _ -> ok end. +start_cover(NodesS) -> + {ok, _} = cover:start([makenode(N) || N <- NodesS]), + ok. + report_cover() -> report_cover("."). @@ -601,3 +615,46 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74..55a6761d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index dd987c21..a9e0cab9 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -40,7 +40,7 @@ -export([transaction/1, extend_transaction/2, dirty_work/1, commit_transaction/1, rollback_transaction/1, - force_snapshot/0, serial/0]). + force_snapshot/0]). -include("rabbit.hrl"). @@ -49,7 +49,7 @@ -define(LOG_BUNDLE_DELAY, 5). -define(COMPLETE_BUNDLE_DELAY, 2). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -60,26 +60,25 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). +-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). --spec(serial/0 :: () -> non_neg_integer()). -endif. @@ -112,19 +111,16 @@ rollback_transaction(TxnKey) -> force_snapshot() -> gen_server:call(?SERVER, force_snapshot, infinity). -serial() -> - gen_server:call(?SERVER, serial, infinity). - %%-------------------------------------------------------------------- init(_Args) -> process_flag(trap_exit, true), FileName = base_filename(), ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{serial = 0, - transactions = dict:new(), + Snapshot = #psnapshot{transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, []), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -139,9 +135,7 @@ init(_Args) -> [Recovered, Bad]), LH end, - {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), - NewSnapshot = LoadedSnapshot#psnapshot{ - serial = LoadedSnapshot#psnapshot.serial + 1}, + {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot), case Res of ok -> ok = take_snapshot(LogHandle, NewSnapshot); @@ -149,12 +143,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], pending_replies = [], - snapshot = NewSnapshot}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -164,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); -handle_call(serial, _From, - State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> - do_reply(Serial, State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -232,8 +223,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline, %% "tied" is met. log_work(CreateWorkUnit, MessageList, State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> + snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( fun(M = {publish, Message, QK = {_QName, PKey}}) -> @@ -343,20 +333,20 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> + fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), - InnerSnapshot = {{serial, Serial}, - {txns, Ts}, + InnerSnapshot = {{txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. @@ -380,14 +370,14 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ - serial = Serial, - transactions = Ts}), + transactions = Ts, + next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so @@ -406,7 +396,10 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + Work = ets:foldl( + fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> + rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) + end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( rabbit_misc:upmap( @@ -416,8 +409,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, fun ({QName, Requeues}) -> requeue(QName, Requeues, Messages) end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], + NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], + NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], ets:delete_all_objects(Messages), ets:delete_all_objects(Queues), true = ets:insert(Messages, NewMessages), @@ -425,19 +418,12 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, %% contains the mutated messages and queues tables Snapshot. -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + [{SeqId, QName, PKey, Message, Delivered} || + {SeqId, PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -447,7 +433,7 @@ requeue(QName, Requeues, Messages) -> %% per-channel basis, and channels are bound to specific %% processes, sorting the list does provide the correct %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- + [{Message, Delivered} || {_, _, _, Message, Delivered} <- lists:sort(RequeueMessages)]), RequeueMessages; {error, not_found} -> @@ -474,50 +460,55 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab..a449e19e 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -76,13 +76,9 @@ deliver(QPids, Delivery) -> %% which then in turn delivers it to its queues. deliver_per_node( dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), + lists:foldl(fun (QPid, D) -> + rabbit_misc:dict_cons(node(QPid), QPid, D) + end, dict:new(), QPids)), Delivery). deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 25715e6e..2c5e5112 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2, +-export([start_link/0, start_child/1, start_child/2, start_child/3, start_restartable_child/1, start_restartable_child/2]). -export([init/1]). @@ -49,8 +49,11 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, + {ChildId, {Mod, start_link, Args}, transient, ?MAX_WAIT, worker, [Mod]}), ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 82f2d199..d645d183 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -625,8 +625,12 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% NB: this will log an inconsistent_database error, which is harmless + %% Turning cover on / off is OK even if we're not in general using cover, + %% it just turns the engine on / off, doesn't actually log anything. + cover:stop([SecondaryNode]), true = disconnect_node(SecondaryNode), pong = net_adm:ping(SecondaryNode), + cover:start([SecondaryNode]), %% leaving a cluster as a ram node ok = control_action(reset, []), diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 1ee958af..97e07545 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -40,12 +40,10 @@ %% %% 1. Allow priorities (basically, change the pending queue to a %% priority_queue). -%% -%% 2. Allow the submission to the pool_worker to be async. -behaviour(gen_server2). --export([start_link/0, submit/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,6 +54,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -80,6 +80,9 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + idle(WId) -> gen_server2:cast(?SERVER, {idle, WId}). @@ -93,7 +96,8 @@ handle_call(next_free, From, State = #state { available = Avail, pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> - {noreply, State #state { pending = queue:in(From, Pending) }, + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, hibernate}; {{value, WId}, Avail1} -> {reply, get_worker_pid(WId), State #state { available = Avail1 }, @@ -108,11 +112,25 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, From}, Pending1} -> + {{value, {next_free, From}}, Pending1} -> gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 3bfcc2d9..d3a48119 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, run/1]). +-export([start_link/1, submit/2, submit_async/2, run/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,6 +44,8 @@ -spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -60,6 +62,9 @@ start_link(WId) -> submit(Pid, Fun) -> gen_server2:call(Pid, {submit, Fun}, infinity). +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + init([WId]) -> ok = worker_pool:idle(WId), put(worker_pool_worker, true), @@ -74,6 +79,11 @@ handle_call({submit, Fun}, From, WId) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |