summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_slave.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_slave.erl')
-rw-r--r--src/rabbit_mirror_queue_slave.erl181
1 files changed, 119 insertions, 62 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 93340ba8..3c453981 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -33,11 +33,11 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2]).
+-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -47,6 +47,15 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ master_pid,
+ is_synchronised
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
+
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -64,7 +73,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -73,6 +84,9 @@ start_link(Q) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+info(QPid) ->
+ gen_server2:call(QPid, info, infinity).
+
init([#amqqueue { name = QueueName } = Q]) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -89,33 +103,38 @@ init([#amqqueue { name = QueueName } = Q]) ->
%% ASSERTION
[] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
MPids1 = MPids ++ [Self],
- mnesia:write(rabbit_queue,
- Q1 #amqqueue { slave_pids = MPids1 },
- write),
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
{ok, QPid}
end),
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
BQS = bq_init(BQ, Q, false),
- {ok, #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new()
- }, hibernate,
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
+ },
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
@@ -145,29 +164,32 @@ handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
gm = GM,
master_pid = MPid }) ->
- rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n",
- [rabbit_misc:rs(QueueName),
- rabbit_misc:pid_to_string(self()),
- [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]),
%% The GM has told us about deaths, which means we're not going to
%% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, Pid} when node(Pid) =:= node(MPid) ->
- %% master hasn't changed
- reply(ok, State);
- {ok, Pid} when node(Pid) =:= node() ->
- %% we've become master
- promote_me(From, State);
- {ok, Pid} ->
- %% master has changed to not us.
- gen_server2:reply(From, ok),
- erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
- noreply(State #state { master_pid = Pid });
{error, not_found} ->
gen_server2:reply(From, ok),
- {stop, normal, State}
- end.
+ {stop, normal, State};
+ {ok, Pid, DeadPids} ->
+ rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ DeadPids),
+ if node(Pid) =:= node(MPid) ->
+ %% master hasn't changed
+ reply(ok, State);
+ node(Pid) =:= node() ->
+ %% we've become master
+ promote_me(From, State);
+ true ->
+ %% master has changed to not us.
+ gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_pid = Pid })
+ end
+ end;
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -187,9 +209,9 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 });
+ noreply(State #state { backing_queue_state = BQS1 }).
-handle_cast(update_ram_duration,
+handle_info(update_ram_duration,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
@@ -199,9 +221,9 @@ handle_cast(update_ram_duration,
noreply(State #state { rate_timer_ref = just_measured,
backing_queue_state = BQS2 });
-handle_cast(sync_timeout, State) ->
+handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
- State #state { sync_timer_ref = undefined })).
+ State #state { sync_timer_ref = undefined }));
handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
@@ -260,22 +282,28 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _State) ->
case Msg of
+ info -> 9;
{gm_deaths, _Deaths} -> 5;
_ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- sync_timeout -> 6;
{gm, _Msg} -> 5;
{post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ sync_timeout -> 6;
+ _ -> 0
+ end.
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -291,6 +319,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, request_length) ->
+ %% This is only of value to the master
+ ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
@@ -315,6 +346,14 @@ inform_deaths(SPid, Deaths) ->
%% Others
%% ---------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _State) -> self();
+i(name, #state { q = #amqqueue { name = Name } }) -> Name;
+i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(Item, _State) -> throw({bad_argument, Item}).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
@@ -380,7 +419,7 @@ gb_trees_cons(Key, Value, Tree) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
+promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -389,12 +428,14 @@ promote_me(From, #state { q = Q,
msg_id_ack = MA,
msg_id_status = MS,
known_senders = KS }) ->
+ rabbit_event:notify(queue_slave_promoted, [{pid, self()},
+ {name, QName}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(Q #amqqueue.name),
- rabbit_misc:pid_to_string(self())]),
+ [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -516,8 +557,7 @@ backing_queue_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
State.
@@ -525,14 +565,12 @@ ensure_sync_timer(State) ->
stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
State;
stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #state { sync_timer_ref = undefined }.
ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(
- ?RAM_DURATION_UPDATE_INTERVAL,
- rabbit_amqqueue, update_ram_duration,
- [self()]),
+ TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
State #state { rate_timer_ref = TRef };
ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
State #state { rate_timer_ref = undefined };
@@ -544,7 +582,7 @@ stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
State #state { rate_timer_ref = undefined };
stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
+ erlang:cancel_timer(TRef),
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
@@ -748,7 +786,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -756,7 +794,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -769,6 +808,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -821,6 +862,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -848,3 +893,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% We intentionally leave out the head where a slave becomes
+%% unsynchronised: we assert that can never happen.
+set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+ synchronised = false }) ->
+ rabbit_event:notify(queue_slave_synchronised, [{pid, self()},
+ {name, QName}]),
+ State #state { synchronised = true };
+set_synchronised(true, State) ->
+ State;
+set_synchronised(false, State = #state { synchronised = false }) ->
+ State.