diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-30 18:34:11 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-30 18:34:11 +0000 |
commit | 2a65b5a33f6d7d799d9960e6586d03b42ba9f31d (patch) | |
tree | 4f86587cbb1ac745d555499d24b6b7ff0884a5ef | |
parent | 66033c0ae9b6c79ac34dcf6859dc31fd20b802e5 (diff) | |
parent | 46fddb9b1b910869f90480b4fb8549b26fca0cf4 (diff) | |
download | rabbitmq-server-2a65b5a33f6d7d799d9960e6586d03b42ba9f31d.tar.gz |
merge bug24407 into bug25345
...and move Info{Push,Pull} into Args, where they belong
-rw-r--r-- | docs/rabbitmqctl.1.xml | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 40 |
6 files changed, 108 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a95f7b3d..31921769 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -476,6 +476,26 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to cancel synchronisation for. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a synchronising mirrored queue to stop + synchronising itself. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> @@ -1139,6 +1159,12 @@ i.e. those which could take over from the master without message loss.</para></listitem> </varlistentry> + <varlistentry> + <term>status</term> + <listitem><para>The status of the queue. Normally + 'running', but may be different if the queue is + synchronising.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3169948b..c49d5bee 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1, sync/2]). +-export([start_mirroring/1, stop_mirroring/1, sync/2, cancel_sync/2]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -175,6 +175,8 @@ -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync/2 :: (binary(), rabbit_types:vhost()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(cancel_sync/2 :: (binary(), rabbit_types:vhost()) -> + 'ok' | rabbit_types:error('not_mirrored')). -endif. @@ -599,6 +601,13 @@ sync(QNameBin, VHostBin) -> E -> E end. +cancel_sync(QNameBin, VHostBin) -> + QName = rabbit_misc:r(VHostBin, queue, QNameBin), + case lookup(QName) of + {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, cancel_sync_mirrors); + E -> E + end. + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 730c235e..26c0edbe 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,7 +54,8 @@ delayed_stop, queue_monitors, dlx, - dlx_routing_key + dlx_routing_key, + status }). -record(consumer, {tag, ack_required}). @@ -97,7 +98,8 @@ memory, slave_pids, synchronised_slave_pids, - backing_queue_status + backing_queue_status, + status ]). -define(CREATION_EVENT_KEYS, @@ -138,7 +140,8 @@ init(Q) -> unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = gb_trees:empty()}, + msg_id_to_channel = gb_trees:empty(), + status = running}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = pmon:new(), - msg_id_to_channel = MTC}, + msg_id_to_channel = MTC, + status = running}, State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) @@ -934,6 +938,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; +i(status, #q{status = Status}) -> + Status; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> @@ -1157,8 +1163,22 @@ handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + InfoPull = fun (Status) -> + receive {'$gen_call', From, {info, Items}} -> + Infos = infos(Items, State#q{status = Status}), + gen_server2:reply(From, {ok, Infos}) + after 0 -> + ok + end + end, + InfoPush = fun (Status) -> + rabbit_event:if_enabled( + State, #q.stats_timer, + fun() -> emit_stats(State#q{status = Status}) end) + end, case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors( + InfoPull, InfoPush, BQS) of {shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)}; {Result, BQS1} -> reply(Result, S(BQS1)) end; @@ -1168,6 +1188,10 @@ handle_call(sync_mirrors, _From, handle_call(sync_mirrors, _From, State) -> reply({error, not_mirrored}, State); +%% By definition if we get this message here we do not have to do anything. +handle_call(cancel_sync_mirrors, _From, State) -> + reply({error, not_syncing}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 819435ee..148c839b 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -51,6 +51,7 @@ {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, {sync_queue, [?VHOST_DEF]}, + {cancel_sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -287,6 +288,12 @@ action(sync_queue, Node, [Queue], Opts, Inform) -> rpc_call(Node, rabbit_amqqueue, sync, [list_to_binary(Queue), list_to_binary(VHost)]); +action(cancel_sync_queue, Node, [Queue], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping synchronising queue ~s in ~s", [Queue, VHost]), + rpc_call(Node, rabbit_amqqueue, cancel_sync, + [list_to_binary(Queue), list_to_binary(VHost)]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c9b6269b..601649ef 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -127,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors(State = #state { name = QName, +sync_mirrors(InfoPull, InfoPush, + State = #state { name = QName, gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -141,7 +142,8 @@ sync_mirrors(State = #state { name = QName, Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, - case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + case rabbit_mirror_queue_sync:master_go( + Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; {sync_died, R, BQS1} -> Log("~p", [R]), {ok, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f9502219..5f0307fc 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/5, slave/7]). +-export([master_prepare/3, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -59,12 +59,13 @@ master_prepare(Ref, Log, SPids) -> MPid = self(), spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, BQ, BQS) -> - Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, +master_go(Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) -> + Args = {Syncer, Ref, Log, InfoPull, InfoPush, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(Args, BQ, BQS) + {ready, Syncer} -> InfoPush({syncing, 0}), + master_go0(Args, BQ, BQS) end. master_go0(Args, BQ, BQS) -> @@ -76,12 +77,15 @@ master_go0(Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> +master_send(Msg, MsgProps, {Syncer, Ref, Log, InfoPull, InfoPush, Parent}, + {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of - true -> Log("~p messages", [I]), + true -> InfoPush({syncing, I}), + Log("~p messages", [I]), erlang:now(); false -> Last end, + InfoPull({syncing, I}), receive {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age) @@ -89,6 +93,14 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> ok end, receive + {'$gen_call', From, + cancel_sync_mirrors} -> unlink(Syncer), + Syncer ! {cancel, Ref}, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end, + gen_server2:reply(From, ok), + {stop, cancelled}; {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; @@ -121,8 +133,16 @@ syncer(Ref, Log, MPid, SPids) -> SPidsMRefs1 -> MPid ! {ready, self()}, Log("~p to sync", [[rabbit_misc:pid_to_string(S) || {S, _} <- SPidsMRefs1]]), - SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), - foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3) + case syncer_loop({Ref, MPid}, SPidsMRefs1) of + {done, SPidsMRefs2} -> + foreach_slave(SPidsMRefs2, Ref, + fun sync_send_complete/3); + cancelled -> + %% We don't tell the slaves we will die + %% - so when we do they interpret that + %% as a failure, which is what we want. + ok + end end. syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> @@ -135,8 +155,10 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> SPid ! {sync_msg, Ref, Msg, MsgProps} end || {SPid, _} <- SPidsMRefs1], syncer_loop(Args, SPidsMRefs1); + {cancel, Ref} -> + cancelled; {done, Ref} -> - SPidsMRefs + {done, SPidsMRefs} end. wait_for_credit(SPidsMRefs, Ref) -> |