summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_sync.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-04 12:24:44 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-04 12:24:44 +0000
commit1d15ffde6cf646d05861d6cd71d4c4519f36c798 (patch)
treee52afb2d50f8d0ffbf59c55ff8b171c7a9380225 /src/rabbit_mirror_queue_sync.erl
parentf6f0d2fe572ae7697d91139396d634864cecdf04 (diff)
parent4aa86fc039e0fdd7adf41b3b4e50b821c753a129 (diff)
downloadrabbitmq-server-1d15ffde6cf646d05861d6cd71d4c4519f36c798.tar.gz
Merge in bug24407.
Diffstat (limited to 'src/rabbit_mirror_queue_sync.erl')
-rw-r--r--src/rabbit_mirror_queue_sync.erl45
1 files changed, 32 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 10a74cc9..e3f254e4 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/3, master_go/5, slave/7]).
+-export([master_prepare/3, master_go/7, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -59,7 +59,10 @@
-type(bqs() :: any()).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
--spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) ->
+-spec(master_go/7 :: (pid(), reference(), log_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ rabbit_mirror_queue_master:stats_fun(),
+ bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
{'sync_died', any(), bqs()}).
@@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) ->
MPid = self(),
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
-master_go(Syncer, Ref, Log, BQ, BQS) ->
- Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
- {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ {ready, Syncer} -> EmitStats({syncing, 0}),
+ master_go0(Args, BQ, BQS)
end.
master_go0(Args, BQ, BQS) ->
@@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) ->
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
+ {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
- true -> Log("~p messages", [I]),
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
erlang:now();
false -> Last
end,
+ HandleInfo({syncing, I}),
receive
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age)
@@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
ok
end,
receive
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
{next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
-master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
- {next, Ref} -> unlink(Syncer),
- Syncer ! {done, Ref},
- receive {'EXIT', Syncer, _} -> ok
- after 0 -> ok
- end,
+ {next, Ref} -> stop_syncer(Syncer, {done, Ref}),
{ok, BQS};
{'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
end.
+stop_syncer(Syncer, Msg) ->
+ unlink(Syncer),
+ Syncer ! Msg,
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end.
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -158,6 +172,11 @@ syncer_loop(Ref, MPid, SPids) ->
SPid ! {sync_msg, Ref, Msg, MsgProps}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ ok;
{done, Ref} ->
[SPid ! {sync_complete, Ref} || SPid <- SPids]
end.