diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 18:04:31 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 18:04:31 +0000 |
commit | 37dd46271a805a90b49d993d49ade8b70c47f1ef (patch) | |
tree | 280a8ea54fd742aa729d5d1cec69eb6f95f24513 | |
parent | 4c6f0763a360f086987a4687ef39b1409f31f523 (diff) | |
download | rabbitmq-server-37dd46271a805a90b49d993d49ade8b70c47f1ef.tar.gz |
Tweak nodes policy to allow master removal and thus queue migration
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 49 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 46 |
3 files changed, 73 insertions, 49 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 05036d35..cc2d7c77 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -235,13 +235,13 @@ suggested_queue_nodes(Q) -> %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. suggested_queue_nodes(Q, PossibleNodes) -> - {MNode0, SNodes} = actual_queue_nodes(Q), + {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, PossibleNodes). + {MNode, SNodes, SSNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,15 +249,20 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> - {MNode, Possible -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> + {MNode, Poss -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is currently not in the nodes specified, - %% act like it is for the purposes below - otherwise we will not - %% return it in the results... - Nodes = lists:usort([MNode | Nodes1]), - Unavailable = Nodes -- Possible, + %% If the current master is not in the nodes specified, then what we want + %% to do depends on whether there are any synchronised slaves. If there + %% are then we can just kill the current master - the admin has asked for + %% a migration and we should give it to them. If there are not however + %% then we must keep the master around so as not to lose messages. 
+ Nodes = case SSNodes of + [] -> lists:usort([MNode | Nodes1]); + _ -> Nodes1 + end, + Unavailable = Nodes -- Poss, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -265,21 +270,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> {MNode, []}; _ -> case lists:member(MNode, Available) of true -> {MNode, Available -- [MNode]}; - false -> promote_slave(Available) + false -> %% Make sure the new master is synced! In order to + %% get here SSNodes must not be empty. + [NewMNode | _] = SSNodes, + {NewMNode, Available -- [NewMNode]} end end; %% When we need to add nodes, we randomise our candidate list as a %% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - but -%% that would fail to take account of synchronisation... -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> +%% randomise the list of ones to remove when we have too many - we +%% would have to take account of synchronisation though. +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), + true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; -suggested_queue_nodes(_, _, {MNode, _}, _) -> +suggested_queue_nodes(_, _, {MNode, _, _}, _) -> {MNode, []}. shuffle(L) -> @@ -288,11 +296,14 @@ shuffle(L) -> {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. -actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> +actual_queue_nodes(#amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids}) -> + Nodes = fun (L) -> [node(Pid) || Pid <- L] end, {case MPid of none -> none; _ -> node(MPid) - end, [node(Pid) || Pid <- SPids]}. 
+ end, Nodes(SPids), Nodes(SSPids)}. is_mirrored(Q) -> case policy(<<"ha-mode">>, Q) of @@ -313,7 +324,7 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, update_mirrors0(OldQ = #amqqueue{name = QName}, NewQ = #amqqueue{name = QName}) -> - All = fun ({A,B}) -> [A|B] end, + All = fun (Tuple) -> [element(1, Tuple) | element(2, Tuple)] end, OldNodes = All(actual_queue_nodes(OldQ)), NewNodes = All(suggested_queue_nodes(NewQ)), add_mirrors(QName, NewNodes -- OldNodes), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 69a3be2b..b435e0f3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) -> rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQ:set_ram_duration_target(DesiredDuration, BQS1). +%% [1] - the arrival of this newly synced slave may cause the master to die if +%% the admin has requested a migration-type change to policy. record_synchronised(#amqqueue { name = QName }) -> Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q = #amqqueue { sync_slave_pids = SSPids }] -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { sync_slave_pids = [Self | SSPids] }), - ok - end - end). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue { sync_slave_pids = SSPids }] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2), + {ok, Q1, Q2} + end + end) of + ok -> ok; + {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1] + end. 
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f5ea4fba..9bc4288d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -912,10 +912,10 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) -> {NewM, NewSs0} = rabbit_mirror_queue_misc:suggested_queue_nodes( - Policy, Params, {OldM, OldSs}, All), + Policy, Params, CurrentState, All), NewSs1 = lists:sort(NewSs0), case dm_list_match(NewSs, NewSs1, ExtraSs) of ok -> ok; @@ -923,28 +923,36 @@ test_dynamic_mirroring() -> end end, - Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]), + + N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end, %% Add a node - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), - Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]), %% Add two nodes and drop one - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]), %% Don't try to include nodes that are not running - Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master - Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), - %% And once that's happened, still 
keep the master even when not listed - Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]), - - Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), - Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]), + %% And once that's happened, still keep the master even when not listed, + %% if nothing is synced + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]), + %% But if something is synced we can lose the master - but make + %% sure we pick the new master from the nodes which are synced! + Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]), + Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]), + + Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]), passed. |