diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-25 18:27:09 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-25 18:27:09 +0100 |
commit | 42e2c535eaaa385c3175d7b6aff37e2b83f2c721 (patch) | |
tree | c3124083f320513cb18cf93db8e02be945a7c4a2 | |
parent | 6cdf3e7e0a198f969808268f396ebb0a98440068 (diff) | |
download | rabbitmq-server-42e2c535eaaa385c3175d7b6aff37e2b83f2c721.tar.gz |
Rename set_synchronised to update_delta. And remove that nasty little arithmetic-based hack. We should be explicit about the fact that we treat depth messages from the master differently when we don't have a delta.
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4d53bd1b..7c3aef7a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -800,7 +800,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of true -> State1; - false -> set_synchronised(ToDrop - Dropped, State1) + false -> update_delta(ToDrop - Dropped, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, @@ -815,7 +815,7 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, _ when QLen =< Remaining andalso AckRequired -> State; _ when QLen =< Remaining -> - set_synchronised(-1, State) + update_delta(-1, State) end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, @@ -824,9 +824,9 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(MsgIds1) - length(MsgIds), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, update_delta(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -853,15 +853,10 @@ process_instruction({sender_death, ChPid}, known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({depth, Depth}, - State = #state { depth_delta = D, - backing_queue = BQ, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - D1 = case D of - undefined -> 1; %% anything but 0 will do here - _ -> D - end, - {ok, set_synchronised(Depth - BQ:depth(BQS) - D1, - State #state { depth_delta = D1 })}; + {ok, update_delta_from_master(Depth - BQ:depth(BQS), State)}; + process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -887,9 +882,18 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(_DeltaChange, State = #state { depth_delta = undefined }) -> +update_delta_from_master(NewDelta, State = #state{depth_delta = undefined}) -> + case NewDelta of + 0 -> ok = record_synchronised(State#state.q), + State #state { depth_delta = 0 }; + N when N > 0 -> State #state { depth_delta = N } + end; +update_delta_from_master(DeltaChange, State) -> + update_delta(DeltaChange, State). + +update_delta(_DeltaChange, State = #state { depth_delta = undefined }) -> State; -set_synchronised(DeltaChange, State = #state { depth_delta = Delta }) -> +update_delta(DeltaChange, State = #state { depth_delta = Delta }) -> %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. case {Delta, Delta + DeltaChange} of |