summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Losh <tim@rabbitmq.com>2012-06-14 12:22:51 +0100
committerSteve Losh <tim@rabbitmq.com>2012-06-14 12:22:51 +0100
commitb126cf6d6bff5010500be94310097e4d3f5f3f38 (patch)
tree01876d9e725429010b70096f1658003c22e74eef
parent5619b90f552bea9882dd145c69d2630657e87083 (diff)
parent88bdcd465e8d9a55bbeb554c939d93ede37fd7a0 (diff)
downloadrabbitmq-server-b126cf6d6bff5010500be94310097e4d3f5f3f38.tar.gz
merge default
-rw-r--r--docs/rabbitmqctl.1.xml5
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init15
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server34
-rwxr-xr-xscripts/rabbitmq-server.bat31
-rwxr-xr-xscripts/rabbitmq-service.bat24
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-xscripts/rabbitmqctl.bat4
-rw-r--r--src/app_utils.erl121
-rw-r--r--src/gm.erl276
-rw-r--r--src/mirrored_supervisor.erl81
-rw-r--r--src/mirrored_supervisor_tests.erl6
-rw-r--r--src/rabbit.erl145
-rw-r--r--src/rabbit_alarm.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_control_main.erl (renamed from src/rabbit_control.erl)118
-rw-r--r--src/rabbit_direct.erl12
-rw-r--r--src/rabbit_disk_monitor.erl5
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl8
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl17
-rw-r--r--src/rabbit_misc.erl170
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_networking.erl15
-rw-r--r--src/rabbit_plugins.erl395
-rw-r--r--src/rabbit_plugins_main.erl273
-rw-r--r--src/rabbit_prelaunch.erl214
-rw-r--r--src/rabbit_reader.erl17
-rw-r--r--src/rabbit_sup.erl38
-rw-r--r--src/rabbit_tests.erl130
-rw-r--r--src/rabbit_variable_queue.erl18
33 files changed, 1183 insertions, 1031 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 1effd691..d6f6d51f 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -720,7 +720,7 @@
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>list_user_permissions</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
@@ -978,7 +978,8 @@
</varlistentry>
<varlistentry>
<term>type</term>
- <listitem><para>The exchange type (one of [<command>direct</command>,
+ <listitem><para>The exchange type (such as
+ [<command>direct</command>,
<command>topic</command>, <command>headers</command>,
<command>fanout</command>]).</para></listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index b7d14f20..ffe112a0 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -22,6 +22,8 @@
{disk_free_limit, {mem_relative, 1.0}},
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
+ %% 0 ("no limit") would make a better default, but that
+ %% breaks the QPid Java client
{frame_max, 131072},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index e935acf5..943ed48f 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Uploader: Emile Joubert <emile@rabbitmq.com>
+Uploaders: Emile Joubert <emile@rabbitmq.com>
DM-Upload-Allowed: yes
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index f514b974..c1352078 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -137,13 +137,18 @@ restart_end() {
start_stop_end() {
case "$RETVAL" in
0)
- log_end_msg 0;;
+ [ -x /sbin/initctl ] && /sbin/initctl emit --no-wait "${NAME}-${1}"
+ log_end_msg 0
+ ;;
3)
log_warning_msg "${DESC} already ${1}"
- log_end_msg 0;;
+ log_end_msg 0
+ RETVAL=0
+ ;;
*)
log_warning_msg "FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\}"
- log_end_msg 1;;
+ log_end_msg 1
+ ;;
esac
}
@@ -151,7 +156,7 @@ case "$1" in
start)
log_daemon_msg "Starting ${DESC}" $NAME
start_rabbitmq
- start_stop_end "started"
+ start_stop_end "running"
;;
stop)
log_daemon_msg "Stopping ${DESC}" $NAME
@@ -162,7 +167,7 @@ case "$1" in
status_rabbitmq
;;
rotate-logs)
- log_action_begin_msg "Rotating log files for ${DESC} ${NAME}"
+ log_action_begin_msg "Rotating log files for ${DESC}: ${NAME}"
rotate_logs_rabbitmq
log_action_end_msg $RETVAL
;;
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 14a18d57..97c74791 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -31,7 +31,7 @@ exec erl \
-noinput \
-hidden \
-sname rabbitmq-plugins$$ \
- -s rabbit_plugins \
+ -s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 66a900a1..3c268726 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,9 +43,11 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 0a5a4640..34915b3d 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -58,7 +58,8 @@ DEFAULT_NODE_PORT=5672
##--- End of overridden <var_name> variables
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT=" -noinput"
+[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT="$RABBITMQ_START_RABBIT -s rabbit boot "
case "$(uname -s)" in
CYGWIN*) # we make no attempt to record the cygwin pid; rabbitmqctl wait
@@ -70,24 +71,16 @@ case "$(uname -s)" in
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
- if erl \
- -pa "$RABBITMQ_EBIN_ROOT" \
- -noinput \
- -hidden \
- -s rabbit_prelaunch \
- -sname rabbitmqprelaunch$$ \
- -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
+if ! erl -pa "$RABBITMQ_EBIN_ROOT" \
+ -noinput \
+ -hidden \
+ -s rabbit_prelaunch \
+ -sname rabbitmqprelaunch$$ \
+ -extra "${RABBITMQ_NODENAME}";
then
- RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
- RABBITMQ_EBIN_PATH=""
- else
- exit 1
- fi
-else
- RABBITMQ_BOOT_FILE=start_sasl
- RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+ exit 1;
fi
+
RABBITMQ_CONFIG_ARG=
[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}"
@@ -100,10 +93,10 @@ RABBITMQ_LISTEN_ARG=
set -f
exec erl \
- ${RABBITMQ_EBIN_PATH} \
+ -pa ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot ${RABBITMQ_BOOT_FILE} \
+ -boot start_sasl \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
@@ -112,6 +105,9 @@ exec erl \
-sasl sasl_error_logger false \
-rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
-rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
+ -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
+ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-os_mon start_cpu_sup false \
-os_mon start_disksup false \
-os_mon start_memsup false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index ca49a5d8..b8822739 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,25 +86,24 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- "!RABBITMQ_NODENAME!"
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+ -pa "!RABBITMQ_EBIN_ROOT!" ^
+ -noinput -hidden ^
+ -s rabbit_prelaunch ^
+ -sname rabbitmqprelaunch!RANDOM! ^
+ -extra "!RABBITMQ_NODENAME!"
+
if ERRORLEVEL 1 (
exit /B 1
)
-set RABBITMQ_EBIN_PATH=
+set RABBITMQ_EBIN_PATH="-pa !RABBITMQ_EBIN_ROOT!"
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
@@ -124,9 +123,10 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
"!ERLANG_HOME!\bin\erl.exe" ^
-!RABBITMQ_EBIN_PATH! ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
-sname !RABBITMQ_NODENAME! ^
+W w ^
@@ -139,6 +139,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 9e274840..849bedcf 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -157,22 +157,6 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!RABBITMQ_EBIN_ROOT!" ^
--noinput -hidden ^
--s rabbit_prelaunch ^
--extra "!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!" ^
- "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
- ""
-
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if ERRORLEVEL 1 (
- exit /B 1
-)
-
-set RABBITMQ_EBIN_PATH=
-
if "!RABBITMQ_CONFIG_FILE!"=="" (
set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
)
@@ -191,8 +175,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
)
set ERLANG_SERVICE_ARGUMENTS= ^
-!RABBITMQ_EBIN_PATH! ^
--boot "!RABBITMQ_BOOT_FILE!" ^
+-pa "!RABBITMQ_EBIN_ROOT!" ^
+-boot start_sasl ^
+-s rabbit boot ^
!RABBITMQ_CONFIG_ARG! ^
+W w ^
+A30 ^
@@ -204,6 +189,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
-sasl sasl_error_logger false ^
-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-rabbit enabled_plugins_file \""!RABBITMQ_ENABLED_PLUGINS_FILE:\=/!"\" ^
+-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
-os_mon start_cpu_sup false ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 4aad6b8f..a5fade72 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -32,6 +32,6 @@ exec erl \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
- -s rabbit_control \
+ -s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index f37fae48..9f549f1e 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -34,7 +34,7 @@ if "!RABBITMQ_NODENAME!"=="" (
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
- echo ERLANG_HOME not set correctly.
+ echo ERLANG_HOME not set correctly.
echo ******************************
echo.
echo Please either set ERLANG_HOME to point to your Erlang installation or place the
@@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
new file mode 100644
index 00000000..4bef83a5
--- /dev/null
+++ b/src/app_utils.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+-module(app_utils).
+
+-export([load_applications/1, start_applications/1,
+ stop_applications/1, app_dependency_order/2,
+ wait_for_applications/1]).
+
+-ifdef(use_specs).
+
+-spec load_applications([atom()]) -> 'ok'.
+-spec start_applications([atom()]) -> 'ok'.
+-spec stop_applications([atom()]) -> 'ok'.
+-spec wait_for_applications([atom()]) -> 'ok'.
+-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+
+-endif.
+
+%%---------------------------------------------------------------------------
+%% Public API
+
+load_applications(Apps) ->
+ load_applications(queue:from_list(Apps), sets:new()),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
+wait_for_applications(Apps) ->
+ [wait_for_application(App) || App <- Apps], ok.
+
+app_dependency_order(RootApps, StripUnreachable) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
+ [{App, app_dependencies(App)} ||
+ {App, _Desc, _Vsn} <- application:loaded_applications()]),
+ try
+ case StripUnreachable of
+ true -> digraph:del_vertices(G, digraph:vertices(G) --
+ digraph_utils:reachable(RootApps, G));
+ false -> ok
+ end,
+ digraph_utils:topsort(G)
+ after
+ true = digraph:delete(G)
+ end.
+
+%%---------------------------------------------------------------------------
+%% Private API
+
+wait_for_application(Application) ->
+ case lists:keymember(Application, 1, application:which_applications()) of
+ true -> ok;
+ false -> timer:sleep(1000),
+ wait_for_application(Application)
+ end.
+
+load_applications(Worklist, Loaded) ->
+ case queue:out(Worklist) of
+ {empty, _WorkList} ->
+ ok;
+ {{value, App}, Worklist1} ->
+ case sets:is_element(App, Loaded) of
+ true -> load_applications(Worklist1, Loaded);
+ false -> case application:load(App) of
+ ok -> ok;
+ {error, {already_loaded, App}} -> ok;
+ Error -> throw(Error)
+ end,
+ load_applications(
+ queue:join(Worklist1,
+ queue:from_list(app_dependencies(App))),
+ sets:add_element(App, Loaded))
+ end
+ end.
+
+app_dependencies(App) ->
+ case application:get_key(App, applications) of
+ undefined -> [];
+ {ok, Lst} -> Lst
+ end.
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
diff --git a/src/gm.erl b/src/gm.erl
index 01300f18..30fcdc5d 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -433,51 +433,47 @@
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).
-%% The joined, members_changed and handle_msg callbacks can all
-%% return any of the following terms:
+%% The joined, members_changed and handle_msg callbacks can all return
+%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
-%% {'stop', Reason} - the callback indicates the member should
-%% stop with reason Reason and should leave the group.
+%% {'stop', Reason} - the callback indicates the member should stop
+%% with reason Reason and should leave the group.
%%
-%% {'become', Module, Args} - the callback indicates that the
-%% callback module should be changed to Module and that the
-%% callback functions should now be passed the arguments
-%% Args. This allows the callback module to be dynamically
-%% changed.
+%% {'become', Module, Args} - the callback indicates that the callback
+%% module should be changed to Module and that the callback functions
+%% should now be passed the arguments Args. This allows the callback
+%% module to be dynamically changed.
-%% Called when we've successfully joined the group. Supplied with
-%% Args provided in start_link, plus current group members.
+%% Called when we've successfully joined the group. Supplied with Args
+%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Supplied with Args provided in start_link, the list of new
-%% members and the list of members previously known to us that
-%% have since died. Note that if a member joins and dies very
-%% quickly, it's possible that we will never see that member
-%% appear in either births or deaths. However we are guaranteed
-%% that (1) we will see a member joining either in the births
-%% here, or in the members passed to joined/2 before receiving
-%% any messages from it; and (2) we will not see members die that
-%% we have not seen born (or supplied in the members to
-%% joined/2).
+%% Supplied with Args provided in start_link, the list of new members
+%% and the list of members previously known to us that have since
+%% died. Note that if a member joins and dies very quickly, it's
+%% possible that we will never see that member appear in either births
+%% or deaths. However we are guaranteed that (1) we will see a member
+%% joining either in the births here, or in the members passed to
+%% joined/2 before receiving any messages from it; and (2) we will not
+%% see members die that we have not seen born (or supplied in the
+%% members to joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
-%% message. This does get called for messages injected by this
-%% member, however, in such cases, there is no special
-%% significance of this invocation: it does not indicate that the
-%% message has made it to any other members, let alone all other
-%% members.
+%% message. This does get called for messages injected by this member,
+%% however, in such cases, there is no special significance of this
+%% invocation: it does not indicate that the message has made it to
+%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Called on gm member termination as per rules in gen_server,
-%% with the Args provided in start_link plus the termination
-%% Reason.
+%% Called on gm member termination as per rules in gen_server, with
+%% the Args provided in start_link plus the termination Reason.
-callback terminate(Args :: term(), Reason :: term()) ->
ok | term().
@@ -533,7 +529,7 @@ init([GroupName, Module, Args]) ->
group_name = GroupName,
module = Module,
view = undefined,
- pub_count = 0,
+ pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
@@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState,
module = Module,
callback_args = Args }) ->
- Group = record_new_member_in_group(
- GroupName, Self, NewMember,
- fun (Group1) ->
- View1 = group_to_view(Group1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(
- MembersState)})
- end),
+ {MembersState1, Group} =
+ record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self,
+ prepare_members_state(MembersState1)}),
+ MembersState1
+ end),
View2 = group_to_view(Group),
- State1 = check_neighbours(State #state { view = View2 }),
+ State1 = check_neighbours(State #state { view = View2,
+ members_state = MembersState1 }),
Result = callback_view_changed(Args, Module, View, View2),
handle_callback_result({Result, {ok, Group}, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ members_state = MembersState,
group_name = GroupName,
module = Module,
callback_args = Args }) ->
{Result, State1} =
case needs_view_update(ReqVer, View) of
- true ->
- View1 = group_to_view(read_group(GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- check_neighbours(State #state { view = View1 })};
- false ->
- {ok, State}
+ true -> View1 = group_to_view(read_group(GroupName)),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(
+ State #state { view = View1,
+ members_state = MemberState1 })};
+ false -> {ok, State}
end,
handle_callback_result(
if_callback_success(
@@ -665,22 +667,21 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
_ ->
View1 =
group_to_view(record_dead_member_in_group(Member, GroupName)),
- State1 = State #state { view = View1 },
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
- maybe_erase_aliases(
- State1 #state {
+ {Result1, State1} = maybe_erase_aliases(State, View1),
+ {Result1, State1 #state {
members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) });
+ confirms = purge_confirms(Confirms) }};
_ ->
%% here we won't be pointing out any deaths:
%% the concern is that there maybe births
%% which we'd otherwise miss.
{callback_view_changed(Args, Module, View, View1),
- State1}
+ check_neighbours(State #state { view = View1 })}
end,
- handle_callback_result({Result, check_neighbours(State2)})
+ handle_callback_result({Result, State2})
end.
@@ -693,9 +694,13 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) -> 1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
-prioritise_info(_ , _State) -> 0.
+prioritise_info(flush, _State) ->
+ 1;
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+ #state { members_state = MS }) when MS /= undefined ->
+ 1;
+prioritise_info(_, _State) ->
+ 0.
handle_msg(check_neighbours, State) ->
@@ -795,8 +800,8 @@ handle_msg({activity, Left, Activity},
State1 = State #state { members_state = MembersState1,
confirms = Confirms1 },
Activity3 = activity_finalise(Activity1),
- {Result, State2} = maybe_erase_aliases(State1),
- ok = maybe_send_activity(Activity3, State2),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
if_callback_success(
Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
@@ -829,13 +834,14 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
+ PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount, Msg} | Buffer],
+ Buffer1 = [{PubCount1, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
- _ -> queue:in({PubCount, From}, Confirms)
+ _ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount + 1,
+ State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
@@ -850,14 +856,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ pub_count = PubCount }) ->
+ [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
- Member #member { pending_ack = PA1 }
+ Member #member { pending_ack = PA1,
+ last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.
@@ -867,11 +876,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -890,17 +897,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1052,7 +1055,7 @@ record_dead_member_in_group(Member, GroupName) ->
Group.
record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, Group} =
+ {atomic, {Result, Group}} =
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
@@ -1062,11 +1065,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
Members1 = Prefix ++ [Left, NewMember | Suffix],
Group2 = Group1 #gm_group { members = Members1,
version = Ver + 1 },
- ok = Fun(Group2),
+ Result = Fun(Group2),
mnesia:write(Group2),
- Group2
+ {Result, Group2}
end),
- Group.
+ {Result, Group}.
erase_members_in_group(Members, GroupName) ->
DeadMembers = [{dead, Id} || Id <- Members],
@@ -1089,10 +1092,10 @@ erase_members_in_group(Members, GroupName) ->
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
- view = View,
+ view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1107,11 +1110,11 @@ maybe_erase_aliases(State = #state { self = Self,
end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
- [] -> {ok, State1};
+ [] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
erase_members_in_group(Erasable, GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- State1 #state { view = View1 }}
+ {callback_view_changed(Args, Module, View0, View1),
+ check_neighbours(State1 #state { view = View1 })}
end.
can_erase_view_member(Self, Self, _LA, _LP) -> false;
@@ -1141,10 +1144,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1233,23 +1234,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1257,6 +1254,12 @@ make_member(GroupName) ->
{error, not_found} -> ?VERSION_START
end, self()}.
+remove_erased_members(MembersState, View) ->
+ lists:foldl(fun (Id, MembersState1) ->
+ store_member(Id, find_member_or_blank(Id, MembersState),
+ MembersState1)
+ end, blank_member_state(), all_known_members(View)).
+
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
@@ -1265,16 +1268,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1287,16 +1286,30 @@ send_right(Right, View, Msg) ->
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
- lists:foldl(
- fun ({Id, Pubs, _Acks}, ok) ->
- lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, get_pid(Id), Pub);
- (_, Error) ->
- Error
- end, ok, Pubs);
- (_, Error) ->
- Error
- end, ok, Activity).
+ Result =
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
+ lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
+ case Module2:handle_msg(
+ Args2, get_pid(Id), Pub) of
+ ok ->
+ Acc;
+ {become, Module3, Args3} ->
+ {Args3, Module3, ok};
+ {stop, _Reason} = Error ->
+ Error
+ end;
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args1, Module1, ok}, Pubs);
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args, Module, ok}, Activity),
+ case Result of
+ {Args, Module, ok} -> ok;
+ {Args1, Module1, ok} -> {become, Module1, Args1};
+ {stop, _Reason} = Error -> Error
+ end.
callback_view_changed(Args, Module, OldView, NewView) ->
OldMembers = all_known_members(OldView),
@@ -1364,34 +1377,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fe93981..4fc488b8 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -225,8 +225,8 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
+call(Sup, Msg) -> ?GEN_SERVER:call(mirroring(Sup), Msg, infinity).
+cast(Sup, Msg) -> ?GEN_SERVER:cast(mirroring(Sup), Msg).
find_call(Sup, Id, Msg) ->
Group = call(Sup, group),
@@ -237,7 +237,7 @@ find_call(Sup, Id, Msg) ->
%% immediately after the tx - we can't be 100% here. So we may as
%% well dirty_select.
case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of
- [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity);
+ [Mirror] -> call(Mirror, Msg);
[] -> {error, not_found}
end.
@@ -246,13 +246,16 @@ fold(FunAtom, Sup, AggFun) ->
lists:foldl(AggFun, [],
[apply(?SUPERVISOR, FunAtom, [D]) ||
M <- ?PG2:get_members(Group),
- D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]).
+ D <- [delegate(M)]]).
child(Sup, Id) ->
[Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
Id1 =:= Id],
Pid.
+delegate(Sup) -> child(Sup, delegate).
+mirroring(Sup) -> child(Sup, mirroring).
+
%%----------------------------------------------------------------------------
start_internal(Group, ChildSpecs) ->
@@ -288,28 +291,29 @@ handle_call({init, Overall}, _From,
initial_childspecs = ChildSpecs}) ->
process_flag(trap_exit, true),
?PG2:create(Group),
- ok = ?PG2:join(Group, self()),
- Rest = ?PG2:get_members(Group) -- [self()],
+ ok = ?PG2:join(Group, Overall),
+ Rest = ?PG2:get_members(Group) -- [Overall],
case Rest of
[] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end);
_ -> ok
end,
[begin
- ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}),
+ ?GEN_SERVER:cast(mirroring(Pid), {ensure_monitoring, Overall}),
erlang:monitor(process, Pid)
end || Pid <- Rest],
- Delegate = child(Overall, delegate),
+ Delegate = delegate(Overall),
erlang:monitor(process, Delegate),
State1 = State#state{overall = Overall, delegate = Delegate},
- case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
+ case errors([maybe_start(Group, Overall, Delegate, S) || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
- State = #state{delegate = Delegate,
+ State = #state{overall = Overall,
+ delegate = Delegate,
group = Group}) ->
- {reply, case maybe_start(Group, Delegate, ChildSpec) of
+ {reply, case maybe_start(Group, Overall, Delegate, ChildSpec) of
already_in_mnesia -> {error, already_present};
{already_in_mnesia, Pid} -> {error, {already_started, Pid}};
Else -> Else
@@ -322,9 +326,6 @@ handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate,
handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
{reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
- {reply, Delegate, State};
-
handle_call(group, _From, State = #state{group = Group}) ->
{reply, Group, State};
@@ -343,7 +344,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, Reason},
- State = #state{delegate = Pid, group = Group}) ->
+ State = #state{overall = Pid, group = Group}) ->
%% Since the delegate is temporary, its death won't cause us to
%% die. Since the overall supervisor kills processes in reverse
%% order when shutting down "from above" and we started after the
@@ -357,15 +358,16 @@ handle_info({'DOWN', _Ref, process, Pid, Reason},
{stop, Reason, State};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{delegate = Delegate, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group,
+ overall = O}) ->
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
- Self = self(),
R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, ChildSpecs} =
- mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
- _ -> []
+ [O | _] -> {atomic, ChildSpecs} =
+ mnesia:transaction(
+ fun() -> update_all(O, Pid) end),
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
case errors(R) of
[] -> {noreply, State};
@@ -384,13 +386,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
tell_all_peers_to_die(Group, Reason) ->
- [?GEN_SERVER:cast(P, {die, Reason}) ||
- P <- ?PG2:get_members(Group) -- [self()]].
+ [cast(P, {die, Reason}) || P <- ?PG2:get_members(Group) -- [self()]].
-maybe_start(Group, Delegate, ChildSpec) ->
- case mnesia:transaction(fun() ->
- check_start(Group, Delegate, ChildSpec)
- end) of
+maybe_start(Group, Overall, Delegate, ChildSpec) ->
+ case mnesia:transaction(
+ fun() -> check_start(Group, Overall, Delegate, ChildSpec) end) of
{atomic, start} -> start(Delegate, ChildSpec);
{atomic, undefined} -> already_in_mnesia;
{atomic, Pid} -> {already_in_mnesia, Pid};
@@ -398,31 +398,29 @@ maybe_start(Group, Delegate, ChildSpec) ->
{aborted, E} -> {error, E}
end.
-check_start(Group, Delegate, ChildSpec) ->
+check_start(Group, Overall, Delegate, ChildSpec) ->
case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of
- [] -> write(Group, ChildSpec),
+ [] -> write(Group, Overall, ChildSpec),
start;
[S] -> #mirrored_sup_childspec{key = {Group, Id},
mirroring_pid = Pid} = S,
- case self() of
+ case Overall of
Pid -> child(Delegate, Id);
_ -> case supervisor(Pid) of
- dead -> write(Group, ChildSpec),
+ dead -> write(Group, Overall, ChildSpec),
start;
Delegate0 -> child(Delegate0, Id)
end
end
end.
-supervisor(Pid) ->
- with_exit_handler(
- fun() -> dead end,
- fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end).
+supervisor(Pid) -> with_exit_handler(fun() -> dead end,
+ fun() -> delegate(Pid) end).
-write(Group, ChildSpec) ->
+write(Group, Overall, ChildSpec) ->
ok = mnesia:write(
#mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = self(),
+ mirroring_pid = Overall,
childspec = ChildSpec}),
ChildSpec.
@@ -448,12 +446,12 @@ check_stop(Group, Delegate, Id) ->
id({Id, _, _, _, _, _}) -> Id.
-update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+update_all(Overall, OldOverall) ->
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldOverall,
key = '$1',
childspec = '$2',
_ = '_'},
- [write(Group, C) ||
+ [write(Group, Overall, C) ||
[{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->
@@ -467,8 +465,7 @@ errors(Results) -> [E || {error, E} <- Results].
%%----------------------------------------------------------------------------
-create_tables() ->
- create_tables([?TABLE_DEF]).
+create_tables() -> create_tables([?TABLE_DEF]).
create_tables([]) ->
ok;
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 60192b34..f8cbd853 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -157,7 +157,7 @@ test_no_migration_on_shutdown() ->
with_sups(fun([Evil, _]) ->
?MS:start_child(Evil, childspec(worker)),
try
- call(worker, ping),
+ call(worker, ping, 1000, 100),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _, _} ->
ok
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 1000, 100).
+call(Id, Msg) -> call(Id, Msg, 10*1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
@@ -285,7 +285,7 @@ kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]);
kill(Pid, Waits) ->
erlang:monitor(process, Pid),
[erlang:monitor(process, P) || P <- Waits],
- exit(Pid, kill),
+ exit(Pid, bang),
kill_wait(Pid),
[kill_wait(P) || P <- Waits].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ea9731b6..fda489fe 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -18,9 +18,9 @@
-behaviour(application).
--export([maybe_hipe_compile/0, prepare/0, start/0, stop/0, stop_and_halt/0,
- status/0, is_running/0, is_running/1, environment/0,
- rotate_logs/1, force_event_refresh/0]).
+-export([start/0, boot/0, stop/0,
+ stop_and_halt/0, await_startup/0, status/0, is_running/0,
+ is_running/1, environment/0, rotate_logs/1, force_event_refresh/0]).
-export([start/2, stop/1]).
@@ -60,7 +60,8 @@
-rabbit_boot_step({worker_pool,
[{description, "worker pool"},
- {mfa, {rabbit_sup, start_child, [worker_pool_sup]}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [worker_pool_sup]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).
@@ -143,7 +144,8 @@
-rabbit_boot_step({mirror_queue_slave_sup,
[{description, "mirror queue slave sup"},
- {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {mfa, {rabbit_sup, start_supervisor_child,
+ [rabbit_mirror_queue_slave_sup]}},
{requires, recovery},
{enables, routing_ready}]}).
@@ -197,7 +199,8 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
+ ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -214,11 +217,11 @@
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
--spec(maybe_hipe_compile/0 :: () -> 'ok').
--spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
+-spec(boot/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> no_return()).
+-spec(await_startup/0 :: () -> 'ok').
-spec(status/0 ::
() -> [{pid, integer()} |
{running_applications, [{atom(), string(), string()}]} |
@@ -261,7 +264,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("HiPE compiling: |~s|~n |",
+ io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
@@ -283,29 +286,51 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
split0([], Ls) -> Ls;
split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]).
-prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_upgrade:maybe_upgrade_mnesia().
+ensure_application_loaded() ->
+ %% We end up looking at the rabbit app's env for HiPE and log
+ %% handling, so it needs to be loaded. But during the tests, it
+ %% may end up getting loaded twice, so guard against that.
+ case application:load(rabbit) of
+ ok -> ok;
+ {error, {already_loaded, rabbit}} -> ok
+ end.
start() ->
+ start_it(fun() ->
+ %% We do not want to HiPE compile or upgrade
+ %% mnesia after just restarting the app
+ ok = ensure_application_loaded(),
+ ok = ensure_working_log_handlers(),
+ ok = app_utils:start_applications(app_startup_order()),
+ ok = print_plugin_info(rabbit_plugins:active())
+ end).
+
+boot() ->
+ start_it(fun() ->
+ ok = ensure_application_loaded(),
+ maybe_hipe_compile(),
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ ok = app_utils:load_applications(ToBeLoaded),
+ StartupApps = app_utils:app_dependency_order(ToBeLoaded,
+ false),
+ ok = app_utils:start_applications(StartupApps),
+ ok = print_plugin_info(Plugins)
+ end).
+
+start_it(StartFun) ->
try
- %% prepare/1 ends up looking at the rabbit app's env, so it
- %% needs to be loaded, but during the tests, it may end up
- %% getting loaded twice, so guard against that
- case application:load(rabbit) of
- ok -> ok;
- {error, {already_loaded, rabbit}} -> ok
- end,
- ok = prepare(),
- ok = rabbit_misc:start_applications(application_load_order())
+ StartFun()
after
- %%give the error loggers some time to catch up
+ %% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
rabbit_log:info("Stopping Rabbit~n"),
- ok = rabbit_misc:stop_applications(application_load_order()).
+ ok = app_utils:stop_applications(app_shutdown_order()).
stop_and_halt() ->
try
@@ -316,6 +341,9 @@ stop_and_halt() ->
end,
ok.
+await_startup() ->
+ app_utils:wait_for_applications(app_startup_order()).
+
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
{running_applications, application:which_applications(infinity)},
@@ -392,46 +420,13 @@ stop(_State) ->
%%---------------------------------------------------------------------------
%% application life cycle
-application_load_order() ->
- ok = load_applications(),
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{Dep, App} || Dep <- Deps] end,
- [{App, app_dependencies(App)} ||
- {App, _Desc, _Vsn} <- application:loaded_applications()]),
- true = digraph:del_vertices(
- G, digraph:vertices(G) -- digraph_utils:reachable(?APPS, G)),
- Result = digraph_utils:topsort(G),
- true = digraph:delete(G),
- Result.
-
-load_applications() ->
- load_applications(queue:from_list(?APPS), sets:new()).
-
-load_applications(Worklist, Loaded) ->
- case queue:out(Worklist) of
- {empty, _WorkList} ->
- ok;
- {{value, App}, Worklist1} ->
- case sets:is_element(App, Loaded) of
- true -> load_applications(Worklist1, Loaded);
- false -> case application:load(App) of
- ok -> ok;
- {error, {already_loaded, App}} -> ok;
- Error -> throw(Error)
- end,
- load_applications(
- queue:join(Worklist1,
- queue:from_list(app_dependencies(App))),
- sets:add_element(App, Loaded))
- end
- end.
+app_startup_order() ->
+ ok = app_utils:load_applications(?APPS),
+ app_utils:app_dependency_order(?APPS, false).
-app_dependencies(App) ->
- case application:get_key(App, applications) of
- undefined -> [];
- {ok, Lst} -> Lst
- end.
+app_shutdown_order() ->
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ app_utils:app_dependency_order(Apps, true).
%%---------------------------------------------------------------------------
%% boot step logic
@@ -477,7 +472,8 @@ sort_boot_steps(UnsortedSteps) ->
%% there is one, otherwise fail).
SortedSteps = lists:reverse(
[begin
- {StepName, Step} = digraph:vertex(G, StepName),
+ {StepName, Step} = digraph:vertex(G,
+ StepName),
Step
end || StepName <- digraph_utils:topsort(G)]),
digraph:delete(G),
@@ -538,7 +534,7 @@ boot_error(Format, Args) ->
boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
- rabbit_sup:start_child(delegate_sup, [Count]).
+ rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
@@ -559,7 +555,8 @@ insert_default_data() ->
ok = rabbit_vhost:add(DefaultVHost),
ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
- ok = rabbit_auth_backend_internal:set_permissions(DefaultUser, DefaultVHost,
+ ok = rabbit_auth_backend_internal:set_permissions(DefaultUser,
+ DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
DefaultReadPerm),
@@ -647,6 +644,24 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
+print_plugin_info([]) ->
+ ok;
+print_plugin_info(Plugins) ->
+ %% This gets invoked by rabbitmqctl start_app, outside the context
+ %% of the rabbit application
+ rabbit_misc:with_local_io(
+ fun() ->
+ io:format("~n-- plugins running~n"),
+ [print_plugin_info(
+ AppName, element(2, application:get_key(AppName, vsn)))
+ || AppName <- Plugins],
+ ok
+ end).
+
+print_plugin_info(Plugin, Vsn) ->
+ Len = 76 - length(Vsn),
+ io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
+
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 04e0c141..d16d90a4 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -162,17 +162,17 @@ maybe_alert(UpdateFun, Node, Source,
end,
State#alarms{alarmed_nodes = AN1}.
-alert_local(Alert, Alertees, _Source) ->
- alert(Alertees, [Alert], fun erlang:'=:='/2).
+alert_local(Alert, Alertees, Source) ->
+ alert(Alertees, Source, Alert, fun erlang:'=:='/2).
alert_remote(Alert, Alertees, Source) ->
- alert(Alertees, [Source, Alert], fun erlang:'=/='/2).
+ alert(Alertees, Source, Alert, fun erlang:'=/='/2).
-alert(Alertees, AlertArg, NodeComparator) ->
+alert(Alertees, Source, Alert, NodeComparator) ->
Node = node(),
dict:fold(fun (Pid, {M, F, A}, ok) ->
case NodeComparator(Node, node(Pid)) of
- true -> apply(M, F, A ++ [Pid] ++ AlertArg);
+ true -> apply(M, F, A ++ [Pid, Source, Alert]);
false -> ok
end
end, ok, Alertees).
@@ -181,7 +181,7 @@ internal_register(Pid, {M, F, A} = HighMemMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
- {ok, _Sources} -> apply(M, F, A ++ [Pid, true]);
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5701efeb..f2833c26 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -984,8 +984,6 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
- {basic_consume, _, _, _, _, _, _} -> 7;
- {basic_cancel, _, _, _} -> 7;
stat -> 7;
_ -> 0
end.
@@ -995,10 +993,6 @@ prioritise_cast(Msg, _State) ->
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
- {ack, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid, _Credit} -> 7;
- {unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control_main.erl
index 2dea2a2f..2e163cfb 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control_main.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
--module(rabbit_control).
+-module(rabbit_control_main).
-include("rabbit.hrl").
-export([start/0, stop/0, action/5]).
@@ -26,6 +26,61 @@
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(QUIET_DEF, {?QUIET_OPT, flag}).
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
+-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+
+-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
+
+-define(COMMANDS,
+ [stop,
+ stop_app,
+ start_app,
+ wait,
+ reset,
+ force_reset,
+ rotate_logs,
+
+ cluster,
+ force_cluster,
+ cluster_status,
+
+ add_user,
+ delete_user,
+ change_password,
+ clear_password,
+ set_user_tags,
+ list_users,
+
+ add_vhost,
+ delete_vhost,
+ list_vhosts,
+ {set_permissions, [?VHOST_DEF]},
+ {clear_permissions, [?VHOST_DEF]},
+ {list_permissions, [?VHOST_DEF]},
+ list_user_permissions,
+
+ set_parameter,
+ clear_parameter,
+ list_parameters,
+
+ {list_queues, [?VHOST_DEF]},
+ {list_exchanges, [?VHOST_DEF]},
+ {list_bindings, [?VHOST_DEF]},
+ {list_connections, [?VHOST_DEF]},
+ list_channels,
+ {list_consumers, [?VHOST_DEF]},
+ status,
+ environment,
+ report,
+ eval,
+
+ close_connection,
+ {trace_on, [?VHOST_DEF]},
+ {trace_off, [?VHOST_DEF]},
+ set_vm_memory_high_watermark
+ ]).
+
-define(GLOBAL_QUERIES,
[{"Connections", rabbit_networking, connection_info_all,
connection_info_keys},
@@ -57,19 +112,18 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
- {[Command0 | Args], Opts} =
- case rabbit_misc:get_options([{flag, ?QUIET_OPT},
- {option, ?NODE_OPT, NodeStr},
- {option, ?VHOST_OPT, "/"}],
- init:get_plain_arguments()) of
- {[], _Opts} -> usage();
- CmdArgsAndOpts -> CmdArgsAndOpts
+ {Command, Opts, Args} =
+ case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
+ init:get_plain_arguments())
+ of
+ {ok, Res} -> Res;
+ no_command -> print_error("could not recognise command", []),
+ usage()
end,
Opts1 = [case K of
?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
_ -> {K, V}
end || {K, V} <- Opts],
- Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
@@ -194,8 +248,7 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, PidFile, rabbit, Inform);
-
+ wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
action(wait, Node, [PidFile, App], _Opts, Inform) ->
Inform("Waiting for ~p on ~p", [App, Node]),
wait_for_application(Node, PidFile, list_to_atom(App), Inform);
@@ -407,12 +460,22 @@ wait_for_application(Node, PidFile, Application, Inform) ->
Inform("pid is ~s", [Pid]),
wait_for_application(Node, Pid, Application).
+wait_for_application(Node, Pid, rabbit_and_plugins) ->
+ wait_for_startup(Node, Pid);
wait_for_application(Node, Pid, Application) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rabbit_nodes:is_running(Node, Application) end).
+
+wait_for_startup(Node, Pid) ->
+ while_process_is_alive(
+ Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
+
+while_process_is_alive(Node, Pid, Activity) ->
case process_up(Pid) of
- true -> case rabbit_nodes:is_running(Node, Application) of
+ true -> case Activity() of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
- wait_for_application(Node, Pid, Application)
+ while_process_is_alive(Node, Pid, Activity)
end;
false -> {error, process_not_running}
end.
@@ -427,12 +490,14 @@ wait_for_process_death(Pid) ->
read_pid_file(PidFile, Wait) ->
case {file:read_file(PidFile), Wait} of
{{ok, Bin}, _} ->
- S = string:strip(binary_to_list(Bin), right, $\n),
- try list_to_integer(S)
+ S = binary_to_list(Bin),
+ {match, [PidS]} = re:run(S, "[^\\s]+",
+ [{capture, all, list}]),
+ try list_to_integer(PidS)
catch error:badarg ->
exit({error, {garbage_in_pid_file, PidFile}})
end,
- S;
+ PidS;
{{error, enoent}, true} ->
timer:sleep(?EXTERNAL_CHECK_INTERVAL),
read_pid_file(PidFile, Wait);
@@ -444,8 +509,7 @@ read_pid_file(PidFile, Wait) ->
% rpc:call(os, getpid, []) at this point
process_up(Pid) ->
with_os([{unix, fun () ->
- system("ps -p " ++ Pid
- ++ " >/dev/null 2>&1") =:= 0
+ run_ps(Pid) =:= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -463,15 +527,17 @@ with_os(Handlers) ->
Handler -> Handler()
end.
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
format_parse_error({_Line, Mod, Err}) ->
lists:flatten(Mod:format_error(Err)).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a471d282..c07ad832 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -47,16 +47,10 @@
%%----------------------------------------------------------------------------
-boot() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
+boot() -> rabbit_sup:start_supervisor_child(
+ rabbit_direct_client_sup, rabbit_client_sup,
[{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+ {rabbit_channel_sup, start_link, []}]).
force_event_refresh() ->
[Pid ! force_event_refresh || Pid<- list()],
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b1750b61..d9e8e8e4 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -178,8 +178,9 @@ parse_free_unix(CommandResult) ->
parse_free_win32(CommandResult) ->
LastLine = lists:last(string:tokens(CommandResult, "\r\n")),
- [_, _Dir, Free, "bytes", "free"] = string:tokens(LastLine, " "),
- list_to_integer(Free).
+ {match, [Free]} = re:run(lists:reverse(LastLine), "(\\d+)",
+ [{capture, all_but_first, list}]),
+ list_to_integer(lists:reverse(Free)).
interpret_limit({mem_relative, R}) ->
round(R * vm_memory_monitor:get_total_memory());
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 17e2ffb4..71e0507a 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -354,7 +354,10 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
+
+handle_cast({delete_and_terminate, Reason}, State) ->
+ {stop, Reason, State}.
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
gm:broadcast(GM, heartbeat),
@@ -402,6 +405,9 @@ handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
+handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg),
+ {stop, Reason};
handle_msg([_CPid], _From, _Msg) ->
ok.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 8eacb1f3..a2034876 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
--export([start/0, start_link/0, start_child/2]).
+-export([start_link/0, start_child/2]).
-export([init/1]).
@@ -26,20 +26,9 @@
-define(SERVER, ?MODULE).
-start() ->
- {ok, _} =
- supervisor2:start_child(
- rabbit_sup,
- {rabbit_mirror_queue_slave_sup,
- {rabbit_mirror_queue_slave_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
- ok.
+start_link() -> supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-start_link() ->
- supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
-
-start_child(Node, Args) ->
- supervisor2:start_child({?SERVER, Node}, Args).
+start_child(Node, Args) -> supervisor2:start_child({?SERVER, Node}, Args).
init([]) ->
{ok, {{simple_one_for_one_terminate, 10, 10},
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 706de835..d41aa09b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, amqp_error/4,
+-export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
@@ -42,14 +42,13 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([format/2, format_many/1, format_stderr/2]).
-export([with_local_io/1, local_info_msg/2]).
--export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
--export([get_options/2]).
+-export([parse_arguments/3]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([const_ok/0, const/1]).
@@ -59,7 +58,6 @@
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
--export([quit/1]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
@@ -72,7 +70,7 @@
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
-type(resource_name() :: binary()).
--type(optdef() :: {flag, string()} | {option, string(), any()}).
+-type(optdef() :: flag | {option, string()}).
-type(channel_or_connection_exit()
:: rabbit_types:channel_exit() | rabbit_types:connection_exit()).
-type(digraph_label() :: term()).
@@ -87,6 +85,10 @@
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 ::
(rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
+
+-spec(quit/1 :: (integer()) -> no_return()).
+-spec(quit/2 :: (string(), [term()]) -> no_return()).
+
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
-> rabbit_types:connection_exit()).
-spec(amqp_error/4 ::
@@ -163,8 +165,6 @@
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
--spec(start_applications/1 :: ([atom()]) -> 'ok').
--spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
@@ -182,8 +182,12 @@
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
--spec(get_options/2 :: ([optdef()], [string()])
- -> {[string()], [{string(), any()}]}).
+-spec(parse_arguments/3 ::
+ ([{atom(), [{string(), optdef()}]} | atom()],
+ [{string(), optdef()}],
+ [string()])
+ -> {'ok', {atom(), [{string(), string()}], [string()]}} |
+ 'no_command').
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
@@ -206,7 +210,6 @@
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
--spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
@@ -387,6 +390,28 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
+%%
+%% @doc Halts the emulator after printing out an error message io-formatted with
+%% the supplied arguments. The exit status of the beam process will be set to 1.
+%%
+quit(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ quit(1).
+
+%%
+%% @doc Halts the emulator returning the given status code to the os.
+%% On Windows this function will block indefinitely so as to give the io
+%% subsystem time to flush stdout completely.
+%%
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
+
throw_on_error(E, Thunk) ->
case Thunk() of
{error, Reason} -> throw({E, Reason});
@@ -589,34 +614,6 @@ with_local_io(Fun) ->
local_info_msg(Format, Args) ->
with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
unfold(Fun, Init) ->
unfold(Fun, [], Init).
@@ -736,39 +733,63 @@ gb_trees_fold1(Fun, Acc, {Key, Val, It}) ->
gb_trees_foreach(Fun, Tree) ->
gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).
-%% Separate flags and options from arguments.
-%% get_options([{flag, "-q"}, {option, "-p", "/"}],
-%% ["set_permissions","-p","/","guest",
-%% "-q",".*",".*",".*"])
-%% == {["set_permissions","guest",".*",".*",".*"],
-%% [{"-q",true},{"-p","/"}]}
-get_options(Defs, As) ->
- lists:foldl(fun(Def, {AsIn, RsIn}) ->
- {K, {AsOut, V}} =
- case Def of
- {flag, Key} ->
- {Key, get_flag(Key, AsIn)};
- {option, Key, Default} ->
- {Key, get_option(Key, Default, AsIn)}
- end,
- {AsOut, [{K, V} | RsIn]}
- end, {As, []}, Defs).
-
-get_option(K, _Default, [K, V | As]) ->
- {As, V};
-get_option(K, Default, [Nk | As]) ->
- {As1, V} = get_option(K, Default, As),
- {[Nk | As1], V};
-get_option(_, Default, As) ->
- {As, Default}.
-
-get_flag(K, [K | As]) ->
- {As, true};
-get_flag(K, [Nk | As]) ->
- {As1, V} = get_flag(K, As),
- {[Nk | As1], V};
-get_flag(_, []) ->
- {[], false}.
+%% Takes:
+%% * A list of [{atom(), [{string(), optdef()]} | atom()], where the atom()s
+%% are the accepted commands and the optional [string()] is the list of
+%% accepted options for that command
+%% * A list [{string(), optdef()}] of options valid for all commands
+%% * The list of arguments given by the user
+%%
+%% Returns either {ok, {atom(), [{string(), string()}], [string()]} which are
+%% respectively the command, the key-value pairs of the options and the leftover
+%% arguments; or no_command if no command could be parsed.
+parse_arguments(Commands, GlobalDefs, As) ->
+ lists:foldl(maybe_process_opts(GlobalDefs, As), no_command, Commands).
+
+maybe_process_opts(GDefs, As) ->
+ fun({C, Os}, no_command) ->
+ process_opts(atom_to_list(C), dict:from_list(GDefs ++ Os), As);
+ (C, no_command) ->
+ (maybe_process_opts(GDefs, As))({C, []}, no_command);
+ (_, {ok, Res}) ->
+ {ok, Res}
+ end.
+
+process_opts(C, Defs, As0) ->
+ KVs0 = dict:map(fun (_, flag) -> false;
+ (_, {option, V}) -> V
+ end, Defs),
+ process_opts(Defs, C, As0, not_found, KVs0, []).
+
+%% Consume flags/options until you find the correct command. If there are no
+%% arguments or the first argument is not the command we're expecting, fail.
+%% Arguments to this are: definitions, cmd we're looking for, args we
+%% haven't parsed, whether we have found the cmd, options we've found,
+%% plain args we've found.
+process_opts(_Defs, C, [], found, KVs, Outs) ->
+ {ok, {list_to_atom(C), dict:to_list(KVs), lists:reverse(Outs)}};
+process_opts(_Defs, _C, [], not_found, _, _) ->
+ no_command;
+process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
+ OptType = case dict:find(A, Defs) of
+ error -> none;
+ {ok, flag} -> flag;
+ {ok, {option, _}} -> option
+ end,
+ case {OptType, C, Found} of
+ {flag, _, _} -> process_opts(
+ Defs, C, As, Found, dict:store(A, true, KVs),
+ Outs);
+ {option, _, _} -> case As of
+ [] -> no_command;
+ [V | As1] -> process_opts(
+ Defs, C, As1, Found,
+ dict:store(A, V, KVs), Outs)
+ end;
+ {none, A, _} -> process_opts(Defs, C, As, found, KVs, Outs);
+ {none, _, found} -> process_opts(Defs, C, As, found, KVs, [A | Outs]);
+ {none, _, _} -> no_command
+ end.
now_ms() ->
timer:now_diff(now(), {0,0,0}) div 1000.
@@ -909,13 +930,6 @@ receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
end.
-%% the slower shutdown on windows required to flush stdout
-quit(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status)
- end.
-
os_cmd(Command) ->
Exec = hd(string:tokens(Command, " ")),
case os:find_executable(Exec) of
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 1a12d43b..bedf5142 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
- connection_string/2]).
+ tune_buffer_size/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -69,6 +69,7 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
@@ -159,6 +160,13 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+tune_buffer_size(Sock) ->
+ case getopts(Sock, [sndbuf, recbuf, buffer]) of
+ {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ setopts(Sock, [{buffer, BufSz}]);
+ Err -> Err
+ end.
+
connection_string(Sock, Direction) ->
{From, To} = case Direction of
inbound -> {fun peername/1, fun sockname/1};
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f0c75d23..94a5a2b7 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -136,18 +136,13 @@ boot_ssl() ->
ok
end.
-start() ->
- {ok,_} = supervisor2:start_child(
- rabbit_sup,
- {rabbit_tcp_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_tcp_client_sup},
- {rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
- ok.
+start() -> rabbit_sup:start_supervisor_child(
+ rabbit_tcp_client_sup, rabbit_client_sup,
+ [{local, rabbit_tcp_client_sup},
+ {rabbit_connection_sup,start_link,[]}]).
ensure_ssl() ->
- ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
+ ok = app_utils:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
% unknown_ca errors are silently ignored prior to R14B unless we
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 00880fb2..7cf6eea9 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -17,146 +17,57 @@
-module(rabbit_plugins).
-include("rabbit.hrl").
--export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1,
- lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]).
+-export([setup/0, active/0, read_enabled/1,
+ list/1, dependencies/3]).
--define(VERBOSE_OPT, "-v").
--define(MINIMAL_OPT, "-m").
--define(ENABLED_OPT, "-E").
--define(ENABLED_ALL_OPT, "-e").
+-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
+-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
+-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
+-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
+-define(GLOBAL_DEFS, []).
--spec(start/0 :: () -> no_return()).
--spec(stop/0 :: () -> 'ok').
--spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]).
--spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]).
--spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]).
--spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]).
--spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]).
-
--endif.
+-define(COMMANDS,
+ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
+ enable,
+ disable]).
%%----------------------------------------------------------------------------
-start() ->
- {ok, [[PluginsFile|_]|_]} =
- init:get_argument(enabled_plugins_file),
- {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
- {[Command0 | Args], Opts} =
- case rabbit_misc:get_options([{flag, ?VERBOSE_OPT},
- {flag, ?MINIMAL_OPT},
- {flag, ?ENABLED_OPT},
- {flag, ?ENABLED_ALL_OPT}],
- init:get_plain_arguments()) of
- {[], _Opts} -> usage();
- CmdArgsAndOpts -> CmdArgsAndOpts
- end,
- Command = list_to_atom(Command0),
- PrintInvalidCommandError =
- fun () ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")])
- end,
-
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
- ok ->
- rabbit_misc:quit(0);
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
- PrintInvalidCommandError(),
- usage();
- {error, Reason} ->
- print_error("~p", [Reason]),
- rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
- rabbit_misc:quit(2);
- Other ->
- print_error("~p", [Other]),
- rabbit_misc:quit(2)
- end.
-
-stop() ->
- ok.
-
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
-
-usage() ->
- io:format("~s", [rabbit_plugins_usage:usage()]),
- rabbit_misc:quit(1).
-
-%%----------------------------------------------------------------------------
+-ifdef(use_specs).
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
- format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+-spec(setup/0 :: () -> [atom()]).
+-spec(active/0 :: () -> [atom()]).
+-spec(list/1 :: (string()) -> [#plugin{}]).
+-spec(read_enabled/1 :: (file:filename()) -> [atom()]).
+-spec(dependencies/3 ::
+ (boolean(), [atom()], [#plugin{}]) -> [atom()]).
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
- case ToEnable0 of
- [] -> throw({error_string, "Not enough arguments for 'enable'"});
- _ -> ok
- end,
- AllPlugins = find_plugins(PluginsDir),
- Enabled = read_enabled_plugins(PluginsFile),
- ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins),
- ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
- Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
- NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
- NewImplicitlyEnabled = calculate_required_plugins(NewEnabled, AllPlugins),
- maybe_warn_mochiweb(NewImplicitlyEnabled),
- case NewEnabled -- ImplicitlyEnabled of
- [] -> io:format("Plugin configuration unchanged.~n");
- _ -> print_list("The following plugins have been enabled:",
- NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
- end;
-
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
- case ToDisable0 of
- [] -> throw({error_string, "Not enough arguments for 'disable'"});
- _ -> ok
- end,
- ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
- Enabled = read_enabled_plugins(PluginsFile),
- AllPlugins = find_plugins(PluginsDir),
- Missing = ToDisable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> print_list("Warning: the following plugins could not be found:",
- Missing)
- end,
- ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins),
- NewEnabled = Enabled -- ToDisableDeps,
- case length(Enabled) =:= length(NewEnabled) of
- true -> io:format("Plugin configuration unchanged.~n");
- false -> ImplicitlyEnabled =
- calculate_required_plugins(Enabled, AllPlugins),
- NewImplicitlyEnabled =
- calculate_required_plugins(NewEnabled, AllPlugins),
- print_list("The following plugins have been disabled:",
- ImplicitlyEnabled -- NewImplicitlyEnabled),
- write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
- end.
+-endif.
%%----------------------------------------------------------------------------
-%% Get the #plugin{}s ready to be enabled.
-find_plugins(PluginsDir) ->
+%%
+%% @doc Prepares the file system and installs all enabled plugins.
+%%
+setup() ->
+ {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ {ok, EnabledPluginsFile} = application:get_env(rabbit,
+ enabled_plugins_file),
+ prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir),
+ [prepare_dir_plugin(PluginName) ||
+ PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+
+%% @doc Lists the plugins which are currently running.
+active() ->
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ InstalledPlugins = [ P#plugin.name || P <- list(ExpandDir) ],
+ [App || {App, _, _} <- application:which_applications(),
+ lists:member(App, InstalledPlugins)].
+
+%% @doc Get the list of plugins which are ready to be enabled.
+list(PluginsDir) ->
EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
FreeApps = [{app, App} ||
App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
@@ -175,6 +86,91 @@ find_plugins(PluginsDir) ->
end,
Plugins.
+%% @doc Read the list of enabled plugins from the supplied term file.
+read_enabled(PluginsFile) ->
+ case rabbit_file:read_term_file(PluginsFile) of
+ {ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+%%
+%% @doc Calculate the dependency graph from <i>Sources</i>.
+%% When Reverse =:= true the bottom/leaf level applications are returned in
+%% the resulting list, otherwise they're skipped.
+%%
+dependencies(Reverse, Sources, AllPlugins) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps}
+ || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ Dests = case Reverse of
+ false -> digraph_utils:reachable(Sources, G);
+ true -> digraph_utils:reaching(Sources, G)
+ end,
+ true = digraph:delete(G),
+ Dests.
+
+%%----------------------------------------------------------------------------
+
+prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
+ AllPlugins = list(PluginsDistDir),
+ Enabled = read_enabled(EnabledPluginsFile),
+ ToUnpack = dependencies(false, Enabled, AllPlugins),
+ ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
+
+ Missing = Enabled -- plugin_names(ToUnpackPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> io:format("Warning: the following enabled plugins were "
+ "not found: ~p~n", [Missing])
+ end,
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)",
+ [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)",
+ [DestDir, E2])
+ end,
+
+ [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
+
+prepare_dir_plugin(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN),
+
+ %% We want the second-last token
+ NameTokens = string:tokens(PluginAppDescFn,"/."),
+ PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
+ list_to_atom(PluginNameString).
+
+%%----------------------------------------------------------------------------
+
+delete_recursively(Fn) ->
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
+ end.
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
+
%% Get the #plugin{} from an .ez.
get_plugin_info(Base, {ez, EZ0}) ->
EZ = filename:join([Base, EZ0]),
@@ -234,78 +230,6 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-%% Pretty print a list of plugins.
-format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
- Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
- Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
- Format = case {Verbose, Minimal} of
- {false, false} -> normal;
- {true, false} -> verbose;
- {false, true} -> minimal;
- {true, true} -> throw({error_string,
- "Cannot specify -m and -v together"})
- end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
- OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
-
- AvailablePlugins = find_plugins(PluginsDir),
- EnabledExplicitly = read_enabled_plugins(PluginsFile),
- EnabledImplicitly =
- calculate_required_plugins(EnabledExplicitly, AvailablePlugins) --
- EnabledExplicitly,
- {ok, RE} = re:compile(Pattern),
- Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
- re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or
- lists:member(Name, EnabledImplicitly));
- true -> true
- end],
- Plugins1 = usort_plugins(Plugins),
- MaxWidth = lists:max([length(atom_to_list(Name)) ||
- #plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
- ok.
-
-format_plugin(#plugin{name = Name, version = Version,
- description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
- Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
- end,
- case Format of
- minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
- verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
- io:format("~n")
- end.
-
-print_list(Header, Plugins) ->
- io:format(fmt_list(Header, Plugins)).
-
-fmt_list(Header, Plugins) ->
- lists:flatten(
- [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
-
-usort_plugins(Plugins) ->
- lists:usort(fun plugins_cmp/2, Plugins).
-
-plugins_cmp(#plugin{name = N1, version = V1},
- #plugin{name = N2, version = V2}) ->
- {N1, V1} =< {N2, V2}.
-
%% Filter out applications that can be loaded *right now*.
filter_applications(Applications) ->
[Application || Application <- Applications,
@@ -328,72 +252,3 @@ plugin_names(Plugins) ->
%% Find plugins by name in a list of plugins.
lookup_plugins(Names, AllPlugins) ->
[P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)].
-
-%% Read the enabled plugin names from disk.
-read_enabled_plugins(PluginsFile) ->
- case rabbit_file:read_term_file(PluginsFile) of
- {ok, [Plugins]} -> Plugins;
- {ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-%% Write the enabled plugin names on disk.
-write_enabled_plugins(PluginsFile, Plugins) ->
- case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
- ok -> ok;
- {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
- PluginsFile, Reason}})
- end.
-
-calculate_required_plugins(Sources, AllPlugins) ->
- calculate_dependencies(false, Sources, AllPlugins).
-
-calculate_dependencies(Reverse, Sources, AllPlugins) ->
- {ok, G} = rabbit_misc:build_acyclic_graph(
- fun (App, _Deps) -> [{App, App}] end,
- fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
- Dests = case Reverse of
- false -> digraph_utils:reachable(Sources, G);
- true -> digraph_utils:reaching(Sources, G)
- end,
- true = digraph:delete(G),
- Dests.
-
-maybe_warn_mochiweb(Enabled) ->
- V = erlang:system_info(otp_release),
- case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
- true ->
- Stars = string:copies("*", 80),
- io:format("~n~n~s~n"
- " Warning: Mochiweb enabled and Erlang version ~s "
- "detected.~n"
- " Enabling plugins that depend on Mochiweb is not "
- "supported on this Erlang~n"
- " version. At least R13B01 is required.~n~n"
- " RabbitMQ will not start successfully in this "
- "configuration. You *must*~n"
- " disable the Mochiweb plugin, or upgrade Erlang.~n"
- "~s~n~n~n", [Stars, V, Stars]);
- false ->
- ok
- end.
-
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n"),
- case os:type() of
- {win32, _OsName} ->
- io:format("If you have RabbitMQ running as a service then you must"
- " reinstall by running~n rabbitmq-service.bat stop~n"
- " rabbitmq-service.bat install~n"
- " rabbitmq-service.bat start~n~n");
- _ ->
- ok
- end.
-
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
new file mode 100644
index 00000000..572cf150
--- /dev/null
+++ b/src/rabbit_plugins_main.erl
@@ -0,0 +1,273 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_plugins_main).
+-include("rabbit.hrl").
+
+-export([start/0, stop/0]).
+
+-define(VERBOSE_OPT, "-v").
+-define(MINIMAL_OPT, "-m").
+-define(ENABLED_OPT, "-E").
+-define(ENABLED_ALL_OPT, "-e").
+
+-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
+-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
+-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
+-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
+
+-define(GLOBAL_DEFS, []).
+
+-define(COMMANDS,
+ [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
+ enable,
+ disable]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(usage/0 :: () -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ {ok, [[PluginsFile|_]|_]} =
+ init:get_argument(enabled_plugins_file),
+ {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
+ {Command, Opts, Args} =
+ case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
+ init:get_plain_arguments())
+ of
+ {ok, Res} -> Res;
+ no_command -> print_error("could not recognise command", []),
+ usage()
+ end,
+
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
+
+ case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ ok ->
+ rabbit_misc:quit(0);
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
+ usage();
+ {error, Reason} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
+ rabbit_misc:quit(2);
+ Other ->
+ print_error("~p", [Other]),
+ rabbit_misc:quit(2)
+ end.
+
+stop() ->
+ ok.
+
+%%----------------------------------------------------------------------------
+
+action(list, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+
+action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToEnable0 of
+ [] -> throw({error_string, "Not enough arguments for 'enable'"});
+ _ -> ok
+ end,
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ ImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ Enabled, AllPlugins),
+ ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
+ Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw({error_string,
+ fmt_list("The following plugins could not be found:",
+ Missing)})
+ end,
+ NewEnabled = lists:usort(Enabled ++ ToEnable),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ maybe_warn_mochiweb(NewImplicitlyEnabled),
+ case NewEnabled -- ImplicitlyEnabled of
+ [] -> io:format("Plugin configuration unchanged.~n");
+ _ -> print_list("The following plugins have been enabled:",
+ NewImplicitlyEnabled -- ImplicitlyEnabled),
+ report_change()
+ end;
+
+action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToDisable0 of
+ [] -> throw({error_string, "Not enough arguments for 'disable'"});
+ _ -> ok
+ end,
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
+ Enabled = rabbit_plugins:read_enabled(PluginsFile),
+ AllPlugins = rabbit_plugins:list(PluginsDir),
+ Missing = ToDisable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> print_list("Warning: the following plugins could not be found:",
+ Missing)
+ end,
+ ToDisableDeps = rabbit_plugins:dependencies(true, ToDisable, AllPlugins),
+ NewEnabled = Enabled -- ToDisableDeps,
+ case length(Enabled) =:= length(NewEnabled) of
+ true -> io:format("Plugin configuration unchanged.~n");
+ false -> ImplicitlyEnabled =
+ rabbit_plugins:dependencies(false, Enabled, AllPlugins),
+ NewImplicitlyEnabled =
+ rabbit_plugins:dependencies(false,
+ NewEnabled, AllPlugins),
+ print_list("The following plugins have been disabled:",
+ ImplicitlyEnabled -- NewImplicitlyEnabled),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ report_change()
+ end.
+
+%%----------------------------------------------------------------------------
+
+print_error(Format, Args) ->
+ rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+
+usage() ->
+ io:format("~s", [rabbit_plugins_usage:usage()]),
+ rabbit_misc:quit(1).
+
+%% Pretty print a list of plugins.
+format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+ Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
+ Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
+ Format = case {Verbose, Minimal} of
+ {false, false} -> normal;
+ {true, false} -> verbose;
+ {false, true} -> minimal;
+ {true, true} -> throw({error_string,
+ "Cannot specify -m and -v together"})
+ end,
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
+
+ AvailablePlugins = rabbit_plugins:list(PluginsDir),
+ EnabledExplicitly = rabbit_plugins:read_enabled(PluginsFile),
+ EnabledImplicitly =
+ rabbit_plugins:dependencies(false, EnabledExplicitly,
+ AvailablePlugins) -- EnabledExplicitly,
+ {ok, RE} = re:compile(Pattern),
+ Plugins = [ Plugin ||
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
+ re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name,
+ EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
+ end],
+ Plugins1 = usort_plugins(Plugins),
+ MaxWidth = lists:max([length(atom_to_list(Name)) ||
+ #plugin{name = Name} <- Plugins1] ++ [0]),
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
+ MaxWidth) || P <- Plugins1],
+ ok.
+
+format_plugin(#plugin{name = Name, version = Version,
+ description = Description, dependencies = Deps},
+ EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ Glyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "[E]";
+ {false, true} -> "[e]";
+ _ -> "[ ]"
+ end,
+ case Format of
+ minimal -> io:format("~s~n", [Name]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
+ "w ~s~n", [Glyph, Name, Version]);
+ verbose -> io:format("~s ~w~n", [Glyph, Name]),
+ io:format(" Version: \t~s~n", [Version]),
+ case Deps of
+ [] -> ok;
+ _ -> io:format(" Dependencies:\t~p~n", [Deps])
+ end,
+ io:format(" Description:\t~s~n", [Description]),
+ io:format("~n")
+ end.
+
+print_list(Header, Plugins) ->
+ io:format(fmt_list(Header, Plugins)).
+
+fmt_list(Header, Plugins) ->
+ lists:flatten(
+ [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+
+usort_plugins(Plugins) ->
+ lists:usort(fun plugins_cmp/2, Plugins).
+
+plugins_cmp(#plugin{name = N1, version = V1},
+ #plugin{name = N2, version = V2}) ->
+ {N1, V1} =< {N2, V2}.
+
+%% Return the names of the given plugins.
+plugin_names(Plugins) ->
+ [Name || #plugin{name = Name} <- Plugins].
+
+%% Write the enabled plugin names on disk.
+write_enabled_plugins(PluginsFile, Plugins) ->
+ case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
+ ok -> ok;
+ {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+maybe_warn_mochiweb(Enabled) ->
+ V = erlang:system_info(otp_release),
+ case lists:member(mochiweb, Enabled) andalso V < "R13B01" of
+ true ->
+ Stars = string:copies("*", 80),
+ io:format("~n~n~s~n"
+ " Warning: Mochiweb enabled and Erlang version ~s "
+ "detected.~n"
+ " Enabling plugins that depend on Mochiweb is not "
+ "supported on this Erlang~n"
+ " version. At least R13B01 is required.~n~n"
+ " RabbitMQ will not start successfully in this "
+ "configuration. You *must*~n"
+ " disable the Mochiweb plugin, or upgrade Erlang.~n"
+ "~s~n~n~n", [Stars, V, Stars]);
+ false ->
+ ok
+ end.
+
+report_change() ->
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n").
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 162d44f1..d56211b5 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -31,212 +31,21 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
-%% Shut dialyzer up
--spec(terminate/1 :: (string()) -> no_return()).
--spec(terminate/2 :: (string(), [any()]) -> no_return()).
-endif.
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ...~n"),
-
- %% Determine our various directories
- [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] =
- init:get_plain_arguments(),
- RootName = UnpackedPluginDir ++ "/rabbit",
-
- prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir),
-
- %% Build a list of required apps based on the fixed set, and any plugins
- PluginApps = find_plugins(UnpackedPluginDir),
- RequiredApps = ?BaseApps ++ PluginApps,
-
- %% Build the entire set of dependencies - this will load the
- %% applications along the way
- AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {failed_to_load_app, App, Err} ->
- terminate("failed to load application ~s:~n~p",
- [App, Err]);
- AppList ->
- AppList
- end,
- AppVersions = [determine_version(App) || App <- AllApps],
- RabbitVersion = proplists:get_value(rabbit, AppVersions),
-
- %% Build the overall release descriptor
- RDesc = {release,
- {"rabbit", RabbitVersion},
- {erts, erlang:system_info(version)},
- AppVersions},
-
- %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
- rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
-
- %% We exclude mochiweb due to its optional use of fdsrv.
- XRefExclude = [mochiweb],
-
- %% Compile the script
- ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent,
- {exref, AllApps -- XRefExclude}]) of
- {ok, Module, Warnings} ->
- %% This gets lots of spurious no-source warnings when we
- %% have .ez files, so we want to supress them to prevent
- %% hiding real issues. On Ubuntu, we also get warnings
- %% about kernel/stdlib sources being out of date, which we
- %% also ignore for the same reason.
- WarningStr = Module:format_warning(
- [W || W <- Warnings,
- case W of
- {warning, {source_not_found, _}} -> false;
- {warning, {obj_out_of_date, {_,_,WApp,_,_}}}
- when WApp == mnesia;
- WApp == stdlib;
- WApp == kernel;
- WApp == sasl;
- WApp == crypto;
- WApp == os_mon -> false;
- _ -> true
- end]),
- case length(WarningStr) of
- 0 -> ok;
- _ -> S = string:copies("*", 80),
- io:format("~n~s~n~s~s~n~n", [S, WarningStr, S])
- end,
- ok;
- {error, Module, Error} ->
- terminate("generation of boot script file ~s failed:~n~s",
- [ScriptFile, Module:format_error(Error)])
- end,
-
- case post_process_script(ScriptFile) of
- ok -> ok;
- {error, Reason} ->
- terminate("post processing of boot script file ~s failed:~n~w",
- [ScriptFile, Reason])
- end,
- case systools:script2boot(RootName) of
- ok -> ok;
- error -> terminate("failed to compile boot script file ~s",
- [ScriptFile])
- end,
- io:format("~w plugins activated:~n", [length(PluginApps)]),
- [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
- || App <- PluginApps],
- io:nl(),
-
+ [NodeStr] = init:get_plain_arguments(),
ok = duplicate_node_check(NodeStr),
-
- terminate(0),
+ rabbit_misc:quit(0),
ok.
stop() ->
ok.
-determine_version(App) ->
- application:load(App),
- {ok, Vsn} = application:get_key(App, vsn),
- {App, Vsn}.
-
-delete_recursively(Fn) ->
- case rabbit_file:recursive_delete([Fn]) of
- ok -> ok;
- {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
- Error -> Error
- end.
-
-prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
- AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir),
- Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile),
- ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins),
- ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins),
-
- Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins),
- case Missing of
- [] -> ok;
- _ -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
- end,
-
- %% Eliminate the contents of the destination directory
- case delete_recursively(DestDir) of
- ok -> ok;
- {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
- end,
- case filelib:ensure_dir(DestDir ++ "/") of
- ok -> ok;
- {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
- end,
-
- [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
-
-prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
- zip:unzip(Location, [{cwd, PluginDestDir}]);
-prepare_plugin(#plugin{type = dir, name = Name, location = Location},
- PluginsDestDir) ->
- rabbit_file:recursive_copy(Location,
- filename:join([PluginsDestDir, Name])).
-
-find_plugins(PluginDir) ->
- [prepare_dir_plugin(PluginName) ||
- PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
-
-prepare_dir_plugin(PluginAppDescFn) ->
- %% Add the plugin ebin directory to the load path
- PluginEBinDirN = filename:dirname(PluginAppDescFn),
- code:add_path(PluginEBinDirN),
-
- %% We want the second-last token
- NameTokens = string:tokens(PluginAppDescFn,"/."),
- PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
- list_to_atom(PluginNameString).
-
-expand_dependencies(Pending) ->
- expand_dependencies(sets:new(), Pending).
-expand_dependencies(Current, []) ->
- Current;
-expand_dependencies(Current, [Next|Rest]) ->
- case sets:is_element(Next, Current) of
- true ->
- expand_dependencies(Current, Rest);
- false ->
- case application:load(Next) of
- ok ->
- ok;
- {error, {already_loaded, _}} ->
- ok;
- {error, Reason} ->
- throw({failed_to_load_app, Next, Reason})
- end,
- {ok, Required} = application:get_key(Next, applications),
- Unique = [A || A <- Required, not(sets:is_element(A, Current))],
- expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
- end.
-
-post_process_script(ScriptFile) ->
- case file:consult(ScriptFile) of
- {ok, [{script, Name, Entries}]} ->
- NewEntries = lists:flatmap(fun process_entry/1, Entries),
- case file:open(ScriptFile, [write]) of
- {ok, Fd} ->
- io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
- [date(), time(), {script, Name, NewEntries}]),
- file:close(Fd),
- ok;
- {error, OReason} ->
- {error, {failed_to_open_script_file_for_writing, OReason}}
- end;
- {error, Reason} ->
- {error, {failed_to_load_script, Reason}}
- end.
-
-process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) ->
- [{apply,{rabbit,maybe_hipe_compile,[]}},
- {apply,{rabbit,prepare,[]}}, Entry];
-process_entry(Entry) ->
- [Entry].
+%%----------------------------------------------------------------------------
%% Check whether a node with the same name is already running
duplicate_node_check([]) ->
@@ -252,11 +61,11 @@ duplicate_node_check(NodeStr) ->
"already running on ~p~n",
[NodeName, NodeHost]),
io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
- terminate(?ERROR_CODE);
+ rabbit_misc:quit(?ERROR_CODE);
false -> ok
end;
{error, EpmdReason} ->
- terminate("epmd error for host ~p: ~p (~s)~n",
+ rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
case EpmdReason of
address -> "unable to establish tcp connection";
@@ -264,16 +73,3 @@ duplicate_node_check(NodeStr) ->
_ -> inet:format_error(EpmdReason)
end])
end.
-
-terminate(Fmt, Args) ->
- io:format("ERROR: " ++ Fmt ++ "~n", Args),
- terminate(?ERROR_CODE).
-
-terminate(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status),
- receive
- after infinity -> ok
- end
- end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5acf6aca..07b39d8c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_resources/2, server_properties/1]).
+-export([conserve_resources/3, server_properties/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_resources/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_resources/3 :: (pid(), atom(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -133,7 +133,7 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_resources(Pid, Conserve) ->
+conserve_resources(Pid, _Source, Conserve) ->
Pid ! {conserve_resources, Conserve},
ok.
@@ -222,14 +222,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never},
try
- BufSizes = inet_op(fun () ->
- rabbit_net:getopts(
- ClientSock, [sndbuf, recbuf, buffer])
- end),
- BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- ok = inet_op(fun () ->
- rabbit_net:setopts(ClientSock, [{buffer, BufSz}])
- end),
+ ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
handshake, 8)),
@@ -743,8 +736,6 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-%% Compute frame_max for this instance. Could simply use 0, but breaks
-%% QPid Java client.
server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index bf2b4798..f142d233 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -19,6 +19,8 @@
-behaviour(supervisor).
-export([start_link/0, start_child/1, start_child/2, start_child/3,
+ start_supervisor_child/1, start_supervisor_child/2,
+ start_supervisor_child/3,
start_restartable_child/1, start_restartable_child/2, stop_child/1]).
-export([init/1]).
@@ -33,7 +35,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_child/1 :: (atom()) -> 'ok').
+-spec(start_child/2 :: (atom(), [any()]) -> 'ok').
-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/1 :: (atom()) -> 'ok').
+-spec(start_supervisor_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(start_supervisor_child/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(start_restartable_child/1 :: (atom()) -> 'ok').
-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
@@ -42,22 +48,29 @@
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []).
-start_child(Mod) ->
- start_child(Mod, []).
+start_child(Mod) -> start_child(Mod, []).
-start_child(Mod, Args) ->
- start_child(Mod, Mod, Args).
+start_child(Mod, Args) -> start_child(Mod, Mod, Args).
start_child(ChildId, Mod, Args) ->
- child_reply(supervisor:start_child(?SERVER,
- {ChildId, {Mod, start_link, Args},
- transient, ?MAX_WAIT, worker, [Mod]})).
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, ?MAX_WAIT, worker, [Mod]})).
+
+start_supervisor_child(Mod) -> start_supervisor_child(Mod, []).
+
+start_supervisor_child(Mod, Args) -> start_supervisor_child(Mod, Mod, Args).
+
+start_supervisor_child(ChildId, Mod, Args) ->
+ child_reply(supervisor:start_child(
+ ?SERVER,
+ {ChildId, {Mod, start_link, Args},
+ transient, infinity, supervisor, [Mod]})).
-start_restartable_child(Mod) ->
- start_restartable_child(Mod, []).
+start_restartable_child(Mod) -> start_restartable_child(Mod, []).
start_restartable_child(Mod, Args) ->
Name = list_to_atom(atom_to_list(Mod) ++ "_sup"),
@@ -73,8 +86,7 @@ stop_child(ChildId) ->
E -> E
end.
-init([]) ->
- {ok, {{one_for_all, 0, 1}, []}}.
+init([]) -> {ok, {{one_for_all, 0, 1}, []}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 96b5fa38..5545cccf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,6 +29,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
+-define(TIMEOUT, 5000).
all_tests() ->
passed = gm_tests:all_tests(),
@@ -50,7 +51,7 @@ all_tests() ->
passed = test_app_management(),
passed = test_log_management_during_startup(),
passed = test_statistics(),
- passed = test_option_parser(),
+ passed = test_arguments_parser(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
@@ -801,27 +802,57 @@ test_log_management_during_startup() ->
ok = control_action(start_app, []),
passed.
-test_option_parser() ->
- %% command and arguments should just pass through
- ok = check_get_options({["mock_command", "arg1", "arg2"], []},
- [], ["mock_command", "arg1", "arg2"]),
+test_arguments_parser() ->
+ GlobalOpts1 = [{"-f1", flag}, {"-o1", {option, "foo"}}],
+ Commands1 = [command1, {command2, [{"-f2", flag}, {"-o2", {option, "bar"}}]}],
- %% get flags
- ok = check_get_options(
- {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
- [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
-
- %% get options
- ok = check_get_options(
- {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
- [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
- ["mock_command", "-foo", "bar"]),
+ GetOptions =
+ fun (Args) ->
+ rabbit_misc:parse_arguments(Commands1, GlobalOpts1, Args)
+ end,
- %% shuffled and interleaved arguments and options
- ok = check_get_options(
- {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
- [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
- ["-f", "a1", "-o1", "hello", "a2", "a3"]),
+ check_parse_arguments(no_command, GetOptions, []),
+ check_parse_arguments(no_command, GetOptions, ["foo", "bar"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "foo"}], []}},
+ GetOptions, ["command1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
+ GetOptions, ["command1", "-o1", "blah"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", true}, {"-o1", "foo"}], []}},
+ GetOptions, ["command1", "-f1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], []}},
+ GetOptions, ["-o1", "blah", "command1"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "blah"}], ["quux"]}},
+ GetOptions, ["-o1", "blah", "command1", "quux"]),
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", true}, {"-o1", "blah"}], ["quux", "baz"]}},
+ GetOptions, ["command1", "quux", "-f1", "-o1", "blah", "baz"]),
+ %% For duplicate flags, the last one counts
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "second"}], []}},
+ GetOptions, ["-o1", "first", "command1", "-o1", "second"]),
+ %% If the flag "eats" the command, the command won't be recognised
+ check_parse_arguments(no_command, GetOptions,
+ ["-o1", "command1", "quux"]),
+ %% If a flag eats another flag, the eaten flag won't be recognised
+ check_parse_arguments(
+ {ok, {command1, [{"-f1", false}, {"-o1", "-f1"}], []}},
+ GetOptions, ["command1", "-o1", "-f1"]),
+
+ %% Now for some command-specific flags...
+ check_parse_arguments(
+ {ok, {command2, [{"-f1", false}, {"-f2", false},
+ {"-o1", "foo"}, {"-o2", "bar"}], []}},
+ GetOptions, ["command2"]),
+
+ check_parse_arguments(
+ {ok, {command2, [{"-f1", false}, {"-f2", true},
+ {"-o1", "baz"}, {"-o2", "bar"}], ["quux", "foo"]}},
+ GetOptions, ["-f2", "command2", "quux", "-o1", "baz", "foo"]),
passed.
@@ -1210,7 +1241,7 @@ test_spawn() ->
rabbit_limiter:make_token(self())),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
- after 1000 -> throw(failed_to_receive_channel_open_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
end,
{Writer, Ch}.
@@ -1231,7 +1262,7 @@ test_spawn_remote() ->
end
end),
receive Res -> Res
- after 1000 -> throw(failed_to_receive_result)
+ after ?TIMEOUT -> throw(failed_to_receive_result)
end.
user(Username) ->
@@ -1251,13 +1282,10 @@ test_confirms() ->
queue = Q0,
exchange = <<"amq.direct">>,
routing_key = "magic" }),
- receive #'queue.bind_ok'{} ->
- Q0
- after 1000 ->
- throw(failed_to_bind_queue)
+ receive #'queue.bind_ok'{} -> Q0
+ after ?TIMEOUT -> throw(failed_to_bind_queue)
end
- after 1000 ->
- throw(failed_to_declare_queue)
+ after ?TIMEOUT -> throw(failed_to_declare_queue)
end
end,
%% Declare and bind two queues
@@ -1270,7 +1298,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
#'confirm.select_ok'{} -> ok
- after 1000 -> throw(failed_to_enable_confirms)
+ after ?TIMEOUT -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1287,7 +1315,7 @@ test_confirms() ->
receive
#'basic.nack'{} -> ok;
#'basic.ack'{} -> throw(received_ack_instead_of_nack)
- after 2000 -> throw(did_not_receive_nack)
+ after ?TIMEOUT-> throw(did_not_receive_nack)
end,
receive
#'basic.ack'{} -> throw(received_ack_when_none_expected)
@@ -1297,7 +1325,7 @@ test_confirms() ->
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
receive
#'queue.delete_ok'{} -> ok
- after 1000 -> throw(failed_to_cleanup_queue)
+ after ?TIMEOUT -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1320,7 +1348,7 @@ test_statistics_receive_event1(Ch, Matcher) ->
true -> Props;
_ -> test_statistics_receive_event1(Ch, Matcher)
end
- after 1000 -> throw(failed_to_receive_event)
+ after ?TIMEOUT -> throw(failed_to_receive_event)
end.
test_statistics() ->
@@ -1332,9 +1360,8 @@ test_statistics() ->
%% Set up a channel and queue
{_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
- QName = receive #'queue.declare_ok'{queue = Q0} ->
- Q0
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
QPid = Q#amqqueue.pid,
@@ -1414,7 +1441,7 @@ expect_event(Pid, Type) ->
Pid -> ok;
_ -> expect_event(Pid, Type)
end
- after 1000 -> throw({failed_to_receive_event, Type})
+ after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
test_delegates_async(SecondaryNode) ->
@@ -1438,7 +1465,7 @@ make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
+ after ?TIMEOUT -> throw(Throw)
end
end.
@@ -1451,9 +1478,7 @@ await_response(Count) ->
receive
response -> ok,
await_response(Count - 1)
- after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ after ?TIMEOUT -> throw(timeout)
end.
must_exit(Fun) ->
@@ -1520,7 +1545,7 @@ test_queue_cleanup(_SecondaryNode) ->
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok
- after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
rabbit_channel:shutdown(Ch),
rabbit:stop(),
@@ -1531,8 +1556,7 @@ test_queue_cleanup(_SecondaryNode) ->
receive
#'channel.close'{reply_code = ?NOT_FOUND} ->
ok
- after 2000 ->
- throw(failed_to_receive_channel_exit)
+ after ?TIMEOUT -> throw(failed_to_receive_channel_exit)
end,
rabbit_channel:shutdown(Ch2),
passed.
@@ -1559,8 +1583,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
{ok, 0} = rabbit_amqqueue:delete(Q, false, false),
passed
- after 2000 ->
- throw(failed_to_create_and_kill_queue)
+ after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
%%---------------------------------------------------------------------
@@ -1573,7 +1596,7 @@ control_action(Command, Args, NewOpts) ->
expand_options(default_options(), NewOpts)).
control_action(Command, Node, Args, Opts) ->
- case catch rabbit_control:action(
+ case catch rabbit_control_main:action(
Command, Node, Args, Opts,
fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
@@ -1605,10 +1628,13 @@ expand_options(As, Bs) ->
end
end, Bs, As).
-check_get_options({ExpArgs, ExpOpts}, Defs, Args) ->
- {ExpArgs, ResOpts} = rabbit_misc:get_options(Defs, Args),
- true = lists:sort(ExpOpts) == lists:sort(ResOpts), % don't care about the order
- ok.
+check_parse_arguments(ExpRes, Fun, As) ->
+ SortRes =
+ fun (no_command) -> no_command;
+ ({ok, {C, KVs, As1}}) -> {ok, {C, lists:sort(KVs), As1}}
+ end,
+
+ true = SortRes(ExpRes) =:= SortRes(Fun(As)).
empty_files(Files) ->
[case file:read_file_info(File) of
@@ -1788,7 +1814,7 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
Pid);
stop ->
done
- after (case Awaiting of [] -> 200; _ -> 1000 end) ->
+ after (case Awaiting of [] -> 200; _ -> ?TIMEOUT end) ->
case Awaiting of
[] -> Pid ! {self(), arrived}, on_disk_capture();
_ -> Pid ! {self(), timeout}
@@ -2341,7 +2367,7 @@ wait_for_confirms(Unconfirmed) ->
wait_for_confirms(
rabbit_misc:gb_sets_difference(
Unconfirmed, gb_sets:from_list(Confirmed)))
- after 5000 -> exit(timeout_waiting_for_confirm)
+ after ?TIMEOUT -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index dafb3f2e..49213c95 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1466,16 +1466,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
- S1 -> {_, State2} =
- lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end,
- {S1, State},
- case (AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress) of
- true -> [AckFun, AlphaBetaFun];
- false -> [AlphaBetaFun, AckFun]
- end),
+ S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress)) of
+ true -> [AckFun, AlphaBetaFun];
+ false -> [AlphaBetaFun, AckFun]
+ end,
+ {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {S1, State}, Funs),
{true, State2}
end,