diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 64 |
1 files changed, 39 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..382810c3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -122,19 +122,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + durable_queues, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(durable_queues), + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -280,28 +293,29 @@ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), + [] -> {error, not_found}; + [_] -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({amqqueue, QueueName}), ok = mnesia:delete({durable_queues, QueueName}), ok end end). -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), - ok. - on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( - fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + fun (QueueName, Acc) -> + ok = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({amqqueue, QueueName}), + Acc + end, ok, - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(amqqueue), - node(Pid) == Node])) + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(amqqueue), + node(Pid) == Node])) end). pseudo_queue(QueueName, Pid) -> |