summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-10 11:38:54 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-10 11:38:54 +0100
commitba1c1d0c31e7038cd9f64577d24a288e515f004f (patch)
treeebbefebdc4b0874107b8c32a47945ec654ceb9b1
parent305814e9c50b14223e55713dd957b663b54a9445 (diff)
parenteeb04e43418810e499822eed4b2e88b9e9037471 (diff)
downloadrabbitmq-server-ba1c1d0c31e7038cd9f64577d24a288e515f004f.tar.gz
Merge bug 26408
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_node_monitor.erl9
-rw-r--r--src/rabbit_prequeue.erl7
4 files changed, 15 insertions, 10 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 63f229be..dd4d5c76 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -853,9 +853,8 @@ ntoab(IP) ->
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
is_process_alive(Pid) ->
- Node = node(Pid),
- lists:member(Node, [node() | nodes()]) andalso
- rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
+ rabbit_mnesia:on_running_node(Pid) andalso
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 2bada7ad..19fd01a1 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -27,6 +27,7 @@
status/0,
is_clustered/0,
+ on_running_node/1,
cluster_nodes/1,
node_type/0,
dir/0,
@@ -71,6 +72,7 @@
{'running_nodes', [node()]} |
{'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
+-spec(on_running_node/1 :: (pid()) -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
@@ -336,6 +338,8 @@ is_running() -> mnesia:system_info(is_running) =:= yes.
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
+on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).
+
cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index a7131ded..a948115d 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -619,10 +619,11 @@ cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).
%%--------------------------------------------------------------------
%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not give reliable
-%% results when partitioned. So we have a small set of replacement
-%% functions here. "rabbit" in a function's name implies we test if
-%% the rabbit application is up, not just the node.
+%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
+%% when partitioned, just those that we are sharing Mnesia state
+%% with. So we have a small set of replacement functions
+%% here. "rabbit" in a function's name implies we test if the rabbit
+%% application is up, not just the node.
%% As we use these functions to decide what to do in pause_minority
%% state, they *must* be fast, even in the case where TCP connections
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index b1d92b89..e7c2b5e4 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -64,13 +64,14 @@ init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);
init(#amqqueue{name = QueueName}, restart) ->
{ok, Q = #amqqueue{pid = QPid,
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
- Local = node(QPid) =:= node(),
+ LocalOrMasterDown = node(QPid) =:= node()
+ orelse not rabbit_mnesia:on_running_node(QPid),
Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
case rabbit_misc:is_process_alive(QPid) of
- true -> false = Local, %% assertion
+ true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]
- false -> case Local andalso Slaves =:= [] of
+ false -> case LocalOrMasterDown andalso Slaves =:= [] of
true -> crash_restart(Q); %% [2]
false -> timer:sleep(25),
init(Q, restart) %% [3]