diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-08 15:55:57 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-08 15:55:57 +0100 |
commit | fea7363c2873941d2135f6a480e38b2f1c12f3c4 (patch) | |
tree | 9366b571c8a8d5ae85a07d46e0958d1aac31fb3c | |
parent | 265b0c1db5715b79f39864f58892f496b827833f (diff) | |
download | rabbitmq-server-fea7363c2873941d2135f6a480e38b2f1c12f3c4.tar.gz |
WIP
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 68 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
3 files changed, 35 insertions, 43 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 40359da3..676cb519 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -344,10 +344,9 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids, ExtraNodes} -> + {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0226bcb..dbc4a253 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -31,7 +31,7 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: @@ -59,7 +59,6 @@ remove_from_queue(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -73,42 +72,45 @@ remove_from_queue(QueueName, DeadGMPids) -> {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, [], []}; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - %% Sometimes a slave dying means we need - %% to start more on other nodes - - %% "exactly" mode can cause this to - %% happen. - {_, OldNodes} = actual_queue_nodes(Q1), - {_, NewNodes} = suggested_queue_nodes( - Q1, ClusterNodes), - {ok, QPid1, [QPid | SPids] -- Alive, - NewNodes -- OldNodes}; + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, [], []} + {ok, QPid1, []} end end end). on_node_up() -> - ClusterNodes = rabbit_mnesia:cluster_nodes(running), QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName}, QNames0) -> + fun (Q = #amqqueue{name = QName, + pid = Pid, + slave_pids = SPids}, QNames0) -> + %% We don't want to pass in the whole + %% cluster - we don't want a situation + %% where starting one node causes us to + %% decide to start a mirror on another + PossibleNodes0 = [node(P) || P <- [Pid | SPids]], + PossibleNodes = + case lists:member(node(), PossibleNodes0) of + true -> PossibleNodes0; + false -> [node() | PossibleNodes0] + end, {_MNode, SNodes} = suggested_queue_nodes( - Q, ClusterNodes), + Q, PossibleNodes), case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 @@ -234,14 +236,17 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, ClusterNodes) -> +suggested_queue_nodes(Q, PossibleNodes) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, - suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, ClusterNodes). + R = suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, PossibleNodes), + io:format("SQN: ~p~n", [{policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, PossibleNodes, R}]), + R. policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,11 +254,11 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> - {MNode, All -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> + {MNode, Possible -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - Unavailable = Nodes -- All, + Unavailable = Nodes -- Possible, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -264,10 +269,10 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> false -> promote_slave(Available) end end; -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = (All -- [MNode]) -- SNodes, + true -> Cand = (Possible -- [MNode]) -- SNodes, SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; @@ -309,13 +314,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, All = fun ({A,B}) -> [A|B] end, OldNodes = All(actual_queue_nodes(OldQ)), NewNodes = All(suggested_queue_nodes(NewQ)), - %% When a mirror dies, remove_from_queue/2 might have to add new - %% slaves (in "exactly" mode). It will check mnesia to see which - %% slaves there currently are. If drop_mirror/2 is invoked first - %% then when we end up in remove_from_queue/2 it will not see the - %% slaves that add_mirror/2 will add, and also want to add them - %% (even though we are not responding to the death of a - %% mirror). Breakage ensues. add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 307f2b4f..61423202 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -181,25 +181,20 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids, ExtraNodes} -> + {ok, Pid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed gen_server2:reply(From, ok), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); node(Pid) =:= node() -> %% we've become master QueueState = promote_me(From, State), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), - %% assertion, we don't need to add_mirrors/2 in this - %% branch, see last clause in remove_from_queue/2 - [] = ExtraNodes, erlang:monitor(process, Pid), %% GM is lazy. So we know of the death of the %% slave since it is a neighbour of ours, but |