summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-04 12:24:44 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-04 12:24:44 +0000
commit1d15ffde6cf646d05861d6cd71d4c4519f36c798 (patch)
treee52afb2d50f8d0ffbf59c55ff8b171c7a9380225
parentf6f0d2fe572ae7697d91139396d634864cecdf04 (diff)
parent4aa86fc039e0fdd7adf41b3b4e50b821c753a129 (diff)
downloadrabbitmq-server-1d15ffde6cf646d05861d6cd71d4c4519f36c798.tar.gz
Merge in bug24407.
-rw-r--r--docs/rabbitmqctl.1.xml26
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_control_main.erl15
-rw-r--r--src/rabbit_mirror_queue_master.erl13
-rw-r--r--src/rabbit_mirror_queue_sync.erl45
6 files changed, 115 insertions, 26 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a95f7b3d..31921769 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -476,6 +476,26 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to cancel synchronisation for.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a synchronising mirrored queue to stop
+ synchronising itself.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
@@ -1139,6 +1159,12 @@
i.e. those which could take over from the master without
message loss.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>status</term>
+ <listitem><para>The status of the queue. Normally
+ 'running', but may be different if the queue is
+ synchronising.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fbe146e8..2b5da0cb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,8 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
+ cancel_sync_mirrors/1]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -175,6 +176,8 @@
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-spec(sync_mirrors/1 :: (pid()) ->
'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(cancel_sync_mirrors/1 :: (pid()) ->
+ 'ok' | rabbit_types:error('not_mirrored')).
-endif.
@@ -600,7 +603,8 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
-sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6b065b96..854ba640 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -97,7 +98,8 @@
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
@@ -138,7 +140,8 @@ init(Q) ->
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
+ msg_id_to_channel = MTC,
+ status = running},
State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
@@ -926,6 +930,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1149,8 +1155,22 @@ handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ HandleInfo = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ EmitStats = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(
+ HandleInfo, EmitStats, BQS) of
{ok, BQS1} -> reply(ok, S(BQS1));
{stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
@@ -1160,6 +1180,10 @@ handle_call(sync_mirrors, _From,
handle_call(sync_mirrors, _From, State) ->
reply({error, not_mirrored}, State);
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({error, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 70ca6177..7f43c228 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,7 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, sync_queue/1]).
+-export([start/0, stop/0, action/5, sync_queue/1, cancel_sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -51,6 +51,7 @@
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
{sync_queue, [?VHOST_DEF]},
+ {cancel_sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -287,6 +288,12 @@ action(sync_queue, Node, [Q], Opts, Inform) ->
rpc_call(Node, rabbit_control_main, sync_queue,
[rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]);
+action(cancel_sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Stopping synchronising queue ~s in ~s", [Q, VHost]),
+ rpc_call(Node, rabbit_control_main, cancel_sync_queue,
+ [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
@@ -524,6 +531,12 @@ sync_queue(Q) ->
rabbit_amqqueue:with(
Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end).
+cancel_sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) ->
+ rabbit_amqqueue:cancel_sync_mirrors(QPid)
+ end).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0df7ea1c..b5f72cad 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -46,10 +46,11 @@
-ifdef(use_specs).
--export_type([death_fun/0, depth_fun/0]).
+-export_type([death_fun/0, depth_fun/0, stats_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
+-type(stats_fun() :: fun ((any()) -> 'ok')).
-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
gm :: pid(),
coordinator :: pid(),
@@ -68,7 +69,7 @@
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
--spec(sync_mirrors/1 :: (master_state()) ->
+-spec(sync_mirrors/3 :: (stats_fun(), stats_fun(), master_state()) ->
{'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -126,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors(State = #state { name = QName,
+sync_mirrors(HandleInfo, EmitStats,
+ State = #state { name = QName,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -140,7 +142,8 @@ sync_mirrors(State = #state { name = QName,
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
- case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 10a74cc9..e3f254e4 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/5, slave/7]).
+-export([master_prepare/3, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -59,7 +59,10 @@
-type(bqs() :: any()).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
--spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) ->
+-spec(master_go/7 :: (pid(), reference(), log_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}).
@@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) ->
MPid = self(),
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
-master_go(Syncer, Ref, Log, BQ, BQS) ->
- Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
- {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ {ready, Syncer} -> EmitStats({syncing, 0}),
+ master_go0(Args, BQ, BQS)
end.
master_go0(Args, BQ, BQS) ->
@@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
+ {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
- true -> Log("~p messages", [I]),
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
erlang:now();
false -> Last
end,
+ HandleInfo({syncing, I}),
receive
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age)
@@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
ok
end,
receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
{next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
-master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
- {next, Ref} -> unlink(Syncer),
- Syncer ! {done, Ref},
- receive {'EXIT', Syncer, _} -> ok
- after 0 -> ok
- end,
+ {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
{ok, BQS};
{'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
end.
+stop_syncer(Syncer, Msg) ->
+ unlink(Syncer),
+ Syncer ! Msg,
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end.
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -158,6 +172,11 @@ syncer_loop(Ref, MPid, SPids) ->
SPid ! {sync_msg, Ref, Msg, MsgProps}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ ok;
{done, Ref} ->
[SPid ! {sync_complete, Ref} || SPid <- SPids]
end.