summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-12-06 16:52:13 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-12-06 16:52:13 +0000
commit8f088d8fe0403274b417bf7baad16de317ac23bb (patch)
tree6719bec734b8ba01c1fb4f7a30abbdb124f23e74
parentbd4dad1150be114357b8c5730c9942b43b425c5b (diff)
downloadrabbitmq-server-8f088d8fe0403274b417bf7baad16de317ac23bb.tar.gz
Add rabbitmqctl sync_queue, and make it synchronous. Also fix the fact that we were treating "no queues to sync" as a 'normal' crash of the syncer, which... no.
-rw-r--r--docs/rabbitmqctl.1.xml31
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_control_main.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_sync.erl11
6 files changed, 67 insertions, 14 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 34947b66..1583ab98 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -446,6 +446,37 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to synchronise.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a mirrored queue with unsynchronised slaves to
+ synchronise itself. The queue will block while
+ synchronisation takes place (all publishers to and
+ consumers from the queue will block). The queue must be
+ mirrored, must have unsynchronised slaves, and must not
+ have any pending unacknowledged messages for this
+ command to succeed.
+ </para>
+ <para>
+ Note that unsynchronised queues from which messages are
+ being drained will become synchronised eventually. This
+ command is primarily useful for queues which are not
+ being drained.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4bdab0bc..ae96f739 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync/2]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -173,8 +173,9 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
--spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) ->
- 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(sync/2 :: (binary(), rabbit_types:vhost()) ->
+ 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored' |
+ 'already_synced')).
-endif.
@@ -592,7 +593,12 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
-sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors).
+sync(QNameBin, VHostBin) ->
+ QName = rabbit_misc:r(VHostBin, queue, QNameBin),
+ case lookup(QName) of
+ {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, sync_mirrors);
+ E -> E
+ end.
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index acbea4e9..730c235e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1153,15 +1153,14 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
-handle_call(sync_mirrors, From,
+handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> gen_server2:reply(From, ok),
- case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
{shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)};
- {ok, BQS1} -> noreply(S(BQS1))
+ {Result, BQS1} -> reply(Result, S(BQS1))
end;
_ -> reply({error, pending_acks}, State)
end;
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 669a0787..b4272555 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -50,6 +50,7 @@
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
+ {sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
rpc_call(Node, rabbit_mnesia, forget_cluster_node,
[ClusterNode, RemoveWhenOffline]);
+action(sync_queue, Node, [Queue], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Synchronising queue ~s in ~s", [Queue, VHost]),
+ rpc_call(Node, rabbit_amqqueue, sync,
+ [list_to_binary(Queue), list_to_binary(VHost)]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3d7f902c..03a712d6 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -142,11 +142,12 @@ sync_mirrors(State = #state { name = QName,
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
- {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
- {sync_died, R, BQS1} -> Log("~p", [R]),
- {ok, S(BQS1)};
- {ok, BQS1} -> Log("complete", []),
- {ok, S(BQS1)}
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> Log("~p", [R]),
+ {ok, S(BQS1)};
+ {already_synced, BQS1} -> {{error, already_synced}, S(BQS1)};
+ {ok, BQS1} -> Log("complete", []),
+ {ok, S(BQS1)}
end.
terminate({shutdown, dropped} = Reason,
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index a80f8f50..f56cf5b3 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -42,6 +42,7 @@
%% || || <--- sync_ready ---- ||
%% || || (or) ||
%% || || <--- sync_deny ----- ||
+%% || <--- ready ---- || ||
%% || <--- next* ---- || || }
%% || ---- msg* ----> || || } loop
%% || || ---- sync_msg* ----> || }
@@ -60,6 +61,13 @@ master_prepare(Ref, Log, SPids) ->
master_go(Syncer, Ref, Log, BQ, BQS) ->
Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+ receive
+ {'EXIT', Syncer, normal} -> {already_synced, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
+ {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ end.
+
+master_go0(Args, BQ, BQS) ->
case BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
master_send(Args, I, Last, Msg, MsgProps)
end, {0, erlang:now()}, BQS) of
@@ -111,7 +119,8 @@ syncer(Ref, Log, MPid, SPids) ->
%% *without* those messages ending up in their gen_server2 pqueue.
case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of
[] -> Log("all slaves already synced", []);
- SPidsMRefs1 -> Log("~p to sync", [[rabbit_misc:pid_to_string(S) ||
+ SPidsMRefs1 -> MPid ! {ready, self()},
+ Log("~p to sync", [[rabbit_misc:pid_to_string(S) ||
{S, _} <- SPidsMRefs1]]),
SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)