summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-04-30 11:28:48 +0100
committerTim Watson <tim@rabbitmq.com>2013-04-30 11:28:48 +0100
commit94d254278048eb6d1d2e31f904f88808c6375980 (patch)
tree72a0803fa120d9445cb804d61dda568594348d85
parent8726fa142693ca8a68fb9a634d14322739891b89 (diff)
parent1cfb7c934656b4457b2c790d8949502be696c893 (diff)
downloadrabbitmq-server-94d254278048eb6d1d2e31f904f88808c6375980.tar.gz
merge bug25501 into default
-rw-r--r--src/rabbit_mirror_queue_misc.erl137
-rw-r--r--src/rabbit_mirror_queue_mode.erl57
-rw-r--r--src/rabbit_mirror_queue_mode_all.erl41
-rw-r--r--src/rabbit_mirror_queue_mode_exactly.erl56
-rw-r--r--src/rabbit_mirror_queue_mode_nodes.erl70
-rw-r--r--src/rabbit_policy_validator.erl2
-rw-r--r--src/rabbit_registry.erl3
-rw-r--r--src/rabbit_tests.erl9
8 files changed, 278 insertions, 97 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4fb1fc3b..8787e966 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,7 +22,7 @@
is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
--export([suggested_queue_nodes/4]).
+-export([module/1]).
-include("rabbit.hrl").
@@ -237,14 +237,17 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, PossibleNodes) ->
+suggested_queue_nodes(Q, All) ->
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
- suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes, SSNodes}, PossibleNodes).
+ Params = policy(<<"ha-params">>, Q),
+ case module(Q) of
+ {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All);
+ _ -> {MNode, []}
+ end.
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -252,52 +255,26 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
- {MNode, Poss -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
- Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is not in the nodes specified, then what we want
- %% to do depends on whether there are any synchronised slaves. If there
- %% are then we can just kill the current master - the admin has asked for
- %% a migration and we should give it to them. If there are not however
- %% then we must keep the master around so as not to lose messages.
- Nodes = case SSNodes of
- [] -> lists:usort([MNode | Nodes1]);
- _ -> Nodes1
- end,
- Unavailable = Nodes -- Poss,
- Available = Nodes -- Unavailable,
- case Available of
- [] -> %% We have never heard of anything? Not much we can do but
- %% keep the master alive.
- {MNode, []};
- _ -> case lists:member(MNode, Available) of
- true -> {MNode, Available -- [MNode]};
- false -> %% Make sure the new master is synced! In order to
- %% get here SSNodes must not be empty.
- [NewMNode | _] = SSNodes,
- {NewMNode, Available -- [NewMNode]}
- end
+module(#amqqueue{} = Q) ->
+ case rabbit_policy:get(<<"ha-mode">>, Q) of
+ {ok, Mode} -> module(Mode);
+ _ -> not_mirrored
end;
-%% When we need to add nodes, we randomise our candidate list as a
-%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - we
-%% would have to take account of synchronisation though.
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
- SCount = Count - 1,
- {MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
- SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
- false -> lists:sublist(SNodes, SCount)
- end};
-suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
- {MNode, []}.
-
-shuffle(L) ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
- L1.
+
+module(Mode) when is_binary(Mode) ->
+ case rabbit_registry:binary_to_type(Mode) of
+ {error, not_found} -> not_mirrored;
+ T -> case rabbit_registry:lookup_module(ha_mode, T) of
+ {ok, Module} -> {ok, Module};
+ _ -> not_mirrored
+ end
+ end.
+
+is_mirrored(Q) ->
+ case module(Q) of
+ {ok, _} -> true;
+ _ -> false
+ end.
actual_queue_nodes(#amqqueue{pid = MPid,
slave_pids = SPids,
@@ -308,14 +285,6 @@ actual_queue_nodes(#amqqueue{pid = MPid,
_ -> node(MPid)
end, Nodes(SPids), Nodes(SSPids)}.
-is_mirrored(Q) ->
- case policy(<<"ha-mode">>, Q) of
- <<"all">> -> true;
- <<"nodes">> -> true;
- <<"exactly">> -> true;
- _ -> false
- end.
-
maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
case policy(<<"ha-sync-mode">>, Q) of
<<"automatic">> ->
@@ -347,40 +316,24 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- case validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)) of
- ok -> case proplists:get_value(
- <<"ha-sync-mode">>, KeyList, <<"manual">>) of
- <<"automatic">> -> ok;
- <<"manual">> -> ok;
- Mode -> {error, "ha-sync-mode must be \"manual\" "
- "or \"automatic\", got ~p", [Mode]}
- end;
- E -> E
+ Mode = proplists:get_value(<<"ha-mode">>, KeyList),
+ Params = proplists:get_value(<<"ha-params">>, KeyList, none),
+ case Mode of
+ undefined -> ok;
+ _ -> case module(Mode) of
+ {ok, M} -> case M:validate_policy(Params) of
+ ok -> validate_sync_mode(KeyList);
+ E -> E
+ end;
+ _ -> {error,
+ "~p is not a valid ha-mode value", [Mode]}
+ end
end.
-validate_policy(<<"all">>, none) ->
- ok;
-validate_policy(<<"all">>, _Params) ->
- {error, "ha-mode=\"all\" does not take parameters", []};
-
-validate_policy(<<"nodes">>, []) ->
- {error, "ha-mode=\"nodes\" list must be non-empty", []};
-validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) ->
- case [I || I <- Nodes, not is_binary(I)] of
- [] -> ok;
- Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
- "~p was not a string", [Invalid]}
- end;
-validate_policy(<<"nodes">>, Params) ->
- {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]};
-
-validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 ->
- ok;
-validate_policy(<<"exactly">>, Params) ->
- {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]};
-
-validate_policy(Mode, _Params) ->
- {error, "~p is not a valid ha-mode value", [Mode]}.
-
+validate_sync_mode(KeyList) ->
+ case proplists:get_value(<<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_mode.erl b/src/rabbit_mirror_queue_mode.erl
new file mode 100644
index 00000000..da4c9b36
--- /dev/null
+++ b/src/rabbit_mirror_queue_mode.erl
@@ -0,0 +1,57 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_mode).
+
+-ifdef(use_specs).
+
+-type(master() :: node()).
+-type(slave() :: node()).
+-type(params() :: any()).
+
+-callback description() -> [proplists:property()].
+
+%% Called whenever we think we might need to change nodes for a
+%% mirrored queue. Note that this is called from a variety of
+%% contexts, both inside and outside Mnesia transactions. Ideally it
+%% will be pure-functional.
+%%
+%% Takes: parameters set in the policy,
+%% current master,
+%% current slaves,
+%% current synchronised slaves,
+%% all nodes to consider
+%%
+%% Returns: tuple of new master, new slaves
+%%
+-callback suggested_queue_nodes(
+ params(), master(), [slave()], [slave()], [node()]) ->
+ {master(), [slave()]}.
+
+%% Are the parameters valid for this mode?
+-callback validate_policy(params()) ->
+ rabbit_policy_validator:validate_results().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {suggested_queue_nodes, 5}, {validate_policy, 1}].
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_mirror_queue_mode_all.erl b/src/rabbit_mirror_queue_mode_all.erl
new file mode 100644
index 00000000..84d75b9a
--- /dev/null
+++ b/src/rabbit_mirror_queue_mode_all.erl
@@ -0,0 +1,41 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_mode_all).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_mirror_queue_mode).
+
+-export([description/0, suggested_queue_nodes/5, validate_policy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "mirror mode all"},
+ {mfa, {rabbit_registry, register,
+ [ha_mode, <<"all">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{description, <<"Mirror queue to all nodes">>}].
+
+suggested_queue_nodes(_Params, MNode, _SNodes, _SSNodes, Poss) ->
+ {MNode, Poss -- [MNode]}.
+
+validate_policy(none) ->
+ ok;
+validate_policy(_Params) ->
+ {error, "ha-mode=\"all\" does not take parameters", []}.
diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl
new file mode 100644
index 00000000..2a42c383
--- /dev/null
+++ b/src/rabbit_mirror_queue_mode_exactly.erl
@@ -0,0 +1,56 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_mode_exactly).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_mirror_queue_mode).
+
+-export([description/0, suggested_queue_nodes/5, validate_policy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "mirror mode exactly"},
+ {mfa, {rabbit_registry, register,
+ [ha_mode, <<"exactly">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{description, <<"Mirror queue to a specified number of nodes">>}].
+
+%% When we need to add nodes, we randomise our candidate list as a
+%% crude form of load-balancing. TODO it would also be nice to
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) ->
+ SCount = Count - 1,
+ {MNode, case SCount > length(SNodes) of
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
+ SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
+ false -> lists:sublist(SNodes, SCount)
+ end}.
+
+shuffle(L) ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
+ L1.
+
+validate_policy(N) when is_integer(N) andalso N > 0 ->
+ ok;
+validate_policy(Params) ->
+ {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}.
diff --git a/src/rabbit_mirror_queue_mode_nodes.erl b/src/rabbit_mirror_queue_mode_nodes.erl
new file mode 100644
index 00000000..aa62ad33
--- /dev/null
+++ b/src/rabbit_mirror_queue_mode_nodes.erl
@@ -0,0 +1,70 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_mode_nodes).
+
+-include("rabbit.hrl").
+
+-behaviour(rabbit_mirror_queue_mode).
+
+-export([description/0, suggested_queue_nodes/5, validate_policy/1]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "mirror mode nodes"},
+ {mfa, {rabbit_registry, register,
+ [ha_mode, <<"nodes">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+description() ->
+ [{description, <<"Mirror queue to specified nodes">>}].
+
+suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) ->
+ Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
+ Available = Nodes -- Unavailable,
+ case Available of
+ [] -> %% We have never heard of anything? Not much we can do but
+ %% keep the master alive.
+ {MNode, []};
+ _ -> case lists:member(MNode, Available) of
+ true -> {MNode, Available -- [MNode]};
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
+ end
+ end.
+
+validate_policy([]) ->
+ {error, "ha-mode=\"nodes\" list must be non-empty", []};
+validate_policy(Nodes) when is_list(Nodes) ->
+ case [I || I <- Nodes, not is_binary(I)] of
+ [] -> ok;
+ Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
+ "~p was not a string", [Invalid]}
+ end;
+validate_policy(Params) ->
+ {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}.
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
index 75b88c39..f0bc1a30 100644
--- a/src/rabbit_policy_validator.erl
+++ b/src/rabbit_policy_validator.erl
@@ -18,6 +18,8 @@
-ifdef(use_specs).
+-export_type([validate_results/0]).
+
-type(validate_results() ::
'ok' | {error, string(), [term()]} | [validate_results()]).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index acdc2cff..6aae8de6 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -130,7 +130,8 @@ class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(policy_validator) -> rabbit_policy_validator;
+class_module(ha_mode) -> rabbit_mirror_queue_mode.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 163f6170..2e46304f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -914,10 +914,11 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
- {NewM, NewSs0} =
- rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, CurrentState, All),
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params,
+ {MNode, SNodes, SSNodes}, All) ->
+ {ok, M} = rabbit_mirror_queue_misc:module(Policy),
+ {NewM, NewSs0} = M:suggested_queue_nodes(
+ Params, MNode, SNodes, SSNodes, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;