diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-16 13:34:13 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-16 13:34:13 +0100 |
commit | 4af8668dd69688896c14abe22f278405556d6e49 (patch) | |
tree | e5d9505ee37e0755bc6833b3c9b0e5da843d7e5a | |
parent | ed5237aa32212f894af50e6bb91e9aabe52c285c (diff) | |
parent | 417e05d94a7b9bdc971fbcaa039eed8b3ebfaa0c (diff) | |
download | rabbitmq-server-4af8668dd69688896c14abe22f278405556d6e49.tar.gz |
Merging default to bug24340
-rw-r--r-- | docs/rabbitmqctl.1.xml | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 64 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 82 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 30 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 142 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 81 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 9 |
9 files changed, 340 insertions, 109 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ee000215..ba87c836 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -860,6 +860,17 @@ <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> </varlistentry> + <varlistentry> + <term>slave_pids</term> + <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem> + </varlistentry> + <varlistentry> + <term>synchronised_slave_pids</term> + <listitem><para>If the queue is mirrored, this gives the IDs of + the current slaves which are synchronised with the master - + i.e. those which could take over from the master without + message loss.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 05de48d6..bc1a85d0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -73,8 +73,8 @@ messages, consumers, memory, - backing_queue_status, - slave_pids + slave_pids, + backing_queue_status ]). -define(CREATION_EVENT_KEYS, @@ -84,10 +84,12 @@ auto_delete, arguments, owner_pid, - mirror_nodes + slave_pids, + synchronised_slave_pids ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, + ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]). %%---------------------------------------------------------------------------- @@ -149,11 +151,13 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State = #q{backing_queue = BQ}) -> +terminate(Reason, State = #q{q = #amqqueue{name = QName}, + backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( - queue_deleted, [{pid, self()}]), + queue_deleted, [{pid, self()}, + {name, QName}]), BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. @@ -703,7 +707,40 @@ ensure_ttl_timer(State) -> now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, State) -> + {Prefix, Items1} = + case lists:member(synchronised_slave_pids, Items) of + true -> Prefix1 = slaves_status(State), + case lists:member(slave_pids, Items) of + true -> {Prefix1, Items -- [slave_pids]}; + false -> {proplists:delete(slave_pids, Prefix1), Items} + end; + false -> {[], Items} + end, + Prefix ++ [{Item, i(Item, State)} + || Item <- (Items1 -- [synchronised_slave_pids])]. + +slaves_status(#q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> + [{slave_pids, ''}, {synchronised_slave_pids, ''}]; + _ -> + {Results, _Bad} = + delegate:invoke( + SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + {SPids1, SSPids} = + lists:foldl( + fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> + {[Pid | SPidsN], + case proplists:get_bool(is_synchronised, Infos) of + true -> [Pid | SSPidsN]; + false -> SSPidsN + end} + end, {[], []}, Results), + [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] + end. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -735,14 +772,15 @@ i(consumers, State) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, + slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> []; + _ -> SPids + end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - SPids; -i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), - MNodes; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index f6664a27..8ed2bede 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/3, get_gm/1, ensure_monitoring/2]). +-export([start_link/4, get_gm/1, ensure_monitoring/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,15 +32,17 @@ -record(state, { q, gm, monitors, - death_fun + death_fun, + length_fun }). -define(ONE_SECOND, 1000). -ifdef(use_specs). --spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined', - rabbit_mirror_queue_master:death_fun()) -> +-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun(), + rabbit_mirror_queue_master:length_fun()) -> rabbit_types:ok_pid_or_error()). -spec(get_gm/1 :: (pid()) -> pid()). -spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). @@ -53,7 +55,7 @@ %% %% A queue with mirrors consists of the following: %% -%% #amqqueue{ pid, mirror_pids } +%% #amqqueue{ pid, slave_pids } %% | | %% +----------+ +-------+--------------+-----------...etc... %% | | | @@ -138,9 +140,28 @@ %% state of the master. The detection of the sync-status of a slave is %% done entirely based on length: if the slave and the master both %% agree on the length of the queue after the fetch of the head of the -%% queue, then the queues must be in sync. The only other possibility -%% is that the slave's queue is shorter, and thus the fetch should be -%% ignored. +%% queue (or a 'set_length' results in a slave having to drop some +%% messages from the head of its queue), then the queues must be in +%% sync. The only other possibility is that the slave's queue is +%% shorter, and thus the fetch should be ignored. In case slaves are +%% joined to an empty queue which only goes on to receive publishes, +%% they start by asking the master to broadcast its length. This is +%% enough for slaves to always be able to work out when their head +%% does not differ from the master (and is much simpler and cheaper +%% than getting the master to hang on to the guid of the msg at the +%% head of its queue). When a slave is promoted to a master, it +%% unilaterally broadcasts its length, in order to solve the problem +%% of length requests from new slaves being unanswered by a dead +%% master. +%% +%% Obviously, due to the async nature of communication across gm, the +%% slaves can fall behind. This does not matter from a sync pov: if +%% they fall behind and the master dies then a) no publishes are lost +%% because all publishes go to all mirrors anyway; b) the worst that +%% happens is that acks get lost and so messages come back to +%% life. This is no worse than normal given you never get confirmation +%% that an ack has been received (not quite true with QoS-prefetch, +%% but close enough for jazz). %% %% Because acktags are issued by the bq independently, and because %% there is no requirement for the master and all slaves to use the @@ -279,8 +300,8 @@ %% %%---------------------------------------------------------------------------- -start_link(Queue, GM, DeathFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). +start_link(Queue, GM, DeathFun, LengthFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> GM1 = case GM of undefined -> {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> end, {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), - {ok, #state { q = Q, - gm = GM1, - monitors = dict:new(), - death_fun = DeathFun }, + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun, + length_fun = LengthFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -317,18 +339,21 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). handle_cast({gm_deaths, Deaths}, - State = #state { q = #amqqueue { name = QueueName } }) -> - rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) + when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node() -> + {ok, MPid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, + DeadPids), noreply(State); {error, not_found} -> {stop, normal, State} end; +handle_cast(request_length, State = #state { length_fun = LengthFun }) -> + ok = LengthFun(), + noreply(State); + handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Monitors }) -> Monitors1 = @@ -343,13 +368,12 @@ handle_cast({ensure_monitoring, Pids}, handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, - death_fun = Fun }) -> - noreply( - case dict:is_key(Pid, Monitors) of - false -> State; - true -> ok = Fun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } - end); + death_fun = DeathFun }) -> + noreply(case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = DeathFun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -379,6 +403,8 @@ members_changed([CPid], _Births, Deaths) -> handle_msg([_CPid], _From, heartbeat) -> ok; +handle_msg([CPid], _From, request_length = Msg) -> + ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([_CPid], _From, _Msg) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 532911f2..ad5fd28f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0]). +-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -44,9 +44,10 @@ -ifdef(use_specs). --export_type([death_fun/0]). +-export_type([death_fun/0, length_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). +-type(length_fun() :: fun (() -> 'ok')). -type(master_state() :: #state { gm :: pid(), coordinator :: pid(), backing_queue :: atom(), @@ -61,6 +62,7 @@ -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). +-spec(length_fun/0 :: () -> length_fun()). -endif. @@ -83,7 +85,7 @@ stop() -> init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun()), + Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), MNodes1 = (case MNodes of @@ -94,6 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -349,11 +352,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% --------------------------------------------------------------------------- promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + Len = BQ:len(BQS), + ok = gm:broadcast(GM, {length, Len}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), + set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -371,9 +376,18 @@ sender_death_fun() -> end) end. -%% --------------------------------------------------------------------------- -%% Helpers -%% --------------------------------------------------------------------------- +length_fun() -> + Self = self(), + fun () -> + rabbit_amqqueue:run_backing_queue( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + State + end) + end. maybe_store_acktag(undefined, _MsgId, AM) -> AM; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 6a9f733e..cf8e9484 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,8 @@ -module(rabbit_mirror_queue_misc). -export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, + report_deaths/4]). -include("rabbit.hrl"). @@ -28,6 +29,7 @@ %% become the new master, which is bad because it could then mean the %% slave (now master) receives messages it's not ready for (for %% example, new consumers). +%% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadPids) -> DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], rabbit_misc:execute_mnesia_transaction( @@ -38,27 +40,27 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = + [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - ok; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, - ok = rabbit_amqqueue:store_queue(Q1); + ok = rabbit_amqqueue:store_queue(Q1), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - ok - end, - {ok, QPid1} + {ok, QPid1, []} + end end end). @@ -133,3 +135,17 @@ if_mirrored_queue(Queue, Fun) -> _ -> Fun(Q) end end). + +report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> + ok; +report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> + rabbit_event:notify(queue_mirror_deaths, [{name, QueueName}, + {pids, DeadPids}]), + rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + case IsMaster of + true -> "Master"; + false -> "Slave" + end, + rabbit_misc:pid_to_string(MirrorPid), + [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c918f388..3c453981 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -33,7 +33,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2]). +-export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,15 @@ -include("rabbit.hrl"). -include("gm_specs.hrl"). +-define(CREATION_EVENT_KEYS, + [pid, + name, + master_pid, + is_synchronised + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS). + -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -64,7 +73,9 @@ ack_num, msg_id_status, - known_senders + known_senders, + + synchronised }). start_link(Q) -> @@ -73,6 +84,9 @@ start_link(Q) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +info(QPid) -> + gen_server2:call(QPid, info, infinity). + init([#amqqueue { name = QueueName } = Q]) -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -95,26 +109,32 @@ init([#amqqueue { name = QueueName } = Q]) -> end), erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), + rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), BQS = bq_init(BQ, Q, false), - {ok, #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new() - }, hibernate, + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false + }, + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> @@ -144,29 +164,32 @@ handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, gm = GM, master_pid = MPid }) -> - rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node(MPid) -> - %% master hasn't changed - reply(ok, State); - {ok, Pid} when node(Pid) =:= node() -> - %% we've become master - promote_me(From, State); - {ok, Pid} -> - %% master has changed to not us. - gen_server2:reply(From, ok), - erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_pid = Pid }); {error, not_found} -> gen_server2:reply(From, ok), - {stop, normal, State} - end. + {stop, normal, State}; + {ok, Pid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + DeadPids), + if node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + true -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }) + end + end; + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -259,6 +282,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of + info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. @@ -295,6 +319,9 @@ members_changed([SPid], _Births, Deaths) -> handle_msg([_SPid], _From, heartbeat) -> ok; +handle_msg([_SPid], _From, request_length) -> + %% This is only of value to the master + ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; @@ -319,6 +346,14 @@ inform_deaths(SPid, Deaths) -> %% Others %% --------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _State) -> self(); +i(name, #state { q = #amqqueue { name = Name } }) -> Name; +i(master_pid, #state { master_pid = MPid }) -> MPid; +i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(Item, _State) -> throw({bad_argument, Item}). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, @@ -384,7 +419,7 @@ gb_trees_cons(Key, Value, Tree) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. -promote_me(From, #state { q = Q, +promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -393,12 +428,14 @@ promote_me(From, #state { q = Q, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> + rabbit_event:notify(queue_slave_promoted, [{pid, self()}, + {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", - [rabbit_misc:rs(Q #amqqueue.name), - rabbit_misc:pid_to_string(self())]), + [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), + rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), @@ -749,7 +786,7 @@ process_instruction({set_length, Length}, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop > 0 of + {ok, case ToDrop >= 0 of true -> BQS1 = lists:foldl( fun (const, BQSN) -> @@ -757,7 +794,8 @@ process_instruction({set_length, Length}, BQSN1} = BQ:fetch(false, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; + set_synchronised( + true, State #state { backing_queue_state = BQS1 }); false -> State end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, @@ -770,6 +808,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); + Other when Other + 1 =:= Remaining -> + set_synchronised(true, State); Other when Other < Remaining -> %% we must be shorter than the master State @@ -822,6 +862,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } end}; +process_instruction({length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -849,3 +893,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, ack_num = Num }) -> State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. + +%% We intentionally leave out the head where a slave becomes +%% unsynchronised: we assert that can never happen. +set_synchronised(true, State = #state { q = #amqqueue { name = QName }, + synchronised = false }) -> + rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, + {name, QName}]), + State #state { synchronised = true }; +set_synchronised(true, State) -> + State; +set_synchronised(false, State = #state { synchronised = false }) -> + State. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ab553a8b..9f1e166d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -24,7 +24,7 @@ create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, delete_previously_running_nodes/0, running_nodes_filename/0, - is_disc_node/0]). + is_disc_node/0, on_node_down/1, on_node_up/1]). -export([table_names/0]). @@ -67,6 +67,8 @@ -spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -spec(is_disc_node/0 :: () -> boolean()). +-spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_down/1 :: (node()) -> 'ok'). -endif. @@ -85,7 +87,9 @@ status() -> no -> case all_clustered_nodes() of [] -> []; Nodes -> [{unknown, Nodes}] - end + end; + Reason when Reason =:= starting; Reason =:= stopping -> + exit({rabbit_busy, try_again_later}) end}, {running_nodes, running_clustered_nodes()}]. @@ -118,10 +122,19 @@ cluster(ClusterNodes, Force) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), + case not Force andalso is_only_disc_node(node(), false) andalso + not should_be_disc_node(ClusterNodes) of + true -> log_both("last running disc node leaving cluster"); + _ -> ok + end, + %% Wipe mnesia if we're changing type from disc to ram case {is_disc_node(), should_be_disc_node(ClusterNodes)} of - {true, false} -> error_logger:warning_msg( - "changing node type; wiping mnesia...~n~n"), + {true, false} -> rabbit_misc:with_local_io( + fun () -> error_logger:warning_msg( + "changing node type; wiping " + "mnesia...~n~n") + end), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema); _ -> ok @@ -159,6 +172,7 @@ cluster(ClusterNodes, Force) -> after stop_mnesia() end, + ok. %% return node to its virgin state, where it is not member of any @@ -325,14 +339,24 @@ ensure_mnesia_dir() -> ensure_mnesia_running() -> case mnesia:system_info(is_running) of - yes -> ok; - no -> throw({error, mnesia_not_running}) + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) end. ensure_mnesia_not_running() -> case mnesia:system_info(is_running) of - no -> ok; - yes -> throw({error, mnesia_unexpectedly_running}) + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) end. ensure_schema_integrity() -> @@ -690,6 +714,10 @@ wait_for_tables(TableNames) -> reset(Force) -> ensure_mnesia_not_running(), + case not Force andalso is_only_disc_node(node(), false) of + true -> log_both("no other disc nodes running"); + false -> ok + end, Node = node(), case Force of true -> ok; @@ -737,6 +765,43 @@ leave_cluster(Nodes, RunningNodes) -> Nodes, RunningNodes}}) end. +wait_for(Condition) -> + error_logger:info_msg("Waiting for ~p...~n", [Condition]), + timer:sleep(1000). + +on_node_up(Node) -> + case is_only_disc_node(Node, true) of + true -> rabbit_misc:with_local_io( + fun () -> rabbit_log:info("cluster contains disc " + "nodes again~n") + end); + false -> ok + end. + +on_node_down(Node) -> + case is_only_disc_node(Node, true) of + true -> rabbit_misc:with_local_io( + fun () -> rabbit_log:info("only running disc node " + "went down~n") + end); + false -> ok + end. + +is_only_disc_node(Node, _MnesiaRunning = true) -> + RunningSet = sets:from_list(running_clustered_nodes()), + DiscSet = sets:from_list(nodes_of_type(disc_copies)), + [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); +is_only_disc_node(Node, false) -> + start_mnesia(), + Res = is_only_disc_node(Node, true), + stop_mnesia(), + Res. + +log_both(Warning) -> + io:format("Warning: ~s~n", [Warning]), + rabbit_misc:with_local_io( + fun () -> error_logger:warning_msg("~s~n", [Warning]) end). + start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f86f90cc..aabe5aff 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,7 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). -define(TRANSFORM_TMP, "transform_tmp"). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 78aeb2ef..cb4f826d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -68,7 +68,7 @@ handle_call(_Request, _From, State) -> handle_cast({rabbit_running_on, Node}, State) -> rabbit_log:info("rabbit on ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), - ok = rabbit_alarm:on_node_up(Node), + ok = handle_live_rabbit(Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -94,4 +94,9 @@ code_change(_OldVsn, State, _Extra) -> handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), - ok = rabbit_alarm:on_node_down(Node). + ok = rabbit_alarm:on_node_down(Node), + ok = rabbit_mnesia:on_node_down(Node). + +handle_live_rabbit(Node) -> + ok = rabbit_alarm:on_node_up(Node), + ok = rabbit_mnesia:on_node_up(Node). |