summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-04-14 22:02:58 +0100
committerMatthias Radestock <matthias@lshift.net>2009-04-14 22:02:58 +0100
commitf504e1f82254e56c55e175ff9df7dbeb3deb55a0 (patch)
tree9452f4aacc3657f8e870fe6243127624850dfe8b
parent3f4040d9937088b93ae8050923ec53e1f028829a (diff)
downloadrabbitmq-server-bug20590.tar.gz
don't remove a queue's durable routes on node_downbug20590
...since that means we can't recover them when the queue re-appears. Also, rename rabbit_exchange:delete_bindings_for_X to delete_X_bindings - the former was becoming a mouthful.
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_exchange.erl22
2 files changed, 29 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 69c97dfe..382810c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -293,28 +293,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
ok = mnesia:delete({durable_queues, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(amqqueue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(amqqueue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e72669ac..b70b868d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,7 @@
route/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
--export([delete_bindings_for_queue/1]).
+-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2]).
%% EXTENDED API
@@ -86,7 +86,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
+-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
@@ -271,18 +272,24 @@ lookup_qpids(Queues) ->
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
-delete_bindings_for_exchange(ExchangeName) ->
+delete_exchange_bindings(ExchangeName) ->
indexed_delete(
#route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
fun delete_forward_routes/1, fun mnesia:delete_object/1).
-delete_bindings_for_queue(QueueName) ->
+delete_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_forward_routes/1).
+
+delete_transient_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
+
+delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
indexed_delete(
reverse_route(#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
- fun mnesia:delete_object/1, fun delete_forward_routes/1),
+ fun mnesia:delete_object/1, FwdDeleteFun),
[begin
[X] = mnesia:read({exchange, ExchangeName}),
ok = maybe_auto_delete(X)
@@ -300,6 +307,9 @@ delete_forward_routes(Route) ->
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(durable_routes, Route, write).
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(Route).
+
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
#route{binding = #binding{exchange_name = '$1',
@@ -467,7 +477,7 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_bindings_for_exchange(ExchangeName),
+ ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
ok = mnesia:delete({exchange, ExchangeName}).