summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 18:45:37 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 18:45:37 +0100
commit607f4a55eaa410448736f5d356c48799fac6af1f (patch)
tree31693d18af638a8062bef0ba67d0e4c9a101f062
parent35d6af87ce356f83422d692aa3de62e4dd85702c (diff)
downloadrabbitmq-server-607f4a55eaa410448736f5d356c48799fac6af1f.tar.gz
`set_synchronized' => `update_unknown_pending'
-rw-r--r--src/rabbit_mirror_queue_slave.erl44
1 files changed, 22 insertions, 22 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3423295a..8e872d4e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -814,7 +814,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, Dropped, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
@@ -831,8 +831,8 @@ process_instruction({set_length, Length, Dropped, AckRequired},
StateN #state { backing_queue_state = BQSN1 })
end, State, lists:duplicate(ToDrop, const)),
{ok, case AckRequired of
- true -> set_synchronised(Dropped - ToDrop, Length, State1);
- false -> set_synchronised(Length, State1)
+ true -> update_unknown_pending(Dropped - ToDrop, Length, State1);
+ false -> update_unknown_pending(Length, State1)
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
@@ -845,9 +845,9 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
{_, false} when QLen =< Remaining ->
- set_synchronised(Remaining, State);
+ update_unknown_pending(Remaining, State);
{_, true} when QLen =< Remaining ->
- set_synchronised(1, Remaining, State)
+ update_unknown_pending(1, Remaining, State)
end};
process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
@@ -856,18 +856,18 @@ process_instruction({ack, MsgIds, Length},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, set_synchronised(length(AckTags) - length(MsgIds), Length,
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, update_unknown_pending(length(AckTags) - length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- {ok, set_synchronised(length(AckTags) - length(MsgIds), Length,
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 })};
+ {ok, update_unknown_pending(length(AckTags) - length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -886,8 +886,8 @@ process_instruction({sender_death, ChPid},
known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({length, Length, ExtPending}, State) ->
- {ok, set_synchronised(Length,
- State #state { unknown_pending = ExtPending })};
+ {ok, update_unknown_pending(Length,
+ State #state { unknown_pending = ExtPending })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -913,25 +913,25 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(_, _, State = #state { unknown_pending = undefined }) ->
+update_unknown_pending(_, _, State = #state { unknown_pending = undefined }) ->
State;
-set_synchronised(PendingDelta, Length,
+update_unknown_pending(PendingDelta, Length,
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
unknown_pending = ExtPending }) ->
ExtPending1 = ExtPending + PendingDelta,
State1 = State #state { unknown_pending = ExtPending1 },
case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
- true -> set_synchronised1(true, State1);
- false when ExtPending1 >= 0 -> set_synchronised1(false, State1)
+ true -> set_synchronised(true, State1);
+ false when ExtPending1 >= 0 -> set_synchronised(false, State1)
end.
-set_synchronised(Length, State) ->
- set_synchronised(0, Length, State).
+update_unknown_pending(Length, State) ->
+ update_unknown_pending(0, Length, State).
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
-set_synchronised1(true, State = #state { q = #amqqueue { name = QName },
+set_synchronised(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
@@ -945,7 +945,7 @@ set_synchronised1(true, State = #state { q = #amqqueue { name = QName },
end
end),
State #state { synchronised = true };
-set_synchronised1(true, State) ->
+set_synchronised(true, State) ->
State;
-set_synchronised1(false, State = #state { synchronised = false }) ->
+set_synchronised(false, State = #state { synchronised = false }) ->
State.