diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 16:11:19 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 16:11:19 +0000 |
commit | bf091440f2ad1dc80462b9145c7063f971f2cb2e (patch) | |
tree | b853b3d40016da05b214badbd51bd9af16ef66e5 | |
parent | 3179a6148228279d5aa491cbec401d4976375179 (diff) | |
parent | 0af18258f54c8d7a57d64e005ee2fda9bce97b8f (diff) | |
download | rabbitmq-server-bf091440f2ad1dc80462b9145c7063f971f2cb2e.tar.gz |
merge bug24407 into default
-rw-r--r-- | docs/rabbitmqctl.1.xml | 30 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 13 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 60 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 49 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 232 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 37 |
8 files changed, 412 insertions, 30 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 34947b66..a95f7b3d 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -446,6 +446,36 @@ </para> </listitem> </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> + </term> + <listitem> + <variablelist> + <varlistentry> + <term>queue</term> + <listitem> + <para> + The name of the queue to synchronise. + </para> + </listitem> + </varlistentry> + </variablelist> + <para> + Instructs a mirrored queue with unsynchronised slaves to + synchronise itself. The queue will block while + synchronisation takes place (all publishers to and + consumers from the queue will block). The queue must be + mirrored, and must not have any pending unacknowledged + messages for this command to succeed. + </para> + <para> + Note that unsynchronised queues from which messages are + being drained will become synchronised eventually. This + command is primarily useful for queues which are not + being drained. + </para> + </listitem> + </varlistentry> </variablelist> </refsect2> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a270364..fbe146e8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -173,6 +173,8 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (pid()) -> + 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). -endif. @@ -598,6 +600,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). +sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f9614517..6b065b96 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1145,6 +1145,21 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); +handle_call(sync_mirrors, _From, + State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + backing_queue_state = BQS}) -> + S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, + case BQ:depth(BQS) - BQ:len(BQS) of + 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} + end; + _ -> reply({error, pending_acks}, State) + end; + +handle_call(sync_mirrors, _From, State) -> + reply({error, not_mirrored}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 2b7061e8..5bde996e 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,7 +17,7 @@ -module(rabbit_control_main). -include("rabbit.hrl"). --export([start/0, stop/0, action/5]). +-export([start/0, stop/0, action/5, sync_queue/1]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -50,6 +50,7 @@ update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, + {sync_queue, [?VHOST_DEF]}, add_user, delete_user, @@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> rpc_call(Node, rabbit_mnesia, forget_cluster_node, [ClusterNode, RemoveWhenOffline]); +action(sync_queue, Node, [Q], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Synchronising queue \"~s\" in vhost \"~s\"", [Q, VHost]), + rpc_call(Node, rabbit_control_main, sync_queue, + [rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q))]); + action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), wait_for_application(Node, PidFile, rabbit_and_plugins, Inform); @@ -513,6 +520,10 @@ action(eval, Node, [Expr], _Opts, _Inform) -> format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). +sync_queue(Q) -> + rabbit_amqqueue:with( + Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e857f395..0df7ea1c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,15 +26,16 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). +-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). --record(state, { gm, +-record(state, { name, + gm, coordinator, backing_queue, backing_queue_state, @@ -49,7 +50,8 @@ -type(death_fun() :: fun ((pid()) -> 'ok')). -type(depth_fun() :: fun (() -> 'ok')). --type(master_state() :: #state { gm :: pid(), +-type(master_state() :: #state { name :: rabbit_amqqueue:name(), + gm :: pid(), coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), @@ -58,14 +60,16 @@ known_senders :: set() }). --spec(promote_backing_queue_state/7 :: - (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> - master_state()). +-spec(promote_backing_queue_state/8 :: + (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(), + [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(depth_fun/0 :: () -> depth_fun()). -spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> master_state()). -spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). +-spec(sync_mirrors/1 :: (master_state()) -> + {'ok', master_state()} | {stop, any(), master_state()}). -endif. @@ -106,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, @@ -121,6 +126,29 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. +sync_mirrors(State = #state { name = QName, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Log = fun (Fmt, Params) -> + rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n", + [rabbit_misc:rs(QName) | Params]) + end, + Log("~p messages to synchronise", [BQ:len(BQS)]), + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + Ref = make_ref(), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids), + gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), + S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, + case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of + {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; + {sync_died, R, BQS1} -> Log("~p", [R]), + {ok, S(BQS1)}; + {already_synced, BQS1} -> {ok, S(BQS1)}; + {ok, BQS1} -> Log("complete", []), + {ok, S(BQS1)} + end. + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -144,17 +172,14 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{gm = GM}) -> - Info = gm:info(GM), - Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), - node(Pid) =/= node()], - MRefs = [erlang:monitor(process, S) || S <- Slaves], +stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), + MRefs = [erlang:monitor(process, SPid) || SPid <- SPids], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], %% Normally when we remove a slave another slave or master will %% notice and update Mnesia. But we just removed them all, and %% have stopped listening ourselves. So manually clean up. - QName = proplists:get_value(group_name, Info), rabbit_misc:execute_mnesia_transaction( fun () -> [Q] = mnesia:read({rabbit_queue, QName}), @@ -385,17 +410,18 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> +promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), Len = BQ:len(BQS1), Depth = BQ:depth(BQS1), true = Len == Depth, %% ASSERTION: everything must have been requeued ok = gm:broadcast(GM, {depth, Depth}), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS1, - seen_status = SeenStatus, + seen_status = Seen, confirmed = [], known_senders = sets:from_list(KS) }. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9354f485..53564f09 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,6 +222,29 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, Syncer}, + State = #state { depth_delta = DD, + backing_queue = BQ, + backing_queue_state = BQS }) -> + State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), + S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, + rate_timer_ref = TRefN, + backing_queue_state = BQSN} end, + %% [0] We can only sync when there are no pending acks + case rabbit_mirror_queue_sync:slave( + DD, Ref, TRef, Syncer, BQ, BQS, + fun (BQN, BQSN) -> + BQSN1 = update_ram_duration(BQN, BQSN), + TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRefN, BQSN1} + end) of + denied -> noreply(State1); + {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {failed, Res} -> noreply(S(Res)); + {stop, Reason, Res} -> {stop, Reason, S(Res)} + end; + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -232,15 +255,10 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }); +handle_info(update_ram_duration, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + noreply(State#state{rate_timer_ref = just_measured, + backing_queue_state = update_ram_duration(BQ, BQS)}); handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( @@ -357,6 +375,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> + case lists:member(SPid, SPids) of + true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). @@ -528,7 +551,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, AckTags, SS, MPids), + QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -815,6 +838,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). +update_ram_duration(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQ:set_ram_duration_target(DesiredDuration, BQS1). + record_synchronised(#amqqueue { name = QName }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl new file mode 100644 index 00000000..508d46e9 --- /dev/null +++ b/src/rabbit_mirror_queue_sync.erl @@ -0,0 +1,232 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_sync). + +-include("rabbit.hrl"). + +-export([master_prepare/3, master_go/5, slave/7]). + +-define(SYNC_PROGRESS_INTERVAL, 1000000). + +%% There are three processes around, the master, the syncer and the +%% slave(s). The syncer is an intermediary, linked to the master in +%% order to make sure we do not mess with the master's credit flow or +%% set of monitors. +%% +%% Interactions +%% ------------ +%% +%% '*' indicates repeating messages. All are standard Erlang messages +%% except sync_start which is sent over GM to flush out any other +%% messages that we might have sent that way already. (credit) is the +%% usual credit_flow bump message every so often. +%% +%% Master Syncer Slave(s) +%% sync_mirrors -> || || +%% (from channel) || -- (spawns) --> || || +%% || --------- sync_start (over GM) -------> || +%% || || <--- sync_ready ---- || +%% || || (or) || +%% || || <--- sync_deny ----- || +%% || <--- ready ---- || || +%% || <--- next* ---- || || } +%% || ---- msg* ----> || || } loop +%% || || ---- sync_msg* ----> || } +%% || || <--- (credit)* ----- || } +%% || <--- next ---- || || +%% || ---- done ----> || || +%% || || -- sync_complete --> || +%% || (Dies) || + +-ifdef(use_specs). + +-type(log_fun() :: fun ((string(), [any()]) -> 'ok')). +-type(bq() :: atom()). +-type(bqs() :: any()). + +-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). +-spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) -> + {'already_synced', bqs()} | {'ok', bqs()} | + {'shutdown', any(), bqs()} | + {'sync_died', any(), bqs()}). +-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(), + bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> + 'denied' | + {'ok' | 'failed', {timer:tref(), bqs()}} | + {'stop', any(), {timer:tref(), bqs()}}). + +-endif. + +%% --------------------------------------------------------------------------- +%% Master + +master_prepare(Ref, Log, SPids) -> + MPid = self(), + spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). + +master_go(Syncer, Ref, Log, BQ, BQS) -> + Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, + receive + {'EXIT', Syncer, normal} -> {already_synced, BQS}; + {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; + {ready, Syncer} -> master_go0(Args, BQ, BQS) + end. + +master_go0(Args, BQ, BQS) -> + case BQ:fold(fun (Msg, MsgProps, Acc) -> + master_send(Msg, MsgProps, Args, Acc) + end, {0, erlang:now()}, BQS) of + {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; + {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; + {_, BQS1} -> master_done(Args, BQS1) + end. + +master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> + T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of + true -> Log("~p messages", [I]), + erlang:now(); + false -> Last + end, + receive + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age) + after 0 -> + ok + end, + receive + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {cont, {I + 1, T}}; + {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; + {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} + end. + +master_done({Syncer, Ref, _Log, Parent}, BQS) -> + receive + {next, Ref} -> unlink(Syncer), + Syncer ! {done, Ref}, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end, + {ok, BQS}; + {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS}; + {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS} + end. + +%% Master +%% --------------------------------------------------------------------------- +%% Syncer + +syncer(Ref, Log, MPid, SPids) -> + [erlang:monitor(process, SPid) || SPid <- SPids], + %% We wait for a reply from the slaves so that we know they are in + %% a receive block and will thus receive messages we send to them + %% *without* those messages ending up in their gen_server2 pqueue. + case [SPid || SPid <- SPids, + receive + {sync_ready, Ref, SPid} -> true; + {sync_deny, Ref, SPid} -> false; + {'DOWN', _, process, SPid, _} -> false + end] of + [] -> Log("all slaves already synced", []); + SPids1 -> MPid ! {ready, self()}, + Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), + syncer_loop(Ref, MPid, SPids1) + end. + +syncer_loop(Ref, MPid, SPids) -> + MPid ! {next, Ref}, + receive + {msg, Ref, Msg, MsgProps} -> + SPids1 = wait_for_credit(SPids), + [begin + credit_flow:send(SPid), + SPid ! {sync_msg, Ref, Msg, MsgProps} + end || SPid <- SPids1], + syncer_loop(Ref, MPid, SPids1); + {done, Ref} -> + [SPid ! {sync_complete, Ref} || SPid <- SPids] + end. + +wait_for_credit(SPids) -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + wait_for_credit(SPids); + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + wait_for_credit(lists:delete(SPid, SPids)) + end; + false -> SPids + end. + +%% Syncer +%% --------------------------------------------------------------------------- +%% Slave + +slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> + Syncer ! {sync_deny, Ref, self()}, + denied; + +slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> + MRef = erlang:monitor(process, Syncer), + Syncer ! {sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, + rabbit_misc:get_parent()}, TRef, BQS1). + +slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, + TRef, BQS) -> + receive + {'DOWN', MRef, process, Syncer, _Reason} -> + %% If the master dies half way we are not in the usual + %% half-synced state (with messages nearer the tail of the + %% queue); instead we have ones nearer the head. If we then + %% sync with a newly promoted master, or even just receive + %% messages from it, we have a hole in the middle. So the + %% only thing to do here is purge. + {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(Syncer), + {failed, {TRef, BQS1}}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + slave_sync_loop(Args, TRef, BQS); + {sync_complete, Ref} -> + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(Syncer), + {ok, {TRef, BQS}}; + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age), + slave_sync_loop(Args, TRef, BQS); + {'$gen_cast', {set_ram_duration_target, Duration}} -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + slave_sync_loop(Args, TRef, BQS1); + update_ram_duration -> + {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), + slave_sync_loop(Args, TRef1, BQS1); + {sync_msg, Ref, Msg, Props} -> + credit_flow:ack(Syncer), + Props1 = Props#message_properties{needs_confirming = false}, + BQS1 = BQ:publish(Msg, Props1, true, none, BQS), + slave_sync_loop(Args, TRef, BQS1); + {'EXIT', Parent, Reason} -> + {stop, Reason, {TRef, BQS}}; + %% If the master throws an exception + {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> + BQ:delete_and_terminate(Reason, BQS), + {stop, Reason, {TRef, undefined}} + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index edaa7198..ce3e3802 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -67,6 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). +-export([get_parent/0]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -241,7 +242,7 @@ -spec(interval_operation/4 :: ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). - +-spec(get_parent/0 :: () -> pid()). -endif. %%---------------------------------------------------------------------------- @@ -1045,3 +1046,37 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> {false, false} -> lists:max([IdealInterval, round(LastInterval / 1.5)]) end}. + +%% ------------------------------------------------------------------------- +%% Begin copypasta from gen_server2.erl + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid (Parent) -> Parent; + [Parent | _] when is_atom(Parent) -> name_to_pid(Parent); + _ -> exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> case whereis_name(Name) of + undefined -> exit(could_not_find_registerd_name); + Pid -> Pid + end; + Pid -> Pid + end. + +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> case erlang:is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> Pid + end; + [] -> undefined + end. + +%% End copypasta from gen_server2.erl +%% ------------------------------------------------------------------------- |