diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-12-06 16:52:13 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-12-06 16:52:13 +0000 |
commit | 8f088d8fe0403274b417bf7baad16de317ac23bb (patch) | |
tree | 6719bec734b8ba01c1fb4f7a30abbdb124f23e74 | |
parent | bd4dad1150be114357b8c5730c9942b43b425c5b (diff) | |
download | rabbitmq-server-8f088d8fe0403274b417bf7baad16de317ac23bb.tar.gz |
Add rabbitmqctl sync_queue, and make it synchronous. Also fix the fact that we were treating "no queues to sync" as a 'normal' crash of the syncer, which... no.
-rw-r--r-- | docs/rabbitmqctl.1.xml | 31 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 11 |
6 files changed, 67 insertions, 14 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 34947b66..1583ab98 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -446,6 +446,37 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to synchronise. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a mirrored queue with unsynchronised slaves to + synchronise itself. The queue will block while + synchronisation takes place (all publishers to and + consumers from the queue will block). The queue must be + mirrored, must have unsynchronised slaves, and must not + have any pending unacknowledged messages for this + command to succeed. + </para> + <para> + Note that unsynchronised queues from which messages are + being drained will become synchronised eventually. This + command is primarily useful for queues which are not + being drained. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4bdab0bc..ae96f739 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). +-export([start_mirroring/1, stop_mirroring/1, sync/2]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -173,8 +173,9 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). --spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) -> - 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(sync/2 :: (binary(), rabbit_types:vhost()) -> + 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored' | + 'already_synced')). -endif. @@ -592,7 +593,12 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). -sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors). +sync(QNameBin, VHostBin) -> + QName = rabbit_misc:r(VHostBin, queue, QNameBin), + case lookup(QName) of + {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, sync_mirrors); + E -> E + end. on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index acbea4e9..730c235e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1153,15 +1153,14 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); -handle_call(sync_mirrors, From, +handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> gen_server2:reply(From, ok), - case rabbit_mirror_queue_master:sync_mirrors(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of {shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)}; - {ok, BQS1} -> noreply(S(BQS1)) + {Result, BQS1} -> reply(Result, S(BQS1)) end; _ -> reply({error, pending_acks}, State) end; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 669a0787..b4272555 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -50,6 +50,7 @@ update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, + {sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, [ClusterNode, RemoveWhenOffline]); +action(sync_queue, Node, [Queue], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Synchronising queue ~s in ~s", [Queue, VHost]), + rpc_call(Node, rabbit_amqqueue, sync, + [list_to_binary(Queue), list_to_binary(VHost)]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3d7f902c..03a712d6 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -142,11 +142,12 @@ sync_mirrors(State = #state { name = QName, gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of - {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; - {sync_died, R, BQS1} -> Log("~p", [R]), - {ok, S(BQS1)}; - {ok, BQS1} -> Log("complete", []), - {ok, S(BQS1)} + {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; + {sync_died, R, BQS1} -> Log("~p", [R]), + {ok, S(BQS1)}; + {already_synced, BQS1} -> {{error, already_synced}, S(BQS1)}; + {ok, BQS1} -> Log("complete", []), + {ok, S(BQS1)} end. terminate({shutdown, dropped} = Reason, diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index a80f8f50..f56cf5b3 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -42,6 +42,7 @@ %% || || <--- sync_ready ---- || %% || || (or) || %% || || <--- sync_deny ----- || +%% || <--- ready ---- || || %% || <--- next* ---- || || } %% || ---- msg* ----> || || } loop %% || || ---- sync_msg* ----> || } @@ -60,6 +61,13 @@ master_prepare(Ref, Log, SPids) -> master_go(Syncer, Ref, Log, BQ, BQS) -> Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, + receive + {'EXIT', Syncer, normal} -> {already_synced, BQS}; + {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; + {ready, Syncer} -> master_go0(Args, BQ, BQS) + end. + +master_go0(Args, BQ, BQS) -> case BQ:fold(fun (Msg, MsgProps, {I, Last}) -> master_send(Args, I, Last, Msg, MsgProps) end, {0, erlang:now()}, BQS) of @@ -111,7 +119,8 @@ syncer(Ref, Log, MPid, SPids) -> %% *without* those messages ending up in their gen_server2 pqueue. case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of [] -> Log("all slaves already synced", []); - SPidsMRefs1 -> Log("~p to sync", [[rabbit_misc:pid_to_string(S) || + SPidsMRefs1 -> MPid ! {ready, self()}, + Log("~p to sync", [[rabbit_misc:pid_to_string(S) || {S, _} <- SPidsMRefs1]]), SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3) |