diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-02-09 15:44:27 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-02-09 15:44:27 +0000 |
commit | a1460c9cc3b1ccce1f784b37466180da3d0d4f02 (patch) | |
tree | 40bebc6f60ea6a1e22d67fcfd41206e8f416d39c | |
parent | 86460d1ace24c3d650a08a5707f03fd65a781e3f (diff) | |
parent | a807bfe03cb685bcbb3c569a4c2951fe87985c88 (diff) | |
download | rabbitmq-server-a1460c9cc3b1ccce1f784b37466180da3d0d4f02.tar.gz |
merge bug23806 into default (Re-declaring a queue on a different cluster node fails)
-rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 34 |
4 files changed, 57 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2545b07c..1d423809 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -218,7 +218,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> - case is_process_alive(QPid) of + case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), fun (Tx) -> TailFun(Tx), ExistingQ end diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c7e28fe..496b2064 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -790,20 +790,20 @@ handle_call({init, Recover}, From, handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) -> - case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of - true -> erlang:monitor(process, Owner), - declare(Recover, From, State); - _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined} = State, - gen_server2:reply(From, not_found), - case Recover of - true -> ok; - _ -> rabbit_log:warning( - "Queue ~p exclusive owner went away~n", [QName]) - end, - BQS = BQ:init(QName, IsDurable, Recover), - %% Rely on terminate to delete the queue. - {stop, normal, State#q{backing_queue_state = BQS}} + case rabbit_misc:is_process_alive(Owner) of + true -> erlang:monitor(process, Owner), + declare(Recover, From, State); + false -> #q{backing_queue = BQ, backing_queue_state = undefined, + q = #amqqueue{name = QName, durable = IsDurable}} = State, + gen_server2:reply(From, not_found), + case Recover of + true -> ok; + _ -> rabbit_log:warning( + "Queue ~p exclusive owner went away~n", [QName]) + end, + BQS = BQ:init(QName, IsDurable, Recover), + %% Rely on terminate to delete the queue. + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e36b1dd1..abc27c5f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,6 +56,7 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). %%---------------------------------------------------------------------------- @@ -194,6 +195,7 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -endif. @@ -861,3 +863,12 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of + true -> true; + _ -> false + end. + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ddb53b15..58c369b5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -82,6 +82,7 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), passed = test_queue_cleanup(SecondaryNode), + passed = test_declare_on_dead_queue(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate @@ -90,13 +91,14 @@ run_cluster_dependent_tests(SecondaryNode) -> Remote = spawn(SecondaryNode, fun () -> Rs = [ test_delegates_async(Node), test_delegates_sync(Node), - test_queue_cleanup(Node) ], + test_queue_cleanup(Node), + test_declare_on_dead_queue(Node) ], Self ! {self(), Rs} end), receive {Remote, Result} -> - Result = [passed, passed, passed] - after 2000 -> + Result = lists:duplicate(length(Result), passed) + after 30000 -> throw(timeout) end, @@ -1310,6 +1312,32 @@ test_queue_cleanup(_SecondaryNode) -> end, passed. +test_declare_on_dead_queue(SecondaryNode) -> + QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME), + Self = self(), + Pid = spawn(SecondaryNode, + fun () -> + {new, #amqqueue{name = QueueName, pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], + none), + exit(QPid, kill), + Self ! {self(), killed, QPid} + end), + receive + {Pid, killed, QPid} -> + {existing, #amqqueue{name = QueueName, + pid = QPid}} = + rabbit_amqqueue:declare(QueueName, false, false, [], none), + false = rabbit_misc:is_process_alive(QPid), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], + none), + true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + {ok, 0} = rabbit_amqqueue:delete(Q, false, false), + passed + after 2000 -> + throw(failed_to_create_and_kill_queue) + end. + %--------------------------------------------------------------------- control_action(Command, Args) -> |