diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-03 22:29:56 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-03 22:29:56 +0000 |
commit | 08c59a9170c1858d009ccecd089b6f1b2d25cd20 (patch) | |
tree | 9b6393f3e09de22f051c1fa738f4b0d99636634d | |
parent | 192ff326d82e601c0ebc1ebcdcc0d4411fda08eb (diff) | |
download | rabbitmq-server-08c59a9170c1858d009ccecd089b6f1b2d25cd20.tar.gz |
simplify syncer
we don't need to track monitors
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 56 |
1 files changed, 23 insertions, 33 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 8c561d1c..88f4639f 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -131,61 +131,51 @@ master_done({Syncer, Ref, _Log, Parent}, BQS) -> %% Syncer syncer(Ref, Log, MPid, SPids) -> - SPidsMRefs = [{SPid, erlang:monitor(process, SPid)} || SPid <- SPids], + [erlang:monitor(process, SPid) || SPid <- SPids], %% We wait for a reply from the slaves so that we know they are in %% a receive block and will thus receive messages we send to them %% *without* those messages ending up in their gen_server2 pqueue. - case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of - [] -> Log("all slaves already synced", []); - SPidsMRefs1 -> MPid ! {ready, self()}, - Log("~p to sync", [[rabbit_misc:pid_to_string(S) || - {S, _} <- SPidsMRefs1]]), - SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), - foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3) + case [SPid || SPid <- SPids, + receive + {sync_ready, Ref, SPid} -> true; + {sync_deny, Ref, SPid} -> false; + {'DOWN', _, process, SPid, _} -> false + end] of + [] -> Log("all slaves already synced", []); + SPids1 -> MPid ! {ready, self()}, + Log("~p to sync", [[rabbit_misc:pid_to_string(SPid) || + SPid <- SPids1]]), + SPids2 = syncer_loop(Ref, MPid, SPids1), + [SPid ! {sync_complete, Ref} || SPid <- SPids2] end. -syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> +syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive {msg, Ref, Msg, MsgProps} -> - SPidsMRefs1 = wait_for_credit(SPidsMRefs), + SPids1 = wait_for_credit(SPids), [begin credit_flow:send(SPid), SPid ! {sync_msg, Ref, Msg, MsgProps} - end || {SPid, _} <- SPidsMRefs1], - syncer_loop(Args, SPidsMRefs1); + end || SPid <- SPids1], + syncer_loop(Ref, MPid, SPids1); {done, Ref} -> - SPidsMRefs + SPids end. -wait_for_credit(SPidsMRefs) -> +wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - wait_for_credit(SPidsMRefs); - {'DOWN', MRef, _, SPid, _} -> + wait_for_credit(SPids); + {'DOWN', _, _, SPid, _} -> credit_flow:peer_down(SPid), - wait_for_credit(lists:delete({SPid, MRef}, SPidsMRefs)) + wait_for_credit(lists:delete(SPid, SPids)) end; - false -> SPidsMRefs + false -> SPids end. -foreach_slave(SPidsMRefs, Ref, Fun) -> - [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, - Fun(SPid, MRef, Ref) =/= ignore]. - -sync_receive_ready(SPid, MRef, Ref) -> - receive - {sync_ready, Ref, SPid} -> SPid; - {sync_deny, Ref, SPid} -> ignore; - {'DOWN', MRef, _, SPid, _} -> ignore - end. - - -sync_send_complete(SPid, _MRef, Ref) -> - SPid ! {sync_complete, Ref}. - %% Syncer %% --------------------------------------------------------------------------- %% Slave |