summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_misc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r--src/rabbit_mirror_queue_misc.erl85
1 files changed, 59 insertions, 26 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 58f20476..4fb1fc3b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -32,6 +32,8 @@
[policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
{ok, SPid} when is_pid(SPid) ->
+ maybe_auto_sync(Q),
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
@@ -235,13 +238,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +252,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +299,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -302,6 +316,14 @@ is_mirrored(Q) ->
_ -> false
end.
+maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+ case policy(<<"ha-sync-mode">>, Q) of
+ <<"automatic">> ->
+ spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
+ _ ->
+ ok
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
- OldNodes = All(actual_queue_nodes(OldQ)),
- NewNodes = All(suggested_queue_nodes(NewQ)),
- add_mirrors(QName, NewNodes -- OldNodes),
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ OldNodes = [OldMNode | OldSNodes],
+ NewNodes = [NewMNode | NewSNodes],
+ add_mirrors (QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
+ maybe_auto_sync(NewQ),
ok.
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)).
+ case validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)) of
+ ok -> case proplists:get_value(
+ <<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end;
+ E -> E
+ end.
validate_policy(<<"all">>, none) ->
ok;