diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-04 06:31:01 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-04 06:31:01 +0100 |
commit | 5e3cc6acfac3278f78be4840dac42c10469b8366 (patch) | |
tree | 16a6d54aabd8be9ec04a885d25f0b610d100a2ff /src/rabbit_binding.erl | |
parent | 726dec5e26dca42fd0eacd1b3f421e35d7a25684 (diff) | |
download | rabbitmq-server-5e3cc6acfac3278f78be4840dac42c10469b8366.tar.gz |
tweak and extend rabbit_binding API
- 'add', 'remove' take binding records instead of losts of args
- 'list*' return #binding records instead of tuples
- add 'list_for_exchange_and_queue'
- add 'info*' functions
Also fix two bugs:
- don't invoke rabbit_event:notify inside mnesia tx
- include complete binding info in binding_deleted event
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 197 |
1 files changed, 102 insertions, 95 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 3569ba93..a815f544 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -32,8 +32,9 @@ -module(rabbit_binding). -include("rabbit.hrl"). --export([recover/0, add/5, remove/5, list/1]). --export([list_for_exchange/1, list_for_queue/1]). +-export([recover/0, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_exchange/1, remove_for_exchange/1, remove_for_queue/1, remove_transient_for_queue/1]). @@ -52,31 +53,26 @@ -type(inner_fun() :: fun((rabbit_types:exchange(), queue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). -spec(recover/0 :: () -> [rabbit_types:binding()]). --spec(add/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res()). --spec(remove/5 :: - (rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), rabbit_framing:amqp_table(), - inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list/1 :: (rabbit_types:vhost()) -> - [{rabbit_exchange:name(), rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_exchange/1 :: - (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_for_queue/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(), - rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). -spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). --spec(remove_for_exchange/1 :: - (rabbit_exchange:name()) -> [rabbit_types:binding()]). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). -spec(remove_for_queue/1 :: (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(remove_transient_for_queue/1 :: @@ -86,6 +82,8 @@ %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + recover() -> rabbit_misc:table_fold( fun (Route = #route{binding = B}, Acc) -> @@ -95,9 +93,9 @@ recover() -> [B | Acc] end, [], rabbit_durable_route). -add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +add(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to @@ -105,58 +103,47 @@ add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case InnerFun(X, Q) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} end; {error, _} = E -> E end end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); + {new, Exchange = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(Exchange, B), + rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; {error, _} = Err -> Err end. -remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> +remove(Binding, InnerFun) -> case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, + Binding, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - Del = rabbit_exchange:maybe_auto_delete(X), - {{Del, X}, B}; - {error, _} = E -> - E - end + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end end end) of {error, _} = Err -> @@ -164,41 +151,62 @@ remove(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok end. list(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. list_for_exchange(ExchangeName) -> Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. -% Refactoring is left as an exercise for the reader list_for_queue(QueueName) -> Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(ExchangeName, QueueName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). has_for_exchange(ExchangeName) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, @@ -227,15 +235,14 @@ remove_transient_for_queue(QueueName) -> %%---------------------------------------------------------------------------- -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> +binding_action(Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + args = Arguments}, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) end). sync_binding(Binding, Durable, Fun) -> |