diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-08-17 12:16:05 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-08-17 12:16:05 +0100 |
commit | 1ab8e0cbc66e711c3182d7ae734c704d5b0f183b (patch) | |
tree | 41e9db3f37d8fd62c95b56a41a0b108092d2dca2 | |
parent | 57ffe98c9dc85eff674187a69eec7757965723c3 (diff) | |
download | rabbitmq-server-1ab8e0cbc66e711c3182d7ae734c704d5b0f183b.tar.gz |
at-least mode can imply that we need to start slaves in response to slaves dying elsewhere. So do that.
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 40 |
1 files changed, 30 insertions, 10 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 5f36a19f..52846f58 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -58,38 +58,58 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, DeadPids) -> - DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + +remove_from_queue(QueueName, DeadGMPids) -> + case remove_from_queue0(QueueName, DeadGMPids) of + {ok, NewMPid, DeadQPids, ExtraNodes} -> + [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes], + {ok, NewMPid, DeadQPids}; + Other -> + Other + end. + +remove_from_queue0(QueueName, DeadGMPids) -> + DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we %% get here. case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [Q = #amqqueue { pid = QPid, + [Q = #amqqueue { name = QName, + pid = QPid, slave_pids = SPids }] -> Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes) orelse - rabbit_misc:is_process_alive(Pid)], + %% TODO when bug 25104 hits default do whatever it does. + false], + %% rabbit_misc:is_process_alive(Pid)], {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - {ok, QPid1, []}; + {ok, QPid1, [], []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. - store_updated_slaves( - Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), - {ok, QPid1, [QPid | SPids] -- Alive}; + Q1 = store_updated_slaves( + Q #amqqueue { pid = QPid1, + slave_pids = SPids1 }), + %% Sometimes a slave dying means we need + %% to start more on other nodes - + %% "at-least" mode can cause this to + %% happen. + {_, OldNodes} = actual_queue_nodes(Q1), + {_, NewNodes} = suggested_queue_nodes(Q1), + {ok, QPid1, [QPid | SPids] -- Alive, + NewNodes -- OldNodes}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - {ok, QPid1, []} + {ok, QPid1, [], []} end end end). |