summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:28:49 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-10 15:28:49 +0100
commitc8e2664f60e13f74ebe712f52ddf467f4fd53343 (patch)
treeab948184f690adeee6a274ea322ee9768d0170df
parenteeb04e43418810e499822eed4b2e88b9e9037471 (diff)
downloadrabbitmq-server-bug26408.tar.gz
Separate out different is_process_alive implementations depending on whether we want to consider the process alive if it is running but we can't talk to it via Mnesia. Thus unbreak exclusive queues with the direct client from non-Rabbit nodes.bug26408
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_prequeue.erl4
5 files changed, 23 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c4abfd9d..68e96742 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -398,7 +398,7 @@ with(Name, F, E) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_misc:is_process_alive(QPid),
+ fun () -> false = rabbit_mnesia:is_process_alive(QPid),
timer:sleep(25),
with(Name, F, E)
end, fun () -> F(Q) end);
@@ -772,7 +772,7 @@ on_node_down(Node) ->
slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node andalso
- not rabbit_misc:is_process_alive(Pid)])),
+ not rabbit_mnesia:is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index af1e2141..58fbcbe0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -167,11 +167,11 @@ init_it(Self, GM, Node, QName) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
{new, QPid, GMPids};
- [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
end;
- [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
true -> existing;
false -> GMPids1 = [T || T = {_, S} <- GMPids,
S =/= SPid],
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dd4d5c76..2bd81a86 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -852,9 +852,14 @@ ntoab(IP) ->
%% We try to avoid reconnecting to down nodes here; this is used in a
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
+%%
+%% See also rabbit_mnesia:is_process_alive/1 which also requires the
+%% process be in the same running cluster as us (i.e. not partitioned
+%% or some random node).
is_process_alive(Pid) ->
- rabbit_mnesia:on_running_node(Pid) andalso
- rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+ Node = node(Pid),
+ lists:member(Node, [node() | nodes()]) andalso
+ rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 19fd01a1..fa51dd70 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -28,6 +28,7 @@
status/0,
is_clustered/0,
on_running_node/1,
+ is_process_alive/1,
cluster_nodes/1,
node_type/0,
dir/0,
@@ -73,6 +74,7 @@
{'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(on_running_node/1 :: (pid()) -> boolean()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
@@ -340,6 +342,14 @@ is_clustered() -> AllNodes = cluster_nodes(all),
on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).
+%% This requires the process be in the same running cluster as us
+%% (i.e. not partitioned or some random node).
+%%
+%% See also rabbit_misc:is_process_alive/1 which does not.
+is_process_alive(Pid) ->
+ on_running_node(Pid) andalso
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+
cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index e7c2b5e4..16e30cac 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -66,8 +66,8 @@ init(#amqqueue{name = QueueName}, restart) ->
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
- Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
- case rabbit_misc:is_process_alive(QPid) of
+ Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
+ case rabbit_mnesia:is_process_alive(QPid) of
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]