diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-07-25 13:09:18 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-07-25 13:09:18 +0100 |
commit | 78c5faed5c2b8f11371028f3857ce935b9c8eb75 (patch) | |
tree | 2de8130108e52adce3fd960304021a5f9521dc62 | |
parent | 69ca13a659d158ca53eaeb50ce837129144d1372 (diff) | |
parent | 7c0fbe7939baa5347f225140d3eb767f140efbf5 (diff) | |
download | rabbitmq-server-78c5faed5c2b8f11371028f3857ce935b9c8eb75.tar.gz |
Merged bug24942 into default
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 48 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 44 |
2 files changed, 65 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..ba62a734 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -62,7 +62,9 @@ remove_from_queue(QueueName, DeadPids) -> slave_pids = SPids }] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + not lists:member(node(Pid), + DeadNodes) orelse + rabbit_misc:is_process_alive(Pid)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, []}; @@ -134,22 +136,40 @@ add_mirror(Queue, MirrorNode) -> Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> case rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]) of - {ok, undefined} -> %% Already running - ok; - {ok, SPid} -> - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; - Other -> - Other - end; - [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + {error,{queue_already_mirrored_on_node, + MirrorNode}}; + false -> + start_child(Name, MirrorNode, Q) + end end end). +start_child(Name, MirrorNode, Q) -> + case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + {ok, undefined} -> + %% this means the mirror process was + %% already running on the given node. + ok; + {ok, SPid} -> + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + ok; + {error, {{stale_master_pid, StalePid}, _}} -> + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, StalePid]), + ok; + {error, {{duplicate_live_master, _}=Err, _}} -> + throw(Err); + Other -> + Other + end. + if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( Queue, fun (#amqqueue { arguments = Args } = Q) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 60d3e027..c4ae307c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -101,19 +101,10 @@ info(QPid) -> init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [SPid] -> true = rabbit_misc:is_process_alive(SPid), - existing - end - end) of + case rabbit_misc:execute_mnesia_transaction(fun() -> + init_it(Self, Node, + QueueName) + end) of {new, MPid} -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -150,10 +141,37 @@ init(#amqqueue { name = QueueName } = Q) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; + {stale, StalePid} -> + {stop, {stale_master_pid, StalePid}}; + duplicate_live_master -> + {stop, {duplicate_live_master, Node}}; existing -> ignore end. +init_it(Self, Node, QueueName) -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> + MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}), + {new, QPid}; + [QPid] -> + case rabbit_misc:is_process_alive(QPid) of + true -> duplicate_live_master; + false -> {stale, QPid} + end; + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> MPids1 = (MPids -- [SPid]) ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{ slave_pids = MPids1 }), + {new, QPid} + end + end. + handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the |