summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-30 17:05:54 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-30 17:05:54 +0100
commitd22b4dd1b543bc969a370178bd0d2e44c616ff51 (patch)
tree8b51e6922856ae41e93521a8297855ba9fade59c
parent2b1256803a3f841c7b374c006c8ab98091f5c534 (diff)
downloadrabbitmq-server-d22b4dd1b543bc969a370178bd0d2e44c616ff51.tar.gz
take into account requeues when setting synch state for slaves
To do this, keep count all the fetches we've seen that require an ack for messages we don't have (e.g. when the queue we have is shorter than on the master). We then decrease this counter appropriately when requeueing, acking, and set_length'ing. Given this, we can deem the slave synced only when the length is the same *and* the counter described above is 9 - there are no pending acks on the master for messages we don't have. I might have missed something (I barely tested this) but it seems to do the trick.
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl115
2 files changed, 65 insertions, 58 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3..094b83c9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {set_length, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {set_length, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)})
end,
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
@@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- ok = gm:broadcast(GM, {requeue, MsgIds}),
+ ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ef43d96e..2c60acf0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,7 +77,8 @@
msg_id_status,
known_senders,
- synchronised
+ synchronised,
+ external_pending
}).
start_link(Q) ->
@@ -131,7 +132,8 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ synchronised = false,
+ external_pending = 0
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -809,71 +811,67 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({set_length, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
{ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ set_synchronised(
+ Length,
+ case ToDrop >= 0 of
+ true ->
+ State1 =
+ lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _},
+ BQSN1} = BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ case AckRequired of
+ true -> set_synchronised(ToDrop, Dropped, Length, State1);
+ false -> State1
+ end;
+ false ->
+ State
+ end)};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
+ {ok, case {QLen - 1, AckRequired} of
+ {Remaining, _} ->
{{#basic_message{id = MsgId}, _IsDelivered,
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
+ {_, false} when QLen =< Remaining ->
+ set_synchronised(Remaining, State);
+ {_, true} when QLen =< Remaining ->
+ State #state { external_pending = ExtPending + 1}
end};
-process_instruction({ack, MsgIds},
+process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgIds},
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
+process_instruction({requeue, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -891,10 +889,8 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+process_instruction({length, Length}, State) ->
+ {ok, set_synchronised(Length, State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -913,9 +909,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -923,9 +916,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+set_synchronised(LocalPending, RemotePending, Length,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
+ ExtPending1 = ExtPending - (RemotePending - LocalPending),
+ State1 = State #state { external_pending = ExtPending1 },
+ case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
+ true -> set_synchronised1(true, State1);
+ false when ExtPending1 >= 0 -> set_synchronised1(false, State1)
+ end.
+
+set_synchronised(Length, State) ->
+ set_synchronised(0, 0, Length, State).
+
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+set_synchronised1(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
@@ -939,7 +946,7 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName },
end
end),
State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised1(true, State) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
+set_synchronised1(false, State = #state { synchronised = false }) ->
State.