summary refs log tree commit diff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2012-08-17 12:16:05 +0100
committer Simon MacMullen <simon@rabbitmq.com> 2012-08-17 12:16:05 +0100
commit 1ab8e0cbc66e711c3182d7ae734c704d5b0f183b (patch)
tree 41e9db3f37d8fd62c95b56a41a0b108092d2dca2
parent 57ffe98c9dc85eff674187a69eec7757965723c3 (diff)
download rabbitmq-server-1ab8e0cbc66e711c3182d7ae734c704d5b0f183b.tar.gz
at-least mode can imply that we need to start slaves in response to slaves dying elsewhere. So do that.
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 40
1 files changed, 30 insertions, 10 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 5f36a19f..52846f58 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -58,38 +58,58 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, DeadPids) ->
- DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+
+remove_from_queue(QueueName, DeadGMPids) ->
+ case remove_from_queue0(QueueName, DeadGMPids) of
+ {ok, NewMPid, DeadQPids, ExtraNodes} ->
+ [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes],
+ {ok, NewMPid, DeadQPids};
+ Other ->
+ Other
+ end.
+
+remove_from_queue0(QueueName, DeadGMPids) ->
+ DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
+ [Q = #amqqueue { name = QName,
+ pid = QPid,
slave_pids = SPids }] ->
Alive = [Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid),
DeadNodes) orelse
- rabbit_misc:is_process_alive(Pid)],
+ %% TODO when bug 25104 hits default do whatever it does.
+ false],
+ %% rabbit_misc:is_process_alive(Pid)],
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, []};
+ {ok, QPid1, [], []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ Q1 = store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
+ %% Sometimes a slave dying means we need
+ %% to start more on other nodes -
+ %% "at-least" mode can cause this to
+ %% happen.
+ {_, OldNodes} = actual_queue_nodes(Q1),
+ {_, NewNodes} = suggested_queue_nodes(Q1),
+ {ok, QPid1, [QPid | SPids] -- Alive,
+ NewNodes -- OldNodes};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, []}
+ {ok, QPid1, [], []}
end
end
end).