summaryrefslogtreecommitdiff
path: root/src/rabbit_control.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_control.erl')
-rw-r--r--src/rabbit_control.erl214
1 files changed, 118 insertions, 96 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 323d4d2f..a3b6f369 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,20 +32,25 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/4]).
-
--record(params, {quiet, node, command, args}).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
+-define(QUIET_OPT, "-q").
+-define(NODE_OPT, "-n").
+-define(VHOST_OPT, "-p").
+-define(SCOPE_OPT, "-s").
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), erlang_node(), [string()],
- fun ((string(), [any()]) -> 'ok')) -> 'ok').
+-spec(action/5 ::
+ (atom(), node(), [string()], [{string(), any()}],
+ fun ((string(), [any()]) -> 'ok'))
+ -> 'ok').
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -55,18 +60,33 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
FullCommand = init:get_plain_arguments(),
- #params{quiet = Quiet, node = Node, command = Command, args = Args} =
- parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:makenode(NodeStr)}),
+ case FullCommand of
+ [] -> usage();
+ _ -> ok
+ end,
+ {[Command0 | Args], Opts} =
+ rabbit_misc:get_options(
+ [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr},
+ {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}],
+ FullCommand),
+ Opts1 = lists:map(fun({K, V}) ->
+ case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ _ -> {K, V}
+ end
+ end, Opts),
+ Command = list_to_atom(Command0),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
+ Node = proplists:get_value(?NODE_OPT, Opts1),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
io:format(Format ++ " ...~n", Args1)
- end
+ end
end,
%% The reason we don't use a try/catch here is that rpc:call turns
%% thrown errors into normal return values
- case catch action(Command, Node, Args, Inform) of
+ case catch action(Command, Node, Args, Opts, Inform) of
ok ->
case Quiet of
true -> ok;
@@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) ->
fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
ok.
-parse_args(["-n", NodeS | Args], Params) ->
- parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)});
-parse_args(["-q" | Args], Params) ->
- parse_args(Args, Params#params{quiet = true});
-parse_args([Command | Args], Params) ->
- Params#params{command = list_to_atom(Command), args = Args};
-parse_args([], _) ->
- usage().
-
stop() ->
ok.
@@ -134,33 +145,39 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
halt(1).
-action(stop, Node, [], Inform) ->
+action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
-action(stop_app, Node, [], Inform) ->
+action(stop_app, Node, [], _Opts, Inform) ->
Inform("Stopping node ~p", [Node]),
call(Node, {rabbit, stop, []});
-action(start_app, Node, [], Inform) ->
+action(start_app, Node, [], _Opts, Inform) ->
Inform("Starting node ~p", [Node]),
call(Node, {rabbit, start, []});
-action(reset, Node, [], Inform) ->
+action(reset, Node, [], _Opts, Inform) ->
Inform("Resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, reset, []});
-action(force_reset, Node, [], Inform) ->
+action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, Inform) ->
+action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-action(status, Node, [], Inform) ->
+action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
+ ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
+ Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
+ [Node, ClusterNodes]),
+ rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+
+action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
{badrpc, _} = Res -> Res;
@@ -168,129 +185,125 @@ action(status, Node, [], Inform) ->
ok
end;
-action(rotate_logs, Node, [], Inform) ->
+action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
-action(rotate_logs, Node, Args = [Suffix], Inform) ->
+action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
Inform("Rotating logs to files with suffix ~p", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
-action(close_connection, Node, [PidStr, Explanation], Inform) ->
+action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
Inform("Closing connection ~s", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
-action(add_user, Node, Args = [Username, _Password], Inform) ->
+action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
Inform("Creating user ~p", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
-action(delete_user, Node, Args = [_Username], Inform) ->
+action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
Inform("Deleting user ~p", Args),
call(Node, {rabbit_access_control, delete_user, Args});
-action(change_password, Node, Args = [Username, _Newpassword], Inform) ->
+action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
-action(list_users, Node, [], Inform) ->
+action(set_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Setting administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, set_admin, [Username]});
+
+action(clear_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Clearing administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, clear_admin, [Username]});
+
+action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
-action(add_vhost, Node, Args = [_VHostPath], Inform) ->
+action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost ~p", Args),
call(Node, {rabbit_access_control, add_vhost, Args});
-action(delete_vhost, Node, Args = [_VHostPath], Inform) ->
+action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
call(Node, {rabbit_access_control, delete_vhost, Args});
-action(list_vhosts, Node, [], Inform) ->
+action(list_vhosts, Node, [], _Opts, Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(list_user_permissions, Node, Args = [_Username], Inform) ->
+action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
display_list(call(Node, {rabbit_access_control, list_user_permissions,
Args}));
-action(list_queues, Node, Args, Inform) ->
+action(list_queues, Node, Args, Opts, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_exchanges, Node, Args, Inform) ->
+action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, Args, Inform) ->
+action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
- InfoKeys = [exchange_name, queue_name, routing_key, args],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys);
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms]),
+ ArgAtoms);
-action(list_connections, Node, Args, Inform) ->
+action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
-action(list_channels, Node, Args, Inform) ->
+action(list_channels, Node, Args, _Opts, Inform) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count,
messages_unacknowledged]),
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
-action(list_consumers, Node, Args, Inform) ->
+action(list_consumers, Node, _Args, Opts, Inform) ->
Inform("Listing consumers", []),
- {VHostArg, _} = parse_vhost_flag_bin(Args),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
InfoKeys);
-action(Command, Node, Args, Inform) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- action(Command, Node, VHost, RemainingArgs, Inform).
-
-action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Scope = proplists:get_value(?SCOPE_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, set_permissions,
- [Username, VHost, CPerm, WPerm, RPerm]});
+ [Scope, Username, VHost, CPerm, WPerm, RPerm]});
-action(clear_permissions, Node, VHost, [Username], Inform) ->
+action(clear_permissions, Node, [Username], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
-action(list_permissions, Node, VHost, [], Inform) ->
+action(list_permissions, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
[VHost]})).
-parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {VHost, RemainingArgs};
- RemainingArgs ->
- {"/", RemainingArgs}
- end.
-
-parse_vhost_flag_bin(Args) ->
- {VHost, RemainingArgs} = parse_vhost_flag(Args),
- {list_to_binary(VHost), RemainingArgs}.
-
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
Default;
@@ -299,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
- X <- InfoItemKeys])
- end, Results),
+ lists:foreach(
+ fun (Result) -> display_row(
+ [format_info_item(proplists:get_value(X, Result)) ||
+ X <- InfoItemKeys])
+ end, Results),
ok;
display_info_list(Other, _) ->
Other.
@@ -310,25 +325,30 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Key, Items) ->
- case proplists:get_value(Key, Items) of
- #resource{name = Name} ->
- escape(Name);
- Value when Key =:= address; Key =:= peer_address andalso
- is_tuple(Value) ->
- inet_parse:ntoa(Value);
- Value when is_pid(Value) ->
- rabbit_misc:pid_to_string(Value);
- Value when is_binary(Value) ->
- escape(Value);
- Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
- when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
- io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
- Value ->
- io_lib:format("~w", [Value])
- end.
+-define(IS_U8(X), (X >= 0 andalso X =< 255)).
+-define(IS_U16(X), (X >= 0 andalso X =< 65535)).
+
+format_info_item(#resource{name = Name}) ->
+ escape(Name);
+format_info_item({N1, N2, N3, N4} = Value) when
+ ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
+ inet_parse:ntoa(Value);
+format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
+ ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
+ ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
+ inet_parse:ntoa(Value);
+format_info_item(Value) when is_pid(Value) ->
+ rabbit_misc:pid_to_string(Value);
+format_info_item(Value) when is_binary(Value) ->
+ escape(Value);
+format_info_item(Value) when is_atom(Value) ->
+ escape(atom_to_list(Value));
+format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
+ Value) when is_binary(TableEntryKey) andalso
+ is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item(Value) ->
+ io_lib:format("~w", [Value]).
display_list(L) when is_list(L) ->
lists:foreach(fun (I) when is_binary(I) ->
@@ -351,6 +371,8 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
+escape(Atom) when is_atom(Atom) ->
+ escape(atom_to_list(Atom));
escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->