summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-07-23 12:01:49 +0100
committerTim Watson <tim@rabbitmq.com>2012-07-23 12:01:49 +0100
commitf05a156d3d4af4dd47972f9bb639f14feb64ec2d (patch)
tree178c1bd3367faf5bf952497c49bd183bdd086600
parent69ca13a659d158ca53eaeb50ce837129144d1372 (diff)
parent0f6ba029bfef0650dce5625465d1914d47497c3b (diff)
downloadrabbitmq-server-f05a156d3d4af4dd47972f9bb639f14feb64ec2d.tar.gz
merge with default
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl58
-rw-r--r--src/rabbit_mirror_queue_slave.erl59
3 files changed, 89 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d82ac266..aa877b9d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -464,6 +464,7 @@ force_event_refresh() ->
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ %% BUG-24942/3: could one of these pids could be stale!?
{_, Bad} = rabbit_misc:multi_call(
[Q#amqqueue.pid || Q <- Qs], force_event_refresh),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
@@ -594,6 +595,7 @@ set_maximum_since_use(QPid, Age) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
+ %% BUG-24942/3: could one of these pids could be stale!?
qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..10e1b92b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -130,25 +130,45 @@ add_mirror(VHostPath, QueueName, MirrorNode) ->
add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
add_mirror(Queue, MirrorNode) ->
- if_mirrored_queue(
- Queue,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
- end
- end).
+ if_mirrored_queue(Queue,
+ fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,MirrorNode}};
+ false ->
+ %% See BUG-24942: we have a stale pid from an old
+ %% incarnation of this node, because we've come
+ %% back online faster than the node_down handling
+ %% logic was able to deal with a death signal. We
+ %% shall replace the stale pid, and the slave start
+ %% logic handles this explicitly
+ start_child(Name, MirrorNode, Q)
+ end
+ end
+ end).
+
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} ->
+ %% NB: this means the mirror process was
+ %% already running on the given node.
+ ok;
+ {ok, SPid} ->
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ {error, {stale_master_pid, StalePid}} ->
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+ ok;
+ Other ->
+ Other
+ end.
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 60d3e027..9722c53a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -102,18 +102,51 @@ init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
+ {new, QPid};
+ [MPid] when MPid =:= QPid ->
+ case rabbit_misc:is_process_alive(MPid) of
+ true ->
+ %% What this appears to mean is that this
+ %% node is attempting to start a slave, but
+ %% a pid already exists for this node and
+ %% it is *already* the master!
+ %% This should never happen, so we fail noisily
+ throw({invariant_failed,
+ {duplicate_live_master, Node}});
+ false ->
+ %% See bug24942: we have detected a stale
+ %% master pid (from a previous incarnation
+ %% of this node) which hasn't been detected
+ %% via nodedown recovery. We cannot recover
+ %% it here, so we bail and log the error.
+ %% This does mean that this node is not a
+ %% well behaving member of the HA configuration
+ %% for this cluster and we have opened bug25074
+ %% to address this situation explicitly.
+ {stale, MPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> %% See bug24942: we have detected a stale
+ %% slave pid (from a previous incarnation
+ %% of this node) which hasn't been detected
+ %% via nodedown recovery.
+ MPids1 = (MPids -- [SPid]) ++ [Self],
ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
+ Q1#amqqueue{slave_pids=MPids1}),
+ {new, QPid}
+ end
+ end
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -150,6 +183,10 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ %% we cannot proceed if the master is stale, therefore we
+ %% fail to start and allow the error to be logged
+ {stop, {stale_master_pid, StalePid}};
existing ->
ignore
end.