summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-08 15:55:57 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-08 15:55:57 +0100
commitfea7363c2873941d2135f6a480e38b2f1c12f3c4 (patch)
tree9366b571c8a8d5ae85a07d46e0958d1aac31fb3c
parent265b0c1db5715b79f39864f58892f496b827833f (diff)
downloadrabbitmq-server-fea7363c2873941d2135f6a480e38b2f1c12f3c4.tar.gz
WIP
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_misc.erl68
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
3 files changed, 35 insertions, 43 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 40359da3..676cb519 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -344,10 +344,9 @@ handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids, ExtraNodes} ->
+ {ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0226bcb..dbc4a253 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -31,7 +31,7 @@
-spec(remove_from_queue/2 ::
(rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
@@ -59,7 +59,6 @@
remove_from_queue(QueueName, DeadGMPids) ->
DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -73,42 +72,45 @@ remove_from_queue(QueueName, DeadGMPids) ->
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, [], []};
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- Q1 = store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- %% Sometimes a slave dying means we need
- %% to start more on other nodes -
- %% "exactly" mode can cause this to
- %% happen.
- {_, OldNodes} = actual_queue_nodes(Q1),
- {_, NewNodes} = suggested_queue_nodes(
- Q1, ClusterNodes),
- {ok, QPid1, [QPid | SPids] -- Alive,
- NewNodes -- OldNodes};
+ store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, [], []}
+ {ok, QPid1, []}
end
end
end).
on_node_up() ->
- ClusterNodes = rabbit_mnesia:cluster_nodes(running),
QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (Q = #amqqueue{name = QName}, QNames0) ->
+ fun (Q = #amqqueue{name = QName,
+ pid = Pid,
+ slave_pids = SPids}, QNames0) ->
+ %% We don't want to pass in the whole
+ %% cluster - we don't want a situation
+ %% where starting one node causes us to
+ %% decide to start a mirror on another
+ PossibleNodes0 = [node(P) || P <- [Pid | SPids]],
+ PossibleNodes =
+ case lists:member(node(), PossibleNodes0) of
+ true -> PossibleNodes0;
+ false -> [node() | PossibleNodes0]
+ end,
{_MNode, SNodes} = suggested_queue_nodes(
- Q, ClusterNodes),
+ Q, PossibleNodes),
case lists:member(node(), SNodes) of
true -> [QName | QNames0];
false -> QNames0
@@ -234,14 +236,17 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, ClusterNodes) ->
+suggested_queue_nodes(Q, PossibleNodes) ->
{MNode0, SNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
- suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, ClusterNodes).
+ R = suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
+ {MNode, SNodes}, PossibleNodes),
+ io:format("SQN: ~p~n", [{policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
+ {MNode, SNodes}, PossibleNodes, R}]),
+ R.
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,11 +254,11 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
- {MNode, All -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
+ {MNode, Possible -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- Unavailable = Nodes -- All,
+ Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -264,10 +269,10 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
false -> promote_slave(Available)
end
end;
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) ->
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = (All -- [MNode]) -- SNodes,
+ true -> Cand = (Possible -- [MNode]) -- SNodes,
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
@@ -309,13 +314,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
All = fun ({A,B}) -> [A|B] end,
OldNodes = All(actual_queue_nodes(OldQ)),
NewNodes = All(suggested_queue_nodes(NewQ)),
- %% When a mirror dies, remove_from_queue/2 might have to add new
- %% slaves (in "exactly" mode). It will check mnesia to see which
- %% slaves there currently are. If drop_mirror/2 is invoked first
- %% then when we end up in remove_from_queue/2 it will not see the
- %% slaves that add_mirror/2 will add, and also want to add them
- %% (even though we are not responding to the death of a
- %% mirror). Breakage ensues.
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 307f2b4f..61423202 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -181,25 +181,20 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids, ExtraNodes} ->
+ {ok, Pid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
gen_server2:reply(From, ok),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
node(Pid) =:= node() ->
%% we've become master
QueueState = promote_me(From, State),
- rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
{become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
- %% assertion, we don't need to add_mirrors/2 in this
- %% branch, see last clause in remove_from_queue/2
- [] = ExtraNodes,
erlang:monitor(process, Pid),
%% GM is lazy. So we know of the death of the
%% slave since it is a neighbour of ours, but