Diffstat (limited to 'src/rabbit_mirror_queue_coordinator.erl')
-rw-r--r--  src/rabbit_mirror_queue_coordinator.erl  426
1 file changed, 0 insertions, 426 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
deleted file mode 100644
index 3d460528..00000000
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ /dev/null
@@ -1,426 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2010-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_mirror_queue_coordinator).
-
--export([start_link/4, get_gm/1, ensure_monitoring/2]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--export([joined/2, members_changed/3, handle_msg/3, handle_terminate/2]).
-
--behaviour(gen_server2).
--behaviour(gm).
-
--include("rabbit.hrl").
--include("gm_specs.hrl").
-
--record(state, { q,
- gm,
- monitors,
- death_fun,
- depth_fun
- }).
-
--ifdef(use_specs).
-
--spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun(),
- rabbit_mirror_queue_master:depth_fun()) ->
- rabbit_types:ok_pid_or_error()).
--spec(get_gm/1 :: (pid()) -> pid()).
--spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-%%
-%% Mirror Queues
-%%
-%% A queue with mirrors consists of the following:
-%%
-%% #amqqueue{ pid, slave_pids }
-%% | |
-%% +----------+ +-------+--------------+-----------...etc...
-%% | | |
-%% V V V
-%% amqqueue_process---+ slave-----+ slave-----+ ...etc...
-%% | BQ = master----+ | | BQ = vq | | BQ = vq |
-%% | | BQ = vq | | +-+-------+ +-+-------+
-%% | +-+-------+ | | |
-%% +-++-----|---------+ | | (some details elided)
-%% || | | |
-%% || coordinator-+ | |
-%% || +-+---------+ | |
-%% || | | |
-%% || gm-+ -- -- -- -- gm-+- -- -- -- gm-+- -- --...etc...
-%% || +--+ +--+ +--+
-%% ||
-%% consumers
-%%
-%% The master is merely an implementation of bq, and thus is invoked
-%% through the normal bq interface by the amqqueue_process. The slaves
-%% meanwhile are processes in their own right (as is the
-%% coordinator). The coordinator and all slaves belong to the same gm
-%% group. Every member of a gm group receives messages sent to the gm
-%% group. Because the master is the bq of amqqueue_process, it doesn't
-%% have sole control over its mailbox, and as a result, the master
-%% itself cannot be passed messages directly (well, it could be, via
-%% the amqqueue:run_backing_queue callback, but that would induce
-%% additional unnecessary loading on the master queue process), yet it
-%% needs to react to gm events, such as the death of slaves. Thus the
-%% master creates the coordinator, and it is the coordinator that is
-%% the gm callback module and event handler for the master.
-%%
-%% Consumers are only attached to the master. Thus the master is
-%% responsible for informing all slaves when messages are fetched from
-%% the bq, when they're acked, and when they're requeued.
-%%
-%% The basic goal is to ensure that all slaves perform actions on
-%% their bqs in the same order as the master. Thus the master
-%% intercepts all events going to its bq, and suitably broadcasts
-%% these events on the gm. The slaves thus receive two streams of
-%% events: one stream is via the gm, and one stream is from channels
-%% directly. Whilst the stream via gm is guaranteed to be consistently
-%% seen by all slaves, the same is not true of the stream via
-%% channels. For example, in the event of an unexpected death of a
-%% channel during a publish, only some of the mirrors may receive that
-%% publish. As a result of this problem, the messages broadcast over
-%% the gm contain published content, and thus slaves can operate
-%% successfully on messages that they only receive via the gm.
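-%%
-%% As a hypothetical sketch (field and function names invented; the
-%% real master module's signature is richer), the interception is
-%% "broadcast over gm first, then apply to the local bq":
-%%
-%%   publish(Msg, MsgProps, ChPid,
-%%           State = #state { gm = GM, backing_queue = BQ,
-%%                            backing_queue_state = BQS }) ->
-%%       ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
-%%       State #state { backing_queue_state =
-%%                          BQ:publish(Msg, MsgProps, ChPid, BQS) }.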
-%%
-%% The key purpose of also sending messages directly from the channels
-%% to the slaves is that without this, in the event of the death of
-%% the master, messages could be lost until a suitable slave is
-%% promoted. However, that is not the only reason. A slave cannot send
-%% confirms for a message until it has seen it from the
-%% channel. Otherwise, it might send a confirm to a channel for a
-%% message that it might *never* receive from that channel. This can
-%% happen because new slaves join the gm ring (and thus receive
-%% messages from the master) before inserting themselves in the
-%% queue's mnesia record (which is what channels look at for routing).
-%% As it turns out, channels will simply ignore such bogus confirms,
-%% but relying on that would introduce a dangerously tight coupling.
-%%
-%% Hence the slaves have to wait until they've seen both the publish
-%% via gm, and the publish via the channel before they issue the
-%% confirm. Either form of publish can arrive first, and a slave can
-%% be upgraded to the master at any point during this
-%% process. Confirms continue to be issued correctly, however.
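-%%
-%% A minimal sketch of that rule (names invented): track which of the
-%% two streams each msg_id has arrived on, and only confirm once both
-%% have been seen:
-%%
-%%   seen(MsgId, Stream, Seen) -> %% Stream is 'gm' or 'channel'
-%%       Streams = sets:add_element(
-%%                   Stream, maps:get(MsgId, Seen, sets:new())),
-%%       case sets:size(Streams) of
-%%           2 -> {issue_confirm, maps:remove(MsgId, Seen)};
-%%           1 -> {await_other,   maps:put(MsgId, Streams, Seen)}
-%%       end.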
-%%
-%% Because the slave is a full process, it impersonates parts of the
-%% amqqueue API. However, it does not need to implement all parts: for
-%% example, no ack or consumer-related message can arrive directly at
-%% a slave from a channel: publishes are the only messages that both
-%% pass directly to the slaves and go via gm.
-%%
-%% Slaves can be added dynamically. When this occurs, there is no
-%% attempt made to sync the current contents of the master with the
-%% new slave, so the slave will start empty, regardless of the state
-%% of the master. Thus the slave needs to be able to detect and ignore
-%% operations which are for messages it has not received: because of
-%% the strict FIFO nature of queues in general, this is
-%% straightforward - all new publishes that the new slave receives via
-%% gm should be processed as normal, but fetches which are for
-%% messages the slave has never seen should be ignored. Similarly,
-%% acks for messages the slave never fetched should be
-%% ignored. Likewise, we don't republish rejected messages that we
-%% haven't seen. Eventually, as the master is consumed from, the
-%% messages at the head of the queue which were there before the slave
-%% joined will disappear, and the slave will become fully synced with
-%% the state of the master.
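-%%
-%% Sketched with invented names, the filter is simply membership in
-%% the set of msg_ids the slave has itself seen published:
-%%
-%%   maybe_apply(MsgId, Fun, State = #state { known_ids = Known }) ->
-%%       case sets:is_element(MsgId, Known) of
-%%           true  -> Fun(State);
-%%           false -> State %% msg predates this slave: ignore
-%%       end.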
-%%
-%% The detection of the sync-status is based on the depth of the BQs,
-%% where the depth is defined as the sum of the length of the BQ (as
-%% per BQ:len) and the number of messages pending acknowledgement. When the
-%% depth of the slave is equal to the master's, then the slave is
-%% synchronised. We only store the difference between the two for
-%% simplicity. Comparing the length is not enough since we need to
-%% take into account rejected messages which will make it back into
-%% the master queue but can't go back in the slave, since we don't
-%% want "holes" in the slave queue. Note that the depth, and the
-%% length likewise, must always be shorter on the slave - we assert
-%% that in various places. In case slaves are joined to an empty queue
-%% which only goes on to receive publishes, they start by asking the
-%% master to broadcast its depth. This is enough for slaves to always
-%% be able to work out when their head does not differ from the master
-%% (and is much simpler and cheaper than getting the master to hang on
-%% to the guid of the msg at the head of its queue). When a slave is
-%% promoted to a master, it unilaterally broadcasts its depth, in
-%% order to solve the problem of depth requests from new slaves being
-%% unanswered by a dead master.
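-%%
-%% That is (a sketch, assuming pending acks are held in a map):
-%%
-%%   depth(BQ, BQS, MsgsPendingAck) ->
-%%       BQ:len(BQS) + maps:size(MsgsPendingAck).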
-%%
-%% Obviously, due to the async nature of communication across gm, the
-%% slaves can fall behind. This does not matter from a sync pov: if
-%% they fall behind and the master dies then a) no publishes are lost
-%% because all publishes go to all mirrors anyway; b) the worst that
-%% happens is that acks get lost and so messages come back to
-%% life. This is no worse than normal given you never get confirmation
-%% that an ack has been received (not quite true with QoS-prefetch,
-%% but close enough for jazz).
-%%
-%% Because acktags are issued by the bq independently, and because
-%% there is no requirement for the master and all slaves to use the
-%% same bq, all references to msgs going over gm are by msg_id. Thus
-%% upon acking, the master must convert the acktags back to msg_ids
-%% (which happens to be what bq:ack returns), then send the msg_ids
-%% over gm; the slaves must convert the msg_ids back to acktags (a
-%% mapping the slaves themselves must maintain).
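-%%
-%% A hypothetical sketch of both conversions (names invented):
-%%
-%%   %% master: ack locally to learn the msg_ids, then broadcast them
-%%   ack(AckTags, GM, BQ, BQS) ->
-%%       {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
-%%       ok = gm:broadcast(GM, {ack, MsgIds}),
-%%       {MsgIds, BQS1}.
-%%
-%%   %% slave: msg_ids -> acktags via the map it maintains; msg_ids
-%%   %% this slave has never seen simply drop out
-%%   acktags_for(MsgIds, MsgIdToAckTag) ->
-%%       [AckTag || {ok, AckTag} <-
-%%                      [maps:find(Id, MsgIdToAckTag) || Id <- MsgIds]].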
-%%
-%% When the master dies, a slave gets promoted. This will be the
-%% eldest slave, and thus the hope is that that slave is most likely
-%% to be sync'd with the master. The design of gm is that the
-%% notification of the death of the master will only appear once all
-%% messages in-flight from the master have been fully delivered to all
-%% members of the gm group. Thus at this point, the slave that gets
-%% promoted cannot broadcast different events in a different order
-%% than the master for the same msgs: there is no possibility for the
-%% same msg to be processed by the old master and the new master - if
-%% it was processed by the old master then it will have been processed
-%% by the slave before the slave was promoted, and vice versa.
-%%
-%% Upon promotion, all msgs pending acks are requeued as normal, the
-%% slave constructs state suitable for use in the master module, and
-%% then dynamically changes into an amqqueue_process with the master
-%% as the bq, and the slave's bq as the master's bq. Thus the very
-%% same process that was the slave is now a full amqqueue_process.
-%%
-%% It is important that we avoid memory leaks due to the death of
-%% senders (i.e. channels) and partial publications. A sender
-%% publishing a message may fail mid way through the publish and thus
-%% only some of the mirrors will receive the message. We need the
-%% mirrors to be able to detect this and tidy up as necessary to avoid
-%% leaks. If we just had the master monitoring all senders then we
-%% would have the possibility that a sender appears and only sends the
-%% message to a few of the slaves before dying. Those slaves would
-%% then hold on to the message, assuming they'll receive some
-%% instruction eventually from the master. Thus we have both slaves
-%% and the master monitor all senders they become aware of. But there
-%% is a race: if the slave receives a DOWN of a sender, how does it
-%% know whether or not the master is going to send it instructions
-%% regarding those messages?
-%%
-%% Whilst the master needs to monitor senders, it can't access its
-%% mailbox directly, so it delegates that monitoring to the
-%% coordinator. When the coordinator receives a DOWN message from a
-%% sender, it informs the
-%% master via a callback. This allows the master to do any tidying
-%% necessary, but more importantly allows the master to broadcast a
-%% sender_death message to all the slaves, saying the sender has
-%% died. Once the slaves receive the sender_death message, they know
-%% that they're not going to receive any more instructions from the gm
-%% regarding that sender. However, it is possible that the coordinator
-%% receives the DOWN and communicates that to the master before the
-%% master has finished receiving and processing publishes from the
-%% sender. This turns out not to be a problem: the sender has actually
-%% died, and so will not need to receive confirms or other feedback,
-%% and should further messages be "received" from the sender, the
-%% master will ask the coordinator to set up a new monitor, and
-%% will continue to process the messages normally. Slaves may thus
-%% receive publishes via gm from previously declared "dead" senders,
-%% but again, this is fine: should the slave have just thrown out the
-%% message it had received directly from the sender (due to receiving
-%% a sender_death message via gm), it will be able to cope with the
-%% publication purely from the master via gm.
-%%
-%% When a slave receives a DOWN message for a sender, if it has not
-%% received the sender_death message from the master via gm already,
-%% then it will wait 20 seconds before broadcasting a request for
-%% confirmation from the master that the sender really has died.
-%% Should a sender have only sent a publish to slaves, this allows
-%% slaves to inform the master of the previous existence of the
-%% sender. The master will thus monitor the sender, receive the DOWN,
-%% and subsequently broadcast the sender_death message, allowing the
-%% slaves to tidy up. This process can repeat for the same sender:
-%% consider one slave receives the publication, then the DOWN, then
-%% asks for confirmation of death, then the master broadcasts the
-%% sender_death message. Only then does another slave receive the
-%% publication and thus set up its monitoring. Eventually that slave
-%% too will receive the DOWN, ask for confirmation and the master will
-%% monitor the sender again, receive another DOWN, and send out
-%% another sender_death message. Given the 20 second delay before
-%% requesting death confirmation, this is highly unlikely, but it is a
-%% possibility.
-%%
-%% When the 20 second timer expires, the slave first checks to see
-%% whether it still needs confirmation of the death before requesting
-%% it. This prevents unnecessary traffic on gm as it allows one
-%% broadcast of the sender_death message to satisfy many slaves.
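-%%
-%% A sketch of that timer (seen_sender_death/2 and local_sender_down/1
-%% are invented; the real slave differs in detail). The confirmation
-%% request fits naturally as the {ensure_monitoring, ...} broadcast
-%% which this coordinator handles below:
-%%
-%%   local_sender_down(ChPid) ->
-%%       erlang:send_after(20000, self(), {confirm_death, ChPid}).
-%%
-%%   handle_info({confirm_death, ChPid}, State = #state { gm = GM }) ->
-%%       case seen_sender_death(ChPid, State) of
-%%           true  -> ok; %% sender_death already seen: nothing to do
-%%           false -> ok = gm:broadcast(GM, {ensure_monitoring, [ChPid]}),
-%%                    local_sender_down(ChPid) %% re-arm, as described
-%%       end,
-%%       {noreply, State}.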
-%%
-%% If we consider the promotion of a slave at this point, we have two
-%% possibilities: that of the slave that has received the DOWN and is
-%% thus waiting for confirmation from the master that the sender
-%% really is down; and that of the slave that has not received the
-%% DOWN. In the first case, in the act of promotion to master, the new
-%% master will again monitor the dead sender, and after it has
-%% finished promoting itself, it should find another DOWN waiting,
-%% which it will then broadcast. This will allow slaves to tidy up as
-%% normal. In the second case, we have the possibility that a
-%% confirmation-of-sender-death request has been broadcast, but that
-%% it was broadcast before the master failed, and that the slave being
-%% promoted does not know anything about that sender, and so will not
-%% monitor it on promotion. Thus a slave that broadcasts such a
-%% request, at the point of broadcasting it, recurses, setting another
-%% 20 second timer. As before, on expiry of the timer, the slave
-%% checks to see whether it still has not received a sender_death
-%% message for the dead sender, and if not, broadcasts a death
-%% confirmation request. Thus this ensures that even when a master
-%% dies and the new slave has no knowledge of the dead sender, it will
-%% eventually receive a death confirmation request, monitor the
-%% dead sender, receive the DOWN and broadcast the sender_death
-%% message.
-%%
-%% The preceding commentary deals with the possibility of slaves
-%% receiving publications from senders which the master does not, and
-%% the need to prevent memory leaks in such scenarios. The inverse is
-%% also possible: a partial publication may cause only the master to
-%% receive a publication. It will then publish the message via gm. The
-%% slaves will receive it via gm, will publish it to their BQ and will
-%% set up monitoring on the sender. They will then receive the DOWN
-%% message and the master will eventually publish the corresponding
-%% sender_death message. The slaves will then be able to tidy up
-%% their state as normal.
-%%
-%% Recovery of mirrored queues is straightforward: as nodes die, the
-%% remaining nodes record this, and eventually a situation is reached
-%% in which only one node is alive, which is the master. This is the
-%% only node which, upon recovery, will resurrect a mirrored queue:
-%% nodes which die and then rejoin as slaves will start off empty as
-%% if they have no mirrored content at all. This is not surprising: to
-%% achieve anything more sophisticated would require the master and
-%% recovering slave to be able to check to see whether they agree on
-%% the last seen state of the queue: checking depth alone is not
-%% sufficient in this case.
-%%
-%% For more documentation see the comments in bug 23554.
-%%
-%%----------------------------------------------------------------------------
-
-start_link(Queue, GM, DeathFun, DepthFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
-
-get_gm(CPid) ->
- gen_server2:call(CPid, get_gm, infinity).
-
-ensure_monitoring(CPid, Pids) ->
- gen_server2:cast(CPid, {ensure_monitoring, Pids}).
-
-%% ---------------------------------------------------------------------------
-%% gen_server
-%% ---------------------------------------------------------------------------
-
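-%% GM is 'undefined' when the coordinator is created afresh, in which
-%% case we start a new gm member and wait for it to join; on promotion
-%% the (ex-)slave passes in its existing gm pid, which we merely link
-%% to.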
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
- ?store_proc_name(QueueName),
- GM1 = case GM of
- undefined ->
- {ok, GM2} = gm:start_link(
- QueueName, ?MODULE, [self()],
- fun rabbit_misc:execute_mnesia_transaction/1),
- receive {joined, GM2, _Members} ->
- ok
- end,
- GM2;
- _ ->
- true = link(GM),
- GM
- end,
- {ok, #state { q = Q,
- gm = GM1,
- monitors = pmon:new(),
- death_fun = DeathFun,
- depth_fun = DepthFun },
- hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call(get_gm, _From, State = #state { gm = GM }) ->
- reply(GM, State).
-
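-%% Cast by members_changed/3 below when gm members have died; the
-%% guard restricts this to the node on which the master is running.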
-handle_cast({gm_deaths, DeadGMPids},
- State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
- when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, DeadGMPids) of
- {ok, MPid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
- DeadPids),
- noreply(State);
- {error, not_found} ->
- {stop, normal, State}
- end;
-
-handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
- ok = DepthFun(),
- noreply(State);
-
-handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
- noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
-
-handle_cast({delete_and_terminate, Reason}, State) ->
- {stop, Reason, State}.
-
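-%% A monitored sender has died: inform the master via DeathFun so it
-%% can tidy up and broadcast sender_death to the slaves (see the essay
-%% above).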
-handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
- State = #state { monitors = Mons,
- death_fun = DeathFun }) ->
- noreply(case pmon:is_monitored(Pid, Mons) of
- false -> State;
- true -> ok = DeathFun(Pid),
- State #state { monitors = pmon:erase(Pid, Mons) }
- end);
-
-handle_info(Msg, State) ->
- {stop, {unexpected_info, Msg}, State}.
-
-terminate(_Reason, #state{}) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% ---------------------------------------------------------------------------
-%% GM
-%% ---------------------------------------------------------------------------
-
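-%% gm callbacks. These run in the gm process, with the coordinator's
-%% pid as the sole callback argument; they simply relay gm events into
-%% the coordinator's mailbox.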
-joined([CPid], Members) ->
- CPid ! {joined, self(), Members},
- ok.
-
-members_changed([_CPid], _Births, []) ->
- ok;
-members_changed([CPid], _Births, Deaths) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
-
-handle_msg([CPid], _From, request_depth = Msg) ->
- ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
- ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
- ok = gen_server2:cast(CPid, Msg),
- {stop, {shutdown, ring_shutdown}};
-handle_msg([_CPid], _From, _Msg) ->
- ok.
-
-handle_terminate([_CPid], _Reason) ->
- ok.
-
-%% ---------------------------------------------------------------------------
-%% Others
-%% ---------------------------------------------------------------------------
-
-noreply(State) ->
- {noreply, State, hibernate}.
-
-reply(Reply, State) ->
- {reply, Reply, State, hibernate}.