summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-07-01 23:50:58 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-07-01 23:50:58 +0100
commitc79513aa24b7058eb8a4d4c228dc34aa085aaed4 (patch)
treed3b64592b4a5418adb1d63fb6d355577a2c6f9c7
parent63a6a40f5beff915b3e8d0512f04bfbfded1ea2c (diff)
parent1791a50cfb0a9db025b74c701744e0dd45c91ef7 (diff)
downloadrabbitmq-server-c79513aa24b7058eb8a4d4c228dc34aa085aaed4.tar.gz
merge bug24884 into default (no-op)
-rw-r--r--docs/rabbitmqctl.1.xml106
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--include/rabbit.hrl6
-rw-r--r--packaging/RPMS/Fedora/Makefile3
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.init (renamed from packaging/common/rabbitmq-server.init)2
-rw-r--r--packaging/common/rabbitmq-script-wrapper4
-rw-r--r--packaging/debs/Debian/Makefile9
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init187
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server34
-rwxr-xr-xscripts/rabbitmq-server.bat31
-rwxr-xr-xscripts/rabbitmq-service.bat27
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-xscripts/rabbitmqctl.bat4
-rw-r--r--src/app_utils.erl121
-rw-r--r--src/gm.erl286
-rw-r--r--src/mirrored_supervisor.erl124
-rw-r--r--src/mirrored_supervisor_tests.erl24
-rw-r--r--src/rabbit.erl160
-rw-r--r--src/rabbit_alarm.erl12
-rw-r--r--src/rabbit_amqqueue.erl38
-rw-r--r--src/rabbit_amqqueue_process.erl58
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_binding.erl11
-rw-r--r--src/rabbit_control_main.erl (renamed from src/rabbit_control.erl)170
-rw-r--r--src/rabbit_direct.erl12
-rw-r--r--src/rabbit_disk_monitor.erl17
-rw-r--r--src/rabbit_event.erl6
-rw-r--r--src/rabbit_exchange.erl158
-rw-r--r--src/rabbit_exchange_decorator.erl71
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_invalid.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_file.erl17
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl17
-rw-r--r--src/rabbit_misc.erl182
-rw-r--r--src/rabbit_mnesia.erl5
-rw-r--r--src/rabbit_net.erl30
-rw-r--r--src/rabbit_networking.erl15
-rw-r--r--src/rabbit_nodes.erl3
-rw-r--r--src/rabbit_parameter_validation.erl61
-rw-r--r--src/rabbit_plugins.erl399
-rw-r--r--src/rabbit_plugins_main.erl273
-rw-r--r--src/rabbit_policy.erl156
-rw-r--r--src/rabbit_prelaunch.erl214
-rw-r--r--src/rabbit_reader.erl15
-rw-r--r--src/rabbit_registry.erl21
-rw-r--r--src/rabbit_runtime_parameter.erl43
-rw-r--r--src/rabbit_runtime_parameters.erl243
-rw-r--r--src/rabbit_runtime_parameters_test.erl38
-rw-r--r--src/rabbit_sup.erl38
-rw-r--r--src/rabbit_tests.erl167
-rw-r--r--src/rabbit_upgrade_functions.erl55
-rw-r--r--src/rabbit_variable_queue.erl28
-rw-r--r--src/supervisor2.erl4
65 files changed, 2562 insertions, 1224 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ded3ab48..68bc87be 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -720,7 +720,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -745,6 +745,99 @@
</refsect2>
<refsect2>
+ <title>Parameter Management</title>
+ <para>
+ Certain features of RabbitMQ (such as the federation plugin)
+ are controlled by dynamic,
+ cluster-wide <emphasis>parameters</emphasis>. Each parameter
+ consists of a component name, a key and a value. The
+ component name and key are strings, and the value is an
+ Erlang term. Parameters can be set, cleared and listed. In
+ general you should refer to the documentation for the feature
+ in question to see how to set parameters.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Sets a parameter.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>component_name</term>
+ <listitem><para>
+ The name of the component for which the
+ parameter is being set.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The key for which the parameter is being set.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>value</term>
+ <listitem><para>
+ The value for the parameter, as an
+ Erlang term. In most shells you are very likely to
+ need to quote this.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_parameter federation local_username '&lt;&lt;"guest">>'</screen>
+ <para role="example">
+ This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command>&lt;&lt;"guest">></command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Clears a parameter.
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>component_name</term>
+ <listitem><para>
+ The name of the component for which the
+ parameter is being cleared.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>key</term>
+ <listitem><para>
+ The key for which the parameter is being cleared.
+ </para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_parameter federation local_username</screen>
+ <para role="example">
+ This command clears the parameter <command>local_username</command> for the <command>federation</command> component.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Lists all parameters.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl list_parameters</screen>
+ <para role="example">
+ This command lists all parameters.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
+
+ <refsect2>
<title>Server Status</title>
<para>
The server status queries interrogate the server and return a list of
@@ -794,6 +887,10 @@
<listitem><para>Queue arguments.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name applying to the queue.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>pid</term>
<listitem><para>Id of the Erlang process associated with the queue.</para></listitem>
</varlistentry>
@@ -885,7 +982,8 @@
</varlistentry>
<varlistentry>
<term>type</term>
- <listitem><para>The exchange type (one of [<command>direct</command>,
+ <listitem><para>The exchange type (such as
+ [<command>direct</command>,
<command>topic</command>, <command>headers</command>,
<command>fanout</command>]).</para></listitem>
</varlistentry>
@@ -905,6 +1003,10 @@
<term>arguments</term>
<listitem><para>Exchange arguments.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name for applying to the exchange.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>exchangeinfoitem</command>s are specified then
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..523b54ce 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,9 +19,11 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
- {disk_free_limit, {mem_relative, 1.0}},
+ {disk_free_limit, 1000000000}, %% 1GB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index faf3059a..e8b4a623 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,11 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratch}).
+ scratches, policy}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes}).
+ arguments, pid, slave_pids, mirror_nodes, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -66,6 +66,8 @@
-record(listener, {node, protocol, host, ip_address, port}).
+-record(runtime_parameters, {key, value}).
+
-record(basic_message, {exchange_name, routing_keys = [], content, id,
is_persistent}).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 234fc2c7..03e513f8 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -32,8 +32,8 @@ prepare:
SPECS/rabbitmq-server.spec
cp ${COMMON_DIR}/* SOURCES/
+ cp rabbitmq-server.init SOURCES/rabbitmq-server.init
sed -i \
- -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
-e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \
SOURCES/rabbitmq-server.init
ifeq "$(RPM_OS)" "fedora"
@@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora"
SOURCES/rabbitmq-server.init
endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@||' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
diff --git a/packaging/common/rabbitmq-server.init b/packaging/RPMS/Fedora/rabbitmq-server.init
index 40238c8e..2d2680e3 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/RPMS/Fedora/rabbitmq-server.init
@@ -27,7 +27,7 @@ INIT_LOG_DIR=/var/log/rabbitmq
PID_FILE=/var/run/rabbitmq/pid
START_PROG= # Set when building package
-LOCK_FILE= # Set when building package
+LOCK_FILE=/var/lock/subsys/$NAME
test -x $DAEMON || exit 0
test -x $CONTROL || exit 0
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0e59c218..e832aed6 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -29,7 +29,9 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
-if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
+if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then
+ /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@
+elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
elif [ `id -u` = 0 ] ; then
@SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 2a738f6e..1e4bf755 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -22,15 +22,8 @@ package: clean
tar -zxf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
-# Debian and descendants differ from most other distros in that
-# runlevel 2 should start network services.
- sed -i \
- -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
- -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \
- -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \
- -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \
- $(UNPACKED_DIR)/debian/rabbitmq-server.init
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index e935acf5..943ed48f 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Uploader: Emile Joubert <emile@rabbitmq.com>
+Uploaders: Emile Joubert <emile@rabbitmq.com>
DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
new file mode 100644
index 00000000..b2d3f86a
--- /dev/null
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -0,0 +1,187 @@
+#!/bin/sh
+#
+# rabbitmq-server RabbitMQ broker
+#
+# chkconfig: - 80 05
+# description: Enable AMQP service provided by RabbitMQ
+#
+
+### BEGIN INIT INFO
+# Provides: rabbitmq-server
+# Required-Start: $remote_fs $network
+# Required-Stop: $remote_fs $network
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Description: RabbitMQ broker
+# Short-Description: Enable AMQP service provided by RabbitMQ broker
+### END INIT INFO
+
+PATH=/sbin:/usr/sbin:/bin:/usr/bin
+NAME=rabbitmq-server
+DAEMON=/usr/sbin/${NAME}
+CONTROL=/usr/sbin/rabbitmqctl
+DESC="message broker"
+USER=rabbitmq
+ROTATE_SUFFIX=
+INIT_LOG_DIR=/var/log/rabbitmq
+PID_FILE=/var/run/rabbitmq/pid
+
+
+test -x $DAEMON || exit 0
+test -x $CONTROL || exit 0
+
+RETVAL=0
+set -e
+
+[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
+
+. /lib/lsb/init-functions
+. /lib/init/vars.sh
+
+ensure_pid_dir () {
+ PID_DIR=`dirname ${PID_FILE}`
+ if [ ! -d ${PID_DIR} ] ; then
+ mkdir -p ${PID_DIR}
+ chown -R ${USER}:${USER} ${PID_DIR}
+ chmod 755 ${PID_DIR}
+ fi
+}
+
+remove_pid () {
+ rm -f ${PID_FILE}
+ rmdir `dirname ${PID_FILE}` || :
+}
+
+start_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL != 0 ] ; then
+ RETVAL=0
+ ensure_pid_dir
+ set +e
+ RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \
+ --chuid rabbitmq --start --exec $DAEMON \
+ --pidfile "$RABBITMQ_PID_FILE" --background
+ $CONTROL wait $PID_FILE >/dev/null 2>&1
+ RETVAL=$?
+ set -e
+ if [ $RETVAL != 0 ] ; then
+ remove_pid
+ fi
+ else
+ RETVAL=3
+ fi
+}
+
+stop_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL = 0 ] ; then
+ set +e
+ $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err
+ RETVAL=$?
+ set -e
+ if [ $RETVAL = 0 ] ; then
+ remove_pid
+ fi
+ else
+ RETVAL=3
+ fi
+}
+
+status_rabbitmq() {
+ set +e
+ if [ "$1" != "quiet" ] ; then
+ $CONTROL status 2>&1
+ else
+ $CONTROL status > /dev/null 2>&1
+ fi
+ if [ $? != 0 ] ; then
+ RETVAL=3
+ fi
+ set -e
+}
+
+rotate_logs_rabbitmq() {
+ set +e
+ $CONTROL -q rotate_logs ${ROTATE_SUFFIX}
+ if [ $? != 0 ] ; then
+ RETVAL=1
+ fi
+ set -e
+}
+
+restart_running_rabbitmq () {
+ status_rabbitmq quiet
+ if [ $RETVAL = 0 ] ; then
+ restart_rabbitmq
+ else
+ log_warning_msg "${DESC} not running"
+ fi
+}
+
+restart_rabbitmq() {
+ stop_rabbitmq
+ start_rabbitmq
+}
+
+restart_end() {
+ if [ $RETVAL = 0 ] ; then
+ log_end_msg 0
+ else
+ log_end_msg 1
+ fi
+}
+
+start_stop_end() {
+ case "$RETVAL" in
+ 0)
+ [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}"
+ log_end_msg 0
+ ;;
+ 3)
+ log_warning_msg "${DESC} already ${1}"
+ log_end_msg 0
+ RETVAL=0
+ ;;
+ *)
+ log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
+ log_end_msg 1
+ ;;
+ esac
+}
+
+case "$1" in
+ start)
+ log_daemon_msg "Starting ${DESC}" $NAME
+ start_rabbitmq
+ start_stop_end "running"
+ ;;
+ stop)
+ log_daemon_msg "Stopping ${DESC}" $NAME
+ stop_rabbitmq
+ start_stop_end "stopped"
+ ;;
+ status)
+ status_rabbitmq
+ ;;
+ rotate-logs)
+ log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}"
+ rotate_logs_rabbitmq
+ log_action_end_msg $RETVAL
+ ;;
+ force-reload|reload|restart)
+ log_daemon_msg "Restarting ${DESC}" $NAME
+ restart_rabbitmq
+ restart_end
+ ;;
+ try-restart)
+ log_daemon_msg "Restarting ${DESC}" $NAME
+ restart_running_rabbitmq
+ restart_end
+ ;;
+ *)
+ echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2
+ RETVAL=1
+ ;;
+esac
+
+exit $RETVAL
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 14a18d57..97c74791 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -31,7 +31,7 @@ exec erl \
-noinput \
-hidden \
-sname rabbitmq-plugins$$ \
- -s rabbit_plugins \
+ -s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 66a900a1..c67a0263 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,9 +43,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 0a5a4640..34915b3d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -58,7 +58,8 @@ DEFAULT_NODE_PORT=5672
##--- End of overridden <var_name> variables
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
+[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot "
case "$(uname -s)" in
CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait
@@ -70,24 +71,16 @@ case "$(uname -s)" in
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
- if erl \
- -pa "$RABBITMQ_EBIN_ROOT" \
- -noinput \
- -hidden \
- -s rabbit_prelaunch \
- -sname rabbitmqprelaunch$$ \
- -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
+if ! erl -pa "$RABBITMQ_EBIN_ROOT" \
+ -noinput \
+ -hidden \
+ -s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
+ -extra "${RABBITMQ_NODENAME}";
then
- RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
- RABBITMQ_EBIN_PATH=""
- else
- exit 1
- fi
-else
- RABBITMQ_BOOT_FILE=start_sasl
- RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+ exit 1;
fi
+
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
@@ -100,10 +93,10 @@ RABBITMQ_LISTEN_ARG=
set -f
exec erl \
- ${RABBITMQ_EBIN_PATH} \
+ -pa ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot ${RABBITMQ_BOOT_FILE} \
+ -boot start_sasl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
@@ -112,6 +105,9 @@ exec erl \
-sasl sasl_error_logger false \
-rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
+ -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
+ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index ca49a5d8..167f272e 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,25 +86,24 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- "!RABBITMQ_NODENAME!"
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+ -pa "!RABBITMQ_EBIN_ROOT!" ^
+ -noinput -hidden ^
+ -s rabbit_prelaunch ^
+ -sname rabbitmqprelaunch!RANDOM! ^
+ -extra "!RABBITMQ_NODENAME!"
+
if ERRORLEVEL 1 (
exit /B 1
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_PATH="-pa !RABBITMQ_EBIN_ROOT!"
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -124,9 +123,10 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
"!ERLANG_HOME!\bin\erl.exe" ^
-!RABBITMQ_EBIN_PATH! ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
+W w ^
@@ -139,6 +139,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 9e274840..4758c861 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -154,24 +154,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- ""
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if ERRORLEVEL 1 (
- exit /B 1
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -191,8 +178,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
set ERLANG_SERVICE_ARGUMENTS= ^
-!RABBITMQ_EBIN_PATH! ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
+W w ^
+A30 ^
@@ -204,6 +192,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 4aad6b8f..a5fade72 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -32,6 +32,6 @@ exec erl \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
- -s rabbit_control \
+ -s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index f37fae48..9f549f1e 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -34,7 +34,7 @@ if "!RABBITMQ_NODENAME!"=="" (
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
new file mode 100644
index 00000000..4bef83a5
--- /dev/null
+++ b/src/app_utils.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+-module(app_utils).
+
+-export([load_applications/1, start_applications/1,
+ stop_applications/1, app_dependency_order/2,
+ wait_for_applications/1]).
+
+-ifdef(use_specs).
+
+-spec load_applications([atom()]) -> 'ok'.
+-spec start_applications([atom()]) -> 'ok'.
+-spec stop_applications([atom()]) -> 'ok'.
+-spec wait_for_applications([atom()]) -> 'ok'.
+-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+
+-endif.
+
+%%---------------------------------------------------------------------------
+%% Public API
+
+load_applications(Apps) ->
+ load_applications(queue:from_list(Apps), sets:new()),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
+wait_for_applications(Apps) ->
+ [wait_for_application(App) || App <- Apps], ok.
+
+app_dependency_order(RootApps, StripUnreachable) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ [{App, app_dependencies(App)} ||
+ {App, _Desc, _Vsn} <- application:loaded_applications()]),
+ try
+ case StripUnreachable of
+ true -> digraph:del_vertices(G, digraph:vertices(G) --
+ digraph_utils:reachable(RootApps, G));
+ false -> ok
+ end,
+ digraph_utils:topsort(G)
+ after
+ true = digraph:delete(G)
+ end.
+
+%%---------------------------------------------------------------------------
+%% Private API
+
+wait_for_application(Application) ->
+ case lists:keymember(Application, 1, application:which_applications()) of
+ true -> ok;
+ false -> timer:sleep(1000),
+ wait_for_application(Application)
+ end.
+
+load_applications(Worklist, Loaded) ->
+ case queue:out(Worklist) of
+ {empty, _WorkList} ->
+ ok;
+ {{value, App}, Worklist1} ->
+ case sets:is_element(App, Loaded) of
+ true -> load_applications(Worklist1, Loaded);
+ false -> case application:load(App) of
+ ok -> ok;
+ {error, {already_loaded, App}} -> ok;
+ Error -> throw(Error)
+ end,
+ load_applications(
+ queue:join(Worklist1,
+ queue:from_list(app_dependencies(App))),
+ sets:add_element(App, Loaded))
+ end
+ end.
+
+app_dependencies(App) ->
+ case application:get_key(App, applications) of
+ undefined -> [];
+ {ok, Lst} -> Lst
+ end.
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
diff --git a/src/gm.erl b/src/gm.erl
index 01300f18..f88ed18f 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -433,51 +433,47 @@
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).
-%% The joined, members_changed and handle_msg callbacks can all
-%% return any of the following terms:
+%% The joined, members_changed and handle_msg callbacks can all return
+%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
-%% {'stop', Reason} - the callback indicates the member should
-%% stop with reason Reason and should leave the group.
+%% {'stop', Reason} - the callback indicates the member should stop
+%% with reason Reason and should leave the group.
%%
-%% {'become', Module, Args} - the callback indicates that the
-%% callback module should be changed to Module and that the
-%% callback functions should now be passed the arguments
-%% Args. This allows the callback module to be dynamically
-%% changed.
+%% {'become', Module, Args} - the callback indicates that the callback
+%% module should be changed to Module and that the callback functions
+%% should now be passed the arguments Args. This allows the callback
+%% module to be dynamically changed.
-%% Called when we've successfully joined the group. Supplied with
-%% Args provided in start_link, plus current group members.
+%% Called when we've successfully joined the group. Supplied with Args
+%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Supplied with Args provided in start_link, the list of new
-%% members and the list of members previously known to us that
-%% have since died. Note that if a member joins and dies very
-%% quickly, it's possible that we will never see that member
-%% appear in either births or deaths. However we are guaranteed
-%% that (1) we will see a member joining either in the births
-%% here, or in the members passed to joined/2 before receiving
-%% any messages from it; and (2) we will not see members die that
-%% we have not seen born (or supplied in the members to
-%% joined/2).
+%% Supplied with Args provided in start_link, the list of new members
+%% and the list of members previously known to us that have since
+%% died. Note that if a member joins and dies very quickly, it's
+%% possible that we will never see that member appear in either births
+%% or deaths. However we are guaranteed that (1) we will see a member
+%% joining either in the births here, or in the members passed to
+%% joined/2 before receiving any messages from it; and (2) we will not
+%% see members die that we have not seen born (or supplied in the
+%% members to joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
-%% message. This does get called for messages injected by this
-%% member, however, in such cases, there is no special
-%% significance of this invocation: it does not indicate that the
-%% message has made it to any other members, let alone all other
-%% members.
+%% message. This does get called for messages injected by this member,
+%% however, in such cases, there is no special significance of this
+%% invocation: it does not indicate that the message has made it to
+%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Called on gm member termination as per rules in gen_server,
-%% with the Args provided in start_link plus the termination
-%% Reason.
+%% Called on gm member termination as per rules in gen_server, with
+%% the Args provided in start_link plus the termination Reason.
-callback terminate(Args :: term(), Reason :: term()) ->
ok | term().
@@ -533,7 +529,7 @@ init([GroupName, Module, Args]) ->
group_name = GroupName,
module = Module,
view = undefined,
- pub_count = 0,
+ pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
@@ -562,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState,
module = Module,
callback_args = Args }) ->
- Group = record_new_member_in_group(
- GroupName, Self, NewMember,
- fun (Group1) ->
- View1 = group_to_view(Group1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(
- MembersState)})
- end),
+ {MembersState1, Group} =
+ record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self,
+ prepare_members_state(MembersState1)}),
+ MembersState1
+ end),
View2 = group_to_view(Group),
- State1 = check_neighbours(State #state { view = View2 }),
+ State1 = check_neighbours(State #state { view = View2,
+ members_state = MembersState1 }),
Result = callback_view_changed(Args, Module, View, View2),
handle_callback_result({Result, {ok, Group}, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ members_state = MembersState,
group_name = GroupName,
module = Module,
callback_args = Args }) ->
{Result, State1} =
case needs_view_update(ReqVer, View) of
- true ->
- View1 = group_to_view(read_group(GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- check_neighbours(State #state { view = View1 })};
- false ->
- {ok, State}
+ true -> View1 = group_to_view(read_group(GroupName)),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(
+ State #state { view = View1,
+ members_state = MemberState1 })};
+ false -> {ok, State}
end,
handle_callback_result(
if_callback_success(
@@ -645,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -659,28 +661,29 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
group_to_view(record_dead_member_in_group(Member, GroupName)),
- State1 = State #state { view = View1 },
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
- maybe_erase_aliases(
- State1 #state {
+ {Result1, State1} = maybe_erase_aliases(State, View1),
+ {Result1, State1 #state {
members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) });
+ confirms = purge_confirms(Confirms) }};
_ ->
%% here we won't be pointing out any deaths:
%% the concern is that there maybe births
%% which we'd otherwise miss.
{callback_view_changed(Args, Module, View, View1),
- State1}
+ check_neighbours(State #state { view = View1 })}
end,
- handle_callback_result({Result, check_neighbours(State2)})
+ handle_callback_result({Result, State2})
end.
@@ -693,9 +696,13 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) -> 1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
-prioritise_info(_ , _State) -> 0.
+prioritise_info(flush, _State) ->
+ 1;
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+ #state { members_state = MS }) when MS /= undefined ->
+ 1;
+prioritise_info(_, _State) ->
+ 0.
handle_msg(check_neighbours, State) ->
@@ -795,8 +802,8 @@ handle_msg({activity, Left, Activity},
State1 = State #state { members_state = MembersState1,
confirms = Confirms1 },
Activity3 = activity_finalise(Activity1),
- {Result, State2} = maybe_erase_aliases(State1),
- ok = maybe_send_activity(Activity3, State2),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
if_callback_success(
Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
@@ -829,13 +836,14 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
+ PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount, Msg} | Buffer],
+ Buffer1 = [{PubCount1, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
- _ -> queue:in({PubCount, From}, Confirms)
+ _ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount + 1,
+ State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
@@ -850,14 +858,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ pub_count = PubCount }) ->
+ [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
- Member #member { pending_ack = PA1 }
+ Member #member { pending_ack = PA1,
+ last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.
@@ -867,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -890,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1052,7 +1057,7 @@ record_dead_member_in_group(Member, GroupName) ->
Group.
record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, Group} =
+ {atomic, {Result, Group}} =
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
@@ -1062,11 +1067,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
Members1 = Prefix ++ [Left, NewMember | Suffix],
Group2 = Group1 #gm_group { members = Members1,
version = Ver + 1 },
- ok = Fun(Group2),
+ Result = Fun(Group2),
mnesia:write(Group2),
- Group2
+ {Result, Group2}
end),
- Group.
+ {Result, Group}.
erase_members_in_group(Members, GroupName) ->
DeadMembers = [{dead, Id} || Id <- Members],
@@ -1089,10 +1094,10 @@ erase_members_in_group(Members, GroupName) ->
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
- view = View,
+ view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1107,11 +1112,11 @@ maybe_erase_aliases(State = #state { self = Self,
end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
- [] -> {ok, State1};
+ [] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
erase_members_in_group(Erasable, GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- State1 #state { view = View1 }}
+ {callback_view_changed(Args, Module, View0, View1),
+ check_neighbours(State1 #state { view = View1 })}
end.
can_erase_view_member(Self, Self, _LA, _LP) -> false;
@@ -1141,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1233,23 +1236,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1257,6 +1256,12 @@ make_member(GroupName) ->
{error, not_found} -> ?VERSION_START
end, self()}.
+remove_erased_members(MembersState, View) ->
+ lists:foldl(fun (Id, MembersState1) ->
+ store_member(Id, find_member_or_blank(Id, MembersState),
+ MembersState1)
+ end, blank_member_state(), all_known_members(View)).
+
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
@@ -1265,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1287,16 +1288,30 @@ send_right(Right, View, Msg) ->
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
- lists:foldl(
- fun ({Id, Pubs, _Acks}, ok) ->
- lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, get_pid(Id), Pub);
- (_, Error) ->
- Error
- end, ok, Pubs);
- (_, Error) ->
- Error
- end, ok, Activity).
+ Result =
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
+ lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
+ case Module2:handle_msg(
+ Args2, get_pid(Id), Pub) of
+ ok ->
+ Acc;
+ {become, Module3, Args3} ->
+ {Args3, Module3, ok};
+ {stop, _Reason} = Error ->
+ Error
+ end;
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args1, Module1, ok}, Pubs);
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args, Module, ok}, Activity),
+ case Result of
+ {Args, Module, ok} -> ok;
+ {Args1, Module1, ok} -> {become, Module1, Args1};
+ {stop, _Reason} = Error -> Error
+ end.
callback_view_changed(Args, Module, OldView, NewView) ->
OldMembers = all_known_members(OldView),
@@ -1364,34 +1379,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 221f6a87..4fc488b8 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
+call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
+cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) ->
%% immediately after the tx - we can't be 100% here. So we may as
%% well dirty_select.
case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of
- [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity);
+ [Mirror] -> call(Mirror, Msg);
[] -> {error, not_found}
end.
@@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) ->
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- ?PG2:get_members(Group),
- D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]).
+ D <- [delegate(M)]]).
child(Sup, Id) ->
[Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
Id1 =:= Id],
Pid.
+delegate(Sup) -> child(Sup, delegate).
+mirroring(Sup) -> child(Sup, mirroring).
+
%%----------------------------------------------------------------------------
start_internal(Group, ChildSpecs) ->
@@ -261,24 +264,19 @@ start_internal(Group, ChildSpecs) ->
%%----------------------------------------------------------------------------
-init({overall, Group, Init}) ->
- case Init of
- {ok, {Restart, ChildSpecs}} ->
- Delegate = {delegate, {?SUPERVISOR, start_link,
- [?MODULE, {delegate, Restart}]},
- temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
- Mirroring = {mirroring, {?MODULE, start_internal,
- [Group, ChildSpecs]},
- permanent, 16#ffffffff, worker, [?MODULE]},
- %% Important: Delegate MUST start before Mirroring so that
- %% when we shut down from above it shuts down last, so
- %% Mirroring does not see it die.
- %%
- %% See comment in handle_info('DOWN', ...) below
- {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
- ignore ->
- ignore
- end;
+init({overall, _Group, ignore}) -> ignore;
+init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
+ %% Important: Delegate MUST start before Mirroring so that when we
+ %% shut down from above it shuts down last, so Mirroring does not
+ %% see it die.
+ %%
+ %% See comment in handle_info('DOWN', ...) below
+ {ok, {{one_for_all, 0, 1},
+ [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]},
+ temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
+ permanent, 16#ffffffff, worker, [?MODULE]}]}};
+
init({delegate, Restart}) ->
{ok, {Restart, []}};
@@ -293,28 +291,29 @@ handle_call({init, Overall}, _From,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
- ok = ?PG2:join(Group, self()),
- Rest = ?PG2:get_members(Group) -- [self()],
+ ok = ?PG2:join(Group, Overall),
+ Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
[] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
- ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}),
+ ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}),
erlang:monitor(process, Pid)
end || Pid <- Rest],
- Delegate = child(Overall, delegate),
+ Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
- true -> {reply, ok, State1};
- false -> {stop, shutdown, State1}
+ case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
+ [] -> {reply, ok, State1};
+ Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
- State = #state{delegate = Delegate,
+ State = #state{overall = Overall,
+ delegate = Delegate,
group = Group}) ->
- {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
@@ -327,9 +326,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
- {reply, Delegate, State};
-
handle_call(group, _From, State = #state{group = Group}) ->
{reply, Group, State};
@@ -348,7 +344,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, Reason},
- State = #state{delegate = Pid, group = Group}) ->
+ State = #state{overall = Pid, group = Group}) ->
%% Since the delegate is temporary, its death won't cause us to
%% die. Since the overall supervisor kills processes in reverse
%% order when shutting down "from above" and we started after the
@@ -362,19 +358,20 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
- Self = self(),
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> []
+ [O | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(
+ fun() -> update_all(O, Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
- case all_started(R) of
- true -> {noreply, State};
- false -> {stop, shutdown, State}
+ case errors(R) of
+ [] -> {noreply, State};
+ Errors -> {stop, {shutdown, Errors}, State}
end;
handle_info(Info, State) ->
@@ -389,13 +386,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
- [?GEN_SERVER:cast(P, {die, Reason}) ||
- P <- ?PG2:get_members(Group) -- [self()]].
+ [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Delegate, ChildSpec) ->
- case mnesia:transaction(fun() ->
- check_start(Group, Delegate, ChildSpec)
- end) of
+maybe_start(Group, Overall, Delegate, ChildSpec) ->
+ case mnesia:transaction(
+ fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
@@ -403,31 +398,29 @@ maybe_start(Group, Delegate, ChildSpec) ->
{aborted, E} -> {error, E}
end.
-check_start(Group, Delegate, ChildSpec) ->
+check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
- [] -> write(Group, ChildSpec),
+ [] -> write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
- case self() of
+ case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
- dead -> write(Group, ChildSpec),
+ dead -> write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
end
end.
-supervisor(Pid) ->
- with_exit_handler(
- fun() -> dead end,
- fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end).
+supervisor(Pid) -> with_exit_handler(fun() -> dead end,
+ fun() -> delegate(Pid) end).
-write(Group, ChildSpec) ->
+write(Group, Overall, ChildSpec) ->
ok = mnesia:write(
#mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = self(),
+ mirroring_pid = Overall,
childspec = ChildSpec}),
ChildSpec.
@@ -453,12 +446,12 @@ check_stop(Group, Delegate, Id) ->
id({Id, _, _, _, _, _}) -> Id.
-update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+update_all(Overall, OldOverall) ->
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
key = '$1',
childspec = '$2',
_ = '_'},
- [write(Group, C) ||
+ [write(Group, Overall, C) ||
[{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->
@@ -468,12 +461,11 @@ delete_all(Group) ->
[delete(Group, id(C)) ||
C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
-all_started(Results) -> [] =:= [R || R = {error, _} <- Results].
+errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
-create_tables() ->
- create_tables([?TABLE_DEF]).
+create_tables() -> create_tables([?TABLE_DEF]).
create_tables([]) ->
ok;
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index e8baabe8..f8cbd853 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -51,7 +51,7 @@ test_migrate() ->
with_sups(fun([A, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b]).
@@ -61,10 +61,10 @@ test_migrate_twice() ->
with_sups(fun([A, B]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
{ok, C} = start_sup(c),
Pid2 = pid_of(worker),
- kill(B, Pid2),
+ kill_registered(B, Pid2),
Pid3 = pid_of(worker),
false = (Pid1 =:= Pid3),
kill(C)
@@ -124,7 +124,7 @@ test_large_group() ->
with_sups(fun([A, _, _, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b, c, d]).
@@ -134,7 +134,7 @@ test_childspecs_at_init() ->
S = childspec(worker),
with_sups(fun([A, _]) ->
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [{a, [S]}, {b, [S]}]).
@@ -143,7 +143,7 @@ test_anonymous_supervisors() ->
with_sups(fun([A, _B]) ->
?MS:start_child(A, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [anon, anon]).
@@ -157,7 +157,7 @@ test_no_migration_on_shutdown() ->
with_sups(fun([Evil, _]) ->
?MS:start_child(Evil, childspec(worker)),
try
- call(worker, ping),
+ call(worker, ping, 1000, 100),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _, _} ->
ok
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 1000, 100).
+call(Id, Msg) -> call(Id, Msg, 10*1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
@@ -285,10 +285,16 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]);
kill(Pid, Waits) ->
erlang:monitor(process, Pid),
[erlang:monitor(process, P) || P <- Waits],
- exit(Pid, kill),
+ exit(Pid, bang),
kill_wait(Pid),
[kill_wait(P) || P <- Waits].
+kill_registered(Pid, Child) ->
+ {registered_name, Name} = erlang:process_info(Child, registered_name),
+ kill(Pid, Child),
+ false = (Child =:= whereis(Name)),
+ ok.
+
kill_wait(Pid) ->
receive
{'DOWN', _Ref, process, Pid, _Reason} ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b1f786a0..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,9 +18,9 @@
-behaviour(application).
--export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0,
- status/0, is_running/0, is_running/1, environment/0,
- rotate_logs/1, force_event_refresh/0]).
+-export([start/0, boot/0, stop/0,
+ stop_and_halt/0, await_startup/0, status/0, is_running/0,
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -60,7 +60,8 @@
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
- {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [worker_pool_sup]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).
@@ -143,7 +144,8 @@
-rabbit_boot_step({mirror_queue_slave_sup,
[{description, "mirror queue slave sup"},
- {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [rabbit_mirror_queue_slave_sup]}},
{requires, recovery},
{enables, routing_ready}]}).
@@ -197,7 +199,8 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
+ ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -214,11 +217,11 @@
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
--spec(maybe_hipe_compile/0 :: () -> 'ok').
--spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
+-spec(boot/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> no_return()).
+-spec(await_startup/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -261,7 +264,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("HiPE compiling: |~s|~n |",
+ io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
@@ -283,29 +286,51 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
split0([], Ls) -> Ls;
split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
-prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_upgrade:maybe_upgrade_mnesia().
+ensure_application_loaded() ->
+ %% We end up looking at the rabbit app's env for HiPE and log
+ %% handling, so it needs to be loaded. But during the tests, it
+ %% may end up getting loaded twice, so guard against that.
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, rabbit}} -> ok
+ end.
start() ->
+ start_it(fun() ->
+ %% We do not want to HiPE compile or upgrade
+ %% mnesia after just restarting the app
+ ok = ensure_application_loaded(),
+ ok = ensure_working_log_handlers(),
+ ok = app_utils:start_applications(app_startup_order()),
+ ok = print_plugin_info(rabbit_plugins:active())
+ end).
+
+boot() ->
+ start_it(fun() ->
+ ok = ensure_application_loaded(),
+ maybe_hipe_compile(),
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ ok = app_utils:load_applications(ToBeLoaded),
+ StartupApps = app_utils:app_dependency_order(ToBeLoaded,
+ false),
+ ok = app_utils:start_applications(StartupApps),
+ ok = print_plugin_info(Plugins)
+ end).
+
+start_it(StartFun) ->
try
- %% prepare/1 ends up looking at the rabbit app's env, so it
- %% needs to be loaded, but during the tests, it may end up
- %% getting loaded twice, so guard against that
- case application:load(rabbit) of
- ok -> ok;
- {error, {already_loaded, rabbit}} -> ok
- end,
- ok = prepare(),
- ok = rabbit_misc:start_applications(application_load_order())
+ StartFun()
after
- %%give the error loggers some time to catch up
+ %% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
rabbit_log:info("Stopping Rabbit~n"),
- ok = rabbit_misc:stop_applications(application_load_order()).
+ ok = app_utils:stop_applications(app_shutdown_order()).
stop_and_halt() ->
try
@@ -316,6 +341,9 @@ stop_and_halt() ->
end,
ok.
+await_startup() ->
+ app_utils:wait_for_applications(app_startup_order()).
+
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
{running_applications, application:which_applications(infinity)},
@@ -392,46 +420,13 @@ stop(_State) ->
%%---------------------------------------------------------------------------
%% application life cycle
-application_load_order() ->
- ok = load_applications(),
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
- [{App, app_dependencies(App)} ||
- {App, _Desc, _Vsn} <- application:loaded_applications()]),
- true = digraph:del_vertices(
- G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)),
- Result = digraph_utils:topsort(G),
- true = digraph:delete(G),
- Result.
-
-load_applications() ->
- load_applications(queue:from_list(?APPS), sets:new()).
-
-load_applications(Worklist, Loaded) ->
- case queue:out(Worklist) of
- {empty, _WorkList} ->
- ok;
- {{value, App}, Worklist1} ->
- case sets:is_element(App, Loaded) of
- true -> load_applications(Worklist1, Loaded);
- false -> case application:load(App) of
- ok -> ok;
- {error, {already_loaded, App}} -> ok;
- Error -> throw(Error)
- end,
- load_applications(
- queue:join(Worklist1,
- queue:from_list(app_dependencies(App))),
- sets:add_element(App, Loaded))
- end
- end.
+app_startup_order() ->
+ ok = app_utils:load_applications(?APPS),
+ app_utils:app_dependency_order(?APPS, false).
-app_dependencies(App) ->
- case application:get_key(App, applications) of
- undefined -> [];
- {ok, Lst} -> Lst
- end.
+app_shutdown_order() ->
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ app_utils:app_dependency_order(Apps, true).
%%---------------------------------------------------------------------------
%% boot step logic
@@ -477,7 +472,8 @@ sort_boot_steps(UnsortedSteps) ->
%% there is one, otherwise fail).
SortedSteps = lists:reverse(
[begin
- {StepName, Step} = digraph:vertex(G, StepName),
+ {StepName, Step} = digraph:vertex(G,
+ StepName),
Step
end || StepName <- digraph_utils:topsort(G)]),
digraph:delete(G),
@@ -538,7 +534,7 @@ boot_error(Format, Args) ->
boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
- rabbit_sup:start_child(delegate_sup, [Count]).
+ rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
@@ -559,7 +555,8 @@ insert_default_data() ->
ok = rabbit_vhost:add(DefaultVHost),
ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
- ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser,
+ DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
DefaultReadPerm),
@@ -621,15 +618,12 @@ log_location(Type) ->
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
-rotate_logs(File, Suffix, OldHandler, NewHandler) ->
- case File of
- undefined -> ok;
- tty -> ok;
- _ -> gen_event:swap_handler(
- error_logger,
- {OldHandler, swap},
- {NewHandler, {File, Suffix}})
- end.
+rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(File, Suffix, OldHandler, NewHandler) ->
+ gen_event:swap_handler(error_logger,
+ {OldHandler, swap},
+ {NewHandler, {File, Suffix}}).
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
@@ -650,6 +644,24 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
+print_plugin_info([]) ->
+ ok;
+print_plugin_info(Plugins) ->
+ %% This gets invoked by rabbitmqctl start_app, outside the context
+ %% of the rabbit application
+ rabbit_misc:with_local_io(
+ fun() ->
+ io:format("~n-- plugins running~n"),
+ [print_plugin_info(
+ AppName, element(2, application:get_key(AppName, vsn)))
+ || AppName <- Plugins],
+ ok
+ end).
+
+print_plugin_info(Plugin, Vsn) ->
+ Len = 76 - length(Vsn),
+ io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
+
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 04e0c141..d16d90a4 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -162,17 +162,17 @@ maybe_alert(UpdateFun, Node, Source,
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees, _Source) ->
- alert(Alertees, [Alert], fun erlang:'=:='/2).
+alert_local(Alert, Alertees, Source) ->
+ alert(Alertees, Source, Alert, fun erlang:'=:='/2).
alert_remote(Alert, Alertees, Source) ->
- alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
+ alert(Alertees, Source, Alert, fun erlang:'=/='/2).
-alert(Alertees, AlertArg, NodeComparator) ->
+alert(Alertees, Source, Alert, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid] ++ AlertArg);
+ true -> apply(M, F, A ++ [Pid, Source, Alert]);
false -> ok
end
end, ok, Alertees).
@@ -181,7 +181,7 @@ internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
- {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504..afbaea65 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,7 +28,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([store_queue/1]).
+-export([update/2, store_queue/1, policy_changed/2]).
%% internal
@@ -71,6 +71,9 @@
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -157,6 +160,8 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -166,6 +171,9 @@
[queue_name, channel_pid, consumer_tag, ack_required]).
start() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -222,9 +230,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun () -> B(), Q end;
+ [] -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -237,6 +246,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q = #amqqueue{durable = Durable}] ->
+ Q1 = Fun(Q),
+ ok = mnesia:write(rabbit_queue, Q1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
@@ -245,6 +267,9 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
+policy_changed(_Q1, _Q2) ->
+ ok.
+
determine_queue_nodes(Args) ->
Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
@@ -508,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -573,7 +598,8 @@ on_node_down(Node) ->
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
+ node(Pid) == Node andalso
+ not is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5701efeb..8933de87 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, {new, Q}),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, Q, Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
- rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- noreply(State1);
- Q1 -> {stop, normal, {existing, Q1}, State}
- end.
+ not_found ->
+ {stop, normal, not_found, State};
+ Q1 ->
+ case matches(Recover, Q, Q1) of
+ true ->
+ gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = bq_init(BQ, Q, Recover),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
+ false ->
+ {stop, normal, {existing, Q1}, State}
+ end
+ end.
+
+matches(true, Q, Q) -> true;
+matches(true, _Q, _Q1) -> false;
+matches(false, Q1, Q2) ->
+ %% i.e. not policy
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
+ Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -984,8 +1002,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- {basic_consume, _, _, _, _, _, _} -> 7;
- {basic_cancel, _, _, _} -> 7;
stat -> 7;
_ -> 0
end.
@@ -995,10 +1011,6 @@ prioritise_cast(Msg, _State) ->
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- {ack, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid, _Credit} -> 7;
- {unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 28c57bb0..dc144a0e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,10 +26,8 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 17d848da..734456d3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -224,6 +224,5 @@ header_routes(HeadersTable) ->
{array, Routes} -> [Route || {longstr, Route} <- Routes];
undefined -> [];
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
+ binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb44797e..f0ea514d 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -173,13 +173,11 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- ok = rabbit_exchange:callback(
- Src, add_binding, [transaction, Src, B]),
+ x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
fun () ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Serial, Src, B]),
- ok = rabbit_event:notify(binding_created, info(B))
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -487,4 +485,5 @@ process_deletions(Deletions) ->
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
-x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
diff --git a/src/rabbit_control.erl b/src/rabbit_control_main.erl
index 8b24d2e3..b23088cc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control_main.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
--module(rabbit_control).
+-module(rabbit_control_main).
-include("rabbit.hrl").
-export([start/0, stop/0, action/5]).
@@ -26,6 +26,61 @@
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(QUIET_DEF, {?QUIET_OPT, flag}).
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
+-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+
+-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
+
+-define(COMMANDS,
+ [stop,
+ stop_app,
+ start_app,
+ wait,
+ reset,
+ force_reset,
+ rotate_logs,
+
+ cluster,
+ force_cluster,
+ cluster_status,
+
+ add_user,
+ delete_user,
+ change_password,
+ clear_password,
+ set_user_tags,
+ list_users,
+
+ add_vhost,
+ delete_vhost,
+ list_vhosts,
+ {set_permissions, [?VHOST_DEF]},
+ {clear_permissions, [?VHOST_DEF]},
+ {list_permissions, [?VHOST_DEF]},
+ list_user_permissions,
+
+ set_parameter,
+ clear_parameter,
+ list_parameters,
+
+ {list_queues, [?VHOST_DEF]},
+ {list_exchanges, [?VHOST_DEF]},
+ {list_bindings, [?VHOST_DEF]},
+ {list_connections, [?VHOST_DEF]},
+ list_channels,
+ {list_consumers, [?VHOST_DEF]},
+ status,
+ environment,
+ report,
+ eval,
+
+ close_connection,
+ {trace_on, [?VHOST_DEF]},
+ {trace_off, [?VHOST_DEF]},
+ set_vm_memory_high_watermark
+ ]).
+
-define(GLOBAL_QUERIES,
[{"Connections", rabbit_networking, connection_info_all,
connection_info_keys},
@@ -57,19 +112,18 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
- {[Command0 | Args], Opts} =
- case rabbit_misc:get_options([{flag, ?QUIET_OPT},
- {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}],
- init:get_plain_arguments()) of
- {[], _Opts} -> usage();
- CmdArgsAndOpts -> CmdArgsAndOpts
+ {Command, Opts, Args} =
+ case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
+ init:get_plain_arguments())
+ of
+ {ok, Res} -> Res;
+ no_command -> print_error("could not recognise command", []),
+ usage()
end,
Opts1 = [case K of
?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
_ -> {K, V}
end || {K, V} <- Opts],
- Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
@@ -102,6 +156,11 @@ start() ->
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
usage();
+ {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
+ %% We handle this common case specially to avoid ~p since
+ %% that has i18n issues
+ print_error("~s: ~s", [Problem, Reason]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
@@ -194,8 +253,7 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, rabbit, Inform);
-
+ wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
action(wait, Node, [PidFile, App], _Opts, Inform) ->
Inform("Waiting for ~p on ~p", [App, Node]),
wait_for_application(Node, PidFile, list_to_atom(App), Inform);
@@ -216,33 +274,33 @@ action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
- Inform("Rotating logs to files with suffix ~p", [Suffix]),
+ Inform("Rotating logs to files with suffix \"~s\"", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
- Inform("Closing connection ~s", [PidStr]),
+ Inform("Closing connection \"~s\"", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
- Inform("Creating user ~p", [Username]),
+ Inform("Creating user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
- Inform("Deleting user ~p", Args),
+ Inform("Deleting user \"~s\"", Args),
call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
- Inform("Changing password for user ~p", [Username]),
+ Inform("Changing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, change_password, Args});
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
- Inform("Clearing password for user ~p", [Username]),
+ Inform("Clearing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
Tags = [list_to_atom(T) || T <- TagsStr],
- Inform("Setting tags for user ~p to ~p", [Username, Tags]),
+ Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
rpc_call(Node, rabbit_auth_backend_internal, set_tags,
[list_to_binary(Username), Tags]);
@@ -253,11 +311,11 @@ action(list_users, Node, [], _Opts, Inform) ->
rabbit_auth_backend_internal:user_info_keys());
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Creating vhost ~p", Args),
+ Inform("Creating vhost \"~s\"", Args),
call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Deleting vhost ~p", Args),
+ Inform("Deleting vhost \"~s\"", Args),
call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, Args, _Opts, Inform) ->
@@ -319,12 +377,12 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(trace_on, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Starting tracing for vhost ~p", [VHost]),
+ Inform("Starting tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
action(trace_off, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Stopping tracing for vhost ~p", [VHost]),
+ Inform("Stopping tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
@@ -337,23 +395,42 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, clear_permissions,
[Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Listing permissions in vhost ~p", [VHost]),
+ Inform("Listing permissions in vhost \"~s\"", [VHost]),
display_info_list(call(Node, {rabbit_auth_backend_internal,
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
+action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) ->
+ Inform("Setting runtime parameter ~p for component ~p to ~p",
+ [Key, Component, Value]),
+ rpc_call(Node, rabbit_runtime_parameters, parse_set,
+ [list_to_binary(Component), list_to_binary(Key), Value]);
+
+action(clear_parameter, Node, [Component, Key], _Opts, Inform) ->
+ Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]),
+ rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component),
+ list_to_binary(Key)]);
+
+action(list_parameters, Node, Args = [], _Opts, Inform) ->
+ Inform("Listing runtime parameters", []),
+ display_info_list(
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args),
+ rabbit_runtime_parameters:info_keys());
+
action(report, Node, _Args, _Opts, Inform) ->
io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
@@ -388,12 +465,22 @@ wait_for_application(Node, PidFile, Application, Inform) ->
Inform("pid is ~s", [Pid]),
wait_for_application(Node, Pid, Application).
+wait_for_application(Node, Pid, rabbit_and_plugins) ->
+ wait_for_startup(Node, Pid);
wait_for_application(Node, Pid, Application) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
+
+wait_for_startup(Node, Pid) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
+
+while_process_is_alive(Node, Pid, Activity) ->
case process_up(Pid) of
- true -> case rabbit_nodes:is_running(Node, Application) of
+ true -> case Activity() of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_application(Node, Pid, Application)
+ while_process_is_alive(Node, Pid, Activity)
end;
false -> {error, process_not_running}
end.
@@ -408,12 +495,14 @@ wait_for_process_death(Pid) ->
read_pid_file(PidFile, Wait) ->
case {file:read_file(PidFile), Wait} of
{{ok, Bin}, _} ->
- S = string:strip(binary_to_list(Bin), right, $\n),
- try list_to_integer(S)
+ S = binary_to_list(Bin),
+ {match, [PidS]} = re:run(S, "[^\\s]+",
+ [{capture, all, list}]),
+ try list_to_integer(PidS)
catch error:badarg ->
exit({error, {garbage_in_pid_file, PidFile}})
end,
- S;
+ PidS;
{{error, enoent}, true} ->
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
read_pid_file(PidFile, Wait);
@@ -425,8 +514,7 @@ read_pid_file(PidFile, Wait) ->
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
with_os([{unix, fun () ->
- system("ps -p " ++ Pid
- ++ " >/dev/null 2>&1") =:= 0
+ run_ps(Pid) =:= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -444,15 +532,17 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
format_parse_error({_Line, Mod, Err}) ->
lists:flatten(Mod:format_error(Err)).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a471d282..c07ad832 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -47,16 +47,10 @@
%%----------------------------------------------------------------------------
-boot() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
+boot() -> rabbit_sup:start_supervisor_child(
+ rabbit_direct_client_sup, rabbit_client_sup,
[{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+ {rabbit_channel_sup, start_link, []}]).
force_event_refresh() ->
[Pid ! force_event_refresh || Pid<- list()],
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 831d5290..58375abb 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -27,7 +27,7 @@
set_check_interval/1, get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
-record(state, {dir,
limit,
@@ -87,9 +87,9 @@ init([Limit]) ->
vm_memory_monitor:get_total_memory()} of
{N1, N2} when is_integer(N1), is_integer(N2) ->
{ok, set_disk_limits(State, Limit)};
- _ ->
+ Err ->
rabbit_log:info("Disabling disk free space monitoring "
- "on unsupported platform~n"),
+ "on unsupported platform: ~p~n", [Err]),
{stop, unsupported_platform}
end.
@@ -167,9 +167,9 @@ get_disk_free(Dir, {unix, Sun})
get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
- parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir));
-get_disk_free(_, _) ->
- unknown.
+ parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
@@ -178,8 +178,9 @@ parse_free_unix(CommandResult) ->
parse_free_win32(CommandResult) ->
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
- [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
- list_to_integer(Free).
+ {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
+ [{capture, all_but_first, list}]),
+ list_to_integer(lists:reverse(Free)).
interpret_limit({mem_relative, R}) ->
round(R * vm_memory_monitor:get_total_memory());
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 4ec141cf..3f1b20fe 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
%% TODO: switch to os:timestamp() when we drop support for
%% Erlang/OTP < R13B01
- gen_event:notify(rabbit_event, #event{type = Type,
- props = Props,
- timestamp = now()}).
+ gen_event:notify(?MODULE, #event{type = Type,
+ props = Props,
+ timestamp = now()}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 910a89b4..57c571f1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,13 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, callback/3, declare/6,
+-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, update_scratch/2,
+ lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1]).
+-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -37,7 +37,12 @@
-type(fun_name() :: atom()).
-spec(recover/0 :: () -> [name()]).
--spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
+-spec(callback/4::
+ (rabbit_types:exchange(), fun_name(),
+ fun((boolean()) -> non_neg_integer()) | atom(),
+ [any()]) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -58,7 +63,13 @@
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
--spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
+-spec(lookup_scratch/2 :: (name(), atom()) ->
+ rabbit_types:ok(term()) |
+ rabbit_types:error('not_found')).
+-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -76,14 +87,16 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/1 :: (rabbit_types:exchange()) ->
+ fun((boolean()) -> 'none' | pos_integer())).
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
+ policy]).
recover() ->
Xs = rabbit_misc:table_filter(
@@ -95,21 +108,52 @@ recover() ->
true -> store(X);
false -> ok
end,
- rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
+ callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(#exchange{type = XType}, Fun, Args) ->
- apply(type_to_module(XType), Fun, Args).
+callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+ Serial = fun (Bool) ->
+ case Serial0 of
+ _ when is_atom(Serial0) -> Serial0;
+ _ -> Serial0(Bool)
+ end
+ end,
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
+ || M <- decorators()],
+ Module = type_to_module(XType),
+ apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+
+policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+
+serialise_events(X = #exchange{type = Type}) ->
+ case [Serialise || M <- decorators(),
+ Serialise <- [M:serialise_events(X)],
+ Serialise == true] of
+ [] -> (type_to_module(Type)):serialise_events();
+ _ -> true
+ end.
+
+serial(#exchange{name = XName} = X) ->
+ Serial = case serialise_events(X) of
+ true -> next_serial(XName);
+ false -> none
+ end,
+ fun (true) -> Serial;
+ (false) -> none
+ end.
+
+decorators() ->
+ [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = #exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args},
+ X = rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args}),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -129,7 +173,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = XT:create(map_create_tx(Tx), Exchange),
+ ok = callback(X, create, map_create_tx(Tx), [Exchange]),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -141,13 +185,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X = #exchange{name = Name, type = Type}) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- case (type_to_module(Type)):serialise_events() of
- true -> S = #exchange_serial{name = Name, next = 1},
- ok = mnesia:write(rabbit_exchange_serial, S, write);
- false -> ok
- end.
+store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -200,23 +238,51 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
-update_scratch(Name, Fun) ->
+lookup_scratch(Name, App) ->
+ case lookup(Name) of
+ {ok, #exchange{scratches = undefined}} ->
+ {error, not_found};
+ {ok, #exchange{scratches = Scratches}} ->
+ case orddict:find(App, Scratches) of
+ {ok, Value} -> {ok, Value};
+ error -> {error, not_found}
+ end;
+ {error, not_found} ->
+ {error, not_found}
+ end.
+
+update_scratch(Name, App, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable, scratch = Scratch}] ->
- X1 = X#exchange{scratch = Fun(Scratch)},
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange,
- X1, write);
- _ -> ok
- end;
- [] ->
- ok
- end
+ update(Name,
+ fun(X = #exchange{scratches = Scratches0}) ->
+ Scratches1 = case Scratches0 of
+ undefined -> orddict:new();
+ _ -> Scratches0
+ end,
+ Scratch = case orddict:find(App, Scratches1) of
+ {ok, S} -> S;
+ error -> undefined
+ end,
+ Scratches2 = orddict:store(
+ App, Fun(Scratch), Scratches1),
+ X#exchange{scratches = Scratches2}
+ end)
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X = #exchange{durable = Durable}] ->
+ X1 = Fun(X),
+ ok = mnesia:write(rabbit_exchange, X1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -232,6 +298,7 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(policy, X) -> rabbit_policy:name(X);
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
@@ -341,23 +408,18 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-serial(#exchange{name = XName, type = Type}) ->
- case (type_to_module(Type)):serialise_events() of
- true -> next_serial(XName);
- false -> none
- end.
-
next_serial(XName) ->
- [#exchange_serial{next = Serial}] =
- mnesia:read(rabbit_exchange_serial, XName, write),
+ Serial = peek_serial(XName, write),
ok = mnesia:write(rabbit_exchange_serial,
#exchange_serial{name = XName, next = Serial + 1}, write),
Serial.
-peek_serial(XName) ->
- case mnesia:read({rabbit_exchange_serial, XName}) of
+peek_serial(XName) -> peek_serial(XName, read).
+
+peek_serial(XName, LockType) ->
+ case mnesia:read(rabbit_exchange_serial, XName, LockType) of
[#exchange_serial{next = Serial}] -> Serial;
- _ -> undefined
+ _ -> 1
end.
invalid_module(T) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
new file mode 100644
index 00000000..b40ceda9
--- /dev/null
+++ b/src/rabbit_exchange_decorator.erl
@@ -0,0 +1,71 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator).
+
+%% This is like an exchange type except that:
+%%
+%% 1) It applies to all exchanges as soon as it is installed, therefore
+%% 2) It is not allowed to affect validation, so no validate/1 or
+%% assert_args_equivalence/2
+%% 3) It also can't affect routing
+%%
+%% It's possible in the future we might relax 3), or even make these
+%% able to manipulate messages as they are published.
+
+-ifdef(use_specs).
+
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplist:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events(rabbit_types:exchange()) -> boolean().
+
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
+
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
+
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
+
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
+
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
+ {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1027570c..e6470b72 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -58,6 +58,10 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
-else.
-export([behaviour_info/1]).
@@ -65,7 +69,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}];
+ {assert_args_equivalence, 2}, {policy_changed, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index cdec1cb9..9a5665c0 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -43,6 +43,7 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index a64f2c29..d9a2f60f 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 61917d8f..516b78e5 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 82d27960..101fe434 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
@@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3160fdf4..644d9acf 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
+
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
add_binding(none, _Exchange, _Binding) ->
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f3..a95f8f26 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -102,9 +102,12 @@ read_file_info(File) ->
with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
with_fhc_handle(Fun) ->
- ok = file_handle_cache:obtain(),
+ with_fhc_handle(1, Fun).
+
+with_fhc_handle(N, Fun) ->
+ [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
try Fun()
- after ok = file_handle_cache:release()
+ after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
end.
read_term_file(File) ->
@@ -165,7 +168,7 @@ make_binary(List) ->
{error, Reason}
end.
-
+%% TODO the semantics of this function are rather odd. But see bug 25021.
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -183,9 +186,11 @@ append_file(File, 0, Suffix) ->
end
end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
+ case with_fhc_handle(2, fun () ->
+ file:copy(File, {[File, Suffix], [append]})
+ end) of
+ {ok, _BytesCopied} -> ok;
+ Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9fa6213b..2b15498e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -192,7 +192,7 @@ terminate(_, _) ->
ok.
code_change(_, State, _) ->
- State.
+ {ok, State}.
%%----------------------------------------------------------------------------
%% Internal plumbing
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..3e058793 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43..750bcd56 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc..03fafc3e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 8eacb1f3..a2034876 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start/0, start_link/0, start_child/2]).
+-export([start_link/0, start_child/2]).
-export([init/1]).
@@ -26,20 +26,9 @@
-define(SERVER, ?MODULE).
-start() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_mirror_queue_slave_sup,
- {rabbit_mirror_queue_slave_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
- ok.
+start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) ->
- supervisor2:start_child({?SERVER, Node}, Args).
+start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
{ok, {{simple_one_for_one_terminate, 10, 10},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0aacd654..d41aa09b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, amqp_error/4,
+-export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
@@ -40,25 +40,24 @@
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
--export([start_applications/1, stop_applications/1]).
+-export([format/2, format_many/1, format_stderr/2]).
+-export([with_local_io/1, local_info_msg/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
--export([get_options/2]).
+-export([parse_arguments/3]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
--export([pget/2, pget/3, pget_or_die/2]).
+-export([pget/2, pget/3, pget_or_die/2, pset/3]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
--export([quit/1]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
@@ -71,7 +70,7 @@
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
-type(resource_name() :: binary()).
--type(optdef() :: {flag, string()} | {option, string(), any()}).
+-type(optdef() :: flag | {option, string()}).
-type(channel_or_connection_exit()
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
@@ -86,6 +85,10 @@
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 ::
(rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
+
+-spec(quit/1 :: (integer()) -> no_return()).
+-spec(quit/2 :: (string(), [term()]) -> no_return()).
+
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
-> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
@@ -158,11 +161,10 @@
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
-spec(format/2 :: (string(), [any()]) -> string()).
+-spec(format_many/1 :: ([{string(), [any()]}]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
--spec(start_applications/1 :: ([atom()]) -> 'ok').
--spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
@@ -180,8 +182,12 @@
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
--spec(get_options/2 :: ([optdef()], [string()])
- -> {[string()], [{string(), any()}]}).
+-spec(parse_arguments/3 ::
+ ([{atom(), [{string(), optdef()}]} | atom()],
+ [{string(), optdef()}],
+ [string()])
+ -> {'ok', {atom(), [{string(), string()}], [string()]}} |
+ 'no_command').
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
@@ -199,11 +205,11 @@
-spec(pget/2 :: (term(), [term()]) -> term()).
-spec(pget/3 :: (term(), [term()], term()) -> term()).
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
+-spec(pset/3 :: (term(), term(), [term()]) -> term()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
--spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
@@ -384,6 +390,28 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+%%
+%% @doc Halts the emulator after printing out an error message io-formatted with
+%% the supplied arguments. The exit status of the beam process will be set to 1.
+%%
+quit(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ quit(1).
+
+%%
+%% @doc Halts the emulator returning the given status code to the os.
+%% On Windows this function will block indefinitely so as to give the io
+%% subsystem time to flush stdout completely.
+%%
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
@@ -551,6 +579,9 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+format_many(List) ->
+ lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -583,34 +614,6 @@ with_local_io(Fun) ->
local_info_msg(Format, Args) ->
with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
unfold(Fun, Init) ->
unfold(Fun, [], Init).
@@ -730,39 +733,63 @@ gb_trees_fold1(Fun, Acc, {Key, Val, It}) ->
gb_trees_foreach(Fun, Tree) ->
gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).
-%% Separate flags and options from arguments.
-%% get_options([{flag, "-q"}, {option, "-p", "/"}],
-%% ["set_permissions","-p","/","guest",
-%% "-q",".*",".*",".*"])
-%% == {["set_permissions","guest",".*",".*",".*"],
-%% [{"-q",true},{"-p","/"}]}
-get_options(Defs, As) ->
- lists:foldl(fun(Def, {AsIn, RsIn}) ->
- {K, {AsOut, V}} =
- case Def of
- {flag, Key} ->
- {Key, get_flag(Key, AsIn)};
- {option, Key, Default} ->
- {Key, get_option(Key, Default, AsIn)}
- end,
- {AsOut, [{K, V} | RsIn]}
- end, {As, []}, Defs).
-
-get_option(K, _Default, [K, V | As]) ->
- {As, V};
-get_option(K, Default, [Nk | As]) ->
- {As1, V} = get_option(K, Default, As),
- {[Nk | As1], V};
-get_option(_, Default, As) ->
- {As, Default}.
-
-get_flag(K, [K | As]) ->
- {As, true};
-get_flag(K, [Nk | As]) ->
- {As1, V} = get_flag(K, As),
- {[Nk | As1], V};
-get_flag(_, []) ->
- {[], false}.
+%% Takes:
+%% * A list of [{atom(), [{string(), optdef()]} | atom()], where the atom()s
+%% are the accepted commands and the optional [string()] is the list of
+%% accepted options for that command
+%% * A list [{string(), optdef()}] of options valid for all commands
+%% * The list of arguments given by the user
+%%
+%% Returns either {ok, {atom(), [{string(), string()}], [string()]} which are
+%% respectively the command, the key-value pairs of the options and the leftover
+%% arguments; or no_command if no command could be parsed.
+parse_arguments(Commands, GlobalDefs, As) ->
+ lists:foldl(maybe_process_opts(GlobalDefs, As), no_command, Commands).
+
+maybe_process_opts(GDefs, As) ->
+ fun({C, Os}, no_command) ->
+ process_opts(atom_to_list(C), dict:from_list(GDefs ++ Os), As);
+ (C, no_command) ->
+ (maybe_process_opts(GDefs, As))({C, []}, no_command);
+ (_, {ok, Res}) ->
+ {ok, Res}
+ end.
+
+process_opts(C, Defs, As0) ->
+ KVs0 = dict:map(fun (_, flag) -> false;
+ (_, {option, V}) -> V
+ end, Defs),
+ process_opts(Defs, C, As0, not_found, KVs0, []).
+
+%% Consume flags/options until you find the correct command. If there are no
+%% arguments or the first argument is not the command we're expecting, fail.
+%% Arguments to this are: definitions, cmd we're looking for, args we
+%% haven't parsed, whether we have found the cmd, options we've found,
+%% plain args we've found.
+process_opts(_Defs, C, [], found, KVs, Outs) ->
+ {ok, {list_to_atom(C), dict:to_list(KVs), lists:reverse(Outs)}};
+process_opts(_Defs, _C, [], not_found, _, _) ->
+ no_command;
+process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
+ OptType = case dict:find(A, Defs) of
+ error -> none;
+ {ok, flag} -> flag;
+ {ok, {option, _}} -> option
+ end,
+ case {OptType, C, Found} of
+ {flag, _, _} -> process_opts(
+ Defs, C, As, Found, dict:store(A, true, KVs),
+ Outs);
+ {option, _, _} -> case As of
+ [] -> no_command;
+ [V | As1] -> process_opts(
+ Defs, C, As1, Found,
+ dict:store(A, V, KVs), Outs)
+ end;
+ {none, A, _} -> process_opts(Defs, C, As, found, KVs, Outs);
+ {none, _, found} -> process_opts(Defs, C, As, found, KVs, [A | Outs]);
+ {none, _, _} -> no_command
+ end.
now_ms() ->
timer:now_diff(now(), {0,0,0}) div 1000.
@@ -848,6 +875,8 @@ pget_or_die(K, P) ->
V -> V
end.
+pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)].
+
format_message_queue(_Opt, MQ) ->
Len = priority_queue:len(MQ),
{Len,
@@ -901,13 +930,6 @@ receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
end.
-%% the slower shutdown on windows required to flush stdout
-quit(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status)
- end.
-
os_cmd(Command) ->
Exec = hd(string:tokens(Command, " ")),
case os:find_executable(Exec) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c714d3a7..7e9346f9 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -296,6 +296,11 @@ table_definitions() ->
[{record_name, exchange_serial},
{attributes, record_info(fields, exchange_serial)},
{match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
+ {rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, record_info(fields, runtime_parameters)},
+ {disc_copies, [node()]},
+ {match, #runtime_parameters{_='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index e6a05335..bedf5142 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- maybe_fast_close/1, sockname/1, peername/1, peercert/1,
- connection_string/2]).
+ recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
+ close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -34,6 +34,8 @@
-type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())).
-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
+-type(opts() :: [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(), binary()}]).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
@@ -49,9 +51,12 @@
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(setopts/2 :: (socket(), [{atom(), any()} |
- {raw, non_neg_integer(), non_neg_integer(),
- binary()}]) -> ok_or_any_error()).
+-spec(getopts/2 :: (socket(), [atom() | {raw,
+ non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer() | binary()}])
+ -> ok_val_or_error(opts())).
+-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
@@ -64,6 +69,7 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
@@ -126,6 +132,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
+getopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:getopts(Sock#ssl_socket.ssl, Options);
+getopts(Sock, Options) when is_port(Sock) ->
+ inet:getopts(Sock, Options).
+
setopts(Sock, Options) when ?IS_SSL(Sock) ->
ssl:setopts(Sock#ssl_socket.ssl, Options);
setopts(Sock, Options) when is_port(Sock) ->
@@ -149,6 +160,13 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+tune_buffer_size(Sock) ->
+ case getopts(Sock, [sndbuf, recbuf, buffer]) of
+ {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ setopts(Sock, [{buffer, BufSz}]);
+ Err -> Err
+ end.
+
connection_string(Sock, Direction) ->
{From, To} = case Direction of
inbound -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f0c75d23..94a5a2b7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -136,18 +136,13 @@ boot_ssl() ->
ok
end.
-start() ->
- {ok,_} = supervisor2:start_child(
- rabbit_sup,
- {rabbit_tcp_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_tcp_client_sup},
- {rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+start() -> rabbit_sup:start_supervisor_child(
+ rabbit_tcp_client_sup, rabbit_client_sup,
+ [{local, rabbit_tcp_client_sup},
+ {rabbit_connection_sup,start_link,[]}]).
ensure_ssl() ->
- ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
+ ok = app_utils:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
% unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b6a9e263..1c23632d 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -57,8 +57,7 @@ diagnostics(Nodes) ->
"hosts, their running nodes and ports:", [Nodes]}] ++
[diagnostics_host(Host) || Host <- Hosts] ++
diagnostics0(),
- lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags,
- {F, A} <- [NodeDiag]]).
+ rabbit_misc:format_many(lists:flatten(NodeDiags)).
diagnostics0() ->
[{"~ncurrent node details:~n- node name: ~w", [node()]},
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
new file mode 100644
index 00000000..af940dde
--- /dev/null
+++ b/src/rabbit_parameter_validation.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_parameter_validation).
+
+-export([number/2, binary/2, list/2, proplist/3]).
+
+number(_Name, Term) when is_number(Term) ->
+ ok;
+
+number(Name, Term) ->
+ {error, "~s should be number, actually was ~p", [Name, Term]}.
+
+binary(_Name, Term) when is_binary(Term) ->
+ ok;
+
+binary(Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}.
+
+list(_Name, Term) when is_list(Term) ->
+ ok;
+
+list(Name, Term) ->
+ {error, "~s should be list, actually was ~p", [Name, Term]}.
+
+proplist(Name, Constraints, Term) when is_list(Term) ->
+ {Results, Remainder}
+ = lists:foldl(
+ fun ({Key, Fun, Needed}, {Results0, Term0}) ->
+ case {lists:keytake(Key, 1, Term0), Needed} of
+ {{value, {Key, Value}, Term1}, _} ->
+ {[Fun(Key, Value) | Results0],
+ Term1};
+ {false, mandatory} ->
+ {[{error, "Key \"~s\" not found in ~s",
+ [Key, Name]} | Results0], Term0};
+ {false, optional} ->
+ {Results0, Term0}
+ end
+ end, {[], Term}, Constraints),
+ case Remainder of
+ [] -> Results;
+ _ -> [{error, "Unrecognised terms ~p in ~s", [Remainder, Name]}
+ | Results]
+ end;
+
+proplist(Name, _Constraints, Term) ->
+ {error, "~s not a list ~p", [Name, Term]}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 2a93c8f2..7cf6eea9 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -17,146 +17,57 @@
-module(rabbit_plugins).
-include("rabbit.hrl").
--export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1,
- lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]).
+-export([setup/0, active/0, read_enabled/1,
+ list/1, dependencies/3]).
--define(VERBOSE_OPT, "-v").
--define(MINIMAL_OPT, "-m").
--define(ENABLED_OPT, "-E").
--define(ENABLED_ALL_OPT, "-e").
+-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
+-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
+-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
+-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start/0 :: () -> no_return()).
--spec(stop/0 :: () -> 'ok').
--spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]).
--spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]).
--spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]).
--spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]).
--spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start() ->
- {ok, [[PluginsFile|_]|_]} =
- init:get_argument(enabled_plugins_file),
- {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
- {[Command0 | Args], Opts} =
- case rabbit_misc:get_options([{flag, ?VERBOSE_OPT},
- {flag, ?MINIMAL_OPT},
- {flag, ?ENABLED_OPT},
- {flag, ?ENABLED_ALL_OPT}],
- init:get_plain_arguments()) of
- {[], _Opts} -> usage();
- CmdArgsAndOpts -> CmdArgsAndOpts
- end,
- Command = list_to_atom(Command0),
- PrintInvalidCommandError =
- fun () ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")])
- end,
-
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
- ok ->
- rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {error, Reason} ->
- print_error("~p", [Reason]),
- rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
- rabbit_misc:quit(2);
- Other ->
- print_error("~p", [Other]),
- rabbit_misc:quit(2)
- end.
-
-stop() ->
- ok.
-
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+-define(GLOBAL_DEFS, []).
-usage() ->
- io:format("~s", [rabbit_plugins_usage:usage()]),
- rabbit_misc:quit(1).
+-define(COMMANDS,
+ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
+ enable,
+ disable]).
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+-ifdef(use_specs).
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
- case ToEnable0 of
- [] -> throw({error_string, "Not enough arguments for 'enable'"});
- _ -> ok
- end,
- AllPlugins = find_plugins(PluginsDir),
- Enabled = read_enabled_plugins(PluginsFile),
- ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins),
- ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
- Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
- NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
- NewImplicitlyEnabled = calculate_required_plugins(NewEnabled, AllPlugins),
- maybe_warn_mochiweb(NewImplicitlyEnabled),
- case NewEnabled -- ImplicitlyEnabled of
- [] -> io:format("Plugin configuration unchanged.~n");
- _ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
+-spec(setup/0 :: () -> [atom()]).
+-spec(active/0 :: () -> [atom()]).
+-spec(list/1 :: (string()) -> [#plugin{}]).
+-spec(read_enabled/1 :: (file:filename()) -> [atom()]).
+-spec(dependencies/3 ::
+ (boolean(), [atom()], [#plugin{}]) -> [atom()]).
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
- case ToDisable0 of
- [] -> throw({error_string, "Not enough arguments for 'disable'"});
- _ -> ok
- end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = read_enabled_plugins(PluginsFile),
- AllPlugins = find_plugins(PluginsDir),
- Missing = ToDisable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> print_list("Warning: the following plugins could not be found:",
- Missing)
- end,
- ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins),
- NewEnabled = Enabled -- ToDisableDeps,
- case length(Enabled) =:= length(NewEnabled) of
- true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- calculate_required_plugins(Enabled, AllPlugins),
- NewImplicitlyEnabled =
- calculate_required_plugins(NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
- ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+-endif.
%%----------------------------------------------------------------------------
-%% Get the #plugin{}s ready to be enabled.
-find_plugins(PluginsDir) ->
+%%
+%% @doc Prepares the file system and installs all enabled plugins.
+%%
+setup() ->
+ {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ {ok, EnabledPluginsFile} = application:get_env(rabbit,
+ enabled_plugins_file),
+ prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir),
+ [prepare_dir_plugin(PluginName) ||
+ PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+
+%% @doc Lists the plugins which are currently running.
+active() ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ [App || {App, _, _} <- application:which_applications(),
+ lists:member(App, InstalledPlugins)].
+
+%% @doc Get the list of plugins which are ready to be enabled.
+list(PluginsDir) ->
EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
FreeApps = [{app, App} ||
App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
@@ -175,6 +86,91 @@ find_plugins(PluginsDir) ->
end,
Plugins.
+%% @doc Read the list of enabled plugins from the supplied term file.
+read_enabled(PluginsFile) ->
+ case rabbit_file:read_term_file(PluginsFile) of
+ {ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+%%
+%% @doc Calculate the dependency graph from <i>Sources</i>.
+%% When Reverse =:= true the bottom/leaf level applications are returned in
+%% the resulting list, otherwise they're skipped.
+%%
+dependencies(Reverse, Sources, AllPlugins) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps}
+ || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ Dests = case Reverse of
+ false -> digraph_utils:reachable(Sources, G);
+ true -> digraph_utils:reaching(Sources, G)
+ end,
+ true = digraph:delete(G),
+ Dests.
+
+%%----------------------------------------------------------------------------
+
+prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
+ AllPlugins = list(PluginsDistDir),
+ Enabled = read_enabled(EnabledPluginsFile),
+ ToUnpack = dependencies(false, Enabled, AllPlugins),
+ ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
+
+ Missing = Enabled -- plugin_names(ToUnpackPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> io:format("Warning: the following enabled plugins were "
+ "not found: ~p~n", [Missing])
+ end,
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)",
+ [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)",
+ [DestDir, E2])
+ end,
+
+ [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
+
+prepare_dir_plugin(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN),
+
+ %% We want the second-last token
+ NameTokens = string:tokens(PluginAppDescFn,"/."),
+ PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
+ list_to_atom(PluginNameString).
+
+%%----------------------------------------------------------------------------
+
+delete_recursively(Fn) ->
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
+ end.
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
+
%% Get the #plugin{} from an .ez.
get_plugin_info(Base, {ez, EZ0}) ->
EZ = filename:join([Base, EZ0]),
@@ -234,82 +230,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
- Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
- Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
- Format = case {Verbose, Minimal} of
- {false, false} -> normal;
- {true, false} -> verbose;
- {false, true} -> minimal;
- {true, true} -> throw({error_string,
- "Cannot specify -m and -v together"})
- end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
- OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
-
- AvailablePlugins = find_plugins(PluginsDir),
- EnabledExplicitly = read_enabled_plugins(PluginsFile),
- EnabledImplicitly =
- calculate_required_plugins(EnabledExplicitly, AvailablePlugins) --
- EnabledExplicitly,
- {ok, RE} = re:compile(Pattern),
- Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
- re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- true -> true
- end,
- if OnlyEnabledAll ->
- lists:member(Name, EnabledImplicitly) or
- lists:member(Name, EnabledExplicitly);
- true ->
- true
- end],
- Plugins1 = usort_plugins(Plugins),
- MaxWidth = lists:max([length(atom_to_list(Name)) ||
- #plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
- ok.
-
-format_plugin(#plugin{name = Name, version = Version,
- description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
- end,
- case Format of
- minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
- verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
- io:format("~n")
- end.
-
-print_list(Header, Plugins) ->
- io:format(fmt_list(Header, Plugins)).
-
-fmt_list(Header, Plugins) ->
- lists:flatten(
- [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-
-usort_plugins(Plugins) ->
- lists:usort(fun plugins_cmp/2, Plugins).
-
-plugins_cmp(#plugin{name = N1, version = V1},
- #plugin{name = N2, version = V2}) ->
- {N1, V1} =< {N2, V2}.
-
%% Filter out applications that can be loaded *right now*.
filter_applications(Applications) ->
[Application || Application <- Applications,
@@ -332,72 +252,3 @@ plugin_names(Plugins) ->
%% Find plugins by name in a list of plugins.
lookup_plugins(Names, AllPlugins) ->
[P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)].
-
-%% Read the enabled plugin names from disk.
-read_enabled_plugins(PluginsFile) ->
- case rabbit_file:read_term_file(PluginsFile) of
- {ok, [Plugins]} -> Plugins;
- {ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-%% Write the enabled plugin names on disk.
-write_enabled_plugins(PluginsFile, Plugins) ->
- case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
- ok -> ok;
- {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-calculate_required_plugins(Sources, AllPlugins) ->
- calculate_dependencies(false, Sources, AllPlugins).
-
-calculate_dependencies(Reverse, Sources, AllPlugins) ->
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
- Dests = case Reverse of
- false -> digraph_utils:reachable(Sources, G);
- true -> digraph_utils:reaching(Sources, G)
- end,
- true = digraph:delete(G),
- Dests.
-
-maybe_warn_mochiweb(Enabled) ->
- V = erlang:system_info(otp_release),
- case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
- true ->
- Stars = string:copies("*", 80),
- io:format("~n~n~s~n"
- " Warning: Mochiweb enabled and Erlang version ~s "
- "detected.~n"
- " Enabling plugins that depend on Mochiweb is not "
- "supported on this Erlang~n"
- " version. At least R13B01 is required.~n~n"
- " RabbitMQ will not start successfully in this "
- "configuration. You *must*~n"
- " disable the Mochiweb plugin, or upgrade Erlang.~n"
- "~s~n~n~n", [Stars, V, Stars]);
- false ->
- ok
- end.
-
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n"),
- case os:type() of
- {win32, _OsName} ->
- io:format("If you have RabbitMQ running as a service then you must"
- " reinstall by running~n rabbitmq-service.bat stop~n"
- " rabbitmq-service.bat install~n"
- " rabbitmq-service.bat start~n~n");
- _ ->
- ok
- end.
-
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
new file mode 100644
index 00000000..572cf150
--- /dev/null
+++ b/src/rabbit_plugins_main.erl
@@ -0,0 +1,273 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_plugins_main).
+-include("rabbit.hrl").
+
+-export([start/0, stop/0]).
+
+-define(VERBOSE_OPT, "-v").
+-define(MINIMAL_OPT, "-m").
+-define(ENABLED_OPT, "-E").
+-define(ENABLED_ALL_OPT, "-e").
+
+-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
+-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
+-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
+-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+
+-define(GLOBAL_DEFS, []).
+
+-define(COMMANDS,
+ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
+ enable,
+ disable]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(usage/0 :: () -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ {ok, [[PluginsFile|_]|_]} =
+ init:get_argument(enabled_plugins_file),
+ {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
+ {Command, Opts, Args} =
+ case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
+ init:get_plain_arguments())
+ of
+ {ok, Res} -> Res;
+ no_command -> print_error("could not recognise command", []),
+ usage()
+ end,
+
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
+ case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ ok ->
+ rabbit_misc:quit(0);
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {error, Reason} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
+ Other ->
+ print_error("~p", [Other]),
+ rabbit_misc:quit(2)
+ end.
+
+stop() ->
+ ok.
+
+%%----------------------------------------------------------------------------
+
+action(list, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+
+action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToEnable0 of
+ [] -> throw({error_string, "Not enough arguments for 'enable'"});
+ _ -> ok
+ end,
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ Enabled, AllPlugins),
+ ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
+ Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string,
+ fmt_list("The following plugins could not be found:",
+ Missing)})
+ end,
+ NewEnabled = lists:usort(Enabled ++ ToEnable),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ maybe_warn_mochiweb(NewImplicitlyEnabled),
+ case NewEnabled -- ImplicitlyEnabled of
+ [] -> io:format("Plugin configuration unchanged.~n");
+ _ -> print_list("The following plugins have been enabled:",
+ NewImplicitlyEnabled -- ImplicitlyEnabled),
+ report_change()
+ end;
+
+action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToDisable0 of
+ [] -> throw({error_string, "Not enough arguments for 'disable'"});
+ _ -> ok
+ end,
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Missing = ToDisable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> print_list("Warning: the following plugins could not be found:",
+ Missing)
+ end,
+ ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
+ NewEnabled = Enabled -- ToDisableDeps,
+ case length(Enabled) =:= length(NewEnabled) of
+ true -> io:format("Plugin configuration unchanged.~n");
+ false -> ImplicitlyEnabled =
+ rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ NewImplicitlyEnabled =
+ rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ print_list("The following plugins have been disabled:",
+ ImplicitlyEnabled -- NewImplicitlyEnabled),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ report_change()
+ end.
+
+%%----------------------------------------------------------------------------
+
+print_error(Format, Args) ->
+ rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+
+usage() ->
+ io:format("~s", [rabbit_plugins_usage:usage()]),
+ rabbit_misc:quit(1).
+
+%% Pretty print a list of plugins.
+format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+ Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
+ Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
+ Format = case {Verbose, Minimal} of
+ {false, false} -> normal;
+ {true, false} -> verbose;
+ {false, true} -> minimal;
+ {true, true} -> throw({error_string,
+ "Cannot specify -m and -v together"})
+ end,
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
+
+ AvailablePlugins = rabbit_plugins:list(PluginsDir),
+ EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
+ EnabledImplicitly =
+ rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins) -- EnabledExplicitly,
+ {ok, RE} = re:compile(Pattern),
+ Plugins = [ Plugin ||
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
+ re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name,
+ EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
+ end],
+ Plugins1 = usort_plugins(Plugins),
+ MaxWidth = lists:max([length(atom_to_list(Name)) ||
+ #plugin{name = Name} <- Plugins1] ++ [0]),
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
+ MaxWidth) || P <- Plugins1],
+ ok.
+
+format_plugin(#plugin{name = Name, version = Version,
+ description = Description, dependencies = Deps},
+ EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ Glyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "[E]";
+ {false, true} -> "[e]";
+ _ -> "[ ]"
+ end,
+ case Format of
+ minimal -> io:format("~s~n", [Name]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
+ "w ~s~n", [Glyph, Name, Version]);
+ verbose -> io:format("~s ~w~n", [Glyph, Name]),
+ io:format(" Version: \t~s~n", [Version]),
+ case Deps of
+ [] -> ok;
+ _ -> io:format(" Dependencies:\t~p~n", [Deps])
+ end,
+ io:format(" Description:\t~s~n", [Description]),
+ io:format("~n")
+ end.
+
+print_list(Header, Plugins) ->
+ io:format(fmt_list(Header, Plugins)).
+
+fmt_list(Header, Plugins) ->
+ lists:flatten(
+ [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+
+usort_plugins(Plugins) ->
+ lists:usort(fun plugins_cmp/2, Plugins).
+
+plugins_cmp(#plugin{name = N1, version = V1},
+ #plugin{name = N2, version = V2}) ->
+ {N1, V1} =< {N2, V2}.
+
+%% Return the names of the given plugins.
+plugin_names(Plugins) ->
+ [Name || #plugin{name = Name} <- Plugins].
+
+%% Write the enabled plugin names on disk.
+write_enabled_plugins(PluginsFile, Plugins) ->
+ case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
+ ok -> ok;
+ {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+maybe_warn_mochiweb(Enabled) ->
+ V = erlang:system_info(otp_release),
+ case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
+ true ->
+ Stars = string:copies("*", 80),
+ io:format("~n~n~s~n"
+ " Warning: Mochiweb enabled and Erlang version ~s "
+ "detected.~n"
+ " Enabling plugins that depend on Mochiweb is not "
+ "supported on this Erlang~n"
+ " version. At least R13B01 is required.~n~n"
+ " RabbitMQ will not start successfully in this "
+ "configuration. You *must*~n"
+ " disable the Mochiweb plugin, or upgrade Erlang.~n"
+ "~s~n~n~n", [Stars, V, Stars]);
+ false ->
+ ok
+ end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n").
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
new file mode 100644
index 00000000..1551795f
--- /dev/null
+++ b/src/rabbit_policy.erl
@@ -0,0 +1,156 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy).
+
+%% TODO specs
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+-export([register/0]).
+-export([name/1, get/2, set/1]).
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "policy parameters"},
+ {mfa, {rabbit_policy, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
+
+name(#amqqueue{policy = Policy}) -> name0(Policy);
+name(#exchange{policy = Policy}) -> name0(Policy).
+
+name0(undefined) -> none;
+name0(Policy) -> pget(<<"name">>, Policy).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+
+set0(Name) -> match(Name, list()).
+
+get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
+get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
+%% Caution - SLOW.
+get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())).
+
+get0(_Name, undefined) -> {error, not_found};
+get0(Name, List) -> case pget(<<"policy">>, List) of
+ undefined -> {error, not_found};
+ Policy -> case pget(Name, Policy) of
+ undefined -> {error, not_found};
+ Value -> {ok, Value}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+
+validate(<<"policy">>, Name, Term) ->
+ rabbit_parameter_validation:proplist(
+ Name, policy_validation(), Term).
+
+validate_clear(<<"policy">>, _Name) ->
+ ok.
+
+notify(<<"policy">>, _Name, _Term) ->
+ update_policies().
+
+notify_clear(<<"policy">>, _Name) ->
+ update_policies().
+
+%%----------------------------------------------------------------------------
+
+list() ->
+ [[{<<"name">>, pget(key, P)} | pget(value, P)]
+ || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+
+update_policies() ->
+ Policies = list(),
+ {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ {[update_exchange(X, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end),
+ [notify(X) || X <- Xs],
+ [notify(Q) || Q <- Qs],
+ ok.
+
+update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(XName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_exchange:update(
+ XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
+ {X, X#exchange{policy = NewPolicy}}
+ end.
+
+update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(QName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_amqqueue:update(
+ QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
+ {Q, Q#amqqueue{policy = NewPolicy}}
+ end.
+
+notify(no_change)->
+ ok;
+notify({X1 = #exchange{}, X2 = #exchange{}}) ->
+ rabbit_exchange:policy_changed(X1, X2);
+notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
+ rabbit_amqqueue:policy_changed(Q1, Q2).
+
+match(Name, Policies) ->
+ case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
+ [] -> undefined;
+ [Policy | _Rest] -> Policy
+ end.
+
+matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
+ Prefix = pget(<<"prefix">>, Policy),
+ case pget(<<"vhost">>, Policy) of
+ undefined -> prefix(Prefix, Name);
+ VHost -> prefix(Prefix, Name);
+ _ -> false
+ end.
+
+prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+
+sort_pred(A, B) ->
+ R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
+ case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
+ {undefined, undefined} -> R;
+ {undefined, _} -> true;
+ {_, undefined} -> false;
+ _ -> R
+ end.
+
+%%----------------------------------------------------------------------------
+
+policy_validation() ->
+ [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 162d44f1..d56211b5 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -31,212 +31,21 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
-%% Shut dialyzer up
--spec(terminate/1 :: (string()) -> no_return()).
--spec(terminate/2 :: (string(), [any()]) -> no_return()).
-endif.
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ...~n"),
-
- %% Determine our various directories
- [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] =
- init:get_plain_arguments(),
- RootName = UnpackedPluginDir ++ "/rabbit",
-
- prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir),
-
- %% Build a list of required apps based on the fixed set, and any plugins
- PluginApps = find_plugins(UnpackedPluginDir),
- RequiredApps = ?BaseApps ++ PluginApps,
-
- %% Build the entire set of dependencies - this will load the
- %% applications along the way
- AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {failed_to_load_app, App, Err} ->
- terminate("failed to load application ~s:~n~p",
- [App, Err]);
- AppList ->
- AppList
- end,
- AppVersions = [determine_version(App) || App <- AllApps],
- RabbitVersion = proplists:get_value(rabbit, AppVersions),
-
- %% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
- {erts, erlang:system_info(version)},
- AppVersions},
-
- %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
- rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
-
- %% We exclude mochiweb due to its optional use of fdsrv.
- XRefExclude = [mochiweb],
-
- %% Compile the script
- ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent,
- {exref, AllApps -- XRefExclude}]) of
- {ok, Module, Warnings} ->
- %% This gets lots of spurious no-source warnings when we
- %% have .ez files, so we want to supress them to prevent
- %% hiding real issues. On Ubuntu, we also get warnings
- %% about kernel/stdlib sources being out of date, which we
- %% also ignore for the same reason.
- WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
- when WApp == mnesia;
- WApp == stdlib;
- WApp == kernel;
- WApp == sasl;
- WApp == crypto;
- WApp == os_mon -> false;
- _ -> true
- end]),
- case length(WarningStr) of
- 0 -> ok;
- _ -> S = string:copies("*", 80),
- io:format("~n~s~n~s~s~n~n", [S, WarningStr, S])
- end,
- ok;
- {error, Module, Error} ->
- terminate("generation of boot script file ~s failed:~n~s",
- [ScriptFile, Module:format_error(Error)])
- end,
-
- case post_process_script(ScriptFile) of
- ok -> ok;
- {error, Reason} ->
- terminate("post processing of boot script file ~s failed:~n~w",
- [ScriptFile, Reason])
- end,
- case systools:script2boot(RootName) of
- ok -> ok;
- error -> terminate("failed to compile boot script file ~s",
- [ScriptFile])
- end,
- io:format("~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
- || App <- PluginApps],
- io:nl(),
-
+ [NodeStr] = init:get_plain_arguments(),
ok = duplicate_node_check(NodeStr),
-
- terminate(0),
+ rabbit_misc:quit(0),
ok.
stop() ->
ok.
-determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
- {App, Vsn}.
-
-delete_recursively(Fn) ->
- case rabbit_file:recursive_delete([Fn]) of
- ok -> ok;
- {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
- Error -> Error
- end.
-
-prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
- AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir),
- Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile),
- ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins),
- ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins),
-
- Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins),
- case Missing of
- [] -> ok;
- _ -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(DestDir) of
- ok -> ok;
- {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
- end,
- case filelib:ensure_dir(DestDir ++ "/") of
- ok -> ok;
- {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
- end,
-
- [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
-
-prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
- zip:unzip(Location, [{cwd, PluginDestDir}]);
-prepare_plugin(#plugin{type = dir, name = Name, location = Location},
- PluginsDestDir) ->
- rabbit_file:recursive_copy(Location,
- filename:join([PluginsDestDir, Name])).
-
-find_plugins(PluginDir) ->
- [prepare_dir_plugin(PluginName) ||
- PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
-
-prepare_dir_plugin(PluginAppDescFn) ->
- %% Add the plugin ebin directory to the load path
- PluginEBinDirN = filename:dirname(PluginAppDescFn),
- code:add_path(PluginEBinDirN),
-
- %% We want the second-last token
- NameTokens = string:tokens(PluginAppDescFn,"/."),
- PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
- list_to_atom(PluginNameString).
-
-expand_dependencies(Pending) ->
- expand_dependencies(sets:new(), Pending).
-expand_dependencies(Current, []) ->
- Current;
-expand_dependencies(Current, [Next|Rest]) ->
- case sets:is_element(Next, Current) of
- true ->
- expand_dependencies(Current, Rest);
- false ->
- case application:load(Next) of
- ok ->
- ok;
- {error, {already_loaded, _}} ->
- ok;
- {error, Reason} ->
- throw({failed_to_load_app, Next, Reason})
- end,
- {ok, Required} = application:get_key(Next, applications),
- Unique = [A || A <- Required, not(sets:is_element(A, Current))],
- expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
- end.
-
-post_process_script(ScriptFile) ->
- case file:consult(ScriptFile) of
- {ok, [{script, Name, Entries}]} ->
- NewEntries = lists:flatmap(fun process_entry/1, Entries),
- case file:open(ScriptFile, [write]) of
- {ok, Fd} ->
- io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
- [date(), time(), {script, Name, NewEntries}]),
- file:close(Fd),
- ok;
- {error, OReason} ->
- {error, {failed_to_open_script_file_for_writing, OReason}}
- end;
- {error, Reason} ->
- {error, {failed_to_load_script, Reason}}
- end.
-
-process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit,maybe_hipe_compile,[]}},
- {apply,{rabbit,prepare,[]}}, Entry];
-process_entry(Entry) ->
- [Entry].
+%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
duplicate_node_check([]) ->
@@ -252,11 +61,11 @@ duplicate_node_check(NodeStr) ->
"already running on ~p~n",
[NodeName, NodeHost]),
io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
- terminate(?ERROR_CODE);
+ rabbit_misc:quit(?ERROR_CODE);
false -> ok
end;
{error, EpmdReason} ->
- terminate("epmd error for host ~p: ~p (~s)~n",
+ rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
case EpmdReason of
address -> "unable to establish tcp connection";
@@ -264,16 +73,3 @@ duplicate_node_check(NodeStr) ->
_ -> inet:format_error(EpmdReason)
end])
end.
-
-terminate(Fmt, Args) ->
- io:format("ERROR: " ++ Fmt ++ "~n", Args),
- terminate(?ERROR_CODE).
-
-terminate(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status),
- receive
- after infinity -> ok
- end
- end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5e9e78d3..bd5cf588 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_resources/2, server_properties/1]).
+-export([conserve_resources/3, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,7 +133,7 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, Conserve) ->
+conserve_resources(Pid, _Source, Conserve) ->
Pid ! {conserve_resources, Conserve},
ok.
@@ -222,6 +222,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never},
try
+ ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
@@ -312,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
-handle_other(timeout, _Deb, #v1{connection_state = S}) ->
- throw({timeout, S});
+handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+ throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -682,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun = fun() -> Parent ! timeout end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -735,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 8c0ebcbe..e14bbba0 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -23,7 +23,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]).
+-export([register/3, unregister/2,
+ binary_to_type/1, lookup_module/2, lookup_all/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -32,6 +33,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/3 :: (atom(), binary(), atom()) -> 'ok').
+-spec(unregister/2 :: (atom(), binary()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
-spec(lookup_module/2 ::
@@ -50,6 +52,9 @@ start_link() ->
register(Class, TypeName, ModuleName) ->
gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
+unregister(Class, TypeName) ->
+ gen_server:call(?SERVER, {unregister, Class, TypeName}, infinity).
+
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
%% can throw a badarg, indicating that the type cannot have been
@@ -83,6 +88,10 @@ internal_register(Class, TypeName, ModuleName)
{{Class, internal_binary_to_type(TypeName)}, ModuleName}),
ok.
+internal_unregister(Class, TypeName) ->
+ true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ ok.
+
sanity_check_module(ClassModule, Module) ->
case catch lists:member(ClassModule,
lists:flatten(
@@ -95,8 +104,10 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator.
%%---------------------------------------------------------------------------
@@ -108,6 +119,10 @@ handle_call({register, Class, TypeName, ModuleName}, _From, State) ->
ok = internal_register(Class, TypeName, ModuleName),
{reply, ok, State};
+handle_call({unregister, Class, TypeName}, _From, State) ->
+ ok = internal_unregister(Class, TypeName),
+ {reply, ok, State};
+
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
new file mode 100644
index 00000000..c7d30116
--- /dev/null
+++ b/src/rabbit_runtime_parameter.erl
@@ -0,0 +1,43 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameter).
+
+-ifdef(use_specs).
+
+-type(validate_results() ::
+ 'ok' | {error, string(), [term()]} | [validate_results()]).
+
+-callback validate(binary(), binary(), term()) -> validate_results().
+-callback validate_clear(binary(), binary()) -> validate_results().
+-callback notify(binary(), binary(), term()) -> 'ok'.
+-callback notify_clear(binary(), binary()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {validate, 3},
+ {validate_clear, 2},
+ {notify, 3},
+ {notify_clear, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
new file mode 100644
index 00000000..3a54e8f6
--- /dev/null
+++ b/src/rabbit_runtime_parameters.erl
@@ -0,0 +1,243 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameters).
+
+-include("rabbit.hrl").
+
+-export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1,
+ list_formatted/0, lookup/2, value/2, value/3, info_keys/0]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
+
+-spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()).
+-spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
+-spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
+-spec(list/0 :: () -> [rabbit_types:infos()]).
+-spec(list/1 :: (binary()) -> [rabbit_types:infos()]).
+-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
+-spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
+-spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
+-spec(value/2 :: (binary(), binary()) -> term()).
+-spec(value/3 :: (binary(), binary(), term()) -> term()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+-import(rabbit_misc, [pget/2, pset/3]).
+
+-define(TABLE, rabbit_runtime_parameters).
+
+%%---------------------------------------------------------------------------
+
+parse_set(Component, Key, String) ->
+ case parse(String) of
+ {ok, Term} -> set(Component, Key, Term);
+ {errors, L} -> format_error(L)
+ end.
+
+set(Component, Key, Term) ->
+ case set0(Component, Key, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+format_error(L) ->
+ {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
+
+set0(Component, Key, Term) ->
+ case lookup_component(Component) of
+ {ok, Mod} ->
+ case flatten_errors(validate(Term)) of
+ ok ->
+ case flatten_errors(Mod:validate(Component, Key, Term)) of
+ ok ->
+ case mnesia_update(Component, Key, Term) of
+ {old, Term} -> ok;
+ _ -> Mod:notify(Component, Key, Term)
+ end,
+ ok;
+ E ->
+ E
+ end;
+ E ->
+ E
+ end;
+ E ->
+ E
+ end.
+
+mnesia_update(Component, Key, Term) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Res = case mnesia:read(?TABLE, {Component, Key}, read) of
+ [] -> new;
+ [Params] -> {old, Params#runtime_parameters.value}
+ end,
+ ok = mnesia:write(?TABLE, c(Component, Key, Term), write),
+ Res
+ end).
+
+clear(Component, Key) ->
+ case clear0(Component, Key) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+clear0(Component, Key) ->
+ case lookup_component(Component) of
+ {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of
+ ok -> mnesia_clear(Component, Key),
+ Mod:notify_clear(Component, Key),
+ ok;
+ E -> E
+ end;
+ E -> E
+ end.
+
+mnesia_clear(Component, Key) ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ ok = mnesia:delete(?TABLE, {Component, Key}, write)
+ end).
+
+list() ->
+ [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
+
+list(Component) -> list(Component, []).
+list_strict(Component) -> list(Component, not_found).
+
+list(Component, Default) ->
+ case lookup_component(Component) of
+ {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
+ [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ _ -> Default
+ end.
+
+list_formatted() ->
+ [pset(value, format(pget(value, P)), P) || P <- list()].
+
+lookup(Component, Key) ->
+ case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+ not_found -> not_found;
+ Params -> p(Params)
+ end.
+
+value(Component, Key) ->
+ case lookup0(Component, Key, rabbit_misc:const(not_found)) of
+ not_found -> not_found;
+ Params -> Params#runtime_parameters.value
+ end.
+
+value(Component, Key, Default) ->
+ Params = lookup0(Component, Key,
+ fun () -> lookup_missing(Component, Key, Default) end),
+ Params#runtime_parameters.value.
+
+lookup0(Component, Key, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {Component, Key}) of
+ [] -> DefaultFun();
+ [R] -> R
+ end.
+
+lookup_missing(Component, Key, Default) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read(?TABLE, {Component, Key}, read) of
+ [] -> Record = c(Component, Key, Default),
+ mnesia:write(?TABLE, Record, write),
+ Record;
+ [R] -> R
+ end
+ end).
+
+c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key},
+ value = Default}.
+
+p(#runtime_parameters{key = {Component, Key}, value = Value}) ->
+ [{component, Component},
+ {key, Key},
+ {value, Value}].
+
+info_keys() -> [component, key, value].
+
+%%---------------------------------------------------------------------------
+
+lookup_component(Component) ->
+ case rabbit_registry:lookup_module(
+ runtime_parameter, list_to_atom(binary_to_list(Component))) of
+ {error, not_found} -> {errors,
+ [{"component ~s not found", [Component]}]};
+ {ok, Module} -> {ok, Module}
+ end.
+
+parse(Src0) ->
+ Src1 = string:strip(Src0),
+ Src = case lists:reverse(Src1) of
+ [$. |_] -> Src1;
+ _ -> Src1 ++ "."
+ end,
+ case erl_scan:string(Src) of
+ {ok, Scanned, _} ->
+ case erl_parse:parse_term(Scanned) of
+ {ok, Parsed} ->
+ {ok, Parsed};
+ {error, E} ->
+ {errors,
+ [{"Could not parse value: ~s", [format_parse_error(E)]}]}
+ end;
+ {error, E, _} ->
+ {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]}
+ end.
+
+format_parse_error({_Line, Mod, Err}) ->
+ lists:flatten(Mod:format_error(Err)).
+
+format(Term) ->
+ list_to_binary(rabbit_misc:format("~p", [Term])).
+
+%%---------------------------------------------------------------------------
+
+%% We will want to be able to biject these to JSON. So we have some
+%% generic restrictions on what we consider acceptable.
+validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist);
+validate(L) when is_list(L) -> validate_list(L);
+validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]};
+validate(B) when is_boolean(B) -> ok;
+validate(null) -> ok;
+validate(A) when is_atom(A) -> {error, "atom: ~p", [A]};
+validate(N) when is_number(N) -> ok;
+validate(B) when is_binary(B) -> ok;
+validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}.
+
+validate_list(L) -> [validate(I) || I <- L].
+validate_proplist(L) -> [vp(I) || I <- L].
+
+vp({K, V}) when is_binary(K) -> validate(V);
+vp({K, _V}) -> {error, "bad key: ~p", [K]};
+vp(H) -> {error, "not two tuple: ~p", [H]}.
+
+flatten_errors(L) ->
+ case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
+ [] -> ok;
+ E -> {errors, E}
+ end.
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
new file mode 100644
index 00000000..f23b3227
--- /dev/null
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -0,0 +1,38 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_runtime_parameters_test).
+-behaviour(rabbit_runtime_parameter).
+
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+-export([register/0, unregister/0]).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE).
+
+unregister() ->
+ rabbit_registry:unregister(runtime_parameter, <<"test">>).
+
+validate(<<"test">>, <<"good">>, _Term) -> ok;
+validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok;
+validate(<<"test">>, _, _) -> {error, "meh", []}.
+
+validate_clear(<<"test">>, <<"good">>) -> ok;
+validate_clear(<<"test">>, <<"maybe">>) -> ok;
+validate_clear(<<"test">>, _) -> {error, "meh", []}.
+
+notify(_, _, _) -> ok.
+notify_clear(_, _) -> ok.
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index bf2b4798..f142d233 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -19,6 +19,8 @@
-behaviour(supervisor).
-export([start_link/0, start_child/1, start_child/2, start_child/3,
+ start_supervisor_child/1, start_supervisor_child/2,
+ start_supervisor_child/3,
start_restartable_child/1, start_restartable_child/2, stop_child/1]).
-export([init/1]).
@@ -33,7 +35,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_child/1 :: (atom()) -> 'ok').
+-spec(start_child/2 :: (atom(), [any()]) -> 'ok').
-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/1 :: (atom()) -> 'ok').
+-spec(start_supervisor_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(start_restartable_child/1 :: (atom()) -> 'ok').
-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
@@ -42,22 +48,29 @@
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
-start_child(Mod) ->
- start_child(Mod, []).
+start_child(Mod) -> start_child(Mod, []).
-start_child(Mod, Args) ->
- start_child(Mod, Mod, Args).
+start_child(Mod, Args) -> start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- child_reply(supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]})).
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
+
+start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
+
+start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
+
+start_supervisor_child(ChildId, Mod, Args) ->
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, infinity, supervisor, [Mod]})).
-start_restartable_child(Mod) ->
- start_restartable_child(Mod, []).
+start_restartable_child(Mod) -> start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
@@ -73,8 +86,7 @@ stop_child(ChildId) ->
E -> E
end.
-init([]) ->
- {ok, {{one_for_all, 0, 1}, []}}.
+init([]) -> {ok, {{one_for_all, 0, 1}, []}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 04ee6ef2..bb60bd12 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,6 +29,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+-define(TIMEOUT, 5000).
all_tests() ->
passed = gm_tests:all_tests(),
@@ -50,9 +51,10 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_statistics(),
- passed = test_option_parser(),
+ passed = test_arguments_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
+ passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
passed = maybe_run_cluster_dependent_tests(),
@@ -616,8 +618,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, Args) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end),
- rabbit_exchange:callback(X, Fun, [none, X] ++ Args).
+ fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
+ rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
test_topic_expect_match(X, List) ->
lists:foreach(
@@ -800,27 +802,57 @@ test_log_management_during_startup() ->
ok = control_action(start_app, []),
passed.
-test_option_parser() ->
- %% command and arguments should just pass through
- ok = check_get_options({["mock_command", "arg1", "arg2"], []},
- [], ["mock_command", "arg1", "arg2"]),
+test_arguments_parser() ->
+ GlobalOpts1 = [{"-f1", flag}, {"-o1", {option, "foo"}}],
+ Commands1 = [command1, {command2, [{"-f2", flag}, {"-o2", {option, "bar"}}]}],
- %% get flags
- ok = check_get_options(
- {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
- [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
-
- %% get options
- ok = check_get_options(
- {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
- [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
- ["mock_command", "-foo", "bar"]),
+ GetOptions =
+ fun (Args) ->
+ rabbit_misc:parse_arguments(Commands1, GlobalOpts1, Args)
+ end,
- %% shuffled and interleaved arguments and options
- ok = check_get_options(
- {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
- [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
- ["-f", "a1", "-o1", "hello", "a2", "a3"]),
+ check_parse_arguments(no_command, GetOptions, []),
+ check_parse_arguments(no_command, GetOptions, ["foo", "bar"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "foo"}], []}},
+ GetOptions, ["command1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
+ GetOptions, ["command1", "-o1", "blah"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", true}, {"-o1", "foo"}], []}},
+ GetOptions, ["command1", "-f1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
+ GetOptions, ["-o1", "blah", "command1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], ["quux"]}},
+ GetOptions, ["-o1", "blah", "command1", "quux"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", true}, {"-o1", "blah"}], ["quux", "baz"]}},
+ GetOptions, ["command1", "quux", "-f1", "-o1", "blah", "baz"]),
+ %% For duplicate flags, the last one counts
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "second"}], []}},
+ GetOptions, ["-o1", "first", "command1", "-o1", "second"]),
+ %% If the flag "eats" the command, the command won't be recognised
+ check_parse_arguments(no_command, GetOptions,
+ ["-o1", "command1", "quux"]),
+ %% If a flag eats another flag, the eaten flag won't be recognised
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "-f1"}], []}},
+ GetOptions, ["command1", "-o1", "-f1"]),
+
+ %% Now for some command-specific flags...
+ check_parse_arguments(
+ {ok, {command2, [{"-f1", false}, {"-f2", false},
+ {"-o1", "foo"}, {"-o2", "bar"}], []}},
+ GetOptions, ["command2"]),
+
+ check_parse_arguments(
+ {ok, {command2, [{"-f1", false}, {"-f2", true},
+ {"-o1", "baz"}, {"-o2", "bar"}], ["quux", "foo"]}},
+ GetOptions, ["-f2", "command2", "quux", "-o1", "baz", "foo"]),
passed.
@@ -1097,6 +1129,38 @@ test_user_management() ->
passed.
+test_runtime_parameters() ->
+ rabbit_runtime_parameters_test:register(),
+ Good = fun(L) -> ok = control_action(set_parameter, L) end,
+ Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
+
+ %% Acceptable for bijection
+ Good(["test", "good", "<<\"ignore\">>"]),
+ Good(["test", "good", "123"]),
+ Good(["test", "good", "true"]),
+ Good(["test", "good", "false"]),
+ Good(["test", "good", "null"]),
+ Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]),
+
+ %% Various forms of fail due to non-bijectability
+ Bad(["test", "good", "atom"]),
+ Bad(["test", "good", "{tuple, foo}"]),
+ Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]),
+ Bad(["test", "good", "[{key, <<\"value\">>}]"]),
+
+ %% Test actual validation hook
+ Good(["test", "maybe", "<<\"good\">>"]),
+ Bad(["test", "maybe", "<<\"bad\">>"]),
+
+ ok = control_action(list_parameters, []),
+
+ ok = control_action(clear_parameter, ["test", "good"]),
+ ok = control_action(clear_parameter, ["test", "maybe"]),
+ {error_string, _} =
+ control_action(clear_parameter, ["test", "neverexisted"]),
+ rabbit_runtime_parameters_test:unregister(),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
@@ -1177,7 +1241,7 @@ test_spawn() ->
rabbit_limiter:make_token(self())),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
end,
{Writer, Ch}.
@@ -1198,7 +1262,7 @@ test_spawn_remote() ->
end
end),
receive Res -> Res
- after 1000 -> throw(failed_to_receive_result)
+ after ?TIMEOUT -> throw(failed_to_receive_result)
end.
user(Username) ->
@@ -1218,13 +1282,10 @@ test_confirms() ->
queue = Q0,
exchange = <<"amq.direct">>,
routing_key = "magic" }),
- receive #'queue.bind_ok'{} ->
- Q0
- after 1000 ->
- throw(failed_to_bind_queue)
+ receive #'queue.bind_ok'{} -> Q0
+ after ?TIMEOUT -> throw(failed_to_bind_queue)
end
- after 1000 ->
- throw(failed_to_declare_queue)
+ after ?TIMEOUT -> throw(failed_to_declare_queue)
end
end,
%% Declare and bind two queues
@@ -1237,7 +1298,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
#'confirm.select_ok'{} -> ok
- after 1000 -> throw(failed_to_enable_confirms)
+ after ?TIMEOUT -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1254,7 +1315,7 @@ test_confirms() ->
receive
#'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
- after 2000 -> throw(did_not_receive_nack)
+ after ?TIMEOUT-> throw(did_not_receive_nack)
end,
receive
#'basic.ack'{} -> throw(received_ack_when_none_expected)
@@ -1264,7 +1325,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
receive
#'queue.delete_ok'{} -> ok
- after 1000 -> throw(failed_to_cleanup_queue)
+ after ?TIMEOUT -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1287,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) ->
true -> Props;
_ -> test_statistics_receive_event1(Ch, Matcher)
end
- after 1000 -> throw(failed_to_receive_event)
+ after ?TIMEOUT -> throw(failed_to_receive_event)
end.
test_statistics() ->
@@ -1299,9 +1360,8 @@ test_statistics() ->
%% Set up a channel and queue
{_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
- QName = receive #'queue.declare_ok'{queue = Q0} ->
- Q0
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
QPid = Q#amqqueue.pid,
@@ -1381,7 +1441,7 @@ expect_event(Pid, Type) ->
Pid -> ok;
_ -> expect_event(Pid, Type)
end
- after 1000 -> throw({failed_to_receive_event, Type})
+ after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
test_delegates_async(SecondaryNode) ->
@@ -1405,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
+ after ?TIMEOUT -> throw(Throw)
end
end.
@@ -1418,9 +1478,7 @@ await_response(Count) ->
receive
response -> ok,
await_response(Count - 1)
- after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ after ?TIMEOUT -> throw(timeout)
end.
must_exit(Fun) ->
@@ -1487,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
rabbit_channel:shutdown(Ch),
rabbit:stop(),
@@ -1498,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) ->
receive
#'channel.close'{reply_code = ?NOT_FOUND} ->
ok
- after 2000 ->
- throw(failed_to_receive_channel_exit)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
end,
rabbit_channel:shutdown(Ch2),
passed.
@@ -1526,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
- after 2000 ->
- throw(failed_to_create_and_kill_queue)
+ after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
%%---------------------------------------------------------------------
@@ -1540,7 +1596,7 @@ control_action(Command, Args, NewOpts) ->
expand_options(default_options(), NewOpts)).
control_action(Command, Node, Args, Opts) ->
- case catch rabbit_control:action(
+ case catch rabbit_control_main:action(
Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
@@ -1572,10 +1628,13 @@ expand_options(As, Bs) ->
end
end, Bs, As).
-check_get_options({ExpArgs, ExpOpts}, Defs, Args) ->
- {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args),
- true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order
- ok.
+check_parse_arguments(ExpRes, Fun, As) ->
+ SortRes =
+ fun (no_command) -> no_command;
+ ({ok, {C, KVs, As1}}) -> {ok, {C, lists:sort(KVs), As1}}
+ end,
+
+ true = SortRes(ExpRes) =:= SortRes(Fun(As)).
empty_files(Files) ->
[case file:read_file_info(File) of
@@ -1755,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
Pid);
stop ->
done
- after (case Awaiting of [] -> 200; _ -> 1000 end) ->
+ after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
case Awaiting of
[] -> Pid ! {self(), arrived}, on_disk_capture();
_ -> Pid ! {self(), timeout}
@@ -2308,7 +2367,7 @@ wait_for_confirms(Unconfirmed) ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after 5000 -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 9f2535bd..18704807 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -36,6 +36,10 @@
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
-rabbit_upgrade({topic_trie_node, mnesia, []}).
+-rabbit_upgrade({runtime_parameters, mnesia, []}).
+-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
+-rabbit_upgrade({policy, mnesia,
+ [exchange_scratches, ha_mirrors]}).
%% -------------------------------------------------------------------
@@ -56,6 +60,8 @@
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
+-spec(runtime_parameters/0 :: () -> 'ok').
+-spec(policy/0 :: () -> 'ok').
-endif.
@@ -185,6 +191,55 @@ topic_trie_node() ->
{attributes, [trie_node, edge_count, binding_count]},
{type, ordered_set}]).
+runtime_parameters() ->
+ create(rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, [key, value]},
+ {disc_copies, [node()]}]).
+
+exchange_scratches() ->
+ ok = exchange_scratches(rabbit_exchange),
+ ok = exchange_scratches(rabbit_durable_exchange).
+
+exchange_scratches(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type = <<"x-federation">>, Dur, AutoDel, Int, Args,
+ Scratch}) ->
+ Scratches = orddict:store(federation, Scratch, orddict:new()),
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches};
+ %% We assert here that nothing else uses the scratch mechanism ATM
+ ({exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches]).
+
+policy() ->
+ ok = exchange_policy(rabbit_exchange),
+ ok = exchange_policy(rabbit_durable_exchange),
+ ok = queue_policy(rabbit_queue),
+ ok = queue_policy(rabbit_durable_queue).
+
+exchange_policy(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches,
+ policy]).
+
+queue_policy(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) ->
+ {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes,
+ undefined}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid,
+ slave_pids, mirror_nodes, policy]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 209e5252..49213c95 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,6 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -335,6 +334,13 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type(ack() :: seq_id()).
-type(state() :: #vqstate {
q1 :: ?QUEUE:?QUEUE(),
q2 :: ?QUEUE:?QUEUE(),
@@ -368,6 +374,8 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
+%% Duplicated from rabbit_backing_queue
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').
@@ -1458,16 +1466,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> {_, State2} =
- lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end,
- {S1, State},
- case (AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress) of
- true -> [AckFun, AlphaBetaFun];
- false -> [AlphaBetaFun, AckFun]
- end),
+ S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress)) of
+ true -> [AckFun, AlphaBetaFun];
+ false -> [AlphaBetaFun, AckFun]
+ end,
+ {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, Funs),
{true, State2}
end,
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index f1b74878..3d3623d7 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
#child{mfa = {M, F, A}} = hd(State#state.children),
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
+ {ok, undefined} ->
+ {reply, {ok, undefined}, State};
{ok, Pid} ->
NState = State#state{dynamics =
?DICT:store(Pid, Args, State#state.dynamics)},
@@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart)
#child{mfa = {M, F, A}} = Child,
Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics),
case do_start_child_i(M, F, A) of
+ {ok, undefined} ->
+ {ok, State};
{ok, Pid} ->
NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)},
{ok, NState};