diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-03 16:00:40 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-09-03 16:00:40 +0100 |
commit | 379d46dfab4175ed27121283839ffc8763a3f729 (patch) | |
tree | 0b66b369e39048dc8fc0833c1db23606a3e111d0 | |
parent | 8cddf06bea147e94230e8a75a3f460c178960ac9 (diff) | |
download | rabbitmq-server-379d46dfab4175ed27121283839ffc8763a3f729.tar.gz |
store the depth of master and slave instead of the unknown pending msgs
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 17 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 144 |
2 files changed, 92 insertions, 69 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 1151fd76..62109dae 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), + ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, BQS1} = BQ:ack(AckTags, BQS), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)}) + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, @@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}), + ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}), + ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -407,7 +407,7 @@ length_fun() -> backing_queue = BQ, backing_queue_state = BQS }) -> ok = gm:broadcast( - GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}), + GM, {depth, depth(BQ, BQS)}), State end) end. @@ -425,3 +425,10 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid, CPid, [ChPid]), State #state { known_senders = sets:add_element(ChPid, KS) } end. + +%% --------------------------------------------------------------------------- +%% Internal exports +%% --------------------------------------------------------------------------- + +depth(BQ, BQS) -> + BQ:len(BQS) + BQ:pending_ack(BQS). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 5111f46a..5e0907be 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -77,11 +77,10 @@ msg_id_status, known_senders, - synchronised, - %% Records the pending acks on the master for messages that we - %% do not have. This is necessary to accurately determine - %% whether the slave is synchronised or not. - unknown_pending + %% The depth is the BQ len + the number of messages pending + %% acks. + depth, + master_depth }). start_link(Q) -> @@ -135,8 +134,8 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false, - unknown_pending = undefined + depth = 0, + master_depth = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -396,7 +395,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(is_synchronised, #state { depth = D, master_depth = MD }) -> D =:= MD; i(Item, _State) -> throw({bad_argument, Item}). bq_init(BQ, Q, Recover) -> @@ -771,18 +770,22 @@ process_instruction( SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, - - {ok, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State2 #state { backing_queue_state = BQS1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state { backing_queue_state = BQS1 }) - end}; + {State3, Delta} = + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + {State2 #state { backing_queue_state = BQS1 }, 1}; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered( + AckRequired, Msg, MsgProps, ChPid, BQS), + {maybe_store_ack(AckRequired, MsgId, AckTag, + State2 #state {backing_queue_state = BQS1}), + case AckRequired of + true -> 1; + false -> 1 + end} + end, + {ok, set_synchronised(Delta, Delta, State3)}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, backing_queue = BQ, @@ -831,43 +834,49 @@ process_instruction({set_length, Length, Dropped, AckRequired}, StateN #state { backing_queue_state = BQSN1 }) end, State, lists:duplicate(ToDrop, const)), {ok, case AckRequired of - true -> set_synchronised(Dropped - ToDrop, Length, State1); - false -> set_synchronised(Length, State1) + true -> State1; + false -> set_synchronised(-ToDrop, -Dropped, State1) end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {ok, case {QLen - 1, AckRequired} of - {Remaining, _} -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - {_, false} when QLen =< Remaining -> - set_synchronised(Remaining, State); - {_, true} when QLen =< Remaining -> - set_synchronised(1, Remaining, State) - end}; -process_instruction({ack, MsgIds, Length}, + {State1, {Delta, MasterDelta}} = + case {QLen - 1} of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + {maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }), + case AckRequired of + true -> {0, 0}; + false -> {-1, -1} + end}; + _ when QLen =< Remaining -> + {State, case AckRequired of + true -> {0, 0}; + false -> {0, -1} + end} + end, + {ok, set_synchronised(Delta, MasterDelta, State1)}; +process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, set_synchronised(length(AckTags) - length(MsgIds), Length, + {ok, set_synchronised(-length(AckTags), -length(MsgIds), State #state { msg_id_ack = MA1, backing_queue_state = BQS1 })}; -process_instruction({requeue, MsgIds, Length}, +process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - {ok, set_synchronised(length(AckTags) - length(MsgIds), Length, - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 })}; + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -885,8 +894,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length, Pend}, State) -> - {ok, set_synchronised(Length, State #state { unknown_pending = Pend })}; + +process_instruction({depth, Depth}, State) -> + {ok, set_synchronised(0, 0, true, State #state { master_depth = Depth })}; + process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -912,37 +923,42 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -set_synchronised(Length, State) -> set_synchronised(0, Length, State). - -set_synchronised(_, _, State = #state { unknown_pending = undefined }) -> - State; -set_synchronised(PendingDelta, Length, - State = #state { backing_queue = BQ, - backing_queue_state = BQS, - unknown_pending = Pend, - synchronised = Sync}) -> - Pend1 = Pend + PendingDelta, - true = Pend1 >= 0, - State1 = State #state { unknown_pending = Pend1 }, +set_synchronised(Delta, MasterDelta, State) -> + set_synchronised(Delta, MasterDelta, false, State). + +set_synchronised(Delta, _MasterDelta, _AddAnyway, + State = #state { depth = Depth, + master_depth = undefined }) -> + State #state { depth = Depth + Delta }; +set_synchronised(Delta, MasterDelta, AddAnyway, + State = #state { depth = Depth, + master_depth = MasterDepth, + q = #amqqueue { name = QName }}) -> + Depth1 = Depth + Delta, + MasterDepth1 = MasterDepth + MasterDelta, + io:format("depths: local depths ~p ~p master depth ~p ~p~n", [Depth, Depth1, MasterDepth, MasterDepth1]), %% We intentionally leave out the head where a slave becomes %% unsynchronised: we assert that can never happen. - case {Sync, Pend1 =:= 0 andalso Length =:= BQ:len(BQS)} of - {true, true} -> - State1; - {false, false} -> - State1; - {false, true} -> + %% The `AddAnyway' param is there since in the `depth' instruction we + %% receive the master depth for the first time, and we want to set the sync + %% state anyway if we are synced. + case {Depth =:= MasterDepth, Depth1 =:= MasterDepth1} of + {WasSync, true} when not WasSync orelse AddAnyway -> Self = self(), - #state{ q = #amqqueue { name = QName } } = State1, rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_queue, QName}) of [] -> ok; [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + %% We might be there already, in the `AddAnyway' + %% case + SSPids1 = SSPids -- [Self], rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{sync_slave_pids = [Self | SSPids]}) + Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) end - end), - State1 #state { synchronised = true } - end. + end); + {Same, Same} -> + ok + end, + State #state { depth = Depth1, master_depth = MasterDepth1 }. |