diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 18:45:37 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-31 18:45:37 +0100 |
commit | 607f4a55eaa410448736f5d356c48799fac6af1f (patch) | |
tree | 31693d18af638a8062bef0ba67d0e4c9a101f062 | |
parent | 35d6af87ce356f83422d692aa3de62e4dd85702c (diff) | |
download | rabbitmq-server-607f4a55eaa410448736f5d356c48799fac6af1f.tar.gz |
`set_synchronized' => `update_unknown_pending'
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 44 |
1 files changed, 22 insertions, 22 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3423295a..8e872d4e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -814,7 +814,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, Dropped, AckRequired}, +process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), @@ -831,8 +831,8 @@ process_instruction({set_length, Length, Dropped, AckRequired}, StateN #state { backing_queue_state = BQSN1 }) end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of - true -> set_synchronised(Dropped - ToDrop, Length, State1); - false -> set_synchronised(Length, State1) + true -> update_unknown_pending(Dropped - ToDrop, Length, State1); + false -> update_unknown_pending(Length, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, @@ -845,9 +845,9 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); {_, false} when QLen =< Remaining -> - set_synchronised(Remaining, State); + update_unknown_pending(Remaining, State); {_, true} when QLen =< Remaining -> - set_synchronised(1, Remaining, State) + update_unknown_pending(1, Remaining, State) end}; process_instruction({ack, MsgIds, Length}, State = #state { backing_queue = BQ, @@ -856,18 +856,18 @@ process_instruction({ack, MsgIds, Length}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(AckTags) - length(MsgIds), Length, - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, update_unknown_pending(length(AckTags) - length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - {ok, set_synchronised(length(AckTags) - length(MsgIds), Length, - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, update_unknown_pending(length(AckTags) - length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -886,8 +886,8 @@ process_instruction({sender_death, ChPid}, known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length, ExtPending}, State) -> - {ok, set_synchronised(Length, - State #state { unknown_pending = ExtPending })}; + {ok, update_unknown_pending(Length, + State #state { unknown_pending = ExtPending })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -913,25 +913,25 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(_, _, State = #state { unknown_pending = undefined }) -> +update_unknown_pending(_, _, State = #state { unknown_pending = undefined }) -> State; -set_synchronised(PendingDelta, Length, +update_unknown_pending(PendingDelta, Length, State = #state { backing_queue = BQ, backing_queue_state = BQS, unknown_pending = ExtPending }) -> ExtPending1 = ExtPending + PendingDelta, State1 = State #state { unknown_pending = ExtPending1 }, case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of - true -> set_synchronised1(true, State1); - false when ExtPending1 >= 0 -> set_synchronised1(false, State1) + true -> set_synchronised(true, State1); + false when ExtPending1 >= 0 -> set_synchronised(false, State1) end. -set_synchronised(Length, State) -> - set_synchronised(0, Length, State). +update_unknown_pending(Length, State) -> + update_unknown_pending(0, Length, State). %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. -set_synchronised1(true, State = #state { q = #amqqueue { name = QName }, +set_synchronised(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( @@ -945,7 +945,7 @@ set_synchronised1(true, State = #state { q = #amqqueue { name = QName }, end end), State #state { synchronised = true }; -set_synchronised1(true, State) -> +set_synchronised(true, State) -> State; -set_synchronised1(false, State = #state { synchronised = false }) -> +set_synchronised(false, State = #state { synchronised = false }) -> State. |