diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-04 12:24:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-04 12:24:44 +0000 |
commit | 1d15ffde6cf646d05861d6cd71d4c4519f36c798 (patch) | |
tree | e52afb2d50f8d0ffbf59c55ff8b171c7a9380225 | |
parent | f6f0d2fe572ae7697d91139396d634864cecdf04 (diff) | |
parent | 4aa86fc039e0fdd7adf41b3b4e50b821c753a129 (diff) | |
download | rabbitmq-server-1d15ffde6cf646d05861d6cd71d4c4519f36c798.tar.gz |
Merge in bug24407.
-rw-r--r-- | docs/rabbitmqctl.1.xml | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 15 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 45 |
6 files changed, 115 insertions, 26 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a95f7b3d..31921769 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -476,6 +476,26 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to cancel synchronisation for. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a synchronising mirrored queue to stop + synchronising itself. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> @@ -1139,6 +1159,12 @@ i.e. those which could take over from the master without message loss.</para></listitem> </varlistentry> + <varlistentry> + <term>status</term> + <listitem><para>The status of the queue. Normally + 'running', but may be different if the queue is + synchronising.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fbe146e8..2b5da0cb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,8 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, + cancel_sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +176,8 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(cancel_sync_mirrors/1 :: (pid()) -> + 'ok' | rabbit_types:error('not_mirrored')). -endif. @@ -600,7 +603,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). -sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). +cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6b065b96..854ba640 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + status }). -record(consumer, {tag, ack_required}). @@ -97,7 +98,8 @@ memory, slave_pids, synchronised_slave_pids, - backing_queue_status + backing_queue_status, + status ]). -define(CREATION_EVENT_KEYS, @@ -138,7 +140,8 @@ init(Q) -> unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = gb_trees:empty()}, + msg_id_to_channel = gb_trees:empty(), + status = running}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = MTC}, + msg_id_to_channel = MTC, + status = running}, State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) @@ -926,6 +930,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(status, #q{status = Status}) -> + Status; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1149,8 +1155,22 @@ handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + HandleInfo = fun (Status) -> + receive {'$gen_call', From, {info, Items}} -> + Infos = infos(Items, State#q{status = Status}), + gen_server2:reply(From, {ok, Infos}) + after 0 -> + ok + end + end, + EmitStats = fun (Status) -> + rabbit_event:if_enabled( + State, #q.stats_timer, + fun() -> emit_stats(State#q{status = Status}) end) + end, case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors( + HandleInfo, EmitStats, BQS) of {ok, BQS1} -> reply(ok, S(BQS1)); {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; @@ -1160,6 +1180,10 @@ handle_call(sync_mirrors, _From, handle_call(sync_mirrors, _From, State) -> reply({error, not_mirrored}, State); +%% By definition if we get this message here we do not have to do anything. +handle_call(cancel_sync_mirrors, _From, State) -> + reply({error, not_syncing}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 70ca6177..7f43c228 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,7 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, sync_queue/1]). +-export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -51,6 +51,7 @@ {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, {sync_queue, [?VHOST_DEF]}, + {cancel_sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -287,6 +288,12 @@ action(sync_queue, Node, [Q], Opts, Inform) -> rpc_call(Node, rabbit_control_main, sync_queue, [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]); +action(cancel_sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping synchronising queue ~s in ~s", [Q, VHost]), + rpc_call(Node, rabbit_control_main, cancel_sync_queue, + [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); @@ -524,6 +531,12 @@ sync_queue(Q) -> rabbit_amqqueue:with( Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end). +cancel_sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> + rabbit_amqqueue:cancel_sync_mirrors(QPid) + end). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0df7ea1c..b5f72cad 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -46,10 +46,11 @@ -ifdef(use_specs). --export_type([death_fun/0, depth_fun/0]). +-export_type([death_fun/0, depth_fun/0, stats_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). -type(depth_fun() :: fun (() -> 'ok')). +-type(stats_fun() :: fun ((any()) -> 'ok')). -type(master_state() :: #state { name :: rabbit_amqqueue:name(), gm :: pid(), coordinator :: pid(), @@ -68,7 +69,7 @@ -spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> master_state()). -spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). --spec(sync_mirrors/1 :: (master_state()) -> +-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) -> {'ok', master_state()} | {stop, any(), master_state()}). -endif. @@ -126,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors(State = #state { name = QName, +sync_mirrors(HandleInfo, EmitStats, + State = #state { name = QName, gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -140,7 +142,8 @@ sync_mirrors(State = #state { name = QName, Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, - case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + case rabbit_mirror_queue_sync:master_go( + Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 10a74cc9..e3f254e4 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/5, slave/7]). +-export([master_prepare/3, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -59,7 +59,10 @@ -type(bqs() :: any()). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). --spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) -> +-spec(master_go/7 :: (pid(), reference(), log_fun(), + rabbit_mirror_queue_master:stats_fun(), + rabbit_mirror_queue_master:stats_fun(), + bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | {'sync_died', any(), bqs()}). @@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) -> MPid = self(), spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, BQ, BQS) -> - Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> + Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(Args, BQ, BQS) + {ready, Syncer} -> EmitStats({syncing, 0}), + master_go0(Args, BQ, BQS) end. master_go0(Args, BQ, BQS) -> @@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> +master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of - true -> Log("~p messages", [I]), + true -> EmitStats({syncing, I}), + Log("~p messages", [I]), erlang:now(); false -> Last end, + HandleInfo({syncing, I}), receive {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age) @@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> ok end, receive + {'$gen_call', From, + cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), + gen_server2:reply(From, ok), + {stop, cancelled}; {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. -master_done({Syncer, Ref, _Log, Parent}, BQS) -> +master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) -> receive - {next, Ref} -> unlink(Syncer), - Syncer ! {done, Ref}, - receive {'EXIT', Syncer, _} -> ok - after 0 -> ok - end, + {next, Ref} -> stop_syncer(Syncer, {done, Ref}), {ok, BQS}; {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS} end. +stop_syncer(Syncer, Msg) -> + unlink(Syncer), + Syncer ! Msg, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end. + %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -158,6 +172,11 @@ syncer_loop(Ref, MPid, SPids) -> SPid ! {sync_msg, Ref, Msg, MsgProps} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + ok; {done, Ref} -> [SPid ! {sync_complete, Ref} || SPid <- SPids] end. |