diff options
Diffstat (limited to 'src/rabbit_control.erl')
-rw-r--r-- | src/rabbit_control.erl | 214 |
1 files changed, 118 insertions, 96 deletions
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 323d4d2f..a3b6f369 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -32,20 +32,25 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/4]). - --record(params, {quiet, node, command, args}). +-export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). +-define(QUIET_OPT, "-q"). +-define(NODE_OPT, "-n"). +-define(VHOST_OPT, "-p"). +-define(SCOPE_OPT, "-s"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). --spec(action/4 :: (atom(), erlang_node(), [string()], - fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(action/5 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok')) + -> 'ok'). -spec(usage/0 :: () -> no_return()). -endif. @@ -55,18 +60,33 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), - #params{quiet = Quiet, node = Node, command = Command, args = Args} = - parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:makenode(NodeStr)}), + case FullCommand of + [] -> usage(); + _ -> ok + end, + {[Command0 | Args], Opts} = + rabbit_misc:get_options( + [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, + {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + FullCommand), + Opts1 = lists:map(fun({K, V}) -> + case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + _ -> {K, V} + end + end, Opts), + Command = list_to_atom(Command0), + Quiet = proplists:get_bool(?QUIET_OPT, Opts1), + Node = proplists:get_value(?NODE_OPT, Opts1), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) - end + end end, %% The reason we don't use a try/catch here is that rpc:call turns %% thrown errors into normal return values - case catch action(Command, Node, Args, Inform) of + case catch action(Command, Node, Args, Opts, Inform) of ok -> case Quiet of true -> ok; @@ -118,15 +138,6 @@ print_badrpc_diagnostics(Node) -> fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), ok. -parse_args(["-n", NodeS | Args], Params) -> - parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)}); -parse_args(["-q" | Args], Params) -> - parse_args(Args, Params#params{quiet = true}); -parse_args([Command | Args], Params) -> - Params#params{command = list_to_atom(Command), args = Args}; -parse_args([], _) -> - usage(). - stop() -> ok. @@ -134,33 +145,39 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). -action(stop, Node, [], Inform) -> +action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), call(Node, {rabbit, stop_and_halt, []}); -action(stop_app, Node, [], Inform) -> +action(stop_app, Node, [], _Opts, Inform) -> Inform("Stopping node ~p", [Node]), call(Node, {rabbit, stop, []}); -action(start_app, Node, [], Inform) -> +action(start_app, Node, [], _Opts, Inform) -> Inform("Starting node ~p", [Node]), call(Node, {rabbit, start, []}); -action(reset, Node, [], Inform) -> +action(reset, Node, [], _Opts, Inform) -> Inform("Resetting node ~p", [Node]), call(Node, {rabbit_mnesia, reset, []}); -action(force_reset, Node, [], Inform) -> +action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, Inform) -> +action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Clustering node ~p with ~p", [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); -action(status, Node, [], Inform) -> +action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> + ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", + [Node, ClusterNodes]), + rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); + +action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of {badrpc, _} = Res -> Res; @@ -168,129 +185,125 @@ action(status, Node, [], Inform) -> ok end; -action(rotate_logs, Node, [], Inform) -> +action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); -action(rotate_logs, Node, Args = [Suffix], Inform) -> +action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> Inform("Rotating logs to files with suffix ~p", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); -action(close_connection, Node, [PidStr, Explanation], Inform) -> +action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> Inform("Closing connection ~s", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); -action(add_user, Node, Args = [Username, _Password], Inform) -> +action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> Inform("Creating user ~p", [Username]), call(Node, {rabbit_access_control, add_user, Args}); -action(delete_user, Node, Args = [_Username], Inform) -> +action(delete_user, Node, Args = [_Username], _Opts, Inform) -> Inform("Deleting user ~p", Args), call(Node, {rabbit_access_control, delete_user, Args}); -action(change_password, Node, Args = [Username, _Newpassword], Inform) -> +action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); -action(list_users, Node, [], Inform) -> +action(set_admin, Node, [Username], _Opts, Inform) -> + Inform("Setting administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, set_admin, [Username]}); + +action(clear_admin, Node, [Username], _Opts, Inform) -> + Inform("Clearing administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_admin, [Username]}); + +action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); -action(add_vhost, Node, Args = [_VHostPath], Inform) -> +action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost ~p", Args), call(Node, {rabbit_access_control, add_vhost, Args}); -action(delete_vhost, Node, Args = [_VHostPath], Inform) -> +action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), call(Node, {rabbit_access_control, delete_vhost, Args}); -action(list_vhosts, Node, [], Inform) -> +action(list_vhosts, Node, [], _Opts, Inform) -> Inform("Listing vhosts", []), display_list(call(Node, {rabbit_access_control, list_vhosts, []})); -action(list_user_permissions, Node, Args = [_Username], Inform) -> +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), display_list(call(Node, {rabbit_access_control, list_user_permissions, Args})); -action(list_queues, Node, Args, Inform) -> +action(list_queues, Node, Args, Opts, Inform) -> Inform("Listing queues", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_exchanges, Node, Args, Inform) -> +action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), - {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = default_if_empty(RemainingArgs, [name, type]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), display_info_list(rpc_call(Node, rabbit_exchange, info_all, [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, Args, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys); + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); -action(list_connections, Node, Args, Inform) -> +action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); -action(list_channels, Node, Args, Inform) -> +action(list_channels, Node, Args, _Opts, Inform) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, transactional, consumer_count, messages_unacknowledged]), display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); -action(list_consumers, Node, Args, Inform) -> +action(list_consumers, Node, _Args, Opts, Inform) -> Inform("Listing consumers", []), - {VHostArg, _} = parse_vhost_flag_bin(Args), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], InfoKeys); -action(Command, Node, Args, Inform) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - action(Command, Node, VHost, RemainingArgs, Inform). - -action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> +action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, WPerm, RPerm]}); + [Scope, Username, VHost, CPerm, WPerm, RPerm]}); -action(clear_permissions, Node, VHost, [Username], Inform) -> +action(clear_permissions, Node, [Username], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]}); -action(list_permissions, Node, VHost, [], Inform) -> +action(list_permissions, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_access_control, list_vhost_permissions, [VHost]})). -parse_vhost_flag(Args) when is_list(Args) -> - case Args of - ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; - RemainingArgs -> - {"/", RemainingArgs} - end. - -parse_vhost_flag_bin(Args) -> - {VHost, RemainingArgs} = parse_vhost_flag(Args), - {list_to_binary(VHost), RemainingArgs}. - default_if_empty(List, Default) when is_list(List) -> if List == [] -> Default; @@ -299,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, Results), ok; display_info_list(Other, _) -> Other. @@ -310,25 +325,30 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Key, Items) -> - case proplists:get_value(Key, Items) of - #resource{name = Name} -> - escape(Name); - Value when Key =:= address; Key =:= peer_address andalso - is_tuple(Value) -> - inet_parse:ntoa(Value); - Value when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); - Value when is_binary(Value) -> - escape(Value); - Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] - when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); - Value -> - io_lib:format("~w", [Value]) - end. +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + inet_parse:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + inet_parse:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item(Value) -> + io_lib:format("~w", [Value]). display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> @@ -351,6 +371,8 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. +escape(Atom) when is_atom(Atom) -> + escape(atom_to_list(Atom)); escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> |