summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-09-25 16:37:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-09-25 16:37:36 +0100
commitf15ccf9f4289b61a48020241764894188d7a4c8a (patch)
tree1b3f4b360b50fe3ebff296c0a1512c73c86e9774
parentf5e77c697a33161543c7505bc1a62ba60e4e4b0c (diff)
downloadrabbitmq-server-f15ccf9f4289b61a48020241764894188d7a4c8a.tar.gz
take a stab at making set_synchronised less obscure
-rw-r--r--src/rabbit_mirror_queue_slave.erl67
1 files changed, 30 insertions, 37 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b28ff6e2..472eff47 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -853,10 +853,15 @@ process_instruction({sender_death, ChPid},
known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({depth, Depth},
- State = #state { backing_queue = BQ,
+ State = #state { depth_delta = D,
+ backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(
- 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
+ D1 = case D of
+ undefinded -> 1; %% anything but 0 will do here
+ _ -> D
+ end,
+ {ok, set_synchronised(Depth - BQ:depth(BQS) - D1,
+ State #state { depth_delta = D1 })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -882,38 +887,26 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-set_synchronised(Delta, State) ->
- set_synchronised(Delta, false, State).
-
-set_synchronised(_Delta, _AddAnyway,
- State = #state { depth_delta = undefined }) ->
+set_synchronised(_Delta, State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(Delta, AddAnyway,
- State = #state { depth_delta = DepthDelta,
- q = #amqqueue { name = QName }}) ->
- DepthDelta1 = DepthDelta + Delta,
- %% We intentionally leave out the head where a slave becomes
- %% unsynchronised: we assert that can never happen.
- %% The `AddAnyway' param is there since in the `depth' instruction we
- %% receive the master depth for the first time, and we want to set the sync
- %% state anyway if we are synced.
- case DepthDelta1 =:= 0 of
- true when not (DepthDelta =:= 0) orelse AddAnyway ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- %% We might be there already, in the `AddAnyway'
- %% case
- SSPids1 = SSPids -- [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
- end
- end);
- _ when DepthDelta1 >= 0 ->
- ok
- end,
- State #state { depth_delta = DepthDelta1 }.
+set_synchronised(Delta, State = #state { depth_delta = D }) ->
+ case D + Delta of
+ 0 when D == 0 -> State;
+ 0 when D =/= 0 -> ok = record_synchronised(State#state.q),
+ State #state { depth_delta = 0 };
+ N when D =/= 0 andalso N > 0 -> State #state { depth_delta = N }
+ end.
+
+record_synchronised(#amqqueue { name = QName }) ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q = #amqqueue { sync_slave_pids = SSPids }] ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
+ ok
+ end
+ end).