summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-07 08:40:19 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-07 08:40:19 +0100
commit6a802eace79c664e84f77c2c38aee143f8da4991 (patch)
tree02ee05f1d7c07223333e49d59f5669b8e19d1d02 /src/rabbit_amqqueue.erl
parentfbd7eaf8e1fd45f322382cfa1d2291809cac4f1a (diff)
parent2f69044318f570317e3ddc3639e04ef26d8f16b8 (diff)
downloadrabbitmq-server-6a802eace79c664e84f77c2c38aee143f8da4991.tar.gz
merge bug20471 into v1_5
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl64
1 files changed, 39 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..382810c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -280,28 +293,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
ok = mnesia:delete({durable_queues, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(amqqueue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(amqqueue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->