summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-06-26 18:53:15 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-06-26 18:53:15 +0100
commit1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5 (patch)
tree0cce03e9dc133347b72a0fa56da10ba26bd72b3c
parenta07a3fa48f73cb86c2afd246bbeec35177a455c5 (diff)
downloadrabbitmq-server-1a996146d8d99d1ebe2396348a1bed8a0ec2ffc5.tar.gz
Master responds to length_requests with length broadcast; slaves can determine their synchronised state correctly.
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl37
-rw-r--r--src/rabbit_mirror_queue_master.erl65
-rw-r--r--src/rabbit_mirror_queue_slave.erl31
3 files changed, 85 insertions, 48 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index ab0b2362..b70761ea 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -32,7 +32,8 @@
-record(state, { q,
gm,
monitors,
- death_fun
+ death_fun,
+ length_fun
}).
-define(ONE_SECOND, 1000).
@@ -308,8 +309,8 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -321,7 +322,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -335,10 +336,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q,
- gm = GM1,
- monitors = dict:new(),
- death_fun = DeathFun },
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun,
+ length_fun = LengthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -358,8 +360,8 @@ handle_cast({gm_deaths, Deaths},
{stop, normal, State}
end;
-handle_cast(request_length, State) ->
- %% TODO: something
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+ ok = LengthFun(),
noreply(State);
handle_cast({ensure_monitoring, Pids},
@@ -376,13 +378,12 @@ handle_cast({ensure_monitoring, Pids},
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
- death_fun = Fun }) ->
- noreply(
- case dict:is_key(Pid, Monitors) of
- false -> State;
- true -> ok = Fun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
- end);
+ death_fun = DeathFun }) ->
+ noreply(case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = DeathFun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 463b8cfb..7d2b7e44 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0]).
+-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -59,22 +59,10 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-sender_death_fun() ->
- Self = self(),
- fun (DeadPid) ->
- rabbit_amqqueue:run_backing_queue_async(
- Self, ?MODULE,
- fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
- ok = gm:broadcast(GM, {sender_death, DeadPid}),
- KS1 = sets:del_element(DeadPid, KS),
- State #state { known_senders = KS1 }
- end)
- end.
-
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback, SyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun()),
+ Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
MNodes1 =
(case MNodes of
@@ -85,6 +73,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
+ ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -95,17 +84,6 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
ack_msg_id = dict:new(),
known_senders = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- #state { gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
- seen_status = SeenStatus,
- confirmed = [],
- ack_msg_id = dict:new(),
- known_senders = sets:from_list(KS) }.
-
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination - this node has been explicitly
@@ -365,6 +343,43 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
State
end.
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen_status = SeenStatus,
+ confirmed = [],
+ ack_msg_id = dict:new(),
+ known_senders = sets:from_list(KS) }.
+
+sender_death_fun() ->
+ Self = self(),
+ fun (DeadPid) ->
+ rabbit_amqqueue:run_backing_queue_async(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM, known_senders = KS }) ->
+ ok = gm:broadcast(GM, {sender_death, DeadPid}),
+ KS1 = sets:del_element(DeadPid, KS),
+ State #state { known_senders = KS1 }
+ end)
+ end.
+
+length_fun() ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:run_backing_queue_async(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {length, BQ:length(BQS)}),
+ State
+ end)
+ end.
+
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
maybe_store_acktag(AckTag, MsgId, AM) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8a365e6a..f4c950a9 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -117,7 +119,9 @@ init([#amqqueue { name = QueueName } = Q]) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new()
+ known_senders = dict:new(),
+
+ synchronised = false
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -419,7 +423,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
[rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -777,7 +782,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -785,7 +790,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -798,6 +804,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -850,6 +858,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -877,3 +889,12 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% We intentionally leave out the head where a slave becomes
+%% unsynchronised: we assert that can never happen.
+set_synchronised(true, State = #state { synchronised = false }) ->
+ State #state { synchronised = true };
+set_synchronised(true, State) ->
+ State;
+set_synchronised(false, State = #state { synchronised = false }) ->
+ State.