diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-08-13 16:18:01 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-08-13 16:18:01 +0100 |
commit | d65d6cad62c79239c3116fca499226c16ffc2c4c (patch) | |
tree | 77af9858123eccf3b59b45273f8864840a5e8243 | |
parent | fae24bbac8719b12b0dec0154a61d267cab3ab4b (diff) | |
parent | fb43be6543d3b4c60a5a6b338d1d70661fa4afaf (diff) | |
download | rabbitmq-server-d65d6cad62c79239c3116fca499226c16ffc2c4c.tar.gz |
Merged bug25572 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 10 | ||||
-rw-r--r-- | src/rabbit.erl | 1 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 51 | ||||
-rw-r--r-- | src/rabbit_policy.erl | 82 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 69 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 23 |
6 files changed, 189 insertions, 47 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 1d641144..b2361cde 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -960,7 +960,7 @@ </para> <variablelist> <varlistentry> - <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg> <arg choice="opt"><replaceable>priority</replaceable></arg> </cmdsynopsis></term> + <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">--priority <replaceable>priority</replaceable></arg> <arg choice="opt">--apply-to <replaceable>apply-to</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term> <listitem> <para> Sets a policy. @@ -989,7 +989,13 @@ <varlistentry> <term>priority</term> <listitem><para> - The priority of the policy as an integer, defaulting to 0. Higher numbers indicate greater precedence. + The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0. + </para></listitem> + </varlistentry> + <varlistentry> + <term>apply-to</term> + <listitem><para> + Which types of object this policy should apply to - "queues", "exchanges" or "all". The default is "all". </para></listitem> </varlistentry> </variablelist> diff --git a/src/rabbit.erl b/src/rabbit.erl index 2baec885..a9974711 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -588,6 +588,7 @@ boot_delegate() -> rabbit_sup:start_supervisor_child(delegate_sup, [Count]). recover() -> + rabbit_policy:recover(), Qs = rabbit_amqqueue:recover(), ok = rabbit_binding:recover(rabbit_exchange:recover(), [QName || #amqqueue{name = QName} <- Qs]), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 0b666a36..6f36f99d 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,8 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]). +-export([start/0, stop/0, parse_arguments/2, action/5, + sync_queue/1, cancel_sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -25,12 +26,16 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(PRIORITY_OPT, "--priority"). +-define(APPLY_TO_OPT, "--apply-to"). -define(RAM_OPT, "--ram"). -define(OFFLINE_OPT, "--offline"). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}). +-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}). -define(RAM_DEF, {?RAM_OPT, flag}). -define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). @@ -72,7 +77,7 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, - {set_policy, [?VHOST_DEF]}, + {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]}, {clear_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, @@ -127,19 +132,13 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr), - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() end, - Opts1 = [case K of - ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; - _ -> {K, V} - end || {K, V} <- Opts], - Quiet = proplists:get_bool(?QUIET_OPT, Opts1), - Node = proplists:get_value(?NODE_OPT, Opts1), + Quiet = proplists:get_bool(?QUIET_OPT, Opts), + Node = proplists:get_value(?NODE_OPT, Opts), Inform = case Quiet of true -> fun (_Format, _Args1) -> ok end; false -> fun (Format, Args1) -> @@ -230,6 +229,19 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), rabbit_misc:quit(1). +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + %%---------------------------------------------------------------------------- action(stop, Node, Args, _Opts, Inform) -> @@ -484,16 +496,15 @@ action(list_parameters, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); -action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform) - when Prio == [] orelse length(Prio) == 1 -> - Msg = "Setting policy ~p for pattern ~p to ~p", - {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined}; - [P] -> {Msg ++ " with priority ~s", P} - end, +action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> + Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform(InformMsg, [Key, Pattern, Defn] ++ Prio), - rpc_call(Node, rabbit_policy, parse_set, - [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]); + PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts), + ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)), + Inform(Msg, [Key, Pattern, Defn, PriorityArg]), + rpc_call( + Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]); action(clear_policy, Node, [Key], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 91ca88dd..0785d278 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -25,9 +25,10 @@ -import(rabbit_misc, [pget/2]). -export([register/0]). +-export([invalidate/0, recover/0]). -export([name/1, get/2, set/1]). -export([validate/4, notify/4, notify_clear/3]). --export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, +-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). -rabbit_boot_step({?MODULE, @@ -51,6 +52,10 @@ set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set( set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)). +set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)}; +set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set( + X#exchange{policy = match(Name, Ps)}). + get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy); get(Name, #exchange{policy = Policy}) -> get0(Name, Policy); %% Caution - SLOW. @@ -68,32 +73,70 @@ get0(Name, List) -> case pget(definition, List) of %%---------------------------------------------------------------------------- -parse_set(VHost, Name, Pattern, Definition, undefined) -> - parse_set0(VHost, Name, Pattern, Definition, 0); -parse_set(VHost, Name, Pattern, Definition, Priority) -> +%% Gets called during upgrades - therefore must not assume anything about the +%% state of Mnesia +invalidate() -> + rabbit_file:write_file(invalid_file(), <<"">>). + +recover() -> + case rabbit_file:is_file(invalid_file()) of + true -> recover0(), + rabbit_file:delete(invalid_file()); + false -> ok + end. + +%% To get here we have to have just completed an Mnesia upgrade - i.e. we are +%% the first node starting. So we can rewrite the whole database. Note that +%% recovery has not yet happened; we must work with the rabbit_durable_<thing> +%% variants. +recover0() -> + Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}), + Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}), + Policies = list(), + [rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_exchange, set(X, Policies), write) + end) || X <- Xs], + [rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:write(rabbit_durable_queue, set(Q, Policies), write) + end) || Q <- Qs], + ok. + +invalid_file() -> + filename:join(rabbit_mnesia:dir(), "policies_are_invalid"). + +%%---------------------------------------------------------------------------- + +parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> try list_to_integer(Priority) of - Num -> parse_set0(VHost, Name, Pattern, Definition, Num) + Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo) catch error:badarg -> {error, "~p priority must be a number", [Priority]} end. -parse_set0(VHost, Name, Pattern, Defn, Priority) -> +parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) -> case rabbit_misc:json_decode(Defn) of {ok, JSON} -> set0(VHost, Name, [{<<"pattern">>, list_to_binary(Pattern)}, {<<"definition">>, rabbit_misc:json_to_term(JSON)}, - {<<"priority">>, Priority}]); + {<<"priority">>, Priority}, + {<<"apply-to">>, ApplyTo}]); error -> {error_string, "JSON decoding error"} end. -set(VHost, Name, Pattern, Definition, Priority) -> +set(VHost, Name, Pattern, Definition, Priority, ApplyTo) -> PolicyProps = [{<<"pattern">>, Pattern}, {<<"definition">>, Definition}, {<<"priority">>, case Priority of undefined -> 0; _ -> Priority + end}, + {<<"apply-to">>, case ApplyTo of + undefined -> <<"all">>; + _ -> ApplyTo end}], set0(VHost, Name, PolicyProps). @@ -130,6 +173,7 @@ p(Parameter, DefnFun) -> [{vhost, pget(vhost, Parameter)}, {name, pget(name, Parameter)}, {pattern, pget(<<"pattern">>, Value)}, + {'apply-to', pget(<<"apply-to">>, Value)}, {definition, DefnFun(pget(<<"definition">>, Value))}, {priority, pget(<<"priority">>, Value)}]. @@ -139,7 +183,7 @@ format(Term) -> ident(X) -> X. -info_keys() -> [vhost, name, pattern, definition, priority]. +info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. %%---------------------------------------------------------------------------- @@ -202,8 +246,16 @@ match(Name, Policies) -> [Policy | _Rest] -> Policy end. -matches(#resource{name = Name}, Policy) -> - match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]). +matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) -> + matches_type(Kind, pget('apply-to', Policy)) andalso + match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso + VHost =:= pget(vhost, Policy). + +matches_type(exchange, <<"exchanges">>) -> true; +matches_type(queue, <<"queues">>) -> true; +matches_type(exchange, <<"all">>) -> true; +matches_type(queue, <<"all">>) -> true; +matches_type(_, _) -> false. sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). @@ -212,6 +264,7 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). policy_validation() -> [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"apply-to">>, fun apply_to_validation/2, optional}, {<<"definition">>, fun validation/2, mandatory}]. validation(_Name, []) -> @@ -257,3 +310,10 @@ a2b(A) -> list_to_binary(atom_to_list(A)). dups(L) -> L -- lists:usort(L). is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]). + +apply_to_validation(_Name, <<"all">>) -> ok; +apply_to_validation(_Name, <<"exchanges">>) -> ok; +apply_to_validation(_Name, <<"queues">>) -> ok; +apply_to_validation(_Name, Term) -> + {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' " + "or 'all'", [Term]}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 30cf9114..5af4969a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -60,6 +60,7 @@ all_tests() -> passed = test_user_management(), passed = test_runtime_parameters(), passed = test_policy_validation(), + passed = test_policy_opts_validation(), passed = test_ha_policy_validation(), passed = test_server_status(), passed = test_amqp_connection_refusal(), @@ -1083,29 +1084,57 @@ test_runtime_parameters() -> test_policy_validation() -> rabbit_runtime_parameters_test:register_policy_validator(), - SetPol = - fun (Key, Val) -> - control_action( - set_policy, - ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) - end, + SetPol = fun (Key, Val) -> + control_action_opts( + ["set_policy", "name", ".*", + rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) + end, - ok = SetPol("testeven", []), - ok = SetPol("testeven", [1, 2]), - ok = SetPol("testeven", [1, 2, 3, 4]), - ok = SetPol("testpos", [2, 5, 5678]), + ok = SetPol("testeven", []), + ok = SetPol("testeven", [1, 2]), + ok = SetPol("testeven", [1, 2, 3, 4]), + ok = SetPol("testpos", [2, 5, 5678]), - {error_string, _} = SetPol("testpos", [-1, 0, 1]), - {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + error = SetPol("testpos", [-1, 0, 1]), + error = SetPol("testeven", [ 1, 2, 3]), ok = control_action(clear_policy, ["name"]), rabbit_runtime_parameters_test:unregister_policy_validator(), passed. +test_policy_opts_validation() -> + Set = fun (Extra) -> control_action_opts( + ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}" + | Extra]) end, + OK = fun (Extra) -> ok = Set(Extra) end, + Fail = fun (Extra) -> error = Set(Extra) end, + + OK ([]), + + OK (["--priority", "0"]), + OK (["--priority", "3"]), + Fail(["--priority", "banana"]), + Fail(["--priority"]), + + OK (["--apply-to", "all"]), + OK (["--apply-to", "queues"]), + Fail(["--apply-to", "bananas"]), + Fail(["--apply-to"]), + + OK (["--priority", "3", "--apply-to", "queues"]), + Fail(["--priority", "banana", "--apply-to", "queues"]), + Fail(["--priority", "3", "--apply-to", "bananas"]), + + Fail(["--offline"]), + + ok = control_action(clear_policy, ["name"]), + passed. + test_ha_policy_validation() -> - Set = fun (JSON) -> control_action(set_policy, ["name", ".*", JSON]) end, + Set = fun (JSON) -> control_action_opts( + ["set_policy", "name", ".*", JSON]) end, OK = fun (JSON) -> ok = Set(JSON) end, - Fail = fun (JSON) -> {error_string, _} = Set(JSON) end, + Fail = fun (JSON) -> error = Set(JSON) end, OK ("{\"ha-mode\":\"all\"}"), Fail("{\"ha-mode\":\"made_up\"}"), @@ -1611,6 +1640,18 @@ control_action(Command, Node, Args, Opts) -> Other end. +control_action_opts(Raw) -> + NodeStr = atom_to_list(node()), + case rabbit_control_main:parse_arguments(Raw, NodeStr) of + {ok, {Cmd, Opts, Args}} -> + case control_action(Cmd, node(), Args, Opts) of + ok -> ok; + _ -> error + end; + _ -> + error + end. + info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1613838c..d50cb282 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -44,6 +44,7 @@ -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). -rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). -rabbit_upgrade({exchange_decorators, mnesia, [policy]}). +-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}). %% ------------------------------------------------------------------- @@ -70,6 +71,7 @@ -spec(no_mirror_nodes/0 :: () -> 'ok'). -spec(gm_pids/0 :: () -> 'ok'). -spec(exchange_decorators/0 :: () -> 'ok'). +-spec(policy_apply_to/0 :: () -> 'ok'). -endif. @@ -299,6 +301,27 @@ exchange_decorators(Table) -> [name, type, durable, auto_delete, internal, arguments, scratches, policy, decorators]). +policy_apply_to() -> + transform( + rabbit_runtime_parameters, + fun ({runtime_parameters, Key = {_VHost, <<"policy">>, _Name}, Value}) -> + ApplyTo = apply_to(proplists:get_value(<<"definition">>, Value)), + {runtime_parameters, Key, [{<<"apply-to">>, ApplyTo} | Value]}; + ({runtime_parameters, Key, Value}) -> + {runtime_parameters, Key, Value} + end, + [key, value]), + rabbit_policy:invalidate(), + ok. + +apply_to(Def) -> + case [proplists:get_value(K, Def) || + K <- [<<"federation-upstream-set">>, <<"ha-mode">>]] of + [undefined, undefined] -> <<"all">>; + [_, undefined] -> <<"exchanges">>; + [undefined, _] -> <<"queues">>; + [_, _] -> <<"all">> + end. %%-------------------------------------------------------------------- |