diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-17 12:12:48 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-17 12:12:48 +0100 |
commit | 9e744ff212999ee6e4244504ffd4878334c7846a (patch) | |
tree | 71024110776f76c12d3cf9f7664239a0dc6ea064 | |
parent | 990c53d772565fc6967b1cad17587bcc1e82b153 (diff) | |
download | rabbitmq-server-9e744ff212999ee6e4244504ffd4878334c7846a.tar.gz |
Ensure that when a slave gets promoted, it requeues msgs in the same order which they were fetched
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 40 |
1 files changed, 21 insertions, 19 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index cceb67e2..052078bd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -60,6 +60,7 @@ sender_queues, %% :: Pid -> MsgQ msg_id_ack, %% :: MsgId -> AckTag + ack_num, msg_id_status }). @@ -108,6 +109,8 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), msg_id_ack = dict:new(), + ack_num = 0, + msg_id_status = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -456,7 +459,8 @@ promote_me(From, #state { q = Q, MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), - AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], + NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], + AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ), {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( @@ -568,7 +572,6 @@ process_instruction( State = #state { sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, - msg_id_ack = MA, msg_id_status = MS }) -> %% We really are going to do the publish right now, even though we @@ -628,12 +631,8 @@ process_instruction( {true, AckRequired} -> {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - MA1 = case AckRequired of - true -> dict:store(MsgId, AckTag, MA); - false -> MA - end, - State1 #state { backing_queue_state = BQS1, - msg_id_ack = MA1 } + maybe_store_ack(AckRequired, MsgId, AckTag, + State1 #state { backing_queue_state = BQS1 }) end}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, State = #state { sender_queues = SQ, @@ -688,19 +687,14 @@ process_instruction({set_length, Length}, end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, - backing_queue_state = BQS, - msg_id_ack = MA }) -> + backing_queue_state = BQS }) -> QLen = BQ:len(BQS), {ok, case QLen - 1 of Remaining -> {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - MA1 = case AckRequired of - true -> dict:store(MsgId, AckTag, MA); - false -> MA - end, - State #state { backing_queue_state = BQS1, - msg_id_ack = MA1 }; + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); Other when Other < Remaining -> %% we must be shorter than the master State @@ -744,11 +738,19 @@ msg_ids_to_acktags(MsgIds, MA) -> lists:foldl( fun (MsgId, {Acc, MAN}) -> case dict:find(MsgId, MA) of - error -> {Acc, MAN}; - {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} + error -> {Acc, MAN}; + {ok, {_Num, AckTag}} -> {[AckTag | Acc], + dict:erase(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], BQS). + BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). + +maybe_store_ack(false, _MsgId, _AckTag, State) -> + State; +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, + ack_num = Num }) -> + State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), + ack_num = Num + 1 }. |