summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-09-03 15:18:12 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-09-03 15:18:12 +0100
commitc7ec5cd19aaa065446bf303f605b1ff9bb724db9 (patch)
treea5012ac693e0a6cb8859c98ecf139bc0bcaaab73
parentd4d10ffaa96cc2d50643f3fea1083ce6fc76b5f4 (diff)
downloadrabbitmq-server-c7ec5cd19aaa065446bf303f605b1ff9bb724db9.tar.gz
move binding code into its own module
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_binding.erl360
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_exchange.erl363
5 files changed, 393 insertions, 352 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b9ac560..e9f13b5c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -251,8 +251,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
+ rabbit_binding:add(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -433,7 +433,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName).
+ rabbit_binding:delete_queue_bindings(QueueName).
internal_delete(QueueName) ->
case
@@ -478,7 +478,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ Post = rabbit_binding:delete_transient_queue_bindings(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
new file mode 100644
index 00000000..15f3fa87
--- /dev/null
+++ b/src/rabbit_binding.erl
@@ -0,0 +1,360 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binding).
+-include("rabbit.hrl").
+
+-export([recover/0, add/5, delete/5, list/1]).
+-export([list_exchange_bindings/1, list_queue_bindings/1]).
+%% these must all be run inside a mnesia tx
+-export([has_exchange_bindings/1, delete_exchange_bindings/1,
+ delete_queue_bindings/1, delete_transient_queue_bindings/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([key/0]).
+
+-type(key() :: binary()).
+
+-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+
+-spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(add/5 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(), rabbit_framing:amqp_table(),
+ inner_fun()) -> bind_res()).
+-spec(delete/5 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(), rabbit_framing:amqp_table(),
+ inner_fun()) -> bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) ->
+ [{rabbit_exchange:name(), rabbit_amqqueue:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_exchange_bindings/1 ::
+ (rabbit_exchange:name()) -> [{rabbit_amqqueue:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> [{rabbit_exchange:name(),
+ rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(has_exchange_bindings/1 :: (rabbit_exchange:name()) -> boolean()).
+-spec(delete_exchange_bindings/1 ::
+ (rabbit_exchange:name()) -> [rabbit_types:binding()]).
+-spec(delete_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(delete_transient_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+recover() ->
+ rabbit_misc:table_fold(
+ fun (Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route).
+
+add(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ ok = sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ rabbit_event:notify(
+ binding_created,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName},
+ {routing_key, RoutingKey},
+ {arguments, Arguments}]),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, Binding} ->
+ (type_to_module(Type)):add_binding(Exchange, Binding);
+ {existing, _, _} ->
+ ok;
+ {error, _} = Err ->
+ Err
+ end.
+
+delete(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ case InnerFun(X, Q) of
+ ok ->
+ ok =
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ rabbit_event:notify(
+ binding_deleted,
+ [{exchange_name, ExchangeName},
+ {queue_name, QueueName}]),
+ Del = rabbit_exchange:maybe_auto_delete(X),
+ {{Del, X}, B};
+ {error, _} = E ->
+ E
+ end
+ end
+ end) of
+ {error, _} = Err ->
+ Err;
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, [B]);
+ not_deleted -> Module:remove_bindings(X, [B])
+ end
+ end.
+
+list(VHostPath) ->
+ [{ExchangeName, QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{
+ exchange_name = ExchangeName,
+ key = RoutingKey,
+ queue_name = QueueName,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(
+ rabbit_route,
+ #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ _ = '_'},
+ _ = '_'})].
+
+list_exchange_bindings(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ [{QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
+
+% Refactoring is left as an exercise for the reader
+list_queue_bindings(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+ [{ExchangeName, RoutingKey, Arguments} ||
+ #route{binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
+
+has_exchange_bindings(ExchangeName) ->
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+
+delete_exchange_bindings(ExchangeName) ->
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route),
+ Route#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)].
+
+delete_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_forward_routes/1).
+
+delete_transient_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
+
+%%----------------------------------------------------------------------------
+
+binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ Fun(X, Q, #binding{
+ exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = rabbit_misc:sort_field_table(Arguments)})
+ end).
+
+sync_binding(Binding, Durable, Fun) ->
+ ok = case Durable of
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
+ false -> ok
+ end,
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
+ ok.
+
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
+ end).
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+delete_queue_bindings(QueueName, FwdDeleteFun) ->
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Cleanup = cleanup_deleted_queue_bindings(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ not_deleted -> Module:remove_bindings(X, Bs)
+ end
+ end, Cleanup)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% cleanup_deleted_queue_bindings1) works properly.
+cleanup_deleted_queue_bindings([], Acc) ->
+ Acc;
+cleanup_deleted_queue_bindings(
+ [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
+
+cleanup_deleted_queue_bindings(
+ ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
+ Bindings, Acc) ->
+ cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
+cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
+ %% either Deleted is [], or its head has a non-matching ExchangeName
+ NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
+ cleanup_deleted_queue_bindings(Deleted, NewAcc).
+
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ {{rabbit_exchange:maybe_auto_delete(X), X}, Bindings}.
+
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 835d3f0d..097e36c6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -807,17 +807,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
- NoWait, State);
+ binding_action(fun rabbit_binding:add/5,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
- false, State);
+ binding_action(fun rabbit_binding:delete/5,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index f0b623c2..cca2e3d1 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -252,7 +252,7 @@ action(list_bindings, Node, _Args, Opts, Inform) ->
InfoKeys = [exchange_name, queue_name, routing_key, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
+ X <- rpc_call(Node, rabbit_binding, list, [VHostArg])],
InfoKeys);
action(list_connections, Node, Args, _Opts, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index af4eb1bd..d98cb20d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,38 +34,19 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2]).
--export([add_binding/5, delete_binding/5, list_bindings/1]).
--export([delete/2]).
--export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([assert_equivalence/5]).
--export([assert_args_equivalence/2]).
--export([check_type/1]).
-
-%% EXTENDED API
--export([list_exchange_bindings/1]).
--export([list_queue_bindings/1]).
-
--import(mnesia).
--import(sets).
--import(lists).
+ info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+%% this must be run inside a mnesia tx
+-export([maybe_auto_delete/1]).
+-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, type/0, binding_key/0]).
+-export_type([name/0, type/0]).
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
--type(binding_key() :: binary()).
-
--type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
--type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
- rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -97,32 +78,12 @@
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
--spec(add_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
--spec(delete_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list_bindings/1 ::
- (rabbit_types:vhost())
- -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(list_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> [{name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_exchange_bindings/1 ::
- (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
+ 'not_deleted' | 'auto_deleted').
-endif.
@@ -136,19 +97,7 @@ recover() ->
ok = mnesia:write(rabbit_exchange, Exchange, write),
[Exchange | Acc]
end, [], rabbit_durable_exchange),
- Bs = rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route),
- recover_with_bindings(Bs, Exs),
- ok.
-
-recover_with_bindings(Bs, Exs) ->
+ Bs = rabbit_binding:recover(),
recover_with_bindings(
lists:keysort(#binding.exchange_name, Bs),
lists:keysort(#exchange.name, Exs), []).
@@ -164,11 +113,11 @@ recover_with_bindings([], [], []) ->
ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
+ Exchange = #exchange{name = ExchangeName,
+ type = Type,
+ durable = Durable,
auto_delete = AutoDelete,
- arguments = Args},
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
@@ -220,9 +169,9 @@ check_type(TypeBin) ->
end
end.
-assert_equivalence(X = #exchange{ durable = Durable,
+assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
- type = Type},
+ type = Type},
Type, Durable, AutoDelete, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
@@ -232,8 +181,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_args_equivalence(#exchange{ name = Name,
- arguments = Args },
+assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
@@ -311,92 +259,6 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
-%% TODO: Should all of the route and binding management not be
-%% refactored to its own module, especially seeing as unbind will have
-%% to be implemented for 0.91 ?
-
-delete_exchange_bindings(ExchangeName) ->
- [begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
- Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- write)].
-
-delete_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_forward_routes/1).
-
-delete_transient_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
-
-delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
- write)],
- Cleanup = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Cleanup)
- end.
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], Acc) ->
- Acc;
-cleanup_deleted_queue_bindings(
- [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
-
-cleanup_deleted_queue_bindings(
- ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
- Bindings, Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
-cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
- %% either Deleted is [], or its head has a non-matching ExchangeName
- NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
- cleanup_deleted_queue_bindings(Deleted, NewAcc).
-
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- {maybe_auto_delete(X), Bindings}.
-
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-contains(Table, MatchHead) ->
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
-
-continue('$end_of_table') -> false;
-continue({[_|_], _}) -> true;
-continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
@@ -405,156 +267,6 @@ call_with_exchange(Exchange, Fun) ->
end
end).
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
- end).
-
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(X, Q) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
-
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- {maybe_auto_delete(X), B};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
- end.
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
- end).
-
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
-
-list_bindings(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
-reverse_route(#route{binding = Binding}) ->
- #reverse_route{reverse_binding = reverse_binding(Binding)};
-
-reverse_route(#reverse_route{reverse_binding = Binding}) ->
- #route{binding = reverse_binding(Binding)}.
-
-reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
delete(ExchangeName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
@@ -568,54 +280,23 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {not_deleted, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ not_deleted;
+maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
case conditional_delete(Exchange) of
- {error, in_use} -> {not_deleted, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
+ {error, in_use} -> not_deleted;
+ {deleted, Exchange, []} -> auto_deleted
end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse
- contains(rabbit_durable_route, Match) of
+ case rabbit_binding:has_exchange_bindings(ExchangeName) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = delete_exchange_bindings(ExchangeName),
+ Bindings = rabbit_binding:delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
{deleted, Exchange, Bindings}.
-
-%%----------------------------------------------------------------------------
-%% EXTENDED API
-%% These are API calls that are not used by the server internally,
-%% they are exported for embedded clients to use
-
-%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
-%% return {QueueName, RoutingKey, Arguments} tuples
-list_exchange_bindings(ExchangeName) ->
- Route = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
-
-% Refactoring is left as an exercise for the reader
-list_queue_bindings(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName,
- _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].