summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim.watson@gmail.com>2012-06-13 14:13:55 +0100
committerTim Watson <tim.watson@gmail.com>2012-06-13 14:13:55 +0100
commiteca7e0725db080a249bca1720d06f5d2fb49e7cd (patch)
tree4343b79cd5460d534bca849352bdf2172784c1aa
parent88bdcd465e8d9a55bbeb554c939d93ede37fd7a0 (diff)
downloadrabbitmq-server-eca7e0725db080a249bca1720d06f5d2fb49e7cd.tar.gz
attempt to handle stale slave pids during initialisation routines
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl61
-rw-r--r--src/rabbit_mirror_queue_slave.erl82
3 files changed, 121 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504..9879d9b4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -439,6 +439,7 @@ force_event_refresh() ->
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ %% BUG-24942/3: could one of these pids be stale!?
{_, Bad} = rabbit_misc:multi_call(
[Q#amqqueue.pid || Q <- Qs], force_event_refresh),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
@@ -569,6 +570,7 @@ set_maximum_since_use(QPid, Age) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
+ %% BUG-24942/3: could one of these pids be stale!?
qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..f434f524 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -130,25 +130,48 @@ add_mirror(VHostPath, QueueName, MirrorNode) ->
add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
add_mirror(Queue, MirrorNode) ->
- if_mirrored_queue(
- Queue,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
- end
- end).
+ if_mirrored_queue(Queue,
+ fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ %% TODO: this condition is silently ignored - should
+ %% we really be arriving at this state at all?
+ {error,{queue_already_mirrored_on_node,MirrorNode}};
+ false ->
+ %% BUG-24942: we need to strip out this dead pid
+ %% now, so we do so directly - perhaps we ought
+ %% to start the txn sooner in order to get a more
+ %% coarse grained lock though....
+ %%
+ %% BUG-24942: QUESTION - do we need to report that
+ %% something has changed (either via gm or via
+ %% the rabbit_event mechanisms) here?
+ Q1 = Q#amqqueue{ slave_pids = (SPids -- [SPid]) },
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ ok = rabbit_amqqueue:store_queue(Q1)
+ end),
+ start_child(Name, MirrorNode, Q1)
+ end
+ end
+ end).
+
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} -> %% Already running
+ ok;
+ {ok, SPid} -> rabbit_log:info(
+ "Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ Other -> %% BUG-24942: should this not be checked for
+ %% error conditions or something?
+ Other
+ end.
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc..87fbfdfb 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -106,15 +106,83 @@ init(#amqqueue { name = QueueName } = Q) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
+ [] ->
+ MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
+ {new, QPid};
+ [MPid] when MPid =:= QPid ->
+ case rabbit_misc:is_process_alive(MPid) of
+ true ->
+ %% Well damn, this shouldn't really happen -
+ %% what this appears to mean is that this
+ %% node is attempting to start a slave, but
+ %% a pid already exists for this node and
+ %% it is *already* the master! This state
+ %% probably requires a bit more thought.
+ existing;
+ false ->
+ %% We were the master, died, came back
+ %% online (but not after 3 days!) and
+ %% our death hasn't been registered by the
+ %% rest of our unbelieving flock yet!
+ %%
+ %% Actually, this is worse than it seems,
+ %% because the master is now down but the
+ %% pid in mnesia is from an old incarnation
+ %% of this node, so messages to it will be
+ %% silently dropped by Erlang (with some
+ %% helpful noise in the logs).
+ %%
+ %% I'm not sure how we're supposed to
+ %% recover from this. Won't messages get
+ %% lost, or at least lose their ordering
+ %% in this situation? Because a slave that
+ %% comes back online doesn't contain any
+ %% state (a slave will start off empty as
+ %% if they have no mirrored content at all)
+ %% then don't we find ourselves in a slightly
+ %% inconsistent position here?
+ %%
+ %% In this scenario, I wonder whether we
+ %% should call init_with_backing_queue_state
+ %% to try and recover?
+ {stale, MPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ existing;
+ false ->
+ %% we need to replace this pid as it
+ %% is stale, from an old incarnation
+ %% of this node.
+ %%
+ %% NB: I *do* think we should completely
+ %% initialise this process before exiting
+ %% the mnesia txn in this case - once
+ %% we're *committed* then I'd expect this
+ %% slave to become subject to any and all
+ %% invariants that members of the
+ %% slave_pids list should enforce, and
+ %% I'm *not* convinced this is possible
+ %% immediately after the txn commits
+ %% as the remaining initialisation code
+ %% could take arbitrary time to complete.
+
+ MPids1 = (MPids -- [SPid]) ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{slave_pids=MPids1}),
+ {new, QPid}
+ end
end
end) of
{new, MPid} ->
+ %% BUG-24942: *should* we move the whole initialisation process (bar
+ %% obviously the trap_exit and erlang:monitor/2 calls) into the
+ %% mnesia transaction? We could optionally return {ready, State}
+ %% from it, and in that case immediately return....
+
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
receive {joined, GM} ->
@@ -150,6 +218,10 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, OldPid} ->
+ %% NB: placeholder for doing something actually useful here...
+ %% such as {error, {stale_master_pid, OldPid}}
+ ignore;
existing ->
ignore
end.