summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2009-11-04 16:50:02 +0000
committerMichael Bridgen <mikeb@lshift.net>2009-11-04 16:50:02 +0000
commit6b3177ff16b00c5800377080805a1fb7d5ce0b5a (patch)
tree047096ff57eeaae9d2458aef3dc19d01336575d2
parent9a19d03a4b227e10ede39188f30544048b1b852e (diff)
downloadrabbitmq-server-6b3177ff16b00c5800377080805a1fb7d5ce0b5a.tar.gz
bug 20578: Spring-clean queues on recovery by checking each to see if
it is exclusive to a dead process (connection); and if so, deleting it. As things stand, a queue can't be owned by a connection on another node (so the aliveness check is redundant), but this will account for such scenarios in the event of connection- or queue-mobility.
-rw-r--r--src/rabbit_amqqueue.erl57
1 files changed, 37 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b958f306..77895c2a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -120,24 +120,40 @@ recover() ->
ok = recover_durable_queues(),
ok.
+shared_or_live_owner(none) ->
+ true;
+shared_or_live_owner(Owner) when is_pid(Owner) ->
+ rpc:call(node(Owner), erlang, is_process_alive, [Owner]).
+
recover_durable_queues() ->
Node = node(),
lists:foreach(
- fun (RecoveredQ) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
+ fun (RecoveredQ = #amqqueue{ exclusive_owner = Owner }) ->
+ case shared_or_live_owner(Owner) of
+ true ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end;
+ false ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> internal_delete2(RecoveredQ#amqqueue.name);
+ [] -> ok
+ end
+ end)
end
end,
%% TODO: use dirty ops instead
@@ -304,16 +320,17 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+internal_delete2(QueueName) ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [_] ->
- ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- ok
+ [_] -> internal_delete2(QueueName)
end
end).