summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl576
1 files changed, 252 insertions, 324 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 03fafc3e..1ba1420f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -19,17 +19,8 @@
%% For general documentation of HA design, see
%% rabbit_mirror_queue_coordinator
%%
-%% We join the GM group before we add ourselves to the amqqueue
-%% record. As a result:
-%% 1. We can receive msgs from GM that correspond to messages we will
-%% never receive from publishers.
-%% 2. When we receive a message from publishers, we must receive a
-%% message from the GM group for it.
-%% 3. However, that instruction from the GM group can arrive either
-%% before or after the actual message. We need to be able to
-%% distinguish between GM instructions arriving early, and case (1)
-%% above.
-%%
+%% We receive messages from GM and from publishers, and the gm
+%% messages can arrive either before or after the 'actual' message.
%% All instructions from the GM group must be processed in the order
%% in which they're received.
@@ -73,63 +64,59 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
- ack_num,
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
-start_link(Q) ->
- gen_server2:start_link(?MODULE, Q, []).
+start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-info(QPid) ->
- gen_server2:call(QPid, info, infinity).
-
-init(#amqqueue { name = QueueName } = Q) ->
+info(QPid) -> gen_server2:call(QPid, info, infinity).
+
+init(Q = #amqqueue { name = QName }) ->
+ %% We join the GM group before we add ourselves to the amqqueue
+ %% record. As a result:
+ %% 1. We can receive msgs from GM that correspond to messages we will
+ %% never receive from publishers.
+ %% 2. When we receive a message from publishers, we must receive a
+ %% message from the GM group for it.
+ %% 3. However, that instruction from the GM group can arrive either
+ %% before or after the actual message. We need to be able to
+ %% distinguish between GM instructions arriving early, and case (1)
+ %% above.
+ %%
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()],
+ fun rabbit_misc:execute_mnesia_transaction/1),
+ receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
- {new, MPid} ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, GM, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -137,70 +124,83 @@ init(#amqqueue { name = QueueName } = Q) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- ack_num = 0,
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
+ ok = gm:broadcast(GM, request_depth),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ {stop, {stale_master_pid, StalePid}};
+ duplicate_live_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
+ gm:leave(GM),
ignore
end.
-handle_call({deliver, Delivery = #delivery { immediate = true }},
- From, State) ->
- %% It is safe to reply 'false' here even if a) we've not seen the
- %% msg via gm, or b) the master dies before we receive the msg via
- %% gm. In the case of (a), we will eventually receive the msg via
- %% gm, and it's only the master's result to the channel that is
- %% important. In the case of (b), if the master does die and we do
- %% get promoted then at that point we have no consumers, thus
- %% 'false' is precisely the correct answer. However, we must be
- %% careful to _not_ enqueue the message in this case.
-
- %% Note this is distinct from the case where we receive the msg
- %% via gm first, then we're promoted to master, and only then do
- %% we receive the msg from the channel.
- gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, false, State));
-
-handle_call({deliver, Delivery = #delivery { mandatory = true }},
- From, State) ->
- gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, true, State));
+init_it(Self, GM, Node, QName) ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
+ mnesia:read({rabbit_queue, QName}),
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
+ [] -> add_slave(Q, Self, GM),
+ {new, QPid};
+ [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
+ {new, QPid}
+ end
+ end.
+
+%% Add to the end, so they are in descending order of age, see
+%% rabbit_mirror_queue_misc:promote_slave/1
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
+
+handle_call({deliver, Delivery, true}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- gm = GM,
- master_pid = MPid }) ->
- %% The GM has told us about deaths, which means we're not going to
- %% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
+ Self = self(),
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
- if node(Pid) =:= node(MPid) ->
+ case Pid of
+ MPid ->
%% master hasn't changed
- reply(ok, State);
- node(Pid) =:= node() ->
+ gen_server2:reply(From, ok),
+ noreply(State);
+ Self ->
%% we've become master
- promote_me(From, State);
- true ->
- %% master has changed to not us.
+ QueueState = promote_me(From, State),
+ {become, rabbit_amqqueue_process, QueueState, hibernate};
+ _ ->
+ %% master has changed to not us
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -213,13 +213,14 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+ State) ->
+ %% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(maybe_enqueue_message(Delivery, true, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -249,8 +250,8 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
- ok = gm:broadcast(GM, {process_death, MPid}),
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
+ ok = gm:broadcast(GM, process_death),
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
@@ -286,7 +287,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
+ Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -332,25 +333,26 @@ prioritise_info(Msg, _State) ->
%% GM
%% ---------------------------------------------------------------------------
-joined([SPid], _Members) ->
- SPid ! {joined, self()},
- ok.
+joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, []) ->
- ok;
-members_changed([SPid], _Births, Deaths) ->
- inform_deaths(SPid, Deaths).
+members_changed([_SPid], _Births, []) -> ok;
+members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
-handle_msg([_SPid], _From, heartbeat) ->
- ok;
-handle_msg([_SPid], _From, request_length) ->
+handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
-handle_msg([SPid], _From, {process_death, Pid}) ->
- inform_deaths(SPid, [Pid]);
+handle_msg([_SPid], _From, process_death) ->
+ %% Since GM is by nature lazy we need to make sure there is some
+ %% traffic when a master dies, to make sure we get informed of the
+ %% death. That's all process_death does, create some traffic. We
+ %% must not take any notice of the master death here since it
+ %% comes without ordering guarantees - there could still be
+ %% messages from the master we have yet to receive. When we get
+ %% members_changed, then there will be no more messages.
+ ok;
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
@@ -371,8 +373,8 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -390,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -409,16 +417,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -442,17 +450,14 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
- rabbit_event:notify(queue_slave_promoted, [{pid, self()},
- {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
- rabbit_mirror_queue_master:length_fun()),
+ rabbit_mirror_queue_master:depth_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
- ok = gm:confirmed_broadcast(GM, heartbeat),
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
@@ -460,8 +465,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
- %% not from gm, and if they're due to be enqueued on promotion
- %% then we pass them to the
+ %% not from gm, and pass them to the
%% queue_process:init_with_backing_queue_state to be enqueued.
%%
%% We also have to requeue messages which are pending acks: the
@@ -489,18 +493,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -517,34 +521,24 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% those messages are then requeued. However, as discussed above,
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- %%
- %% Everything that's in MA gets requeued. Consequently the new
- %% master should start with a fresh AM as there are no messages
- %% pending acks.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MPids),
-
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
- NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
- AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+ CPid, BQ, BQS, GM, AckTags, SS, MPids),
+
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- {Delivery, true} <- queue:to_list(PubQ)],
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, KS, MTC),
- {become, rabbit_amqqueue_process, QueueState, hibernate}.
+ Delivery <- queue:to_list(PubQ)],
+ rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -560,9 +554,9 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State #state {
backing_queue_state = BQS1 })),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
@@ -638,49 +632,22 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
- EnqueueOnPromotion,
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
- %% BQ has confirmed it but we didn't know what the
- %% msg_seq_no was at the time. We do now!
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { sender_queues = SQ1,
- msg_id_status = dict:erase(MsgId, MS) };
- {ok, {published, ChPid}} ->
- %% It was published to the BQ and we didn't know the
- %% msg_seq_no so couldn't confirm it at the time.
- case needs_confirming(Delivery, State1) of
- never ->
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- eventually ->
- State1 #state {
- msg_id_status =
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 }
- end;
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
+ {ok, Status} ->
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
end.
@@ -698,45 +665,27 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. As a result, we
- %% may know that it needs confirming without knowing its
- %% msg_seq_no, which means that we can see the confirmation come
- %% back from the backing queue without knowing the msg_seq_no,
- %% which means that we're going to have to hang on to the fact
- %% that we've seen the msg_id confirmed until we can associate it
- %% with a msg_seq_no.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issues confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
- %% We received the msg from the channel first. Thus we
- %% need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never ->
- {MQ2, PendingCh, MS};
- eventually ->
- {MQ2, PendingCh,
- dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
- immediately ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- {MQ2, PendingCh, MS}
- end;
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ dict:store(MsgId, Status, MS)};
+ {{value, Delivery = #delivery {
+ message = #basic_message { id = MsgId } }}, MQ2} ->
+ {MQ2, PendingCh,
+ %% We received the msg from the channel first. Thus
+ %% we need to deal with confirms here.
+ send_or_record_confirm(Status, Delivery, MS, State1)};
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -744,73 +693,48 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
-
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
-process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, {#delivery { message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+
+process_instruction({publish, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({publish_delivered, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {ok, maybe_store_ack(true, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })};
+process_instruction({discard, ChPid, MsgId}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> update_delta(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -821,11 +745,10 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
+ _ when QLen =< Remaining andalso AckRequired ->
+ State;
+ _ when QLen =< Remaining ->
+ update_delta(-1, State)
end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
@@ -834,27 +757,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, update_delta(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -872,10 +785,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_delta(Depth - BQ:depth(BQS), State)};
+
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -887,31 +801,45 @@ msg_ids_to_acktags(MsgIds, MA) ->
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {Acc, MAN};
- {ok, {_Num, AckTag}} -> {[AckTag | Acc],
- dict:erase(MsgId, MAN)}
+ error -> {Acc, MAN};
+ {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
-maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
- ack_num = Num }) ->
- State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
- ack_num = Num + 1 }.
-
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
- {name, QName}]),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
+ State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
+
+set_delta(0, State = #state { depth_delta = undefined }) ->
+ ok = record_synchronised(State#state.q),
+ State #state { depth_delta = 0 };
+set_delta(NewDelta, State = #state { depth_delta = undefined }) ->
+ true = NewDelta > 0, %% assertion
+ State #state { depth_delta = NewDelta };
+set_delta(NewDelta, State = #state { depth_delta = Delta }) ->
+ update_delta(NewDelta - Delta, State).
+
+update_delta(_DeltaChange, State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+update_delta( DeltaChange, State = #state { depth_delta = 0 }) ->
+ 0 = DeltaChange, %% assertion: we cannot become unsync'ed
+ State;
+update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
+ true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
+ set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+
+record_synchronised(#amqqueue { name = QName }) ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q = #amqqueue { sync_slave_pids = SSPids }] ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
+ ok
+ end
+ end).