summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-02-09 15:44:27 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-02-09 15:44:27 +0000
commita1460c9cc3b1ccce1f784b37466180da3d0d4f02 (patch)
tree40bebc6f60ea6a1e22d67fcfd41206e8f416d39c
parent86460d1ace24c3d650a08a5707f03fd65a781e3f (diff)
parenta807bfe03cb685bcbb3c569a4c2951fe87985c88 (diff)
downloadrabbitmq-server-a1460c9cc3b1ccce1f784b37466180da3d0d4f02.tar.gz
merge bug23806 into default (Re-declaring a queue on a different cluster node fails)
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_tests.erl34
4 files changed, 57 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2545b07c..1d423809 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
- case is_process_alive(QPid) of
+ case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
fun (Tx) -> TailFun(Tx), ExistingQ end
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c7e28fe..496b2064 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -790,20 +790,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e36b1dd1..abc27c5f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,6 +56,7 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -194,6 +195,7 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -861,3 +863,12 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ddb53b15..58c369b5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -82,6 +82,7 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
@@ -90,13 +91,14 @@ run_cluster_dependent_tests(SecondaryNode) ->
Remote = spawn(SecondaryNode,
fun () -> Rs = [ test_delegates_async(Node),
test_delegates_sync(Node),
- test_queue_cleanup(Node) ],
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = [passed, passed, passed]
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1310,6 +1312,32 @@ test_queue_cleanup(_SecondaryNode) ->
end,
passed.
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->