summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-29 14:37:13 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-29 14:37:13 +0000
commitd9a00cfa67c05583ffcddbfa0a003cecc102bee9 (patch)
tree564d80ebdaf4b1137a43099849f40afbfbb38efe
parent69f561481424051a2e456c4138af9e0d22790d7b (diff)
downloadrabbitmq-server-d9a00cfa67c05583ffcddbfa0a003cecc102bee9.tar.gz
handle the case of the Syncer dying right at the end
which could previously leave the master blocked, waiting for 'next'. And move the unlinking, which allows us to ensure we don't end up with stray 'EXIT's.
-rw-r--r--src/rabbit_mirror_queue_sync.erl35
1 files changed, 20 insertions, 15 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 3a8a68b8..c654cde5 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -58,19 +58,13 @@ master_prepare(Ref, Log, SPids) ->
spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
master_go(Syncer, Ref, Log, BQ, BQS) ->
- SendArgs = {Syncer, Ref, Log, rabbit_misc:get_parent()},
- {Acc, BQS1} =
- BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
- master_send(SendArgs, I, Last, Msg, MsgProps)
- end, {0, erlang:now()}, BQS),
- receive
- {next, Ref} -> ok
- end,
- Syncer ! {done, Ref},
- case Acc of
- {shutdown, Reason} -> {shutdown, Reason, BQS1};
- {sync_died, Reason} -> {sync_died, Reason, BQS1};
- _ -> {ok, BQS1}
+ Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+ case BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
+ master_send(Args, I, Last, Msg, MsgProps)
+ end, {0, erlang:now()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
end.
master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) ->
@@ -93,6 +87,18 @@ master_send({Syncer, Ref, Log, Parent}, I, Last, Msg, MsgProps) ->
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
+master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+ receive
+ {next, Ref} -> unlink(Syncer),
+ Syncer ! {done, Ref},
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end,
+ {ok, BQS};
+ {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ end.
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -108,8 +114,7 @@ syncer(Ref, Log, MPid, SPids) ->
{S, _} <- SPidsMRefs1]]),
SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)
- end,
- unlink(MPid).
+ end.
syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
MPid ! {next, Ref},