diff options
author | Michael Bridgen <mikeb@lshift.net> | 2009-11-04 16:50:02 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2009-11-04 16:50:02 +0000 |
commit | 6b3177ff16b00c5800377080805a1fb7d5ce0b5a (patch) | |
tree | 047096ff57eeaae9d2458aef3dc19d01336575d2 | |
parent | 9a19d03a4b227e10ede39188f30544048b1b852e (diff) | |
download | rabbitmq-server-6b3177ff16b00c5800377080805a1fb7d5ce0b5a.tar.gz |
bug 20578: Spring-clean queues on recovery by checking each to see if
it is exclusive to a dead process (connection); and if so, deleting
it. As things stand, a queue can't be owned by a connection on
another node (so the aliveness check is redundant), but this will
account for such scenarios in the event of connection- or
queue-mobility.
-rw-r--r-- | src/rabbit_amqqueue.erl | 57 |
1 files changed, 37 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b958f306..77895c2a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -120,24 +120,40 @@ recover() -> ok = recover_durable_queues(), ok. +shared_or_live_owner(none) -> + true; +shared_or_live_owner(Owner) when is_pid(Owner) -> + rpc:call(node(Owner), erlang, is_process_alive, [Owner]). + recover_durable_queues() -> Node = node(), lists:foreach( - fun (RecoveredQ) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }) -> + case shared_or_live_owner(Owner) of + true -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end; + false -> + rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> internal_delete2(RecoveredQ#amqqueue.name); + [] -> ok + end + end) end end, %% TODO: use dirty ops instead @@ -304,16 +320,17 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). +internal_delete2(QueueName) -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok + [_] -> internal_delete2(QueueName) end end). |