summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-07 10:34:59 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-07 10:34:59 +0100
commit1e72197c5720adb868498a119edd78a3444e3198 (patch)
tree320a9c46d188c476e9d0606c29593e573a44a98d
parent6757adac1ca87455fb9c89ecf1219f5d0ae645ab (diff)
parentbf7a82d355b9b3bed4d16f40ac5a3fe1494a4356 (diff)
downloadrabbitmq-server-1e72197c5720adb868498a119edd78a3444e3198.tar.gz
Merging default into bug24130
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl69
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl44
3 files changed, 105 insertions, 36 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 57f6ca8b..48522af6 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -32,15 +32,17 @@
-record(state, { q,
gm,
monitors,
- death_fun
+ death_fun,
+ length_fun
}).
-define(ONE_SECOND, 1000).
-ifdef(use_specs).
--spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun()) ->
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:length_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -138,9 +140,28 @@
%% state of the master. The detection of the sync-status of a slave is
%% done entirely based on length: if the slave and the master both
%% agree on the length of the queue after the fetch of the head of the
-%% queue, then the queues must be in sync. The only other possibility
-%% is that the slave's queue is shorter, and thus the fetch should be
-%% ignored.
+%% queue (or a 'set_length' results in a slave having to drop some
+%% messages from the head of its queue), then the queues must be in
+%% sync. The only other possibility is that the slave's queue is
+%% shorter, and thus the fetch should be ignored. In case slaves are
+%% joined to an empty queue which only goes on to receive publishes,
+%% they start by asking the master to broadcast its length. This is
+%% enough for slaves to always be able to work out when their head
+%% does not differ from the master (and is much simpler and cheaper
+%% than getting the master to hang on to the guid of the msg at the
+%% head of its queue). When a slave is promoted to a master, it
+%% unilaterally broadcasts its length, in order to solve the problem
+%% of length requests from new slaves being unanswered by a dead
+%% master.
+%%
+%% Obviously, due to the async nature of communication across gm, the
+%% slaves can fall behind. This does not matter from a sync point of
+%% view: if they fall behind and the master dies then a) no publishes
+%% are lost because all publishes go to all mirrors anyway; b) the
+%% worst that happens is that acks get lost and so messages come back
+%% to life. This is no worse than normal given you never get
+%% confirmation that an ack has been received (not quite true with
+%% QoS-prefetch, but close enough for jazz).
%%
%% Because acktags are issued by the bq independently, and because
%% there is no requirement for the master and all slaves to use the
@@ -318,8 +339,8 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -331,7 +352,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -345,10 +366,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q,
- gm = GM1,
- monitors = dict:new(),
- death_fun = DeathFun },
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun,
+ length_fun = LengthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -368,6 +390,10 @@ handle_cast({gm_deaths, Deaths},
{stop, normal, State}
end;
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+ ok = LengthFun(),
+ noreply(State);
+
handle_cast({ensure_monitoring, Pids},
State = #state { monitors = Monitors }) ->
Monitors1 =
@@ -382,13 +408,12 @@ handle_cast({ensure_monitoring, Pids},
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
- death_fun = Fun }) ->
- noreply(
- case dict:is_key(Pid, Monitors) of
- false -> State;
- true -> ok = Fun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
- end);
+ death_fun = DeathFun }) ->
+ noreply(case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = DeathFun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -418,6 +443,8 @@ members_changed([CPid], _Births, Deaths) ->
handle_msg([_CPid], _From, heartbeat) ->
ok;
+handle_msg([CPid], _From, request_length = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([_CPid], _From, _Msg) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b090ebe8..9578026e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0]).
+-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -45,9 +45,10 @@
-ifdef(use_specs).
--export_type([death_fun/0]).
+-export_type([death_fun/0, length_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(length_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -62,6 +63,7 @@
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
+-spec(length_fun/0 :: () -> length_fun()).
-endif.
@@ -84,7 +86,7 @@ stop() ->
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback, SyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun()),
+ Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
MNodes1 =
(case MNodes of
@@ -95,6 +97,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -369,11 +372,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% ---------------------------------------------------------------------------
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ Len = BQ:len(BQS),
+ ok = gm:broadcast(GM, {length, Len}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
+ set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -391,9 +396,18 @@ sender_death_fun() ->
end)
end.
-%% ---------------------------------------------------------------------------
-%% Helpers
-%% ---------------------------------------------------------------------------
+length_fun() -> % Builds the zero-arity length callback that is passed to the coordinator's start_link/4.
+ Self = self(), % Evaluated here, outside the fun, so the fun targets the process that called length_fun/0.
+ fun () ->
+ rabbit_amqqueue:run_backing_queue_async( % NOTE(review): presumably runs the inner fun inside Self - confirm in rabbit_amqqueue.
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}), % Broadcast the current backing-queue length over gm.
+ State % The state is handed back unchanged.
+ end)
+ end.
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 55d61d41..a4a40a8c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -101,6 +103,9 @@ init([#amqqueue { name = QueueName } = Q]) ->
self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
{ok, BQ} = application:get_env(backing_queue_module),
BQS = bq_init(BQ, Q, false),
+ rabbit_event:notify(queue_slave_created,
+ [{name, QueueName}, {pid, self()}, {master_pid, MPid}]),
+ ok = gm:broadcast(GM, request_length),
{ok, #state { q = Q,
gm = GM,
master_pid = MPid,
@@ -114,7 +119,9 @@ init([#amqqueue { name = QueueName } = Q]) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new()
+ known_senders = dict:new(),
+
+ synchronised = false
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -303,6 +310,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, request_length) ->
+ %% This is only of value to the master
+ ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
@@ -399,7 +409,7 @@ gb_trees_cons(Key, Value, Tree) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
+promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -408,12 +418,13 @@ promote_me(From, #state { q = Q,
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
+ rabbit_event:notify(queue_slave_promoted, [{name, QName}, {pid, self()}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(Q #amqqueue.name),
- rabbit_misc:pid_to_string(self())]),
+ [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -771,7 +782,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -779,7 +790,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -792,6 +804,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -844,6 +858,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -871,3 +889,13 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% Track whether this slave is in sync with the master. We intentionally omit
+%% the (false, synchronised = true) head: we assert that can never happen.
+set_synchronised(true, State = #state { synchronised = false }) -> % First time in sync.
+ rabbit_event:notify(queue_slave_synchronised, [{pid, self()}]), % Emitted only on the false -> true transition.
+ State #state { synchronised = true };
+set_synchronised(true, State) -> % Already synchronised: nothing to do.
+ State;
+set_synchronised(false, State = #state { synchronised = false }) -> % Still unsynchronised: nothing to do.
+ State.