summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-06-27 12:25:25 +0100
committerTim Watson <tim@rabbitmq.com>2012-06-27 12:25:25 +0100
commitf9934df40d8490e4fce36be5aceec3c4be152292 (patch)
treefbd9c7c202f618a8b76aa7ce4fd97b63efc8aa58
parent811b2095f8316260e38e6ee58937e04219952f97 (diff)
parentf07902ee61e62c9de8de8ecd594248bf0da85b6c (diff)
downloadrabbitmq-server-bug24912.tar.gz
merge defaultbug24912
-rw-r--r--docs/rabbitmqctl.1.xml2
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--packaging/RPMS/Fedora/Makefile1
-rw-r--r--packaging/common/rabbitmq-script-wrapper4
-rw-r--r--packaging/debs/Debian/Makefile1
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init20
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server34
-rwxr-xr-xscripts/rabbitmq-server.bat31
-rwxr-xr-xscripts/rabbitmq-service.bat27
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-xscripts/rabbitmqctl.bat4
-rw-r--r--src/app_utils.erl121
-rw-r--r--src/gm.erl286
-rw-r--r--src/rabbit.erl137
-rw-r--r--src/rabbit_alarm.erl12
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_control_main.erl (renamed from src/rabbit_control.erl)49
-rw-r--r--src/rabbit_disk_monitor.erl11
-rw-r--r--src/rabbit_file.erl17
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
-rw-r--r--src/rabbit_misc.erl68
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_plugins.erl383
-rw-r--r--src/rabbit_plugins_main.erl273
-rw-r--r--src/rabbit_prelaunch.erl214
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_tests.erl49
-rw-r--r--src/rabbit_variable_queue.erl18
35 files changed, 962 insertions, 901 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3eb83c88..d6f6d51f 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -720,7 +720,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..523b54ce 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,9 +19,11 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
- {disk_free_limit, {mem_relative, 1.0}},
+ {disk_free_limit, 1000000000}, %% 1GB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 180500ed..03e513f8 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora"
SOURCES/rabbitmq-server.init
endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@||' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0e59c218..e832aed6 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -29,7 +29,9 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
-if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
+if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then
+ /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@
+elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
elif [ `id -u` = 0 ] ; then
@SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 844388c6..1e4bf755 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -23,6 +23,7 @@ package: clean
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index e935acf5..943ed48f 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Uploader: Emile Joubert <emile@rabbitmq.com>
+Uploaders: Emile Joubert <emile@rabbitmq.com>
DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index f514b974..b2d3f86a 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -60,10 +60,7 @@ start_rabbitmq () {
set +e
RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \
--chuid rabbitmq --start --exec $DAEMON \
- --pidfile "$RABBITMQ_PID_FILE" \
- > "${INIT_LOG_DIR}/startup_log" \
- 2> "${INIT_LOG_DIR}/startup_err" \
- 0<&- &
+ --pidfile "$RABBITMQ_PID_FILE" --background
$CONTROL wait $PID_FILE >/dev/null 2>&1
RETVAL=$?
set -e
@@ -137,13 +134,18 @@ restart_end() {
start_stop_end() {
case "$RETVAL" in
0)
- log_end_msg 0;;
+ [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}"
+ log_end_msg 0
+ ;;
3)
log_warning_msg "${DESC} already ${1}"
- log_end_msg 0;;
+ log_end_msg 0
+ RETVAL=0
+ ;;
*)
log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
- log_end_msg 1;;
+ log_end_msg 1
+ ;;
esac
}
@@ -151,7 +153,7 @@ case "$1" in
start)
log_daemon_msg "Starting ${DESC}" $NAME
start_rabbitmq
- start_stop_end "started"
+ start_stop_end "running"
;;
stop)
log_daemon_msg "Stopping ${DESC}" $NAME
@@ -162,7 +164,7 @@ case "$1" in
status_rabbitmq
;;
rotate-logs)
- log_action_begin_msg "Rotating log files for ${DESC} ${NAME}"
+ log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}"
rotate_logs_rabbitmq
log_action_end_msg $RETVAL
;;
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 14a18d57..97c74791 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -31,7 +31,7 @@ exec erl \
-noinput \
-hidden \
-sname rabbitmq-plugins$$ \
- -s rabbit_plugins \
+ -s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 66a900a1..c67a0263 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,9 +43,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 0a5a4640..34915b3d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -58,7 +58,8 @@ DEFAULT_NODE_PORT=5672
##--- End of overridden <var_name> variables
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
+[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot "
case "$(uname -s)" in
CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait
@@ -70,24 +71,16 @@ case "$(uname -s)" in
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
- if erl \
- -pa "$RABBITMQ_EBIN_ROOT" \
- -noinput \
- -hidden \
- -s rabbit_prelaunch \
- -sname rabbitmqprelaunch$$ \
- -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
+if ! erl -pa "$RABBITMQ_EBIN_ROOT" \
+ -noinput \
+ -hidden \
+ -s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
+ -extra "${RABBITMQ_NODENAME}";
then
- RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
- RABBITMQ_EBIN_PATH=""
- else
- exit 1
- fi
-else
- RABBITMQ_BOOT_FILE=start_sasl
- RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+ exit 1;
fi
+
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
@@ -100,10 +93,10 @@ RABBITMQ_LISTEN_ARG=
set -f
exec erl \
- ${RABBITMQ_EBIN_PATH} \
+ -pa ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot ${RABBITMQ_BOOT_FILE} \
+ -boot start_sasl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
@@ -112,6 +105,9 @@ exec erl \
-sasl sasl_error_logger false \
-rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
+ -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
+ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index ca49a5d8..167f272e 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,25 +86,24 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- "!RABBITMQ_NODENAME!"
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+ -pa "!RABBITMQ_EBIN_ROOT!" ^
+ -noinput -hidden ^
+ -s rabbit_prelaunch ^
+ -sname rabbitmqprelaunch!RANDOM! ^
+ -extra "!RABBITMQ_NODENAME!"
+
if ERRORLEVEL 1 (
exit /B 1
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_PATH="-pa !RABBITMQ_EBIN_ROOT!"
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -124,9 +123,10 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
"!ERLANG_HOME!\bin\erl.exe" ^
-!RABBITMQ_EBIN_PATH! ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
+W w ^
@@ -139,6 +139,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 9e274840..4758c861 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -154,24 +154,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- ""
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if ERRORLEVEL 1 (
- exit /B 1
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -191,8 +178,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
set ERLANG_SERVICE_ARGUMENTS= ^
-!RABBITMQ_EBIN_PATH! ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
+W w ^
+A30 ^
@@ -204,6 +192,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 4aad6b8f..a5fade72 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -32,6 +32,6 @@ exec erl \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
- -s rabbit_control \
+ -s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index f37fae48..9f549f1e 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -34,7 +34,7 @@ if "!RABBITMQ_NODENAME!"=="" (
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
new file mode 100644
index 00000000..4bef83a5
--- /dev/null
+++ b/src/app_utils.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+-module(app_utils).
+
+-export([load_applications/1, start_applications/1,
+ stop_applications/1, app_dependency_order/2,
+ wait_for_applications/1]).
+
+-ifdef(use_specs).
+
+-spec load_applications([atom()]) -> 'ok'.
+-spec start_applications([atom()]) -> 'ok'.
+-spec stop_applications([atom()]) -> 'ok'.
+-spec wait_for_applications([atom()]) -> 'ok'.
+-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+
+-endif.
+
+%%---------------------------------------------------------------------------
+%% Public API
+
+load_applications(Apps) ->
+ load_applications(queue:from_list(Apps), sets:new()),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
+wait_for_applications(Apps) ->
+ [wait_for_application(App) || App <- Apps], ok.
+
+app_dependency_order(RootApps, StripUnreachable) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ [{App, app_dependencies(App)} ||
+ {App, _Desc, _Vsn} <- application:loaded_applications()]),
+ try
+ case StripUnreachable of
+ true -> digraph:del_vertices(G, digraph:vertices(G) --
+ digraph_utils:reachable(RootApps, G));
+ false -> ok
+ end,
+ digraph_utils:topsort(G)
+ after
+ true = digraph:delete(G)
+ end.
+
+%%---------------------------------------------------------------------------
+%% Private API
+
+wait_for_application(Application) ->
+ case lists:keymember(Application, 1, application:which_applications()) of
+ true -> ok;
+ false -> timer:sleep(1000),
+ wait_for_application(Application)
+ end.
+
+load_applications(Worklist, Loaded) ->
+ case queue:out(Worklist) of
+ {empty, _WorkList} ->
+ ok;
+ {{value, App}, Worklist1} ->
+ case sets:is_element(App, Loaded) of
+ true -> load_applications(Worklist1, Loaded);
+ false -> case application:load(App) of
+ ok -> ok;
+ {error, {already_loaded, App}} -> ok;
+ Error -> throw(Error)
+ end,
+ load_applications(
+ queue:join(Worklist1,
+ queue:from_list(app_dependencies(App))),
+ sets:add_element(App, Loaded))
+ end
+ end.
+
+app_dependencies(App) ->
+ case application:get_key(App, applications) of
+ undefined -> [];
+ {ok, Lst} -> Lst
+ end.
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
diff --git a/src/gm.erl b/src/gm.erl
index 01300f18..f88ed18f 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -433,51 +433,47 @@
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).
-%% The joined, members_changed and handle_msg callbacks can all
-%% return any of the following terms:
+%% The joined, members_changed and handle_msg callbacks can all return
+%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
-%% {'stop', Reason} - the callback indicates the member should
-%% stop with reason Reason and should leave the group.
+%% {'stop', Reason} - the callback indicates the member should stop
+%% with reason Reason and should leave the group.
%%
-%% {'become', Module, Args} - the callback indicates that the
-%% callback module should be changed to Module and that the
-%% callback functions should now be passed the arguments
-%% Args. This allows the callback module to be dynamically
-%% changed.
+%% {'become', Module, Args} - the callback indicates that the callback
+%% module should be changed to Module and that the callback functions
+%% should now be passed the arguments Args. This allows the callback
+%% module to be dynamically changed.
-%% Called when we've successfully joined the group. Supplied with
-%% Args provided in start_link, plus current group members.
+%% Called when we've successfully joined the group. Supplied with Args
+%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Supplied with Args provided in start_link, the list of new
-%% members and the list of members previously known to us that
-%% have since died. Note that if a member joins and dies very
-%% quickly, it's possible that we will never see that member
-%% appear in either births or deaths. However we are guaranteed
-%% that (1) we will see a member joining either in the births
-%% here, or in the members passed to joined/2 before receiving
-%% any messages from it; and (2) we will not see members die that
-%% we have not seen born (or supplied in the members to
-%% joined/2).
+%% Supplied with Args provided in start_link, the list of new members
+%% and the list of members previously known to us that have since
+%% died. Note that if a member joins and dies very quickly, it's
+%% possible that we will never see that member appear in either births
+%% or deaths. However we are guaranteed that (1) we will see a member
+%% joining either in the births here, or in the members passed to
+%% joined/2 before receiving any messages from it; and (2) we will not
+%% see members die that we have not seen born (or supplied in the
+%% members to joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
-%% message. This does get called for messages injected by this
-%% member, however, in such cases, there is no special
-%% significance of this invocation: it does not indicate that the
-%% message has made it to any other members, let alone all other
-%% members.
+%% message. This does get called for messages injected by this member,
+%% however, in such cases, there is no special significance of this
+%% invocation: it does not indicate that the message has made it to
+%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Called on gm member termination as per rules in gen_server,
-%% with the Args provided in start_link plus the termination
-%% Reason.
+%% Called on gm member termination as per rules in gen_server, with
+%% the Args provided in start_link plus the termination Reason.
-callback terminate(Args :: term(), Reason :: term()) ->
ok | term().
@@ -533,7 +529,7 @@ init([GroupName, Module, Args]) ->
group_name = GroupName,
module = Module,
view = undefined,
- pub_count = 0,
+ pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
@@ -562,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState,
module = Module,
callback_args = Args }) ->
- Group = record_new_member_in_group(
- GroupName, Self, NewMember,
- fun (Group1) ->
- View1 = group_to_view(Group1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(
- MembersState)})
- end),
+ {MembersState1, Group} =
+ record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self,
+ prepare_members_state(MembersState1)}),
+ MembersState1
+ end),
View2 = group_to_view(Group),
- State1 = check_neighbours(State #state { view = View2 }),
+ State1 = check_neighbours(State #state { view = View2,
+ members_state = MembersState1 }),
Result = callback_view_changed(Args, Module, View, View2),
handle_callback_result({Result, {ok, Group}, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ members_state = MembersState,
group_name = GroupName,
module = Module,
callback_args = Args }) ->
{Result, State1} =
case needs_view_update(ReqVer, View) of
- true ->
- View1 = group_to_view(read_group(GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- check_neighbours(State #state { view = View1 })};
- false ->
- {ok, State}
+ true -> View1 = group_to_view(read_group(GroupName)),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(
+ State #state { view = View1,
+ members_state = MemberState1 })};
+ false -> {ok, State}
end,
handle_callback_result(
if_callback_success(
@@ -645,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -659,28 +661,29 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
group_to_view(record_dead_member_in_group(Member, GroupName)),
- State1 = State #state { view = View1 },
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
- maybe_erase_aliases(
- State1 #state {
+ {Result1, State1} = maybe_erase_aliases(State, View1),
+ {Result1, State1 #state {
members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) });
+ confirms = purge_confirms(Confirms) }};
_ ->
%% here we won't be pointing out any deaths:
%% the concern is that there maybe births
%% which we'd otherwise miss.
{callback_view_changed(Args, Module, View, View1),
- State1}
+ check_neighbours(State #state { view = View1 })}
end,
- handle_callback_result({Result, check_neighbours(State2)})
+ handle_callback_result({Result, State2})
end.
@@ -693,9 +696,13 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) -> 1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
-prioritise_info(_ , _State) -> 0.
+prioritise_info(flush, _State) ->
+ 1;
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+ #state { members_state = MS }) when MS /= undefined ->
+ 1;
+prioritise_info(_, _State) ->
+ 0.
handle_msg(check_neighbours, State) ->
@@ -795,8 +802,8 @@ handle_msg({activity, Left, Activity},
State1 = State #state { members_state = MembersState1,
confirms = Confirms1 },
Activity3 = activity_finalise(Activity1),
- {Result, State2} = maybe_erase_aliases(State1),
- ok = maybe_send_activity(Activity3, State2),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
if_callback_success(
Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
@@ -829,13 +836,14 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
+ PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount, Msg} | Buffer],
+ Buffer1 = [{PubCount1, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
- _ -> queue:in({PubCount, From}, Confirms)
+ _ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount + 1,
+ State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
@@ -850,14 +858,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ pub_count = PubCount }) ->
+ [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
- Member #member { pending_ack = PA1 }
+ Member #member { pending_ack = PA1,
+ last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.
@@ -867,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -890,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1052,7 +1057,7 @@ record_dead_member_in_group(Member, GroupName) ->
Group.
record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, Group} =
+ {atomic, {Result, Group}} =
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
@@ -1062,11 +1067,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
Members1 = Prefix ++ [Left, NewMember | Suffix],
Group2 = Group1 #gm_group { members = Members1,
version = Ver + 1 },
- ok = Fun(Group2),
+ Result = Fun(Group2),
mnesia:write(Group2),
- Group2
+ {Result, Group2}
end),
- Group.
+ {Result, Group}.
erase_members_in_group(Members, GroupName) ->
DeadMembers = [{dead, Id} || Id <- Members],
@@ -1089,10 +1094,10 @@ erase_members_in_group(Members, GroupName) ->
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
- view = View,
+ view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1107,11 +1112,11 @@ maybe_erase_aliases(State = #state { self = Self,
end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
- [] -> {ok, State1};
+ [] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
erase_members_in_group(Erasable, GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- State1 #state { view = View1 }}
+ {callback_view_changed(Args, Module, View0, View1),
+ check_neighbours(State1 #state { view = View1 })}
end.
can_erase_view_member(Self, Self, _LA, _LP) -> false;
@@ -1141,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1233,23 +1236,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1257,6 +1256,12 @@ make_member(GroupName) ->
{error, not_found} -> ?VERSION_START
end, self()}.
+remove_erased_members(MembersState, View) ->
+ lists:foldl(fun (Id, MembersState1) ->
+ store_member(Id, find_member_or_blank(Id, MembersState),
+ MembersState1)
+ end, blank_member_state(), all_known_members(View)).
+
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
@@ -1265,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1287,16 +1288,30 @@ send_right(Right, View, Msg) ->
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
- lists:foldl(
- fun ({Id, Pubs, _Acks}, ok) ->
- lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, get_pid(Id), Pub);
- (_, Error) ->
- Error
- end, ok, Pubs);
- (_, Error) ->
- Error
- end, ok, Activity).
+ Result =
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
+ lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
+ case Module2:handle_msg(
+ Args2, get_pid(Id), Pub) of
+ ok ->
+ Acc;
+ {become, Module3, Args3} ->
+ {Args3, Module3, ok};
+ {stop, _Reason} = Error ->
+ Error
+ end;
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args1, Module1, ok}, Pubs);
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args, Module, ok}, Activity),
+ case Result of
+ {Args, Module, ok} -> ok;
+ {Args1, Module1, ok} -> {become, Module1, Args1};
+ {stop, _Reason} = Error -> Error
+ end.
callback_view_changed(Args, Module, OldView, NewView) ->
OldMembers = all_known_members(OldView),
@@ -1364,34 +1379,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index df009529..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,9 +18,9 @@
-behaviour(application).
--export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0,
- status/0, is_running/0, is_running/1, environment/0,
- rotate_logs/1, force_event_refresh/0]).
+-export([start/0, boot/0, stop/0,
+ stop_and_halt/0, await_startup/0, status/0, is_running/0,
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -199,7 +199,8 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
+ ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -216,11 +217,11 @@
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
--spec(maybe_hipe_compile/0 :: () -> 'ok').
--spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
+-spec(boot/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> no_return()).
+-spec(await_startup/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -263,7 +264,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("HiPE compiling: |~s|~n |",
+ io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
@@ -285,29 +286,51 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
split0([], Ls) -> Ls;
split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
-prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_upgrade:maybe_upgrade_mnesia().
+ensure_application_loaded() ->
+ %% We end up looking at the rabbit app's env for HiPE and log
+ %% handling, so it needs to be loaded. But during the tests, it
+ %% may end up getting loaded twice, so guard against that.
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, rabbit}} -> ok
+ end.
start() ->
+ start_it(fun() ->
+ %% We do not want to HiPE compile or upgrade
+ %% mnesia after just restarting the app
+ ok = ensure_application_loaded(),
+ ok = ensure_working_log_handlers(),
+ ok = app_utils:start_applications(app_startup_order()),
+ ok = print_plugin_info(rabbit_plugins:active())
+ end).
+
+boot() ->
+ start_it(fun() ->
+ ok = ensure_application_loaded(),
+ maybe_hipe_compile(),
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ ok = app_utils:load_applications(ToBeLoaded),
+ StartupApps = app_utils:app_dependency_order(ToBeLoaded,
+ false),
+ ok = app_utils:start_applications(StartupApps),
+ ok = print_plugin_info(Plugins)
+ end).
+
+start_it(StartFun) ->
try
- %% prepare/1 ends up looking at the rabbit app's env, so it
- %% needs to be loaded, but during the tests, it may end up
- %% getting loaded twice, so guard against that
- case application:load(rabbit) of
- ok -> ok;
- {error, {already_loaded, rabbit}} -> ok
- end,
- ok = prepare(),
- ok = rabbit_misc:start_applications(application_load_order())
+ StartFun()
after
- %%give the error loggers some time to catch up
+ %% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
rabbit_log:info("Stopping Rabbit~n"),
- ok = rabbit_misc:stop_applications(application_load_order()).
+ ok = app_utils:stop_applications(app_shutdown_order()).
stop_and_halt() ->
try
@@ -318,6 +341,9 @@ stop_and_halt() ->
end,
ok.
+await_startup() ->
+ app_utils:wait_for_applications(app_startup_order()).
+
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
{running_applications, application:which_applications(infinity)},
@@ -394,46 +420,13 @@ stop(_State) ->
%%---------------------------------------------------------------------------
%% application life cycle
-application_load_order() ->
- ok = load_applications(),
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
- [{App, app_dependencies(App)} ||
- {App, _Desc, _Vsn} <- application:loaded_applications()]),
- true = digraph:del_vertices(
- G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)),
- Result = digraph_utils:topsort(G),
- true = digraph:delete(G),
- Result.
-
-load_applications() ->
- load_applications(queue:from_list(?APPS), sets:new()).
-
-load_applications(Worklist, Loaded) ->
- case queue:out(Worklist) of
- {empty, _WorkList} ->
- ok;
- {{value, App}, Worklist1} ->
- case sets:is_element(App, Loaded) of
- true -> load_applications(Worklist1, Loaded);
- false -> case application:load(App) of
- ok -> ok;
- {error, {already_loaded, App}} -> ok;
- Error -> throw(Error)
- end,
- load_applications(
- queue:join(Worklist1,
- queue:from_list(app_dependencies(App))),
- sets:add_element(App, Loaded))
- end
- end.
+app_startup_order() ->
+ ok = app_utils:load_applications(?APPS),
+ app_utils:app_dependency_order(?APPS, false).
-app_dependencies(App) ->
- case application:get_key(App, applications) of
- undefined -> [];
- {ok, Lst} -> Lst
- end.
+app_shutdown_order() ->
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ app_utils:app_dependency_order(Apps, true).
%%---------------------------------------------------------------------------
%% boot step logic
@@ -479,7 +472,8 @@ sort_boot_steps(UnsortedSteps) ->
%% there is one, otherwise fail).
SortedSteps = lists:reverse(
[begin
- {StepName, Step} = digraph:vertex(G, StepName),
+ {StepName, Step} = digraph:vertex(G,
+ StepName),
Step
end || StepName <- digraph_utils:topsort(G)]),
digraph:delete(G),
@@ -561,7 +555,8 @@ insert_default_data() ->
ok = rabbit_vhost:add(DefaultVHost),
ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
- ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser,
+ DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
DefaultReadPerm),
@@ -649,6 +644,24 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
+print_plugin_info([]) ->
+ ok;
+print_plugin_info(Plugins) ->
+ %% This gets invoked by rabbitmqctl start_app, outside the context
+ %% of the rabbit application
+ rabbit_misc:with_local_io(
+ fun() ->
+ io:format("~n-- plugins running~n"),
+ [print_plugin_info(
+ AppName, element(2, application:get_key(AppName, vsn)))
+ || AppName <- Plugins],
+ ok
+ end).
+
+print_plugin_info(Plugin, Vsn) ->
+ Len = 76 - length(Vsn),
+ io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
+
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 04e0c141..d16d90a4 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -162,17 +162,17 @@ maybe_alert(UpdateFun, Node, Source,
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees, _Source) ->
- alert(Alertees, [Alert], fun erlang:'=:='/2).
+alert_local(Alert, Alertees, Source) ->
+ alert(Alertees, Source, Alert, fun erlang:'=:='/2).
alert_remote(Alert, Alertees, Source) ->
- alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
+ alert(Alertees, Source, Alert, fun erlang:'=/='/2).
-alert(Alertees, AlertArg, NodeComparator) ->
+alert(Alertees, Source, Alert, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid] ++ AlertArg);
+ true -> apply(M, F, A ++ [Pid, Source, Alert]);
false -> ok
end
end, ok, Alertees).
@@ -181,7 +181,7 @@ internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
- {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504..e3ed4113 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -166,6 +166,9 @@
[queue_name, channel_pid, consumer_tag, ack_required]).
start() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -508,7 +511,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -573,7 +576,8 @@ on_node_down(Node) ->
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
+ node(Pid) == Node andalso
+ not is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5701efeb..f2833c26 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -984,8 +984,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- {basic_consume, _, _, _, _, _, _} -> 7;
- {basic_cancel, _, _, _} -> 7;
stat -> 7;
_ -> 0
end.
@@ -995,10 +993,6 @@ prioritise_cast(Msg, _State) ->
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- {ack, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid, _Credit} -> 7;
- {unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control_main.erl
index ad350d28..b23088cc 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control_main.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
--module(rabbit_control).
+-module(rabbit_control_main).
-include("rabbit.hrl").
-export([start/0, stop/0, action/5]).
@@ -58,7 +58,7 @@
{set_permissions, [?VHOST_DEF]},
{clear_permissions, [?VHOST_DEF]},
{list_permissions, [?VHOST_DEF]},
- {list_user_permissions, [?VHOST_DEF]},
+ list_user_permissions,
set_parameter,
clear_parameter,
@@ -253,7 +253,7 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, rabbit, Inform);
+ wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
action(wait, Node, [PidFile, App], _Opts, Inform) ->
Inform("Waiting for ~p on ~p", [App, Node]),
wait_for_application(Node, PidFile, list_to_atom(App), Inform);
@@ -465,12 +465,22 @@ wait_for_application(Node, PidFile, Application, Inform) ->
Inform("pid is ~s", [Pid]),
wait_for_application(Node, Pid, Application).
+wait_for_application(Node, Pid, rabbit_and_plugins) ->
+ wait_for_startup(Node, Pid);
wait_for_application(Node, Pid, Application) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
+
+wait_for_startup(Node, Pid) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
+
+while_process_is_alive(Node, Pid, Activity) ->
case process_up(Pid) of
- true -> case rabbit_nodes:is_running(Node, Application) of
+ true -> case Activity() of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_application(Node, Pid, Application)
+ while_process_is_alive(Node, Pid, Activity)
end;
false -> {error, process_not_running}
end.
@@ -485,12 +495,14 @@ wait_for_process_death(Pid) ->
read_pid_file(PidFile, Wait) ->
case {file:read_file(PidFile), Wait} of
{{ok, Bin}, _} ->
- S = string:strip(binary_to_list(Bin), right, $\n),
- try list_to_integer(S)
+ S = binary_to_list(Bin),
+ {match, [PidS]} = re:run(S, "[^\\s]+",
+ [{capture, all, list}]),
+ try list_to_integer(PidS)
catch error:badarg ->
exit({error, {garbage_in_pid_file, PidFile}})
end,
- S;
+ PidS;
{{error, enoent}, true} ->
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
read_pid_file(PidFile, Wait);
@@ -502,8 +514,7 @@ read_pid_file(PidFile, Wait) ->
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
with_os([{unix, fun () ->
- system("ps -p " ++ Pid
- ++ " >/dev/null 2>&1") =:= 0
+ run_ps(Pid) =:= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -521,15 +532,17 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
format_parse_error({_Line, Mod, Err}) ->
lists:flatten(Mod:format_error(Err)).
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b1750b61..58375abb 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -27,7 +27,7 @@
set_check_interval/1, get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
-record(state, {dir,
limit,
@@ -168,8 +168,8 @@ get_disk_free(Dir, {unix, _}) ->
parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir));
get_disk_free(Dir, {win32, _}) ->
parse_free_win32(os:cmd("dir /-C /W \"" ++ Dir ++ [$"]));
-get_disk_free(_, _) ->
- unknown.
+get_disk_free(_, Platform) ->
+ {unknown, Platform}.
parse_free_unix(CommandResult) ->
[_, Stats | _] = string:tokens(CommandResult, "\n"),
@@ -178,8 +178,9 @@ parse_free_unix(CommandResult) ->
parse_free_win32(CommandResult) ->
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
- [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
- list_to_integer(Free).
+ {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
+ [{capture, all_but_first, list}]),
+ list_to_integer(lists:reverse(Free)).
interpret_limit({mem_relative, R}) ->
round(R * vm_memory_monitor:get_total_memory());
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f3..a95f8f26 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -102,9 +102,12 @@ read_file_info(File) ->
with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
with_fhc_handle(Fun) ->
- ok = file_handle_cache:obtain(),
+ with_fhc_handle(1, Fun).
+
+with_fhc_handle(N, Fun) ->
+ [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
try Fun()
- after ok = file_handle_cache:release()
+ after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
end.
read_term_file(File) ->
@@ -165,7 +168,7 @@ make_binary(List) ->
{error, Reason}
end.
-
+%% TODO the semantics of this function are rather odd. But see bug 25021.
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -183,9 +186,11 @@ append_file(File, 0, Suffix) ->
end
end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
+ case with_fhc_handle(2, fun () ->
+ file:copy(File, {[File, Suffix], [append]})
+ end) of
+ {ok, _BytesCopied} -> ok;
+ Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..3e058793 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43..750bcd56 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc..03fafc3e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -351,20 +351,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 93c784ec..d41aa09b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, amqp_error/4,
+-export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
@@ -42,7 +42,6 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([format/2, format_many/1, format_stderr/2]).
-export([with_local_io/1, local_info_msg/2]).
--export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
@@ -59,7 +58,6 @@
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
--export([quit/1]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
@@ -87,6 +85,10 @@
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 ::
(rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
+
+-spec(quit/1 :: (integer()) -> no_return()).
+-spec(quit/2 :: (string(), [term()]) -> no_return()).
+
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
-> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
@@ -163,8 +165,6 @@
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
--spec(start_applications/1 :: ([atom()]) -> 'ok').
--spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
@@ -210,7 +210,6 @@
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
--spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
@@ -391,6 +390,28 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+%%
+%% @doc Halts the emulator after printing out an error message io-formatted with
+%% the supplied arguments. The exit status of the beam process will be set to 1.
+%%
+quit(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ quit(1).
+
+%%
+%% @doc Halts the emulator returning the given status code to the os.
+%% On Windows this function will block indefinitely so as to give the io
+%% subsystem time to flush stdout completely.
+%%
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
@@ -593,34 +614,6 @@ with_local_io(Fun) ->
local_info_msg(Format, Args) ->
with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
unfold(Fun, Init) ->
unfold(Fun, [], Init).
@@ -937,13 +930,6 @@ receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
end.
-%% the slower shutdown on windows required to flush stdout
-quit(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status)
- end.
-
os_cmd(Command) ->
Exec = hd(string:tokens(Command, " ")),
case os:find_executable(Exec) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 1a12d43b..bedf5142 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
- connection_string/2]).
+ tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -69,6 +69,7 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
@@ -159,6 +160,13 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+tune_buffer_size(Sock) ->
+ case getopts(Sock, [sndbuf, recbuf, buffer]) of
+ {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ setopts(Sock, [{buffer, BufSz}]);
+ Err -> Err
+ end.
+
connection_string(Sock, Direction) ->
{From, To} = case Direction of
inbound -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 78deea97..94a5a2b7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -142,7 +142,7 @@ start() -> rabbit_sup:start_supervisor_child(
{rabbit_connection_sup,start_link,[]}]).
ensure_ssl() ->
- ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
+ ok = app_utils:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
% unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 30c7bb37..7cf6eea9 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -17,13 +17,8 @@
-module(rabbit_plugins).
-include("rabbit.hrl").
--export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1,
- lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]).
-
--define(VERBOSE_OPT, "-v").
--define(MINIMAL_OPT, "-m").
--define(ENABLED_OPT, "-E").
--define(ENABLED_ALL_OPT, "-e").
+-export([setup/0, active/0, read_enabled/1,
+ list/1, dependencies/3]).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
@@ -41,133 +36,38 @@
-ifdef(use_specs).
--spec(start/0 :: () -> no_return()).
--spec(stop/0 :: () -> 'ok').
--spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]).
--spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]).
--spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]).
--spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]).
--spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]).
+-spec(setup/0 :: () -> [atom()]).
+-spec(active/0 :: () -> [atom()]).
+-spec(list/1 :: (string()) -> [#plugin{}]).
+-spec(read_enabled/1 :: (file:filename()) -> [atom()]).
+-spec(dependencies/3 ::
+ (boolean(), [atom()], [#plugin{}]) -> [atom()]).
-endif.
%%----------------------------------------------------------------------------
-start() ->
- {ok, [[PluginsFile|_]|_]} =
- init:get_argument(enabled_plugins_file),
- {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
- {Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
- {ok, Res} -> Res;
- no_command -> print_error("could not recognise command", []),
- usage()
- end,
-
- PrintInvalidCommandError =
- fun () ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")])
- end,
-
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
- ok ->
- rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {error, Reason} ->
- print_error("~p", [Reason]),
- rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
- rabbit_misc:quit(2);
- Other ->
- print_error("~p", [Other]),
- rabbit_misc:quit(2)
- end.
-
-stop() ->
- ok.
-
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
-
-usage() ->
- io:format("~s", [rabbit_plugins_usage:usage()]),
- rabbit_misc:quit(1).
-
-%%----------------------------------------------------------------------------
-
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
-
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
- case ToEnable0 of
- [] -> throw({error_string, "Not enough arguments for 'enable'"});
- _ -> ok
- end,
- AllPlugins = find_plugins(PluginsDir),
- Enabled = read_enabled_plugins(PluginsFile),
- ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins),
- ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
- Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
- NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
- NewImplicitlyEnabled = calculate_required_plugins(NewEnabled, AllPlugins),
- maybe_warn_mochiweb(NewImplicitlyEnabled),
- case NewEnabled -- ImplicitlyEnabled of
- [] -> io:format("Plugin configuration unchanged.~n");
- _ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
-
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
- case ToDisable0 of
- [] -> throw({error_string, "Not enough arguments for 'disable'"});
- _ -> ok
- end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = read_enabled_plugins(PluginsFile),
- AllPlugins = find_plugins(PluginsDir),
- Missing = ToDisable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> print_list("Warning: the following plugins could not be found:",
- Missing)
- end,
- ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins),
- NewEnabled = Enabled -- ToDisableDeps,
- case length(Enabled) =:= length(NewEnabled) of
- true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- calculate_required_plugins(Enabled, AllPlugins),
- NewImplicitlyEnabled =
- calculate_required_plugins(NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
- ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
-
-%%----------------------------------------------------------------------------
-
-%% Get the #plugin{}s ready to be enabled.
-find_plugins(PluginsDir) ->
+%%
+%% @doc Prepares the file system and installs all enabled plugins.
+%%
+setup() ->
+ {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ {ok, EnabledPluginsFile} = application:get_env(rabbit,
+ enabled_plugins_file),
+ prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir),
+ [prepare_dir_plugin(PluginName) ||
+ PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+
+%% @doc Lists the plugins which are currently running.
+active() ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ [App || {App, _, _} <- application:which_applications(),
+ lists:member(App, InstalledPlugins)].
+
+%% @doc Get the list of plugins which are ready to be enabled.
+list(PluginsDir) ->
EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
FreeApps = [{app, App} ||
App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
@@ -186,6 +86,91 @@ find_plugins(PluginsDir) ->
end,
Plugins.
+%% @doc Read the list of enabled plugins from the supplied term file.
+read_enabled(PluginsFile) ->
+ case rabbit_file:read_term_file(PluginsFile) of
+ {ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+%%
+%% @doc Calculate the dependency graph from <i>Sources</i>.
+%% When Reverse =:= true the bottom/leaf level applications are returned in
+%% the resulting list, otherwise they're skipped.
+%%
+dependencies(Reverse, Sources, AllPlugins) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps}
+ || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ Dests = case Reverse of
+ false -> digraph_utils:reachable(Sources, G);
+ true -> digraph_utils:reaching(Sources, G)
+ end,
+ true = digraph:delete(G),
+ Dests.
+
+%%----------------------------------------------------------------------------
+
+prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
+ AllPlugins = list(PluginsDistDir),
+ Enabled = read_enabled(EnabledPluginsFile),
+ ToUnpack = dependencies(false, Enabled, AllPlugins),
+ ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
+
+ Missing = Enabled -- plugin_names(ToUnpackPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> io:format("Warning: the following enabled plugins were "
+ "not found: ~p~n", [Missing])
+ end,
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)",
+ [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)",
+ [DestDir, E2])
+ end,
+
+ [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
+
+prepare_dir_plugin(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN),
+
+ %% We want the second-last token
+ NameTokens = string:tokens(PluginAppDescFn,"/."),
+ PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
+ list_to_atom(PluginNameString).
+
+%%----------------------------------------------------------------------------
+
+delete_recursively(Fn) ->
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
+ end.
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
+
%% Get the #plugin{} from an .ez.
get_plugin_info(Base, {ez, EZ0}) ->
EZ = filename:join([Base, EZ0]),
@@ -245,78 +230,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
- Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
- Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
- Format = case {Verbose, Minimal} of
- {false, false} -> normal;
- {true, false} -> verbose;
- {false, true} -> minimal;
- {true, true} -> throw({error_string,
- "Cannot specify -m and -v together"})
- end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
- OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
-
- AvailablePlugins = find_plugins(PluginsDir),
- EnabledExplicitly = read_enabled_plugins(PluginsFile),
- EnabledImplicitly =
- calculate_required_plugins(EnabledExplicitly, AvailablePlugins) --
- EnabledExplicitly,
- {ok, RE} = re:compile(Pattern),
- Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
- re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
- true -> true
- end],
- Plugins1 = usort_plugins(Plugins),
- MaxWidth = lists:max([length(atom_to_list(Name)) ||
- #plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
- ok.
-
-format_plugin(#plugin{name = Name, version = Version,
- description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
- end,
- case Format of
- minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
- verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
- io:format("~n")
- end.
-
-print_list(Header, Plugins) ->
- io:format(fmt_list(Header, Plugins)).
-
-fmt_list(Header, Plugins) ->
- lists:flatten(
- [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-
-usort_plugins(Plugins) ->
- lists:usort(fun plugins_cmp/2, Plugins).
-
-plugins_cmp(#plugin{name = N1, version = V1},
- #plugin{name = N2, version = V2}) ->
- {N1, V1} =< {N2, V2}.
-
%% Filter out applications that can be loaded *right now*.
filter_applications(Applications) ->
[Application || Application <- Applications,
@@ -339,71 +252,3 @@ plugin_names(Plugins) ->
%% Find plugins by name in a list of plugins.
lookup_plugins(Names, AllPlugins) ->
[P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)].
-
-%% Read the enabled plugin names from disk.
-read_enabled_plugins(PluginsFile) ->
- case rabbit_file:read_term_file(PluginsFile) of
- {ok, [Plugins]} -> Plugins;
- {ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-%% Write the enabled plugin names on disk.
-write_enabled_plugins(PluginsFile, Plugins) ->
- case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
- ok -> ok;
- {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-calculate_required_plugins(Sources, AllPlugins) ->
- calculate_dependencies(false, Sources, AllPlugins).
-
-calculate_dependencies(Reverse, Sources, AllPlugins) ->
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
- Dests = case Reverse of
- false -> digraph_utils:reachable(Sources, G);
- true -> digraph_utils:reaching(Sources, G)
- end,
- true = digraph:delete(G),
- Dests.
-
-maybe_warn_mochiweb(Enabled) ->
- V = erlang:system_info(otp_release),
- case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
- true ->
- Stars = string:copies("*", 80),
- io:format("~n~n~s~n"
- " Warning: Mochiweb enabled and Erlang version ~s "
- "detected.~n"
- " Enabling plugins that depend on Mochiweb is not "
- "supported on this Erlang~n"
- " version. At least R13B01 is required.~n~n"
- " RabbitMQ will not start successfully in this "
- "configuration. You *must*~n"
- " disable the Mochiweb plugin, or upgrade Erlang.~n"
- "~s~n~n~n", [Stars, V, Stars]);
- false ->
- ok
- end.
-
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n"),
- case os:type() of
- {win32, _OsName} ->
- io:format("If you have RabbitMQ running as a service then you must"
- " reinstall by running~n rabbitmq-service.bat stop~n"
- " rabbitmq-service.bat install~n"
- " rabbitmq-service.bat start~n~n");
- _ ->
- ok
- end.
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
new file mode 100644
index 00000000..572cf150
--- /dev/null
+++ b/src/rabbit_plugins_main.erl
@@ -0,0 +1,273 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_plugins_main).
+-include("rabbit.hrl").
+
+-export([start/0, stop/0]).
+
+-define(VERBOSE_OPT, "-v").
+-define(MINIMAL_OPT, "-m").
+-define(ENABLED_OPT, "-E").
+-define(ENABLED_ALL_OPT, "-e").
+
+-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
+-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
+-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
+-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+
+-define(GLOBAL_DEFS, []).
+
+-define(COMMANDS,
+ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
+ enable,
+ disable]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(usage/0 :: () -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ {ok, [[PluginsFile|_]|_]} =
+ init:get_argument(enabled_plugins_file),
+ {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
+ {Command, Opts, Args} =
+ case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
+ init:get_plain_arguments())
+ of
+ {ok, Res} -> Res;
+ no_command -> print_error("could not recognise command", []),
+ usage()
+ end,
+
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
+ case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ ok ->
+ rabbit_misc:quit(0);
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {error, Reason} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
+ Other ->
+ print_error("~p", [Other]),
+ rabbit_misc:quit(2)
+ end.
+
+stop() ->
+ ok.
+
+%%----------------------------------------------------------------------------
+
+action(list, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+
+action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToEnable0 of
+ [] -> throw({error_string, "Not enough arguments for 'enable'"});
+ _ -> ok
+ end,
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ Enabled, AllPlugins),
+ ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
+ Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string,
+ fmt_list("The following plugins could not be found:",
+ Missing)})
+ end,
+ NewEnabled = lists:usort(Enabled ++ ToEnable),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ maybe_warn_mochiweb(NewImplicitlyEnabled),
+ case NewEnabled -- ImplicitlyEnabled of
+ [] -> io:format("Plugin configuration unchanged.~n");
+ _ -> print_list("The following plugins have been enabled:",
+ NewImplicitlyEnabled -- ImplicitlyEnabled),
+ report_change()
+ end;
+
+action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToDisable0 of
+ [] -> throw({error_string, "Not enough arguments for 'disable'"});
+ _ -> ok
+ end,
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Missing = ToDisable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> print_list("Warning: the following plugins could not be found:",
+ Missing)
+ end,
+ ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
+ NewEnabled = Enabled -- ToDisableDeps,
+ case length(Enabled) =:= length(NewEnabled) of
+ true -> io:format("Plugin configuration unchanged.~n");
+ false -> ImplicitlyEnabled =
+ rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ NewImplicitlyEnabled =
+ rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ print_list("The following plugins have been disabled:",
+ ImplicitlyEnabled -- NewImplicitlyEnabled),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ report_change()
+ end.
+
+%%----------------------------------------------------------------------------
+
+print_error(Format, Args) ->
+ rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+
+usage() ->
+ io:format("~s", [rabbit_plugins_usage:usage()]),
+ rabbit_misc:quit(1).
+
+%% Pretty print a list of plugins.
+format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+ Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
+ Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
+ Format = case {Verbose, Minimal} of
+ {false, false} -> normal;
+ {true, false} -> verbose;
+ {false, true} -> minimal;
+ {true, true} -> throw({error_string,
+ "Cannot specify -m and -v together"})
+ end,
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
+
+ AvailablePlugins = rabbit_plugins:list(PluginsDir),
+ EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
+ EnabledImplicitly =
+ rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins) -- EnabledExplicitly,
+ {ok, RE} = re:compile(Pattern),
+ Plugins = [ Plugin ||
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
+ re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name,
+ EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
+ end],
+ Plugins1 = usort_plugins(Plugins),
+ MaxWidth = lists:max([length(atom_to_list(Name)) ||
+ #plugin{name = Name} <- Plugins1] ++ [0]),
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
+ MaxWidth) || P <- Plugins1],
+ ok.
+
+format_plugin(#plugin{name = Name, version = Version,
+ description = Description, dependencies = Deps},
+ EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ Glyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "[E]";
+ {false, true} -> "[e]";
+ _ -> "[ ]"
+ end,
+ case Format of
+ minimal -> io:format("~s~n", [Name]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
+ "w ~s~n", [Glyph, Name, Version]);
+ verbose -> io:format("~s ~w~n", [Glyph, Name]),
+ io:format(" Version: \t~s~n", [Version]),
+ case Deps of
+ [] -> ok;
+ _ -> io:format(" Dependencies:\t~p~n", [Deps])
+ end,
+ io:format(" Description:\t~s~n", [Description]),
+ io:format("~n")
+ end.
+
+print_list(Header, Plugins) ->
+ io:format(fmt_list(Header, Plugins)).
+
+fmt_list(Header, Plugins) ->
+ lists:flatten(
+ [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+
+usort_plugins(Plugins) ->
+ lists:usort(fun plugins_cmp/2, Plugins).
+
+plugins_cmp(#plugin{name = N1, version = V1},
+ #plugin{name = N2, version = V2}) ->
+ {N1, V1} =< {N2, V2}.
+
+%% Return the names of the given plugins.
+plugin_names(Plugins) ->
+ [Name || #plugin{name = Name} <- Plugins].
+
+%% Write the enabled plugin names on disk.
+write_enabled_plugins(PluginsFile, Plugins) ->
+ case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
+ ok -> ok;
+ {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+maybe_warn_mochiweb(Enabled) ->
+ V = erlang:system_info(otp_release),
+ case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
+ true ->
+ Stars = string:copies("*", 80),
+ io:format("~n~n~s~n"
+ " Warning: Mochiweb enabled and Erlang version ~s "
+ "detected.~n"
+ " Enabling plugins that depend on Mochiweb is not "
+ "supported on this Erlang~n"
+ " version. At least R13B01 is required.~n~n"
+ " RabbitMQ will not start successfully in this "
+ "configuration. You *must*~n"
+ " disable the Mochiweb plugin, or upgrade Erlang.~n"
+ "~s~n~n~n", [Stars, V, Stars]);
+ false ->
+ ok
+ end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n").
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 162d44f1..d56211b5 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -31,212 +31,21 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
-%% Shut dialyzer up
--spec(terminate/1 :: (string()) -> no_return()).
--spec(terminate/2 :: (string(), [any()]) -> no_return()).
-endif.
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ...~n"),
-
- %% Determine our various directories
- [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] =
- init:get_plain_arguments(),
- RootName = UnpackedPluginDir ++ "/rabbit",
-
- prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir),
-
- %% Build a list of required apps based on the fixed set, and any plugins
- PluginApps = find_plugins(UnpackedPluginDir),
- RequiredApps = ?BaseApps ++ PluginApps,
-
- %% Build the entire set of dependencies - this will load the
- %% applications along the way
- AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {failed_to_load_app, App, Err} ->
- terminate("failed to load application ~s:~n~p",
- [App, Err]);
- AppList ->
- AppList
- end,
- AppVersions = [determine_version(App) || App <- AllApps],
- RabbitVersion = proplists:get_value(rabbit, AppVersions),
-
- %% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
- {erts, erlang:system_info(version)},
- AppVersions},
-
- %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
- rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
-
- %% We exclude mochiweb due to its optional use of fdsrv.
- XRefExclude = [mochiweb],
-
- %% Compile the script
- ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent,
- {exref, AllApps -- XRefExclude}]) of
- {ok, Module, Warnings} ->
- %% This gets lots of spurious no-source warnings when we
- %% have .ez files, so we want to supress them to prevent
- %% hiding real issues. On Ubuntu, we also get warnings
- %% about kernel/stdlib sources being out of date, which we
- %% also ignore for the same reason.
- WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
- when WApp == mnesia;
- WApp == stdlib;
- WApp == kernel;
- WApp == sasl;
- WApp == crypto;
- WApp == os_mon -> false;
- _ -> true
- end]),
- case length(WarningStr) of
- 0 -> ok;
- _ -> S = string:copies("*", 80),
- io:format("~n~s~n~s~s~n~n", [S, WarningStr, S])
- end,
- ok;
- {error, Module, Error} ->
- terminate("generation of boot script file ~s failed:~n~s",
- [ScriptFile, Module:format_error(Error)])
- end,
-
- case post_process_script(ScriptFile) of
- ok -> ok;
- {error, Reason} ->
- terminate("post processing of boot script file ~s failed:~n~w",
- [ScriptFile, Reason])
- end,
- case systools:script2boot(RootName) of
- ok -> ok;
- error -> terminate("failed to compile boot script file ~s",
- [ScriptFile])
- end,
- io:format("~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
- || App <- PluginApps],
- io:nl(),
-
+ [NodeStr] = init:get_plain_arguments(),
ok = duplicate_node_check(NodeStr),
-
- terminate(0),
+ rabbit_misc:quit(0),
ok.
stop() ->
ok.
-determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
- {App, Vsn}.
-
-delete_recursively(Fn) ->
- case rabbit_file:recursive_delete([Fn]) of
- ok -> ok;
- {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
- Error -> Error
- end.
-
-prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
- AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir),
- Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile),
- ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins),
- ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins),
-
- Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins),
- case Missing of
- [] -> ok;
- _ -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(DestDir) of
- ok -> ok;
- {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
- end,
- case filelib:ensure_dir(DestDir ++ "/") of
- ok -> ok;
- {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
- end,
-
- [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
-
-prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
- zip:unzip(Location, [{cwd, PluginDestDir}]);
-prepare_plugin(#plugin{type = dir, name = Name, location = Location},
- PluginsDestDir) ->
- rabbit_file:recursive_copy(Location,
- filename:join([PluginsDestDir, Name])).
-
-find_plugins(PluginDir) ->
- [prepare_dir_plugin(PluginName) ||
- PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
-
-prepare_dir_plugin(PluginAppDescFn) ->
- %% Add the plugin ebin directory to the load path
- PluginEBinDirN = filename:dirname(PluginAppDescFn),
- code:add_path(PluginEBinDirN),
-
- %% We want the second-last token
- NameTokens = string:tokens(PluginAppDescFn,"/."),
- PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
- list_to_atom(PluginNameString).
-
-expand_dependencies(Pending) ->
- expand_dependencies(sets:new(), Pending).
-expand_dependencies(Current, []) ->
- Current;
-expand_dependencies(Current, [Next|Rest]) ->
- case sets:is_element(Next, Current) of
- true ->
- expand_dependencies(Current, Rest);
- false ->
- case application:load(Next) of
- ok ->
- ok;
- {error, {already_loaded, _}} ->
- ok;
- {error, Reason} ->
- throw({failed_to_load_app, Next, Reason})
- end,
- {ok, Required} = application:get_key(Next, applications),
- Unique = [A || A <- Required, not(sets:is_element(A, Current))],
- expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
- end.
-
-post_process_script(ScriptFile) ->
- case file:consult(ScriptFile) of
- {ok, [{script, Name, Entries}]} ->
- NewEntries = lists:flatmap(fun process_entry/1, Entries),
- case file:open(ScriptFile, [write]) of
- {ok, Fd} ->
- io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
- [date(), time(), {script, Name, NewEntries}]),
- file:close(Fd),
- ok;
- {error, OReason} ->
- {error, {failed_to_open_script_file_for_writing, OReason}}
- end;
- {error, Reason} ->
- {error, {failed_to_load_script, Reason}}
- end.
-
-process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit,maybe_hipe_compile,[]}},
- {apply,{rabbit,prepare,[]}}, Entry];
-process_entry(Entry) ->
- [Entry].
+%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
duplicate_node_check([]) ->
@@ -252,11 +61,11 @@ duplicate_node_check(NodeStr) ->
"already running on ~p~n",
[NodeName, NodeHost]),
io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
- terminate(?ERROR_CODE);
+ rabbit_misc:quit(?ERROR_CODE);
false -> ok
end;
{error, EpmdReason} ->
- terminate("epmd error for host ~p: ~p (~s)~n",
+ rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
case EpmdReason of
address -> "unable to establish tcp connection";
@@ -264,16 +73,3 @@ duplicate_node_check(NodeStr) ->
_ -> inet:format_error(EpmdReason)
end])
end.
-
-terminate(Fmt, Args) ->
- io:format("ERROR: " ++ Fmt ++ "~n", Args),
- terminate(?ERROR_CODE).
-
-terminate(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status),
- receive
- after infinity -> ok
- end
- end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5acf6aca..bd5cf588 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_resources/2, server_properties/1]).
+-export([conserve_resources/3, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,7 +133,7 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, Conserve) ->
+conserve_resources(Pid, _Source, Conserve) ->
Pid ! {conserve_resources, Conserve},
ok.
@@ -222,14 +222,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never},
try
- BufSizes = inet_op(fun () ->
- rabbit_net:getopts(
- ClientSock, [sndbuf, recbuf, buffer])
- end),
- BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- ok = inet_op(fun () ->
- rabbit_net:setopts(ClientSock, [{buffer, BufSz}])
- end),
+ ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
@@ -320,8 +313,8 @@ handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
-handle_other(timeout, _Deb, #v1{connection_state = S}) ->
- throw({timeout, S});
+handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+ throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -690,7 +683,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun = fun() -> Parent ! timeout end,
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -743,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bae4928d..5545cccf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,6 +29,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+-define(TIMEOUT, 5000).
all_tests() ->
passed = gm_tests:all_tests(),
@@ -1240,7 +1241,7 @@ test_spawn() ->
rabbit_limiter:make_token(self())),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
end,
{Writer, Ch}.
@@ -1261,7 +1262,7 @@ test_spawn_remote() ->
end
end),
receive Res -> Res
- after 1000 -> throw(failed_to_receive_result)
+ after ?TIMEOUT -> throw(failed_to_receive_result)
end.
user(Username) ->
@@ -1281,13 +1282,10 @@ test_confirms() ->
queue = Q0,
exchange = <<"amq.direct">>,
routing_key = "magic" }),
- receive #'queue.bind_ok'{} ->
- Q0
- after 1000 ->
- throw(failed_to_bind_queue)
+ receive #'queue.bind_ok'{} -> Q0
+ after ?TIMEOUT -> throw(failed_to_bind_queue)
end
- after 1000 ->
- throw(failed_to_declare_queue)
+ after ?TIMEOUT -> throw(failed_to_declare_queue)
end
end,
%% Declare and bind two queues
@@ -1300,7 +1298,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
#'confirm.select_ok'{} -> ok
- after 1000 -> throw(failed_to_enable_confirms)
+ after ?TIMEOUT -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1317,7 +1315,7 @@ test_confirms() ->
receive
#'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
- after 2000 -> throw(did_not_receive_nack)
+ after ?TIMEOUT-> throw(did_not_receive_nack)
end,
receive
#'basic.ack'{} -> throw(received_ack_when_none_expected)
@@ -1327,7 +1325,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
receive
#'queue.delete_ok'{} -> ok
- after 1000 -> throw(failed_to_cleanup_queue)
+ after ?TIMEOUT -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1350,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) ->
true -> Props;
_ -> test_statistics_receive_event1(Ch, Matcher)
end
- after 1000 -> throw(failed_to_receive_event)
+ after ?TIMEOUT -> throw(failed_to_receive_event)
end.
test_statistics() ->
@@ -1362,9 +1360,8 @@ test_statistics() ->
%% Set up a channel and queue
{_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
- QName = receive #'queue.declare_ok'{queue = Q0} ->
- Q0
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
QPid = Q#amqqueue.pid,
@@ -1444,7 +1441,7 @@ expect_event(Pid, Type) ->
Pid -> ok;
_ -> expect_event(Pid, Type)
end
- after 1000 -> throw({failed_to_receive_event, Type})
+ after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
test_delegates_async(SecondaryNode) ->
@@ -1468,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
+ after ?TIMEOUT -> throw(Throw)
end
end.
@@ -1481,9 +1478,7 @@ await_response(Count) ->
receive
response -> ok,
await_response(Count - 1)
- after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ after ?TIMEOUT -> throw(timeout)
end.
must_exit(Fun) ->
@@ -1550,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
rabbit_channel:shutdown(Ch),
rabbit:stop(),
@@ -1561,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) ->
receive
#'channel.close'{reply_code = ?NOT_FOUND} ->
ok
- after 2000 ->
- throw(failed_to_receive_channel_exit)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
end,
rabbit_channel:shutdown(Ch2),
passed.
@@ -1589,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
- after 2000 ->
- throw(failed_to_create_and_kill_queue)
+ after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
%%---------------------------------------------------------------------
@@ -1603,7 +1596,7 @@ control_action(Command, Args, NewOpts) ->
expand_options(default_options(), NewOpts)).
control_action(Command, Node, Args, Opts) ->
- case catch rabbit_control:action(
+ case catch rabbit_control_main:action(
Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
@@ -1821,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
Pid);
stop ->
done
- after (case Awaiting of [] -> 200; _ -> 1000 end) ->
+ after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
case Awaiting of
[] -> Pid ! {self(), arrived}, on_disk_capture();
_ -> Pid ! {self(), timeout}
@@ -2374,7 +2367,7 @@ wait_for_confirms(Unconfirmed) ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after 5000 -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dafb3f2e..49213c95 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1466,16 +1466,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> {_, State2} =
- lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end,
- {S1, State},
- case (AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress) of
- true -> [AckFun, AlphaBetaFun];
- false -> [AlphaBetaFun, AckFun]
- end),
+ S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress)) of
+ true -> [AckFun, AlphaBetaFun];
+ false -> [AlphaBetaFun, AckFun]
+ end,
+ {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, Funs),
{true, State2}
end,