diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-11 23:46:41 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-09-11 23:46:41 +0100 |
commit | f182b6135b803e0d77887046ad5fb6bb7adddea5 (patch) | |
tree | c2b5c09c192929e1ee61f4a7472490ff01bfcbc7 /src/rabbit_binding.erl | |
parent | 432aefa0b24e199ec5b694da32187b777348cf61 (diff) | |
download | rabbitmq-server-f182b6135b803e0d77887046ad5fb6bb7adddea5.tar.gz |
Implement exchange-to-exchange bindings
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 124 |
1 files changed, 67 insertions, 57 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index bb29580f..ec1816ca 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -33,11 +33,12 @@ -include("rabbit.hrl"). -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). --export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([list_for_exchange/1, list_for_destination/1, + list_for_exchange_and_destination/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, - remove_for_queue/1, remove_transient_for_queue/1]). + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -66,9 +67,11 @@ bind_res() | rabbit_types:error('binding_not_found')). -spec(list/1 :: (rabbit_types:vhost()) -> bindings()). -spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). --spec(list_for_exchange_and_queue/2 :: - (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_destination/1 :: + (rabbit_amqqueue:name()|rabbit_exchange:name()) -> bindings()). +-spec(list_for_exchange_and_destination/2 :: + (rabbit_exchange:name(), + rabbit_amqqueue:name() | rabbit_exchange:name()) -> bindings()). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). -spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> @@ -78,16 +81,16 @@ -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). -spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). --spec(remove_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(remove_transient_for_queue/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). +-spec(remove_transient_for_destination/1 :: + (rabbit_amqqueue:name() | rabbit_exchange:name()) -> fun (() -> any())). -endif. %%---------------------------------------------------------------------------- --define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). +-define(INFO_KEYS, [exchange_name, destination, routing_key, arguments]). recover() -> rabbit_misc:table_fold( @@ -101,26 +104,24 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_X, _D, B) -> mnesia:read({rabbit_route, B}) /= [] end). -add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). +add(Binding) -> add(Binding, fun (_X, _D) -> ok end). -remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). +remove(Binding) -> remove(Binding, fun (_X, _D) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else - case InnerFun(X, Q) of + case InnerFun(X, D) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> Durable = (X#exchange.durable andalso - Q#amqqueue.durable), - ok = sync_binding( - B, Durable, + [] -> ok = sync_binding( + B, are_endpoints_durable(X, D), fun mnesia:write/3), {new, X, B}; [_] -> {existing, X, B} @@ -141,16 +142,14 @@ add(Binding, InnerFun) -> remove(Binding, InnerFun) -> case binding_action( Binding, - fun (X, Q, B) -> + fun (X, D, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - [_] -> case InnerFun(X, Q) of + [_] -> case InnerFun(X, D) of ok -> - Durable = (X#exchange.durable andalso - Q#amqqueue.durable), ok = sync_binding( - B, Durable, + B, are_endpoints_durable(X, D), fun mnesia:delete_object/3), Deleted = rabbit_exchange:maybe_auto_delete(X), @@ -175,7 +174,7 @@ remove(Binding, InnerFun) -> list(VHostPath) -> Route = #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - queue_name = rabbit_misc:r(VHostPath, queue), + destination = rabbit_misc:r(VHostPath, '_'), _ = '_'}, _ = '_'}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, @@ -186,15 +185,15 @@ list_for_exchange(ExchangeName) -> [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. -list_for_queue(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, +list_for_destination(DestinationName) -> + Route = #route{binding = #binding{destination = DestinationName, _ = '_'}}, [reverse_binding(B) || #reverse_route{reverse_binding = B} <- mnesia:dirty_match_object(rabbit_reverse_route, reverse_route(Route))]. -list_for_exchange_and_queue(ExchangeName, QueueName) -> +list_for_exchange_and_destination(ExchangeName, DestinationName) -> Route = #route{binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = DestinationName, _ = '_'}}, [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, Route)]. @@ -208,10 +207,10 @@ map(VHostPath, F) -> infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. -i(exchange_name, #binding{exchange_name = XName}) -> XName; -i(queue_name, #binding{queue_name = QName}) -> QName; -i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; -i(arguments, #binding{args = Arguments}) -> Arguments; +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(destination, #binding{destination = Destination}) -> Destination; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). info(B = #binding{}) -> infos(?INFO_KEYS, B). @@ -241,22 +240,28 @@ remove_for_exchange(ExchangeName) -> _ = '_'}}, write)]. -remove_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_forward_routes/1). +remove_for_destination(DestinationName) -> + remove_for_destination(DestinationName, fun delete_forward_routes/1). -remove_transient_for_queue(QueueName) -> - remove_for_queue(QueueName, fun delete_transient_forward_routes/1). +remove_transient_for_destination(DestinationName) -> + remove_for_destination(DestinationName, + fun delete_transient_forward_routes/1). %%---------------------------------------------------------------------------- +are_endpoints_durable(#exchange{durable = A}, #amqqueue{durable = B}) -> + A andalso B; +are_endpoints_durable(#exchange{durable = A}, #exchange{durable = B}) -> + A andalso B. + binding_action(Binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, + destination = Destination, args = Arguments}, Fun) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> + call_with_exchange_and_destination( + ExchangeName, Destination, + fun (X, D) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), - Fun(X, Q, Binding#binding{args = SortedArgs}) + Fun(X, D, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> @@ -270,15 +275,19 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_exchange_and_queue(Exchange, Queue, Fun) -> +call_with_exchange_and_destination(Exchange, Destination, Fun) -> + DestTable = case Destination#resource.kind of + queue -> rabbit_queue; + exchange -> rabbit_exchange + end, rabbit_misc:execute_mnesia_transaction( fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end + mnesia:read({DestTable, Destination})} of + {[X], [D]} -> Fun(X, D); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, destination_not_found}; + {[ ], [ ]} -> {error, exchange_and_destination_not_found} + end end). %% Used with atoms from records; e.g., the type is expected to exist. @@ -293,7 +302,7 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -remove_for_queue(QueueName, FwdDeleteFun) -> +remove_for_destination(DestinationName, FwdDeleteFun) -> DeletedBindings = [begin Route = reverse_route(ReverseRoute), @@ -304,9 +313,10 @@ remove_for_queue(QueueName, FwdDeleteFun) -> end || ReverseRoute <- mnesia:match_object( rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), + reverse_route(#route{ + binding = #binding{ + destination = DestinationName, + _ = '_'}}), write)], Grouped = group_bindings_and_auto_delete( lists:keysort(#binding.exchange_name, DeletedBindings), []), @@ -360,19 +370,19 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, + destination = Destination, key = Key, args = Args}. |