diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-10 13:59:17 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-10 13:59:17 +0100 |
commit | 1ac8c17d396ed7179f78d6ffbe8681875775e270 (patch) | |
tree | b0809a5e67a7d67ffae94883894c374505502e0b | |
parent | 81cc69afc8a2b408ba9dbabbc1ecf3776fc12739 (diff) | |
parent | 3f13a798ee99f7cebcbb14c5e45e48e2a0bfe81d (diff) | |
download | rabbitmq-server-1ac8c17d396ed7179f78d6ffbe8681875775e270.tar.gz |
Merge defaultbug25195
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 66 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 |
4 files changed, 44 insertions, 44 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 4d8b6ade..72dcfc95 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -344,10 +344,9 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids, ExtraNodes} -> + {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ea98430c..d865d675 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -149,7 +149,17 @@ stop_all_slaves(Reason, #state{gm = GM}) -> MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], - ok = gm:forget_group(proplists:get_value(group_name, Info)). + %% Normally when we remove a slave another slave or master will + %% notice and update Mnesia. But we just removed them all, and + %% have stopped listening ourselves. So manually clean up. + QName = proplists:get_value(group_name, Info), + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q] = mnesia:read({rabbit_queue, QName}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { slave_pids = [] }) + end), + ok = gm:forget_group(QName). purge(State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0226bcb..6a97cd0d 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -31,11 +31,12 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). -spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). + (rabbit_amqqueue:name(), node()) -> + {'ok', atom()} | rabbit_types:error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -59,7 +60,6 @@ remove_from_queue(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -73,42 +73,45 @@ remove_from_queue(QueueName, DeadGMPids) -> {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, [], []}; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - Q1 = store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - %% Sometimes a slave dying means we need - %% to start more on other nodes - - %% "exactly" mode can cause this to - %% happen. - {_, OldNodes} = actual_queue_nodes(Q1), - {_, NewNodes} = suggested_queue_nodes( - Q1, ClusterNodes), - {ok, QPid1, [QPid | SPids] -- Alive, - NewNodes -- OldNodes}; + store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, [], []} + {ok, QPid1, []} end end end). on_node_up() -> - ClusterNodes = rabbit_mnesia:cluster_nodes(running), QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (Q = #amqqueue{name = QName}, QNames0) -> + fun (Q = #amqqueue{name = QName, + pid = Pid, + slave_pids = SPids}, QNames0) -> + %% We don't want to pass in the whole + %% cluster - we don't want a situation + %% where starting one node causes us to + %% decide to start a mirror on another + PossibleNodes0 = [node(P) || P <- [Pid | SPids]], + PossibleNodes = + case lists:member(node(), PossibleNodes0) of + true -> PossibleNodes0; + false -> [node() | PossibleNodes0] + end, {_MNode, SNodes} = suggested_queue_nodes( - Q, ClusterNodes), + Q, PossibleNodes), case lists:member(node(), SNodes) of true -> [QName | QNames0]; false -> QNames0 @@ -234,14 +237,14 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, ClusterNodes) -> +suggested_queue_nodes(Q, PossibleNodes) -> {MNode0, SNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, ClusterNodes). + {MNode, SNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,11 +252,11 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) -> - {MNode, All -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> + {MNode, Possible -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - Unavailable = Nodes -- All, + Unavailable = Nodes -- Possible, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -264,10 +267,10 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) -> false -> promote_slave(Available) end end; -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) -> +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = (All -- [MNode]) -- SNodes, + true -> Cand = (Possible -- [MNode]) -- SNodes, SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; @@ -309,13 +312,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, All = fun ({A,B}) -> [A|B] end, OldNodes = All(actual_queue_nodes(OldQ)), NewNodes = All(suggested_queue_nodes(NewQ)), - %% When a mirror dies, remove_from_queue/2 might have to add new - %% slaves (in "exactly" mode). It will check mnesia to see which - %% slaves there currently are. If drop_mirror/2 is invoked first - %% then when we end up in remove_from_queue/2 it will not see the - %% slaves that add_mirror/2 will add, and also want to add them - %% (even though we are not responding to the death of a - %% mirror). Breakage ensues. add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 53e61e2d..931a7f90 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -180,25 +180,20 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids, ExtraNodes} -> + {ok, Pid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed gen_server2:reply(From, ok), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); node(Pid) =:= node() -> %% we've become master QueueState = promote_me(From, State), - rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), - %% assertion, we don't need to add_mirrors/2 in this - %% branch, see last clause in remove_from_queue/2 - [] = ExtraNodes, erlang:monitor(process, Pid), noreply(State #state { master_pid = Pid }) end |