summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-04 16:11:19 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-04 16:11:19 +0000
commitbf091440f2ad1dc80462b9145c7063f971f2cb2e (patch)
treeb853b3d40016da05b214badbd51bd9af16ef66e5
parent3179a6148228279d5aa491cbec401d4976375179 (diff)
parent0af18258f54c8d7a57d64e005ee2fda9bce97b8f (diff)
downloadrabbitmq-server-bf091440f2ad1dc80462b9145c7063f971f2cb2e.tar.gz
merge bug24407 into default
-rw-r--r--docs/rabbitmqctl.1.xml30
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_control_main.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl60
-rw-r--r--src/rabbit_mirror_queue_slave.erl49
-rw-r--r--src/rabbit_mirror_queue_sync.erl232
-rw-r--r--src/rabbit_misc.erl37
8 files changed, 412 insertions, 30 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 34947b66..a95f7b3d 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -446,6 +446,36 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to synchronise.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a mirrored queue with unsynchronised slaves to
+ synchronise itself. The queue will block while
+ synchronisation takes place (all publishers to and
+ consumers from the queue will block). The queue must be
+ mirrored, and must not have any pending unacknowledged
+ messages for this command to succeed.
+ </para>
+ <para>
+ Note that unsynchronised queues from which messages are
+ being drained will become synchronised eventually. This
+ command is primarily useful for queues which are not
+ being drained.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a270364..fbe146e8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -173,6 +173,8 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (pid()) ->
+ 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
-endif.
@@ -598,6 +600,8 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f9614517..6b065b96 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1145,6 +1145,21 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call(sync_mirrors, _From,
+ State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ backing_queue_state = BQS}) ->
+ S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ case BQ:depth(BQS) - BQ:len(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
+ end;
+ _ -> reply({error, pending_acks}, State)
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, not_mirrored}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2b7061e8..5bde996e 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,7 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -50,6 +50,7 @@
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
+ {sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
rpc_call(Node, rabbit_mnesia, forget_cluster_node,
[ClusterNode, RemoveWhenOffline]);
+action(sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Synchronising queue \"~s\" in vhost \"~s\"", [Q, VHost]),
+ rpc_call(Node, rabbit_control_main, sync_queue,
+ [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
@@ -513,6 +520,10 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e857f395..0df7ea1c 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,15 +26,16 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
--record(state, { gm,
+-record(state, { name,
+ gm,
coordinator,
backing_queue,
backing_queue_state,
@@ -49,7 +50,8 @@
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
--type(master_state() :: #state { gm :: pid(),
+-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
+ gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
@@ -58,14 +60,16 @@
known_senders :: set()
}).
--spec(promote_backing_queue_state/7 ::
- (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
- master_state()).
+-spec(promote_backing_queue_state/8 ::
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(),
+ [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
+-spec(sync_mirrors/1 :: (master_state()) ->
+ {'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -106,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -121,6 +126,29 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+sync_mirrors(State = #state { name = QName,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Log = fun (Fmt, Params) ->
+ rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n",
+ [rabbit_misc:rs(QName) | Params])
+ end,
+ Log("~p messages to synchronise", [BQ:len(BQS)]),
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ Ref = make_ref(),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> Log("~p", [R]),
+ {ok, S(BQS1)};
+ {already_synced, BQS1} -> {ok, S(BQS1)};
+ {ok, BQS1} -> Log("complete", []),
+ {ok, S(BQS1)}
+ end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -144,17 +172,14 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{gm = GM}) ->
- Info = gm:info(GM),
- Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
- node(Pid) =/= node()],
- MRefs = [erlang:monitor(process, S) || S <- Slaves],
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ MRefs = [erlang:monitor(process, SPid) || SPid <- SPids],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
- QName = proplists:get_value(group_name, Info),
rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
@@ -385,17 +410,18 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- seen_status = SeenStatus,
+ seen_status = Seen,
confirmed = [],
known_senders = sets:from_list(KS) }.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9354f485..53564f09 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -222,6 +222,29 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, Syncer},
+ State = #state { depth_delta = DD,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+ S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN} end,
+ %% [0] We can only sync when there are no pending acks
+ case rabbit_mirror_queue_sync:slave(
+ DD, Ref, TRef, Syncer, BQ, BQS,
+ fun (BQN, BQSN) ->
+ BQSN1 = update_ram_duration(BQN, BQSN),
+ TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRefN, BQSN1}
+ end) of
+ denied -> noreply(State1);
+ {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {failed, Res} -> noreply(S(Res));
+ {stop, Reason, Res} -> {stop, Reason, S(Res)}
+ end;
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -232,15 +255,10 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State #state { rate_timer_ref = just_measured,
- backing_queue_state = BQS2 });
+handle_info(update_ram_duration, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ noreply(State#state{rate_timer_ref = just_measured,
+ backing_queue_state = update_ram_duration(BQ, BQS)});
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
@@ -357,6 +375,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -528,7 +551,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, AckTags, SS, MPids),
+ QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -815,6 +838,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+update_ram_duration(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQ:set_ram_duration_target(DesiredDuration, BQS1).
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 00000000..508d46e9
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,232 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master_prepare/3, master_go/5, slave/7]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% There are three processes around, the master, the syncer and the
+%% slave(s). The syncer is an intermediary, linked to the master in
+%% order to make sure we do not mess with the master's credit flow or
+%% set of monitors.
+%%
+%% Interactions
+%% ------------
+%%
+%% '*' indicates repeating messages. All are standard Erlang messages
+%% except sync_start which is sent over GM to flush out any other
+%% messages that we might have sent that way already. (credit) is the
+%% usual credit_flow bump message every so often.
+%%
+%% Master Syncer Slave(s)
+%% sync_mirrors -> || ||
+%% (from channel) || -- (spawns) --> || ||
+%% || --------- sync_start (over GM) -------> ||
+%% || || <--- sync_ready ---- ||
+%% || || (or) ||
+%% || || <--- sync_deny ----- ||
+%% || <--- ready ---- || ||
+%% || <--- next* ---- || || }
+%% || ---- msg* ----> || || } loop
+%% || || ---- sync_msg* ----> || }
+%% || || <--- (credit)* ----- || }
+%% || <--- next ---- || ||
+%% || ---- done ----> || ||
+%% || || -- sync_complete --> ||
+%% || (Dies) ||
+
+-ifdef(use_specs).
+
+-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
+-type(bq() :: atom()).
+-type(bqs() :: any()).
+
+-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) ->
+ {'already_synced', bqs()} | {'ok', bqs()} |
+ {'shutdown', any(), bqs()} |
+ {'sync_died', any(), bqs()}).
+-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
+ bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
+ 'denied' |
+ {'ok' | 'failed', {timer:tref(), bqs()}} |
+ {'stop', any(), {timer:tref(), bqs()}}).
+
+-endif.
+
+%% ---------------------------------------------------------------------------
+%% Master
+
+master_prepare(Ref, Log, SPids) ->
+ MPid = self(),
+ spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+
+master_go(Syncer, Ref, Log, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+ receive
+ {'EXIT', Syncer, normal} -> {already_synced, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
+ {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ end.
+
+master_go0(Args, BQ, BQS) ->
+ case BQ:fold(fun (Msg, MsgProps, Acc) ->
+ master_send(Msg, MsgProps, Args, Acc)
+ end, {0, erlang:now()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
+ end.
+
+master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+ T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ true -> Log("~p messages", [I]),
+ erlang:now();
+ false -> Last
+ end,
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age)
+ after 0 ->
+ ok
+ end,
+ receive
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {cont, {I + 1, T}};
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
+ end.
+
+master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+ receive
+ {next, Ref} -> unlink(Syncer),
+ Syncer ! {done, Ref},
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end,
+ {ok, BQS};
+ {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ end.
+
+%% Master
+%% ---------------------------------------------------------------------------
+%% Syncer
+
+syncer(Ref, Log, MPid, SPids) ->
+ [erlang:monitor(process, SPid) || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ case [SPid || SPid <- SPids,
+ receive
+ {sync_ready, Ref, SPid} -> true;
+ {sync_deny, Ref, SPid} -> false;
+ {'DOWN', _, process, SPid, _} -> false
+ end] of
+ [] -> Log("all slaves already synced", []);
+ SPids1 -> MPid ! {ready, self()},
+ Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
+ syncer_loop(Ref, MPid, SPids1)
+ end.
+
+syncer_loop(Ref, MPid, SPids) ->
+ MPid ! {next, Ref},
+ receive
+ {msg, Ref, Msg, MsgProps} ->
+ SPids1 = wait_for_credit(SPids),
+ [begin
+ credit_flow:send(SPid),
+ SPid ! {sync_msg, Ref, Msg, MsgProps}
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
+ {done, Ref} ->
+ [SPid ! {sync_complete, Ref} || SPid <- SPids]
+ end.
+
+wait_for_credit(SPids) ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ wait_for_credit(SPids);
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ wait_for_credit(lists:delete(SPid, SPids))
+ end;
+ false -> SPids
+ end.
+
+%% Syncer
+%% ---------------------------------------------------------------------------
+%% Slave
+
+slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
+ Syncer ! {sync_deny, Ref, self()},
+ denied;
+
+slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+ MRef = erlang:monitor(process, Syncer),
+ Syncer ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
+ rabbit_misc:get_parent()}, TRef, BQS1).
+
+slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
+ TRef, BQS) ->
+ receive
+ {'DOWN', MRef, process, Syncer, _Reason} ->
+ %% If the master dies half way we are not in the usual
+ %% half-synced state (with messages nearer the tail of the
+ %% queue); instead we have ones nearer the head. If we then
+ %% sync with a newly promoted master, or even just receive
+ %% messages from it, we have a hole in the middle. So the
+ %% only thing to do here is purge.
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ credit_flow:peer_down(Syncer),
+ {failed, {TRef, BQS1}};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ slave_sync_loop(Args, TRef, BQS);
+ {sync_complete, Ref} ->
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(Syncer),
+ {ok, {TRef, BQS}};
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ slave_sync_loop(Args, TRef, BQS);
+ {'$gen_cast', {set_ram_duration_target, Duration}} ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ slave_sync_loop(Args, TRef, BQS1);
+ update_ram_duration ->
+ {TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
+ slave_sync_loop(Args, TRef1, BQS1);
+ {sync_msg, Ref, Msg, Props} ->
+ credit_flow:ack(Syncer),
+ Props1 = Props#message_properties{needs_confirming = false},
+ BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
+ slave_sync_loop(Args, TRef, BQS1);
+ {'EXIT', Parent, Reason} ->
+ {stop, Reason, {TRef, BQS}};
+ %% If the master throws an exception
+ {'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, Reason, {TRef, undefined}}
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index edaa7198..ce3e3802 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,6 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
+-export([get_parent/0]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -241,7 +242,7 @@
-spec(interval_operation/4 ::
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-
+-spec(get_parent/0 :: () -> pid()).
-endif.
%%----------------------------------------------------------------------------
@@ -1045,3 +1046,37 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
{false, false} -> lists:max([IdealInterval,
round(LastInterval / 1.5)])
end}.
+
+%% -------------------------------------------------------------------------
+%% Begin copypasta from gen_server2.erl
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid (Parent) -> Parent;
+ [Parent | _] when is_atom(Parent) -> name_to_pid(Parent);
+ _ -> exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined -> case whereis_name(Name) of
+ undefined -> exit(could_not_find_registerd_name);
+ Pid -> Pid
+ end;
+ Pid -> Pid
+ end.
+
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() -> case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true -> Pid
+ end;
+ [] -> undefined
+ end.
+
+%% End copypasta from gen_server2.erl
+%% -------------------------------------------------------------------------