summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-08-13 16:18:01 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-08-13 16:18:01 +0100
commitd65d6cad62c79239c3116fca499226c16ffc2c4c (patch)
tree77af9858123eccf3b59b45273f8864840a5e8243
parentfae24bbac8719b12b0dec0154a61d267cab3ab4b (diff)
parentfb43be6543d3b4c60a5a6b338d1d70661fa4afaf (diff)
downloadrabbitmq-server-d65d6cad62c79239c3116fca499226c16ffc2c4c.tar.gz
Merged bug25572 into default
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--src/rabbit.erl1
-rw-r--r--src/rabbit_control_main.erl51
-rw-r--r--src/rabbit_policy.erl82
-rw-r--r--src/rabbit_tests.erl69
-rw-r--r--src/rabbit_upgrade_functions.erl23
6 files changed, 189 insertions, 47 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 1d641144..b2361cde 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -960,7 +960,7 @@
</para>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg> <arg choice="opt"><replaceable>priority</replaceable></arg> </cmdsynopsis></term>
+ <term><cmdsynopsis><command>set_policy</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt">--priority <replaceable>priority</replaceable></arg> <arg choice="opt">--apply-to <replaceable>apply-to</replaceable></arg> <arg choice="req"><replaceable>name</replaceable></arg> <arg choice="req"><replaceable>pattern</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term>
<listitem>
<para>
Sets a policy.
@@ -989,7 +989,13 @@
<varlistentry>
<term>priority</term>
<listitem><para>
- The priority of the policy as an integer, defaulting to 0. Higher numbers indicate greater precedence.
+ The priority of the policy as an integer. Higher numbers indicate greater precedence. The default is 0.
+ </para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>apply-to</term>
+ <listitem><para>
+ Which types of object this policy should apply to - "queues", "exchanges" or "all". The default is "all".
</para></listitem>
</varlistentry>
</variablelist>
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2baec885..a9974711 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -588,6 +588,7 @@ boot_delegate() ->
rabbit_sup:start_supervisor_child(delegate_sup, [Count]).
recover() ->
+ rabbit_policy:recover(),
Qs = rabbit_amqqueue:recover(),
ok = rabbit_binding:recover(rabbit_exchange:recover(),
[QName || #amqqueue{name = QName} <- Qs]),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 0b666a36..6f36f99d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,8 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
+-export([start/0, stop/0, parse_arguments/2, action/5,
+ sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -25,12 +26,16 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(PRIORITY_OPT, "--priority").
+-define(APPLY_TO_OPT, "--apply-to").
-define(RAM_OPT, "--ram").
-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(PRIORITY_DEF, {?PRIORITY_OPT, {option, "0"}}).
+-define(APPLY_TO_DEF, {?APPLY_TO_OPT, {option, "all"}}).
-define(RAM_DEF, {?RAM_OPT, flag}).
-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
@@ -72,7 +77,7 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
- {set_policy, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF, ?PRIORITY_DEF, ?APPLY_TO_DEF]},
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
@@ -127,19 +132,13 @@
start() ->
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS(NodeStr),
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
end,
- Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
- _ -> {K, V}
- end || {K, V} <- Opts],
- Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
- Node = proplists:get_value(?NODE_OPT, Opts1),
+ Quiet = proplists:get_bool(?QUIET_OPT, Opts),
+ Node = proplists:get_value(?NODE_OPT, Opts),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -230,6 +229,19 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
rabbit_misc:quit(1).
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
%%----------------------------------------------------------------------------
action(stop, Node, Args, _Opts, Inform) ->
@@ -484,16 +496,15 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
-action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
- when Prio == [] orelse length(Prio) == 1 ->
- Msg = "Setting policy ~p for pattern ~p to ~p",
- {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
- [P] -> {Msg ++ " with priority ~s", P}
- end,
+action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
+ Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
- rpc_call(Node, rabbit_policy, parse_set,
- [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+ PriorityArg = proplists:get_value(?PRIORITY_OPT, Opts),
+ ApplyToArg = list_to_binary(proplists:get_value(?APPLY_TO_OPT, Opts)),
+ Inform(Msg, [Key, Pattern, Defn, PriorityArg]),
+ rpc_call(
+ Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, PriorityArg, ApplyToArg]);
action(clear_policy, Node, [Key], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 91ca88dd..0785d278 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -25,9 +25,10 @@
-import(rabbit_misc, [pget/2]).
-export([register/0]).
+-export([invalidate/0, recover/0]).
-export([name/1, get/2, set/1]).
-export([validate/4, notify/4, notify_clear/3]).
--export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+-export([parse_set/6, set/6, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
@@ -51,6 +52,10 @@ set(X = #exchange{name = Name}) -> rabbit_exchange_decorator:set(
set0(Name = #resource{virtual_host = VHost}) -> match(Name, list(VHost)).
+set(Q = #amqqueue{name = Name}, Ps) -> Q#amqqueue{policy = match(Name, Ps)};
+set(X = #exchange{name = Name}, Ps) -> rabbit_exchange_decorator:set(
+ X#exchange{policy = match(Name, Ps)}).
+
get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
%% Caution - SLOW.
@@ -68,32 +73,70 @@ get0(Name, List) -> case pget(definition, List) of
%%----------------------------------------------------------------------------
-parse_set(VHost, Name, Pattern, Definition, undefined) ->
- parse_set0(VHost, Name, Pattern, Definition, 0);
-parse_set(VHost, Name, Pattern, Definition, Priority) ->
+%% Gets called during upgrades - therefore must not assume anything about the
+%% state of Mnesia
+invalidate() ->
+ rabbit_file:write_file(invalid_file(), <<"">>).
+
+recover() ->
+ case rabbit_file:is_file(invalid_file()) of
+ true -> recover0(),
+ rabbit_file:delete(invalid_file());
+ false -> ok
+ end.
+
+%% To get here we have to have just completed an Mnesia upgrade - i.e. we are
+%% the first node starting. So we can rewrite the whole database. Note that
+%% recovery has not yet happened; we must work with the rabbit_durable_<thing>
+%% variants.
+recover0() ->
+ Xs = mnesia:dirty_match_object(rabbit_durable_exchange, #exchange{_ = '_'}),
+ Qs = mnesia:dirty_match_object(rabbit_durable_queue, #amqqueue{_ = '_'}),
+ Policies = list(),
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_exchange, set(X, Policies), write)
+ end) || X <- Xs],
+ [rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(rabbit_durable_queue, set(Q, Policies), write)
+ end) || Q <- Qs],
+ ok.
+
+invalid_file() ->
+ filename:join(rabbit_mnesia:dir(), "policies_are_invalid").
+
+%%----------------------------------------------------------------------------
+
+parse_set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
try list_to_integer(Priority) of
- Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num, ApplyTo)
catch
error:badarg -> {error, "~p priority must be a number", [Priority]}
end.
-parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+parse_set0(VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
case rabbit_misc:json_decode(Defn) of
{ok, JSON} ->
set0(VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
{<<"definition">>, rabbit_misc:json_to_term(JSON)},
- {<<"priority">>, Priority}]);
+ {<<"priority">>, Priority},
+ {<<"apply-to">>, ApplyTo}]);
error ->
{error_string, "JSON decoding error"}
end.
-set(VHost, Name, Pattern, Definition, Priority) ->
+set(VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
PolicyProps = [{<<"pattern">>, Pattern},
{<<"definition">>, Definition},
{<<"priority">>, case Priority of
undefined -> 0;
_ -> Priority
+ end},
+ {<<"apply-to">>, case ApplyTo of
+ undefined -> <<"all">>;
+ _ -> ApplyTo
end}],
set0(VHost, Name, PolicyProps).
@@ -130,6 +173,7 @@ p(Parameter, DefnFun) ->
[{vhost, pget(vhost, Parameter)},
{name, pget(name, Parameter)},
{pattern, pget(<<"pattern">>, Value)},
+ {'apply-to', pget(<<"apply-to">>, Value)},
{definition, DefnFun(pget(<<"definition">>, Value))},
{priority, pget(<<"priority">>, Value)}].
@@ -139,7 +183,7 @@ format(Term) ->
ident(X) -> X.
-info_keys() -> [vhost, name, pattern, definition, priority].
+info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
%%----------------------------------------------------------------------------
@@ -202,8 +246,16 @@ match(Name, Policies) ->
[Policy | _Rest] -> Policy
end.
-matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
+matches(#resource{name = Name, kind = Kind, virtual_host = VHost}, Policy) ->
+ matches_type(Kind, pget('apply-to', Policy)) andalso
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]) andalso
+ VHost =:= pget(vhost, Policy).
+
+matches_type(exchange, <<"exchanges">>) -> true;
+matches_type(queue, <<"queues">>) -> true;
+matches_type(exchange, <<"all">>) -> true;
+matches_type(queue, <<"all">>) -> true;
+matches_type(_, _) -> false.
sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
@@ -212,6 +264,7 @@ sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
{<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"apply-to">>, fun apply_to_validation/2, optional},
{<<"definition">>, fun validation/2, mandatory}].
validation(_Name, []) ->
@@ -257,3 +310,10 @@ a2b(A) -> list_to_binary(atom_to_list(A)).
dups(L) -> L -- lists:usort(L).
is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
+
+apply_to_validation(_Name, <<"all">>) -> ok;
+apply_to_validation(_Name, <<"exchanges">>) -> ok;
+apply_to_validation(_Name, <<"queues">>) -> ok;
+apply_to_validation(_Name, Term) ->
+ {error, "apply-to '~s' unrecognised; should be 'queues', 'exchanges' "
+ "or 'all'", [Term]}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 30cf9114..5af4969a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -60,6 +60,7 @@ all_tests() ->
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_policy_validation(),
+ passed = test_policy_opts_validation(),
passed = test_ha_policy_validation(),
passed = test_server_status(),
passed = test_amqp_connection_refusal(),
@@ -1083,29 +1084,57 @@ test_runtime_parameters() ->
test_policy_validation() ->
rabbit_runtime_parameters_test:register_policy_validator(),
- SetPol =
- fun (Key, Val) ->
- control_action(
- set_policy,
- ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
- end,
+ SetPol = fun (Key, Val) ->
+ control_action_opts(
+ ["set_policy", "name", ".*",
+ rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
- ok = SetPol("testeven", []),
- ok = SetPol("testeven", [1, 2]),
- ok = SetPol("testeven", [1, 2, 3, 4]),
- ok = SetPol("testpos", [2, 5, 5678]),
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
- {error_string, _} = SetPol("testpos", [-1, 0, 1]),
- {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ error = SetPol("testpos", [-1, 0, 1]),
+ error = SetPol("testeven", [ 1, 2, 3]),
ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.
+test_policy_opts_validation() ->
+ Set = fun (Extra) -> control_action_opts(
+ ["set_policy", "name", ".*", "{\"ha-mode\":\"all\"}"
+ | Extra]) end,
+ OK = fun (Extra) -> ok = Set(Extra) end,
+ Fail = fun (Extra) -> error = Set(Extra) end,
+
+ OK ([]),
+
+ OK (["--priority", "0"]),
+ OK (["--priority", "3"]),
+ Fail(["--priority", "banana"]),
+ Fail(["--priority"]),
+
+ OK (["--apply-to", "all"]),
+ OK (["--apply-to", "queues"]),
+ Fail(["--apply-to", "bananas"]),
+ Fail(["--apply-to"]),
+
+ OK (["--priority", "3", "--apply-to", "queues"]),
+ Fail(["--priority", "banana", "--apply-to", "queues"]),
+ Fail(["--priority", "3", "--apply-to", "bananas"]),
+
+ Fail(["--offline"]),
+
+ ok = control_action(clear_policy, ["name"]),
+ passed.
+
test_ha_policy_validation() ->
- Set = fun (JSON) -> control_action(set_policy, ["name", ".*", JSON]) end,
+ Set = fun (JSON) -> control_action_opts(
+ ["set_policy", "name", ".*", JSON]) end,
OK = fun (JSON) -> ok = Set(JSON) end,
- Fail = fun (JSON) -> {error_string, _} = Set(JSON) end,
+ Fail = fun (JSON) -> error = Set(JSON) end,
OK ("{\"ha-mode\":\"all\"}"),
Fail("{\"ha-mode\":\"made_up\"}"),
@@ -1611,6 +1640,18 @@ control_action(Command, Node, Args, Opts) ->
Other
end.
+control_action_opts(Raw) ->
+ NodeStr = atom_to_list(node()),
+ case rabbit_control_main:parse_arguments(Raw, NodeStr) of
+ {ok, {Cmd, Opts, Args}} ->
+ case control_action(Cmd, node(), Args, Opts) of
+ ok -> ok;
+ _ -> error
+ end;
+ _ ->
+ error
+ end.
+
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 1613838c..d50cb282 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -44,6 +44,7 @@
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
+-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
%% -------------------------------------------------------------------
@@ -70,6 +71,7 @@
-spec(no_mirror_nodes/0 :: () -> 'ok').
-spec(gm_pids/0 :: () -> 'ok').
-spec(exchange_decorators/0 :: () -> 'ok').
+-spec(policy_apply_to/0 :: () -> 'ok').
-endif.
@@ -299,6 +301,27 @@ exchange_decorators(Table) ->
[name, type, durable, auto_delete, internal, arguments, scratches, policy,
decorators]).
+policy_apply_to() ->
+ transform(
+ rabbit_runtime_parameters,
+ fun ({runtime_parameters, Key = {_VHost, <<"policy">>, _Name}, Value}) ->
+ ApplyTo = apply_to(proplists:get_value(<<"definition">>, Value)),
+ {runtime_parameters, Key, [{<<"apply-to">>, ApplyTo} | Value]};
+ ({runtime_parameters, Key, Value}) ->
+ {runtime_parameters, Key, Value}
+ end,
+ [key, value]),
+ rabbit_policy:invalidate(),
+ ok.
+
+apply_to(Def) ->
+ case [proplists:get_value(K, Def) ||
+ K <- [<<"federation-upstream-set">>, <<"ha-mode">>]] of
+ [undefined, undefined] -> <<"all">>;
+ [_, undefined] -> <<"exchanges">>;
+ [undefined, _] -> <<"queues">>;
+ [_, _] -> <<"all">>
+ end.
%%--------------------------------------------------------------------