summaryrefslogtreecommitdiff
path: root/deps/rabbit_common/src/rabbit_ct_broker_helpers.erl
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2016-05-03 15:49:47 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2016-05-03 15:49:47 +0200
commit2a84bf75f88a3ec81b49bcf6c22f7600767500c9 (patch)
tree7841a0c73690b306ec0fce9679dea32f2d482d97 /deps/rabbit_common/src/rabbit_ct_broker_helpers.erl
parentfad2d9b5bf141ef7107ba9f4bc9c9450886db971 (diff)
downloadrabbitmq-server-git-2a84bf75f88a3ec81b49bcf6c22f7600767500c9.tar.gz
rabbit_ct_*: Refine API
o Simplify many function names. o Unify functions taking a node name or index. Now the same function (eg. rpc()) takes a node name or index interchangeably. o Rewrite command execution: it now takes a list of arguments instead of a command line to parse (and thus escape if needed). o The node name is stored under the key `nodename`, instead of `rmq_nodename`. The prefix is not needed because the key appears under the `rmq_nodes` node. o Provide a make() function to wrap execution of make(1).
Diffstat (limited to 'deps/rabbit_common/src/rabbit_ct_broker_helpers.erl')
-rw-r--r--deps/rabbit_common/src/rabbit_ct_broker_helpers.erl329
1 files changed, 191 insertions, 138 deletions
diff --git a/deps/rabbit_common/src/rabbit_ct_broker_helpers.erl b/deps/rabbit_common/src/rabbit_ct_broker_helpers.erl
index a19d68db15..846c06e199 100644
--- a/deps/rabbit_common/src/rabbit_ct_broker_helpers.erl
+++ b/deps/rabbit_common/src/rabbit_ct_broker_helpers.erl
@@ -25,23 +25,29 @@
start_rabbitmq_nodes/1,
stop_rabbitmq_nodes/1,
cluster_nodes/1,
+
get_node_configs/1, get_node_configs/2,
get_node_config/2, get_node_config/3,
+ nodename_to_index/2,
+
control_action/2, control_action/3, control_action/4,
control_action_t/3, control_action_t/4, control_action_t/5,
control_action_opts/1,
info_action/3, info_action_t/4,
- add_code_path_to_broker/2,
- add_code_path_to_all_brokers/2,
- run_on_broker/4, run_on_broker/5,
- run_on_broker_i/5, run_on_broker_i/6,
- run_on_all_brokers/4,
- restart_broker/1,
- restart_broker_i/2,
- stop_broker/1,
- stop_broker_i/2,
- kill_broker/1,
- kill_broker_i/2,
+ rabbitmqctl/3,
+
+ add_code_path_to_node/2,
+ add_code_path_to_all_nodes/2,
+ rpc/5, rpc/6,
+ rpc_all/4, rpc_all/5,
+
+ start_node/2,
+ start_broker/2,
+ restart_broker/2,
+ stop_broker/2,
+ stop_node/2,
+ kill_node/2,
+
get_connection_pids/1,
get_queue_sup_pid/1,
set_policy/6,
@@ -107,26 +113,14 @@ start_rabbitmq_nodes(Config) ->
wait_for_rabbitmq_nodes(Config, [], NodeConfigs, Clustered) ->
NodeConfigs1 = lists:sort(
fun(NodeConfigA, NodeConfigB) ->
- NodeA = ?config(rmq_nodename, NodeConfigA),
- NodeB = ?config(rmq_nodename, NodeConfigB),
+ NodeA = ?config(nodename, NodeConfigA),
+ NodeB = ?config(nodename, NodeConfigB),
NodeA =< NodeB
end, NodeConfigs),
Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodes, NodeConfigs1}),
if
- Clustered ->
- Rabbitmqctl = ?config(rabbitmqctl_cmd, Config),
- Nodename = ?config(rmq_nodename, hd(NodeConfigs1)),
- Cmd = Rabbitmqctl ++ " -n \"" ++ atom_to_list(Nodename) ++ "\"" ++
- " cluster_status",
- case rabbit_ct_helpers:run_cmd(Cmd) of
- true ->
- Config1;
- false ->
- stop_rabbitmq_nodes(Config1),
- {skip, "Could not confirm cluster was up and running"}
- end;
- true ->
- Config1
+ Clustered -> cluster_nodes(Config1);
+ true -> Config1
end;
wait_for_rabbitmq_nodes(Config, Starting, NodeConfigs, Clustered) ->
receive
@@ -138,20 +132,9 @@ wait_for_rabbitmq_nodes(Config, Starting, NodeConfigs, Clustered) ->
{Pid, NodeConfig} when NodeConfigs =:= [] ->
wait_for_rabbitmq_nodes(Config, Starting -- [Pid],
[NodeConfig | NodeConfigs], Clustered);
- {Pid, NodeConfig} when not Clustered ->
+ {Pid, NodeConfig} ->
wait_for_rabbitmq_nodes(Config, Starting -- [Pid],
- [NodeConfig | NodeConfigs], Clustered);
- {Pid, NodeConfig} when Clustered ->
- case cluster_nodes(Config, NodeConfig, hd(NodeConfigs)) of
- ok ->
- wait_for_rabbitmq_nodes(Config, Starting -- [Pid],
- [NodeConfig | NodeConfigs], Clustered);
- {skip, _} = Error ->
- Config1 = rabbit_ct_helpers:set_config(Config,
- {rmq_nodes, [NodeConfig | NodeConfigs]}),
- stop_rabbitmq_nodes(Config1),
- Error
- end
+ [NodeConfig | NodeConfigs], Clustered)
end.
%% To start a RabbitMQ node, we need to:
@@ -166,8 +149,7 @@ wait_for_rabbitmq_nodes(Config, Starting, NodeConfigs, Clustered) ->
%% generated.
start_rabbitmq_node(Master, Config, NodeConfig, I) ->
- Attempts0 = rabbit_ct_helpers:get_config(NodeConfig,
- rmq_failed_boot_attempts),
+ Attempts0 = rabbit_ct_helpers:get_config(NodeConfig, failed_boot_attempts),
Attempts = case Attempts0 of
undefined -> 0;
N -> N
@@ -189,7 +171,7 @@ start_rabbitmq_node(Master, Config, NodeConfig, I) ->
%% Try again with another TCP port numbers base.
NodeConfig4 = move_nonworking_nodedir_away(NodeConfig3),
NodeConfig5 = rabbit_ct_helpers:set_config(NodeConfig4,
- {rmq_failed_boot_attempts, Attempts + 1}),
+ {failed_boot_attempts, Attempts + 1}),
start_rabbitmq_node(Master, Config, NodeConfig5, I);
NodeConfig4 ->
Master ! {self(), NodeConfig4},
@@ -277,11 +259,11 @@ init_nodename(Config, NodeConfig, I) ->
end,
Nodename = list_to_atom(
rabbit_misc:format("rmq-ct~s-~b-~b@localhost", [Suffix, I + 1, Base])),
- rabbit_ct_helpers:set_config(NodeConfig, {rmq_nodename, Nodename}).
+ rabbit_ct_helpers:set_config(NodeConfig, {nodename, Nodename}).
init_config_filename(Config, NodeConfig, _I) ->
PrivDir = ?config(priv_dir, Config),
- Nodename = ?config(rmq_nodename, NodeConfig),
+ Nodename = ?config(nodename, NodeConfig),
ConfigDir = filename:join(PrivDir, Nodename),
ConfigFile = filename:join(ConfigDir, Nodename),
rabbit_ct_helpers:set_config(NodeConfig,
@@ -313,10 +295,9 @@ write_config_file(Config, NodeConfig, _I) ->
end.
do_start_rabbitmq_node(Config, NodeConfig, _I) ->
- Make = ?config(make_cmd, Config),
SrcDir = ?config(rabbit_srcdir, Config),
PrivDir = ?config(priv_dir, Config),
- Nodename = ?config(rmq_nodename, NodeConfig),
+ Nodename = ?config(nodename, NodeConfig),
DistPort = ?config(tcp_port_erlang_dist, NodeConfig),
ConfigFile = ?config(erlang_node_config_filename, NodeConfig),
%% Use inet_proxy_dist to handle distribution. This is used by the
@@ -351,18 +332,32 @@ do_start_rabbitmq_node(Config, NodeConfig, _I) ->
end,
StartArgs0 ++ " -kernel net_ticktime " ++ integer_to_list(Ticktime)
end,
- Cmd = Make ++ " -C " ++ SrcDir ++ rabbit_ct_helpers:make_verbosity() ++
- " start-background-broker" ++
- " RABBITMQ_NODENAME='" ++ atom_to_list(Nodename) ++ "'" ++
- " RABBITMQ_DIST_PORT='" ++ integer_to_list(DistPort) ++ "'" ++
- " RABBITMQ_CONFIG_FILE='" ++ ConfigFile ++ "'" ++
- " RABBITMQ_SERVER_START_ARGS='" ++ StartArgs1 ++ "'" ++
- " TEST_TMPDIR='" ++ PrivDir ++ "'",
- case rabbit_ct_helpers:run_cmd(Cmd) of
- true -> NodeConfig;
- false -> {skip, "Failed to initialize RabbitMQ"}
+ Cmd = ["start-background-broker",
+ {"RABBITMQ_NODENAME=~s", [Nodename]},
+ {"RABBITMQ_DIST_PORT=~b", [DistPort]},
+ {"RABBITMQ_CONFIG_FILE=~s", [ConfigFile]},
+ {"RABBITMQ_SERVER_START_ARGS=~s", [StartArgs1]},
+ {"TEST_TMPDIR=~s", [PrivDir]}],
+ case rabbit_ct_helpers:make(Config, SrcDir, Cmd) of
+ {ok, _} -> query_node(Config, NodeConfig);
+ _ -> {skip, "Failed to initialize RabbitMQ"}
end.
+query_node(Config, NodeConfig) ->
+ Nodename = ?config(nodename, NodeConfig),
+ PidFile = rpc(Config, Nodename, os, getenv, ["RABBITMQ_PID_FILE"]),
+ MnesiaDir = rpc(Config, Nodename, mnesia, system_info, [directory]),
+ {ok, PluginsDir} = rpc(Config, Nodename, application, get_env,
+ [rabbit, plugins_dir]),
+ {ok, EnabledPluginsFile} = rpc(Config, Nodename, application, get_env,
+ [rabbit, enabled_plugins_file]),
+ rabbit_ct_helpers:set_config(NodeConfig, [
+ {pid_file, PidFile},
+ {mnesia_dir, MnesiaDir},
+ {plugins_dir, PluginsDir},
+ {enabled_plugins_file, EnabledPluginsFile}
+ ]).
+
cluster_nodes(Config) ->
[NodeConfig1 | NodeConfigs] = get_node_configs(Config),
cluster_nodes1(Config, NodeConfig1, NodeConfigs).
@@ -376,22 +371,24 @@ cluster_nodes1(Config, _, []) ->
Config.
cluster_nodes(Config, NodeConfig1, NodeConfig2) ->
- Rabbitmqctl = ?config(rabbitmqctl_cmd, Config),
- Nodename1 = ?config(rmq_nodename, NodeConfig1),
- Nodename2 = ?config(rmq_nodename, NodeConfig2),
- Cmd =
- Rabbitmqctl ++ " -n \"" ++ atom_to_list(Nodename1) ++ "\"" ++
- " stop_app && " ++
- Rabbitmqctl ++ " -n \"" ++ atom_to_list(Nodename1) ++ "\"" ++
- " join_cluster \"" ++ atom_to_list(Nodename2) ++ "\" && " ++
- Rabbitmqctl ++ " -n \"" ++ atom_to_list(Nodename1) ++ "\"" ++
- " start_app",
- case rabbit_ct_helpers:run_cmd(Cmd) of
- true -> ok;
- false -> {skip,
- "Failed to cluster nodes \"" ++ atom_to_list(Nodename1) ++
- "\" and \"" ++ atom_to_list(Nodename2) ++ "\""}
- end.
+ Nodename1 = ?config(nodename, NodeConfig1),
+ Nodename2 = ?config(nodename, NodeConfig2),
+ Cmds = [
+ ["stop_app"],
+ ["join_cluster", Nodename2],
+ ["start_app"]
+ ],
+ cluster_nodes1(Config, Nodename1, Nodename2, Cmds).
+
+cluster_nodes1(Config, Nodename1, Nodename2, [Cmd | Rest]) ->
+ case rabbitmqctl(Config, Nodename1, Cmd) of
+ {ok, _} -> cluster_nodes1(Config, Nodename1, Nodename2, Rest);
+ _ -> {skip,
+ "Failed to cluster nodes \"" ++ atom_to_list(Nodename1) ++
+ "\" and \"" ++ atom_to_list(Nodename2) ++ "\""}
+ end;
+cluster_nodes1(_, _, _, []) ->
+ ok.
move_nonworking_nodedir_away(NodeConfig) ->
ConfigFile = ?config(erlang_node_config_filename, NodeConfig),
@@ -408,7 +405,7 @@ share_dist_and_proxy_ports_map(Config) ->
?config(tcp_port_erlang_dist, NodeConfig),
?config(tcp_port_erlang_dist_proxy, NodeConfig)
} || NodeConfig <- get_node_configs(Config)],
- run_on_all_brokers(Config,
+ rpc_all(Config,
application, set_env, [kernel, dist_and_proxy_ports_map, Map]),
Config.
@@ -418,15 +415,13 @@ stop_rabbitmq_nodes(Config) ->
proplists:delete(rmq_nodes, Config).
stop_rabbitmq_node(Config, NodeConfig) ->
- Make = ?config(make_cmd, Config),
SrcDir = ?config(rabbit_srcdir, Config),
PrivDir = ?config(priv_dir, Config),
- Nodename = ?config(rmq_nodename, NodeConfig),
- Cmd = Make ++ " -C " ++ SrcDir ++ rabbit_ct_helpers:make_verbosity() ++
- " stop-rabbit-on-node stop-node" ++
- " RABBITMQ_NODENAME='" ++ atom_to_list(Nodename) ++ "'" ++
- " TEST_TMPDIR='" ++ PrivDir ++ "'",
- rabbit_ct_helpers:run_cmd(Cmd),
+ Nodename = ?config(nodename, NodeConfig),
+ Cmd = ["stop-rabbit-on-node", "stop-node",
+ {"RABBITMQ_NODENAME=~s", [Nodename]},
+ {"TEST_TMPDIR=~s", [PrivDir]}],
+ rabbit_ct_helpers:make(Config, SrcDir, Cmd),
NodeConfig.
%% -------------------------------------------------------------------
@@ -517,6 +512,22 @@ expand_options(As, Bs) ->
end
end, Bs, As).
+%% Use rabbitmqctl(1) instead of using the Erlang API.
+
+rabbitmqctl(Config, Node, Args) ->
+ Rabbitmqctl = ?config(rabbitmqctl_cmd, Config),
+ NodeConfig = get_node_config(Config, Node),
+ Nodename = ?config(nodename, NodeConfig),
+ Env = [
+ {"RABBITMQ_PID_FILE", ?config(pid_file, NodeConfig)},
+ {"RABBITMQ_MNESIA_DIR", ?config(mnesia_dir, NodeConfig)},
+ {"RABBITMQ_PLUGINS_DIR", ?config(plugins_dir, NodeConfig)},
+ {"RABBITMQ_ENABLED_PLUGINS_FILE",
+ ?config(enabled_plugins_file, NodeConfig)}
+ ],
+ Cmd = [Rabbitmqctl, "-n", Nodename | Args],
+ rabbit_ct_helpers:exec(Cmd, [{env, Env}]).
+
%% -------------------------------------------------------------------
%% Other helpers.
%% -------------------------------------------------------------------
@@ -528,15 +539,40 @@ get_node_configs(Config, Key) ->
NodeConfigs = get_node_configs(Config),
[?config(Key, NodeConfig) || NodeConfig <- NodeConfigs].
-get_node_config(Config, I) ->
+get_node_config(Config, Node) when is_atom(Node) andalso Node =/= undefined ->
+ NodeConfigs = get_node_configs(Config),
+ get_node_config1(NodeConfigs, Node);
+get_node_config(Config, I) when is_integer(I) andalso I >= 0 ->
NodeConfigs = get_node_configs(Config),
lists:nth(I + 1, NodeConfigs).
-get_node_config(Config, I, Key) ->
- NodeConfig = get_node_config(Config, I),
+get_node_config1([NodeConfig | Rest], Node) ->
+ case ?config(nodename, NodeConfig) of
+ Node -> NodeConfig;
+ _ -> get_node_config1(Rest, Node)
+ end;
+get_node_config1([], Node) ->
+ exit({unknown_node, Node}).
+
+get_node_config(Config, Node, Key) ->
+ NodeConfig = get_node_config(Config, Node),
?config(Key, NodeConfig).
-add_code_path_to_broker(Node, Module) ->
+nodename_to_index(Config, Node) ->
+ NodeConfigs = get_node_configs(Config),
+ nodename_to_index1(NodeConfigs, Node, 0).
+
+nodename_to_index1([NodeConfig | Rest], Node, I) ->
+ case ?config(nodename, NodeConfig) of
+ Node -> I;
+ _ -> nodename_to_index1(Rest, Node, I + 1)
+ end;
+nodename_to_index1([], Node, _) ->
+ exit({unknown_node, Node}).
+
+%% Functions to execute code on a remote node/broker.
+
+add_code_path_to_node(Node, Module) ->
Path1 = filename:dirname(code:which(Module)),
Path2 = filename:dirname(code:which(?MODULE)),
Paths = lists:usort([Path1, Path2]),
@@ -549,18 +585,27 @@ add_code_path_to_broker(Node, Module) ->
end
end, Paths).
-add_code_path_to_all_brokers(Config, Module) ->
- Nodenames = get_node_configs(Config, rmq_nodename),
- [ok = add_code_path_to_broker(Nodename, Module)
+add_code_path_to_all_nodes(Config, Module) ->
+ Nodenames = get_node_configs(Config, nodename),
+ [ok = add_code_path_to_node(Nodename, Module)
|| Nodename <- Nodenames],
ok.
-run_on_broker(Node, Module, Function, Args) ->
- run_on_broker(Node, Module, Function, Args, infinity).
-
-run_on_broker(Node, Module, Function, Args, Timeout) ->
+rpc(Config, Node, Module, Function, Args)
+when is_atom(Node) andalso Node =/= undefined ->
+ rpc(Config, Node, Module, Function, Args, infinity);
+rpc(Config, I, Module, Function, Args)
+when is_integer(I) andalso I >= 0 ->
+ Node = get_node_config(Config, I, nodename),
+ rpc(Config, Node, Module, Function, Args);
+rpc(Config, Nodes, Module, Function, Args)
+when is_list(Nodes) ->
+ [rpc(Config, Node, Module, Function, Args) || Node <- Nodes].
+
+rpc(_Config, Node, Module, Function, Args, Timeout)
+when is_atom(Node) andalso Node =/= undefined ->
%% We add some directories to the broker node search path.
- add_code_path_to_broker(Node, Module),
+ add_code_path_to_node(Node, Module),
%% If there is an exception, rpc:call/{4,5} returns the exception as
%% a "normal" return value. If there is an exit signal, we raise
%% it again. In both cases, we have no idea of the module and line
@@ -573,51 +618,59 @@ run_on_broker(Node, Module, Function, Args, Timeout) ->
{badrpc, {'EXIT', Reason}} -> exit(Reason);
{badrpc, Reason} -> exit(Reason);
Ret -> Ret
+ end;
+rpc(Config, I, Module, Function, Args, Timeout)
+when is_integer(I) andalso I >= 0 ->
+ Node = get_node_config(Config, I, nodename),
+ rpc(Config, Node, Module, Function, Args, Timeout);
+rpc(Config, Nodes, Module, Function, Args, Timeout)
+when is_list(Nodes) ->
+ [rpc(Config, Node, Module, Function, Args, Timeout) || Node <- Nodes].
+
+rpc_all(Config, Module, Function, Args) ->
+ Nodes = get_node_configs(Config, nodename),
+ rpc(Config, Nodes, Module, Function, Args).
+
+rpc_all(Config, Module, Function, Args, Timeout) ->
+ Nodes = get_node_configs(Config, nodename),
+ rpc(Config, Nodes, Module, Function, Args, Timeout).
+
+%% Functions to start/restart/stop only the broker or the full Erlang
+%% node.
+
+start_node(Config, Node) ->
+ NodeConfig = get_node_config(Config, Node),
+ I = if
+ is_atom(Node) -> nodename_to_index(Config, Node);
+ true -> Node
+ end,
+ case do_start_rabbitmq_node(Config, NodeConfig, I) of
+ {skip, _} = Error -> {error, Error};
+ _ -> ok
end.
-run_on_broker_i(Config, I, Module, Function, Args) ->
- Node = get_node_config(Config, I, rmq_nodename),
- run_on_broker(Node, Module, Function, Args).
-
-run_on_broker_i(Config, I, Module, Function, Args, Timeout) ->
- Node = get_node_config(Config, I, rmq_nodename),
- run_on_broker(Node, Module, Function, Args, Timeout).
-
-run_on_all_brokers(Config, Module, Function, Args) ->
- Nodes = get_node_configs(Config, rmq_nodename),
- [run_on_broker(Node, Module, Function, Args) || Node <- Nodes].
-
-restart_broker(Nodename) ->
- ok = rabbit_ct_broker_helpers:run_on_broker(Nodename,
- ?MODULE, do_restart_broker, []).
+start_broker(Config, Node) ->
+ ok = rpc(Config, Node, rabbit, start, []).
-restart_broker_i(Config, I) ->
- ok = rabbit_ct_broker_helpers:run_on_broker_i(Config, I,
- ?MODULE, do_restart_broker, []).
+restart_broker(Config, Node) ->
+ ok = rpc(Config, Node, ?MODULE, do_restart_broker, []).
do_restart_broker() ->
- rabbit:stop(),
- rabbit:start().
+ ok = rabbit:stop(),
+ ok = rabbit:start().
-stop_broker(Nodename) ->
- ok = rabbit_ct_broker_helpers:run_on_broker(Nodename,
- rabbit, stop, []).
+stop_broker(Config, Node) ->
+ ok = rpc(Config, Node, rabbit, stop, []).
-stop_broker_i(Config, I) ->
- ok = rabbit_ct_broker_helpers:run_on_broker_i(Config, I,
- rabbit, stop, []).
-
-kill_broker(Nodename) ->
- Pid = rabbit_ct_broker_helpers:run_on_broker(Nodename,
- os, getpid, []),
- do_kill_broker(Pid).
-
-kill_broker_i(Config, I) ->
- Pid = rabbit_ct_broker_helpers:run_on_broker_i(Config, I,
- os, getpid, []),
- do_kill_broker(Pid).
+stop_node(Config, Node) ->
+ NodeConfig = get_node_config(Config, Node),
+ case stop_rabbitmq_node(Config, NodeConfig) of
+ {skip, _} = Error -> Error;
+ _ -> ok
+ end.
-do_kill_broker(Pid) ->
+kill_node(Config, Node) ->
+ Pid = rpc(Config, Node, os, getpid, []),
%% FIXME maybe_flush_cover(Cfg),
os:cmd("kill -9 " ++ Pid),
await_os_pid_death(Pid).
@@ -662,19 +715,19 @@ get_queue_sup_pid([], _QueuePid) ->
%% Policy helpers.
%% -------------------------------------------------------------------
-set_policy(Config, I, Name, Pattern, ApplyTo, Definition) ->
- ok = run_on_broker_i(Config, I,
+set_policy(Config, Node, Name, Pattern, ApplyTo, Definition) ->
+ ok = rpc(Config, Node,
rabbit_policy, set, [<<"/">>, Name, Pattern, Definition, 0, ApplyTo]).
-clear_policy(Config, I, Name) ->
- ok = run_on_broker_i(Config, I,
+clear_policy(Config, Node, Name) ->
+ ok = rpc(Config, Node,
rabbit_policy, delete, [<<"/">>, Name]).
-set_ha_policy(Config, I, Pattern, Policy) ->
- set_ha_policy(Config, I, Pattern, Policy, []).
+set_ha_policy(Config, Node, Pattern, Policy) ->
+ set_ha_policy(Config, Node, Pattern, Policy, []).
-set_ha_policy(Config, I, Pattern, Policy, Extra) ->
- set_policy(Config, I, Pattern, Pattern, <<"queues">>,
+set_ha_policy(Config, Node, Pattern, Policy, Extra) ->
+ set_policy(Config, Node, Pattern, Pattern, <<"queues">>,
ha_policy(Policy) ++ Extra).
ha_policy(<<"all">>) -> [{<<"ha-mode">>, <<"all">>}];