summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl84
1 files changed, 31 insertions, 53 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index be73e818..495fc4b3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -105,13 +105,7 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route),
- %% Tell exchanges to recover themselves only *after* we've
- %% recovered their bindings.
- ok = rabbit_misc:table_foreach(
- fun(Exchange = #exchange{type = Type}) ->
- ok = Type:recover(Exchange)
- end, rabbit_durable_exchange).
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -119,6 +113,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
+ ok = Type:declare(Exchange),
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, ExchangeName}) of
@@ -128,7 +123,6 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
- ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
@@ -263,43 +257,21 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- FwdRoute = reverse_route(Route),
- ok = FwdDeleteFun(FwdRoute),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
- FwdRoute#route.binding
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- %% We need the keysort to group the bindings by exchange name, so
- %% that cleanup_deleted_queue_bindings can inform the exchange of
- %% its vanished bindings before maybe_auto_delete'ing the
- %% exchange.
- ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
- none, []).
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
- when N =:= ExchangeName ->
- cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
- cleanup_deleted_queue_bindings(Rest, N, [B]).
-
-cleanup_deleted_queue_bindings1(none, []) ->
- ok;
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
- [ok = Type:delete_binding(X, B) || B <- Bindings],
- ok = maybe_auto_delete(X).
+ Exchanges = exchanges_for_queue(QueueName),
+ [begin
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -308,6 +280,15 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
+
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -346,25 +327,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- ok = Type:add_binding(X, B)
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
- ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -455,11 +434,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
+unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- ok = Type:delete(X).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API