summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl140
1 files changed, 90 insertions, 50 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb44797e..91f42e9c 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -10,8 +10,8 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_binding).
@@ -35,11 +35,16 @@
-type(key() :: binary()).
--type(bind_errors() :: rabbit_types:error('source_not_found' |
- 'destination_not_found' |
- 'source_and_destination_not_found')).
+-type(bind_errors() :: rabbit_types:error(
+ {'resources_missing',
+ [{'not_found', (rabbit_types:binding_source() |
+ rabbit_types:binding_destination())} |
+ {'absent', rabbit_types:amqqueue()}]})).
+
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
- rabbit_types:error('binding_not_found')).
+ rabbit_types:error(
+ 'binding_not_found' |
+ {'binding_invalid', string(), [any()]})).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
@@ -155,33 +160,38 @@ add(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(Src, Dst) of
- ok -> case mnesia:read({rabbit_route, B}) of
- [] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
- end;
- {error, _} = Err -> rabbit_misc:const(Err)
+ case rabbit_exchange:validate_binding(Src, B) of
+ ok ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
end
end).
add(Src, Dst, B) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (not (SrcDurable andalso DstDurable) orelse
- mnesia:read({rabbit_durable_route, B}) =:= []) of
- true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ case (SrcDurable andalso DstDurable andalso
+ mnesia:read({rabbit_durable_route, B}) =/= []) of
+ false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- ok = rabbit_exchange:callback(
- Src, add_binding, [transaction, Src, B]),
+ x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
fun () ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Serial, Src, B]),
- ok = rabbit_event:notify(binding_created, info(B))
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
- false -> rabbit_misc:const({error, binding_not_found})
+ true -> rabbit_misc:const({error, binding_not_found})
end.
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
@@ -279,21 +289,15 @@ has_for_source(SrcName) ->
remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- Routes = lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write)),
- [begin
- sync_route(Route, fun mnesia:delete_object/3),
- Route#route.binding
- end || Route <- Routes].
+ remove_routes(
+ lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write))).
-remove_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_routes/1).
-remove_transient_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -310,6 +314,14 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
+
sync_route(R, Fun) -> sync_route(R, true, true, Fun).
sync_route(Route, true, true, Fun) ->
@@ -330,21 +342,32 @@ sync_transient_route(Route, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- ErrFun = fun (Err) -> rabbit_misc:const({error, Err}) end,
+ ErrFun = fun (Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}})
+ end,
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> ErrFun(source_not_found);
- {[_], [] } -> ErrFun(destination_not_found);
- {[], [] } -> ErrFun(source_and_destination_not_found)
- end
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+not_found_or_absent(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent(#resource{kind = queue} = Name) ->
+ case rabbit_amqqueue:not_found_or_absent(Name) of
+ not_found -> {not_found, Name};
+ {absent, _Q} = R -> R
+ end.
+
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
@@ -372,16 +395,32 @@ lock_route_tables() ->
rabbit_semi_durable_route,
rabbit_durable_route]].
-remove_for_destination(DstName, DeleteFun) ->
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {TransientRoutes, DurableRoutes} =
+ lists:partition(fun (R) -> mnesia:match_object(
+ rabbit_durable_route, R, write) == [] end,
+ Routes),
+ [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
+ R <- TransientRoutes],
+ [ok = sync_route(R, fun mnesia:delete_object/3) ||
+ R <- DurableRoutes],
+ [R#route.binding || R <- Routes].
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete_object/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, Fun) ->
lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
- ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
- Bindings = [begin
- Route = reverse_route(ReverseRoute),
- ok = DeleteFun(Route),
- Route#route.binding
- end || ReverseRoute <- ReverseRoutes],
+ Routes = [reverse_route(R) || R <- mnesia:match_object(
+ rabbit_reverse_route, Match, write)],
+ Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
@@ -487,4 +526,5 @@ process_deletions(Deletions) ->
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
-x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).