summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-17 12:12:48 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-17 12:12:48 +0100
commit9e744ff212999ee6e4244504ffd4878334c7846a (patch)
tree71024110776f76c12d3cf9f7664239a0dc6ea064
parent990c53d772565fc6967b1cad17587bcc1e82b153 (diff)
downloadrabbitmq-server-9e744ff212999ee6e4244504ffd4878334c7846a.tar.gz
Ensure that when a slave gets promoted, it requeues msgs in the same order which they were fetched
-rw-r--r--src/rabbit_mirror_queue_slave.erl40
1 files changed, 21 insertions, 19 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cceb67e2..052078bd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -60,6 +60,7 @@
sender_queues, %% :: Pid -> MsgQ
msg_id_ack, %% :: MsgId -> AckTag
+ ack_num,
msg_id_status
}).
@@ -108,6 +109,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
+ ack_num = 0,
+
msg_id_status = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -456,7 +459,8 @@ promote_me(From, #state { q = Q,
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
{MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
- AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+ NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
+ AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ),
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
@@ -568,7 +572,6 @@ process_instruction(
State = #state { sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
- msg_id_ack = MA,
msg_id_status = MS }) ->
%% We really are going to do the publish right now, even though we
@@ -628,12 +631,8 @@ process_instruction(
{true, AckRequired} ->
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
- MA1 = case AckRequired of
- true -> dict:store(MsgId, AckTag, MA);
- false -> MA
- end,
- State1 #state { backing_queue_state = BQS1,
- msg_id_ack = MA1 }
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })
end};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
@@ -688,19 +687,14 @@ process_instruction({set_length, Length},
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_ack = MA }) ->
+ backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
{ok, case QLen - 1 of
Remaining ->
{{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
- MA1 = case AckRequired of
- true -> dict:store(MsgId, AckTag, MA);
- false -> MA
- end,
- State #state { backing_queue_state = BQS1,
- msg_id_ack = MA1 };
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -744,11 +738,19 @@ msg_ids_to_acktags(MsgIds, MA) ->
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {Acc, MAN};
- {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+ error -> {Acc, MAN};
+ {ok, {_Num, AckTag}} -> {[AckTag | Acc],
+ dict:erase(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], BQS).
+ BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
+
+maybe_store_ack(false, _MsgId, _AckTag, State) ->
+ State;
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
+ ack_num = Num }) ->
+ State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
+ ack_num = Num + 1 }.