diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-04 12:24:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-04 12:24:44 +0000 |
commit | 1d15ffde6cf646d05861d6cd71d4c4519f36c798 (patch) | |
tree | e52afb2d50f8d0ffbf59c55ff8b171c7a9380225 /src/rabbit_mirror_queue_sync.erl | |
parent | f6f0d2fe572ae7697d91139396d634864cecdf04 (diff) | |
parent | 4aa86fc039e0fdd7adf41b3b4e50b821c753a129 (diff) | |
download | rabbitmq-server-1d15ffde6cf646d05861d6cd71d4c4519f36c798.tar.gz |
Merge in bug24407.
Diffstat (limited to 'src/rabbit_mirror_queue_sync.erl')
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 45 |
1 files changed, 32 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 10a74cc9..e3f254e4 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/3, master_go/5, slave/7]). +-export([master_prepare/3, master_go/7, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -59,7 +59,10 @@ -type(bqs() :: any()). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). --spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) -> +-spec(master_go/7 :: (pid(), reference(), log_fun(), + rabbit_mirror_queue_master:stats_fun(), + rabbit_mirror_queue_master:stats_fun(), + bq(), bqs()) -> {'already_synced', bqs()} | {'ok', bqs()} | {'shutdown', any(), bqs()} | {'sync_died', any(), bqs()}). @@ -78,12 +81,13 @@ master_prepare(Ref, Log, SPids) -> MPid = self(), spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). -master_go(Syncer, Ref, Log, BQ, BQS) -> - Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, +master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> + Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(Args, BQ, BQS) + {ready, Syncer} -> EmitStats({syncing, 0}), + master_go0(Args, BQ, BQS) end. master_go0(Args, BQ, BQS) -> @@ -95,12 +99,15 @@ master_go0(Args, BQ, BQS) -> {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> +master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of - true -> Log("~p messages", [I]), + true -> EmitStats({syncing, I}), + Log("~p messages", [I]), erlang:now(); false -> Last end, + HandleInfo({syncing, I}), receive {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age) @@ -108,24 +115,31 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) -> ok end, receive + {'$gen_call', From, + cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), + gen_server2:reply(From, ok), + {stop, cancelled}; {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. -master_done({Syncer, Ref, _Log, Parent}, BQS) -> +master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) -> receive - {next, Ref} -> unlink(Syncer), - Syncer ! {done, Ref}, - receive {'EXIT', Syncer, _} -> ok - after 0 -> ok - end, + {next, Ref} -> stop_syncer(Syncer, {done, Ref}), {ok, BQS}; {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS} end. +stop_syncer(Syncer, Msg) -> + unlink(Syncer), + Syncer ! Msg, + receive {'EXIT', Syncer, _} -> ok + after 0 -> ok + end. + %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -158,6 +172,11 @@ syncer_loop(Ref, MPid, SPids) -> SPid ! {sync_msg, Ref, Msg, MsgProps} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + ok; {done, Ref} -> [SPid ! {sync_complete, Ref} || SPid <- SPids] end. |