summary refs log tree commit diff
diff options
context:
space:
mode:
author Emile Joubert <emile@rabbitmq.com> 2012-07-25 13:09:18 +0100
committer Emile Joubert <emile@rabbitmq.com> 2012-07-25 13:09:18 +0100
commit 78c5faed5c2b8f11371028f3857ce935b9c8eb75 (patch)
tree 2de8130108e52adce3fd960304021a5f9521dc62
parent 69ca13a659d158ca53eaeb50ce837129144d1372 (diff)
parent 7c0fbe7939baa5347f225140d3eb767f140efbf5 (diff)
download rabbitmq-server-78c5faed5c2b8f11371028f3857ce935b9c8eb75.tar.gz
Merged bug24942 into default
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 48
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 44
2 files changed, 65 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..ba62a734 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -62,7 +62,9 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ not lists:member(node(Pid),
+ DeadNodes) orelse
+ rabbit_misc:is_process_alive(Pid)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
@@ -134,22 +136,40 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
end
end).
+start_child(Name, MirrorNode, Q) -> %% start a mirror of Q on MirrorNode; Name is used for logging only
+    case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+        {ok, undefined} ->
+            %% nothing was started: presumably the slave's init/1 returned
+            %% 'ignore' because a live mirror already exists on that node.
+            ok;
+        {ok, SPid} -> %% a new mirror process was started; log its pid
+            rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+                            [rabbit_misc:rs(Name), MirrorNode, SPid]),
+            ok;
+        {error, {{stale_master_pid, StalePid}, _}} -> %% slave init/1 stopped with stale_master_pid
+            rabbit_log:warning("Detected stale HA master while adding "
+                               "mirror of ~s on node ~p: ~p~n",
+                               [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+            ok; %% logged as a warning, but still reported as success
+        {error, {{duplicate_live_master, _}=Err, _}} -> %% slave init/1 stopped with duplicate_live_master
+            throw(Err); %% thrown (not returned) as {duplicate_live_master, _}
+        Other -> %% any other result is handed back unchanged
+            Other
+        end.
+
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 60d3e027..c4ae307c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -101,19 +101,10 @@ info(QPid) ->
init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
+ case rabbit_misc:execute_mnesia_transaction(fun() ->
+ init_it(Self, Node,
+ QueueName)
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -150,10 +141,37 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ {stop, {stale_master_pid, StalePid}};
+ duplicate_live_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
ignore
end.
+init_it(Self, Node, QueueName) -> %% called by init/1 inside an mnesia transaction
+    [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+        mnesia:read({rabbit_queue, QueueName}), %% badmatch unless exactly one record is found
+    case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of %% recorded pids located on Node
+        [] -> %% no pid recorded on Node yet: register Self as a new slave
+            MPids1 = MPids ++ [Self],
+            ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}),
+            {new, QPid};
+        [QPid] -> %% QPid is already bound, so this matches only when the local pid is the queue's own pid
+            case rabbit_misc:is_process_alive(QPid) of
+                true  -> duplicate_live_master; %% init/1 stops with {duplicate_live_master, Node}
+                false -> {stale, QPid} %% init/1 stops with {stale_master_pid, QPid}
+            end;
+        [SPid] -> %% otherwise the single local pid comes from slave_pids
+            case rabbit_misc:is_process_alive(SPid) of
+                true  -> existing; %% init/1 returns 'ignore' for this
+                false -> MPids1 = (MPids -- [SPid]) ++ [Self], %% swap the dead pid for Self
+                         ok = rabbit_amqqueue:store_queue(
+                                Q1#amqqueue{ slave_pids = MPids1 }),
+                         {new, QPid}
+            end
+    end.
+
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the