diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-30 17:05:54 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-08-30 17:05:54 +0100 |
commit | d22b4dd1b543bc969a370178bd0d2e44c616ff51 (patch) | |
tree | 8b51e6922856ae41e93521a8297855ba9fade59c | |
parent | 2b1256803a3f841c7b374c006c8ab98091f5c534 (diff) | |
download | rabbitmq-server-d22b4dd1b543bc969a370178bd0d2e44c616ff51.tar.gz |
take into account requeues when setting synch state for slaves
To do this, keep count of all the fetches we've seen that require an ack
for messages we don't have (e.g. when the queue we have is shorter than
on the master).
We then decrease this counter appropriately when requeueing, acking, and
set_length'ing.
Given this, we can deem the slave synced only when the length is the same
*and* the counter described above is 0 - i.e. there are no pending acks
on the master for messages we don't have.
I might have missed something (I barely tested this) but it seems to do the
trick.
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 115 |
2 files changed, 65 insertions, 58 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 477449e3..094b83c9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) -> purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0, false}), + ok = gm:broadcast(GM, {set_length, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired, Len = BQ:len(BQS), {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, + ok = gm:broadcast(GM, {set_length, Len1, Dropped, AckRequired}), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Next, Msgs, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 } }. @@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, BQS1} = BQ:ack(AckTags, BQS), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)}) end, AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, @@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - ok = gm:broadcast(GM, {requeue, MsgIds}), + ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}), {MsgIds, State #state { backing_queue_state = BQS1 }}. 
len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ef43d96e..2c60acf0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -77,7 +77,8 @@ msg_id_status, known_senders, - synchronised + synchronised, + external_pending }). start_link(Q) -> @@ -131,7 +132,8 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + synchronised = false, + external_pending = 0 }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -809,71 +811,67 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, +process_instruction({set_length, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + set_synchronised( + Length, + case ToDrop >= 0 of + true -> + State1 = + lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, + BQSN1} = BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + case AckRequired of + true -> set_synchronised(ToDrop, Dropped, Length, State1); + false -> State1 + 
end; + false -> + State + end)}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + external_pending = ExtPending }) -> QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> + {ok, case {QLen - 1, AckRequired} of + {Remaining, _} -> {{#basic_message{id = MsgId}, _IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State + {_, false} when QLen =< Remaining -> + set_synchronised(Remaining, State); + {_, true} when QLen =< Remaining -> + State #state { external_pending = ExtPending + 1} end}; -process_instruction({ack, MsgIds}, +process_instruction({ack, MsgIds, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; -process_instruction({requeue, MsgIds}, + {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; +process_instruction({requeue, MsgIds, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The only thing we can safely do is nuke out our BQ - %% and MA. The interaction between this and confirms - %% doesn't really bear thinking about... 
- {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, set_synchronised(length(AckTags), length(MsgIds), Length, + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -891,10 +889,8 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; +process_instruction({length, Length}, State) -> + {ok, set_synchronised(Length, State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -913,9 +909,6 @@ msg_ids_to_acktags(MsgIds, MA) -> end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, @@ -923,9 +916,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. +set_synchronised(LocalPending, RemotePending, Length, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + external_pending = ExtPending }) -> + ExtPending1 = ExtPending - (RemotePending - LocalPending), + State1 = State #state { external_pending = ExtPending1 }, + case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of + true -> set_synchronised1(true, State1); + false when ExtPending1 >= 0 -> set_synchronised1(false, State1) + end. 
+ +set_synchronised(Length, State) -> + set_synchronised(0, 0, Length, State). + %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. -set_synchronised(true, State = #state { q = #amqqueue { name = QName }, +set_synchronised1(true, State = #state { q = #amqqueue { name = QName }, synchronised = false }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( @@ -939,7 +946,7 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName }, end end), State #state { synchronised = true }; -set_synchronised(true, State) -> +set_synchronised1(true, State) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> +set_synchronised1(false, State = #state { synchronised = false }) -> State. |