summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-10 13:59:17 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-10 13:59:17 +0100
commit1ac8c17d396ed7179f78d6ffbe8681875775e270 (patch)
treeb0809a5e67a7d67ffae94883894c374505502e0b
parent81cc69afc8a2b408ba9dbabbc1ecf3776fc12739 (diff)
parent3f13a798ee99f7cebcbb14c5e45e48e2a0bfe81d (diff)
downloadrabbitmq-server-bug25195.tar.gz
Merge defaultbug25195
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl12
-rw-r--r--src/rabbit_mirror_queue_misc.erl66
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
4 files changed, 44 insertions, 44 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 4d8b6ade..72dcfc95 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -344,10 +344,9 @@ handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids, ExtraNodes} ->
+ {ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ea98430c..d865d675 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -149,7 +149,17 @@ stop_all_slaves(Reason, #state{gm = GM}) ->
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
- ok = gm:forget_group(proplists:get_value(group_name, Info)).
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ QName = proplists:get_value(group_name, Info),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { slave_pids = [] })
+ end),
+ ok = gm:forget_group(QName).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0226bcb..6a97cd0d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -31,11 +31,12 @@
-spec(remove_from_queue/2 ::
(rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+ (rabbit_amqqueue:name(), node()) ->
+ {'ok', atom()} | rabbit_types:error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -59,7 +60,6 @@
remove_from_queue(QueueName, DeadGMPids) ->
DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -73,42 +73,45 @@ remove_from_queue(QueueName, DeadGMPids) ->
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, [], []};
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- %% Sometimes a slave dying means we need
- %% to start more on other nodes -
- %% "exactly" mode can cause this to
- %% happen.
- {_, OldNodes} = actual_queue_nodes(Q1),
- {_, NewNodes} = suggested_queue_nodes(
- Q1, ClusterNodes),
- {ok, QPid1, [QPid | SPids] -- Alive,
- NewNodes -- OldNodes};
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, [], []}
+ {ok, QPid1, []}
end
end
end).
on_node_up() ->
- ClusterNodes = rabbit_mnesia:cluster_nodes(running),
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName}, QNames0) ->
+ fun (Q = #amqqueue{name = QName,
+ pid = Pid,
+ slave_pids = SPids}, QNames0) ->
+ %% We don't want to pass in the whole
+ %% cluster - we don't want a situation
+ %% where starting one node causes us to
+ %% decide to start a mirror on another
+ PossibleNodes0 = [node(P) || P <- [Pid | SPids]],
+ PossibleNodes =
+ case lists:member(node(), PossibleNodes0) of
+ true -> PossibleNodes0;
+ false -> [node() | PossibleNodes0]
+ end,
{_MNode, SNodes} = suggested_queue_nodes(
- Q, ClusterNodes),
+ Q, PossibleNodes),
case lists:member(node(), SNodes) of
true -> [QName | QNames0];
false -> QNames0
@@ -234,14 +237,14 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, ClusterNodes) ->
+suggested_queue_nodes(Q, PossibleNodes) ->
{MNode0, SNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, ClusterNodes).
+ {MNode, SNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,11 +252,11 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
- {MNode, All -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
+ {MNode, Possible -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- Unavailable = Nodes -- All,
+ Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -264,10 +267,10 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
false -> promote_slave(Available)
end
end;
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) ->
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = (All -- [MNode]) -- SNodes,
+ true -> Cand = (Possible -- [MNode]) -- SNodes,
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
@@ -309,13 +312,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
All = fun ({A,B}) -> [A|B] end,
OldNodes = All(actual_queue_nodes(OldQ)),
NewNodes = All(suggested_queue_nodes(NewQ)),
- %% When a mirror dies, remove_from_queue/2 might have to add new
- %% slaves (in "exactly" mode). It will check mnesia to see which
- %% slaves there currently are. If drop_mirror/2 is invoked first
- %% then when we end up in remove_from_queue/2 it will not see the
- %% slaves that add_mirror/2 will add, and also want to add them
- %% (even though we are not responding to the death of a
- %% mirror). Breakage ensues.
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 53e61e2d..931a7f90 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -180,25 +180,20 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids, ExtraNodes} ->
+ {ok, Pid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
gen_server2:reply(From, ok),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
node(Pid) =:= node() ->
%% we've become master
QueueState = promote_me(From, State),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
{become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
- %% assertion, we don't need to add_mirrors/2 in this
- %% branch, see last clause in remove_from_queue/2
- [] = ExtraNodes,
erlang:monitor(process, Pid),
noreply(State #state { master_pid = Pid })
end