author    Matthew Sackman <matthew@rabbitmq.com>  2011-05-27 11:51:14 +0100
committer Matthew Sackman <matthew@rabbitmq.com>  2011-05-27 11:51:14 +0100
commit    4a23f3fbd6099501145f2502b4da2773f8492687
tree      1ecbf33a4c03407a1eb5783ae2c7d0a97f1f31e8
parent    137c0553856888469b992df0ca537249bdb44c84
parent    cf7d7556ceb76ee2bdaa4a31cdd3bef129bac920
Merging default into bug23554
-rw-r--r--  docs/rabbitmqctl.1.xml                   |  43
-rw-r--r--  include/rabbit.hrl                       |   2
-rw-r--r--  include/rabbit_backing_queue_spec.hrl    |   4
-rw-r--r--  src/gm_soak_test.erl                     |   8
-rw-r--r--  src/rabbit.erl                           |  13
-rw-r--r--  src/rabbit_amqqueue.erl                  |  54
-rw-r--r--  src/rabbit_amqqueue_process.erl          |  70
-rw-r--r--  src/rabbit_backing_queue.erl             |   4
-rw-r--r--  src/rabbit_control.erl                   |  18
-rw-r--r--  src/rabbit_mirror_queue_coordinator.erl  | 424
-rw-r--r--  src/rabbit_mirror_queue_master.erl       | 380
-rw-r--r--  src/rabbit_mirror_queue_misc.erl         | 136
-rw-r--r--  src/rabbit_mirror_queue_slave.erl        | 873
-rw-r--r--  src/rabbit_mirror_queue_slave_sup.erl    |  60
-rw-r--r--  src/rabbit_mnesia.erl                    |   3
-rw-r--r--  src/rabbit_router.erl                    |   6
-rw-r--r--  src/rabbit_tests.erl                     |   8
-rw-r--r--  src/rabbit_types.erl                     |   3
-rw-r--r--  src/rabbit_upgrade_functions.erl         |  35
-rw-r--r--  src/rabbit_variable_queue.erl            |   6
20 files changed, 2086 insertions(+), 64 deletions(-)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ffa01894..b825a1d0 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1317,6 +1317,49 @@
</variablelist>
</refsect2>
+
+ <refsect2>
+ <title>Mirrored Queue Management</title>
+ <para>
+ Mirrored queues can have slaves dynamically added, and slaves
+ or the master dynamically dropped. Refer to the <ulink
+ url="http://www.rabbitmq.com/ha.html">High Availability
+ guide</ulink> for further details about mirrored queues in
+ general.
+ </para>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>add_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to add a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is no
+ mirror of the queue already on the node. If it succeeds,
+ the new mirror will start off as an empty slave.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>drop_queue_mirror</command> <arg choice="req"><replaceable>queue_name</replaceable></arg> <arg choice="req"><replaceable>node</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Attempts to drop a mirror of the queue
+ <command>queue_name</command> on
+ <command>node</command>. This will only succeed if the
+ queue was declared a mirrored queue and if there is a
+ mirror of the queue already on the node. If the node
+ contains the master of the queue, a slave on some other
+ node will be promoted to become the new master. It is
+ not permitted to drop the only remaining node of a mirrored queue.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect2>
</refsect1>
</refentry>
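For example, with the above in place the new commands follow the usual rabbitmqctl calling convention (illustrative queue and node names; as with other commands, the vhost can be selected with -p):

    rabbitmqctl add_queue_mirror ha.test rabbit@node2
    rabbitmqctl drop_queue_mirror ha.test rabbit@node2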
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index db4773b8..1388f3c4 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -46,7 +46,7 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid}).
+ arguments, pid, mirror_pids}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1c2b94e2..295d9039 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -32,8 +32,8 @@
-spec(stop/0 :: () -> 'ok').
-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
async_callback(), sync_callback()) -> state()).
--spec(terminate/1 :: (state()) -> state()).
--spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(terminate/2 :: (any(), state()) -> state()).
+-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/4 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state()) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index dae42ac7..5e5a3a5a 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) ->
{ok, Num} -> ok;
{ok, Num1} when Num < Num1 ->
exit({{from, From},
- {duplicate_delivery_of, Num1},
- {expecting, Num}});
+ {duplicate_delivery_of, Num},
+ {expecting, Num1}});
{ok, Num1} ->
exit({{from, From},
- {missing_delivery_of, Num},
- {received_early, Num1}});
+ {received_early, Num},
+ {expecting, Num1}});
error ->
exit({{from, From},
{received_premature_delivery, Num}})
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e6e80b4a..02477b65 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -39,6 +39,12 @@
{requires, pre_boot},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_registry]}},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{requires, file_handle_cache},
@@ -60,13 +66,6 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_registry,
- [{description, "plugin registry"},
- {mfa, {rabbit_sup, start_child,
- [rabbit_registry]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d029ff1d..7f46b870 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,6 +27,8 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
+
%% internal
-export([internal_declare/2, internal_delete/1,
@@ -193,12 +195,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -253,8 +256,13 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {error, not_found} -> E()
+ {ok, Q = #amqqueue{mirror_pids = []}} ->
+ rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
+ {ok, Q} ->
+ E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ {error, not_found} ->
+ E()
end.
with(Name, F) ->
@@ -291,8 +299,9 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
- rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>]).
+ rabbit_misc:assert_args_equivalence(
+ Args, RequiredArgs, QueueName,
+ [<<"x-expires">>, <<"x-message-ttl">>, <<"x-mirror">>]).
check_declare_arguments(QueueName, Args) ->
[case Fun(rabbit_misc:table_lookup(Args, Key)) of
@@ -303,7 +312,8 @@ check_declare_arguments(QueueName, Args) ->
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
[{<<"x-expires">>, fun check_integer_argument/1},
- {<<"x-message-ttl">>, fun check_integer_argument/1}]],
+ {<<"x-message-ttl">>, fun check_integer_argument/1},
+ {<<"x-mirror">>, fun check_array_of_longstr_argument/1}]],
ok.
check_integer_argument(undefined) ->
@@ -316,6 +326,18 @@ check_integer_argument({Type, Val}) when Val > 0 ->
check_integer_argument({_Type, Val}) ->
{error, {value_zero_or_less, Val}}.
+check_array_of_longstr_argument(undefined) ->
+ ok;
+check_array_of_longstr_argument({array, Array}) ->
+ case lists:all(fun ({longstr, _NodeName}) -> true;
+ (_) -> false
+ end, Array) of
+ true -> ok;
+ false -> {error, {array_contains_non_longstrs, Array}}
+ end;
+check_array_of_longstr_argument({Type, _Val}) ->
+ {error, {unacceptable_type, Type}}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
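With the validation above in place, a client requests mirroring by passing x-mirror as an array of longstrs naming nodes. A minimal sketch using the Erlang AMQP client (the open Channel and the amqp_channel / #'queue.declare'{} API come from rabbitmq-erlang-client, not this diff); per rabbit_mirror_queue_master:init/4 below, an empty array means "mirror on all clustered nodes":

    %% Declare a queue mirrored on two named nodes (illustrative names).
    Args = [{<<"x-mirror">>, array, [{longstr, <<"rabbit@node1">>},
                                     {longstr, <<"rabbit@node2">>}]}],
    #'queue.declare_ok'{} =
        amqp_channel:call(Channel,
                          #'queue.declare'{queue     = <<"ha.test">>,
                                           durable   = true,
                                           arguments = Args}).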
@@ -465,7 +487,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node])),
rabbit_binding:process_deletions(
@@ -478,11 +501,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8091e2c2..b1c95338 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,9 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-%% Queue's state
+-export([init_with_backing_queue_state/7]).
+
+% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -72,7 +74,8 @@
messages,
consumers,
memory,
- backing_queue_status
+ backing_queue_status,
+ mirror_pids
]).
-define(CREATION_EVENT_KEYS,
@@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,16 +117,44 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(shutdown, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
- terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
-terminate(_Reason, State = #q{backing_queue = BQ}) ->
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+ RateTRef, AckTags, Deliveries, MTC) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Owner of
+ none -> ok;
+ _ -> erlang:monitor(process, Owner)
+ end,
+ State = requeue_and_run(
+ AckTags,
+ process_args(
+ #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = MTC})),
+ lists:foldl(
+ fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
+ State, Deliveries).
+
+terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
+terminate(Reason, State = #q{backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
queue_deleted, [{pid, self()}]),
- BQS1 = BQ:delete_and_terminate(BQS),
+ BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
rabbit_amqqueue:internal_delete(qname(State)),
@@ -226,6 +256,13 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
+backing_queue_module(#amqqueue{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ _Nodes -> rabbit_mirror_queue_master
+ end.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
@@ -768,6 +805,9 @@ i(memory, _) ->
M;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
+i(mirror_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name),
+ MPids;
i(Item, _) ->
throw({bad_argument, Item}).
@@ -803,11 +843,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {run_backing_queue, _Mod, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index addaabc5..217ad3eb 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -49,11 +49,11 @@ behaviour_info(callbacks) ->
{init, 4},
%% Called on queue shutdown when queue isn't being deleted.
- {terminate, 1},
+ {terminate, 2},
%% Called when the queue is terminating and needs to delete all
%% its content.
- {delete_and_terminate, 1},
+ {delete_and_terminate, 2},
%% Remove all messages in the queue, but not messages which have
%% been fetched and are pending acks.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8172f804..117496f5 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -239,6 +239,18 @@ action(list_queues, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
+action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
+ Inform("Adding mirror of queue ~p on node ~p~n", [Queue, MirrorNode]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ rpc_call(Node, rabbit_mirror_queue_misc, add_slave,
+ [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+
+action(drop_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) ->
+ Inform("Dropping mirror of queue ~p on node ~p~n", [Queue, MirrorNode]),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ rpc_call(Node, rabbit_mirror_queue_misc, drop_slave,
+ [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]);
+
action(list_exchanges, Node, Args, Opts, Inform) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
@@ -372,6 +384,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
new file mode 100644
index 00000000..2727c1d0
--- /dev/null
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -0,0 +1,424 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_coordinator).
+
+-export([start_link/3, get_gm/1, ensure_monitoring/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm,
+ monitors,
+ death_fun
+ }).
+
+-define(ONE_SECOND, 1000).
+
+%%----------------------------------------------------------------------------
+%%
+%% Mirror Queues
+%%
+%% A queue with mirrors consists of the following:
+%%
+%% #amqqueue{ pid, mirror_pids }
+%% | |
+%% +----------+ +-------+--------------+-----------...etc...
+%% | | |
+%% V V V
+%% amqqueue_process---+ slave-----+ slave-----+ ...etc...
+%% | BQ = master----+ | | BQ = vq | | BQ = vq |
+%% | | BQ = vq | | +-+-------+ +-+-------+
+%% | +-+-------+ | | |
+%% +-++-----|---------+ | | (some details elided)
+%% || | | |
+%% || coordinator-+ | |
+%% || +-+---------+ | |
+%% || | | |
+%% || gm-+ -- -- -- -- gm-+- -- -- -- gm-+- -- --...etc...
+%% || +--+ +--+ +--+
+%% ||
+%% consumers
+%%
+%% The master is merely an implementation of bq, and thus is invoked
+%% through the normal bq interface by the amqqueue_process. The slaves
+%% meanwhile are processes in their own right (as is the
+%% coordinator). The coordinator and all slaves belong to the same gm
+%% group. Every member of a gm group receives messages sent to the gm
+%% group. Because the master is the bq of amqqueue_process, it doesn't
+%% have sole control over its mailbox, and as a result, the master
+%% itself cannot be passed messages directly (well, it could be via
+%% the rabbit_amqqueue:run_backing_queue_async callback but that would induce
+%% additional unnecessary loading on the master queue process), yet it
+%% needs to react to gm events, such as the death of slaves. Thus the
+%% master creates the coordinator, and it is the coordinator that is
+%% the gm callback module and event handler for the master.
+%%
+%% Consumers are only attached to the master. Thus the master is
+%% responsible for informing all slaves when messages are fetched from
+%% the bq, when they're acked, and when they're requeued.
+%%
+%% The basic goal is to ensure that all slaves perform actions on
+%% their bqs in the same order as the master. Thus the master
+%% intercepts all events going to its bq, and suitably broadcasts
+%% these events on the gm. The slaves thus receive two streams of
+%% events: one stream is via the gm, and one stream is from channels
+%% directly. Whilst the stream via gm is guaranteed to be consistently
+%% seen by all slaves, the same is not true of the stream via
+%% channels. For example, in the event of an unexpected death of a
+%% channel during a publish, only some of the mirrors may receive that
+%% publish. As a result of this problem, the messages broadcast over
+%% the gm contain published content, and thus slaves can operate
+%% successfully on messages that they only receive via the gm. The key
+%% purpose of also sending messages directly from the channels to the
+%% slaves is that without this, in the event of the death of the
+%% master, messages could be lost until a suitable slave is promoted.
+%%
+%% However, that is not the only reason. For example, if confirms are
+%% in use, then there is no guarantee that every slave will see the
+%% delivery with the same msg_seq_no. As a result, the slaves have to
+%% wait until they've seen both the publish via gm, and the publish
+%% via the channel before they have enough information to be able to
+%% perform the publish to their own bq, and subsequently issue the
+%% confirm, if necessary. Either form of publish can arrive first, and
+%% a slave can be upgraded to the master at any point during this
+%% process. Confirms continue to be issued correctly, however.
+%%
+%% Because the slave is a full process, it impersonates parts of the
+%% amqqueue API. However, it does not need to implement all parts: for
+%% example, no ack or consumer-related message can arrive directly at
+%% a slave from a channel: it is only publishes that travel both
+%% directly from the channels to the slaves and via gm.
+%%
+%% Slaves can be added dynamically. When this occurs, there is no
+%% attempt made to sync the current contents of the master with the
+%% new slave, thus the slave will start empty, regardless of the state
+%% of the master. Thus the slave needs to be able to detect and ignore
+%% operations which are for messages it has not received: because of
+%% the strict FIFO nature of queues in general, this is
+%% straightforward - all new publishes that the new slave receives via
+%% gm should be processed as normal, but fetches which are for
+%% messages the slave has never seen should be ignored. Similarly,
+%% acks for messages the slave never fetched should be
+%% ignored. Eventually, as the master is consumed from, the messages
+%% at the head of the queue which were there before the slave joined
+%% will disappear, and the slave will become fully synced with the
+%% state of the master. The detection of the sync-status of a slave is
+%% done entirely based on length: if the slave and the master both
+%% agree on the length of the queue after the fetch of the head of the
+%% queue, then the queues must be in sync. The only other possibility
+%% is that the slave's queue is shorter, and thus the fetch should be
+%% ignored.
+%%
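+%% A minimal sketch of that length check (illustrative function and
+%% return shapes, not the real slave code): on receiving a
+%% {fetch, AckRequired, MsgId, Remaining} instruction via gm, a slave
+%% can decide:
+%%
+%%   maybe_fetch(AckRequired, Remaining, BQ, BQS) ->
+%%       case BQ:len(BQS) - 1 of
+%%           Remaining -> BQ:fetch(AckRequired, BQS); %% in sync: fetch too
+%%           _         -> {empty, BQS}                %% shorter: ignore
+%%       end.
+%%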
+%% Because acktags are issued by the bq independently, and because
+%% there is no requirement for the master and all slaves to use the
+%% same bq, all references to msgs going over gm are by msg_id. Thus
+%% upon acking, the master must convert the acktags back to msg_ids
+%% (which happens to be what bq:ack returns) and then send the msg_ids
+%% over gm; the slaves must convert the msg_ids to acktags (a mapping
+%% the slaves themselves must maintain).
+%%
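+%% For example, on the slave an {ack, MsgIds} instruction (as
+%% broadcast by the master's ack/2 below) maps back through the
+%% slave's own msg_id -> acktag dict (MsgIdAck here is illustrative,
+%% named after the slave's msg_id_ack state field):
+%%
+%%   AckTags = [AckTag || MsgId <- MsgIds,
+%%                        {ok, AckTag} <- [dict:find(MsgId, MsgIdAck)]],
+%%   {_MsgIds, BQS1} = BQ:ack(AckTags, BQS).
+%%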
+%% When the master dies, a slave gets promoted. This will be the
+%% eldest slave, and thus the hope is that that slave is most likely
+%% to be sync'd with the master. The design of gm is that the
+%% notification of the death of the master will only appear once all
+%% messages in-flight from the master have been fully delivered to all
+%% members of the gm group. Thus at this point, the slave that gets
+%% promoted cannot broadcast different events in a different order
+%% than the master for the same msgs: there is no possibility for the
+%% same msg to be processed by the old master and the new master - if
+%% it was processed by the old master then it will have been processed
+%% by the slave before the slave was promoted, and vice versa.
+%%
+%% Upon promotion, all msgs pending acks are requeued as normal, the
+%% slave constructs state suitable for use in the master module, and
+%% then dynamically changes into an amqqueue_process with the master
+%% as the bq, and the slave's bq as the master's bq. Thus the very
+%% same process that was the slave is now a full amqqueue_process.
+%%
+%% It is important that we avoid memory leaks due to the death of
+%% senders (i.e. channels) and partial publications. A sender
+%% publishing a message may fail midway through the publish and thus
+%% only some of the mirrors will receive the message. We need the
+%% mirrors to be able to detect this and tidy up as necessary to avoid
+%% leaks. If we just had the master monitoring all senders then we
+%% would have the possibility that a sender appears and only sends the
+%% message to a few of the slaves before dying. Those slaves would
+%% then hold on to the message, assuming they'll receive some
+%% instruction eventually from the master. Thus we have both slaves
+%% and the master monitor all senders they become aware of. But there
+%% is a race: if the slave receives a DOWN of a sender, how does it
+%% know whether or not the master is going to send it instructions
+%% regarding those messages?
+%%
+%% Whilst the master monitors senders, it can't access its mailbox
+%% directly, so it delegates monitoring to the coordinator. When the
+%% coordinator receives a DOWN message from a sender, it informs the
+%% master via a callback. This allows the master to do any tidying
+%% necessary, but more importantly allows the master to broadcast a
+%% sender_death message to all the slaves, saying the sender has
+%% died. Once the slaves receive the sender_death message, they know
+%% that they're not going to receive any more instructions from the gm
+%% regarding that sender, thus they throw away any publications from
+%% the sender pending publication instructions. However, it is
+%% possible that the coordinator receives the DOWN and communicates
+%% that to the master before the master has finished receiving and
+%% processing publishes from the sender. This turns out not to be a
+%% problem: the sender has actually died, and so will not need to
+%% receive confirms or other feedback, and should further messages be
+%% "received" from the sender, the master will ask the coordinator to
+%% set up a new monitor, and will continue to process the messages
+%% normally. Slaves may thus receive publishes via gm from previously
+%% declared "dead" senders, but again, this is fine: should the slave
+%% have just thrown out the message it had received directly from the
+%% sender (due to receiving a sender_death message via gm), it will be
+%% able to cope with the publication purely from the master via gm.
+%%
+%% When a slave receives a DOWN message for a sender, if it has not
+%% received the sender_death message from the master via gm already,
+%% then it will wait 20 seconds before broadcasting a request for
+%% confirmation from the master that the sender really has died.
+%% Should a sender have only sent a publish to slaves, this allows
+%% slaves to inform the master of the previous existence of the
+%% sender. The master will thus monitor the sender, receive the DOWN,
+%% and subsequently broadcast the sender_death message, allowing the
+%% slaves to tidy up. This process can repeat for the same sender:
+%% consider one slave receives the publication, then the DOWN, then
+%% asks for confirmation of death, then the master broadcasts the
+%% sender_death message. Only then does another slave receive the
+%% publication and thus set up its monitoring. Eventually that slave
+%% too will receive the DOWN, ask for confirmation and the master will
+%% monitor the sender again, receive another DOWN, and send out
+%% another sender_death message. Given the 20 second delay before
+%% requesting death confirmation, this is highly unlikely, but it is a
+%% possibility.
+%%
+%% When the 20 second timer expires, the slave first checks to see
+%% whether it still needs confirmation of the death before requesting
+%% it. This prevents unnecessary traffic on gm as it allows one
+%% broadcast of the sender_death message to satisfy many slaves.
+%%
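+%% A sketch of that delayed check (the message and helper names here
+%% are illustrative; ?DEATH_TIMEOUT is defined as 20 seconds in
+%% rabbit_mirror_queue_slave):
+%%
+%%   erlang:send_after(?DEATH_TIMEOUT, self(), {maybe_confirm_death, ChPid}),
+%%   ...
+%%   handle_info({maybe_confirm_death, ChPid}, State = #state { gm = GM }) ->
+%%       case still_unconfirmed(ChPid, State) of
+%%           true  -> ok = gm:broadcast(GM, {ensure_monitoring, [ChPid]}),
+%%                    ...; %% recurse: set another timer, as described above
+%%           false -> ... %% sender_death already seen: nothing to do
+%%       end.
+%%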
+%% If we consider the promotion of a slave at this point, we have two
+%% possibilities: that of the slave that has received the DOWN and is
+%% thus waiting for confirmation from the master that the sender
+%% really is down; and that of the slave that has not received the
+%% DOWN. In the first case, in the act of promotion to master, the new
+%% master will monitor again the dead sender, and after it has
+%% finished promoting itself, it should find another DOWN waiting,
+%% which it will then broadcast. This will allow slaves to tidy up as
+%% normal. In the second case, we have the possibility that
+%% a confirmation-of-sender-death request has been broadcast, but that
+%% it was broadcast before the master failed, and that the slave being
+%% promoted does not know anything about that sender, and so will not
+%% monitor it on promotion. Thus a slave that broadcasts such a
+%% request, at the point of broadcasting it, recurses, setting another
+%% 20 second timer. As before, on expiry of the timer, the slave
+%% checks to see whether it still has not received a sender_death
+%% message for the dead sender, and if not, broadcasts a death
+%% confirmation request. Thus this ensures that even when a master
+%% dies and the newly promoted slave has no knowledge of the dead
+%% sender, it will eventually receive a death confirmation request,
+%% shall monitor the
+%% dead sender, receive the DOWN and broadcast the sender_death
+%% message.
+%%
+%% The preceding commentary deals with the possibility of slaves
+%% receiving publications from senders which the master does not, and
+%% the need to prevent memory leaks in such scenarios. The inverse is
+%% also possible: a partial publication may cause only the master to
+%% receive a publication. It will then publish the message via gm. The
+%% slaves will receive it via gm, will publish it to their BQ and will
+%% set up monitoring on the sender. They will then receive the DOWN
+%% message and the master will eventually publish the corresponding
+%% sender_death message. The slave will then be able to tidy up its
+%% state as normal.
+%%
+%% We don't support transactions on mirror queues. To do so is
+%% challenging. The underlying bq is free to add the contents of the
+%% txn to the queue proper at any point after the tx.commit comes in
+%% but before the tx.commit-ok goes out. This means that it is not
+%% safe for all mirrors to simply issue the bq:tx_commit at the same
+%% time, as the addition of the txn's contents to the queue may
+%% subsequently be inconsistently interwoven with other actions on the
+%% bq. The solution to this is, in the master, wrap the PostCommitFun
+%% and do the gm:broadcast in there: at that point, you're in the bq
+%% (well, there's actually nothing to stop that function being invoked
+%% by some other process, but let's pretend for now: you could always
+%% use run_backing_queue to ensure you really are in the queue process
+%% (the _async variant would be unsafe from an ordering pov)), the
+%% gm:broadcast is safe because you don't have to worry about races
+%% with other gm:broadcast calls (same process). Thus this signal
+%% would indicate sufficiently to all the slaves that they must insert
+%% the complete contents of the txn at precisely this point in the
+%% stream of events.
+%%
+%% However, it's quite difficult for the slaves to make that happen:
+%% they would be forced to issue the bq:tx_commit at that point, but
+%% then stall processing any further instructions from gm until they
+%% receive the notification from their bq that the tx_commit has fully
+%% completed (i.e. they need to treat what is an async system as being
+%% fully synchronous). This is not too bad (apart from the
+%% vomit-inducing notion of it all): just need a queue of instructions
+%% from the GM; but then it gets rather worse when you consider what
+%% needs to happen if the master dies at this point and the slave in
+%% the middle of this tx_commit needs to be promoted.
+%%
+%% Finally, we can't possibly hope to make transactions atomic across
+%% mirror queues, and it's not even clear that that's desirable: if a
+%% slave fails whilst there's an open transaction in progress then
+%% when the channel comes to commit the txn, it will detect the
+%% failure and destroy the channel. However, the txn will have
+%% actually committed successfully in all the other mirrors (including
+%% master). To do this bit properly would require 2PC and all the
+%% baggage that goes with that.
+%%
+%% Recovery of mirrored queues is straightforward: as nodes die, the
+%% remaining nodes record this, and eventually a situation is reached
+%% in which only one node is alive, which is the master. This is the
+%% only node which, upon recovery, will resurrect a mirrored queue:
+%% nodes which die and then rejoin as a slave will start off empty as
+%% if they have no mirrored content at all. This is not surprising: to
+%% achieve anything more sophisticated would require the master and
+%% recovering slave to be able to check to see whether they agree on
+%% the last seen state of the queue: checking length alone is not
+%% sufficient in this case.
+%%
+%% For more documentation see the comments in bug 23554.
+%%
+%%----------------------------------------------------------------------------
+
+start_link(Queue, GM, DeathFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+
+get_gm(CPid) ->
+ gen_server2:call(CPid, get_gm, infinity).
+
+ensure_monitoring(CPid, Pids) ->
+ gen_server2:cast(CPid, {ensure_monitoring, Pids}).
+
+%% ---------------------------------------------------------------------------
+%% gen_server
+%% ---------------------------------------------------------------------------
+
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+ GM1 = case GM of
+ undefined ->
+ {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM2, _Members} ->
+ ok
+ end,
+ GM2;
+ _ ->
+ true = link(GM),
+ GM
+ end,
+ {ok, _TRef} =
+ timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun },
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(get_gm, _From, State = #state { gm = GM }) ->
+ reply(GM, State).
+
+handle_cast({gm_deaths, Deaths},
+ State = #state { q = #amqqueue { name = QueueName } }) ->
+ rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n",
+ [rabbit_misc:rs(QueueName),
+ rabbit_misc:pid_to_string(self()),
+ [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node() ->
+ noreply(State);
+ {error, not_found} ->
+ {stop, normal, State}
+ end;
+
+handle_cast({ensure_monitoring, Pids},
+ State = #state { monitors = Monitors }) ->
+ Monitors1 =
+ lists:foldl(fun (Pid, MonitorsN) ->
+ case dict:is_key(Pid, MonitorsN) of
+ true -> MonitorsN;
+ false -> MRef = erlang:monitor(process, Pid),
+ dict:store(Pid, MRef, MonitorsN)
+ end
+ end, Monitors, Pids),
+ noreply(State #state { monitors = Monitors1 }).
+
+handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
+ State = #state { monitors = Monitors,
+ death_fun = Fun }) ->
+ noreply(
+ case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = Fun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, #state{}) ->
+ %% gen_server case
+ ok;
+terminate([_CPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([CPid], Members) ->
+ CPid ! {joined, self(), Members},
+ ok.
+
+members_changed([_CPid], _Births, []) ->
+ ok;
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+
+handle_msg([_CPid], _From, heartbeat) ->
+ ok;
+handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
+handle_msg([_CPid], _From, _Msg) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
new file mode 100644
index 00000000..da12ea82
--- /dev/null
+++ b/src/rabbit_mirror_queue_master.erl
@@ -0,0 +1,380 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_master).
+
+-export([init/4, terminate/2, delete_and_terminate/2,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1,
+ status/1, invoke/3, is_duplicate/3, discard/3]).
+
+-export([start/1, stop/0]).
+
+-export([promote_backing_queue_state/6, sender_death_fun/0]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(state, { gm,
+ coordinator,
+ backing_queue,
+ backing_queue_state,
+ set_delivered,
+ seen_status,
+ confirmed,
+ ack_msg_id,
+ known_senders
+ }).
+
+%% For general documentation of HA design, see
+%% rabbit_mirror_queue_coordinator
+
+%% ---------------------------------------------------------------------------
+%% Backing queue
+%% ---------------------------------------------------------------------------
+
+start(_DurableQueues) ->
+ %% This will never get called as this module will never be
+ %% installed as the default BQ implementation.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+stop() ->
+ %% Same as start/1.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+sender_death_fun() ->
+ Self = self(),
+ fun (DeadPid) ->
+ rabbit_amqqueue:run_backing_queue_async(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
+ ok = gm:broadcast(GM, {sender_death, DeadPid}),
+ KS1 = sets:del_element(DeadPid, KS),
+ State #state { known_senders = KS1 }
+ end)
+ end.
+
+init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
+ AsyncCallback, SyncCallback) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun()),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
+ Nodes1 = (case Nodes of
+ [] -> rabbit_mnesia:all_clustered_nodes();
+ _ -> [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes]
+ end) -- [node()],
+ [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1],
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = 0,
+ seen_status = dict:new(),
+ confirmed = [],
+ ack_msg_id = dict:new(),
+ known_senders = sets:new() }.
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen_status = SeenStatus,
+ confirmed = [],
+ ack_msg_id = dict:new(),
+ known_senders = sets:from_list(KS) }.
+
+terminate({shutdown, dropped} = Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination - this node has been explicitly
+ %% dropped. Normally, non-durable queues would be tidied up on
+ %% startup, but there's a possibility that we will be added back
+ %% in without this node being restarted. Thus we must do the
+ %% full-blown delete_and_terminate now, but only locally: we do not
+ %% broadcast delete_and_terminate.
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 };
+terminate(Reason,
+ State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination. The queue is going down but
+ %% shouldn't be deleted. Most likely safe shutdown of this
+ %% node. Thus just let some other slave take over.
+ State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
+
+delete_and_terminate(Reason, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 }.
+
+purge(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_length, 0}),
+ {Count, BQS1} = BQ:purge(BQS),
+ {Count, State #state { backing_queue_state = BQS1,
+ set_delivered = 0 }}.
+
+publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
+
+publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+ ChPid, State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ %% Must use confirmed_broadcast here in order to guarantee that
+ %% all slaves are forced to interpret this publish_delivered at
+ %% the same point, especially if we die and a slave is promoted.
+ ok = gm:confirmed_broadcast(
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
+ {AckTag, BQS1} =
+ BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ AM1 = maybe_store_acktag(AckTag, MsgId, AM),
+ {AckTag,
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 })}.
+
+dropwhile(Fun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered }) ->
+ Len = BQ:len(BQS),
+ BQS1 = BQ:dropwhile(Fun, BQS),
+ Dropped = Len - BQ:len(BQS1),
+ SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
+ ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }.
+
+drain_confirmed(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS,
+ confirmed = Confirmed }) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ {MsgIds1, SS1} =
+ lists:foldl(
+ fun (MsgId, {MsgIdsN, SSN}) ->
+ %% We will never see 'discarded' here
+ case dict:find(MsgId, SSN) of
+ error ->
+ {[MsgId | MsgIdsN], SSN};
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {ok, confirmed} ->
+ %% Well, confirms are racy by definition.
+ {[MsgId | MsgIdsN], SSN}
+ end
+ end, {[], SS}, MsgIds),
+ {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1,
+ seen_status = SS1,
+ confirmed = [] }}.
+
+fetch(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered,
+ ack_msg_id = AM }) ->
+ {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
+ Remaining} ->
+ ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ AM1 = maybe_store_acktag(AckTag, MsgId, AM),
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State1 #state { set_delivered = SetDelivered1,
+ ack_msg_id = AM1 }}
+ end.
+
+ack(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
+ case MsgIds of
+ [] -> ok;
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ end,
+ {MsgIds, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 }}.
+
+tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) ->
+ %% We don't support txns in mirror queues
+ State.
+
+tx_ack(_Txn, _AckTags, State) ->
+ %% We don't support txns in mirror queues
+ State.
+
+tx_rollback(_Txn, State) ->
+ {[], State}.
+
+tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) ->
+ PostCommitFun(), %% Probably must run it to avoid deadlocks
+ {[], State}.
+
+requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
+
+len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:len(BQS).
+
+is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:is_empty(BQS).
+
+set_ram_duration_target(Target, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state =
+ BQ:set_ram_duration_target(Target, BQS) }.
+
+ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:ram_duration(BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
+needs_timeout(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:needs_timeout(BQS).
+
+timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:timeout(BQS) }.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+
+status(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:status(BQS).
+
+invoke(?MODULE, Fun, State) ->
+ Fun(?MODULE, State);
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+
+is_duplicate(none, Message = #basic_message { id = MsgId },
+ State = #state { seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ confirmed = Confirmed }) ->
+ %% Here, we need to deal with the possibility that we're about to
+ %% receive a message that we've already seen when we were a slave
+ %% (we received it via gm). Thus if we do receive such a message
+ %% now via the channel, there may be a confirm waiting to be issued
+ %% for it.
+
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, SS) of
+ error ->
+ %% We permit the underlying BQ to have a peek at it, but
+ %% only if we ourselves are not filtering out the msg.
+ {Result, BQS1} = BQ:is_duplicate(none, Message, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }};
+ {ok, published} ->
+ %% It already got published when we were a slave and no
+ %% confirmation is waiting. amqqueue_process will have, in
+ %% its msg_id_to_channel mapping, the entry for dealing
+ %% with the confirm when that comes back in (it's added
+ %% immediately after calling is_duplicate). The msg is
+ %% invalid. We will not see this again, nor will we be
+ %% further involved in confirming this message, so erase.
+ {published, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {ok, confirmed} ->
+ %% It got published when we were a slave via gm, and
+ %% confirmed some time after that (maybe even after
+ %% promotion), but before we received the publish from the
+ %% channel, so couldn't previously know what the
+ %% msg_seq_no was (and thus confirm as a slave). So we
+ %% need to confirm now. As above, amqqueue_process will
+ %% have the entry for the msg_id_to_channel mapping added
+ %% immediately after calling is_duplicate/2.
+ {published, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }};
+ {ok, discarded} ->
+ %% Don't erase from SS here because discard/2 is about to
+ %% be called and we need to be able to detect this case
+ {discarded, State}
+ end;
+is_duplicate(_Txn, _Msg, State) ->
+ %% In a transaction. We don't support txns in mirror queues. But
+ %% it's probably not a duplicate...
+ {false, State}.
+
+discard(Msg = #basic_message { id = MsgId }, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non-FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, Msg}),
+ State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
+ seen_status = dict:erase(MsgId, SS) };
+ {ok, discarded} ->
+ State
+ end.
+
+maybe_store_acktag(undefined, _MsgId, AM) ->
+ AM;
+maybe_store_acktag(AckTag, MsgId, AM) ->
+ dict:store(AckTag, MsgId, AM).
+
+ensure_monitoring(ChPid, State = #state { coordinator = CPid,
+ known_senders = KS }) ->
+ case sets:is_element(ChPid, KS) of
+ true -> State;
+ false -> ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+ CPid, [ChPid]),
+ State #state { known_senders = sets:add_element(ChPid, KS) }
+ end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
new file mode 100644
index 00000000..046d3380
--- /dev/null
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -0,0 +1,136 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_misc).
+
+-export([remove_from_queue/2, on_node_up/0,
+ drop_slave/2, drop_slave/3, add_slave/2, add_slave/3]).
+
+-include("rabbit.hrl").
+
+%% If the dead pids include the queue pid (i.e. the master has died)
+%% then only remove that if we are about to be promoted. Otherwise we
+%% can have the situation where a slave updates the mnesia record for
+%% a queue, promoting another slave before that slave realises it has
+%% become the new master.
+remove_from_queue(QueueName, DeadPids) ->
+ DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ %% Someone else could have deleted the queue before we
+ %% get here.
+ case mnesia:read({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [Q = #amqqueue { pid = QPid,
+ mirror_pids = MPids }] ->
+ [QPid1 | MPids1] =
+ [Pid || Pid <- [QPid | MPids],
+ not lists:member(node(Pid), DeadNodes)],
+ case {{QPid, MPids}, {QPid1, MPids1}} of
+ {Same, Same} ->
+ ok;
+ _ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
+ %% Either master hasn't changed, so
+ %% we're ok to update mnesia; or master
+ %% has changed to become us!
+ Q1 = Q #amqqueue { pid = QPid1,
+ mirror_pids = MPids1 },
+ ok = rabbit_amqqueue:store_queue(Q1);
+ _ ->
+ %% Master has changed, and we're not it,
+ %% so leave alone to allow the promoted
+ %% slave to find it and make its
+ %% promotion atomic.
+ ok
+ end,
+ {ok, QPid1}
+ end
+ end).
+
+on_node_up() ->
+ Qs =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:foldl(
+ fun (#amqqueue{ arguments = Args, name = QName }, QsN) ->
+ case rabbit_misc:table_lookup(
+ Args, <<"x-mirror">>) of
+ {_Type, []} ->
+ [QName | QsN];
+ {_Type, Nodes} ->
+ Nodes1 = [list_to_atom(binary_to_list(Node))
+ || {longstr, Node} <- Nodes],
+ case lists:member(node(), Nodes1) of
+ true -> [QName | QsN];
+ false -> QsN
+ end;
+ _ ->
+ QsN
+ end
+ end, [], rabbit_queue)
+ end),
+ [add_slave(Q, node()) || Q <- Qs],
+ ok.
+
+drop_slave(VHostPath, QueueName, MirrorNode) ->
+ drop_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+drop_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids }) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ {error, {queue_not_mirrored_on_node, MirrorNode}};
+ [QPid | MPids] ->
+ {error, cannot_drop_only_mirror};
+ [Pid] ->
+ rabbit_log:info("Dropping slave node on ~p for ~s~n",
+ [MirrorNode, rabbit_misc:rs(Name)]),
+ exit(Pid, {shutdown, dropped}),
+ ok
+ end
+ end).
+
+add_slave(VHostPath, QueueName, MirrorNode) ->
+ add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+
+add_slave(Queue, MirrorNode) ->
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, mirror_pids = MPids } = Q) ->
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= MirrorNode] of
+ [] -> Result = rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]),
+ rabbit_log:info(
+ "Adding slave node for ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ case Result of
+ {ok, _Pid} -> ok;
+ _ -> Result
+ end;
+ [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ end
+ end).
+
+if_mirrored_queue(Queue, Fun) ->
+ rabbit_amqqueue:with(
+ Queue, fun (#amqqueue { arguments = Args } = Q) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> ok;
+ _ -> Fun(Q)
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
new file mode 100644
index 00000000..c5f83c24
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -0,0 +1,873 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave).
+
+%% For general documentation of HA design, see
+%% rabbit_mirror_queue_coordinator
+%%
+%% We join the GM group before we add ourselves to the amqqueue
+%% record. As a result:
+%% 1. We can receive msgs from GM that correspond to messages we will
+%% never receive from publishers.
+%% 2. When we receive a message from publishers, we must receive a
+%% message from the GM group for it.
+%% 3. However, that instruction from the GM group can arrive either
+%% before or after the actual message. We need to be able to
+%% distinguish between GM instructions arriving early, and case (1)
+%% above.
+%%
+%% All instructions from the GM group must be processed in the order
+%% in which they're received.
+
+-export([start_link/1, set_maximum_since_use/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(DEATH_TIMEOUT, 20000). %% 20 seconds
+
+-record(state, { q,
+ gm,
+ master_pid,
+ backing_queue,
+ backing_queue_state,
+ sync_timer_ref,
+ rate_timer_ref,
+
+ sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ msg_id_ack, %% :: MsgId -> AckTag
+ ack_num,
+
+ msg_id_status,
+ known_senders
+ }).
+
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+
+init([#amqqueue { name = QueueName } = Q]) ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ Self = self(),
+ Node = node(),
+ {ok, MPid} =
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ %% ASSERTION
+ [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { mirror_pids = MPids1 },
+ write),
+ {ok, QPid}
+ end),
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new()
+ }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
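+
+%% Note the ordering in init/1 above: we join the GM group (and wait
+%% for the 'joined' callback) before adding ourselves to mirror_pids
+%% in the mnesia transaction. This is precisely the join-first
+%% ordering that gives rise to cases (1)-(3) described at the top of
+%% this module.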
+
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "immediate" delivery mode
+
+ %% It is safe to reply 'false' here even if a) we've not seen the
+ %% msg via gm, or b) the master dies before we receive the msg via
+ %% gm. In the case of (a), we will eventually receive the msg via
+ %% gm, and it's only the master's result to the channel that is
+ %% important. In the case of (b), if the master does die and we do
+ %% get promoted then at that point we have no consumers, thus
+ %% 'false' is precisely the correct answer. However, we must be
+ %% careful to _not_ enqueue the message in this case.
+
+ %% Note this is distinct from the case where we receive the msg
+ %% via gm first, then we're promoted to master, and only then do
+ %% we receive the msg from the channel.
+ gen_server2:reply(From, false), %% master may deliver it, not us
+ noreply(maybe_enqueue_message(Delivery, false, State));
+
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode
+ gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+ noreply(maybe_enqueue_message(Delivery, true, State));
+
+handle_call({gm_deaths, Deaths}, From,
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_pid = MPid }) ->
+ rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
+ [rabbit_misc:rs(QueueName),
+ rabbit_misc:pid_to_string(self()),
+ [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
+    %% The GM has told us about deaths, which means we're not going
+    %% to receive any more messages from those members.
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node(MPid) ->
+ %% master hasn't changed
+ reply(ok, State);
+ {ok, Pid} when node(Pid) =:= node() ->
+ %% we've become master
+ promote_me(From, State);
+ {ok, Pid} ->
+ %% master has changed to not us.
+ gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_pid = Pid });
+ {error, not_found} ->
+ gen_server2:reply(From, ok),
+ {stop, normal, State}
+ end;
+
+handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Mod, Fun, State));
+
+handle_call({commit, _Txn, _ChPid}, _From, State) ->
+ %% We don't support transactions in mirror queues
+ reply(ok, State).
+
+handle_cast({run_backing_queue, Mod, Fun}, State) ->
+ noreply(run_backing_queue(Mod, Fun, State));
+
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
+
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+ %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ noreply(maybe_enqueue_message(Delivery, true, State));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(update_ram_duration,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 });
+
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_timeout(
+ State #state { sync_timer_ref = undefined }));
+
+handle_cast({rollback, _Txn, _ChPid}, State) ->
+ %% We don't support transactions in mirror queues
+ noreply(State).
+
+handle_info(timeout, State) ->
+ noreply(backing_queue_timeout(State));
+
+handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
+ State = #state { gm = GM, master_pid = MPid }) ->
+ ok = gm:broadcast(GM, {process_death, MPid}),
+ noreply(State);
+
+handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
+ noreply(local_sender_death(ChPid, State));
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+ %% We've received a delete_and_terminate from gm, thus nothing to
+ %% do here.
+ ok;
+terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% See rabbit_mirror_queue_master:terminate/2
+ BQ:delete_and_terminate(R, BQS);
+terminate(Reason, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef }) ->
+ ok = gm:leave(GM),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {run_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
+ _ -> 0
+ end.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([SPid], _Members) ->
+ SPid ! {joined, self()},
+ ok.
+
+members_changed([_SPid], _Births, []) ->
+ ok;
+members_changed([SPid], _Births, Deaths) ->
+ inform_deaths(SPid, Deaths).
+
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
+ %% This is only of value to the master
+ ok;
+handle_msg([SPid], _From, {process_death, Pid}) ->
+ inform_deaths(SPid, [Pid]);
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+
+inform_deaths(SPid, Deaths) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {stop, normal} end,
+ fun () ->
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok ->
+ ok;
+ {promote, CPid} ->
+ {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end
+ end).
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+bq_init(BQ, Q, Recover) ->
+ Self = self(),
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
+ end,
+ fun (Mod, Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
+ end)
+ end).
+
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+ %% Yes, this might look a little crazy, but see comments in
+ %% confirm_sender_death/1
+ Fun(?MODULE, State);
+run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+ never;
+needs_confirming(#delivery { message = #basic_message {
+ is_persistent = true } },
+ #state { q = #amqqueue { durable = true } }) ->
+ eventually;
+needs_confirming(_Delivery, _State) ->
+ immediately.
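+
+%% In summary (for reference), the clauses above map to:
+%%
+%%   msg_seq_no undefined (confirms not in use)  -> never
+%%   persistent message on a durable queue       -> eventually
+%%   anything else                               -> immediately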
+
+confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
+ {MS1, CMs} =
+ lists:foldl(
+ fun (MsgId, {MSN, CMsN} = Acc) ->
+ %% We will never see 'discarded' here
+ case dict:find(MsgId, MSN) of
+ error ->
+ %% If it needed confirming, it'll have
+ %% already been done.
+ Acc;
+ {ok, {published, ChPid}} ->
+ %% Still not seen it from the channel, just
+ %% record that it's been confirmed.
+ {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {ok, {published, ChPid, MsgSeqNo}} ->
+ %% Seen from both GM and Channel. Can now
+ %% confirm.
+ {dict:erase(MsgId, MSN),
+ gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+ {ok, {confirmed, _ChPid}} ->
+                      %% It's already been confirmed. This is
+                      %% probably because it's been both sync'd to
+                      %% disk and then delivered and ack'd before
+                      %% we've seen the publish from the
+                      %% channel. Nothing to do here.
+ Acc
+ end
+ end, {MS, gb_trees:empty()}, MsgIds),
+ [ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
+ || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)],
+ State #state { msg_id_status = MS1 }.
+
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
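+
+%% gb_trees_cons accumulates values per key by consing onto the head,
+%% so each per-key list comes out in reverse insertion order (which
+%% does not appear to matter to rabbit_channel:confirm/2). An
+%% illustrative example, with a hypothetical key K:
+%%
+%%   T = gb_trees_cons(K, 2, gb_trees_cons(K, 1, gb_trees:empty())),
+%%   [{K, [2, 1]}] = gb_trees:to_list(T).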
+
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ msg_id_ack = MA,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
+ [rabbit_misc:rs(Q #amqqueue.name),
+ rabbit_misc:pid_to_string(self())]),
+ Q1 = Q #amqqueue { pid = self() },
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ true = unlink(GM),
+ gen_server2:reply(From, {promote, CPid}),
+ ok = gm:confirmed_broadcast(GM, heartbeat),
+
+ %% Everything that we're monitoring, we need to ensure our new
+ %% coordinator is monitoring.
+
+ MonitoringPids = [begin true = erlang:demonitor(MRef),
+ Pid
+ end || {Pid, MRef} <- dict:to_list(KS)],
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
+ CPid, MonitoringPids),
+
+ %% We find all the messages that we've received from channels but
+ %% not from gm, and if they're due to be enqueued on promotion
+    %% then we pass them to
+    %% rabbit_amqqueue_process:init_with_backing_queue_state to be
+    %% enqueued.
+ %%
+ %% We also have to requeue messages which are pending acks: the
+ %% consumers from the master queue have been lost and so these
+ %% messages need requeuing. They might also be pending
+ %% confirmation, and indeed they might also be pending arrival of
+ %% the publication from the channel itself, if we received both
+ %% the publication and the fetch via gm first! Requeuing doesn't
+ %% affect confirmations: if the message was previously pending a
+ %% confirmation then it still will be, under the same msg_id. So
+    %% as a master, we need to be prepared to filter out the
+    %% publication of said messages from the channel (via
+    %% is_duplicate; thus such requeued messages must remain in the
+    %% msg_id_status (MS), which becomes seen_status (SS) in the
+    %% master).
+ %%
+ %% Then there are messages we already have in the queue, which are
+ %% not currently pending acknowledgement:
+ %% 1. Messages we've only received via gm:
+ %% Filter out subsequent publication from channel through
+ %% validate_message. Might have to issue confirms then or
+ %% later, thus queue_process state will have to know that
+ %% there's a pending confirm.
+ %% 2. Messages received via both gm and channel:
+ %% Queue will have to deal with issuing confirms if necessary.
+ %%
+    %% MS contains the following four entry types:
+ %%
+ %% a) {published, ChPid}:
+ %% published via gm only; pending arrival of publication from
+ %% channel, maybe pending confirm.
+ %%
+ %% b) {published, ChPid, MsgSeqNo}:
+ %% published via gm and channel; pending confirm.
+ %%
+ %% c) {confirmed, ChPid}:
+ %% published via gm only, and confirmed; pending publication
+ %% from channel.
+ %%
+    %% d) discarded:
+    %%    seen via gm only as discarded; pending publication from
+    %%    channel.
+    %%
+    %% Only forms a, c and d need to go to the master state
+    %% seen_status (SS).
+    %%
+    %% Only form b needs to go through to the queue_process
+    %% state to form the msg_id_to_channel mapping (MTC).
+ %%
+ %% No messages that are enqueued from SQ at this point will have
+ %% entries in MS.
+ %%
+ %% Messages that are extracted from MA may have entries in MS, and
+ %% those messages are then requeued. However, as discussed above,
+ %% this does not affect MS, nor which bits go through to SS in
+ %% Master, or MTC in queue_process.
+ %%
+ %% Everything that's in MA gets requeued. Consequently the new
+ %% master should start with a fresh AM as there are no messages
+ %% pending acks (txns will have been rolled back).
+
+ MSList = dict:to_list(MS),
+ SS = dict:from_list(
+ [E || E = {_MsgId, discarded} <- MSList] ++
+ [{MsgId, Status}
+ || {MsgId, {Status, _ChPid}} <- MSList,
+ Status =:= published orelse Status =:= confirmed]),
+
+ MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+ CPid, BQ, BQS, GM, SS, MonitoringPids),
+
+ MTC = dict:from_list(
+ [{MsgId, {ChPid, MsgSeqNo}} ||
+ {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
+ AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+ Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
+ {Delivery, true} <- queue:to_list(PubQ)],
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
+ AckTags, Deliveries, MTC),
+ {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+noreply(State) ->
+ {NewState, Timeout} = next_state(State),
+ {noreply, NewState, Timeout}.
+
+reply(Reply, State) ->
+ {NewState, Timeout} = next_state(State),
+ {reply, Reply, NewState, Timeout}.
+
+next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ State1 = ensure_rate_timer(
+ confirm_messages(MsgIds, State #state {
+ backing_queue_state = BQS1 })),
+ case BQ:needs_timeout(BQS1) of
+ false -> {stop_sync_timer(State1), hibernate};
+ idle -> {stop_sync_timer(State1), 0 };
+ timed -> {ensure_sync_timer(State1), 0 }
+ end.
+
+backing_queue_timeout(State = #state { backing_queue = BQ }) ->
+ run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
+
+ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State #state { sync_timer_ref = TRef };
+ensure_sync_timer(State) ->
+ State.
+
+stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { sync_timer_ref = undefined }.
+
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
+
+ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
+ case dict:is_key(ChPid, KS) of
+ true -> State;
+ false -> MRef = erlang:monitor(process, ChPid),
+ State #state { known_senders = dict:store(ChPid, MRef, KS) }
+ end.
+
+local_sender_death(ChPid, State = #state { known_senders = KS }) ->
+ ok = case dict:is_key(ChPid, KS) of
+ false -> ok;
+ true -> confirm_sender_death(ChPid)
+ end,
+ State.
+
+confirm_sender_death(Pid) ->
+ %% We have to deal with the possibility that we'll be promoted to
+ %% master before this thing gets run. Consequently we set the
+ %% module to rabbit_mirror_queue_master so that if we do become a
+ %% rabbit_amqqueue_process before then, sane things will happen.
+ Fun =
+ fun (?MODULE, State = #state { known_senders = KS,
+ gm = GM }) ->
+ %% We're running still as a slave
+ ok = case dict:is_key(Pid, KS) of
+ false -> ok;
+ true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
+ confirm_sender_death(Pid)
+ end,
+ State;
+ (rabbit_mirror_queue_master, State) ->
+ %% We've become a master. State is now opaque to
+ %% us. When we became master, if Pid was still known
+ %% to us then we'd have set up monitoring of it then,
+ %% so this is now a noop.
+ State
+ end,
+ %% Note that we do not remove our knowledge of this ChPid until we
+ %% get the sender_death from GM.
+ {ok, _TRef} = timer:apply_after(
+ ?DEATH_TIMEOUT, rabbit_amqqueue, run_backing_queue_async,
+ [self(), rabbit_mirror_queue_master, Fun]),
+ ok.
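+
+%% Timeline sketch (illustrative): a sender channel dies at T0. At
+%% T0 + ?DEATH_TIMEOUT the timer fires and Fun is run via
+%% run_backing_queue. If we are still a slave, run_backing_queue/3
+%% above matches on rabbit_mirror_queue_master and applies
+%% Fun(?MODULE, State), so the first clause of Fun runs and re-arms
+%% the timer (the channel stays in known_senders meanwhile). If we
+%% have been promoted in the interim, Fun is eventually applied to
+%% rabbit_mirror_queue_master instead and is a no-op.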
+
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ msg_seq_no = MsgSeqNo,
+ sender = ChPid,
+ txn = none },
+ EnqueueOnPromotion,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ State1 = ensure_monitoring(ChPid, State),
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, MS) of
+ error ->
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
+ State1 #state { sender_queues = SQ1 };
+ {ok, {confirmed, ChPid}} ->
+ %% BQ has confirmed it but we didn't know what the
+ %% msg_seq_no was at the time. We do now!
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { sender_queues = SQ1,
+ msg_id_status = dict:erase(MsgId, MS) };
+ {ok, {published, ChPid}} ->
+ %% It was published to the BQ and we didn't know the
+ %% msg_seq_no so couldn't confirm it at the time.
+ case needs_confirming(Delivery, State1) of
+ never ->
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
+ eventually ->
+ State1 #state {
+ msg_id_status =
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
+ end;
+ {ok, discarded} ->
+ %% We've already heard from GM that the msg is to be
+ %% discarded. We won't see this again.
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
+ end;
+maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) ->
+ %% We don't support txns in mirror queues.
+ State.
+
+get_sender_queue(ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error -> {queue:new(), sets:new()};
+ {ok, Val} -> Val
+ end.
+
+remove_from_pending_ch(MsgId, ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ SQ;
+ {ok, {MQ, PendingCh}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ end.
+
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_status = MS }) ->
+
+ %% We really are going to do the publish right now, even though we
+ %% may not have seen it directly from the channel. As a result, we
+ %% may know that it needs confirming without knowing its
+ %% msg_seq_no, which means that we can see the confirmation come
+ %% back from the backing queue without knowing the msg_seq_no,
+ %% which means that we're going to have to hang on to the fact
+ %% that we've seen the msg_id confirmed until we can associate it
+ %% with a msg_seq_no.
+ State1 = ensure_monitoring(ChPid, State),
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ2} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid}, MS)};
+ {{value, {Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We received the msg from the channel first. Thus we
+ %% need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never ->
+ {MQ2, PendingCh, MS};
+ eventually ->
+ {MQ2, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ {MQ2, PendingCh, MS}
+ end;
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+                %% The instruction was sent to us before we were
+                %% added to the mirror_pids in the #amqqueue{}
+                %% record. We'll never receive the message directly
+                %% from the channel, and the channel will not be
+                %% expecting any confirms from us.
+ {MQ, PendingCh, MS}
+ end,
+
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
+
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State2 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State2 #state { backing_queue_state = BQS1 })
+ end};
+process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_status = MS }) ->
+ %% Many of the comments around the publish head above apply here
+ %% too.
+ State1 = ensure_monitoring(ChPid, State),
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, discarded, MS)};
+ {{value, {#delivery { message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We've already seen it from the channel, we're not
+ %% going to see this again, so don't add it to MS
+ {MQ2, PendingCh, MS};
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+                %% The instruction was sent to us before we were
+                %% added to the mirror_pids in the #amqqueue{}
+                %% record. We'll never receive the message directly
+ %% from the channel.
+ {MQ, PendingCh, MS}
+ end,
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ BQS1 = BQ:discard(Msg, ChPid, BQS),
+ {ok, State1 #state { sender_queues = SQ1,
+ msg_id_status = MS1,
+ backing_queue_state = BQS1 }};
+process_instruction({set_length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ ToDrop = QLen - Length,
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
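+%% E.g. for the set_length clause above (illustrative): if our BQ
+%% holds 10 messages and the master broadcasts set_length 7, ToDrop
+%% is 3 and we fetch-and-discard three messages (with AckRequired =
+%% false) to converge on the master's length.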
+process_instruction({fetch, AckRequired, MsgId, Remaining},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
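+%% E.g. for the fetch clause above (illustrative): with QLen 5 and
+%% Remaining 4 we are in step with the master and perform the fetch
+%% ourselves; with QLen 3 we are shorter than the master (e.g. we
+%% joined recently) and must ignore the instruction.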
+process_instruction({ack, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
+ [] = MsgIds1 -- MsgIds, %% ASSERTION
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {ok, case length(AckTags) =:= length(MsgIds) of
+ true ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% The only thing we can safely do is nuke out our BQ
+ %% and MA. The interaction between this and confirms
+ %% doesn't really bear thinking about...
+ {_Count, BQS1} = BQ:purge(BQS),
+ {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
+ State #state { msg_id_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
+process_instruction({sender_death, ChPid},
+ State = #state { sender_queues = SQ,
+ msg_id_status = MS,
+ known_senders = KS }) ->
+ {ok, case dict:find(ChPid, KS) of
+ error ->
+ State;
+ {ok, MRef} ->
+ true = erlang:demonitor(MRef),
+ MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = dict:erase(ChPid, KS) }
+ end};
+process_instruction({delete_and_terminate, Reason},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, State #state { backing_queue_state = undefined }}.
+
+msg_ids_to_acktags(MsgIds, MA) ->
+ {AckTags, MA1} =
+ lists:foldl(
+ fun (MsgId, {Acc, MAN}) ->
+ case dict:find(MsgId, MA) of
+ error -> {Acc, MAN};
+ {ok, {_Num, AckTag}} -> {[AckTag | Acc],
+ dict:erase(MsgId, MAN)}
+ end
+ end, {[], MA}, MsgIds),
+ {lists:reverse(AckTags), MA1}.
+
+ack_all(BQ, MA, BQS) ->
+ BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
+
+maybe_store_ack(false, _MsgId, _AckTag, State) ->
+ State;
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
+ ack_num = Num }) ->
+ State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
+ ack_num = Num + 1 }.
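+
+%% Note: ack_num gives each stored AckTag a strictly increasing
+%% sequence number; promote_me/2 relies on this to sort pending
+%% AckTags back into issue order before requeueing them on the new
+%% master.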
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
new file mode 100644
index 00000000..2ce5941e
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -0,0 +1,60 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave_sup).
+
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, recovery},
+ {enables, routing_ready}]}).
+
+-rabbit_boot_step({mirrored_queues,
+ [{description, "adding mirrors to queues"},
+ {mfa, {rabbit_mirror_queue_misc, on_node_up, []}},
+ {requires, mirror_queue_slave_sup},
+ {enables, routing_ready}]}).
+
+-behaviour(supervisor2).
+
+-export([start/0, start_link/0, start_child/2]).
+
+-export([init/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+start() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_mirror_queue_slave_sup,
+ {rabbit_mirror_queue_slave_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
+ ok.
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_child(Node, Args) ->
+ supervisor2:start_child({?SERVER, Node}, Args).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 10, 10},
+ [{rabbit_mirror_queue_slave,
+ {rabbit_mirror_queue_slave, start_link, []},
+ temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
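+
+%% Illustrative only: rabbit_mirror_queue_misc:add_slave/2 starts a
+%% slave on another node by addressing this locally-registered
+%% supervisor there, e.g., given a mirrored #amqqueue{} record Q:
+%%
+%%   {ok, SPid} = rabbit_mirror_queue_slave_sup:start_child(
+%%                  'rabbit@node_b', [Q]).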
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 2df76d4e..87a5f44e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -237,7 +237,8 @@ table_definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index f6a1c92f..4f683564 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -113,7 +113,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid}] -> [QPid | QPids];
- [] -> QPids
+ [#amqqueue{pid = QPid, mirror_pids = MPids}] ->
+ MPids ++ [QPid | QPids];
+ [] ->
+ QPids
end
end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1a37cdff..3f4aa54e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2116,7 +2116,7 @@ with_fresh_variable_queue(Fun) ->
{delta, {delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
passed.
test_variable_queue() ->
@@ -2284,7 +2284,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2301,7 +2301,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
{_Guids, VQ4} =
rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:timeout(VQ4),
- _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2336,7 +2336,7 @@ test_queue_recover() ->
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
- _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 1f0f8bbe..aa174e96 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -124,7 +124,8 @@
auto_delete :: boolean(),
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ pid :: rabbit_types:maybe(pid()),
+ mirror_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index bead388d..04744aa4 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -29,20 +29,24 @@
-rabbit_upgrade({semi_durable_route, mnesia, []}).
-rabbit_upgrade({exchange_event_serial, mnesia, []}).
-rabbit_upgrade({trace_exchanges, mnesia, []}).
+-rabbit_upgrade({mirror_pids, mnesia, []}).
+-rabbit_upgrade({gm, mnesia, []}).
%% -------------------------------------------------------------------
-ifdef(use_specs).
--spec(remove_user_scope/0 :: () -> 'ok').
--spec(hash_passwords/0 :: () -> 'ok').
--spec(add_ip_to_listener/0 :: () -> 'ok').
--spec(internal_exchanges/0 :: () -> 'ok').
+-spec(remove_user_scope/0 :: () -> 'ok').
+-spec(hash_passwords/0 :: () -> 'ok').
+-spec(add_ip_to_listener/0 :: () -> 'ok').
+-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
--spec(topic_trie/0 :: () -> 'ok').
+-spec(topic_trie/0 :: () -> 'ok').
-spec(exchange_event_serial/0 :: () -> 'ok').
--spec(semi_durable_route/0 :: () -> 'ok').
--spec(trace_exchanges/0 :: () -> 'ok').
+-spec(semi_durable_route/0 :: () -> 'ok').
+-spec(trace_exchanges/0 :: () -> 'ok').
+-spec(mirror_pids/0 :: () -> 'ok').
+-spec(gm/0 :: () -> 'ok').
-endif.
@@ -121,6 +125,23 @@ trace_exchanges() ->
VHost <- rabbit_vhost:list()],
ok.
+mirror_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddMirrorPidsFun =
+ fun ({amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid}) ->
+ {amqqueue, Name, Durable, AutoDelete, Owner, Arguments, Pid, []}
+ end,
+ [ ok = transform(T,
+ AddMirrorPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, mirror_pids])
+ || T <- Tables ],
+ ok.
+
+gm() ->
+ create(gm_group, [{record_name, gm_group},
+ {attributes, [name, version, members]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8ac3ad43..a167cca0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/4, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -452,7 +452,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
PersistentClient, TransientClient).
-terminate(State) ->
+terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
@@ -473,7 +473,7 @@ terminate(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(State) ->
+delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'remove_pending_ack', other than
%% deleting it.