summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-24 17:38:14 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-24 17:38:14 +0100
commitf0cb7e165ced609d9909b3ef9b528442f63dc658 (patch)
treecad4ddd92350d58c0995a31bca853ec6dce1e155
parentb4963dd7aae39e8f8c5306b9d39af6deb7623e63 (diff)
downloadrabbitmq-server-f0cb7e165ced609d9909b3ef9b528442f63dc658.tar.gz
Permit dropping nodes of mirrored queues. This turns out to be much much messier than I'd hoped as the principal problem becomes ensuring an add after a drop works. Normally, an add would only occur on a node that has not seen said queue before: if it had, in a previous lifetime, then the booting of rabbit would have ripped out any locally stored files regarding that queue. But now this step may be missed. Having tried many different approaches, the simplest became expanding bq so that the shutdown reason is exposed to the BQ. Thus both slave and master can then detect that they're being dropped, and, in the case of master, it can convert a bq:terminate to a bq:delete_and_terminate. Every other approach I could think of turned out worse.
-rw-r--r--docs/rabbitmqctl.1.xml43
-rw-r--r--include/rabbit_backing_queue_spec.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl27
-rw-r--r--src/rabbit_mirror_queue_misc.erl83
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl6
11 files changed, 147 insertions, 59 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 62869158..908ca973 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1325,6 +1325,49 @@
</variablelist>
</refsect2>
+
+ <refsect2>
+ <title>Mirrored Queue Management</title>
+ <para>
+ Mirrored queues can have slaves dynamically added, and slaves
+ or the master dynamically dropped. Refer to the <ulink
+ url="http://www.rabbitmq.com/ha.html">High Availability
+ guide</ulink> for further details about mirrored queues in
+ general.
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>add_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to add a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is no
+ mirror of the queue already on the node. If it succeeds,
+ the new mirror will start off as an empty slave.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>drop_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to drop a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is a
+ mirror of the queue already on the node. If the node
+ contains the master of the queue, a slave on some other
+ node will be promoted to become the new master. It is
+ not permitted to drop the only node of a mirrored-queue.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
</refsect1>
</refentry>
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e2..295d9039 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ea31ec13..b1c95338 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -145,16 +145,16 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
State, Deliveries).
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc5..217ad3eb 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1140a2f0..b4b6255e 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -244,6 +244,12 @@ action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
rpc_call(Node, rabbit_mirror_queue_misc, add_slave,
[VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+action(drop_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
+ Inform("Dropping mirror of queue ~p on node ~p~n", [Queue, MirrorNode]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ rpc_call(Node, rabbit_mirror_queue_misc, drop_slave,
+ [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+
action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 99de1b18..9bd8565f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_master).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
@@ -106,17 +106,28 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
-terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+terminate({shutdown, dropped} = Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination - this node has been explicitly
+ %% dropped. Normally, non-durable queues would be tidied up on
+ %% startup, but there's a possibility that we will be added back
+ %% in without this node being restarted. Thus we must do the full
+ %% blown delete_and_terminate now, but only locally: we do not
+ %% broadcast delete_and_terminate.
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 };
+terminate(Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
%% shouldn't be deleted. Most likely safe shutdown of this
%% node. Thus just let some other slave take over.
- State #state { backing_queue_state = BQ:terminate(BQS) }.
+ State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
-delete_and_terminate(State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, delete_and_terminate),
- State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+delete_and_terminate(Reason, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
purge(State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 5f180c5e..046d3380 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,8 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]).
+-export([remove_from_queue/2, on_node_up/0,
+ drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]).
-include("rabbit.hrl").
@@ -59,36 +60,6 @@ remove_from_queue(QueueName, DeadPids) ->
end
end).
-add_slave(VHostPath, QueueName, MirrorNode) ->
- add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
-
-add_slave(Queue, MirrorNode) ->
- rabbit_amqqueue:with(
- Queue,
- fun (#amqqueue { arguments = Args, name = Name,
- pid = QPid, mirror_pids = MPids } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
- undefined ->
- ok;
- _ ->
- case [MirrorNode || Pid <- [QPid | MPids],
- node(Pid) =:= MirrorNode] of
- [] ->
- Result =
- rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
- rabbit_log:info("Adding slave node for ~s: ~p~n",
- [rabbit_misc:rs(Name), Result]),
- case Result of
- {ok, _Pid} -> ok;
- _ -> Result
- end;
- [_] ->
- {error, queue_already_mirrored_on_node}
- end
- end
- end).
-
on_node_up() ->
Qs =
rabbit_misc:execute_mnesia_transaction(
@@ -113,3 +84,53 @@ on_node_up() ->
end),
[add_slave(Q, node()) || Q <- Qs],
ok.
+
+drop_slave(VHostPath, QueueName, MirrorNode) ->
+ drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+drop_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid | MPids] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info("Dropping slave node on ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ ok
+ end
+ end).
+
+add_slave(VHostPath, QueueName, MirrorNode) ->
+ add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+add_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] -> Result = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info(
+ "Adding slave node for ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ case Result of
+ {ok, _Pid} -> ok;
+ _ -> Result
+ end;
+ [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ end
+ end).
+
+if_mirrored_queue(Queue, Fun) ->
+ rabbit_amqqueue:with(
+ Queue, fun (#amqqueue { arguments = Args } = Q) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> ok;
+ _ -> Fun(Q)
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c7ff4480..666687a5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -226,6 +226,9 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
noreply(local_sender_death(ChPid, State));
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -238,6 +241,10 @@ terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
ok;
+terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% See rabbit_mirror_queue_master:terminate/2
+ BQ:delete_and_terminate(R, BQS);
terminate(Reason, #state { q = Q,
gm = GM,
backing_queue = BQ,
@@ -839,10 +846,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
-process_instruction(delete_and_terminate,
+process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- BQ:delete_and_terminate(BQS),
+ BQ:delete_and_terminate(Reason, BQS),
{stop, State #state { backing_queue_state = undefined }}.
msg_ids_to_acktags(MsgIds, MA) ->
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index 25ee1fd0..2ce5941e 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -40,7 +40,7 @@
start() ->
{ok, _} =
- supervisor:start_child(
+ supervisor2:start_child(
rabbit_sup,
{rabbit_mirror_queue_slave_sup,
{rabbit_mirror_queue_slave_sup, start_link, []},
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdff..3f4aa54e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43..a167cca0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.