diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-17 15:40:04 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-17 15:40:04 +0100 |
commit | 655f69685ccaefef4cdd5a8977cb179d955ff46f (patch) | |
tree | 7097ad9c41148e93fd9e77d65092c77032875d35 | |
parent | 2cba5f21768390aa9849b8b9d5b796d840af41a1 (diff) | |
parent | a5ac91c069d4c73a1d09f48ea132a93b2cadc841 (diff) | |
download | rabbitmq-server-655f69685ccaefef4cdd5a8977cb179d955ff46f.tar.gz |
merge bug25071
-rw-r--r-- | packaging/macports/Portfile.in | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-plugins.bat | 2 | ||||
-rwxr-xr-x | scripts/rabbitmq-server.bat | 2 | ||||
-rwxr-xr-x | scripts/rabbitmqctl.bat | 2 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 22 | ||||
-rw-r--r-- | src/rabbit_log.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 33 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 98 | ||||
-rw-r--r-- | src/rabbit_prelaunch.erl | 7 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 80 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 | ||||
-rw-r--r-- | src/vm_memory_monitor.erl | 8 |
18 files changed, 193 insertions, 177 deletions
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index e461e49e..82c1fb0c 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -59,7 +59,7 @@ set mandest ${destroot}${prefix}/share/man use_configure no -use_parallel_build yes +use_parallel_build no build.env-append HOME=${workpath} diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index c67a0263..341f871a 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -47,7 +47,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
endlocal
endlocal
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 167f272e..3aea4c07 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -96,7 +96,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
- -sname rabbitmqprelaunch!RANDOM! ^
+ -sname rabbitmqprelaunch!RANDOM!!TIME:~9! ^
-extra "!RABBITMQ_NODENAME!"
if ERRORLEVEL 1 (
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 9f549f1e..d8b1eaf1 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -43,7 +43,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM!!TIME:~9! -s rabbit_control_main -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 4fc488b8..24c3ebd0 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -174,7 +174,7 @@ -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), ChildSpecs :: [supervisor2:child_spec()], - Result :: supervisor2:startlink_ret(). + Result :: {'ok', pid()} | {'error', term()}. -spec create_tables() -> Result when Result :: 'ok'. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b4071627..f5a3a5f1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + ttl_timer_expiry, senders, publish_seqno, unconfirmed, @@ -107,7 +108,7 @@ ]). -define(INFO_KEYS, - ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]). + ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). %%---------------------------------------------------------------------------- @@ -576,7 +577,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -716,28 +718,42 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), DLXFun = dead_letter_fun(expired, State), - ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - case DLXFun of - undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), - BQS1; - _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach( - fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), - BQS1 - end, - ensure_ttl_timer(State#q{backing_queue_state = BQS1}). - -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) - when TTL =/= undefined -> - case BQ:is_empty(BQS) of - true -> State; - false -> TRef = erlang:send_after(TTL, self(), drop_expired), - State#q{ttl_timer_ref = TRef} + ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, + {Props, BQS1} = + case DLXFun of + undefined -> + {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> + {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, + Msgs), + {Next, BQS2} + end, + ensure_ttl_timer(case Props of + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp + end, State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(undefined, State) -> + State; +ensure_ttl_timer(_Expiry, State = #q{ttl = undefined}) -> + State; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> + After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), + State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, + ttl_timer_expiry = TExpiry}) + when Expiry + 1000 < TExpiry -> + case erlang:cancel_timer(TRef) of + false -> State; + _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; -ensure_ttl_timer(State) -> +ensure_ttl_timer(_Expiry, State) -> State. ack_if_no_dlx(AckTags, State = #q{dlx = undefined, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index dc144a0e..ed5340fe 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,6 +18,8 @@ -ifdef(use_specs). +-export_type([async_callback/0]). + %% We can't specify a per-queue ack/state with callback signatures -type(ack() :: any()). -type(state() :: any()). @@ -28,7 +30,8 @@ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(async_callback() :: + fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). -type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | @@ -121,9 +124,11 @@ %% necessitate an ack or not. If they do, the function returns a list of %% messages with the respective acktags. -callback dropwhile(msg_pred(), true, state()) - -> {[{rabbit_types:basic_message(), ack()}], state()}; + -> {rabbit_types:message_properties() | undefined, + [{rabbit_types:basic_message(), ack()}], state()}; (msg_pred(), false, state()) - -> {undefined, state()}. + -> {rabbit_types:message_properties() | undefined, + undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a84800c0..e40d9b29 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [2, Res]}, + BQ = {call, erlang, element, [3, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index e7948e37..08b96757 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -164,8 +164,8 @@ start() -> {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); - {error_string, Reason} -> - print_error("~s", [Reason]), + {parse_error, {_Line, Mod, Err}} -> + print_error("~s", [lists:flatten(Mod:format_error(Err))]), rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), @@ -449,16 +449,15 @@ action(eval, Node, [Expr], _Opts, _Inform) -> case erl_scan:string(Expr) of {ok, Scanned, _} -> case erl_parse:parse_exprs(Scanned) of - {ok, Parsed} -> - {value, Value, _} = unsafe_rpc( - Node, erl_eval, exprs, [Parsed, []]), - io:format("~p~n", [Value]), - ok; - {error, E} -> - {error_string, format_parse_error(E)} + {ok, Parsed} -> {value, Value, _} = + unsafe_rpc( + Node, erl_eval, exprs, [Parsed, []]), + io:format("~p~n", [Value]), + ok; + {error, E} -> {parse_error, E} end; {error, E, _} -> - {error_string, format_parse_error(E)} + {parse_error, E} end. %%---------------------------------------------------------------------------- @@ -547,9 +546,6 @@ exit_loop(Port) -> {Port, _} -> exit_loop(Port) end. -format_parse_error({_Line, Mod, Err}) -> - lists:flatten(Mod:format_error(Err)). - %%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index a6b4eeb0..8dfa89d3 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -40,18 +40,20 @@ -spec(log/3 :: (category(), level(), string()) -> 'ok'). -spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok'). --spec(info/1 :: (string()) -> 'ok'). --spec(info/2 :: (string(), [any()]) -> 'ok'). + +-spec(info/1 :: (string()) -> 'ok'). +-spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). -spec(warning/2 :: (string(), [any()]) -> 'ok'). --spec(error/1 :: (string()) -> 'ok'). --spec(error/2 :: (string(), [any()]) -> 'ok'). +-spec(error/1 :: (string()) -> 'ok'). +-spec(error/2 :: (string(), [any()]) -> 'ok'). -endif. %%---------------------------------------------------------------------------- start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + log(Category, Level, Fmt) -> log(Category, Level, Fmt, []). log(Category, Level, Fmt, Args) when is_list(Args) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..477449e3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), + {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - {Msgs, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 } }. + {Next, Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 25a51d22..8f6a9bcf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). --export([die/1, frame_error/2, amqp_error/4, quit/1, quit/2, +-export([die/1, frame_error/2, amqp_error/4, quit/1, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). @@ -92,7 +92,6 @@ (rabbit_framing:amqp_exception()) -> channel_or_connection_exit()). -spec(quit/1 :: (integer()) -> no_return()). --spec(quit/2 :: (string(), [term()]) -> no_return()). -spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary()) -> rabbit_types:connection_exit()). @@ -396,19 +395,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> confirm_to_sender(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). -%% -%% @doc Halts the emulator after printing out an error message io-formatted with -%% the supplied arguments. The exit status of the beam process will be set to 1. -%% -quit(Fmt, Args) -> - io:format("ERROR: " ++ Fmt ++ "~n", Args), - quit(1). - -%% %% @doc Halts the emulator returning the given status code to the os. %% On Windows this function will block indefinitely so as to give the io %% subsystem time to flush stdout completely. -%% quit(Status) -> case os:type() of {unix, _} -> halt(Status); diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7e9346f9..61b4054a 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -726,45 +726,48 @@ wait_for_tables(TableNames) -> end. reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully"; - true -> "" - end]), + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", + [if Force -> " forcefully"; + true -> "" + end]), ensure_mnesia_not_running(), case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) + is_only_disc_node(node(), false) of true -> log_both("no other disc nodes running"); false -> ok end, - Node = node(), - Nodes = all_clustered_nodes() -- [Node], case Force of - true -> ok; + true -> + disconnect_nodes(nodes()); false -> ensure_mnesia_dir(), start_mnesia(), - RunningNodes = + {Nodes, RunningNodes} = try %% Force=true here so that reset still works when clustered %% with a node which is down ok = init_db(read_cluster_nodes_config(), true), - running_clustered_nodes() -- [Node] + {all_clustered_nodes() -- [node()], + running_clustered_nodes() -- [node()]} after stop_mnesia() end, leave_cluster(Nodes, RunningNodes), - rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), - cannot_delete_schema) + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + disconnect_nodes(Nodes) end, - %% We need to make sure that we don't end up in a distributed - %% Erlang system with nodes while not being in an Mnesia cluster - %% with them. We don't handle that well. - [erlang:disconnect_node(N) || N <- Nodes], ok = delete_cluster_nodes_config(), %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. +%% We need to make sure that we don't end up in a distributed Erlang +%% system with nodes while not being in an Mnesia cluster with +%% them. We don't handle that well. +disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. + leave_cluster([], _) -> ok; leave_cluster(Nodes, RunningNodes) -> %% find at least one running cluster node and instruct it to diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 7cf6eea9..ecb19611 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -17,8 +17,7 @@ -module(rabbit_plugins). -include("rabbit.hrl"). --export([setup/0, active/0, read_enabled/1, - list/1, dependencies/3]). +-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). @@ -36,28 +35,25 @@ -ifdef(use_specs). --spec(setup/0 :: () -> [atom()]). --spec(active/0 :: () -> [atom()]). +-type(plugin_name() :: atom()). + +-spec(setup/0 :: () -> [plugin_name()]). +-spec(active/0 :: () -> [plugin_name()]). -spec(list/1 :: (string()) -> [#plugin{}]). --spec(read_enabled/1 :: (file:filename()) -> [atom()]). --spec(dependencies/3 :: - (boolean(), [atom()], [#plugin{}]) -> [atom()]). +-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). +-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> + [plugin_name()]). -endif. %%---------------------------------------------------------------------------- -%% %% @doc Prepares the file system and installs all enabled plugins. -%% setup() -> - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), - {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), - {ok, EnabledPluginsFile} = application:get_env(rabbit, - enabled_plugins_file), - prepare_plugins(EnabledPluginsFile, PluginDir, ExpandDir), - [prepare_dir_plugin(PluginName) || - PluginName <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. + {ok, PluginDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), + prepare_plugins(EnabledFile, PluginDir, ExpandDir). %% @doc Lists the plugins which are currently running. active() -> @@ -77,8 +73,7 @@ list(PluginsDir) -> (Plugin = #plugin{}, {Plugins1, Problems1}) -> {[Plugin|Plugins1], Problems1} end, {[], []}, - [get_plugin_info(PluginsDir, Plug) || - Plug <- EZs ++ FreeApps]), + [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]), case Problems of [] -> ok; _ -> io:format("Warning: Problem reading some plugins: ~p~n", @@ -98,11 +93,9 @@ read_enabled(PluginsFile) -> PluginsFile, Reason}}) end. -%% %% @doc Calculate the dependency graph from <i>Sources</i>. %% When Reverse =:= true the bottom/leaf level applications are returned in %% the resulting list, otherwise they're skipped. -%% dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( fun (App, _Deps) -> [{App, App}] end, @@ -118,42 +111,38 @@ dependencies(Reverse, Sources, AllPlugins) -> %%---------------------------------------------------------------------------- -prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) -> +prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> AllPlugins = list(PluginsDistDir), - Enabled = read_enabled(EnabledPluginsFile), + Enabled = read_enabled(EnabledFile), ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), - Missing = Enabled -- plugin_names(ToUnpackPlugins), - case Missing of - [] -> ok; - _ -> io:format("Warning: the following enabled plugins were " - "not found: ~p~n", [Missing]) + case Enabled -- plugin_names(ToUnpackPlugins) of + [] -> ok; + Missing -> io:format("Warning: the following enabled plugins were " + "not found: ~p~n", [Missing]) end, %% Eliminate the contents of the destination directory - case delete_recursively(DestDir) of - ok -> ok; - {error, E} -> rabbit_misc:quit("Could not delete dir ~s (~p)", - [DestDir, E]) + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) end, - case filelib:ensure_dir(DestDir ++ "/") of + case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; - {error, E2} -> rabbit_misc:quit("Could not create dir ~s (~p)", - [DestDir, E2]) + {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, + [ExpandDir, E2]}}) end, - [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins]. + [prepare_plugin(Plugin, ExpandDir) || Plugin <- ToUnpackPlugins], -prepare_dir_plugin(PluginAppDescFn) -> - %% Add the plugin ebin directory to the load path - PluginEBinDirN = filename:dirname(PluginAppDescFn), - code:add_path(PluginEBinDirN), + [prepare_dir_plugin(PluginAppDescPath) || + PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")]. - %% We want the second-last token - NameTokens = string:tokens(PluginAppDescFn,"/."), - PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), - list_to_atom(PluginNameString). +prepare_dir_plugin(PluginAppDescPath) -> + code:add_path(filename:dirname(PluginAppDescPath)), + list_to_atom(filename:basename(PluginAppDescPath, ".app")). %%---------------------------------------------------------------------------- @@ -164,22 +153,19 @@ delete_recursively(Fn) -> Error -> Error end. -prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> - zip:unzip(Location, [{cwd, PluginDestDir}]); +prepare_plugin(#plugin{type = ez, location = Location}, ExpandDir) -> + zip:unzip(Location, [{cwd, ExpandDir}]); prepare_plugin(#plugin{type = dir, name = Name, location = Location}, - PluginsDestDir) -> - rabbit_file:recursive_copy(Location, - filename:join([PluginsDestDir, Name])). + ExpandDir) -> + rabbit_file:recursive_copy(Location, filename:join([ExpandDir, Name])). -%% Get the #plugin{} from an .ez. -get_plugin_info(Base, {ez, EZ0}) -> +plugin_info(Base, {ez, EZ0}) -> EZ = filename:join([Base, EZ0]), case read_app_file(EZ) of {application, Name, Props} -> mkplugin(Name, Props, ez, EZ); {error, Reason} -> {error, EZ, Reason} end; -%% Get the #plugin{} from an .app. -get_plugin_info(Base, {app, App0}) -> +plugin_info(Base, {app, App0}) -> App = filename:join([Base, App0]), case rabbit_file:read_term_file(App) of {ok, [{application, Name, Props}]} -> @@ -198,7 +184,6 @@ mkplugin(Name, Props, Type, Location) -> #plugin{name = Name, version = Version, description = Description, dependencies = Dependencies, location = Location, type = Type}. -%% Read the .app file from an ez. read_app_file(EZ) -> case zip:list_dir(EZ) of {ok, [_|ZippedFiles]} -> @@ -214,13 +199,11 @@ read_app_file(EZ) -> {error, {invalid_ez, Reason}} end. -%% Return the path of the .app files in ebin/. find_app_files(ZippedFiles) -> {ok, RE} = re:compile("^.*/ebin/.*.app$"), [Path || {zip_file, Path, _, _, _, _} <- ZippedFiles, re:run(Path, RE, [{capture, none}]) =:= match]. -%% Parse a binary into a term. parse_binary(Bin) -> try {ok, Ts, _} = erl_scan:string(binary_to_list(Bin)), @@ -230,13 +213,10 @@ parse_binary(Bin) -> Err -> {error, {invalid_app, Err}} end. -%% Filter out applications that can be loaded *right now*. filter_applications(Applications) -> [Application || Application <- Applications, not is_available_app(Application)]. -%% Return whether is application is already available (and hence -%% doesn't need enabling). is_available_app(Application) -> case application:load(Application) of {error, {already_loaded, _}} -> true; @@ -245,10 +225,8 @@ is_available_app(Application) -> _ -> false end. -%% Return the names of the given plugins. plugin_names(Plugins) -> [Name || #plugin{name = Name} <- Plugins]. -%% Find plugins by name in a list of plugins. lookup_plugins(Names, AllPlugins) -> [P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)]. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index b0454435..404afe3c 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -57,7 +57,7 @@ duplicate_node_check(NodeStr) -> case rabbit_nodes:names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of - true -> io:format("node with name ~p " + true -> io:format("ERROR: node with name ~p " "already running on ~p~n", [NodeName, NodeHost]), io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"), @@ -65,7 +65,8 @@ duplicate_node_check(NodeStr) -> false -> ok end; {error, EpmdReason} -> - rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n", + io:format("ERROR: epmd error for host ~p: ~p (~s)~n", [NodeHost, EpmdReason, - rabbit_misc:format_inet_error(EpmdReason)]) + rabbit_misc:format_inet_error(EpmdReason)]), + rabbit_misc:quit(?ERROR_CODE) end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bb60bd12..ccac12c6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -74,12 +74,10 @@ maybe_run_cluster_dependent_tests() -> run_cluster_dependent_tests(SecondaryNode) -> SecondaryNodeS = atom_to_list(SecondaryNode), - cover:stop(SecondaryNode), ok = control_action(stop_app, []), - ok = control_action(reset, []), + ok = safe_reset(), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), - cover:start(SecondaryNode), ok = control_action(start_app, SecondaryNode, [], []), io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), @@ -940,7 +938,7 @@ test_cluster_management2(SecondaryNode) -> ok = assert_ram_node(), %% join cluster as a ram node - ok = control_action(reset, []), + ok = safe_reset(), ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -997,29 +995,30 @@ test_cluster_management2(SecondaryNode) -> ok = assert_disc_node(), %% turn a disk node into a ram node - ok = control_action(reset, []), + %% + %% can't use safe_reset here since for some reason nodes()==[] and + %% yet w/o stopping coverage things break + with_suspended_cover( + [SecondaryNode], fun () -> ok = control_action(reset, []) end), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), ok = assert_ram_node(), %% NB: this will log an inconsistent_database error, which is harmless - %% Turning cover on / off is OK even if we're not in general using cover, - %% it just turns the engine on / off, doesn't actually log anything. - cover:stop([SecondaryNode]), - true = disconnect_node(SecondaryNode), - pong = net_adm:ping(SecondaryNode), - cover:start([SecondaryNode]), + with_suspended_cover( + [SecondaryNode], fun () -> + true = disconnect_node(SecondaryNode), + pong = net_adm:ping(SecondaryNode) + end), %% leaving a cluster as a ram node - ok = control_action(reset, []), + ok = safe_reset(), %% ...and as a disk node ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - cover:stop(SecondaryNode), - ok = control_action(reset, []), - cover:start(SecondaryNode), + ok = safe_reset(), %% attempt to leave cluster when no other node is alive ok = control_action(cluster, [SecondaryNodeS, NodeS]), @@ -1034,22 +1033,39 @@ test_cluster_management2(SecondaryNode) -> control_action(cluster, [SecondaryNodeS]), %% leave system clustered, with the secondary node as a ram node - ok = control_action(force_reset, []), + with_suspended_cover( + [SecondaryNode], fun () -> ok = control_action(force_reset, []) end), ok = control_action(start_app, []), %% Yes, this is rather ugly. But since we're a clustered Mnesia %% node and we're telling another clustered node to reset itself, %% we will get disconnected half way through causing a %% badrpc. This never happens in real life since rabbitmqctl is - %% not a clustered Mnesia node. - cover:stop(SecondaryNode), - {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []), - pong = net_adm:ping(SecondaryNode), - cover:start(SecondaryNode), + %% not a clustered Mnesia node and is a hidden node. + with_suspended_cover( + [SecondaryNode], + fun () -> + {badrpc, nodedown} = + control_action(force_reset, SecondaryNode, [], []), + pong = net_adm:ping(SecondaryNode) + end), ok = control_action(cluster, SecondaryNode, [NodeS], []), ok = control_action(start_app, SecondaryNode, [], []), passed. +%% 'cover' does not cope at all well with nodes disconnecting, which +%% happens as part of reset. So we turn it off temporarily. That is ok +%% even if we're not in general using cover, it just turns the engine +%% on / off and doesn't log anything. +safe_reset() -> with_suspended_cover( + nodes(), fun () -> control_action(reset, []) end). + +with_suspended_cover(Nodes, Fun) -> + cover:stop(Nodes), + Res = Fun(), + cover:start(Nodes), + Res. + test_user_management() -> %% lots if stuff that should fail @@ -1216,7 +1232,15 @@ test_server_status() -> ok = control_action(list_consumers, []), %% set vm memory high watermark + HWM = vm_memory_monitor:get_vm_memory_high_watermark(), + ok = control_action(set_vm_memory_high_watermark, ["1"]), ok = control_action(set_vm_memory_high_watermark, ["1.0"]), + ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), + + %% eval + {parse_error, _} = control_action(eval, ["\""]), + {parse_error, _} = control_action(eval, ["a("]), + ok = control_action(eval, ["a."]), %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], @@ -2447,10 +2471,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - {undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), + {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2467,11 +2491,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {undefined, VQ6} = + {_, undefined, VQ6} = rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), VQ6. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49213c95..bd606dfb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; - (S) -> {undefined, S} + End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; + (Next, S) -> {Next, undefined, S} end, case queue_out(State) of {empty, State1} -> - End(a(State1)); + End(undefined, a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> @@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - End(a(in_r(MsgStatus, State1))) + End(MsgProps, a(in_r(MsgStatus, State1))) end end. diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 85dbf368..5ce894a9 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -49,6 +49,7 @@ -record(state, {total_memory, memory_limit, + memory_fraction, timeout, timer, alarmed, @@ -117,7 +118,7 @@ init([MemFraction, AlarmFuns]) -> {ok, set_mem_limits(State, MemFraction)}. handle_call(get_vm_memory_high_watermark, _From, State) -> - {reply, State#state.memory_limit / State#state.total_memory, State}; + {reply, State#state.memory_fraction, State}; handle_call({set_vm_memory_high_watermark, MemFraction}, _From, State) -> {reply, ok, set_mem_limits(State, MemFraction)}; @@ -185,8 +186,9 @@ set_mem_limits(State, MemFraction) -> MemLim = trunc(MemFraction * UsableMemory), error_logger:info_msg("Memory limit set to ~pMB of ~pMB total.~n", [trunc(MemLim/?ONE_MB), trunc(TotalMemory/?ONE_MB)]), - internal_update(State #state { total_memory = TotalMemory, - memory_limit = MemLim }). + internal_update(State #state { total_memory = TotalMemory, + memory_limit = MemLim, + memory_fraction = MemFraction}). internal_update(State = #state { memory_limit = MemLimit, alarmed = Alarmed, |