summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_sync.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_sync.erl')
-rw-r--r--src/rabbit_mirror_queue_sync.erl260
1 files changed, 260 insertions, 0 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 00000000..61e90105
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,260 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2010-2012 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master_prepare/3, master_go/7, slave/7]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% There are three processes around, the master, the syncer and the
+%% slave(s). The syncer is an intermediary, linked to the master in
+%% order to make sure we do not mess with the master's credit flow or
+%% set of monitors.
+%%
+%% Interactions
+%% ------------
+%%
+%% '*' indicates repeating messages. All are standard Erlang messages
+%% except sync_start which is sent over GM to flush out any other
+%% messages that we might have sent that way already. (credit) is the
+%% usual credit_flow bump message every so often.
+%%
+%% Master Syncer Slave(s)
+%% sync_mirrors -> || ||
+%% (from channel) || -- (spawns) --> || ||
+%% || --------- sync_start (over GM) -------> ||
+%% || || <--- sync_ready ---- ||
+%% || || (or) ||
+%% || || <--- sync_deny ----- ||
+%% || <--- ready ---- || ||
+%% || <--- next* ---- || || }
+%% || ---- msg* ----> || || } loop
+%% || || ---- sync_msg* ----> || }
+%% || || <--- (credit)* ----- || }
+%% || <--- next ---- || ||
+%% || ---- done ----> || ||
+%% || || -- sync_complete --> ||
+%% || (Dies) ||
+
+-ifdef(use_specs).
+
+-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
+-type(bq() :: atom()).
+-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
+
+-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_go/7 :: (pid(), reference(), log_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ bq(), bqs()) ->
+ {'already_synced', bqs()} | {'ok', bqs()} |
+ {'shutdown', any(), bqs()} |
+ {'sync_died', any(), bqs()}).
+-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
+ bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
+ 'denied' |
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
+
+-endif.
+
+%% ---------------------------------------------------------------------------
+%% Master
+
+master_prepare(Ref, Log, SPids) ->
+ MPid = self(),
+ spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
+ receive
+ {'EXIT', Syncer, normal} -> {already_synced, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
+ {ready, Syncer} -> EmitStats({syncing, 0}),
+ master_go0(Args, BQ, BQS)
+ end.
+
+master_go0(Args, BQ, BQS) ->
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
+ end, {0, erlang:now()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
+ end.
+
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
+ T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
+ erlang:now();
+ false -> Last
+ end,
+ HandleInfo({syncing, I}),
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age)
+ after 0 ->
+ ok
+ end,
+ receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
+ {cont, {I + 1, T}};
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
+ end.
+
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
+ receive
+ {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
+ {ok, BQS};
+ {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ end.
+
+stop_syncer(Syncer, Msg) ->
+ unlink(Syncer),
+ Syncer ! Msg,
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end.
+
+%% Master
+%% ---------------------------------------------------------------------------
+%% Syncer
+
+syncer(Ref, Log, MPid, SPids) ->
+ [erlang:monitor(process, SPid) || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ case [SPid || SPid <- SPids,
+ receive
+ {sync_ready, Ref, SPid} -> true;
+ {sync_deny, Ref, SPid} -> false;
+ {'DOWN', _, process, SPid, _} -> false
+ end] of
+ [] -> Log("all slaves already synced", []);
+ SPids1 -> MPid ! {ready, self()},
+ Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
+ syncer_loop(Ref, MPid, SPids1)
+ end.
+
+syncer_loop(Ref, MPid, SPids) ->
+ MPid ! {next, Ref},
+ receive
+ {msg, Ref, Msg, MsgProps, Unacked} ->
+ SPids1 = wait_for_credit(SPids),
+ [begin
+ credit_flow:send(SPid),
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ ok;
+ {done, Ref} ->
+ [SPid ! {sync_complete, Ref} || SPid <- SPids]
+ end.
+
+wait_for_credit(SPids) ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ wait_for_credit(SPids);
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ wait_for_credit(lists:delete(SPid, SPids))
+ end;
+ false -> SPids
+ end.
+
+%% Syncer
+%% ---------------------------------------------------------------------------
+%% Slave
+
+slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
+ Syncer ! {sync_deny, Ref, self()},
+ denied;
+
+slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+ MRef = erlang:monitor(process, Syncer),
+ Syncer ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
+ slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
+
+slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
+ State = {MA, TRef, BQS}) ->
+ receive
+ {'DOWN', MRef, process, Syncer, _Reason} ->
+ %% If the master dies half way we are not in the usual
+ %% half-synced state (with messages nearer the tail of the
+ %% queue); instead we have ones nearer the head. If we then
+ %% sync with a newly promoted master, or even just receive
+ %% messages from it, we have a hole in the middle. So the
+ %% only thing to do here is purge.
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
+ credit_flow:peer_down(Syncer),
+ {failed, {[], TRef, BQS1}};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ slave_sync_loop(Args, State);
+ {sync_complete, Ref} ->
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(Syncer),
+ {ok, State};
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ slave_sync_loop(Args, State);
+ {'$gen_cast', {set_ram_duration_target, Duration}} ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ slave_sync_loop(Args, {MA, TRef, BQS1});
+ update_ram_duration ->
+ {TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
+ {sync_msg, Ref, Msg, Props, Unacked} ->
+ credit_flow:ack(Syncer),
+ Props1 = Props#message_properties{needs_confirming = false},
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
+ {'EXIT', Parent, Reason} ->
+ {stop, Reason, State};
+ %% If the master throws an exception
+ {'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, Reason, {[], TRef, undefined}}
+ end.