diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-03 22:05:17 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-03 22:05:17 +0000 |
commit | 192ff326d82e601c0ebc1ebcdcc0d4411fda08eb (patch) | |
tree | a3f8d1c80f7135fb5193c3f4b3f94a6dae9b7230 | |
parent | 4d137f9a7b05d2a994e8e1a8d0da2f7ee9d12b65 (diff) | |
download | rabbitmq-server-192ff326d82e601c0ebc1ebcdcc0d4411fda08eb.tar.gz |
make credit waiting less brittle
We were relying on running out of credit for all slaves
*simultaneously*, which requires in-depth knowledge of the credit flow
logic and that no other credit-requiring messages are sent to a slave
prior to this.
Fortunately, since we are running in a fresh separate process we can
simply handle *any* credit bumping message and *any* DOWN message.
As a bonus we can revert to making the type of the bump msg opaque.
-rw-r--r-- | src/credit_flow.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 21 |
2 files changed, 11 insertions, 12 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index dff339fc..102c353f 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -37,7 +37,7 @@ -ifdef(use_specs). --type(bump_msg() :: {pid(), non_neg_integer()}). +-opaque(bump_msg() :: {pid(), non_neg_integer()}). -type(credit_spec() :: {non_neg_integer(), non_neg_integer()}). -spec(send/1 :: (pid()) -> 'ok'). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index ac03ca8d..8c561d1c 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -148,7 +148,7 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> MPid ! {next, Ref}, receive {msg, Ref, Msg, MsgProps} -> - SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref), + SPidsMRefs1 = wait_for_credit(SPidsMRefs), [begin credit_flow:send(SPid), SPid ! {sync_msg, Ref, Msg, MsgProps} @@ -158,10 +158,16 @@ syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> SPidsMRefs end. -wait_for_credit(SPidsMRefs, Ref) -> +wait_for_credit(SPidsMRefs) -> case credit_flow:blocked() of - true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref, - fun sync_receive_credit/3), Ref); + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + wait_for_credit(SPidsMRefs); + {'DOWN', MRef, _, SPid, _} -> + credit_flow:peer_down(SPid), + wait_for_credit(lists:delete({SPid, MRef}, SPidsMRefs)) + end; false -> SPidsMRefs end. @@ -176,13 +182,6 @@ sync_receive_ready(SPid, MRef, Ref) -> {'DOWN', MRef, _, SPid, _} -> ignore end. -sync_receive_credit(SPid, MRef, _Ref) -> - receive - {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), - SPid; - {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), - ignore - end. sync_send_complete(SPid, _MRef, Ref) -> SPid ! {sync_complete, Ref}. |