diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-04-30 11:28:48 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-04-30 11:28:48 +0100 |
commit | 94d254278048eb6d1d2e31f904f88808c6375980 (patch) | |
tree | 72a0803fa120d9445cb804d61dda568594348d85 | |
parent | 8726fa142693ca8a68fb9a634d14322739891b89 (diff) | |
parent | 1cfb7c934656b4457b2c790d8949502be696c893 (diff) | |
download | rabbitmq-server-94d254278048eb6d1d2e31f904f88808c6375980.tar.gz |
merge bug25501 into default
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 137 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_mode.erl | 57 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_mode_all.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_mode_exactly.erl | 56 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_mode_nodes.erl | 70 | ||||
-rw-r--r-- | src/rabbit_policy_validator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 3 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 9 |
8 files changed, 278 insertions, 97 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4fb1fc3b..8787e966 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,7 +22,7 @@ is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only --export([suggested_queue_nodes/4]). +-export([module/1]). -include("rabbit.hrl"). @@ -237,14 +237,17 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, PossibleNodes) -> +suggested_queue_nodes(Q, All) -> {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, - suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes, SSNodes}, PossibleNodes). + Params = policy(<<"ha-params">>, Q), + case module(Q) of + {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All); + _ -> {MNode, []} + end. policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -252,52 +255,26 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> - {MNode, Poss -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> - Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is not in the nodes specified, then what we want - %% to do depends on whether there are any synchronised slaves. If there - %% are then we can just kill the current master - the admin has asked for - %% a migration and we should give it to them. If there are not however - %% then we must keep the master around so as not to lose messages. - Nodes = case SSNodes of - [] -> lists:usort([MNode | Nodes1]); - _ -> Nodes1 - end, - Unavailable = Nodes -- Poss, - Available = Nodes -- Unavailable, - case Available of - [] -> %% We have never heard of anything? Not much we can do but - %% keep the master alive. - {MNode, []}; - _ -> case lists:member(MNode, Available) of - true -> {MNode, Available -- [MNode]}; - false -> %% Make sure the new master is synced! In order to - %% get here SSNodes must not be empty. - [NewMNode | _] = SSNodes, - {NewMNode, Available -- [NewMNode]} - end +module(#amqqueue{} = Q) -> + case rabbit_policy:get(<<"ha-mode">>, Q) of + {ok, Mode} -> module(Mode); + _ -> not_mirrored end; -%% When we need to add nodes, we randomise our candidate list as a -%% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - we -%% would have to take account of synchronisation though. -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> - SCount = Count - 1, - {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), - SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); - false -> lists:sublist(SNodes, SCount) - end}; -suggested_queue_nodes(_, _, {MNode, _, _}, _) -> - {MNode, []}. - -shuffle(L) -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), - {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), - L1. + +module(Mode) when is_binary(Mode) -> + case rabbit_registry:binary_to_type(Mode) of + {error, not_found} -> not_mirrored; + T -> case rabbit_registry:lookup_module(ha_mode, T) of + {ok, Module} -> {ok, Module}; + _ -> not_mirrored + end + end. + +is_mirrored(Q) -> + case module(Q) of + {ok, _} -> true; + _ -> false + end. actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids, @@ -308,14 +285,6 @@ actual_queue_nodes(#amqqueue{pid = MPid, _ -> node(MPid) end, Nodes(SPids), Nodes(SSPids)}. -is_mirrored(Q) -> - case policy(<<"ha-mode">>, Q) of - <<"all">> -> true; - <<"nodes">> -> true; - <<"exactly">> -> true; - _ -> false - end. - maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> case policy(<<"ha-sync-mode">>, Q) of <<"automatic">> -> @@ -347,40 +316,24 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, %%---------------------------------------------------------------------------- validate_policy(KeyList) -> - case validate_policy( - proplists:get_value(<<"ha-mode">>, KeyList), - proplists:get_value(<<"ha-params">>, KeyList, none)) of - ok -> case proplists:get_value( - <<"ha-sync-mode">>, KeyList, <<"manual">>) of - <<"automatic">> -> ok; - <<"manual">> -> ok; - Mode -> {error, "ha-sync-mode must be \"manual\" " - "or \"automatic\", got ~p", [Mode]} - end; - E -> E + Mode = proplists:get_value(<<"ha-mode">>, KeyList), + Params = proplists:get_value(<<"ha-params">>, KeyList, none), + case Mode of + undefined -> ok; + _ -> case module(Mode) of + {ok, M} -> case M:validate_policy(Params) of + ok -> validate_sync_mode(KeyList); + E -> E + end; + _ -> {error, + "~p is not a valid ha-mode value", [Mode]} + end end. -validate_policy(<<"all">>, none) -> - ok; -validate_policy(<<"all">>, _Params) -> - {error, "ha-mode=\"all\" does not take parameters", []}; - -validate_policy(<<"nodes">>, []) -> - {error, "ha-mode=\"nodes\" list must be non-empty", []}; -validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) -> - case [I || I <- Nodes, not is_binary(I)] of - [] -> ok; - Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, " - "~p was not a string", [Invalid]} - end; -validate_policy(<<"nodes">>, Params) -> - {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}; - -validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 -> - ok; -validate_policy(<<"exactly">>, Params) -> - {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}; - -validate_policy(Mode, _Params) -> - {error, "~p is not a valid ha-mode value", [Mode]}. - +validate_sync_mode(KeyList) -> + case proplists:get_value(<<"ha-sync-mode">>, KeyList, <<"manual">>) of + <<"automatic">> -> ok; + <<"manual">> -> ok; + Mode -> {error, "ha-sync-mode must be \"manual\" " + "or \"automatic\", got ~p", [Mode]} + end. diff --git a/src/rabbit_mirror_queue_mode.erl b/src/rabbit_mirror_queue_mode.erl new file mode 100644 index 00000000..da4c9b36 --- /dev/null +++ b/src/rabbit_mirror_queue_mode.erl @@ -0,0 +1,57 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_mode). + +-ifdef(use_specs). + +-type(master() :: node()). +-type(slave() :: node()). +-type(params() :: any()). + +-callback description() -> [proplists:property()]. + +%% Called whenever we think we might need to change nodes for a +%% mirrored queue. Note that this is called from a variety of +%% contexts, both inside and outside Mnesia transactions. Ideally it +%% will be pure-functional. +%% +%% Takes: parameters set in the policy, +%% current master, +%% current slaves, +%% current synchronised slaves, +%% all nodes to consider +%% +%% Returns: tuple of new master, new slaves +%% +-callback suggested_queue_nodes( + params(), master(), [slave()], [slave()], [node()]) -> + {master(), [slave()]}. + +%% Are the parameters valid for this mode? +-callback validate_policy(params()) -> + rabbit_policy_validator:validate_results(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {suggested_queue_nodes, 5}, {validate_policy, 1}]. +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_mirror_queue_mode_all.erl b/src/rabbit_mirror_queue_mode_all.erl new file mode 100644 index 00000000..84d75b9a --- /dev/null +++ b/src/rabbit_mirror_queue_mode_all.erl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_mode_all). + +-include("rabbit.hrl"). + +-behaviour(rabbit_mirror_queue_mode). + +-export([description/0, suggested_queue_nodes/5, validate_policy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "mirror mode all"}, + {mfa, {rabbit_registry, register, + [ha_mode, <<"all">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +description() -> + [{description, <<"Mirror queue to all nodes">>}]. + +suggested_queue_nodes(_Params, MNode, _SNodes, _SSNodes, Poss) -> + {MNode, Poss -- [MNode]}. + +validate_policy(none) -> + ok; +validate_policy(_Params) -> + {error, "ha-mode=\"all\" does not take parameters", []}. diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl new file mode 100644 index 00000000..2a42c383 --- /dev/null +++ b/src/rabbit_mirror_queue_mode_exactly.erl @@ -0,0 +1,56 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_mode_exactly). + +-include("rabbit.hrl"). + +-behaviour(rabbit_mirror_queue_mode). + +-export([description/0, suggested_queue_nodes/5, validate_policy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "mirror mode exactly"}, + {mfa, {rabbit_registry, register, + [ha_mode, <<"exactly">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +description() -> + [{description, <<"Mirror queue to a specified number of nodes">>}]. + +%% When we need to add nodes, we randomise our candidate list as a +%% crude form of load-balancing. TODO it would also be nice to +%% randomise the list of ones to remove when we have too many - we +%% would have to take account of synchronisation though. +suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}. + +shuffle(L) -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), + L1. + +validate_policy(N) when is_integer(N) andalso N > 0 -> + ok; +validate_policy(Params) -> + {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}. diff --git a/src/rabbit_mirror_queue_mode_nodes.erl b/src/rabbit_mirror_queue_mode_nodes.erl new file mode 100644 index 00000000..aa62ad33 --- /dev/null +++ b/src/rabbit_mirror_queue_mode_nodes.erl @@ -0,0 +1,70 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_mode_nodes). + +-include("rabbit.hrl"). + +-behaviour(rabbit_mirror_queue_mode). + +-export([description/0, suggested_queue_nodes/5, validate_policy/1]). + +-rabbit_boot_step({?MODULE, + [{description, "mirror mode nodes"}, + {mfa, {rabbit_registry, register, + [ha_mode, <<"nodes">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +description() -> + [{description, <<"Mirror queue to specified nodes">>}]. + +suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) -> + Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + %% If the current master is not in the nodes specified, then what we want + %% to do depends on whether there are any synchronised slaves. If there + %% are then we can just kill the current master - the admin has asked for + %% a migration and we should give it to them. If there are not however + %% then we must keep the master around so as not to lose messages. + Nodes = case SSNodes of + [] -> lists:usort([MNode | Nodes1]); + _ -> Nodes1 + end, + Unavailable = Nodes -- Poss, + Available = Nodes -- Unavailable, + case Available of + [] -> %% We have never heard of anything? Not much we can do but + %% keep the master alive. + {MNode, []}; + _ -> case lists:member(MNode, Available) of + true -> {MNode, Available -- [MNode]}; + false -> %% Make sure the new master is synced! In order to + %% get here SSNodes must not be empty. + [NewMNode | _] = SSNodes, + {NewMNode, Available -- [NewMNode]} + end + end. + +validate_policy([]) -> + {error, "ha-mode=\"nodes\" list must be non-empty", []}; +validate_policy(Nodes) when is_list(Nodes) -> + case [I || I <- Nodes, not is_binary(I)] of + [] -> ok; + Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, " + "~p was not a string", [Invalid]} + end; +validate_policy(Params) -> + {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}. diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl index 75b88c39..f0bc1a30 100644 --- a/src/rabbit_policy_validator.erl +++ b/src/rabbit_policy_validator.erl @@ -18,6 +18,8 @@ -ifdef(use_specs). +-export_type([validate_results/0]). + -type(validate_results() :: 'ok' | {error, string(), [term()]} | [validate_results()]). diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index acdc2cff..6aae8de6 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -130,7 +130,8 @@ class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(policy_validator) -> rabbit_policy_validator. +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 163f6170..2e46304f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -914,10 +914,11 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) -> - {NewM, NewSs0} = - rabbit_mirror_queue_misc:suggested_queue_nodes( - Policy, Params, CurrentState, All), + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, + {MNode, SNodes, SSNodes}, All) -> + {ok, M} = rabbit_mirror_queue_misc:module(Policy), + {NewM, NewSs0} = M:suggested_queue_nodes( + Params, MNode, SNodes, SSNodes, All), NewSs1 = lists:sort(NewSs0), case dm_list_match(NewSs, NewSs1, ExtraSs) of ok -> ok; |