diff options
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r-- | src/rabbit_exchange.erl | 84 |
1 files changed, 31 insertions, 53 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index be73e818..495fc4b3 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -105,13 +105,7 @@ recover() -> Route, write), ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) - end, rabbit_durable_route), - %% Tell exchanges to recover themselves only *after* we've - %% recovered their bindings. - ok = rabbit_misc:table_foreach( - fun(Exchange = #exchange{type = Type}) -> - ok = Type:recover(Exchange) - end, rabbit_durable_exchange). + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -119,6 +113,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, + ok = Type:declare(Exchange), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, ExchangeName}) of @@ -128,7 +123,6 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange, write); true -> ok end, - ok = Type:init(Exchange), Exchange; [ExistingX] -> ExistingX end @@ -263,43 +257,21 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - FwdRoute = reverse_route(Route), - ok = FwdDeleteFun(FwdRoute), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write), - FwdRoute#route.binding - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - %% We need the keysort to group the bindings by exchange name, so - %% that cleanup_deleted_queue_bindings can inform the exchange of - %% its vanished bindings before maybe_auto_delete'ing the - %% exchange. - ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings), - none, []). - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) - when N =:= ExchangeName -> - cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings), - cleanup_deleted_queue_bindings(Rest, N, [B]). - -cleanup_deleted_queue_bindings1(none, []) -> - ok; -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}), - [ok = Type:delete_binding(X, B) || B <- Bindings], - ok = maybe_auto_delete(X). + Exchanges = exchanges_for_queue(QueueName), + [begin + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -308,6 +280,15 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -346,25 +327,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - ok = Type:add_binding(X, B) + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; _ -> ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), - ok = Type:delete_binding(X, B), maybe_auto_delete(X) end end). @@ -455,11 +434,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) -> +unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - ok = Type:delete(X). + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API |