diff options
authorFrancesco Mazzoli <>2012-09-03 16:00:40 +0100
committerFrancesco Mazzoli <>2012-09-03 16:00:40 +0100
commit379d46dfab4175ed27121283839ffc8763a3f729 (patch)
parent8cddf06bea147e94230e8a75a3f460c178960ac9 (diff)
store the depth of master and slave instead of the unknown pending msgs
2 files changed, 92 insertions, 69 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 1151fd76..62109dae 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}),
+ ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)})
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds})
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
@@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}),
+ ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
@@ -375,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len, BQ:pending_ack(BQS)}),
+ ok = gm:broadcast(GM, {depth, depth(BQ, BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -407,7 +407,7 @@ length_fun() ->
backing_queue = BQ,
backing_queue_state = BQS }) ->
ok = gm:broadcast(
- GM, {length, BQ:len(BQS), BQ:pending_ack(BQS)}),
+ GM, {depth, depth(BQ, BQS)}),
@@ -425,3 +425,10 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
+%% ---------------------------------------------------------------------------
+%% Internal exports
+%% ---------------------------------------------------------------------------
+depth(BQ, BQS) ->
+ BQ:len(BQS) + BQ:pending_ack(BQS).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 5111f46a..5e0907be 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,11 +77,10 @@
- synchronised,
- %% Records the pending acks on the master for messages that we
- %% do not have. This is necessary to accurately determine
- %% whether the slave is synchronised or not.
- unknown_pending
+ %% The depth is the BQ len + the number of messages pending
+ %% acks.
+ depth,
+ master_depth
start_link(Q) ->
@@ -135,8 +134,8 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false,
- unknown_pending = undefined
+ depth = 0,
+ master_depth = undefined
infos(?CREATION_EVENT_KEYS, State)),
@@ -396,7 +395,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth = D, master_depth = MD }) -> D =:= MD;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -771,18 +770,22 @@ process_instruction(
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
+ {State3, Delta} =
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {State2 #state { backing_queue_state = BQS1 }, 1};
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(
+ AckRequired, Msg, MsgProps, ChPid, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State2 #state {backing_queue_state = BQS1}),
+ case AckRequired of
+ true -> 1;
+ false -> 1
+ end}
+ end,
+ {ok, set_synchronised(Delta, Delta, State3)};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
backing_queue = BQ,
@@ -831,43 +834,49 @@ process_instruction({set_length, Length, Dropped, AckRequired},
StateN #state { backing_queue_state = BQSN1 })
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
- true -> set_synchronised(Dropped - ToDrop, Length, State1);
- false -> set_synchronised(Length, State1)
+ true -> State1;
+ false -> set_synchronised(-ToDrop, -Dropped, State1)
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case {QLen - 1, AckRequired} of
- {Remaining, _} ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- {_, false} when QLen =< Remaining ->
- set_synchronised(Remaining, State);
- {_, true} when QLen =< Remaining ->
- set_synchronised(1, Remaining, State)
- end};
-process_instruction({ack, MsgIds, Length},
+ {State1, {Delta, MasterDelta}} =
+ case {QLen - 1} of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ case AckRequired of
+ true -> {0, 0};
+ false -> {-1, -1}
+ end};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> {0, 0};
+ false -> {0, -1}
+ end}
+ end,
+ {ok, set_synchronised(Delta, MasterDelta, State1)};
+process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(AckTags) - length(MsgIds), Length,
+ {ok, set_synchronised(-length(AckTags), -length(MsgIds),
State #state { msg_id_ack = MA1,
backing_queue_state = BQS1 })};
-process_instruction({requeue, MsgIds, Length},
+process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- {ok, set_synchronised(length(AckTags) - length(MsgIds), Length,
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -885,8 +894,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
-process_instruction({length, Length, Pend}, State) ->
- {ok, set_synchronised(Length, State #state { unknown_pending = Pend })};
+process_instruction({depth, Depth}, State) ->
+ {ok, set_synchronised(0, 0, true, State #state { master_depth = Depth })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -912,37 +923,42 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(Length, State) -> set_synchronised(0, Length, State).
-set_synchronised(_, _, State = #state { unknown_pending = undefined }) ->
- State;
-set_synchronised(PendingDelta, Length,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- unknown_pending = Pend,
- synchronised = Sync}) ->
- Pend1 = Pend + PendingDelta,
- true = Pend1 >= 0,
- State1 = State #state { unknown_pending = Pend1 },
+set_synchronised(Delta, MasterDelta, State) ->
+ set_synchronised(Delta, MasterDelta, false, State).
+set_synchronised(Delta, _MasterDelta, _AddAnyway,
+ State = #state { depth = Depth,
+ master_depth = undefined }) ->
+ State #state { depth = Depth + Delta };
+set_synchronised(Delta, MasterDelta, AddAnyway,
+ State = #state { depth = Depth,
+ master_depth = MasterDepth,
+ q = #amqqueue { name = QName }}) ->
+ Depth1 = Depth + Delta,
+ MasterDepth1 = MasterDepth + MasterDelta,
+ io:format("depths: local depths ~p ~p master depth ~p ~p~n", [Depth, Depth1, MasterDepth, MasterDepth1]),
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
- case {Sync, Pend1 =:= 0 andalso Length =:= BQ:len(BQS)} of
- {true, true} ->
- State1;
- {false, false} ->
- State1;
- {false, true} ->
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case {Depth =:= MasterDepth, Depth1 =:= MasterDepth1} of
+ {WasSync, true} when not WasSync orelse AddAnyway ->
Self = self(),
- #state{ q = #amqqueue { name = QName } } = State1,
fun () ->
case mnesia:read({rabbit_queue, QName}) of
[] ->
[Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
- Q1#amqqueue{sync_slave_pids = [Self | SSPids]})
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
- end),
- State1 #state { synchronised = true }
- end.
+ end);
+ {Same, Same} ->
+ ok
+ end,
+ State #state { depth = Depth1, master_depth = MasterDepth1 }.