summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl64
1 files changed, 39 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..382810c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -280,28 +293,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
ok = mnesia:delete({durable_queues, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(amqqueue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(amqqueue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->