summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-17 15:40:04 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-17 15:40:04 +0100
commit655f69685ccaefef4cdd5a8977cb179d955ff46f (patch)
tree7097ad9c41148e93fd9e77d65092c77032875d35
parent2cba5f21768390aa9849b8b9d5b796d840af41a1 (diff)
parenta5ac91c069d4c73a1d09f48ea132a93b2cadc841 (diff)
downloadrabbitmq-server-655f69685ccaefef4cdd5a8977cb179d955ff46f.tar.gz
merge bug25071
-rw-r--r--packaging/macports/Portfile.in2
-rwxr-xr-xscripts/rabbitmq-plugins.bat2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmqctl.bat2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl62
-rw-r--r--src/rabbit_backing_queue.erl11
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_control_main.erl22
-rw-r--r--src/rabbit_log.erl10
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_mnesia.erl33
-rw-r--r--src/rabbit_plugins.erl98
-rw-r--r--src/rabbit_prelaunch.erl7
-rw-r--r--src/rabbit_tests.erl80
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/vm_memory_monitor.erl8
18 files changed, 193 insertions, 177 deletions
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index e461e49e..82c1fb0c 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -59,7 +59,7 @@ set mandest ${destroot}${prefix}/share/man
use_configure no
-use_parallel_build yes
+use_parallel_build no
build.env-append HOME=${workpath}
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index c67a0263..341f871a 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -47,7 +47,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 167f272e..3aea4c07 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -96,7 +96,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
- -sname rabbitmqprelaunch!RANDOM! ^
+ -sname rabbitmqprelaunch!RANDOM!!TIME:~9! ^
-extra "!RABBITMQ_NODENAME!"
if ERRORLEVEL 1 (
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 9f549f1e..d8b1eaf1 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 4fc488b8..24c3ebd0 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -174,7 +174,7 @@
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
ChildSpecs :: [supervisor2:child_spec()],
- Result :: supervisor2:startlink_ret().
+ Result :: {'ok', pid()} | {'error', term()}.
-spec create_tables() -> Result when
Result :: 'ok'.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b4071627..f5a3a5f1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ ttl_timer_expiry,
senders,
publish_seqno,
unconfirmed,
@@ -107,7 +108,7 @@
]).
-define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
%%----------------------------------------------------------------------------
@@ -576,7 +577,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,28 +718,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- case DLXFun of
- undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
- BQS1;
- _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
- BQS1
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
- when TTL =/= undefined ->
- case BQ:is_empty(BQS) of
- true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
- State#q{ttl_timer_ref = TRef}
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+ Msgs),
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index dc144a0e..ed5340fe 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,6 +18,8 @@
-ifdef(use_specs).
+-export_type([async_callback/0]).
+
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
@@ -28,7 +30,8 @@
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(async_callback() ::
+ fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
@@ -121,9 +124,11 @@
%% necessitate an ack or not. If they do, the function returns a list of
%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
- -> {[{rabbit_types:basic_message(), ack()}], state()};
+ -> {rabbit_types:message_properties() | undefined,
+ [{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
- -> {undefined, state()}.
+ -> {rabbit_types:message_properties() | undefined,
+ undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a84800c0..e40d9b29 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [2, Res]},
+ BQ = {call, erlang, element, [3, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index e7948e37..08b96757 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -164,8 +164,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {error_string, Reason} ->
- print_error("~s", [Reason]),
+ {parse_error, {_Line, Mod, Err}} ->
+ print_error("~s", [lists:flatten(Mod:format_error(Err))]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -449,16 +449,15 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
case erl_scan:string(Expr) of
{ok, Scanned, _} ->
case erl_parse:parse_exprs(Scanned) of
- {ok, Parsed} ->
- {value, Value, _} = unsafe_rpc(
- Node, erl_eval, exprs, [Parsed, []]),
- io:format("~p~n", [Value]),
- ok;
- {error, E} ->
- {error_string, format_parse_error(E)}
+ {ok, Parsed} -> {value, Value, _} =
+ unsafe_rpc(
+ Node, erl_eval, exprs, [Parsed, []]),
+ io:format("~p~n", [Value]),
+ ok;
+ {error, E} -> {parse_error, E}
end;
{error, E, _} ->
- {error_string, format_parse_error(E)}
+ {parse_error, E}
end.
%%----------------------------------------------------------------------------
@@ -547,9 +546,6 @@ exit_loop(Port) ->
{Port, _} -> exit_loop(Port)
end.
-format_parse_error({_Line, Mod, Err}) ->
- lists:flatten(Mod:format_error(Err)).
-
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index a6b4eeb0..8dfa89d3 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -40,18 +40,20 @@
-spec(log/3 :: (category(), level(), string()) -> 'ok').
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
--spec(info/1 :: (string()) -> 'ok').
--spec(info/2 :: (string(), [any()]) -> 'ok').
+
+-spec(info/1 :: (string()) -> 'ok').
+-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
-spec(warning/2 :: (string(), [any()]) -> 'ok').
--spec(error/1 :: (string()) -> 'ok').
--spec(error/2 :: (string(), [any()]) -> 'ok').
+-spec(error/1 :: (string()) -> 'ok').
+-spec(error/2 :: (string(), [any()]) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56..477449e3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+ {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 25a51d22..8f6a9bcf 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2,
+-export([die/1, frame_error/2, amqp_error/4, quit/1,
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
@@ -92,7 +92,6 @@
(rabbit_framing:amqp_exception()) -> channel_or_connection_exit()).
-spec(quit/1 :: (integer()) -> no_return()).
--spec(quit/2 :: (string(), [term()]) -> no_return()).
-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
-> rabbit_types:connection_exit()).
@@ -396,19 +395,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) ->
confirm_to_sender(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-%%
-%% @doc Halts the emulator after printing out an error message io-formatted with
-%% the supplied arguments. The exit status of the beam process will be set to 1.
-%%
-quit(Fmt, Args) ->
- io:format("ERROR: " ++ Fmt ++ "~n", Args),
- quit(1).
-
-%%
%% @doc Halts the emulator returning the given status code to the os.
%% On Windows this function will block indefinitely so as to give the io
%% subsystem time to flush stdout completely.
-%%
quit(Status) ->
case os:type() of
{unix, _} -> halt(Status);
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7e9346f9..61b4054a 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -726,45 +726,48 @@ wait_for_tables(TableNames) ->
end.
reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully";
- true -> ""
- end]),
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
ensure_mnesia_not_running(),
case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
+ is_only_disc_node(node(), false)
of
true -> log_both("no other disc nodes running");
false -> ok
end,
- Node = node(),
- Nodes = all_clustered_nodes() -- [Node],
case Force of
- true -> ok;
+ true ->
+ disconnect_nodes(nodes());
false ->
ensure_mnesia_dir(),
start_mnesia(),
- RunningNodes =
+ {Nodes, RunningNodes} =
try
%% Force=true here so that reset still works when clustered
%% with a node which is down
ok = init_db(read_cluster_nodes_config(), true),
- running_clustered_nodes() -- [Node]
+ {all_clustered_nodes() -- [node()],
+ running_clustered_nodes() -- [node()]}
after
stop_mnesia()
end,
leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
- cannot_delete_schema)
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
+ cannot_delete_schema),
+ disconnect_nodes(Nodes)
end,
- %% We need to make sure that we don't end up in a distributed
- %% Erlang system with nodes while not being in an Mnesia cluster
- %% with them. We don't handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
ok = delete_cluster_nodes_config(),
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
+%% We need to make sure that we don't end up in a distributed Erlang
+%% system with nodes while not being in an Mnesia cluster with
+%% them. We don't handle that well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
leave_cluster([], _) -> ok;
leave_cluster(Nodes, RunningNodes) ->
%% find at least one running cluster node and instruct it to
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 7cf6eea9..ecb19611 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -17,8 +17,7 @@
-module(rabbit_plugins).
-include("rabbit.hrl").
--export([setup/0, active/0, read_enabled/1,
- list/1, dependencies/3]).
+-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
@@ -36,28 +35,25 @@
-ifdef(use_specs).
--spec(setup/0 :: () -> [atom()]).
--spec(active/0 :: () -> [atom()]).
+-type(plugin_name() :: atom()).
+
+-spec(setup/0 :: () -> [plugin_name()]).
+-spec(active/0 :: () -> [plugin_name()]).
-spec(list/1 :: (string()) -> [#plugin{}]).
--spec(read_enabled/1 :: (file:filename()) -> [atom()]).
--spec(dependencies/3 ::
- (boolean(), [atom()], [#plugin{}]) -> [atom()]).
+-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
+-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
+ [plugin_name()]).
-endif.
%%----------------------------------------------------------------------------
-%%
%% @doc Prepares the file system and installs all enabled plugins.
-%%
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
- {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
- {ok, EnabledPluginsFile} = application:get_env(rabbit,
- enabled_plugins_file),
- prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir),
- [prepare_dir_plugin(PluginName) ||
- PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
+ {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+ {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
+ prepare_plugins(EnabledFile, PluginDir, ExpandDir).
%% @doc Lists the plugins which are currently running.
active() ->
@@ -77,8 +73,7 @@ list(PluginsDir) ->
(Plugin = #plugin{}, {Plugins1, Problems1}) ->
{[Plugin|Plugins1], Problems1}
end, {[], []},
- [get_plugin_info(PluginsDir, Plug) ||
- Plug <- EZs ++ FreeApps]),
+ [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
_ -> io:format("Warning: Problem reading some plugins: ~p~n",
@@ -98,11 +93,9 @@ read_enabled(PluginsFile) ->
PluginsFile, Reason}})
end.
-%%
%% @doc Calculate the dependency graph from <i>Sources</i>.
%% When Reverse =:= true the bottom/leaf level applications are returned in
%% the resulting list, otherwise they're skipped.
-%%
dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
fun (App, _Deps) -> [{App, App}] end,
@@ -118,42 +111,38 @@ dependencies(Reverse, Sources, AllPlugins) ->
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
+prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledPluginsFile),
+ Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
- Missing = Enabled -- plugin_names(ToUnpackPlugins),
- case Missing of
- [] -> ok;
- _ -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
+ case Enabled -- plugin_names(ToUnpackPlugins) of
+ [] -> ok;
+ Missing -> io:format("Warning: the following enabled plugins were "
+ "not found: ~p~n", [Missing])
end,
%% Eliminate the contents of the destination directory
- case delete_recursively(DestDir) of
- ok -> ok;
- {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)",
- [DestDir, E])
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
end,
- case filelib:ensure_dir(DestDir ++ "/") of
+ case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
- {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)",
- [DestDir, E2])
+ {error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
+ [ExpandDir, E2]}})
end,
- [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
+ [prepare_plugin(Plugin, ExpandDir) || Plugin <- ToUnpackPlugins],
-prepare_dir_plugin(PluginAppDescFn) ->
- %% Add the plugin ebin directory to the load path
- PluginEBinDirN = filename:dirname(PluginAppDescFn),
- code:add_path(PluginEBinDirN),
+ [prepare_dir_plugin(PluginAppDescPath) ||
+ PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")].
- %% We want the second-last token
- NameTokens = string:tokens(PluginAppDescFn,"/."),
- PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
- list_to_atom(PluginNameString).
+prepare_dir_plugin(PluginAppDescPath) ->
+ code:add_path(filename:dirname(PluginAppDescPath)),
+ list_to_atom(filename:basename(PluginAppDescPath, ".app")).
%%----------------------------------------------------------------------------
@@ -164,22 +153,19 @@ delete_recursively(Fn) ->
Error -> Error
end.
-prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
- zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) ->
+ zip:unzip(Location, [{cwd, ExpandDir}]);
prepare_plugin(#plugin{type = dir, name = Name, location = Location},
- PluginsDestDir) ->
- rabbit_file:recursive_copy(Location,
- filename:join([PluginsDestDir, Name])).
+ ExpandDir) ->
+ rabbit_file:recursive_copy(Location, filename:join([ExpandDir, Name])).
-%% Get the #plugin{} from an .ez.
-get_plugin_info(Base, {ez, EZ0}) ->
+plugin_info(Base, {ez, EZ0}) ->
EZ = filename:join([Base, EZ0]),
case read_app_file(EZ) of
{application, Name, Props} -> mkplugin(Name, Props, ez, EZ);
{error, Reason} -> {error, EZ, Reason}
end;
-%% Get the #plugin{} from an .app.
-get_plugin_info(Base, {app, App0}) ->
+plugin_info(Base, {app, App0}) ->
App = filename:join([Base, App0]),
case rabbit_file:read_term_file(App) of
{ok, [{application, Name, Props}]} ->
@@ -198,7 +184,6 @@ mkplugin(Name, Props, Type, Location) ->
#plugin{name = Name, version = Version, description = Description,
dependencies = Dependencies, location = Location, type = Type}.
-%% Read the .app file from an ez.
read_app_file(EZ) ->
case zip:list_dir(EZ) of
{ok, [_|ZippedFiles]} ->
@@ -214,13 +199,11 @@ read_app_file(EZ) ->
{error, {invalid_ez, Reason}}
end.
-%% Return the path of the .app files in ebin/.
find_app_files(ZippedFiles) ->
{ok, RE} = re:compile("^.*/ebin/.*.app$"),
[Path || {zip_file, Path, _, _, _, _} <- ZippedFiles,
re:run(Path, RE, [{capture, none}]) =:= match].
-%% Parse a binary into a term.
parse_binary(Bin) ->
try
{ok, Ts, _} = erl_scan:string(binary_to_list(Bin)),
@@ -230,13 +213,10 @@ parse_binary(Bin) ->
Err -> {error, {invalid_app, Err}}
end.
-%% Filter out applications that can be loaded *right now*.
filter_applications(Applications) ->
[Application || Application <- Applications,
not is_available_app(Application)].
-%% Return whether is application is already available (and hence
-%% doesn't need enabling).
is_available_app(Application) ->
case application:load(Application) of
{error, {already_loaded, _}} -> true;
@@ -245,10 +225,8 @@ is_available_app(Application) ->
_ -> false
end.
-%% Return the names of the given plugins.
plugin_names(Plugins) ->
[Name || #plugin{name = Name} <- Plugins].
-%% Find plugins by name in a list of plugins.
lookup_plugins(Names, AllPlugins) ->
[P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)].
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index b0454435..404afe3c 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -57,7 +57,7 @@ duplicate_node_check(NodeStr) ->
case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("node with name ~p "
+ true -> io:format("ERROR: node with name ~p "
"already running on ~p~n",
[NodeName, NodeHost]),
io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
@@ -65,7 +65,8 @@ duplicate_node_check(NodeStr) ->
false -> ok
end;
{error, EpmdReason} ->
- rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
+ io:format("ERROR: epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
- rabbit_misc:format_inet_error(EpmdReason)])
+ rabbit_misc:format_inet_error(EpmdReason)]),
+ rabbit_misc:quit(?ERROR_CODE)
end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bb60bd12..ccac12c6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -74,12 +74,10 @@ maybe_run_cluster_dependent_tests() ->
run_cluster_dependent_tests(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
- cover:stop(SecondaryNode),
ok = control_action(stop_app, []),
- ok = control_action(reset, []),
+ ok = safe_reset(),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
- cover:start(SecondaryNode),
ok = control_action(start_app, SecondaryNode, [], []),
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
@@ -940,7 +938,7 @@ test_cluster_management2(SecondaryNode) ->
ok = assert_ram_node(),
%% join cluster as a ram node
- ok = control_action(reset, []),
+ ok = safe_reset(),
ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -997,29 +995,30 @@ test_cluster_management2(SecondaryNode) ->
ok = assert_disc_node(),
%% turn a disk node into a ram node
- ok = control_action(reset, []),
+ %%
+ %% can't use safe_reset here since for some reason nodes()==[] and
+ %% yet w/o stopping coverage things break
+ with_suspended_cover(
+ [SecondaryNode], fun () -> ok = control_action(reset, []) end),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
ok = assert_ram_node(),
%% NB: this will log an inconsistent_database error, which is harmless
- %% Turning cover on / off is OK even if we're not in general using cover,
- %% it just turns the engine on / off, doesn't actually log anything.
- cover:stop([SecondaryNode]),
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode),
- cover:start([SecondaryNode]),
+ with_suspended_cover(
+ [SecondaryNode], fun () ->
+ true = disconnect_node(SecondaryNode),
+ pong = net_adm:ping(SecondaryNode)
+ end),
%% leaving a cluster as a ram node
- ok = control_action(reset, []),
+ ok = safe_reset(),
%% ...and as a disk node
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- cover:stop(SecondaryNode),
- ok = control_action(reset, []),
- cover:start(SecondaryNode),
+ ok = safe_reset(),
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -1034,22 +1033,39 @@ test_cluster_management2(SecondaryNode) ->
control_action(cluster, [SecondaryNodeS]),
%% leave system clustered, with the secondary node as a ram node
- ok = control_action(force_reset, []),
+ with_suspended_cover(
+ [SecondaryNode], fun () -> ok = control_action(force_reset, []) end),
ok = control_action(start_app, []),
%% Yes, this is rather ugly. But since we're a clustered Mnesia
%% node and we're telling another clustered node to reset itself,
%% we will get disconnected half way through causing a
%% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node.
- cover:stop(SecondaryNode),
- {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode),
- cover:start(SecondaryNode),
+ %% not a clustered Mnesia node and is a hidden node.
+ with_suspended_cover(
+ [SecondaryNode],
+ fun () ->
+ {badrpc, nodedown} =
+ control_action(force_reset, SecondaryNode, [], []),
+ pong = net_adm:ping(SecondaryNode)
+ end),
ok = control_action(cluster, SecondaryNode, [NodeS], []),
ok = control_action(start_app, SecondaryNode, [], []),
passed.
+%% 'cover' does not cope at all well with nodes disconnecting, which
+%% happens as part of reset. So we turn it off temporarily. That is ok
+%% even if we're not in general using cover, it just turns the engine
+%% on / off and doesn't log anything.
+safe_reset() -> with_suspended_cover(
+ nodes(), fun () -> control_action(reset, []) end).
+
+with_suspended_cover(Nodes, Fun) ->
+ cover:stop(Nodes),
+ Res = Fun(),
+ cover:start(Nodes),
+ Res.
+
test_user_management() ->
%% lots if stuff that should fail
@@ -1216,7 +1232,15 @@ test_server_status() ->
ok = control_action(list_consumers, []),
%% set vm memory high watermark
+ HWM = vm_memory_monitor:get_vm_memory_high_watermark(),
+ ok = control_action(set_vm_memory_high_watermark, ["1"]),
ok = control_action(set_vm_memory_high_watermark, ["1.0"]),
+ ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
+
+ %% eval
+ {parse_error, _} = control_action(eval, ["\""]),
+ {parse_error, _} = control_action(eval, ["a("]),
+ ok = control_action(eval, ["a."]),
%% cleanup
[{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
@@ -2447,10 +2471,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2467,11 +2491,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {undefined, VQ6} =
+ {_, undefined, VQ6} =
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
VQ6.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c95..bd606dfb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 85dbf368..5ce894a9 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -49,6 +49,7 @@
-record(state, {total_memory,
memory_limit,
+ memory_fraction,
timeout,
timer,
alarmed,
@@ -117,7 +118,7 @@ init([MemFraction, AlarmFuns]) ->
{ok, set_mem_limits(State, MemFraction)}.
handle_call(get_vm_memory_high_watermark, _From, State) ->
- {reply, State#state.memory_limit / State#state.total_memory, State};
+ {reply, State#state.memory_fraction, State};
handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) ->
{reply, ok, set_mem_limits(State, MemFraction)};
@@ -185,8 +186,9 @@ set_mem_limits(State, MemFraction) ->
MemLim = trunc(MemFraction * UsableMemory),
error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n",
[trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]),
- internal_update(State #state { total_memory = TotalMemory,
- memory_limit = MemLim }).
+ internal_update(State #state { total_memory = TotalMemory,
+ memory_limit = MemLim,
+ memory_fraction = MemFraction}).
internal_update(State = #state { memory_limit = MemLimit,
alarmed = Alarmed,