diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-25 16:37:36 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-09-25 16:37:36 +0100 |
commit | f15ccf9f4289b61a48020241764894188d7a4c8a (patch) | |
tree | 1b3f4b360b50fe3ebff296c0a1512c73c86e9774 | |
parent | f5e77c697a33161543c7505bc1a62ba60e4e4b0c (diff) | |
download | rabbitmq-server-f15ccf9f4289b61a48020241764894188d7a4c8a.tar.gz |
take a stab at making set_synchronised less obscure
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 67 |
1 files changed, 30 insertions, 37 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b28ff6e2..472eff47 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -853,10 +853,15 @@ process_instruction({sender_death, ChPid}, known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({depth, Depth}, - State = #state { backing_queue = BQ, + State = #state { depth_delta = D, + backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised( - 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; + D1 = case D of + undefinded -> 1; %% anything but 0 will do here + _ -> D + end, + {ok, set_synchronised(Depth - BQ:depth(BQS) - D1, + State #state { depth_delta = D1 })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -882,38 +887,26 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(Delta, State) -> - set_synchronised(Delta, false, State). - -set_synchronised(_Delta, _AddAnyway, - State = #state { depth_delta = undefined }) -> +set_synchronised(_Delta, State = #state { depth_delta = undefined }) -> State; -set_synchronised(Delta, AddAnyway, - State = #state { depth_delta = DepthDelta, - q = #amqqueue { name = QName }}) -> - DepthDelta1 = DepthDelta + Delta, - %% We intentionally leave out the head where a slave becomes - %% unsynchronised: we assert that can never happen. - %% The `AddAnyway' param is there since in the `depth' instruction we - %% receive the master depth for the first time, and we want to set the sync - %% state anyway if we are synced. - case DepthDelta1 =:= 0 of - true when not (DepthDelta =:= 0) orelse AddAnyway -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - %% We might be there already, in the `AddAnyway' - %% case - SSPids1 = SSPids -- [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) - end - end); - _ when DepthDelta1 >= 0 -> - ok - end, - State #state { depth_delta = DepthDelta1 }. +set_synchronised(Delta, State = #state { depth_delta = D }) -> + case D + Delta of + 0 when D == 0 -> State; + 0 when D =/= 0 -> ok = record_synchronised(State#state.q), + State #state { depth_delta = 0 }; + N when D =/= 0 andalso N > 0 -> State #state { depth_delta = N } + end. + +record_synchronised(#amqqueue { name = QName }) -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q = #amqqueue { sync_slave_pids = SSPids }] -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { sync_slave_pids = [Self | SSPids] }), + ok + end + end). |