summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-03 22:05:17 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-03 22:05:17 +0000
commit192ff326d82e601c0ebc1ebcdcc0d4411fda08eb (patch)
treea3f8d1c80f7135fb5193c3f4b3f94a6dae9b7230
parent4d137f9a7b05d2a994e8e1a8d0da2f7ee9d12b65 (diff)
downloadrabbitmq-server-192ff326d82e601c0ebc1ebcdcc0d4411fda08eb.tar.gz
make credit waiting less brittle
We were relying on running out of credit for all slaves *simultaneously*, which requires in-depth knowledge of the credit flow logic and that no other credit-requiring messages are sent to a slave prior to this. Fortunately, since we are running in a fresh separate process we can simply handle *any* credit bumping message and *any* DOWN message. As a bonus we can revert to making the type of the bump msg opaque.
-rw-r--r--src/credit_flow.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl21
2 files changed, 11 insertions, 12 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index dff339fc..102c353f 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -37,7 +37,7 @@
-ifdef(use_specs).
--type(bump_msg() :: {pid(), non_neg_integer()}).
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
-spec(send/1 :: (pid()) -> 'ok').
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index ac03ca8d..8c561d1c 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -148,7 +148,7 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
MPid ! {next, Ref},
receive
{msg, Ref, Msg, MsgProps} ->
- SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref),
+ SPidsMRefs1 = wait_for_credit(SPidsMRefs),
[begin
credit_flow:send(SPid),
SPid ! {sync_msg, Ref, Msg, MsgProps}
@@ -158,10 +158,16 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
SPidsMRefs
end.
-wait_for_credit(SPidsMRefs, Ref) ->
+wait_for_credit(SPidsMRefs) ->
case credit_flow:blocked() of
- true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref,
- fun sync_receive_credit/3), Ref);
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ wait_for_credit(SPidsMRefs);
+ {'DOWN', MRef, _, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ wait_for_credit(lists:delete({SPid, MRef}, SPidsMRefs))
+ end;
false -> SPidsMRefs
end.
@@ -176,13 +182,6 @@ sync_receive_ready(SPid, MRef, Ref) ->
{'DOWN', MRef, _, SPid, _} -> ignore
end.
-sync_receive_credit(SPid, MRef, _Ref) ->
- receive
- {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
- SPid;
- {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
- ignore
- end.
sync_send_complete(SPid, _MRef, Ref) ->
SPid ! {sync_complete, Ref}.