diff options
author | Tim Watson <tim.watson@gmail.com> | 2012-06-13 14:13:55 +0100 |
---|---|---|
committer | Tim Watson <tim.watson@gmail.com> | 2012-06-13 14:13:55 +0100 |
commit | eca7e0725db080a249bca1720d06f5d2fb49e7cd (patch) | |
tree | 4343b79cd5460d534bca849352bdf2172784c1aa | |
parent | 88bdcd465e8d9a55bbeb554c939d93ede37fd7a0 (diff) | |
download | rabbitmq-server-eca7e0725db080a249bca1720d06f5d2fb49e7cd.tar.gz |
attempt to handle stale slave pids during initialisation routines
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 61 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 82 |
3 files changed, 121 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c1673504..9879d9b4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -439,6 +439,7 @@ force_event_refresh() -> force_event_refresh(QNames) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], + %% BUG-24942/3: could one of these pids be stale!? {_, Bad} = rabbit_misc:multi_call( [Q#amqqueue.pid || Q <- Qs], force_event_refresh), FailedPids = [Pid || {Pid, _Reason} <- Bad], @@ -569,6 +570,7 @@ set_maximum_since_use(QPid, Age) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = + %% BUG-24942/3: could one of these pids be stale!? qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} || #amqqueue{name = QName, pid = Pid, slave_pids = []} diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..f434f524 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -130,25 +130,48 @@ add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). add_mirror(Queue, MirrorNode) -> - if_mirrored_queue( - Queue, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> case rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]) of - {ok, undefined} -> %% Already running - ok; - {ok, SPid} -> - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; - Other -> - Other - end; - [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} - end - end). 
+ if_mirrored_queue(Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + %% TODO: this condition is silently ignored - should + %% we really be arriving at this state at all? + {error,{queue_already_mirrored_on_node,MirrorNode}}; + false -> + %% BUG-24942: we need to strip out this dead pid + %% now, so we do so directly - perhaps we ought + %% to start the txn sooner in order to get a more + %% coarse grained lock though.... + %% + %% BUG-24942: QUESTION - do we need to report that + %% something has changed (either via gm or via + %% the rabbit_event mechanisms) here? + Q1 = Q#amqqueue{ slave_pids = (SPids -- [SPid]) }, + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = rabbit_amqqueue:store_queue(Q1) + end), + start_child(Name, MirrorNode, Q1) + end + end + end). + +start_child(Name, MirrorNode, Q) -> + case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + {ok, undefined} -> %% Already running + ok; + {ok, SPid} -> rabbit_log:info( + "Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + ok; + Other -> %% BUG-24942: should this not be checked for + %% error conditions or something? + Other + end. 
if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e412fbbc..87fbfdfb 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -106,15 +106,83 @@ init(#amqqueue { name = QueueName } = Q) -> [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( + [] -> + MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue( Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [SPid] -> true = rabbit_misc:is_process_alive(SPid), - existing + {new, QPid}; + [MPid] when MPid =:= QPid -> + case rabbit_misc:is_process_alive(MPid) of + true -> + %% Well damn, this shouldn't really happen - + %% what this appears to mean is that this + %% node is attempting to start a slave, but + %% a pid already exists for this node and + %% it is *already* the master! This state + %% probably requires a bit more thought. + existing; + false -> + %% We were the master, died, came back + %% online (but not after 3 days!) and + %% our death hasn't been registered by the + %% rest of our unbelieving flock yet! + %% + %% Actually, this is worse than it seems, + %% because the master is now down but the + %% pid in mnesia is from an old incarnation + %% of this node, so messages to it will be + %% silently dropped by Erlang (with some + %% helpful noise in the logs). + %% + %% I'm not sure how we're supposed to + %% recover from this. Won't messages get + %% lost, or at least lose their ordering + %% in this situation? Because a slave that + %% comes back online doesn't contain any + %% state (a slave will start off empty as + %% if they have no mirrored content at all) + %% then don't we find ourselves in a slightly + %% inconsistent position here? 
+ %% + %% In this scenario, I wonder whether we + %% should call init_with_backing_queue_state + %% to try and recover? + {stale, MPid} + end; + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + existing; + false -> + %% we need to replace this pid as it + %% is stale, from an old incarnation + %% of this node. + %% + %% NB: I *do* think we should completely + %% initialise this process before exiting + %% the mnesia txn in this case - once + %% we're *committed* then I'd expect this + %% slave to become subject to any and all + %% invariants that members of the + %% slave_pids list should enforce, and + %% I'm *not* convinced this is possible + %% immediately after the txn commits + %% as the remaining initialisation code + %% could take arbitrary time to complete. + + MPids1 = (MPids -- [SPid]) ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{slave_pids=MPids1}), + {new, QPid} + end end end) of {new, MPid} -> + %% BUG-24942: *should* we move the whole initialisation process (bar + %% obviously the trap_exit and erlang:monitor/2 calls) into the + %% mnesia transaction? We could optionally return {ready, State} + %% from it, and in that case immediately return.... + process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), receive {joined, GM} -> @@ -150,6 +218,10 @@ init(#amqqueue { name = QueueName } = Q) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; + {stale, OldPid} -> + %% NB: placeholder for doing something actually useful here... + %% such as {error, {stale_master_pid, OldPid}} + ignore; existing -> ignore end. |