diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-07-23 12:01:49 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-07-23 12:01:49 +0100 |
commit | f05a156d3d4af4dd47972f9bb639f14feb64ec2d (patch) | |
tree | 178c1bd3367faf5bf952497c49bd183bdd086600 | |
parent | 69ca13a659d158ca53eaeb50ce837129144d1372 (diff) | |
parent | 0f6ba029bfef0650dce5625465d1914d47497c3b (diff) | |
download | rabbitmq-server-f05a156d3d4af4dd47972f9bb639f14feb64ec2d.tar.gz |
merge with default
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 58 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 59 |
3 files changed, 89 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d82ac266..aa877b9d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -464,6 +464,7 @@ force_event_refresh() -> force_event_refresh(QNames) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], + %% BUG-24942/3: could one of these pids be stale!? {_, Bad} = rabbit_misc:multi_call( [Q#amqqueue.pid || Q <- Qs], force_event_refresh), FailedPids = [Pid || {Pid, _Reason} <- Bad], @@ -594,6 +595,7 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = + %% BUG-24942/3: could one of these pids be stale!? qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..10e1b92b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -130,25 +130,45 @@ add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). add_mirror(Queue, MirrorNode) -> - if_mirrored_queue( - Queue, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> case rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]) of - {ok, undefined} -> %% Already running - ok; - {ok, SPid} -> - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; - Other -> - Other - end; - [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} - end - end). 
+ if_mirrored_queue(Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + {error,{queue_already_mirrored_on_node,MirrorNode}}; + false -> + %% See BUG-24942: we have a stale pid from an old + %% incarnation of this node, because we've come + %% back online faster than the node_down handling + %% logic was able to deal with a death signal. We + %% shall replace the stale pid, and the slave start + %% logic handles this explicitly + start_child(Name, MirrorNode, Q) + end + end + end). + +start_child(Name, MirrorNode, Q) -> + case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + {ok, undefined} -> + %% NB: this means the mirror process was + %% already running on the given node. + ok; + {ok, SPid} -> + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + ok; + {error, {stale_master_pid, StalePid}} -> + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, StalePid]), + ok; + Other -> + Other + end. 
if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 60d3e027..9722c53a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -102,18 +102,51 @@ init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], + fun () -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> + MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1 #amqqueue { slave_pids = MPids1 }), + {new, QPid}; + [MPid] when MPid =:= QPid -> + case rabbit_misc:is_process_alive(MPid) of + true -> + %% What this appears to mean is that this + %% node is attempting to start a slave, but + %% a pid already exists for this node and + %% it is *already* the master! + %% This should never happen, so we fail noisily + throw({invariant_failed, + {duplicate_live_master, Node}}); + false -> + %% See bug24942: we have detected a stale + %% master pid (from a previous incarnation + %% of this node) which hasn't been detected + %% via nodedown recovery. We cannot recover + %% it here, so we bail and log the error. + %% This does mean that this node is not a + %% well behaving member of the HA configuration + %% for this cluster and we have opened bug25074 + %% to address this situation explicitly. + {stale, MPid} + end; + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> %% See bug24942: we have detected a stale + %% slave pid (from a previous incarnation + %% of this node) which hasn't been detected + %% via nodedown recovery. 
+ MPids1 = (MPids -- [SPid]) ++ [Self], ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [SPid] -> true = rabbit_misc:is_process_alive(SPid), - existing - end - end) of + Q1#amqqueue{slave_pids=MPids1}), + {new, QPid} + end + end + end) of {new, MPid} -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -150,6 +183,10 @@ init(#amqqueue { name = QueueName } = Q) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; + {stale, StalePid} -> + %% we cannot proceed if the master is stale, therefore we + %% fail to start and allow the error to be logged + {stop, {stale_master_pid, StalePid}}; existing -> ignore end. |