summary refs log tree commit diff
diff options
context:
space:
mode:
author Matthew Sackman <matthew@rabbitmq.com> 2011-08-16 13:34:13 +0100
committer Matthew Sackman <matthew@rabbitmq.com> 2011-08-16 13:34:13 +0100
commit 4af8668dd69688896c14abe22f278405556d6e49 (patch)
tree e5d9505ee37e0755bc6833b3c9b0e5da843d7e5a
parent ed5237aa32212f894af50e6bb91e9aabe52c285c (diff)
parent 417e05d94a7b9bdc971fbcaa039eed8b3ebfaa0c (diff)
download rabbitmq-server-4af8668dd69688896c14abe22f278405556d6e49.tar.gz
Merging default to bug24340
-rw-r--r-- docs/rabbitmqctl.1.xml 11
-rw-r--r-- src/rabbit_amqqueue_process.erl 64
-rw-r--r-- src/rabbit_mirror_queue_coordinator.erl 82
-rw-r--r-- src/rabbit_mirror_queue_master.erl 28
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 30
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 142
-rw-r--r-- src/rabbit_mnesia.erl 81
-rw-r--r-- src/rabbit_msg_store.erl 2
-rw-r--r-- src/rabbit_node_monitor.erl 9
9 files changed, 340 insertions, 109 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ee000215..ba87c836 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -860,6 +860,17 @@
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>synchronised_slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of
+ the current slaves which are synchronised with the master -
+ i.e. those which could take over from the master without
+ message loss.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 05de48d6..bc1a85d0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,8 +73,8 @@
messages,
consumers,
memory,
- backing_queue_status,
- slave_pids
+ slave_pids,
+ backing_queue_status
]).
-define(CREATION_EVENT_KEYS,
@@ -84,10 +84,12 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -149,11 +151,13 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
-terminate(Reason, State = #q{backing_queue = BQ}) ->
+terminate(Reason, State = #q{q = #amqqueue{name = QName},
+ backing_queue = BQ}) ->
%% FIXME: How do we cancel active subscriptions?
terminate_shutdown(fun (BQS) ->
rabbit_event:notify(
- queue_deleted, [{pid, self()}]),
+ queue_deleted, [{pid, self()},
+ {name, QName}]),
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete
%% doesn't return 'ok'.
@@ -703,7 +707,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -735,14 +772,15 @@ i(consumers, State) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined -> [];
+ _ -> SPids
+ end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- SPids;
-i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name),
- MNodes;
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index f6664a27..8ed2bede 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -32,15 +32,17 @@
-record(state, { q,
gm,
monitors,
- death_fun
+ death_fun,
+ length_fun
}).
-define(ONE_SECOND, 1000).
-ifdef(use_specs).
--spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun()) ->
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:length_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -53,7 +55,7 @@
%%
%% A queue with mirrors consists of the following:
%%
-%% #amqqueue{ pid, mirror_pids }
+%% #amqqueue{ pid, slave_pids }
%% | |
%% +----------+ +-------+--------------+-----------...etc...
%% | | |
@@ -138,9 +140,28 @@
%% state of the master. The detection of the sync-status of a slave is
%% done entirely based on length: if the slave and the master both
%% agree on the length of the queue after the fetch of the head of the
-%% queue, then the queues must be in sync. The only other possibility
-%% is that the slave's queue is shorter, and thus the fetch should be
-%% ignored.
+%% queue (or a 'set_length' results in a slave having to drop some
+%% messages from the head of its queue), then the queues must be in
+%% sync. The only other possibility is that the slave's queue is
+%% shorter, and thus the fetch should be ignored. In case slaves are
+%% joined to an empty queue which only goes on to receive publishes,
+%% they start by asking the master to broadcast its length. This is
+%% enough for slaves to always be able to work out when their head
+%% does not differ from the master (and is much simpler and cheaper
+%% than getting the master to hang on to the guid of the msg at the
+%% head of its queue). When a slave is promoted to a master, it
+%% unilaterally broadcasts its length, in order to solve the problem
+%% of length requests from new slaves being unanswered by a dead
+%% master.
+%%
+%% Obviously, due to the async nature of communication across gm, the
+%% slaves can fall behind. This does not matter from a sync pov: if
+%% they fall behind and the master dies then a) no publishes are lost
+%% because all publishes go to all mirrors anyway; b) the worst that
+%% happens is that acks get lost and so messages come back to
+%% life. This is no worse than normal given you never get confirmation
+%% that an ack has been received (not quite true with QoS-prefetch,
+%% but close enough for jazz).
%%
%% Because acktags are issued by the bq independently, and because
%% there is no requirement for the master and all slaves to use the
@@ -279,8 +300,8 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q,
- gm = GM1,
- monitors = dict:new(),
- death_fun = DeathFun },
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun,
+ length_fun = LengthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -317,18 +339,21 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
handle_cast({gm_deaths, Deaths},
- State = #state { q = #amqqueue { name = QueueName } }) ->
- rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- rabbit_misc:pid_to_string(self()),
- [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
+ State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
+ when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= node() ->
+ {ok, MPid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
+ DeadPids),
noreply(State);
{error, not_found} ->
{stop, normal, State}
end;
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+ ok = LengthFun(),
+ noreply(State);
+
handle_cast({ensure_monitoring, Pids},
State = #state { monitors = Monitors }) ->
Monitors1 =
@@ -343,13 +368,12 @@ handle_cast({ensure_monitoring, Pids},
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
- death_fun = Fun }) ->
- noreply(
- case dict:is_key(Pid, Monitors) of
- false -> State;
- true -> ok = Fun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
- end);
+ death_fun = DeathFun }) ->
+ noreply(case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = DeathFun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -379,6 +403,8 @@ members_changed([CPid], _Births, Deaths) ->
handle_msg([_CPid], _From, heartbeat) ->
ok;
+handle_msg([CPid], _From, request_length = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([_CPid], _From, _Msg) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 532911f2..ad5fd28f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -25,7 +25,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0]).
+-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -44,9 +44,10 @@
-ifdef(use_specs).
--export_type([death_fun/0]).
+-export_type([death_fun/0, length_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(length_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -61,6 +62,7 @@
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
+-spec(length_fun/0 :: () -> length_fun()).
-endif.
@@ -83,7 +85,7 @@ stop() ->
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun()),
+ Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
MNodes1 =
(case MNodes of
@@ -94,6 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -349,11 +352,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% ---------------------------------------------------------------------------
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ Len = BQ:len(BQS),
+ ok = gm:broadcast(GM, {length, Len}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
+ set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -371,9 +376,18 @@ sender_death_fun() ->
end)
end.
-%% ---------------------------------------------------------------------------
-%% Helpers
-%% ---------------------------------------------------------------------------
+length_fun() ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ State
+ end)
+ end.
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 6a9f733e..cf8e9484 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -17,7 +17,8 @@
-module(rabbit_mirror_queue_misc).
-export([remove_from_queue/2, on_node_up/0,
- drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]).
+ drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
+ report_deaths/4]).
-include("rabbit.hrl").
@@ -28,6 +29,7 @@
%% become the new master, which is bad because it could then mean the
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
+%% Returns {ok, NewMPid, DeadPids}
remove_from_queue(QueueName, DeadPids) ->
DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
rabbit_misc:execute_mnesia_transaction(
@@ -38,27 +40,27 @@ remove_from_queue(QueueName, DeadPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids }] ->
- [QPid1 | SPids1] =
+ [QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- ok;
+ {ok, QPid1, []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
Q1 = Q #amqqueue { pid = QPid1,
slave_pids = SPids1 },
- ok = rabbit_amqqueue:store_queue(Q1);
+ ok = rabbit_amqqueue:store_queue(Q1),
+ {ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- ok
- end,
- {ok, QPid1}
+ {ok, QPid1, []}
+ end
end
end).
@@ -133,3 +135,17 @@ if_mirrored_queue(Queue, Fun) ->
_ -> Fun(Q)
end
end).
+
+report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
+ ok;
+report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
+ rabbit_event:notify(queue_mirror_deaths, [{name, QueueName},
+ {pids, DeadPids}]),
+ rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n",
+ [rabbit_misc:rs(QueueName),
+ case IsMaster of
+ true -> "Master";
+ false -> "Slave"
+ end,
+ rabbit_misc:pid_to_string(MirrorPid),
+ [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c918f388..3c453981 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -33,7 +33,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2]).
+-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,15 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ master_pid,
+ is_synchronised
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
+
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -64,7 +73,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -73,6 +84,9 @@ start_link(Q) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+info(QPid) ->
+ gen_server2:call(QPid, info, infinity).
+
init([#amqqueue { name = QueueName } = Q]) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -95,26 +109,32 @@ init([#amqqueue { name = QueueName } = Q]) ->
end),
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
BQS = bq_init(BQ, Q, false),
- {ok, #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new()
- }, hibernate,
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
+ },
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
@@ -144,29 +164,32 @@ handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
gm = GM,
master_pid = MPid }) ->
- rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- rabbit_misc:pid_to_string(self()),
- [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
%% The GM has told us about deaths, which means we're not going to
%% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= node(MPid) ->
- %% master hasn't changed
- reply(ok, State);
- {ok, Pid} when node(Pid) =:= node() ->
- %% we've become master
- promote_me(From, State);
- {ok, Pid} ->
- %% master has changed to not us.
- gen_server2:reply(From, ok),
- erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
- noreply(State #state { master_pid = Pid });
{error, not_found} ->
gen_server2:reply(From, ok),
- {stop, normal, State}
- end.
+ {stop, normal, State};
+ {ok, Pid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ DeadPids),
+ if node(Pid) =:= node(MPid) ->
+ %% master hasn't changed
+ reply(ok, State);
+ node(Pid) =:= node() ->
+ %% we've become master
+ promote_me(From, State);
+ true ->
+ %% master has changed to not us.
+ gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_pid = Pid })
+ end
+ end;
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -259,6 +282,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
+ info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
@@ -295,6 +319,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, request_length) ->
+ %% This is only of value to the master
+ ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
@@ -319,6 +346,14 @@ inform_deaths(SPid, Deaths) ->
%% Others
%% ---------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _State) -> self();
+i(name, #state { q = #amqqueue { name = Name } }) -> Name;
+i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(Item, _State) -> throw({bad_argument, Item}).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
@@ -384,7 +419,7 @@ gb_trees_cons(Key, Value, Tree) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
+promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -393,12 +428,14 @@ promote_me(From, #state { q = Q,
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
+ rabbit_event:notify(queue_slave_promoted, [{pid, self()},
+ {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(Q #amqqueue.name),
- rabbit_misc:pid_to_string(self())]),
+ [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -749,7 +786,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -757,7 +794,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -770,6 +808,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -822,6 +862,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -849,3 +893,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% We intentionally leave out the head where a slave becomes
+%% unsynchronised: we assert that can never happen.
+set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+ synchronised = false }) ->
+ rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
+ {name, QName}]),
+ State #state { synchronised = true };
+set_synchronised(true, State) ->
+ State;
+set_synchronised(false, State = #state { synchronised = false }) ->
+ State.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ab553a8b..9f1e166d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -24,7 +24,7 @@
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_nodes/0, read_previously_running_nodes/0,
delete_previously_running_nodes/0, running_nodes_filename/0,
- is_disc_node/0]).
+ is_disc_node/0, on_node_down/1, on_node_up/1]).
-export([table_names/0]).
@@ -67,6 +67,8 @@
-spec(delete_previously_running_nodes/0 :: () -> 'ok').
-spec(running_nodes_filename/0 :: () -> file:filename()).
-spec(is_disc_node/0 :: () -> boolean()).
+-spec(on_node_up/1 :: (node()) -> 'ok').
+-spec(on_node_down/1 :: (node()) -> 'ok').
-endif.
@@ -85,7 +87,9 @@ status() ->
no -> case all_clustered_nodes() of
[] -> [];
Nodes -> [{unknown, Nodes}]
- end
+ end;
+ Reason when Reason =:= starting; Reason =:= stopping ->
+ exit({rabbit_busy, try_again_later})
end},
{running_nodes, running_clustered_nodes()}].
@@ -118,10 +122,19 @@ cluster(ClusterNodes, Force) ->
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
+ case not Force andalso is_only_disc_node(node(), false) andalso
+ not should_be_disc_node(ClusterNodes) of
+ true -> log_both("last running disc node leaving cluster");
+ _ -> ok
+ end,
+
%% Wipe mnesia if we're changing type from disc to ram
case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> error_logger:warning_msg(
- "changing node type; wiping mnesia...~n~n"),
+ {true, false} -> rabbit_misc:with_local_io(
+ fun () -> error_logger:warning_msg(
+ "changing node type; wiping "
+ "mnesia...~n~n")
+ end),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema);
_ -> ok
@@ -159,6 +172,7 @@ cluster(ClusterNodes, Force) ->
after
stop_mnesia()
end,
+
ok.
%% return node to its virgin state, where it is not member of any
@@ -325,14 +339,24 @@ ensure_mnesia_dir() ->
ensure_mnesia_running() ->
case mnesia:system_info(is_running) of
- yes -> ok;
- no -> throw({error, mnesia_not_running})
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
end.
ensure_mnesia_not_running() ->
case mnesia:system_info(is_running) of
- no -> ok;
- yes -> throw({error, mnesia_unexpectedly_running})
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
end.
ensure_schema_integrity() ->
@@ -690,6 +714,10 @@ wait_for_tables(TableNames) ->
reset(Force) ->
ensure_mnesia_not_running(),
+ case not Force andalso is_only_disc_node(node(), false) of
+ true -> log_both("no other disc nodes running");
+ false -> ok
+ end,
Node = node(),
case Force of
true -> ok;
@@ -737,6 +765,43 @@ leave_cluster(Nodes, RunningNodes) ->
Nodes, RunningNodes}})
end.
+wait_for(Condition) ->
+ error_logger:info_msg("Waiting for ~p...~n", [Condition]),
+ timer:sleep(1000).
+
+on_node_up(Node) ->
+ case is_only_disc_node(Node, true) of
+ true -> rabbit_misc:with_local_io(
+ fun () -> rabbit_log:info("cluster contains disc "
+ "nodes again~n")
+ end);
+ false -> ok
+ end.
+
+on_node_down(Node) ->
+ case is_only_disc_node(Node, true) of
+ true -> rabbit_misc:with_local_io(
+ fun () -> rabbit_log:info("only running disc node "
+ "went down~n")
+ end);
+ false -> ok
+ end.
+
+is_only_disc_node(Node, _MnesiaRunning = true) ->
+ RunningSet = sets:from_list(running_clustered_nodes()),
+ DiscSet = sets:from_list(nodes_of_type(disc_copies)),
+ [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
+is_only_disc_node(Node, false) ->
+ start_mnesia(),
+ Res = is_only_disc_node(Node, true),
+ stop_mnesia(),
+ Res.
+
+log_both(Warning) ->
+ io:format("Warning: ~s~n", [Warning]),
+ rabbit_misc:with_local_io(
+ fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
+
start_mnesia() ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f86f90cc..aabe5aff 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,7 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 5). %% milliseconds
+-define(SYNC_INTERVAL, 25). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
-define(TRANSFORM_TMP, "transform_tmp").
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 78aeb2ef..cb4f826d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -68,7 +68,7 @@ handle_call(_Request, _From, State) ->
handle_cast({rabbit_running_on, Node}, State) ->
rabbit_log:info("rabbit on ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
- ok = rabbit_alarm:on_node_up(Node),
+ ok = handle_live_rabbit(Node),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -94,4 +94,9 @@ code_change(_OldVsn, State, _Extra) ->
handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
- ok = rabbit_alarm:on_node_down(Node).
+ ok = rabbit_alarm:on_node_down(Node),
+ ok = rabbit_mnesia:on_node_down(Node).
+
+handle_live_rabbit(Node) ->
+ ok = rabbit_alarm:on_node_up(Node),
+ ok = rabbit_mnesia:on_node_up(Node).