diff options
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 346 |
1 files changed, 204 insertions, 142 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 19150fa9..1af213c4 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,29 +33,35 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2]). +-export([new_deletions/0, combine_deletions/2, add_deletion/3, + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx --export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). +-export([has_for_source/1, remove_for_source/1, + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([key/0]). +-export_type([key/0, deletions/0]). -type(key() :: binary()). --type(bind_errors() :: rabbit_types:error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). +-type(bind_errors() :: rabbit_types:error('source_not_found' | + 'destination_not_found' | + 'source_and_destination_not_found')). -type(bind_res() :: 'ok' | bind_errors()). -type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> + fun((rabbit_types:exchange(), + rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-opaque(deletions() :: dict:dictionary()). + -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). @@ -65,10 +71,13 @@ -spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). --spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_source/1 :: + (rabbit_types:binding_source()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_types:binding_destination()) -> bindings()). +-spec(list_for_source_and_destination/2 :: + (rabbit_types:binding_source(), rabbit_types:binding_destination()) -> + bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -76,18 +85,27 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). -spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) -> [[rabbit_types:info()]]). --spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). +-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). +-spec(remove_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_transient_for_destination/1 :: + (rabbit_types:binding_destination()) -> deletions()). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). +-spec(add_deletion/3 :: (rabbit_exchange:name(), + {'undefined' | rabbit_types:binding_source(), + 'deleted' | 'not_deleted', + deletions()}, deletions()) -> deletions()). +-spec(new_deletions/0 :: () -> deletions()). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,36 +119,34 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, all_durable([Src, Dst]), fun mnesia:write/3), - {new, X, B}; - [_] -> {existing, X, B} + {new, Src, B}; + [_] -> {existing, Src, B} end; {error, _} = E -> E end end) of - {new, X = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(X, B), + {new, Src = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Src, B), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -141,61 +157,55 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of - ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, - fun mnesia:delete_object/3), - Deleted = - rabbit_exchange:maybe_auto_delete(X), - {{Deleted, X}, B}; - {error, _} = E -> - E - end + [] -> + {error, binding_not_found}; + [_] -> + case InnerFun(Src, Dst) of + ok -> + ok = sync_binding( + B, all_durable([Src, Dst]), + fun mnesia:delete_object/3), + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> ok = Module:delete(X, [B]); - not_deleted -> ok = Module:remove_bindings(X, [B]) - end, - rabbit_event:notify(binding_deleted, info(B)), - ok + {ok, Deletions} -> + ok = process_deletions(Deletions) end. list(VHostPath) -> - Route = #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), - _ = '_'}, + VHostResource = rabbit_misc:r(VHostPath, '_'), + Route = #route{binding = #binding{source = VHostResource, + destination = VHostResource, + _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_exchange(XName) -> - Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +list_for_source(SrcName) -> + Route = #route{binding = #binding{source = SrcName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DstName) -> + Route = #route{binding = #binding{destination = DstName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(XName, QueueName) -> - Route = #route{binding = #binding{exchange_name = XName, - queue_name = QueueName, - _ = '_'}}, +list_for_source_and_destination(SrcName, DstName) -> + Route = #route{binding = #binding{source = SrcName, + destination = DstName, + _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +218,12 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; +i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; +i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; +i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -222,14 +234,14 @@ info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). -has_for_exchange(XName) -> - Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, +has_for_source(SrcName) -> + Match = #route{binding = #binding{source = SrcName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). -remove_for_exchange(XName) -> +remove_for_source(SrcName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), @@ -237,26 +249,31 @@ remove_for_exchange(XName) -> Route#route.binding end || Route <- mnesia:match_object( rabbit_route, - #route{binding = #binding{exchange_name = XName, - _ = '_'}}, + #route{binding = #binding{source = SrcName, + _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- -binding_action(Binding = #binding{exchange_name = XName, - queue_name = QueueName, - args = Arguments}, Fun) -> - call_with_exchange_and_queue( - XName, QueueName, - fun (X, Q) -> +all_durable(Resources) -> + lists:all(fun (#exchange{durable = D}) -> D; + (#amqqueue{durable = D}) -> D + end, Resources). + +binding_action(Binding = #binding{source = SrcName, + destination = DstName, + args = Arguments}, Fun) -> + call_with_source_and_destination( + SrcName, DstName, + fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,17 +287,22 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(XName, QueueName, Fun) -> +call_with_source_and_destination(SrcName, DstName, Fun) -> + SrcTable = table_for_resource(SrcName), + DstTable = table_for_resource(DstName), rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, XName}), - mnesia:read({rabbit_queue, QueueName})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + fun () -> case {mnesia:read({SrcTable, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> {error, source_not_found}; + {[_], [] } -> {error, destination_not_found}; + {[], [] } -> {error, source_and_destination_not_found} + end end). +table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; +table_for_resource(#resource{kind = queue}) -> rabbit_queue. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), @@ -293,8 +315,8 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> - DeletedBindings = +remove_for_destination(DstName, FwdDeleteFun) -> + Bindings = [begin Route = reverse_route(ReverseRoute), ok = FwdDeleteFun(Route), @@ -304,40 +326,41 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DstName, + _ = '_'}}), write)], - Grouped = group_bindings_and_auto_delete( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Grouped) - end. + group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), + lists:keysort(#binding.source, Bindings)). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_and_auto_delete([], Acc) -> +group_bindings_fold(_Fun, Acc, []) -> Acc; -group_bindings_and_auto_delete( - [B = #binding{exchange_name = XName} | Bs], Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B], Acc). - -group_bindings_and_auto_delete( - XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> - group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> - %% either Removed is [], or its head has a non-matching XName - [X] = mnesia:read({rabbit_exchange, XName}), - NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], - group_bindings_and_auto_delete(Removed, NewAcc). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). + +group_bindings_fold( + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + %% Either Removed is [], or its head has a non-matching SrcName. + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + +maybe_auto_delete(XName, Bindings, Deletions) -> + case rabbit_exchange:lookup(XName) of + {error, not_found} -> + add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); + {ok, X} -> + add_deletion(XName, {X, not_deleted, Bindings}, + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions; + {deleted, Deletions1} -> combine_deletions( + Deletions, Deletions1) + end) + end. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -358,20 +381,59 @@ reverse_route(#route{binding = Binding}) -> reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. -reverse_binding(#reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = XName, - queue_name = QueueName, - key = Key, - args = Args}. +reverse_binding(#reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}; + +reverse_binding(#binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}) -> + #reverse_binding{source = SrcName, + destination = DstName, + key = Key, + args = Args}. + +%% ---------------------------------------------------------------------------- +%% Binding / exchange deletion abstraction API +%% ---------------------------------------------------------------------------- + +anything_but( NotThis, NotThis, NotThis) -> NotThis; +anything_but( NotThis, NotThis, This) -> This; +anything_but( NotThis, This, NotThis) -> This; +anything_but(_NotThis, This, This) -> This. + +new_deletions() -> dict:new(). + +add_deletion(XName, Entry, Deletions) -> + dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, + Entry, Deletions). + +combine_deletions(Deletions1, Deletions2) -> + dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, + Deletions1, Deletions2). + +merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), + anything_but(not_deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]}. + +process_deletions(Deletions) -> + dict:fold( + fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify(binding_deleted, info(B)) || + B <- FlatBindings], + TypeModule = type_to_module(Type), + case Deleted of + not_deleted -> TypeModule:remove_bindings(X, FlatBindings); + deleted -> rabbit_event:notify(exchange_deleted, + [{name, X#exchange.name}]), + TypeModule:delete(X, FlatBindings) + end + end, ok, Deletions). |