summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-12-30 18:34:11 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-12-30 18:34:11 +0000
commit2a65b5a33f6d7d799d9960e6586d03b42ba9f31d (patch)
tree4f86587cbb1ac745d555499d24b6b7ff0884a5ef
parent66033c0ae9b6c79ac34dcf6859dc31fd20b802e5 (diff)
parent46fddb9b1b910869f90480b4fb8549b26fca0cf4 (diff)
downloadrabbitmq-server-2a65b5a33f6d7d799d9960e6586d03b42ba9f31d.tar.gz
merge bug24407 into bug25345
...and move Info{Push,Pull} into Args, where they belong
-rw-r--r--docs/rabbitmqctl.1.xml26
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_control_main.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_sync.erl40
6 files changed, 108 insertions, 18 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a95f7b3d..31921769 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -476,6 +476,26 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cancel_sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to cancel synchronisation for.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a synchronising mirrored queue to stop
+ synchronising itself.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
@@ -1139,6 +1159,12 @@
i.e. those which could take over from the master without
message loss.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>status</term>
+ <listitem><para>The status of the queue. Normally
+ 'running', but may be different if the queue is
+ synchronising.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3169948b..c49d5bee 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1, sync/2]).
+-export([start_mirroring/1, stop_mirroring/1, sync/2, cancel_sync/2]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -175,6 +175,8 @@
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-spec(sync/2 :: (binary(), rabbit_types:vhost()) ->
'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(cancel_sync/2 :: (binary(), rabbit_types:vhost()) ->
+ 'ok' | rabbit_types:error('not_mirrored')).
-endif.
@@ -599,6 +601,13 @@ sync(QNameBin, VHostBin) ->
E -> E
end.
+cancel_sync(QNameBin, VHostBin) ->
+ QName = rabbit_misc:r(VHostBin, queue, QNameBin),
+ case lookup(QName) of
+ {ok, #amqqueue{pid = QPid}} -> delegate_call(QPid, cancel_sync_mirrors);
+ E -> E
+ end.
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 730c235e..26c0edbe 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,7 +54,8 @@
delayed_stop,
queue_monitors,
dlx,
- dlx_routing_key
+ dlx_routing_key,
+ status
}).
-record(consumer, {tag, ack_required}).
@@ -97,7 +98,8 @@
memory,
slave_pids,
synchronised_slave_pids,
- backing_queue_status
+ backing_queue_status,
+ status
]).
-define(CREATION_EVENT_KEYS,
@@ -138,7 +140,8 @@ init(Q) ->
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
+ msg_id_to_channel = gb_trees:empty(),
+ status = running},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -164,7 +167,8 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
+ msg_id_to_channel = MTC,
+ status = running},
State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
@@ -934,6 +938,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
+i(status, #q{status = Status}) ->
+ Status;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1157,8 +1163,22 @@ handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ InfoPull = fun (Status) ->
+ receive {'$gen_call', From, {info, Items}} ->
+ Infos = infos(Items, State#q{status = Status}),
+ gen_server2:reply(From, {ok, Infos})
+ after 0 ->
+ ok
+ end
+ end,
+ InfoPush = fun (Status) ->
+ rabbit_event:if_enabled(
+ State, #q.stats_timer,
+ fun() -> emit_stats(State#q{status = Status}) end)
+ end,
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(
+ InfoPull, InfoPush, BQS) of
{shutdown, Reason, BQS1} -> {stop, Reason, S(BQS1)};
{Result, BQS1} -> reply(Result, S(BQS1))
end;
@@ -1168,6 +1188,10 @@ handle_call(sync_mirrors, _From,
handle_call(sync_mirrors, _From, State) ->
reply({error, not_mirrored}, State);
+%% By definition if we get this message here we do not have to do anything.
+handle_call(cancel_sync_mirrors, _From, State) ->
+ reply({error, not_syncing}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 819435ee..148c839b 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -51,6 +51,7 @@
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
{sync_queue, [?VHOST_DEF]},
+ {cancel_sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -287,6 +288,12 @@ action(sync_queue, Node, [Queue], Opts, Inform) ->
rpc_call(Node, rabbit_amqqueue, sync,
[list_to_binary(Queue), list_to_binary(VHost)]);
+action(cancel_sync_queue, Node, [Queue], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Stopping synchronising queue ~s in ~s", [Queue, VHost]),
+ rpc_call(Node, rabbit_amqqueue, cancel_sync,
+ [list_to_binary(Queue), list_to_binary(VHost)]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c9b6269b..601649ef 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -127,7 +127,8 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors(State = #state { name = QName,
+sync_mirrors(InfoPull, InfoPush,
+ State = #state { name = QName,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -141,7 +142,8 @@ sync_mirrors(State = #state { name = QName,
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
- case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ case rabbit_mirror_queue_sync:master_go(
+ Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f9502219..5f0307fc 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/5, slave/7]).
+-export([master_prepare/3, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -59,12 +59,13 @@ master_prepare(Ref, Log, SPids) ->
MPid = self(),
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
-master_go(Syncer, Ref, Log, BQ, BQS) ->
- Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+master_go(Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, InfoPull, InfoPush, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
- {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ {ready, Syncer} -> InfoPush({syncing, 0}),
+ master_go0(Args, BQ, BQS)
end.
master_go0(Args, BQ, BQS) ->
@@ -76,12 +77,15 @@ master_go0(Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+master_send(Msg, MsgProps, {Syncer, Ref, Log, InfoPull, InfoPush, Parent},
+ {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
- true -> Log("~p messages", [I]),
+ true -> InfoPush({syncing, I}),
+ Log("~p messages", [I]),
erlang:now();
false -> Last
end,
+ InfoPull({syncing, I}),
receive
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age)
@@ -89,6 +93,14 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
ok
end,
receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> unlink(Syncer),
+ Syncer ! {cancel, Ref},
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end,
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
{next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
@@ -121,8 +133,16 @@ syncer(Ref, Log, MPid, SPids) ->
SPidsMRefs1 -> MPid ! {ready, self()},
Log("~p to sync", [[rabbit_misc:pid_to_string(S) ||
{S, _} <- SPidsMRefs1]]),
- SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
- foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)
+ case syncer_loop({Ref, MPid}, SPidsMRefs1) of
+ {done, SPidsMRefs2} ->
+ foreach_slave(SPidsMRefs2, Ref,
+ fun sync_send_complete/3);
+ cancelled ->
+ %% We don't tell the slaves we will die
+ %% - so when we do they interpret that
+ %% as a failure, which is what we want.
+ ok
+ end
end.
syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
@@ -135,8 +155,10 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
SPid ! {sync_msg, Ref, Msg, MsgProps}
end || {SPid, _} <- SPidsMRefs1],
syncer_loop(Args, SPidsMRefs1);
+ {cancel, Ref} ->
+ cancelled;
{done, Ref} ->
- SPidsMRefs
+ {done, SPidsMRefs}
end.
wait_for_credit(SPidsMRefs, Ref) ->