author     Simon MacMullen <simon@rabbitmq.com>    2013-02-12 18:04:31 +0000
committer  Simon MacMullen <simon@rabbitmq.com>    2013-02-12 18:04:31 +0000
commit     37dd46271a805a90b49d993d49ade8b70c47f1ef (patch)
tree       280a8ea54fd742aa729d5d1cec69eb6f95f24513
parent     4c6f0763a360f086987a4687ef39b1409f31f523 (diff)
download   rabbitmq-server-37dd46271a805a90b49d993d49ade8b70c47f1ef.tar.gz
Tweak nodes policy to allow master removal and thus queue migration
-rw-r--r--  src/rabbit_mirror_queue_misc.erl   49
-rw-r--r--  src/rabbit_mirror_queue_slave.erl  27
-rw-r--r--  src/rabbit_tests.erl               46
3 files changed, 73 insertions, 49 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 05036d35..cc2d7c77 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -235,13 +235,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +249,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +270,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +296,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -313,7 +324,7 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
+ All = fun (Tuple) -> [element(1, Tuple) | element(2, Tuple)] end,
OldNodes = All(actual_queue_nodes(OldQ)),
NewNodes = All(suggested_queue_nodes(NewQ)),
add_mirrors(QName, NewNodes -- OldNodes),
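
For reference, a short sketch of the behaviour the hunks above are after, expressed against the new {Master, Slaves, SyncedSlaves} triple that actual_queue_nodes/1 now returns. The expected results mirror the unit tests added to rabbit_tests.erl below; this is an illustration, not part of the patch:

%% Nothing is synchronised yet, so the current master 'a' is kept even though
%% the nodes policy only names b and c:
{a, [b,c]} = rabbit_mirror_queue_misc:suggested_queue_nodes(
               <<"nodes">>, [<<"b">>, <<"c">>],
               {a, [b], []},               %% {Master, Slaves, SyncedSlaves}
               [a, b, c, d]).

%% Slave 'b' is synchronised, so the old master can be dropped and the queue
%% migrates; the new master is picked from the synchronised nodes:
{b, [c]} = rabbit_mirror_queue_misc:suggested_queue_nodes(
             <<"nodes">>, [<<"b">>, <<"c">>],
             {a, [b], [b]},
             [a, b, c, d]).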
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 69a3be2b..b435e0f3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
+%% [1] - the arrival of this newly synced slave may cause the master to die if
+%% the admin has requested a migration-type change to policy.
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q = #amqqueue { sync_slave_pids = SSPids }] ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
- ok
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q1, Q2}
+ end
+ end) of
+ ok -> ok;
+ {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1]
+ end.
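
A note on the reworked record_synchronised/1 above: the mnesia transaction now only reads and writes the queue record and returns {ok, Q1, Q2}, while the side-effecting rabbit_mirror_queue_misc:update_mirrors/2 call, which per footnote [1] may stop the old master, runs only after the transaction has returned, presumably because mnesia may retry a transaction fun and any side effects inside it could then run more than once. A minimal sketch of that "decide inside, act outside" shape, with hypothetical names that are not part of the patch:

%% Hypothetical sketch: the transaction fun only touches mnesia tables and
%% reports what changed; side effects run afterwards, outside the transaction.
update_and_notify(QName, UpdateFun, NotifyFun) ->
    case rabbit_misc:execute_mnesia_transaction(
           fun () ->
                   case mnesia:read({rabbit_queue, QName}) of
                       []  -> not_found;
                       [Q] -> Q1 = UpdateFun(Q),
                              ok = mnesia:write(rabbit_queue, Q1, write),
                              {updated, Q, Q1}
                   end
           end) of
        not_found        -> ok;
        {updated, Q, Q1} -> NotifyFun(Q, Q1)  %% at most once, after commit
    end.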
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f5ea4fba..9bc4288d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -912,10 +912,10 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, {OldM, OldSs}, All),
+ Policy, Params, CurrentState, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;
@@ -923,28 +923,36 @@ test_dynamic_mirroring() ->
end
end,
- Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
%% Add a node
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed
- Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
passed.
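
As a usage note, the node-selection unit above can be exercised on its own from an Erlang shell attached to a broker running this code, assuming rabbit_tests is loaded and the function is callable from the shell:

passed = rabbit_tests:test_dynamic_mirroring().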