summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-03 22:29:56 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-03 22:29:56 +0000
commit08c59a9170c1858d009ccecd089b6f1b2d25cd20 (patch)
tree9b6393f3e09de22f051c1fa738f4b0d99636634d
parent192ff326d82e601c0ebc1ebcdcc0d4411fda08eb (diff)
downloadrabbitmq-server-08c59a9170c1858d009ccecd089b6f1b2d25cd20.tar.gz
simplify syncer
we don't need to track monitors
-rw-r--r--src/rabbit_mirror_queue_sync.erl56
1 files changed, 23 insertions, 33 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 8c561d1c..88f4639f 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -131,61 +131,51 @@ master_done({Syncer, Ref, _Log, Parent}, BQS) ->
%% Syncer
syncer(Ref, Log, MPid, SPids) ->
- SPidsMRefs = [{SPid, erlang:monitor(process, SPid)} || SPid <- SPids],
+ [erlang:monitor(process, SPid) || SPid <- SPids],
%% We wait for a reply from the slaves so that we know they are in
%% a receive block and will thus receive messages we send to them
%% *without* those messages ending up in their gen_server2 pqueue.
- case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of
- [] -> Log("all slaves already synced", []);
- SPidsMRefs1 -> MPid ! {ready, self()},
- Log("~p to sync", [[rabbit_misc:pid_to_string(S) ||
- {S, _} <- SPidsMRefs1]]),
- SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
- foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)
+ case [SPid || SPid <- SPids,
+ receive
+ {sync_ready, Ref, SPid} -> true;
+ {sync_deny, Ref, SPid} -> false;
+ {'DOWN', _, process, SPid, _} -> false
+ end] of
+ [] -> Log("all slaves already synced", []);
+ SPids1 -> MPid ! {ready, self()},
+ Log("~p to sync", [[rabbit_misc:pid_to_string(SPid) ||
+ SPid <- SPids1]]),
+ SPids2 = syncer_loop(Ref, MPid, SPids1),
+ [SPid ! {sync_complete, Ref} || SPid <- SPids2]
end.
-syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
+syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
{msg, Ref, Msg, MsgProps} ->
- SPidsMRefs1 = wait_for_credit(SPidsMRefs),
+ SPids1 = wait_for_credit(SPids),
[begin
credit_flow:send(SPid),
SPid ! {sync_msg, Ref, Msg, MsgProps}
- end || {SPid, _} <- SPidsMRefs1],
- syncer_loop(Args, SPidsMRefs1);
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
{done, Ref} ->
- SPidsMRefs
+ SPids
end.
-wait_for_credit(SPidsMRefs) ->
+wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- wait_for_credit(SPidsMRefs);
- {'DOWN', MRef, _, SPid, _} ->
+ wait_for_credit(SPids);
+ {'DOWN', _, _, SPid, _} ->
credit_flow:peer_down(SPid),
- wait_for_credit(lists:delete({SPid, MRef}, SPidsMRefs))
+ wait_for_credit(lists:delete(SPid, SPids))
end;
- false -> SPidsMRefs
+ false -> SPids
end.
-foreach_slave(SPidsMRefs, Ref, Fun) ->
- [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
- Fun(SPid, MRef, Ref) =/= ignore].
-
-sync_receive_ready(SPid, MRef, Ref) ->
- receive
- {sync_ready, Ref, SPid} -> SPid;
- {sync_deny, Ref, SPid} -> ignore;
- {'DOWN', MRef, _, SPid, _} -> ignore
- end.
-
-
-sync_send_complete(SPid, _MRef, Ref) ->
- SPid ! {sync_complete, Ref}.
-
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave