summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-19 15:10:11 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-19 15:10:11 +0100
commit3ecee483b94dd4c09875979d11cca3c58bcd89db (patch)
tree0cc35d08abbd5317c0968b9e5b67b2ec94ff39f6
parentf73b1f12810e85c81ad1693b005f6b51722c807d (diff)
downloadrabbitmq-server-3ecee483b94dd4c09875979d11cca3c58bcd89db.tar.gz
Make slaves explicitly monitor the master (see comments to come in bug)
-rw-r--r--src/rabbit_mirror_queue_slave.erl25
1 files changed, 20 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index fdf9d9bc..422b0d59 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -93,6 +93,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
write),
{ok, QPid}
end),
+ erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register(
@@ -149,11 +150,15 @@ handle_call({gm_deaths, Deaths}, From,
%% receive any more messages from GM
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
{ok, Pid} when node(Pid) =:= MNode ->
+ %% master hasn't changed
reply(ok, State);
{ok, Pid} when node(Pid) =:= node() ->
+ %% we've become master
promote_me(From, State);
{ok, Pid} ->
+ %% master has changed to not us.
gen_server2:reply(From, ok),
+ erlang:monitor(process, Pid),
ok = gm:broadcast(GM, heartbeat),
noreply(State #state { master_node = node(Pid) });
{error, not_found} ->
@@ -209,6 +214,11 @@ handle_cast({rollback, _Txn, _ChPid}, State) ->
handle_info(timeout, State) ->
noreply(backing_queue_idle_timeout(State));
+handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
+ State = #state { gm = GM }) ->
+ ok = gm:broadcast(GM, {process_death, Pid}),
+ noreply(State);
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -276,6 +286,16 @@ joined([SPid], _Members) ->
members_changed([_SPid], _Births, []) ->
ok;
members_changed([SPid], _Births, Deaths) ->
+ inform_deaths(SPid, Deaths).
+
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([SPid], _From, {process_death, Pid}) ->
+ inform_deaths(SPid, [Pid]);
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+
+inform_deaths(SPid, Deaths) ->
rabbit_misc:with_exit_handler(
fun () -> {stop, normal} end,
fun () ->
@@ -287,11 +307,6 @@ members_changed([SPid], _Births, Deaths) ->
end
end).
-handle_msg([_SPid], _From, heartbeat) ->
- ok;
-handle_msg([SPid], _From, Msg) ->
- ok = gen_server2:cast(SPid, {gm, Msg}).
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------