summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-12 15:03:12 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-12 15:03:12 +0000
commitcd629f5c15b4113b366bca43b159a55fa851dcfd (patch)
tree93764e81bfff6f0380f1f4031e31580c00c09518
parent451849adf9a5e0b589d5ed85af1663d129eebf19 (diff)
downloadrabbitmq-server-cd629f5c15b4113b366bca43b159a55fa851dcfd.tar.gz
Restore synchronous addition of slaves in the queue.declare case.
-rw-r--r--Makefile2
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl26
-rw-r--r--src/rabbit_mirror_queue_slave.erl30
4 files changed, 40 insertions, 26 deletions
diff --git a/Makefile b/Makefile
index e413f879..859825f5 100644
--- a/Makefile
+++ b/Makefile
@@ -222,7 +222,7 @@ start-background-node: all
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
- ./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid
+ #./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid
stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3abd81f5..d9cef642 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -110,7 +110,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
#state { name = QName,
gm = GM,
coordinator = CPid,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 65cc58be..f2c8b211 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_misc).
-behaviour(rabbit_policy_validator).
--export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1]).
@@ -46,10 +46,8 @@
(rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
--spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) ->
- {'ok', atom()} | rabbit_types:error(any())).
+-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async')
+ -> 'ok').
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -141,7 +139,7 @@ on_node_up() ->
end
end, [], rabbit_queue)
end),
- [add_mirror(QName, node()) || QName <- QNames],
+ [add_mirror(QName, node(), async) || QName <- QNames],
ok.
drop_mirrors(QName, Nodes) ->
@@ -166,26 +164,26 @@ drop_mirror(QName, MirrorNode) ->
end
end).
-add_mirrors(QName, Nodes) ->
- [add_mirror(QName, Node) || Node <- Nodes],
+add_mirrors(QName, Nodes, SyncMode) ->
+ [add_mirror(QName, Node, SyncMode) || Node <- Nodes],
ok.
-add_mirror(QName, MirrorNode) ->
+add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_amqqueue:with(
QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
- start_child(Name, MirrorNode, Q);
+ start_child(Name, MirrorNode, Q, SyncMode);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
true -> {ok, already_mirrored};
- false -> start_child(Name, MirrorNode, Q)
+ false -> start_child(Name, MirrorNode, Q, SyncMode)
end
end
end).
-start_child(Name, MirrorNode, Q) ->
+start_child(Name, MirrorNode, Q, SyncMode) ->
case rabbit_misc:with_exit_handler(
rabbit_misc:const(down),
fun () ->
@@ -193,7 +191,7 @@ start_child(Name, MirrorNode, Q) ->
end) of
{ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid);
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
_ -> ok
end.
@@ -308,7 +306,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
{NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
- add_mirrors (QName, NewNodes -- OldNodes),
+ add_mirrors (QName, NewNodes -- OldNodes, async),
drop_mirrors(QName, OldNodes -- NewNodes),
maybe_auto_sync(NewQ),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 69e68d20..96f89ecc 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2, info/1, go/1]).
+-export([start_link/1, set_maximum_since_use/2, info/1, go/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -83,9 +83,10 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
-go(SPid) -> gen_server2:cast(SPid, go).
+go(SPid, sync) -> gen_server2:call(SPid, go, infinity);
+go(SPid, async) -> gen_server2:cast(SPid, go).
-handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) ->
+handle_go(Q = #amqqueue{name = QName}) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -132,26 +133,26 @@ handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) ->
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
- {noreply, State};
+ {ok, State};
{stale, StalePid} ->
rabbit_log:warning("Detected stale HA master while adding "
"mirror of ~s: ~p~n",
[rabbit_misc:rs(QName), StalePid]),
gm:leave(GM),
- {stop, {stale_master_pid, StalePid}, NotStarted};
+ {error, {stale_master_pid, StalePid}};
duplicate_live_master ->
gm:leave(GM),
- {stop, {duplicate_live_master, Node}, NotStarted};
+ {error, {duplicate_live_master, Node}};
existing ->
gm:leave(GM),
- {stop, normal, NotStarted};
+ {error, normal};
master_in_recovery ->
gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- {stop, normal, NotStarted}
+ {error, normal}
end.
init_it(Self, GM, Node, QName) ->
@@ -185,6 +186,12 @@ add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
+handle_call(go, _From, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {reply, ok, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
+
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
@@ -220,8 +227,11 @@ handle_call({gm_deaths, LiveGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
-handle_cast(go, {not_started, _Q} = NotStarted) ->
- handle_go(NotStarted);
+handle_cast(go, {not_started, Q} = NotStarted) ->
+ case handle_go(Q) of
+ {ok, State} -> {noreply, State};
+ {error, Error} -> {stop, Error, NotStarted}
+ end;
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));