diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-01 23:50:58 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-07-01 23:50:58 +0100 |
commit | c79513aa24b7058eb8a4d4c228dc34aa085aaed4 (patch) | |
tree | d3b64592b4a5418adb1d63fb6d355577a2c6f9c7 | |
parent | 63a6a40f5beff915b3e8d0512f04bfbfded1ea2c (diff) | |
parent | 1791a50cfb0a9db025b74c701744e0dd45c91ef7 (diff) | |
download | rabbitmq-server-c79513aa24b7058eb8a4d4c228dc34aa085aaed4.tar.gz |
merge bug24884 into default (no-op)
65 files changed, 2562 insertions, 1224 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ded3ab48..68bc87be 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -720,7 +720,7 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> @@ -745,6 +745,99 @@ </refsect2> <refsect2> + <title>Parameter Management</title> + <para> + Certain features of RabbitMQ (such as the federation plugin) + are controlled by dynamic, + cluster-wide <emphasis>parameters</emphasis>. Each parameter + consists of a component name, a key and a value. The + component name and key are strings, and the value is an + Erlang term. Parameters can be set, cleared and listed. In + general you should refer to the documentation for the feature + in question to see how to set parameters. + </para> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>set_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets a parameter. + </para> + <variablelist> + <varlistentry> + <term>component_name</term> + <listitem><para> + The name of the component for which the + parameter is being set. + </para></listitem> + </varlistentry> + <varlistentry> + <term>key</term> + <listitem><para> + The key for which the parameter is being set. + </para></listitem> + </varlistentry> + <varlistentry> + <term>value</term> + <listitem><para> + The value for the parameter, as an + Erlang term. In most shells you are very likely to + need to quote this. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_parameter federation local_username '<<"guest">>'</screen> + <para role="example"> + This command sets the parameter <command>local_username</command> for the <command>federation</command> component to the Erlang term <command><<"guest">></command>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>clear_parameter</command> <arg choice="req"><replaceable>component_name</replaceable></arg> <arg choice="req"><replaceable>key</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears a parameter. + </para> + <variablelist> + <varlistentry> + <term>component_name</term> + <listitem><para> + The name of the component for which the + parameter is being cleared. + </para></listitem> + </varlistentry> + <varlistentry> + <term>key</term> + <listitem><para> + The key for which the parameter is being cleared. + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_parameter federation local_username</screen> + <para role="example"> + This command clears the parameter <command>local_username</command> for the <command>federation</command> component. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>list_parameters</command></cmdsynopsis></term> + <listitem> + <para> + Lists all parameters. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl list_parameters</screen> + <para role="example"> + This command lists all parameters. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> <title>Server Status</title> <para> The server status queries interrogate the server and return a list of @@ -794,6 +887,10 @@ <listitem><para>Queue arguments.</para></listitem> </varlistentry> <varlistentry> + <term>policy</term> + <listitem><para>Policy name applying to the queue.</para></listitem> + </varlistentry> + <varlistentry> <term>pid</term> <listitem><para>Id of the Erlang process associated with the queue.</para></listitem> </varlistentry> @@ -885,7 +982,8 @@ </varlistentry> <varlistentry> <term>type</term> - <listitem><para>The exchange type (one of [<command>direct</command>, + <listitem><para>The exchange type (such as + [<command>direct</command>, <command>topic</command>, <command>headers</command>, <command>fanout</command>]).</para></listitem> </varlistentry> @@ -905,6 +1003,10 @@ <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> + <varlistentry> + <term>policy</term> + <listitem><para>Policy name for applying to the exchange.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>exchangeinfoitem</command>s are specified then diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index b7d14f20..523b54ce 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,9 +19,11 @@ {ssl_listeners, []}, {ssl_options, []}, {vm_memory_high_watermark, 0.4}, - {disk_free_limit, {mem_relative, 1.0}}, + {disk_free_limit, 1000000000}, %% 1GB {msg_store_index_module, rabbit_msg_store_ets_index}, {backing_queue_module, rabbit_variable_queue}, + %% 0 ("no limit") would make a better default, but that + %% breaks the QPid Java client {frame_max, 131072}, {msg_store_file_size_limit, 16777216}, {queue_index_max_journal_entries, 262144}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index faf3059a..e8b4a623 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,11 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments, - scratch}). + scratches, policy}). -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, mirror_nodes}). + arguments, pid, slave_pids, mirror_nodes, policy}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). @@ -66,6 +66,8 @@ -record(listener, {node, protocol, host, ip_address, port}). +-record(runtime_parameters, {key, value}). + -record(basic_message, {exchange_name, routing_keys = [], content, id, is_persistent}). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 234fc2c7..03e513f8 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -32,8 +32,8 @@ prepare: SPECS/rabbitmq-server.spec cp ${COMMON_DIR}/* SOURCES/ + cp rabbitmq-server.init SOURCES/rabbitmq-server.init sed -i \ - -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \ SOURCES/rabbitmq-server.init ifeq "$(RPM_OS)" "fedora" @@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora" SOURCES/rabbitmq-server.init endif sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + -e 's|@STDOUT_STDERR_REDIRECTION@||' \ SOURCES/rabbitmq-script-wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate diff --git a/packaging/common/rabbitmq-server.init b/packaging/RPMS/Fedora/rabbitmq-server.init index 40238c8e..2d2680e3 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/RPMS/Fedora/rabbitmq-server.init @@ -27,7 +27,7 @@ INIT_LOG_DIR=/var/log/rabbitmq PID_FILE=/var/run/rabbitmq/pid START_PROG= # Set when building package -LOCK_FILE= # Set when building package +LOCK_FILE=/var/lock/subsys/$NAME test -x $DAEMON || exit 0 test -x $CONTROL || exit 0 diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 0e59c218..e832aed6 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -29,7 +29,9 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` -if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then +if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then + /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@ +elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then /usr/lib/rabbitmq/bin/${SCRIPT} "$@" elif [ `id -u` = 0 ] ; then @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 2a738f6e..1e4bf755 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -22,15 +22,8 @@ package: clean tar -zxf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ -# Debian and descendants differ from most other distros in that -# runlevel 2 should start network services. - sed -i \ - -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ - -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \ - -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \ - -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \ - $(UNPACKED_DIR)/debian/rabbitmq-server.init sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index e935acf5..943ed48f 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> -Uploader: Emile Joubert <emile@rabbitmq.com> +Uploaders: Emile Joubert <emile@rabbitmq.com> DM-Upload-Allowed: yes Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip Standards-Version: 3.8.0 diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init new file mode 100644 index 00000000..b2d3f86a --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.init @@ -0,0 +1,187 @@ +#!/bin/sh +# +# rabbitmq-server RabbitMQ broker +# +# chkconfig: - 80 05 +# description: Enable AMQP service provided by RabbitMQ +# + +### BEGIN INIT INFO +# Provides: rabbitmq-server +# Required-Start: $remote_fs $network +# Required-Stop: $remote_fs $network +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Description: RabbitMQ broker +# Short-Description: Enable AMQP service provided by RabbitMQ broker +### END INIT INFO + +PATH=/sbin:/usr/sbin:/bin:/usr/bin +NAME=rabbitmq-server +DAEMON=/usr/sbin/${NAME} +CONTROL=/usr/sbin/rabbitmqctl +DESC="message broker" +USER=rabbitmq +ROTATE_SUFFIX= +INIT_LOG_DIR=/var/log/rabbitmq +PID_FILE=/var/run/rabbitmq/pid + + +test -x $DAEMON || exit 0 +test -x $CONTROL || exit 0 + +RETVAL=0 +set -e + +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + +. /lib/lsb/init-functions +. /lib/init/vars.sh + +ensure_pid_dir () { + PID_DIR=`dirname ${PID_FILE}` + if [ ! -d ${PID_DIR} ] ; then + mkdir -p ${PID_DIR} + chown -R ${USER}:${USER} ${PID_DIR} + chmod 755 ${PID_DIR} + fi +} + +remove_pid () { + rm -f ${PID_FILE} + rmdir `dirname ${PID_FILE}` || : +} + +start_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL != 0 ] ; then + RETVAL=0 + ensure_pid_dir + set +e + RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \ + --chuid rabbitmq --start --exec $DAEMON \ + --pidfile "$RABBITMQ_PID_FILE" --background + $CONTROL wait $PID_FILE >/dev/null 2>&1 + RETVAL=$? + set -e + if [ $RETVAL != 0 ] ; then + remove_pid + fi + else + RETVAL=3 + fi +} + +stop_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + set +e + $CONTROL stop ${PID_FILE} > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err + RETVAL=$? + set -e + if [ $RETVAL = 0 ] ; then + remove_pid + fi + else + RETVAL=3 + fi +} + +status_rabbitmq() { + set +e + if [ "$1" != "quiet" ] ; then + $CONTROL status 2>&1 + else + $CONTROL status > /dev/null 2>&1 + fi + if [ $? != 0 ] ; then + RETVAL=3 + fi + set -e +} + +rotate_logs_rabbitmq() { + set +e + $CONTROL -q rotate_logs ${ROTATE_SUFFIX} + if [ $? != 0 ] ; then + RETVAL=1 + fi + set -e +} + +restart_running_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + restart_rabbitmq + else + log_warning_msg "${DESC} not running" + fi +} + +restart_rabbitmq() { + stop_rabbitmq + start_rabbitmq +} + +restart_end() { + if [ $RETVAL = 0 ] ; then + log_end_msg 0 + else + log_end_msg 1 + fi +} + +start_stop_end() { + case "$RETVAL" in + 0) + [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}" + log_end_msg 0 + ;; + 3) + log_warning_msg "${DESC} already ${1}" + log_end_msg 0 + RETVAL=0 + ;; + *) + log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}" + log_end_msg 1 + ;; + esac +} + +case "$1" in + start) + log_daemon_msg "Starting ${DESC}" $NAME + start_rabbitmq + start_stop_end "running" + ;; + stop) + log_daemon_msg "Stopping ${DESC}" $NAME + stop_rabbitmq + start_stop_end "stopped" + ;; + status) + status_rabbitmq + ;; + rotate-logs) + log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}" + rotate_logs_rabbitmq + log_action_end_msg $RETVAL + ;; + force-reload|reload|restart) + log_daemon_msg "Restarting ${DESC}" $NAME + restart_rabbitmq + restart_end + ;; + try-restart) + log_daemon_msg "Restarting ${DESC}" $NAME + restart_running_rabbitmq + restart_end + ;; + *) + echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 + RETVAL=1 + ;; +esac + +exit $RETVAL diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 14a18d57..97c74791 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -31,7 +31,7 @@ exec erl \ -noinput \ -hidden \ -sname rabbitmq-plugins$$ \ - -s rabbit_plugins \ + -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 66a900a1..c67a0263 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -43,9 +43,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 0a5a4640..34915b3d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -58,7 +58,8 @@ DEFAULT_NODE_PORT=5672 ##--- End of overridden <var_name> variables RABBITMQ_START_RABBIT= -[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput" +[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot " case "$(uname -s)" in CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait @@ -70,24 +71,16 @@ case "$(uname -s)" in esac RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" -if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then - if erl \ - -pa "$RABBITMQ_EBIN_ROOT" \ - -noinput \ - -hidden \ - -s rabbit_prelaunch \ - -sname rabbitmqprelaunch$$ \ - -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" +if ! erl -pa "$RABBITMQ_EBIN_ROOT" \ + -noinput \ + -hidden \ + -s rabbit_prelaunch \ + -sname rabbitmqprelaunch$$ \ + -extra "${RABBITMQ_NODENAME}"; then - RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" - RABBITMQ_EBIN_PATH="" - else - exit 1 - fi -else - RABBITMQ_BOOT_FILE=start_sasl - RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}" + exit 1; fi + RABBITMQ_CONFIG_ARG= [ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" @@ -100,10 +93,10 @@ RABBITMQ_LISTEN_ARG= set -f exec erl \ - ${RABBITMQ_EBIN_PATH} \ + -pa ${RABBITMQ_EBIN_ROOT} \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ - -boot ${RABBITMQ_BOOT_FILE} \ + -boot start_sasl \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ @@ -112,6 +105,9 @@ exec erl \ -sasl sasl_error_logger false \ -rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \ -rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \ + -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \ + -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ -os_mon start_cpu_sup false \ -os_mon start_disksup false \ -os_mon start_memsup false \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index ca49a5d8..167f272e 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -86,25 +86,24 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- "!RABBITMQ_NODENAME!"
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+ -pa "!RABBITMQ_EBIN_ROOT!" ^
+ -noinput -hidden ^
+ -s rabbit_prelaunch ^
+ -sname rabbitmqprelaunch!RANDOM! ^
+ -extra "!RABBITMQ_NODENAME!"
+
if ERRORLEVEL 1 (
exit /B 1
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_PATH="-pa !RABBITMQ_EBIN_ROOT!"
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -124,9 +123,10 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( )
"!ERLANG_HOME!\bin\erl.exe" ^
-!RABBITMQ_EBIN_PATH! ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
+W w ^
@@ -139,6 +139,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 9e274840..4758c861 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -154,24 +154,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" ( set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- ""
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if ERRORLEVEL 1 (
- exit /B 1
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -191,8 +178,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" ( )
set ERLANG_SERVICE_ARGUMENTS= ^
-!RABBITMQ_EBIN_PATH! ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
+W w ^
+A30 ^
@@ -204,6 +192,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^ -sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 4aad6b8f..a5fade72 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -32,6 +32,6 @@ exec erl \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ - -s rabbit_control \ + -s rabbit_control_main \ -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index f37fae48..9f549f1e 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -34,7 +34,7 @@ if "!RABBITMQ_NODENAME!"=="" ( if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl new file mode 100644 index 00000000..4bef83a5 --- /dev/null +++ b/src/app_utils.erl @@ -0,0 +1,121 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% +-module(app_utils). + +-export([load_applications/1, start_applications/1, + stop_applications/1, app_dependency_order/2, + wait_for_applications/1]). + +-ifdef(use_specs). + +-spec load_applications([atom()]) -> 'ok'. +-spec start_applications([atom()]) -> 'ok'. +-spec stop_applications([atom()]) -> 'ok'. +-spec wait_for_applications([atom()]) -> 'ok'. +-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. + +-endif. + +%%--------------------------------------------------------------------------- +%% Public API + +load_applications(Apps) -> + load_applications(queue:from_list(Apps), sets:new()), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + +wait_for_applications(Apps) -> + [wait_for_application(App) || App <- Apps], ok. + +app_dependency_order(RootApps, StripUnreachable) -> + {ok, G} = rabbit_misc:build_acyclic_graph( + fun (App, _Deps) -> [{App, App}] end, + fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, + [{App, app_dependencies(App)} || + {App, _Desc, _Vsn} <- application:loaded_applications()]), + try + case StripUnreachable of + true -> digraph:del_vertices(G, digraph:vertices(G) -- + digraph_utils:reachable(RootApps, G)); + false -> ok + end, + digraph_utils:topsort(G) + after + true = digraph:delete(G) + end. + +%%--------------------------------------------------------------------------- +%% Private API + +wait_for_application(Application) -> + case lists:keymember(Application, 1, application:which_applications()) of + true -> ok; + false -> timer:sleep(1000), + wait_for_application(Application) + end. + +load_applications(Worklist, Loaded) -> + case queue:out(Worklist) of + {empty, _WorkList} -> + ok; + {{value, App}, Worklist1} -> + case sets:is_element(App, Loaded) of + true -> load_applications(Worklist1, Loaded); + false -> case application:load(App) of + ok -> ok; + {error, {already_loaded, App}} -> ok; + Error -> throw(Error) + end, + load_applications( + queue:join(Worklist1, + queue:from_list(app_dependencies(App))), + sets:add_element(App, Loaded)) + end + end. + +app_dependencies(App) -> + case application:get_key(App, applications) of + undefined -> []; + {ok, Lst} -> Lst + end. + +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + @@ -433,51 +433,47 @@ -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). -spec(group_members/1 :: (pid()) -> [pid()]). -%% The joined, members_changed and handle_msg callbacks can all -%% return any of the following terms: +%% The joined, members_changed and handle_msg callbacks can all return +%% any of the following terms: %% %% 'ok' - the callback function returns normally %% -%% {'stop', Reason} - the callback indicates the member should -%% stop with reason Reason and should leave the group. +%% {'stop', Reason} - the callback indicates the member should stop +%% with reason Reason and should leave the group. %% -%% {'become', Module, Args} - the callback indicates that the -%% callback module should be changed to Module and that the -%% callback functions should now be passed the arguments -%% Args. This allows the callback module to be dynamically -%% changed. +%% {'become', Module, Args} - the callback indicates that the callback +%% module should be changed to Module and that the callback functions +%% should now be passed the arguments Args. This allows the callback +%% module to be dynamically changed. -%% Called when we've successfully joined the group. Supplied with -%% Args provided in start_link, plus current group members. +%% Called when we've successfully joined the group. Supplied with Args +%% provided in start_link, plus current group members. -callback joined(Args :: term(), Members :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. -%% Supplied with Args provided in start_link, the list of new -%% members and the list of members previously known to us that -%% have since died. Note that if a member joins and dies very -%% quickly, it's possible that we will never see that member -%% appear in either births or deaths. However we are guaranteed -%% that (1) we will see a member joining either in the births -%% here, or in the members passed to joined/2 before receiving -%% any messages from it; and (2) we will not see members die that -%% we have not seen born (or supplied in the members to -%% joined/2). +%% Supplied with Args provided in start_link, the list of new members +%% and the list of members previously known to us that have since +%% died. Note that if a member joins and dies very quickly, it's +%% possible that we will never see that member appear in either births +%% or deaths. However we are guaranteed that (1) we will see a member +%% joining either in the births here, or in the members passed to +%% joined/2 before receiving any messages from it; and (2) we will not +%% see members die that we have not seen born (or supplied in the +%% members to joined/2). -callback members_changed(Args :: term(), Births :: [pid()], Deaths :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the -%% message. This does get called for messages injected by this -%% member, however, in such cases, there is no special -%% significance of this invocation: it does not indicate that the -%% message has made it to any other members, let alone all other -%% members. +%% message. This does get called for messages injected by this member, +%% however, in such cases, there is no special significance of this +%% invocation: it does not indicate that the message has made it to +%% any other members, let alone all other members. -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. -%% Called on gm member termination as per rules in gen_server, -%% with the Args provided in start_link plus the termination -%% Reason. +%% Called on gm member termination as per rules in gen_server, with +%% the Args provided in start_link plus the termination Reason. -callback terminate(Args :: term(), Reason :: term()) -> ok | term(). @@ -533,7 +529,7 @@ init([GroupName, Module, Args]) -> group_name = GroupName, module = Module, view = undefined, - pub_count = 0, + pub_count = -1, members_state = undefined, callback_args = Args, confirms = queue:new(), @@ -562,7 +558,7 @@ handle_call(group_members, _From, reply(not_joined, State); handle_call(group_members, _From, State = #state { view = View }) -> - reply(alive_view_members(View), State); + reply(get_pids(alive_view_members(View)), State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> @@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From, members_state = MembersState, module = Module, callback_args = Args }) -> - Group = record_new_member_in_group( - GroupName, Self, NewMember, - fun (Group1) -> - View1 = group_to_view(Group1), - ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state( - MembersState)}) - end), + {MembersState1, Group} = + record_new_member_in_group( + GroupName, Self, NewMember, + fun (Group1) -> + View1 = group_to_view(Group1), + MembersState1 = remove_erased_members(MembersState, View1), + ok = send_right(NewMember, View1, + {catchup, Self, + prepare_members_state(MembersState1)}), + MembersState1 + end), View2 = group_to_view(Group), - State1 = check_neighbours(State #state { view = View2 }), + State1 = check_neighbours(State #state { view = View2, + members_state = MembersState1 }), Result = callback_view_changed(Args, Module, View, View2), handle_callback_result({Result, {ok, Group}, State1}). handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, + members_state = MembersState, group_name = GroupName, module = Module, callback_args = Args }) -> {Result, State1} = case needs_view_update(ReqVer, View) of - true -> - View1 = group_to_view(read_group(GroupName)), - {callback_view_changed(Args, Module, View, View1), - check_neighbours(State #state { view = View1 })}; - false -> - {ok, State} + true -> View1 = group_to_view(read_group(GroupName)), + MemberState1 = remove_erased_members(MembersState, View1), + {callback_view_changed(Args, Module, View, View1), + check_neighbours( + State #state { view = View1, + members_state = MemberState1 })}; + false -> {ok, State} end, handle_callback_result( if_callback_success( @@ -645,7 +647,7 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); -handle_info({'DOWN', MRef, process, _Pid, _Reason}, +handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, right = Right, @@ -659,28 +661,29 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, {_, {Member1, MRef}} -> Member1; _ -> undefined end, - case Member of - undefined -> + case {Member, Reason} of + {undefined, _} -> + noreply(State); + {_, {shutdown, ring_shutdown}} -> noreply(State); _ -> View1 = group_to_view(record_dead_member_in_group(Member, GroupName)), - State1 = State #state { view = View1 }, {Result, State2} = case alive_view_members(View1) of [Self] -> - maybe_erase_aliases( - State1 #state { + {Result1, State1} = maybe_erase_aliases(State, View1), + {Result1, State1 #state { members_state = blank_member_state(), - confirms = purge_confirms(Confirms) }); + confirms = purge_confirms(Confirms) }}; _ -> %% here we won't be pointing out any deaths: %% the concern is that there maybe births %% which we'd otherwise miss. {callback_view_changed(Args, Module, View, View1), - State1} + check_neighbours(State #state { view = View1 })} end, - handle_callback_result({Result, check_neighbours(State2)}) + handle_callback_result({Result, State2}) end. @@ -693,9 +696,13 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; -prioritise_info(_ , _State) -> 0. +prioritise_info(flush, _State) -> + 1; +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, + #state { members_state = MS }) when MS /= undefined -> + 1; +prioritise_info(_, _State) -> + 0. handle_msg(check_neighbours, State) -> @@ -795,8 +802,8 @@ handle_msg({activity, Left, Activity}, State1 = State #state { members_state = MembersState1, confirms = Confirms1 }, Activity3 = activity_finalise(Activity1), - {Result, State2} = maybe_erase_aliases(State1), - ok = maybe_send_activity(Activity3, State2), + ok = maybe_send_activity(Activity3, State1), + {Result, State2} = maybe_erase_aliases(State1, View), if_callback_success( Result, fun activity_true/3, fun activity_false/3, Activity3, State2); @@ -829,13 +836,14 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> + PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), - Buffer1 = [{PubCount, Msg} | Buffer], + Buffer1 = [{PubCount1, Msg} | Buffer], Confirms1 = case From of none -> Confirms; - _ -> queue:in({PubCount, From}, Confirms) + _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount + 1, + State1 = State #state { pub_count = PubCount1, confirms = Confirms1, broadcast_buffer = Buffer1 }, case From =/= none of @@ -850,14 +858,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; flush_broadcast_buffer(State = #state { self = Self, members_state = MembersState, - broadcast_buffer = Buffer }) -> + broadcast_buffer = Buffer, + pub_count = PubCount }) -> + [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount Pubs = lists:reverse(Buffer), Activity = activity_cons(Self, Pubs, [], activity_nil()), ok = maybe_send_activity(activity_finalise(Activity), State), MembersState1 = with_member( fun (Member = #member { pending_ack = PA }) -> PA1 = queue:join(PA, queue:from_list(Pubs)), - Member #member { pending_ack = PA1 } + Member #member { pending_ack = PA1, + last_pub = PubCount } end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [] }. @@ -867,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -890,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -1052,7 +1057,7 @@ record_dead_member_in_group(Member, GroupName) -> Group. record_new_member_in_group(GroupName, Left, NewMember, Fun) -> - {atomic, Group} = + {atomic, {Result, Group}} = mnesia:sync_transaction( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = @@ -1062,11 +1067,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> Members1 = Prefix ++ [Left, NewMember | Suffix], Group2 = Group1 #gm_group { members = Members1, version = Ver + 1 }, - ok = Fun(Group2), + Result = Fun(Group2), mnesia:write(Group2), - Group2 + {Result, Group2} end), - Group. + {Result, Group}. erase_members_in_group(Members, GroupName) -> DeadMembers = [{dead, Id} || Id <- Members], @@ -1089,10 +1094,10 @@ erase_members_in_group(Members, GroupName) -> maybe_erase_aliases(State = #state { self = Self, group_name = GroupName, - view = View, + view = View0, members_state = MembersState, module = Module, - callback_args = Args }) -> + callback_args = Args }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( @@ -1107,11 +1112,11 @@ maybe_erase_aliases(State = #state { self = Self, end, {[], MembersState}, Aliases), State1 = State #state { members_state = MembersState1 }, case Erasable of - [] -> {ok, State1}; + [] -> {ok, State1 #state { view = View }}; _ -> View1 = group_to_view( erase_members_in_group(Erasable, GroupName)), - {callback_view_changed(Args, Module, View, View1), - State1 #state { view = View1 }} + {callback_view_changed(Args, Module, View0, View1), + check_neighbours(State1 #state { view = View1 })} end. can_erase_view_member(Self, Self, _LA, _LP) -> false; @@ -1141,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1233,23 +1236,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1257,6 +1256,12 @@ make_member(GroupName) -> {error, not_found} -> ?VERSION_START end, self()}. +remove_erased_members(MembersState, View) -> + lists:foldl(fun (Id, MembersState1) -> + store_member(Id, find_member_or_blank(Id, MembersState), + MembersState1) + end, blank_member_state(), all_known_members(View)). + get_pid({_Version, Pid}) -> Pid. get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. @@ -1265,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1287,16 +1288,30 @@ send_right(Right, View, Msg) -> ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> - lists:foldl( - fun ({Id, Pubs, _Acks}, ok) -> - lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, get_pid(Id), Pub); - (_, Error) -> - Error - end, ok, Pubs); - (_, Error) -> - Error - end, ok, Activity). + Result = + lists:foldl( + fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) -> + lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) -> + case Module2:handle_msg( + Args2, get_pid(Id), Pub) of + ok -> + Acc; + {become, Module3, Args3} -> + {Args3, Module3, ok}; + {stop, _Reason} = Error -> + Error + end; + (_, Error = {stop, _Reason}) -> + Error + end, {Args1, Module1, ok}, Pubs); + (_, Error = {stop, _Reason}) -> + Error + end, {Args, Module, ok}, Activity), + case Result of + {Args, Module, ok} -> ok; + {Args1, Module1, ok} -> {become, Module1, Args1}; + {stop, _Reason} = Error -> Error + end. callback_view_changed(Args, Module, OldView, NewView) -> OldMembers = all_known_members(OldView), @@ -1364,34 +1379,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 221f6a87..4fc488b8 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2). count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). -call(Sup, Msg) -> - ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). +call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity). +cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg). find_call(Sup, Id, Msg) -> Group = call(Sup, group), @@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) -> %% immediately after the tx - we can't be 100% here. So we may as %% well dirty_select. case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of - [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity); + [Mirror] -> call(Mirror, Msg); [] -> {error, not_found} end. @@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) -> lists:foldl(AggFun, [], [apply(?SUPERVISOR, FunAtom, [D]) || M <- ?PG2:get_members(Group), - D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]). + D <- [delegate(M)]]). child(Sup, Id) -> [Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup), Id1 =:= Id], Pid. +delegate(Sup) -> child(Sup, delegate). +mirroring(Sup) -> child(Sup, mirroring). + %%---------------------------------------------------------------------------- start_internal(Group, ChildSpecs) -> @@ -261,24 +264,19 @@ start_internal(Group, ChildSpecs) -> %%---------------------------------------------------------------------------- -init({overall, Group, Init}) -> - case Init of - {ok, {Restart, ChildSpecs}} -> - Delegate = {delegate, {?SUPERVISOR, start_link, - [?MODULE, {delegate, Restart}]}, - temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - Mirroring = {mirroring, {?MODULE, start_internal, - [Group, ChildSpecs]}, - permanent, 16#ffffffff, worker, [?MODULE]}, - %% Important: Delegate MUST start before Mirroring so that - %% when we shut down from above it shuts down last, so - %% Mirroring does not see it die. - %% - %% See comment in handle_info('DOWN', ...) below - {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; - ignore -> - ignore - end; +init({overall, _Group, ignore}) -> ignore; +init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> + %% Important: Delegate MUST start before Mirroring so that when we + %% shut down from above it shuts down last, so Mirroring does not + %% see it die. + %% + %% See comment in handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, + [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, + {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + permanent, 16#ffffffff, worker, [?MODULE]}]}}; + init({delegate, Restart}) -> {ok, {Restart, []}}; @@ -293,28 +291,29 @@ handle_call({init, Overall}, _From, initial_childspecs = ChildSpecs}) -> process_flag(trap_exit, true), ?PG2:create(Group), - ok = ?PG2:join(Group, self()), - Rest = ?PG2:get_members(Group) -- [self()], + ok = ?PG2:join(Group, Overall), + Rest = ?PG2:get_members(Group) -- [Overall], case Rest of [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); _ -> ok end, [begin - ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}), + ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}), erlang:monitor(process, Pid) end || Pid <- Rest], - Delegate = child(Overall, delegate), + Delegate = delegate(Overall), erlang:monitor(process, Delegate), State1 = State#state{overall = Overall, delegate = Delegate}, - case all_started([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of - true -> {reply, ok, State1}; - false -> {stop, shutdown, State1} + case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of + [] -> {reply, ok, State1}; + Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, - State = #state{delegate = Delegate, + State = #state{overall = Overall, + delegate = Delegate, group = Group}) -> - {reply, case maybe_start(Group, Delegate, ChildSpec) of + {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of already_in_mnesia -> {error, already_present}; {already_in_mnesia, Pid} -> {error, {already_started, Pid}}; Else -> Else @@ -327,9 +326,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; -handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) -> - {reply, Delegate, State}; - handle_call(group, _From, State = #state{group = Group}) -> {reply, Group, State}; @@ -348,7 +344,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, Reason}, - State = #state{delegate = Pid, group = Group}) -> + State = #state{overall = Pid, group = Group}) -> %% Since the delegate is temporary, its death won't cause us to %% die. Since the overall supervisor kills processes in reverse %% order when shutting down "from above" and we started after the @@ -362,19 +358,20 @@ handle_info({'DOWN', _Ref, process, Pid, Reason}, {stop, Reason, State}; handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{delegate = Delegate, group = Group}) -> + State = #state{delegate = Delegate, group = Group, + overall = O}) -> %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. - Self = self(), R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, ChildSpecs} = - mnesia:transaction(fun() -> update_all(Pid) end), - [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; - _ -> [] + [O | _] -> {atomic, ChildSpecs} = + mnesia:transaction( + fun() -> update_all(O, Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] end, - case all_started(R) of - true -> {noreply, State}; - false -> {stop, shutdown, State} + case errors(R) of + [] -> {noreply, State}; + Errors -> {stop, {shutdown, Errors}, State} end; handle_info(Info, State) -> @@ -389,13 +386,11 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- tell_all_peers_to_die(Group, Reason) -> - [?GEN_SERVER:cast(P, {die, Reason}) || - P <- ?PG2:get_members(Group) -- [self()]]. + [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]]. -maybe_start(Group, Delegate, ChildSpec) -> - case mnesia:transaction(fun() -> - check_start(Group, Delegate, ChildSpec) - end) of +maybe_start(Group, Overall, Delegate, ChildSpec) -> + case mnesia:transaction( + fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of {atomic, start} -> start(Delegate, ChildSpec); {atomic, undefined} -> already_in_mnesia; {atomic, Pid} -> {already_in_mnesia, Pid}; @@ -403,31 +398,29 @@ maybe_start(Group, Delegate, ChildSpec) -> {aborted, E} -> {error, E} end. -check_start(Group, Delegate, ChildSpec) -> +check_start(Group, Overall, Delegate, ChildSpec) -> case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of - [] -> write(Group, ChildSpec), + [] -> write(Group, Overall, ChildSpec), start; [S] -> #mirrored_sup_childspec{key = {Group, Id}, mirroring_pid = Pid} = S, - case self() of + case Overall of Pid -> child(Delegate, Id); _ -> case supervisor(Pid) of - dead -> write(Group, ChildSpec), + dead -> write(Group, Overall, ChildSpec), start; Delegate0 -> child(Delegate0, Id) end end end. -supervisor(Pid) -> - with_exit_handler( - fun() -> dead end, - fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end). +supervisor(Pid) -> with_exit_handler(fun() -> dead end, + fun() -> delegate(Pid) end). -write(Group, ChildSpec) -> +write(Group, Overall, ChildSpec) -> ok = mnesia:write( #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, - mirroring_pid = self(), + mirroring_pid = Overall, childspec = ChildSpec}), ChildSpec. @@ -453,12 +446,12 @@ check_stop(Group, Delegate, Id) -> id({Id, _, _, _, _, _}) -> Id. -update_all(OldPid) -> - MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid, +update_all(Overall, OldOverall) -> + MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall, key = '$1', childspec = '$2', _ = '_'}, - [write(Group, C) || + [write(Group, Overall, C) || [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]. delete_all(Group) -> @@ -468,12 +461,11 @@ delete_all(Group) -> [delete(Group, id(C)) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. -all_started(Results) -> [] =:= [R || R = {error, _} <- Results]. +errors(Results) -> [E || {error, E} <- Results]. %%---------------------------------------------------------------------------- -create_tables() -> - create_tables([?TABLE_DEF]). +create_tables() -> create_tables([?TABLE_DEF]). create_tables([]) -> ok; diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index e8baabe8..f8cbd853 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -51,7 +51,7 @@ test_migrate() -> with_sups(fun([A, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b]). @@ -61,10 +61,10 @@ test_migrate_twice() -> with_sups(fun([A, B]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), {ok, C} = start_sup(c), Pid2 = pid_of(worker), - kill(B, Pid2), + kill_registered(B, Pid2), Pid3 = pid_of(worker), false = (Pid1 =:= Pid3), kill(C) @@ -124,7 +124,7 @@ test_large_group() -> with_sups(fun([A, _, _, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b, c, d]). @@ -134,7 +134,7 @@ test_childspecs_at_init() -> S = childspec(worker), with_sups(fun([A, _]) -> Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [{a, [S]}, {b, [S]}]). @@ -143,7 +143,7 @@ test_anonymous_supervisors() -> with_sups(fun([A, _B]) -> ?MS:start_child(A, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [anon, anon]). @@ -157,7 +157,7 @@ test_no_migration_on_shutdown() -> with_sups(fun([Evil, _]) -> ?MS:start_child(Evil, childspec(worker)), try - call(worker, ping), + call(worker, ping, 1000, 100), exit(worker_should_not_have_migrated) catch exit:{timeout_waiting_for_server, _, _} -> ok @@ -268,7 +268,7 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 1000, 100). +call(Id, Msg) -> call(Id, Msg, 10*1000, 100). call(Id, Msg, 0, _Decr) -> exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); @@ -285,10 +285,16 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]); kill(Pid, Waits) -> erlang:monitor(process, Pid), [erlang:monitor(process, P) || P <- Waits], - exit(Pid, kill), + exit(Pid, bang), kill_wait(Pid), [kill_wait(P) || P <- Waits]. +kill_registered(Pid, Child) -> + {registered_name, Name} = erlang:process_info(Child, registered_name), + kill(Pid, Child), + false = (Child =:= whereis(Name)), + ok. + kill_wait(Pid) -> receive {'DOWN', _Ref, process, Pid, _Reason} -> diff --git a/src/rabbit.erl b/src/rabbit.erl index b1f786a0..fda489fe 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -18,9 +18,9 @@ -behaviour(application). --export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0, - status/0, is_running/0, is_running/1, environment/0, - rotate_logs/1, force_event_refresh/0]). +-export([start/0, boot/0, stop/0, + stop_and_halt/0, await_startup/0, status/0, is_running/0, + is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -60,7 +60,8 @@ -rabbit_boot_step({worker_pool, [{description, "worker pool"}, - {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {mfa, {rabbit_sup, start_supervisor_child, + [worker_pool_sup]}}, {requires, pre_boot}, {enables, external_infrastructure}]}). @@ -143,7 +144,8 @@ -rabbit_boot_step({mirror_queue_slave_sup, [{description, "mirror queue slave sup"}, - {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {mfa, {rabbit_sup, start_supervisor_child, + [rabbit_mirror_queue_slave_sup]}}, {requires, recovery}, {enables, routing_ready}]}). @@ -197,7 +199,8 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, + ssl_connection, ssl_record, gen_fsm, ssl]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -214,11 +217,11 @@ -type(log_location() :: 'tty' | 'undefined' | file:filename()). -type(param() :: atom()). --spec(maybe_hipe_compile/0 :: () -> 'ok'). --spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). +-spec(boot/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> no_return()). +-spec(await_startup/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -261,7 +264,7 @@ maybe_hipe_compile() -> hipe_compile() -> Count = length(?HIPE_WORTHY), - io:format("HiPE compiling: |~s|~n |", + io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), T1 = erlang:now(), PidMRefs = [spawn_monitor(fun () -> [begin @@ -283,29 +286,51 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). split0([], Ls) -> Ls; split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]). -prepare() -> - ok = ensure_working_log_handlers(), - ok = rabbit_upgrade:maybe_upgrade_mnesia(). +ensure_application_loaded() -> + %% We end up looking at the rabbit app's env for HiPE and log + %% handling, so it needs to be loaded. But during the tests, it + %% may end up getting loaded twice, so guard against that. + case application:load(rabbit) of + ok -> ok; + {error, {already_loaded, rabbit}} -> ok + end. start() -> + start_it(fun() -> + %% We do not want to HiPE compile or upgrade + %% mnesia after just restarting the app + ok = ensure_application_loaded(), + ok = ensure_working_log_handlers(), + ok = app_utils:start_applications(app_startup_order()), + ok = print_plugin_info(rabbit_plugins:active()) + end). + +boot() -> + start_it(fun() -> + ok = ensure_application_loaded(), + maybe_hipe_compile(), + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(), + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + ok = app_utils:load_applications(ToBeLoaded), + StartupApps = app_utils:app_dependency_order(ToBeLoaded, + false), + ok = app_utils:start_applications(StartupApps), + ok = print_plugin_info(Plugins) + end). + +start_it(StartFun) -> try - %% prepare/1 ends up looking at the rabbit app's env, so it - %% needs to be loaded, but during the tests, it may end up - %% getting loaded twice, so guard against that - case application:load(rabbit) of - ok -> ok; - {error, {already_loaded, rabbit}} -> ok - end, - ok = prepare(), - ok = rabbit_misc:start_applications(application_load_order()) + StartFun() after - %%give the error loggers some time to catch up + %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> rabbit_log:info("Stopping Rabbit~n"), - ok = rabbit_misc:stop_applications(application_load_order()). + ok = app_utils:stop_applications(app_shutdown_order()). stop_and_halt() -> try @@ -316,6 +341,9 @@ stop_and_halt() -> end, ok. +await_startup() -> + app_utils:wait_for_applications(app_startup_order()). + status() -> S1 = [{pid, list_to_integer(os:getpid())}, {running_applications, application:which_applications(infinity)}, @@ -392,46 +420,13 @@ stop(_State) -> %%--------------------------------------------------------------------------- %% application life cycle -application_load_order() -> - ok = load_applications(), - {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end, - [{App, app_dependencies(App)} || - {App, _Desc, _Vsn} <- application:loaded_applications()]), - true = digraph:del_vertices( - G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)), - Result = digraph_utils:topsort(G), - true = digraph:delete(G), - Result. - -load_applications() -> - load_applications(queue:from_list(?APPS), sets:new()). - -load_applications(Worklist, Loaded) -> - case queue:out(Worklist) of - {empty, _WorkList} -> - ok; - {{value, App}, Worklist1} -> - case sets:is_element(App, Loaded) of - true -> load_applications(Worklist1, Loaded); - false -> case application:load(App) of - ok -> ok; - {error, {already_loaded, App}} -> ok; - Error -> throw(Error) - end, - load_applications( - queue:join(Worklist1, - queue:from_list(app_dependencies(App))), - sets:add_element(App, Loaded)) - end - end. +app_startup_order() -> + ok = app_utils:load_applications(?APPS), + app_utils:app_dependency_order(?APPS, false). -app_dependencies(App) -> - case application:get_key(App, applications) of - undefined -> []; - {ok, Lst} -> Lst - end. +app_shutdown_order() -> + Apps = ?APPS ++ rabbit_plugins:active(), + app_utils:app_dependency_order(Apps, true). %%--------------------------------------------------------------------------- %% boot step logic @@ -477,7 +472,8 @@ sort_boot_steps(UnsortedSteps) -> %% there is one, otherwise fail). SortedSteps = lists:reverse( [begin - {StepName, Step} = digraph:vertex(G, StepName), + {StepName, Step} = digraph:vertex(G, + StepName), Step end || StepName <- digraph_utils:topsort(G)]), digraph:delete(G), @@ -538,7 +534,7 @@ boot_error(Format, Args) -> boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), - rabbit_sup:start_child(delegate_sup, [Count]). + rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). @@ -559,7 +555,8 @@ insert_default_data() -> ok = rabbit_vhost:add(DefaultVHost), ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags), - ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost, + ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, + DefaultVHost, DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm), @@ -621,15 +618,12 @@ log_location(Type) -> rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). -rotate_logs(File, Suffix, OldHandler, NewHandler) -> - case File of - undefined -> ok; - tty -> ok; - _ -> gen_event:swap_handler( - error_logger, - {OldHandler, swap}, - {NewHandler, {File, Suffix}}) - end. +rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(File, Suffix, OldHandler, NewHandler) -> + gen_event:swap_handler(error_logger, + {OldHandler, swap}, + {NewHandler, {File, Suffix}}). log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, @@ -650,6 +644,24 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- %% misc +print_plugin_info([]) -> + ok; +print_plugin_info(Plugins) -> + %% This gets invoked by rabbitmqctl start_app, outside the context + %% of the rabbit application + rabbit_misc:with_local_io( + fun() -> + io:format("~n-- plugins running~n"), + [print_plugin_info( + AppName, element(2, application:get_key(AppName, vsn))) + || AppName <- Plugins], + ok + end). + +print_plugin_info(Plugin, Vsn) -> + Len = 76 - length(Vsn), + io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]). + erts_version_check() -> FoundVer = erlang:system_info(version), case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 04e0c141..d16d90a4 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -162,17 +162,17 @@ maybe_alert(UpdateFun, Node, Source, end, State#alarms{alarmed_nodes = AN1}. -alert_local(Alert, Alertees, _Source) -> - alert(Alertees, [Alert], fun erlang:'=:='/2). +alert_local(Alert, Alertees, Source) -> + alert(Alertees, Source, Alert, fun erlang:'=:='/2). alert_remote(Alert, Alertees, Source) -> - alert(Alertees, [Source, Alert], fun erlang:'=/='/2). + alert(Alertees, Source, Alert, fun erlang:'=/='/2). -alert(Alertees, AlertArg, NodeComparator) -> +alert(Alertees, Source, Alert, NodeComparator) -> Node = node(), dict:fold(fun (Pid, {M, F, A}, ok) -> case NodeComparator(Node, node(Pid)) of - true -> apply(M, F, A ++ [Pid] ++ AlertArg); + true -> apply(M, F, A ++ [Pid, Source, Alert]); false -> ok end end, ok, Alertees). @@ -181,7 +181,7 @@ internal_register(Pid, {M, F, A} = HighMemMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, _Sources} -> apply(M, F, A ++ [Pid, true]); + {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; error -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertees), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c1673504..afbaea65 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,7 +28,7 @@ -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([store_queue/1]). +-export([update/2, store_queue/1, policy_changed/2]). %% internal @@ -71,6 +71,9 @@ -spec(internal_declare/2 :: (rabbit_types:amqqueue(), boolean()) -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). +-spec(update/2 :: + (name(), + fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok'). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | rabbit_types:error('not_found'); @@ -157,6 +160,8 @@ -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()). -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -166,6 +171,9 @@ [queue_name, channel_pid, consumer_tag, ack_required]). start() -> + %% Clear out remnants of old incarnation, in case we restarted + %% faster than other nodes handled DOWN messages from us. + on_node_down(node()), DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), @@ -222,9 +230,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of - [] -> ok = store_queue(Q), - B = add_default_binding(Q), - fun () -> B(), Q end; + [] -> Q1 = rabbit_policy:set(Q), + ok = store_queue(Q1), + B = add_default_binding(Q1), + fun () -> B(), Q1 end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -237,6 +246,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> end end). +update(Name, Fun) -> + case mnesia:wread({rabbit_queue, Name}) of + [Q = #amqqueue{durable = Durable}] -> + Q1 = Fun(Q), + ok = mnesia:write(rabbit_queue, Q1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_queue, Q1, write); + _ -> ok + end; + [] -> + ok + end. + store_queue(Q = #amqqueue{durable = true}) -> ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write), ok = mnesia:write(rabbit_queue, Q, write), @@ -245,6 +267,9 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. +policy_changed(_Q1, _Q2) -> + ok. + determine_queue_nodes(Args) -> Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), @@ -508,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, @@ -573,7 +598,8 @@ on_node_down(Node) -> #amqqueue{name = QName, pid = Pid, slave_pids = []} <- mnesia:table(rabbit_queue), - node(Pid) == Node])), + node(Pid) == Node andalso + not is_process_alive(Pid)])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5701efeb..8933de87 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of - not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, {new, Q}), - ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, - [self()]), - ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, - set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, Q, Recover), - State1 = process_args(State#q{backing_queue_state = BQS}), - rabbit_event:notify(queue_created, - infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(State1, #q.stats_timer, - fun() -> emit_stats(State1) end), - noreply(State1); - Q1 -> {stop, normal, {existing, Q1}, State} - end. + not_found -> + {stop, normal, not_found, State}; + Q1 -> + case matches(Recover, Q, Q1) of + true -> + gen_server2:reply(From, {new, Q}), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, + set_ram_duration_target, [self()]}), + BQS = bq_init(BQ, Q, Recover), + State1 = process_args(State#q{backing_queue_state = BQS}), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #q.stats_timer, + fun() -> emit_stats(State1) end), + noreply(State1); + false -> + {stop, normal, {existing, Q1}, State} + end + end. + +matches(true, Q, Q) -> true; +matches(true, _Q, _Q1) -> false; +matches(false, Q1, Q2) -> + %% i.e. not policy + Q1#amqqueue.name =:= Q2#amqqueue.name andalso + Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso + Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso + Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso + Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso + Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso + Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. bq_init(BQ, Q, Recover) -> Self = self(), @@ -984,8 +1002,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - {basic_consume, _, _, _, _, _, _} -> 7; - {basic_cancel, _, _, _} -> 7; stat -> 7; _ -> 0 end. @@ -995,10 +1011,6 @@ prioritise_cast(Msg, _State) -> delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; - {ack, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid, _Credit} -> 7; - {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 28c57bb0..dc144a0e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,10 +26,8 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 17d848da..734456d3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -224,6 +224,5 @@ header_routes(HeadersTable) -> {array, Routes} -> [Route || {longstr, Route} <- Routes]; undefined -> []; {Type, _Val} -> throw({error, {unacceptable_type_in_header, - Type, - binary_to_list(HeaderKey)}}) + binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb44797e..f0ea514d 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -173,13 +173,11 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - ok = rabbit_exchange:callback( - Src, add_binding, [transaction, Src, B]), + x_callback(transaction, Src, add_binding, B), Serial = rabbit_exchange:serial(Src), fun () -> - ok = rabbit_exchange:callback( - Src, add_binding, [Serial, Src, B]), - ok = rabbit_event:notify(binding_created, info(B)) + x_callback(Serial, Src, add_binding, B), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -487,4 +485,5 @@ process_deletions(Deletions) -> del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. -x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). +x_callback(Serial, X, F, Bs) -> + ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]). diff --git a/src/rabbit_control.erl b/src/rabbit_control_main.erl index 8b24d2e3..b23088cc 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control_main.erl @@ -14,7 +14,7 @@ %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% --module(rabbit_control). +-module(rabbit_control_main). -include("rabbit.hrl"). -export([start/0, stop/0, action/5]). @@ -26,6 +26,61 @@ -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(QUIET_DEF, {?QUIET_OPT, flag}). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). +-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). + +-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). + +-define(COMMANDS, + [stop, + stop_app, + start_app, + wait, + reset, + force_reset, + rotate_logs, + + cluster, + force_cluster, + cluster_status, + + add_user, + delete_user, + change_password, + clear_password, + set_user_tags, + list_users, + + add_vhost, + delete_vhost, + list_vhosts, + {set_permissions, [?VHOST_DEF]}, + {clear_permissions, [?VHOST_DEF]}, + {list_permissions, [?VHOST_DEF]}, + list_user_permissions, + + set_parameter, + clear_parameter, + list_parameters, + + {list_queues, [?VHOST_DEF]}, + {list_exchanges, [?VHOST_DEF]}, + {list_bindings, [?VHOST_DEF]}, + {list_connections, [?VHOST_DEF]}, + list_channels, + {list_consumers, [?VHOST_DEF]}, + status, + environment, + report, + eval, + + close_connection, + {trace_on, [?VHOST_DEF]}, + {trace_off, [?VHOST_DEF]}, + set_vm_memory_high_watermark + ]). + -define(GLOBAL_QUERIES, [{"Connections", rabbit_networking, connection_info_all, connection_info_keys}, @@ -57,19 +112,18 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), - {[Command0 | Args], Opts} = - case rabbit_misc:get_options([{flag, ?QUIET_OPT}, - {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}], - init:get_plain_arguments()) of - {[], _Opts} -> usage(); - CmdArgsAndOpts -> CmdArgsAndOpts + {Command, Opts, Args} = + case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr), + init:get_plain_arguments()) + of + {ok, Res} -> Res; + no_command -> print_error("could not recognise command", []), + usage() end, Opts1 = [case K of ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; _ -> {K, V} end || {K, V} <- Opts], - Command = list_to_atom(Command0), Quiet = proplists:get_bool(?QUIET_OPT, Opts1), Node = proplists:get_value(?NODE_OPT, Opts1), Inform = case Quiet of @@ -102,6 +156,11 @@ start() -> {'EXIT', {badarg, _}} -> print_error("invalid parameter: ~p", [Args]), usage(); + {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) -> + %% We handle this common case specially to avoid ~p since + %% that has i18n issues + print_error("~s: ~s", [Problem, Reason]), + rabbit_misc:quit(2); {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); @@ -194,8 +253,7 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - wait_for_application(Node, PidFile, rabbit, Inform); - + wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); action(wait, Node, [PidFile, App], _Opts, Inform) -> Inform("Waiting for ~p on ~p", [App, Node]), wait_for_application(Node, PidFile, list_to_atom(App), Inform); @@ -216,33 +274,33 @@ action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> - Inform("Rotating logs to files with suffix ~p", [Suffix]), + Inform("Rotating logs to files with suffix \"~s\"", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> - Inform("Closing connection ~s", [PidStr]), + Inform("Closing connection \"~s\"", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> - Inform("Creating user ~p", [Username]), + Inform("Creating user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, add_user, Args}); action(delete_user, Node, Args = [_Username], _Opts, Inform) -> - Inform("Deleting user ~p", Args), + Inform("Deleting user \"~s\"", Args), call(Node, {rabbit_auth_backend_internal, delete_user, Args}); action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> - Inform("Changing password for user ~p", [Username]), + Inform("Changing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, change_password, Args}); action(clear_password, Node, Args = [Username], _Opts, Inform) -> - Inform("Clearing password for user ~p", [Username]), + Inform("Clearing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> Tags = [list_to_atom(T) || T <- TagsStr], - Inform("Setting tags for user ~p to ~p", [Username, Tags]), + Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), rpc_call(Node, rabbit_auth_backend_internal, set_tags, [list_to_binary(Username), Tags]); @@ -253,11 +311,11 @@ action(list_users, Node, [], _Opts, Inform) -> rabbit_auth_backend_internal:user_info_keys()); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Creating vhost ~p", Args), + Inform("Creating vhost \"~s\"", Args), call(Node, {rabbit_vhost, add, Args}); action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Deleting vhost ~p", Args), + Inform("Deleting vhost \"~s\"", Args), call(Node, {rabbit_vhost, delete, Args}); action(list_vhosts, Node, Args, _Opts, Inform) -> @@ -319,12 +377,12 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(trace_on, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Starting tracing for vhost ~p", [VHost]), + Inform("Starting tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); action(trace_off, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Stopping tracing for vhost ~p", [VHost]), + Inform("Stopping tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> @@ -337,23 +395,42 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Setting permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, set_permissions, [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Clearing permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, clear_permissions, [Username, VHost]}); action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost ~p", [VHost]), + Inform("Listing permissions in vhost \"~s\"", [VHost]), display_info_list(call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}), rabbit_auth_backend_internal:vhost_perms_info_keys()); +action(set_parameter, Node, [Component, Key, Value], _Opts, Inform) -> + Inform("Setting runtime parameter ~p for component ~p to ~p", + [Key, Component, Value]), + rpc_call(Node, rabbit_runtime_parameters, parse_set, + [list_to_binary(Component), list_to_binary(Key), Value]); + +action(clear_parameter, Node, [Component, Key], _Opts, Inform) -> + Inform("Clearing runtime parameter ~p for component ~p", [Key, Component]), + rpc_call(Node, rabbit_runtime_parameters, clear, [list_to_binary(Component), + list_to_binary(Key)]); + +action(list_parameters, Node, Args = [], _Opts, Inform) -> + Inform("Listing runtime parameters", []), + display_info_list( + rpc_call(Node, rabbit_runtime_parameters, list_formatted, Args), + rabbit_runtime_parameters:info_keys()); + action(report, Node, _Args, _Opts, Inform) -> io:format("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || @@ -388,12 +465,22 @@ wait_for_application(Node, PidFile, Application, Inform) -> Inform("pid is ~s", [Pid]), wait_for_application(Node, Pid, Application). +wait_for_application(Node, Pid, rabbit_and_plugins) -> + wait_for_startup(Node, Pid); wait_for_application(Node, Pid, Application) -> + while_process_is_alive( + Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end). + +wait_for_startup(Node, Pid) -> + while_process_is_alive( + Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end). + +while_process_is_alive(Node, Pid, Activity) -> case process_up(Pid) of - true -> case rabbit_nodes:is_running(Node, Application) of + true -> case Activity() of true -> ok; false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), - wait_for_application(Node, Pid, Application) + while_process_is_alive(Node, Pid, Activity) end; false -> {error, process_not_running} end. @@ -408,12 +495,14 @@ wait_for_process_death(Pid) -> read_pid_file(PidFile, Wait) -> case {file:read_file(PidFile), Wait} of {{ok, Bin}, _} -> - S = string:strip(binary_to_list(Bin), right, $\n), - try list_to_integer(S) + S = binary_to_list(Bin), + {match, [PidS]} = re:run(S, "[^\\s]+", + [{capture, all, list}]), + try list_to_integer(PidS) catch error:badarg -> exit({error, {garbage_in_pid_file, PidFile}}) end, - S; + PidS; {{error, enoent}, true} -> timer:sleep(?EXTERNAL_CHECK_INTERVAL), read_pid_file(PidFile, Wait); @@ -425,8 +514,7 @@ read_pid_file(PidFile, Wait) -> % rpc:call(os, getpid, []) at this point process_up(Pid) -> with_os([{unix, fun () -> - system("ps -p " ++ Pid - ++ " >/dev/null 2>&1") =:= 0 + run_ps(Pid) =:= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -444,15 +532,17 @@ with_os(Handlers) -> Handler -> Handler() end. -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. +run_ps(Pid) -> + Port = erlang:open_port({spawn, "ps -p " ++ Pid}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a471d282..c07ad832 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -47,16 +47,10 @@ %%---------------------------------------------------------------------------- -boot() -> - {ok, _} = - supervisor2:start_child( - rabbit_sup, - {rabbit_direct_client_sup, - {rabbit_client_sup, start_link, +boot() -> rabbit_sup:start_supervisor_child( + rabbit_direct_client_sup, rabbit_client_sup, [{local, rabbit_direct_client_sup}, - {rabbit_channel_sup, start_link, []}]}, - transient, infinity, supervisor, [rabbit_client_sup]}), - ok. + {rabbit_channel_sup, start_link, []}]). force_event_refresh() -> [Pid ! force_event_refresh || Pid<- list()], diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 831d5290..58375abb 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -27,7 +27,7 @@ set_check_interval/1, get_disk_free/0]). -define(SERVER, ?MODULE). --define(DEFAULT_DISK_CHECK_INTERVAL, 60000). +-define(DEFAULT_DISK_CHECK_INTERVAL, 10000). -record(state, {dir, limit, @@ -87,9 +87,9 @@ init([Limit]) -> vm_memory_monitor:get_total_memory()} of {N1, N2} when is_integer(N1), is_integer(N2) -> {ok, set_disk_limits(State, Limit)}; - _ -> + Err -> rabbit_log:info("Disabling disk free space monitoring " - "on unsupported platform~n"), + "on unsupported platform: ~p~n", [Err]), {stop, unsupported_platform} end. @@ -167,9 +167,9 @@ get_disk_free(Dir, {unix, Sun}) get_disk_free(Dir, {unix, _}) -> parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> - parse_free_win32(rabbit_misc:os_cmd("dir /-C /W " ++ Dir)); -get_disk_free(_, _) -> - unknown. + parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"])); +get_disk_free(_, Platform) -> + {unknown, Platform}. parse_free_unix(CommandResult) -> [_, Stats | _] = string:tokens(CommandResult, "\n"), @@ -178,8 +178,9 @@ parse_free_unix(CommandResult) -> parse_free_win32(CommandResult) -> LastLine = lists:last(string:tokens(CommandResult, "\r\n")), - [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "), - list_to_integer(Free). + {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)", + [{capture, all_but_first, list}]), + list_to_integer(lists:reverse(Free)). interpret_limit({mem_relative, R}) -> round(R * vm_memory_monitor:get_total_memory()); diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 4ec141cf..3f1b20fe 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -139,6 +139,6 @@ notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> %% TODO: switch to os:timestamp() when we drop support for %% Erlang/OTP < R13B01 - gen_event:notify(rabbit_event, #event{type = Type, - props = Props, - timestamp = now()}). + gen_event:notify(?MODULE, #event{type = Type, + props = Props, + timestamp = now()}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 910a89b4..57c571f1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,13 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, callback/3, declare/6, +-export([recover/0, policy_changed/2, callback/4, declare/6, assert_equivalence/6, assert_args_equivalence/2, check_type/1, - lookup/1, lookup_or_die/1, list/1, update_scratch/2, + lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1]). +-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -37,7 +37,12 @@ -type(fun_name() :: atom()). -spec(recover/0 :: () -> [name()]). --spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). +-spec(callback/4:: + (rabbit_types:exchange(), fun_name(), + fun((boolean()) -> non_neg_integer()) | atom(), + [any()]) -> 'ok'). +-spec(policy_changed/2 :: + (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -58,7 +63,13 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). --spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok'). +-spec(lookup_scratch/2 :: (name(), atom()) -> + rabbit_types:ok(term()) | + rabbit_types:error('not_found')). +-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok'). +-spec(update/2 :: + (name(), + fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -76,14 +87,16 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(serial/1 :: (rabbit_types:exchange()) -> + fun((boolean()) -> 'none' | pos_integer())). -spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined'). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments, + policy]). recover() -> Xs = rabbit_misc:table_filter( @@ -95,21 +108,52 @@ recover() -> true -> store(X); false -> ok end, - rabbit_exchange:callback(X, create, [map_create_tx(Tx), X]) + callback(X, create, map_create_tx(Tx), [X]) end, rabbit_durable_exchange), [XName || #exchange{name = XName} <- Xs]. -callback(#exchange{type = XType}, Fun, Args) -> - apply(type_to_module(XType), Fun, Args). +callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> + Serial = fun (Bool) -> + case Serial0 of + _ when is_atom(Serial0) -> Serial0; + _ -> Serial0(Bool) + end + end, + [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) + || M <- decorators()], + Module = type_to_module(XType), + apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). + +policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). + +serialise_events(X = #exchange{type = Type}) -> + case [Serialise || M <- decorators(), + Serialise <- [M:serialise_events(X)], + Serialise == true] of + [] -> (type_to_module(Type)):serialise_events(); + _ -> true + end. + +serial(#exchange{name = XName} = X) -> + Serial = case serialise_events(X) of + true -> next_serial(XName); + false -> none + end, + fun (true) -> Serial; + (false) -> none + end. + +decorators() -> + [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> - X = #exchange{name = XName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - internal = Internal, - arguments = Args}, + X = rabbit_policy:set(#exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + internal = Internal, + arguments = Args}), XT = type_to_module(Type), %% We want to upset things if it isn't ok ok = XT:validate(X), @@ -129,7 +173,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = XT:create(map_create_tx(Tx), Exchange), + ok = callback(X, create, map_create_tx(Tx), [Exchange]), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -141,13 +185,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> map_create_tx(true) -> transaction; map_create_tx(false) -> none. -store(X = #exchange{name = Name, type = Type}) -> - ok = mnesia:write(rabbit_exchange, X, write), - case (type_to_module(Type)):serialise_events() of - true -> S = #exchange_serial{name = Name, next = 1}, - ok = mnesia:write(rabbit_exchange_serial, S, write); - false -> ok - end. +store(X) -> ok = mnesia:write(rabbit_exchange, X, write). %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -200,23 +238,51 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -update_scratch(Name, Fun) -> +lookup_scratch(Name, App) -> + case lookup(Name) of + {ok, #exchange{scratches = undefined}} -> + {error, not_found}; + {ok, #exchange{scratches = Scratches}} -> + case orddict:find(App, Scratches) of + {ok, Value} -> {ok, Value}; + error -> {error, not_found} + end; + {error, not_found} -> + {error, not_found} + end. + +update_scratch(Name, App, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> - case mnesia:wread({rabbit_exchange, Name}) of - [X = #exchange{durable = Durable, scratch = Scratch}] -> - X1 = X#exchange{scratch = Fun(Scratch)}, - ok = mnesia:write(rabbit_exchange, X1, write), - case Durable of - true -> ok = mnesia:write(rabbit_durable_exchange, - X1, write); - _ -> ok - end; - [] -> - ok - end + update(Name, + fun(X = #exchange{scratches = Scratches0}) -> + Scratches1 = case Scratches0 of + undefined -> orddict:new(); + _ -> Scratches0 + end, + Scratch = case orddict:find(App, Scratches1) of + {ok, S} -> S; + error -> undefined + end, + Scratches2 = orddict:store( + App, Fun(Scratch), Scratches1), + X#exchange{scratches = Scratches2} + end) end). +update(Name, Fun) -> + case mnesia:wread({rabbit_exchange, Name}) of + [X = #exchange{durable = Durable}] -> + X1 = Fun(X), + ok = mnesia:write(rabbit_exchange, X1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_exchange, X1, write); + _ -> ok + end; + [] -> + ok + end. + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -232,6 +298,7 @@ i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; +i(policy, X) -> rabbit_policy:name(X); i(Item, _) -> throw({bad_argument, Item}). info(X = #exchange{}) -> infos(?INFO_KEYS, X). @@ -341,23 +408,18 @@ unconditional_delete(X = #exchange{name = XName}) -> Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -serial(#exchange{name = XName, type = Type}) -> - case (type_to_module(Type)):serialise_events() of - true -> next_serial(XName); - false -> none - end. - next_serial(XName) -> - [#exchange_serial{next = Serial}] = - mnesia:read(rabbit_exchange_serial, XName, write), + Serial = peek_serial(XName, write), ok = mnesia:write(rabbit_exchange_serial, #exchange_serial{name = XName, next = Serial + 1}, write), Serial. -peek_serial(XName) -> - case mnesia:read({rabbit_exchange_serial, XName}) of +peek_serial(XName) -> peek_serial(XName, read). + +peek_serial(XName, LockType) -> + case mnesia:read(rabbit_exchange_serial, XName, LockType) of [#exchange_serial{next = Serial}] -> Serial; - _ -> undefined + _ -> 1 end. invalid_module(T) -> diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl new file mode 100644 index 00000000..b40ceda9 --- /dev/null +++ b/src/rabbit_exchange_decorator.erl @@ -0,0 +1,71 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_decorator). + +%% This is like an exchange type except that: +%% +%% 1) It applies to all exchanges as soon as it is installed, therefore +%% 2) It is not allowed to affect validation, so no validate/1 or +%% assert_args_equivalence/2 +%% 3) It also can't affect routing +%% +%% It's possible in the future we might relax 3), or even make these +%% able to manipulate messages as they are published. + +-ifdef(use_specs). + +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + +-callback description() -> [proplist:property()]. + +%% Should Rabbit ensure that all binding events that are +%% delivered to an individual exchange can be serialised? (they +%% might still be delivered out of order, but there'll be a +%% serial number). +-callback serialise_events(rabbit_types:exchange()) -> boolean(). + +%% called after declaration and recovery +-callback create(tx(), rabbit_types:exchange()) -> 'ok'. + +%% called after exchange (auto)deletion. +-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> + 'ok'. + +%% called after a binding has been added or recovered +-callback add_binding(serial(), rabbit_types:exchange(), + rabbit_types:binding()) -> 'ok'. + +%% called after bindings have been deleted. +-callback remove_bindings(serial(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'. + +%% called when the policy attached to this exchange changes. +-callback policy_changed ( + serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, + {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 1027570c..e6470b72 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -58,6 +58,10 @@ rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). +%% called when the policy attached to this exchange changes. +-callback policy_changed ( + serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. + -else. -export([behaviour_info/1]). @@ -65,7 +69,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, - {assert_args_equivalence, 2}]; + {assert_args_equivalence, 2}, {policy_changed, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index cdec1cb9..9a5665c0 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -43,6 +43,7 @@ route(#exchange{name = Name}, validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index a64f2c29..d9a2f60f 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 61917d8f..516b78e5 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 82d27960..101fe434 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). description() -> @@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) -> validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 3160fdf4..644d9acf 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, add_binding/3, +-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) -> delete(none, _Exchange, _Bs) -> ok. +policy_changed(_Tx, _X1, _X2) -> ok. + add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); add_binding(none, _Exchange, _Binding) -> diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 59df14f3..a95f8f26 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -102,9 +102,12 @@ read_file_info(File) -> with_fhc_handle(fun () -> prim_file:read_file_info(File) end). with_fhc_handle(Fun) -> - ok = file_handle_cache:obtain(), + with_fhc_handle(1, Fun). + +with_fhc_handle(N, Fun) -> + [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], try Fun() - after ok = file_handle_cache:release() + after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] end. read_term_file(File) -> @@ -165,7 +168,7 @@ make_binary(List) -> {error, Reason} end. - +%% TODO the semantics of this function are rather odd. But see bug 25021. append_file(File, Suffix) -> case read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -183,9 +186,11 @@ append_file(File, 0, Suffix) -> end end); append_file(File, _, Suffix) -> - case with_fhc_handle(fun () -> prim_file:read_file(File) end) of - {ok, Data} -> write_file([File, Suffix], Data, [append]); - Error -> Error + case with_fhc_handle(2, fun () -> + file:copy(File, {[File, Suffix], [append]}) + end) of + {ok, _BytesCopied} -> ok; + Error -> Error end. ensure_parent_dirs_exist(Filename) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9fa6213b..2b15498e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -192,7 +192,7 @@ terminate(_, _) -> ok. code_change(_, State, _) -> - State. + {ok, State}. %%---------------------------------------------------------------------------- %% Internal plumbing diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 17e2ffb4..3e058793 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> - noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); + +handle_cast({delete_and_terminate, Reason}, State) -> + {stop, Reason, State}. handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), @@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> + ok = gen_server2:cast(CPid, Msg), + {stop, {shutdown, ring_shutdown}}; handle_msg([_CPid], _From, _Msg) -> ok. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4e71cc43..750bcd56 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,10 +127,21 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> + Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], + MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + monitor_wait(MRefs), State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. +monitor_wait([]) -> + ok; +monitor_wait([MRef | MRefs]) -> + receive({'DOWN', MRef, process, _Pid, _Info}) -> + ok + end, + monitor_wait(MRefs). + purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e412fbbc..03fafc3e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> ok; handle_msg([SPid], _From, {process_death, Pid}) -> inform_deaths(SPid, [Pid]); +handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> + ok = gen_server2:cast(CPid, {gm, Msg}), + {stop, {shutdown, ring_shutdown}}; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Deaths) -> - rabbit_misc:with_exit_handler( - fun () -> {stop, normal} end, - fun () -> - case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of - ok -> - ok; - {promote, CPid} -> - {become, rabbit_mirror_queue_coordinator, [CPid]} - end - end). + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. %% --------------------------------------------------------------------------- %% Others diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index 8eacb1f3..a2034876 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor2). --export([start/0, start_link/0, start_child/2]). +-export([start_link/0, start_child/2]). -export([init/1]). @@ -26,20 +26,9 @@ -define(SERVER, ?MODULE). -start() -> - {ok, _} = - supervisor2:start_child( - rabbit_sup, - {rabbit_mirror_queue_slave_sup, - {rabbit_mirror_queue_slave_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), - ok. +start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []). -start_link() -> - supervisor2:start_link({local, ?SERVER}, ?MODULE, []). - -start_child(Node, Args) -> - supervisor2:start_child({?SERVER, Node}, Args). +start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args). init([]) -> {ok, {{simple_one_for_one_terminate, 10, 10}, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0aacd654..d41aa09b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). --export([die/1, frame_error/2, amqp_error/4, +-export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). @@ -40,25 +40,24 @@ -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]). --export([start_applications/1, stop_applications/1]). +-export([format/2, format_many/1, format_stderr/2]). +-export([with_local_io/1, local_info_msg/2]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). --export([get_options/2]). +-export([parse_arguments/3]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). --export([pget/2, pget/3, pget_or_die/2]). +-export([pget/2, pget/3, pget_or_die/2, pset/3]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). -export([multi_call/2]). --export([quit/1]). -export([os_cmd/1]). -export([gb_sets_difference/2]). @@ -71,7 +70,7 @@ -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). -type(resource_name() :: binary()). --type(optdef() :: {flag, string()} | {option, string(), any()}). +-type(optdef() :: flag | {option, string()}). -type(channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). -type(digraph_label() :: term()). @@ -86,6 +85,10 @@ -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). -spec(die/1 :: (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()). + +-spec(quit/1 :: (integer()) -> no_return()). +-spec(quit/2 :: (string(), [term()]) -> no_return()). + -spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary()) -> rabbit_types:connection_exit()). -spec(amqp_error/4 :: @@ -158,11 +161,10 @@ -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). -spec(format/2 :: (string(), [any()]) -> string()). +-spec(format_many/1 :: ([{string(), [any()]}]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). --spec(start_applications/1 :: ([atom()]) -> 'ok'). --spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). @@ -180,8 +182,12 @@ -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). --spec(get_options/2 :: ([optdef()], [string()]) - -> {[string()], [{string(), any()}]}). +-spec(parse_arguments/3 :: + ([{atom(), [{string(), optdef()}]} | atom()], + [{string(), optdef()}], + [string()]) + -> {'ok', {atom(), [{string(), string()}], [string()]}} | + 'no_command'). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) @@ -199,11 +205,11 @@ -spec(pget/2 :: (term(), [term()]) -> term()). -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). +-spec(pset/3 :: (term(), term(), [term()]) -> term()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). -spec(multi_call/2 :: ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). --spec(quit/1 :: (integer() | string()) -> no_return()). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). @@ -384,6 +390,28 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> confirm_to_sender(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). +%% +%% @doc Halts the emulator after printing out an error message io-formatted with +%% the supplied arguments. The exit status of the beam process will be set to 1. +%% +quit(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + quit(1). + +%% +%% @doc Halts the emulator returning the given status code to the os. +%% On Windows this function will block indefinitely so as to give the io +%% subsystem time to flush stdout completely. +%% +quit(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status), + receive + after infinity -> ok + end + end. + throw_on_error(E, Thunk) -> case Thunk() of {error, Reason} -> throw({E, Reason}); @@ -551,6 +579,9 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). +format_many(List) -> + lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]). + format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -583,34 +614,6 @@ with_local_io(Fun) -> local_info_msg(Format, Args) -> with_local_io(fun () -> error_logger:info_msg(Format, Args) end). -manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> - Iterate(fun (App, Acc) -> - case Do(App) of - ok -> [App | Acc]; - {error, {SkipError, _}} -> Acc; - {error, Reason} -> - lists:foreach(Undo, Acc), - throw({error, {ErrorTag, App, Reason}}) - end - end, [], Apps), - ok. - -start_applications(Apps) -> - manage_applications(fun lists:foldl/3, - fun application:start/1, - fun application:stop/1, - already_started, - cannot_start_application, - Apps). - -stop_applications(Apps) -> - manage_applications(fun lists:foldr/3, - fun application:stop/1, - fun application:start/1, - not_started, - cannot_stop_application, - Apps). - unfold(Fun, Init) -> unfold(Fun, [], Init). @@ -730,39 +733,63 @@ gb_trees_fold1(Fun, Acc, {Key, Val, It}) -> gb_trees_foreach(Fun, Tree) -> gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree). -%% Separate flags and options from arguments. -%% get_options([{flag, "-q"}, {option, "-p", "/"}], -%% ["set_permissions","-p","/","guest", -%% "-q",".*",".*",".*"]) -%% == {["set_permissions","guest",".*",".*",".*"], -%% [{"-q",true},{"-p","/"}]} -get_options(Defs, As) -> - lists:foldl(fun(Def, {AsIn, RsIn}) -> - {K, {AsOut, V}} = - case Def of - {flag, Key} -> - {Key, get_flag(Key, AsIn)}; - {option, Key, Default} -> - {Key, get_option(Key, Default, AsIn)} - end, - {AsOut, [{K, V} | RsIn]} - end, {As, []}, Defs). - -get_option(K, _Default, [K, V | As]) -> - {As, V}; -get_option(K, Default, [Nk | As]) -> - {As1, V} = get_option(K, Default, As), - {[Nk | As1], V}; -get_option(_, Default, As) -> - {As, Default}. - -get_flag(K, [K | As]) -> - {As, true}; -get_flag(K, [Nk | As]) -> - {As1, V} = get_flag(K, As), - {[Nk | As1], V}; -get_flag(_, []) -> - {[], false}. +%% Takes: +%% * A list of [{atom(), [{string(), optdef()]} | atom()], where the atom()s +%% are the accepted commands and the optional [string()] is the list of +%% accepted options for that command +%% * A list [{string(), optdef()}] of options valid for all commands +%% * The list of arguments given by the user +%% +%% Returns either {ok, {atom(), [{string(), string()}], [string()]} which are +%% respectively the command, the key-value pairs of the options and the leftover +%% arguments; or no_command if no command could be parsed. +parse_arguments(Commands, GlobalDefs, As) -> + lists:foldl(maybe_process_opts(GlobalDefs, As), no_command, Commands). + +maybe_process_opts(GDefs, As) -> + fun({C, Os}, no_command) -> + process_opts(atom_to_list(C), dict:from_list(GDefs ++ Os), As); + (C, no_command) -> + (maybe_process_opts(GDefs, As))({C, []}, no_command); + (_, {ok, Res}) -> + {ok, Res} + end. + +process_opts(C, Defs, As0) -> + KVs0 = dict:map(fun (_, flag) -> false; + (_, {option, V}) -> V + end, Defs), + process_opts(Defs, C, As0, not_found, KVs0, []). + +%% Consume flags/options until you find the correct command. If there are no +%% arguments or the first argument is not the command we're expecting, fail. +%% Arguments to this are: definitions, cmd we're looking for, args we +%% haven't parsed, whether we have found the cmd, options we've found, +%% plain args we've found. +process_opts(_Defs, C, [], found, KVs, Outs) -> + {ok, {list_to_atom(C), dict:to_list(KVs), lists:reverse(Outs)}}; +process_opts(_Defs, _C, [], not_found, _, _) -> + no_command; +process_opts(Defs, C, [A | As], Found, KVs, Outs) -> + OptType = case dict:find(A, Defs) of + error -> none; + {ok, flag} -> flag; + {ok, {option, _}} -> option + end, + case {OptType, C, Found} of + {flag, _, _} -> process_opts( + Defs, C, As, Found, dict:store(A, true, KVs), + Outs); + {option, _, _} -> case As of + [] -> no_command; + [V | As1] -> process_opts( + Defs, C, As1, Found, + dict:store(A, V, KVs), Outs) + end; + {none, A, _} -> process_opts(Defs, C, As, found, KVs, Outs); + {none, _, found} -> process_opts(Defs, C, As, found, KVs, [A | Outs]); + {none, _, _} -> no_command + end. now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. @@ -848,6 +875,8 @@ pget_or_die(K, P) -> V -> V end. +pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)]. + format_message_queue(_Opt, MQ) -> Len = priority_queue:len(MQ), {Len, @@ -901,13 +930,6 @@ receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) end. -%% the slower shutdown on windows required to flush stdout -quit(Status) -> - case os:type() of - {unix, _} -> halt(Status); - {win32, _} -> init:stop(Status) - end. - os_cmd(Command) -> Exec = hd(string:tokens(Command, " ")), case os:find_executable(Exec) of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c714d3a7..7e9346f9 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -296,6 +296,11 @@ table_definitions() -> [{record_name, exchange_serial}, {attributes, record_info(fields, exchange_serial)}, {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, + {rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, record_info(fields, runtime_parameters)}, + {disc_copies, [node()]}, + {match, #runtime_parameters{_='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index e6a05335..bedf5142 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - maybe_fast_close/1, sockname/1, peername/1, peercert/1, - connection_string/2]). + recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, + close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1, + tune_buffer_size/1, connection_string/2]). %%--------------------------------------------------------------------------- @@ -34,6 +34,8 @@ -type(ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any())). -type(ok_or_any_error() :: rabbit_types:ok_or_error(any())). -type(socket() :: port() | #ssl_socket{}). +-type(opts() :: [{atom(), any()} | + {raw, non_neg_integer(), non_neg_integer(), binary()}]). -spec(is_ssl/1 :: (socket()) -> boolean()). -spec(ssl_info/1 :: (socket()) @@ -49,9 +51,12 @@ -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). --spec(setopts/2 :: (socket(), [{atom(), any()} | - {raw, non_neg_integer(), non_neg_integer(), - binary()}]) -> ok_or_any_error()). +-spec(getopts/2 :: (socket(), [atom() | {raw, + non_neg_integer(), + non_neg_integer(), + non_neg_integer() | binary()}]) + -> ok_val_or_error(opts())). +-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). -spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). @@ -64,6 +69,7 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). +-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()). -spec(connection_string/2 :: (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). @@ -126,6 +132,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) -> port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). +getopts(Sock, Options) when ?IS_SSL(Sock) -> + ssl:getopts(Sock#ssl_socket.ssl, Options); +getopts(Sock, Options) when is_port(Sock) -> + inet:getopts(Sock, Options). + setopts(Sock, Options) when ?IS_SSL(Sock) -> ssl:setopts(Sock#ssl_socket.ssl, Options); setopts(Sock, Options) when is_port(Sock) -> @@ -149,6 +160,13 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. +tune_buffer_size(Sock) -> + case getopts(Sock, [sndbuf, recbuf, buffer]) of + {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), + setopts(Sock, [{buffer, BufSz}]); + Err -> Err + end. + connection_string(Sock, Direction) -> {From, To} = case Direction of inbound -> {fun peername/1, fun sockname/1}; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f0c75d23..94a5a2b7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -136,18 +136,13 @@ boot_ssl() -> ok end. -start() -> - {ok,_} = supervisor2:start_child( - rabbit_sup, - {rabbit_tcp_client_sup, - {rabbit_client_sup, start_link, - [{local, rabbit_tcp_client_sup}, - {rabbit_connection_sup,start_link,[]}]}, - transient, infinity, supervisor, [rabbit_client_sup]}), - ok. +start() -> rabbit_sup:start_supervisor_child( + rabbit_tcp_client_sup, rabbit_client_sup, + [{local, rabbit_tcp_client_sup}, + {rabbit_connection_sup,start_link,[]}]). ensure_ssl() -> - ok = rabbit_misc:start_applications([crypto, public_key, ssl]), + ok = app_utils:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), % unknown_ca errors are silently ignored prior to R14B unless we diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b6a9e263..1c23632d 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -57,8 +57,7 @@ diagnostics(Nodes) -> "hosts, their running nodes and ports:", [Nodes]}] ++ [diagnostics_host(Host) || Host <- Hosts] ++ diagnostics0(), - lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags, - {F, A} <- [NodeDiag]]). + rabbit_misc:format_many(lists:flatten(NodeDiags)). diagnostics0() -> [{"~ncurrent node details:~n- node name: ~w", [node()]}, diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl new file mode 100644 index 00000000..af940dde --- /dev/null +++ b/src/rabbit_parameter_validation.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_parameter_validation). + +-export([number/2, binary/2, list/2, proplist/3]). + +number(_Name, Term) when is_number(Term) -> + ok; + +number(Name, Term) -> + {error, "~s should be number, actually was ~p", [Name, Term]}. + +binary(_Name, Term) when is_binary(Term) -> + ok; + +binary(Name, Term) -> + {error, "~s should be binary, actually was ~p", [Name, Term]}. + +list(_Name, Term) when is_list(Term) -> + ok; + +list(Name, Term) -> + {error, "~s should be list, actually was ~p", [Name, Term]}. + +proplist(Name, Constraints, Term) when is_list(Term) -> + {Results, Remainder} + = lists:foldl( + fun ({Key, Fun, Needed}, {Results0, Term0}) -> + case {lists:keytake(Key, 1, Term0), Needed} of + {{value, {Key, Value}, Term1}, _} -> + {[Fun(Key, Value) | Results0], + Term1}; + {false, mandatory} -> + {[{error, "Key \"~s\" not found in ~s", + [Key, Name]} | Results0], Term0}; + {false, optional} -> + {Results0, Term0} + end + end, {[], Term}, Constraints), + case Remainder of + [] -> Results; + _ -> [{error, "Unrecognised terms ~p in ~s", [Remainder, Name]} + | Results] + end; + +proplist(Name, _Constraints, Term) -> + {error, "~s not a list ~p", [Name, Term]}. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 2a93c8f2..7cf6eea9 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -17,146 +17,57 @@ -module(rabbit_plugins). -include("rabbit.hrl"). --export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1, - lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]). +-export([setup/0, active/0, read_enabled/1, + list/1, dependencies/3]). --define(VERBOSE_OPT, "-v"). --define(MINIMAL_OPT, "-m"). --define(ENABLED_OPT, "-E"). --define(ENABLED_ALL_OPT, "-e"). +-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). +-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). +-define(ENABLED_DEF, {?ENABLED_OPT, flag}). +-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start/0 :: () -> no_return()). --spec(stop/0 :: () -> 'ok'). --spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]). --spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]). --spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]). --spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]). --spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]). - --endif. - -%%---------------------------------------------------------------------------- - -start() -> - {ok, [[PluginsFile|_]|_]} = - init:get_argument(enabled_plugins_file), - {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), - {[Command0 | Args], Opts} = - case rabbit_misc:get_options([{flag, ?VERBOSE_OPT}, - {flag, ?MINIMAL_OPT}, - {flag, ?ENABLED_OPT}, - {flag, ?ENABLED_ALL_OPT}], - init:get_plain_arguments()) of - {[], _Opts} -> usage(); - CmdArgsAndOpts -> CmdArgsAndOpts - end, - Command = list_to_atom(Command0), - PrintInvalidCommandError = - fun () -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]) - end, - - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of - ok -> - rabbit_misc:quit(0); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - PrintInvalidCommandError(), - usage(); - {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> - PrintInvalidCommandError(), - usage(); - {error, Reason} -> - print_error("~p", [Reason]), - rabbit_misc:quit(2); - {error_string, Reason} -> - print_error("~s", [Reason]), - rabbit_misc:quit(2); - Other -> - print_error("~p", [Other]), - rabbit_misc:quit(2) - end. - -stop() -> - ok. - -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +-define(GLOBAL_DEFS, []). -usage() -> - io:format("~s", [rabbit_plugins_usage:usage()]), - rabbit_misc:quit(1). +-define(COMMANDS, + [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, + enable, + disable]). %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> - format_plugins(Pat, Opts, PluginsFile, PluginsDir); +-ifdef(use_specs). -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> - case ToEnable0 of - [] -> throw({error_string, "Not enough arguments for 'enable'"}); - _ -> ok - end, - AllPlugins = find_plugins(PluginsDir), - Enabled = read_enabled_plugins(PluginsFile), - ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins), - ToEnable = [list_to_atom(Name) || Name <- ToEnable0], - Missing = ToEnable -- plugin_names(AllPlugins), - case Missing of - [] -> ok; - _ -> throw({error_string, - fmt_list("The following plugins could not be found:", - Missing)}) - end, - NewEnabled = lists:usort(Enabled ++ ToEnable), - write_enabled_plugins(PluginsFile, NewEnabled), - NewImplicitlyEnabled = calculate_required_plugins(NewEnabled, AllPlugins), - maybe_warn_mochiweb(NewImplicitlyEnabled), - case NewEnabled -- ImplicitlyEnabled of - [] -> io:format("Plugin configuration unchanged.~n"); - _ -> print_list("The following plugins have been enabled:", - NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() - end; +-spec(setup/0 :: () -> [atom()]). +-spec(active/0 :: () -> [atom()]). +-spec(list/1 :: (string()) -> [#plugin{}]). +-spec(read_enabled/1 :: (file:filename()) -> [atom()]). +-spec(dependencies/3 :: + (boolean(), [atom()], [#plugin{}]) -> [atom()]). -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> - case ToDisable0 of - [] -> throw({error_string, "Not enough arguments for 'disable'"}); - _ -> ok - end, - ToDisable = [list_to_atom(Name) || Name <- ToDisable0], - Enabled = read_enabled_plugins(PluginsFile), - AllPlugins = find_plugins(PluginsDir), - Missing = ToDisable -- plugin_names(AllPlugins), - case Missing of - [] -> ok; - _ -> print_list("Warning: the following plugins could not be found:", - Missing) - end, - ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins), - NewEnabled = Enabled -- ToDisableDeps, - case length(Enabled) =:= length(NewEnabled) of - true -> io:format("Plugin configuration unchanged.~n"); - false -> ImplicitlyEnabled = - calculate_required_plugins(Enabled, AllPlugins), - NewImplicitlyEnabled = - calculate_required_plugins(NewEnabled, AllPlugins), - print_list("The following plugins have been disabled:", - ImplicitlyEnabled -- NewImplicitlyEnabled), - write_enabled_plugins(PluginsFile, NewEnabled), - report_change() - end. +-endif. %%---------------------------------------------------------------------------- -%% Get the #plugin{}s ready to be enabled. -find_plugins(PluginsDir) -> +%% +%% @doc Prepares the file system and installs all enabled plugins. +%% +setup() -> + {ok, PluginDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + {ok, EnabledPluginsFile} = application:get_env(rabbit, + enabled_plugins_file), + prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir), + [prepare_dir_plugin(PluginName) || + PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. + +%% @doc Lists the plugins which are currently running. +active() -> + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ], + [App || {App, _, _} <- application:which_applications(), + lists:member(App, InstalledPlugins)]. + +%% @doc Get the list of plugins which are ready to be enabled. +list(PluginsDir) -> EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)], FreeApps = [{app, App} || App <- filelib:wildcard("*/ebin/*.app", PluginsDir)], @@ -175,6 +86,91 @@ find_plugins(PluginsDir) -> end, Plugins. +%% @doc Read the list of enabled plugins from the supplied term file. +read_enabled(PluginsFile) -> + case rabbit_file:read_term_file(PluginsFile) of + {ok, [Plugins]} -> Plugins; + {ok, []} -> []; + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, + PluginsFile, Reason}}) + end. + +%% +%% @doc Calculate the dependency graph from <i>Sources</i>. +%% When Reverse =:= true the bottom/leaf level applications are returned in +%% the resulting list, otherwise they're skipped. +%% +dependencies(Reverse, Sources, AllPlugins) -> + {ok, G} = rabbit_misc:build_acyclic_graph( + fun (App, _Deps) -> [{App, App}] end, + fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, + [{Name, Deps} + || #plugin{name = Name, dependencies = Deps} <- AllPlugins]), + Dests = case Reverse of + false -> digraph_utils:reachable(Sources, G); + true -> digraph_utils:reaching(Sources, G) + end, + true = digraph:delete(G), + Dests. + +%%---------------------------------------------------------------------------- + +prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) -> + AllPlugins = list(PluginsDistDir), + Enabled = read_enabled(EnabledPluginsFile), + ToUnpack = dependencies(false, Enabled, AllPlugins), + ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), + + Missing = Enabled -- plugin_names(ToUnpackPlugins), + case Missing of + [] -> ok; + _ -> io:format("Warning: the following enabled plugins were " + "not found: ~p~n", [Missing]) + end, + + %% Eliminate the contents of the destination directory + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)", + [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)", + [DestDir, E2]) + end, + + [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins]. + +prepare_dir_plugin(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN), + + %% We want the second-last token + NameTokens = string:tokens(PluginAppDescFn,"/."), + PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), + list_to_atom(PluginNameString). + +%%---------------------------------------------------------------------------- + +delete_recursively(Fn) -> + case rabbit_file:recursive_delete([Fn]) of + ok -> ok; + {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; + Error -> Error + end. + +prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> + zip:unzip(Location, [{cwd, PluginDestDir}]); +prepare_plugin(#plugin{type = dir, name = Name, location = Location}, + PluginsDestDir) -> + rabbit_file:recursive_copy(Location, + filename:join([PluginsDestDir, Name])). + %% Get the #plugin{} from an .ez. get_plugin_info(Base, {ez, EZ0}) -> EZ = filename:join([Base, EZ0]), @@ -234,82 +230,6 @@ parse_binary(Bin) -> Err -> {error, {invalid_app, Err}} end. -%% Pretty print a list of plugins. -format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> - Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), - Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), - Format = case {Verbose, Minimal} of - {false, false} -> normal; - {true, false} -> verbose; - {false, true} -> minimal; - {true, true} -> throw({error_string, - "Cannot specify -m and -v together"}) - end, - OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), - OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), - - AvailablePlugins = find_plugins(PluginsDir), - EnabledExplicitly = read_enabled_plugins(PluginsFile), - EnabledImplicitly = - calculate_required_plugins(EnabledExplicitly, AvailablePlugins) -- - EnabledExplicitly, - {ok, RE} = re:compile(Pattern), - Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins, - re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - true -> true - end, - if OnlyEnabledAll -> - lists:member(Name, EnabledImplicitly) or - lists:member(Name, EnabledExplicitly); - true -> - true - end], - Plugins1 = usort_plugins(Plugins), - MaxWidth = lists:max([length(atom_to_list(Name)) || - #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format, - MaxWidth) || P <- Plugins1], - ok. - -format_plugin(#plugin{name = Name, version = Version, - description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) -> - Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly)} of - {true, false} -> "[E]"; - {false, true} -> "[e]"; - _ -> "[ ]" - end, - case Format of - minimal -> io:format("~s~n", [Name]); - normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ - "w ~s~n", [Glyph, Name, Version]); - verbose -> io:format("~s ~w~n", [Glyph, Name]), - io:format(" Version: \t~s~n", [Version]), - case Deps of - [] -> ok; - _ -> io:format(" Dependencies:\t~p~n", [Deps]) - end, - io:format(" Description:\t~s~n", [Description]), - io:format("~n") - end. - -print_list(Header, Plugins) -> - io:format(fmt_list(Header, Plugins)). - -fmt_list(Header, Plugins) -> - lists:flatten( - [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). - -usort_plugins(Plugins) -> - lists:usort(fun plugins_cmp/2, Plugins). - -plugins_cmp(#plugin{name = N1, version = V1}, - #plugin{name = N2, version = V2}) -> - {N1, V1} =< {N2, V2}. - %% Filter out applications that can be loaded *right now*. filter_applications(Applications) -> [Application || Application <- Applications, @@ -332,72 +252,3 @@ plugin_names(Plugins) -> %% Find plugins by name in a list of plugins. lookup_plugins(Names, AllPlugins) -> [P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)]. - -%% Read the enabled plugin names from disk. -read_enabled_plugins(PluginsFile) -> - case rabbit_file:read_term_file(PluginsFile) of - {ok, [Plugins]} -> Plugins; - {ok, []} -> []; - {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, - PluginsFile}}); - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, - PluginsFile, Reason}}) - end. - -%% Write the enabled plugin names on disk. -write_enabled_plugins(PluginsFile, Plugins) -> - case rabbit_file:write_term_file(PluginsFile, [Plugins]) of - ok -> ok; - {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file, - PluginsFile, Reason}}) - end. - -calculate_required_plugins(Sources, AllPlugins) -> - calculate_dependencies(false, Sources, AllPlugins). - -calculate_dependencies(Reverse, Sources, AllPlugins) -> - {ok, G} = rabbit_misc:build_acyclic_graph( - fun (App, _Deps) -> [{App, App}] end, - fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - [{Name, Deps} - || #plugin{name = Name, dependencies = Deps} <- AllPlugins]), - Dests = case Reverse of - false -> digraph_utils:reachable(Sources, G); - true -> digraph_utils:reaching(Sources, G) - end, - true = digraph:delete(G), - Dests. - -maybe_warn_mochiweb(Enabled) -> - V = erlang:system_info(otp_release), - case lists:member(mochiweb, Enabled) andalso V < "R13B01" of - true -> - Stars = string:copies("*", 80), - io:format("~n~n~s~n" - " Warning: Mochiweb enabled and Erlang version ~s " - "detected.~n" - " Enabling plugins that depend on Mochiweb is not " - "supported on this Erlang~n" - " version. At least R13B01 is required.~n~n" - " RabbitMQ will not start successfully in this " - "configuration. You *must*~n" - " disable the Mochiweb plugin, or upgrade Erlang.~n" - "~s~n~n~n", [Stars, V, Stars]); - false -> - ok - end. - -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"), - case os:type() of - {win32, _OsName} -> - io:format("If you have RabbitMQ running as a service then you must" - " reinstall by running~n rabbitmq-service.bat stop~n" - " rabbitmq-service.bat install~n" - " rabbitmq-service.bat start~n~n"); - _ -> - ok - end. - diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl new file mode 100644 index 00000000..572cf150 --- /dev/null +++ b/src/rabbit_plugins_main.erl @@ -0,0 +1,273 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_plugins_main). +-include("rabbit.hrl"). + +-export([start/0, stop/0]). + +-define(VERBOSE_OPT, "-v"). +-define(MINIMAL_OPT, "-m"). +-define(ENABLED_OPT, "-E"). +-define(ENABLED_ALL_OPT, "-e"). + +-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). +-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). +-define(ENABLED_DEF, {?ENABLED_OPT, flag}). +-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). + +-define(GLOBAL_DEFS, []). + +-define(COMMANDS, + [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, + enable, + disable]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + {ok, [[PluginsFile|_]|_]} = + init:get_argument(enabled_plugins_file), + {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), + {Command, Opts, Args} = + case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, + init:get_plain_arguments()) + of + {ok, Res} -> Res; + no_command -> print_error("could not recognise command", []), + usage() + end, + + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, + + case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + ok -> + rabbit_misc:quit(0); + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> + PrintInvalidCommandError(), + usage(); + {error, Reason} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + {error_string, Reason} -> + print_error("~s", [Reason]), + rabbit_misc:quit(2); + Other -> + print_error("~p", [Other]), + rabbit_misc:quit(2) + end. + +stop() -> + ok. + +%%---------------------------------------------------------------------------- + +action(list, [], Opts, PluginsFile, PluginsDir) -> + action(list, [".*"], Opts, PluginsFile, PluginsDir); +action(list, [Pat], Opts, PluginsFile, PluginsDir) -> + format_plugins(Pat, Opts, PluginsFile, PluginsDir); + +action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> + case ToEnable0 of + [] -> throw({error_string, "Not enough arguments for 'enable'"}); + _ -> ok + end, + AllPlugins = rabbit_plugins:list(PluginsDir), + Enabled = rabbit_plugins:read_enabled(PluginsFile), + ImplicitlyEnabled = rabbit_plugins:dependencies(false, + Enabled, AllPlugins), + ToEnable = [list_to_atom(Name) || Name <- ToEnable0], + Missing = ToEnable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw({error_string, + fmt_list("The following plugins could not be found:", + Missing)}) + end, + NewEnabled = lists:usort(Enabled ++ ToEnable), + write_enabled_plugins(PluginsFile, NewEnabled), + NewImplicitlyEnabled = rabbit_plugins:dependencies(false, + NewEnabled, AllPlugins), + maybe_warn_mochiweb(NewImplicitlyEnabled), + case NewEnabled -- ImplicitlyEnabled of + [] -> io:format("Plugin configuration unchanged.~n"); + _ -> print_list("The following plugins have been enabled:", + NewImplicitlyEnabled -- ImplicitlyEnabled), + report_change() + end; + +action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> + case ToDisable0 of + [] -> throw({error_string, "Not enough arguments for 'disable'"}); + _ -> ok + end, + ToDisable = [list_to_atom(Name) || Name <- ToDisable0], + Enabled = rabbit_plugins:read_enabled(PluginsFile), + AllPlugins = rabbit_plugins:list(PluginsDir), + Missing = ToDisable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> print_list("Warning: the following plugins could not be found:", + Missing) + end, + ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins), + NewEnabled = Enabled -- ToDisableDeps, + case length(Enabled) =:= length(NewEnabled) of + true -> io:format("Plugin configuration unchanged.~n"); + false -> ImplicitlyEnabled = + rabbit_plugins:dependencies(false, Enabled, AllPlugins), + NewImplicitlyEnabled = + rabbit_plugins:dependencies(false, + NewEnabled, AllPlugins), + print_list("The following plugins have been disabled:", + ImplicitlyEnabled -- NewImplicitlyEnabled), + write_enabled_plugins(PluginsFile, NewEnabled), + report_change() + end. + +%%---------------------------------------------------------------------------- + +print_error(Format, Args) -> + rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). + +usage() -> + io:format("~s", [rabbit_plugins_usage:usage()]), + rabbit_misc:quit(1). + +%% Pretty print a list of plugins. +format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> + Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), + Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), + Format = case {Verbose, Minimal} of + {false, false} -> normal; + {true, false} -> verbose; + {false, true} -> minimal; + {true, true} -> throw({error_string, + "Cannot specify -m and -v together"}) + end, + OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), + OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), + + AvailablePlugins = rabbit_plugins:list(PluginsDir), + EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile), + EnabledImplicitly = + rabbit_plugins:dependencies(false, EnabledExplicitly, + AvailablePlugins) -- EnabledExplicitly, + {ok, RE} = re:compile(Pattern), + Plugins = [ Plugin || + Plugin = #plugin{name = Name} <- AvailablePlugins, + re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> (lists:member(Name, + EnabledExplicitly) or + lists:member(Name, EnabledImplicitly)); + true -> true + end], + Plugins1 = usort_plugins(Plugins), + MaxWidth = lists:max([length(atom_to_list(Name)) || + #plugin{name = Name} <- Plugins1] ++ [0]), + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format, + MaxWidth) || P <- Plugins1], + ok. + +format_plugin(#plugin{name = Name, version = Version, + description = Description, dependencies = Deps}, + EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) -> + Glyph = case {lists:member(Name, EnabledExplicitly), + lists:member(Name, EnabledImplicitly)} of + {true, false} -> "[E]"; + {false, true} -> "[e]"; + _ -> "[ ]" + end, + case Format of + minimal -> io:format("~s~n", [Name]); + normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ + "w ~s~n", [Glyph, Name, Version]); + verbose -> io:format("~s ~w~n", [Glyph, Name]), + io:format(" Version: \t~s~n", [Version]), + case Deps of + [] -> ok; + _ -> io:format(" Dependencies:\t~p~n", [Deps]) + end, + io:format(" Description:\t~s~n", [Description]), + io:format("~n") + end. + +print_list(Header, Plugins) -> + io:format(fmt_list(Header, Plugins)). + +fmt_list(Header, Plugins) -> + lists:flatten( + [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). + +usort_plugins(Plugins) -> + lists:usort(fun plugins_cmp/2, Plugins). + +plugins_cmp(#plugin{name = N1, version = V1}, + #plugin{name = N2, version = V2}) -> + {N1, V1} =< {N2, V2}. + +%% Return the names of the given plugins. +plugin_names(Plugins) -> + [Name || #plugin{name = Name} <- Plugins]. + +%% Write the enabled plugin names on disk. +write_enabled_plugins(PluginsFile, Plugins) -> + case rabbit_file:write_term_file(PluginsFile, [Plugins]) of + ok -> ok; + {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file, + PluginsFile, Reason}}) + end. + +maybe_warn_mochiweb(Enabled) -> + V = erlang:system_info(otp_release), + case lists:member(mochiweb, Enabled) andalso V < "R13B01" of + true -> + Stars = string:copies("*", 80), + io:format("~n~n~s~n" + " Warning: Mochiweb enabled and Erlang version ~s " + "detected.~n" + " Enabling plugins that depend on Mochiweb is not " + "supported on this Erlang~n" + " version. At least R13B01 is required.~n~n" + " RabbitMQ will not start successfully in this " + "configuration. You *must*~n" + " disable the Mochiweb plugin, or upgrade Erlang.~n" + "~s~n~n~n", [Stars, V, Stars]); + false -> + ok + end. + +report_change() -> + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n"). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl new file mode 100644 index 00000000..1551795f --- /dev/null +++ b/src/rabbit_policy.erl @@ -0,0 +1,156 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy). + +%% TODO specs + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +-export([register/0]). +-export([name/1, get/2, set/1]). +-export([validate/3, validate_clear/2, notify/3, notify_clear/2]). + +-rabbit_boot_step({?MODULE, + [{description, "policy parameters"}, + {mfa, {rabbit_policy, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +register() -> + rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE). + +name(#amqqueue{policy = Policy}) -> name0(Policy); +name(#exchange{policy = Policy}) -> name0(Policy). + +name0(undefined) -> none; +name0(Policy) -> pget(<<"name">>, Policy). + +set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; +set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. + +set0(Name) -> match(Name, list()). + +get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); +get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); +%% Caution - SLOW. +get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())). + +get0(_Name, undefined) -> {error, not_found}; +get0(Name, List) -> case pget(<<"policy">>, List) of + undefined -> {error, not_found}; + Policy -> case pget(Name, Policy) of + undefined -> {error, not_found}; + Value -> {ok, Value} + end + end. + +%%---------------------------------------------------------------------------- + +validate(<<"policy">>, Name, Term) -> + rabbit_parameter_validation:proplist( + Name, policy_validation(), Term). + +validate_clear(<<"policy">>, _Name) -> + ok. + +notify(<<"policy">>, _Name, _Term) -> + update_policies(). + +notify_clear(<<"policy">>, _Name) -> + update_policies(). + +%%---------------------------------------------------------------------------- + +list() -> + [[{<<"name">>, pget(key, P)} | pget(value, P)] + || P <- rabbit_runtime_parameters:list(<<"policy">>)]. + +update_policies() -> + Policies = list(), + {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( + fun() -> + {[update_exchange(X, Policies) || + VHost <- rabbit_vhost:list(), + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies) || + VHost <- rabbit_vhost:list(), + Q <- rabbit_amqqueue:list(VHost)]} + end), + [notify(X) || X <- Xs], + [notify(Q) || Q <- Qs], + ok. + +update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> + NewPolicy = match(XName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_exchange:update( + XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), + {X, X#exchange{policy = NewPolicy}} + end. + +update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> + NewPolicy = match(QName, Policies), + case NewPolicy of + OldPolicy -> no_change; + _ -> rabbit_amqqueue:update( + QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), + {Q, Q#amqqueue{policy = NewPolicy}} + end. + +notify(no_change)-> + ok; +notify({X1 = #exchange{}, X2 = #exchange{}}) -> + rabbit_exchange:policy_changed(X1, X2); +notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) -> + rabbit_amqqueue:policy_changed(Q1, Q2). + +match(Name, Policies) -> + case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of + [] -> undefined; + [Policy | _Rest] -> Policy + end. + +matches(#resource{name = Name, virtual_host = VHost}, Policy) -> + Prefix = pget(<<"prefix">>, Policy), + case pget(<<"vhost">>, Policy) of + undefined -> prefix(Prefix, Name); + VHost -> prefix(Prefix, Name); + _ -> false + end. + +prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)). + +sort_pred(A, B) -> + R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)), + case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of + {undefined, undefined} -> R; + {undefined, _} -> true; + {_, undefined} -> false; + _ -> R + end. + +%%---------------------------------------------------------------------------- + +policy_validation() -> + [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory}, + {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 162d44f1..d56211b5 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -31,212 +31,21 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). -%% Shut dialyzer up --spec(terminate/1 :: (string()) -> no_return()). --spec(terminate/2 :: (string(), [any()]) -> no_return()). -endif. %%---------------------------------------------------------------------------- start() -> - io:format("Activating RabbitMQ plugins ...~n"), - - %% Determine our various directories - [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] = - init:get_plain_arguments(), - RootName = UnpackedPluginDir ++ "/rabbit", - - prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir), - - %% Build a list of required apps based on the fixed set, and any plugins - PluginApps = find_plugins(UnpackedPluginDir), - RequiredApps = ?BaseApps ++ PluginApps, - - %% Build the entire set of dependencies - this will load the - %% applications along the way - AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {failed_to_load_app, App, Err} -> - terminate("failed to load application ~s:~n~p", - [App, Err]); - AppList -> - AppList - end, - AppVersions = [determine_version(App) || App <- AllApps], - RabbitVersion = proplists:get_value(rabbit, AppVersions), - - %% Build the overall release descriptor - RDesc = {release, - {"rabbit", RabbitVersion}, - {erts, erlang:system_info(version)}, - AppVersions}, - - %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel - rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), - - %% We exclude mochiweb due to its optional use of fdsrv. - XRefExclude = [mochiweb], - - %% Compile the script - ScriptFile = RootName ++ ".script", - case systools:make_script(RootName, [local, silent, - {exref, AllApps -- XRefExclude}]) of - {ok, Module, Warnings} -> - %% This gets lots of spurious no-source warnings when we - %% have .ez files, so we want to supress them to prevent - %% hiding real issues. On Ubuntu, we also get warnings - %% about kernel/stdlib sources being out of date, which we - %% also ignore for the same reason. - WarningStr = Module:format_warning( - [W || W <- Warnings, - case W of - {warning, {source_not_found, _}} -> false; - {warning, {obj_out_of_date, {_,_,WApp,_,_}}} - when WApp == mnesia; - WApp == stdlib; - WApp == kernel; - WApp == sasl; - WApp == crypto; - WApp == os_mon -> false; - _ -> true - end]), - case length(WarningStr) of - 0 -> ok; - _ -> S = string:copies("*", 80), - io:format("~n~s~n~s~s~n~n", [S, WarningStr, S]) - end, - ok; - {error, Module, Error} -> - terminate("generation of boot script file ~s failed:~n~s", - [ScriptFile, Module:format_error(Error)]) - end, - - case post_process_script(ScriptFile) of - ok -> ok; - {error, Reason} -> - terminate("post processing of boot script file ~s failed:~n~w", - [ScriptFile, Reason]) - end, - case systools:script2boot(RootName) of - ok -> ok; - error -> terminate("failed to compile boot script file ~s", - [ScriptFile]) - end, - io:format("~w plugins activated:~n", [length(PluginApps)]), - [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) - || App <- PluginApps], - io:nl(), - + [NodeStr] = init:get_plain_arguments(), ok = duplicate_node_check(NodeStr), - - terminate(0), + rabbit_misc:quit(0), ok. stop() -> ok. -determine_version(App) -> - application:load(App), - {ok, Vsn} = application:get_key(App, vsn), - {App, Vsn}. - -delete_recursively(Fn) -> - case rabbit_file:recursive_delete([Fn]) of - ok -> ok; - {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; - Error -> Error - end. - -prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) -> - AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir), - Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile), - ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins), - ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins), - - Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins), - case Missing of - [] -> ok; - _ -> io:format("Warning: the following enabled plugins were " - "not found: ~p~n", [Missing]) - end, - - %% Eliminate the contents of the destination directory - case delete_recursively(DestDir) of - ok -> ok; - {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E]) - end, - case filelib:ensure_dir(DestDir ++ "/") of - ok -> ok; - {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2]) - end, - - [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins]. - -prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> - zip:unzip(Location, [{cwd, PluginDestDir}]); -prepare_plugin(#plugin{type = dir, name = Name, location = Location}, - PluginsDestDir) -> - rabbit_file:recursive_copy(Location, - filename:join([PluginsDestDir, Name])). - -find_plugins(PluginDir) -> - [prepare_dir_plugin(PluginName) || - PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. - -prepare_dir_plugin(PluginAppDescFn) -> - %% Add the plugin ebin directory to the load path - PluginEBinDirN = filename:dirname(PluginAppDescFn), - code:add_path(PluginEBinDirN), - - %% We want the second-last token - NameTokens = string:tokens(PluginAppDescFn,"/."), - PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), - list_to_atom(PluginNameString). - -expand_dependencies(Pending) -> - expand_dependencies(sets:new(), Pending). -expand_dependencies(Current, []) -> - Current; -expand_dependencies(Current, [Next|Rest]) -> - case sets:is_element(Next, Current) of - true -> - expand_dependencies(Current, Rest); - false -> - case application:load(Next) of - ok -> - ok; - {error, {already_loaded, _}} -> - ok; - {error, Reason} -> - throw({failed_to_load_app, Next, Reason}) - end, - {ok, Required} = application:get_key(Next, applications), - Unique = [A || A <- Required, not(sets:is_element(A, Current))], - expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) - end. - -post_process_script(ScriptFile) -> - case file:consult(ScriptFile) of - {ok, [{script, Name, Entries}]} -> - NewEntries = lists:flatmap(fun process_entry/1, Entries), - case file:open(ScriptFile, [write]) of - {ok, Fd} -> - io:format(Fd, "%% script generated at ~w ~w~n~p.~n", - [date(), time(), {script, Name, NewEntries}]), - file:close(Fd), - ok; - {error, OReason} -> - {error, {failed_to_open_script_file_for_writing, OReason}} - end; - {error, Reason} -> - {error, {failed_to_load_script, Reason}} - end. - -process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> - [{apply,{rabbit,maybe_hipe_compile,[]}}, - {apply,{rabbit,prepare,[]}}, Entry]; -process_entry(Entry) -> - [Entry]. +%%---------------------------------------------------------------------------- %% Check whether a node with the same name is already running duplicate_node_check([]) -> @@ -252,11 +61,11 @@ duplicate_node_check(NodeStr) -> "already running on ~p~n", [NodeName, NodeHost]), io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"), - terminate(?ERROR_CODE); + rabbit_misc:quit(?ERROR_CODE); false -> ok end; {error, EpmdReason} -> - terminate("epmd error for host ~p: ~p (~s)~n", + rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n", [NodeHost, EpmdReason, case EpmdReason of address -> "unable to establish tcp connection"; @@ -264,16 +73,3 @@ duplicate_node_check(NodeStr) -> _ -> inet:format_error(EpmdReason) end]) end. - -terminate(Fmt, Args) -> - io:format("ERROR: " ++ Fmt ++ "~n", Args), - terminate(?ERROR_CODE). - -terminate(Status) -> - case os:type() of - {unix, _} -> halt(Status); - {win32, _} -> init:stop(Status), - receive - after infinity -> ok - end - end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5e9e78d3..bd5cf588 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_resources/2, server_properties/1]). +-export([conserve_resources/3, server_properties/1]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -133,7 +133,7 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_resources(Pid, Conserve) -> +conserve_resources(Pid, _Source, Conserve) -> Pid ! {conserve_resources, Conserve}, ok. @@ -222,6 +222,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_by = none, last_blocked_at = never}, try + ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), handshake, 8)), @@ -312,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> mainloop(Deb, State); -handle_other(timeout, _Deb, #v1{connection_state = S}) -> - throw({timeout, S}); +handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> + throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), @@ -682,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end, Parent = self(), - ReceiveFun = fun() -> Parent ! timeout end, + ReceiveFun = fun() -> Parent ! heartbeat_timeout end, Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, @@ -735,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -%% Compute frame_max for this instance. Could simply use 0, but breaks -%% QPid Java client. server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 8c0ebcbe..e14bbba0 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -23,7 +23,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]). +-export([register/3, unregister/2, + binary_to_type/1, lookup_module/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -32,6 +33,7 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). +-spec(unregister/2 :: (atom(), binary()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). -spec(lookup_module/2 :: @@ -50,6 +52,9 @@ start_link() -> register(Class, TypeName, ModuleName) -> gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). +unregister(Class, TypeName) -> + gen_server:call(?SERVER, {unregister, Class, TypeName}, infinity). + %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it %% can throw a badarg, indicating that the type cannot have been @@ -83,6 +88,10 @@ internal_register(Class, TypeName, ModuleName) {{Class, internal_binary_to_type(TypeName)}, ModuleName}), ok. +internal_unregister(Class, TypeName) -> + true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}), + ok. + sanity_check_module(ClassModule, Module) -> case catch lists:member(ClassModule, lists:flatten( @@ -95,8 +104,10 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator. %%--------------------------------------------------------------------------- @@ -108,6 +119,10 @@ handle_call({register, Class, TypeName, ModuleName}, _From, State) -> ok = internal_register(Class, TypeName, ModuleName), {reply, ok, State}; +handle_call({unregister, Class, TypeName}, _From, State) -> + ok = internal_unregister(Class, TypeName), + {reply, ok, State}; + handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl new file mode 100644 index 00000000..c7d30116 --- /dev/null +++ b/src/rabbit_runtime_parameter.erl @@ -0,0 +1,43 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameter). + +-ifdef(use_specs). + +-type(validate_results() :: + 'ok' | {error, string(), [term()]} | [validate_results()]). + +-callback validate(binary(), binary(), term()) -> validate_results(). +-callback validate_clear(binary(), binary()) -> validate_results(). +-callback notify(binary(), binary(), term()) -> 'ok'. +-callback notify_clear(binary(), binary()) -> 'ok'. + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {validate, 3}, + {validate_clear, 2}, + {notify, 3}, + {notify_clear, 2} + ]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl new file mode 100644 index 00000000..3a54e8f6 --- /dev/null +++ b/src/rabbit_runtime_parameters.erl @@ -0,0 +1,243 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameters). + +-include("rabbit.hrl"). + +-export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1, + list_formatted/0, lookup/2, value/2, value/3, info_keys/0]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ok_or_error_string() :: 'ok' | {'error_string', string()}). + +-spec(parse_set/3 :: (binary(), binary(), string()) -> ok_or_error_string()). +-spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()). +-spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()). +-spec(list/0 :: () -> [rabbit_types:infos()]). +-spec(list/1 :: (binary()) -> [rabbit_types:infos()]). +-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). +-spec(list_formatted/0 :: () -> [rabbit_types:infos()]). +-spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()). +-spec(value/2 :: (binary(), binary()) -> term()). +-spec(value/3 :: (binary(), binary(), term()) -> term()). +-spec(info_keys/0 :: () -> rabbit_types:info_keys()). + +-endif. + +%%--------------------------------------------------------------------------- + +-import(rabbit_misc, [pget/2, pset/3]). + +-define(TABLE, rabbit_runtime_parameters). + +%%--------------------------------------------------------------------------- + +parse_set(Component, Key, String) -> + case parse(String) of + {ok, Term} -> set(Component, Key, Term); + {errors, L} -> format_error(L) + end. + +set(Component, Key, Term) -> + case set0(Component, Key, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +format_error(L) -> + {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. + +set0(Component, Key, Term) -> + case lookup_component(Component) of + {ok, Mod} -> + case flatten_errors(validate(Term)) of + ok -> + case flatten_errors(Mod:validate(Component, Key, Term)) of + ok -> + case mnesia_update(Component, Key, Term) of + {old, Term} -> ok; + _ -> Mod:notify(Component, Key, Term) + end, + ok; + E -> + E + end; + E -> + E + end; + E -> + E + end. + +mnesia_update(Component, Key, Term) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + Res = case mnesia:read(?TABLE, {Component, Key}, read) of + [] -> new; + [Params] -> {old, Params#runtime_parameters.value} + end, + ok = mnesia:write(?TABLE, c(Component, Key, Term), write), + Res + end). + +clear(Component, Key) -> + case clear0(Component, Key) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +clear0(Component, Key) -> + case lookup_component(Component) of + {ok, Mod} -> case flatten_errors(Mod:validate_clear(Component, Key)) of + ok -> mnesia_clear(Component, Key), + Mod:notify_clear(Component, Key), + ok; + E -> E + end; + E -> E + end. + +mnesia_clear(Component, Key) -> + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = mnesia:delete(?TABLE, {Component, Key}, write) + end). + +list() -> + [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. + +list(Component) -> list(Component, []). +list_strict(Component) -> list(Component, not_found). + +list(Component, Default) -> + case lookup_component(Component) of + {ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'}, + [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + _ -> Default + end. + +list_formatted() -> + [pset(value, format(pget(value, P)), P) || P <- list()]. + +lookup(Component, Key) -> + case lookup0(Component, Key, rabbit_misc:const(not_found)) of + not_found -> not_found; + Params -> p(Params) + end. + +value(Component, Key) -> + case lookup0(Component, Key, rabbit_misc:const(not_found)) of + not_found -> not_found; + Params -> Params#runtime_parameters.value + end. + +value(Component, Key, Default) -> + Params = lookup0(Component, Key, + fun () -> lookup_missing(Component, Key, Default) end), + Params#runtime_parameters.value. + +lookup0(Component, Key, DefaultFun) -> + case mnesia:dirty_read(?TABLE, {Component, Key}) of + [] -> DefaultFun(); + [R] -> R + end. + +lookup_missing(Component, Key, Default) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read(?TABLE, {Component, Key}, read) of + [] -> Record = c(Component, Key, Default), + mnesia:write(?TABLE, Record, write), + Record; + [R] -> R + end + end). + +c(Component, Key, Default) -> #runtime_parameters{key = {Component, Key}, + value = Default}. + +p(#runtime_parameters{key = {Component, Key}, value = Value}) -> + [{component, Component}, + {key, Key}, + {value, Value}]. + +info_keys() -> [component, key, value]. + +%%--------------------------------------------------------------------------- + +lookup_component(Component) -> + case rabbit_registry:lookup_module( + runtime_parameter, list_to_atom(binary_to_list(Component))) of + {error, not_found} -> {errors, + [{"component ~s not found", [Component]}]}; + {ok, Module} -> {ok, Module} + end. + +parse(Src0) -> + Src1 = string:strip(Src0), + Src = case lists:reverse(Src1) of + [$. |_] -> Src1; + _ -> Src1 ++ "." + end, + case erl_scan:string(Src) of + {ok, Scanned, _} -> + case erl_parse:parse_term(Scanned) of + {ok, Parsed} -> + {ok, Parsed}; + {error, E} -> + {errors, + [{"Could not parse value: ~s", [format_parse_error(E)]}]} + end; + {error, E, _} -> + {errors, [{"Could not scan value: ~s", [format_parse_error(E)]}]} + end. + +format_parse_error({_Line, Mod, Err}) -> + lists:flatten(Mod:format_error(Err)). + +format(Term) -> + list_to_binary(rabbit_misc:format("~p", [Term])). + +%%--------------------------------------------------------------------------- + +%% We will want to be able to biject these to JSON. So we have some +%% generic restrictions on what we consider acceptable. +validate(Proplist = [T | _]) when is_tuple(T) -> validate_proplist(Proplist); +validate(L) when is_list(L) -> validate_list(L); +validate(T) when is_tuple(T) -> {error, "tuple: ~p", [T]}; +validate(B) when is_boolean(B) -> ok; +validate(null) -> ok; +validate(A) when is_atom(A) -> {error, "atom: ~p", [A]}; +validate(N) when is_number(N) -> ok; +validate(B) when is_binary(B) -> ok; +validate(B) when is_bitstring(B) -> {error, "bitstring: ~p", [B]}. + +validate_list(L) -> [validate(I) || I <- L]. +validate_proplist(L) -> [vp(I) || I <- L]. + +vp({K, V}) when is_binary(K) -> validate(V); +vp({K, _V}) -> {error, "bad key: ~p", [K]}; +vp(H) -> {error, "not two tuple: ~p", [H]}. + +flatten_errors(L) -> + case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of + [] -> ok; + E -> {errors, E} + end. diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl new file mode 100644 index 00000000..f23b3227 --- /dev/null +++ b/src/rabbit_runtime_parameters_test.erl @@ -0,0 +1,38 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_runtime_parameters_test). +-behaviour(rabbit_runtime_parameter). + +-export([validate/3, validate_clear/2, notify/3, notify_clear/2]). +-export([register/0, unregister/0]). + +register() -> + rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). + +unregister() -> + rabbit_registry:unregister(runtime_parameter, <<"test">>). + +validate(<<"test">>, <<"good">>, _Term) -> ok; +validate(<<"test">>, <<"maybe">>, <<"good">>) -> ok; +validate(<<"test">>, _, _) -> {error, "meh", []}. + +validate_clear(<<"test">>, <<"good">>) -> ok; +validate_clear(<<"test">>, <<"maybe">>) -> ok; +validate_clear(<<"test">>, _) -> {error, "meh", []}. + +notify(_, _, _) -> ok. +notify_clear(_, _) -> ok. diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index bf2b4798..f142d233 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -19,6 +19,8 @@ -behaviour(supervisor). -export([start_link/0, start_child/1, start_child/2, start_child/3, + start_supervisor_child/1, start_supervisor_child/2, + start_supervisor_child/3, start_restartable_child/1, start_restartable_child/2, stop_child/1]). -export([init/1]). @@ -33,7 +35,11 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_child/1 :: (atom()) -> 'ok'). +-spec(start_child/2 :: (atom(), [any()]) -> 'ok'). -spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok'). +-spec(start_supervisor_child/1 :: (atom()) -> 'ok'). +-spec(start_supervisor_child/2 :: (atom(), [any()]) -> 'ok'). +-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(start_restartable_child/1 :: (atom()) -> 'ok'). -spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok'). -spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())). @@ -42,22 +48,29 @@ %%---------------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). -start_child(Mod) -> - start_child(Mod, []). +start_child(Mod) -> start_child(Mod, []). -start_child(Mod, Args) -> - start_child(Mod, Mod, Args). +start_child(Mod, Args) -> start_child(Mod, Mod, Args). start_child(ChildId, Mod, Args) -> - child_reply(supervisor:start_child(?SERVER, - {ChildId, {Mod, start_link, Args}, - transient, ?MAX_WAIT, worker, [Mod]})). + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]})). + +start_supervisor_child(Mod) -> start_supervisor_child(Mod, []). + +start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args). + +start_supervisor_child(ChildId, Mod, Args) -> + child_reply(supervisor:start_child( + ?SERVER, + {ChildId, {Mod, start_link, Args}, + transient, infinity, supervisor, [Mod]})). -start_restartable_child(Mod) -> - start_restartable_child(Mod, []). +start_restartable_child(Mod) -> start_restartable_child(Mod, []). start_restartable_child(Mod, Args) -> Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), @@ -73,8 +86,7 @@ stop_child(ChildId) -> E -> E end. -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. +init([]) -> {ok, {{one_for_all, 0, 1}, []}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 04ee6ef2..bb60bd12 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -29,6 +29,7 @@ -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). +-define(TIMEOUT, 5000). all_tests() -> passed = gm_tests:all_tests(), @@ -50,9 +51,10 @@ all_tests() -> passed = test_app_management(), passed = test_log_management_during_startup(), passed = test_statistics(), - passed = test_option_parser(), + passed = test_arguments_parser(), passed = test_cluster_management(), passed = test_user_management(), + passed = test_runtime_parameters(), passed = test_server_status(), passed = test_confirms(), passed = maybe_run_cluster_dependent_tests(), @@ -616,8 +618,8 @@ test_topic_matching() -> exchange_op_callback(X, Fun, Args) -> rabbit_misc:execute_mnesia_transaction( - fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end), - rabbit_exchange:callback(X, Fun, [none, X] ++ Args). + fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end), + rabbit_exchange:callback(X, Fun, none, [X] ++ Args). test_topic_expect_match(X, List) -> lists:foreach( @@ -800,27 +802,57 @@ test_log_management_during_startup() -> ok = control_action(start_app, []), passed. -test_option_parser() -> - %% command and arguments should just pass through - ok = check_get_options({["mock_command", "arg1", "arg2"], []}, - [], ["mock_command", "arg1", "arg2"]), +test_arguments_parser() -> + GlobalOpts1 = [{"-f1", flag}, {"-o1", {option, "foo"}}], + Commands1 = [command1, {command2, [{"-f2", flag}, {"-o2", {option, "bar"}}]}], - %% get flags - ok = check_get_options( - {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]}, - [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]), - - %% get options - ok = check_get_options( - {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]}, - [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}], - ["mock_command", "-foo", "bar"]), + GetOptions = + fun (Args) -> + rabbit_misc:parse_arguments(Commands1, GlobalOpts1, Args) + end, - %% shuffled and interleaved arguments and options - ok = check_get_options( - {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]}, - [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}], - ["-f", "a1", "-o1", "hello", "a2", "a3"]), + check_parse_arguments(no_command, GetOptions, []), + check_parse_arguments(no_command, GetOptions, ["foo", "bar"]), + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "foo"}], []}}, + GetOptions, ["command1"]), + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}}, + GetOptions, ["command1", "-o1", "blah"]), + check_parse_arguments( + {ok, {command1, [{"-f1", true}, {"-o1", "foo"}], []}}, + GetOptions, ["command1", "-f1"]), + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}}, + GetOptions, ["-o1", "blah", "command1"]), + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], ["quux"]}}, + GetOptions, ["-o1", "blah", "command1", "quux"]), + check_parse_arguments( + {ok, {command1, [{"-f1", true}, {"-o1", "blah"}], ["quux", "baz"]}}, + GetOptions, ["command1", "quux", "-f1", "-o1", "blah", "baz"]), + %% For duplicate flags, the last one counts + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "second"}], []}}, + GetOptions, ["-o1", "first", "command1", "-o1", "second"]), + %% If the flag "eats" the command, the command won't be recognised + check_parse_arguments(no_command, GetOptions, + ["-o1", "command1", "quux"]), + %% If a flag eats another flag, the eaten flag won't be recognised + check_parse_arguments( + {ok, {command1, [{"-f1", false}, {"-o1", "-f1"}], []}}, + GetOptions, ["command1", "-o1", "-f1"]), + + %% Now for some command-specific flags... + check_parse_arguments( + {ok, {command2, [{"-f1", false}, {"-f2", false}, + {"-o1", "foo"}, {"-o2", "bar"}], []}}, + GetOptions, ["command2"]), + + check_parse_arguments( + {ok, {command2, [{"-f1", false}, {"-f2", true}, + {"-o1", "baz"}, {"-o2", "bar"}], ["quux", "foo"]}}, + GetOptions, ["-f2", "command2", "quux", "-o1", "baz", "foo"]), passed. @@ -1097,6 +1129,38 @@ test_user_management() -> passed. +test_runtime_parameters() -> + rabbit_runtime_parameters_test:register(), + Good = fun(L) -> ok = control_action(set_parameter, L) end, + Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end, + + %% Acceptable for bijection + Good(["test", "good", "<<\"ignore\">>"]), + Good(["test", "good", "123"]), + Good(["test", "good", "true"]), + Good(["test", "good", "false"]), + Good(["test", "good", "null"]), + Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]), + + %% Various forms of fail due to non-bijectability + Bad(["test", "good", "atom"]), + Bad(["test", "good", "{tuple, foo}"]), + Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]), + Bad(["test", "good", "[{key, <<\"value\">>}]"]), + + %% Test actual validation hook + Good(["test", "maybe", "<<\"good\">>"]), + Bad(["test", "maybe", "<<\"bad\">>"]), + + ok = control_action(list_parameters, []), + + ok = control_action(clear_parameter, ["test", "good"]), + ok = control_action(clear_parameter, ["test", "maybe"]), + {error_string, _} = + control_action(clear_parameter, ["test", "neverexisted"]), + rabbit_runtime_parameters_test:unregister(), + passed. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), @@ -1177,7 +1241,7 @@ test_spawn() -> rabbit_limiter:make_token(self())), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok - after 1000 -> throw(failed_to_receive_channel_open_ok) + after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) end, {Writer, Ch}. @@ -1198,7 +1262,7 @@ test_spawn_remote() -> end end), receive Res -> Res - after 1000 -> throw(failed_to_receive_result) + after ?TIMEOUT -> throw(failed_to_receive_result) end. user(Username) -> @@ -1218,13 +1282,10 @@ test_confirms() -> queue = Q0, exchange = <<"amq.direct">>, routing_key = "magic" }), - receive #'queue.bind_ok'{} -> - Q0 - after 1000 -> - throw(failed_to_bind_queue) + receive #'queue.bind_ok'{} -> Q0 + after ?TIMEOUT -> throw(failed_to_bind_queue) end - after 1000 -> - throw(failed_to_declare_queue) + after ?TIMEOUT -> throw(failed_to_declare_queue) end end, %% Declare and bind two queues @@ -1237,7 +1298,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'confirm.select'{}), receive #'confirm.select_ok'{} -> ok - after 1000 -> throw(failed_to_enable_confirms) + after ?TIMEOUT -> throw(failed_to_enable_confirms) end, %% Publish a message rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, @@ -1254,7 +1315,7 @@ test_confirms() -> receive #'basic.nack'{} -> ok; #'basic.ack'{} -> throw(received_ack_instead_of_nack) - after 2000 -> throw(did_not_receive_nack) + after ?TIMEOUT-> throw(did_not_receive_nack) end, receive #'basic.ack'{} -> throw(received_ack_when_none_expected) @@ -1264,7 +1325,7 @@ test_confirms() -> rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), receive #'queue.delete_ok'{} -> ok - after 1000 -> throw(failed_to_cleanup_queue) + after ?TIMEOUT -> throw(failed_to_cleanup_queue) end, unlink(Ch), ok = rabbit_channel:shutdown(Ch), @@ -1287,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) -> true -> Props; _ -> test_statistics_receive_event1(Ch, Matcher) end - after 1000 -> throw(failed_to_receive_event) + after ?TIMEOUT -> throw(failed_to_receive_event) end. test_statistics() -> @@ -1299,9 +1360,8 @@ test_statistics() -> %% Set up a channel and queue {_Writer, Ch} = test_spawn(), rabbit_channel:do(Ch, #'queue.declare'{}), - QName = receive #'queue.declare_ok'{queue = Q0} -> - Q0 - after 1000 -> throw(failed_to_receive_queue_declare_ok) + QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), QPid = Q#amqqueue.pid, @@ -1381,7 +1441,7 @@ expect_event(Pid, Type) -> Pid -> ok; _ -> expect_event(Pid, Type) end - after 1000 -> throw({failed_to_receive_event, Type}) + after ?TIMEOUT -> throw({failed_to_receive_event, Type}) end. test_delegates_async(SecondaryNode) -> @@ -1405,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> fun () -> receive Msg -> FMsg(Msg) - after 1000 -> throw(Throw) + after ?TIMEOUT -> throw(Throw) end end. @@ -1418,9 +1478,7 @@ await_response(Count) -> receive response -> ok, await_response(Count - 1) - after 1000 -> - io:format("Async reply not received~n"), - throw(timeout) + after ?TIMEOUT -> throw(timeout) end. must_exit(Fun) -> @@ -1487,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> ok - after 1000 -> throw(failed_to_receive_queue_declare_ok) + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, rabbit_channel:shutdown(Ch), rabbit:stop(), @@ -1498,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) -> receive #'channel.close'{reply_code = ?NOT_FOUND} -> ok - after 2000 -> - throw(failed_to_receive_channel_exit) + after ?TIMEOUT -> throw(failed_to_receive_channel_exit) end, rabbit_channel:shutdown(Ch2), passed. @@ -1526,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), {ok, 0} = rabbit_amqqueue:delete(Q, false, false), passed - after 2000 -> - throw(failed_to_create_and_kill_queue) + after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. %%--------------------------------------------------------------------- @@ -1540,7 +1596,7 @@ control_action(Command, Args, NewOpts) -> expand_options(default_options(), NewOpts)). control_action(Command, Node, Args, Opts) -> - case catch rabbit_control:action( + case catch rabbit_control_main:action( Command, Node, Args, Opts, fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) @@ -1572,10 +1628,13 @@ expand_options(As, Bs) -> end end, Bs, As). -check_get_options({ExpArgs, ExpOpts}, Defs, Args) -> - {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args), - true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order - ok. +check_parse_arguments(ExpRes, Fun, As) -> + SortRes = + fun (no_command) -> no_command; + ({ok, {C, KVs, As1}}) -> {ok, {C, lists:sort(KVs), As1}} + end, + + true = SortRes(ExpRes) =:= SortRes(Fun(As)). empty_files(Files) -> [case file:read_file_info(File) of @@ -1755,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) -> Pid); stop -> done - after (case Awaiting of [] -> 200; _ -> 1000 end) -> + after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) -> case Awaiting of [] -> Pid ! {self(), arrived}, on_disk_capture(); _ -> Pid ! {self(), timeout} @@ -2308,7 +2367,7 @@ wait_for_confirms(Unconfirmed) -> wait_for_confirms( rabbit_misc:gb_sets_difference( Unconfirmed, gb_sets:from_list(Confirmed))) - after 5000 -> exit(timeout_waiting_for_confirm) + after ?TIMEOUT -> exit(timeout_waiting_for_confirm) end end. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 9f2535bd..18704807 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -36,6 +36,10 @@ -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). -rabbit_upgrade({topic_trie_node, mnesia, []}). +-rabbit_upgrade({runtime_parameters, mnesia, []}). +-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}). +-rabbit_upgrade({policy, mnesia, + [exchange_scratches, ha_mirrors]}). %% ------------------------------------------------------------------- @@ -56,6 +60,8 @@ -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). -spec(topic_trie_node/0 :: () -> 'ok'). +-spec(runtime_parameters/0 :: () -> 'ok'). +-spec(policy/0 :: () -> 'ok'). -endif. @@ -185,6 +191,55 @@ topic_trie_node() -> {attributes, [trie_node, edge_count, binding_count]}, {type, ordered_set}]). +runtime_parameters() -> + create(rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, [key, value]}, + {disc_copies, [node()]}]). + +exchange_scratches() -> + ok = exchange_scratches(rabbit_exchange), + ok = exchange_scratches(rabbit_durable_exchange). + +exchange_scratches(Table) -> + transform( + Table, + fun ({exchange, Name, Type = <<"x-federation">>, Dur, AutoDel, Int, Args, + Scratch}) -> + Scratches = orddict:store(federation, Scratch, orddict:new()), + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}; + %% We assert here that nothing else uses the scratch mechanism ATM + ({exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratches]). + +policy() -> + ok = exchange_policy(rabbit_exchange), + ok = exchange_policy(rabbit_durable_exchange), + ok = queue_policy(rabbit_queue), + ok = queue_policy(rabbit_durable_queue). + +exchange_policy(Table) -> + transform( + Table, + fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) -> + {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches, + undefined} + end, + [name, type, durable, auto_delete, internal, arguments, scratches, + policy]). + +queue_policy(Table) -> + transform( + Table, + fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) -> + {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes, + undefined} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, + slave_pids, mirror_nodes, policy]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 209e5252..49213c95 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -323,7 +323,6 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -335,6 +334,13 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). +%% The compiler (rightfully) complains that ack() and state() are +%% unused. For this reason we duplicate a -spec from +%% rabbit_backing_queue with the only intent being to remove +%% warnings. The problem here is that we can't parameterise the BQ +%% behaviour by these two types as we would like to. We still leave +%% these here for documentation purposes. +-type(ack() :: seq_id()). -type(state() :: #vqstate { q1 :: ?QUEUE:?QUEUE(), q2 :: ?QUEUE:?QUEUE(), @@ -368,6 +374,8 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). +%% Duplicated from rabbit_backing_queue +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(multiple_routing_keys/0 :: () -> 'ok'). @@ -1458,16 +1466,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. - S1 -> {_, State2} = - lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, - {S1, State}, - case (AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress) of - true -> [AckFun, AlphaBetaFun]; - false -> [AlphaBetaFun, AckFun] - end), + S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress)) of + true -> [AckFun, AlphaBetaFun]; + false -> [AlphaBetaFun, AckFun] + end, + {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, {S1, State}, Funs), {true, State2} end, diff --git a/src/supervisor2.erl b/src/supervisor2.erl index f1b74878..3d3623d7 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -411,6 +411,8 @@ handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> #child{mfa = {M, F, A}} = hd(State#state.children), Args = A ++ EArgs, case do_start_child_i(M, F, Args) of + {ok, undefined} -> + {reply, {ok, undefined}, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, @@ -743,6 +745,8 @@ restart(Strategy, Child, State, Restart) #child{mfa = {M, F, A}} = Child, Dynamics = ?DICT:erase(Child#child.pid, State#state.dynamics), case do_start_child_i(M, F, A) of + {ok, undefined} -> + {ok, State}; {ok, Pid} -> NState = State#state{dynamics = ?DICT:store(Pid, A, Dynamics)}, {ok, NState}; |