summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-11 15:54:06 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-11 15:54:06 +0100
commit688ab242b766dca9e67ab08e209e1b52355ff946 (patch)
tree3c1429c8512efbb5744df394bdc8628a34ac16ec
parent17e1c2b4e8966eefb9dc6195dfa64d97f495a36c (diff)
downloadrabbitmq-server-688ab242b766dca9e67ab08e209e1b52355ff946.tar.gz
Reworked binding / exchange autodeletion with better abstracted and cleaner API
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_binding.erl184
-rw-r--r--src/rabbit_exchange.erl20
3 files changed, 115 insertions, 117 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3c767eef..d047f7ca 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -450,8 +450,7 @@ internal_delete(QueueName) ->
end
end) of
{error, _} = Err -> Err;
- PostHook -> PostHook(),
- ok
+ Deletions -> ok = rabbit_binding:process_deletions(Deletions)
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
@@ -470,20 +469,23 @@ maybe_expire(QPid) ->
gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
- [Hook() ||
- Hook <- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end)],
- ok.
+ Deletions =
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end)),
+ ok = rabbit_binding:process_deletions(Deletions).
delete_queue(QueueName) ->
- Post = rabbit_binding:remove_transient_for_destination(QueueName),
+ Deletions = rabbit_binding:remove_transient_for_destination(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
- Post.
+ Deletions.
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 2e9c580a..a40671b8 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -35,18 +35,18 @@
-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
+-export([new_deletions/0, combine_deletions/2, add_deletion/3,
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([post_binding_removal_fun/1]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
- remove_for_destination/1, remove_transient_for_destination/1,
- remove_for_destination_inner/1]).
+ remove_for_destination/1, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([key/0]).
+-export_type([key/0, deletions/0]).
-type(key() :: binary()).
@@ -60,6 +60,8 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-type(bindings() :: [rabbit_types:binding()]).
+-opaque(deletions() :: dict:dictionary()).
+
-spec(recover/0 :: () -> [rabbit_types:binding()]).
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
@@ -86,12 +88,16 @@
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
-spec(remove_for_destination/1 ::
- (rabbit_types:binding_destination()) -> fun (() -> 'ok')).
+ (rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
- (rabbit_types:binding_destination()) -> fun (() -> 'ok')).
--spec(remove_for_destination_inner/1 ::
- (rabbit_types:binding_destination()) -> dict:dictionary()).
--spec(post_binding_removal_fun/1 :: (dict:dictionary()) -> fun (() -> 'ok')).
+ (rabbit_types:binding_destination()) -> deletions()).
+-spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
+-spec(add_deletion/3 :: (rabbit_types:binding_source(),
+ {'undefined' | rabbit_exchange:name(),
+ 'deleted' | 'not_deleted',
+ deletions()}, deletions()) -> deletions()).
+-spec(new_deletions/0 :: () -> deletions()).
-endif.
@@ -162,9 +168,9 @@ remove(Binding, InnerFun) ->
ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:delete_object/3),
- {ok, merge_maybe_auto_delete(
- Binding#binding.source, [B],
- dict:new())};
+ {ok,
+ maybe_auto_delete(B#binding.source,
+ [B], new_deletions())};
{error, _} = E ->
E
end
@@ -172,8 +178,8 @@ remove(Binding, InnerFun) ->
end) of
{error, _} = Err ->
Err;
- {ok, Grouped} ->
- ok = (post_binding_removal_fun(Grouped))()
+ {ok, Deletions} ->
+ ok = process_deletions(Deletions)
end.
list(VHostPath) ->
@@ -250,9 +256,6 @@ remove_for_source(SrcName) ->
remove_for_destination(DstName) ->
remove_for_destination(DstName, fun delete_forward_routes/1).
-remove_for_destination_inner(DstName) ->
- remove_for_destination_inner(DstName, fun delete_forward_routes/1).
-
remove_transient_for_destination(DstName) ->
remove_for_destination(DstName, fun delete_transient_forward_routes/1).
@@ -313,11 +316,7 @@ continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
remove_for_destination(DstName, FwdDeleteFun) ->
- post_binding_removal_fun(
- remove_for_destination_inner(DstName, FwdDeleteFun)).
-
-remove_for_destination_inner(DstName, FwdDeleteFun) ->
- DeletedBindings =
+ Bindings =
[begin
Route = reverse_route(ReverseRoute),
ok = FwdDeleteFun(Route),
@@ -332,86 +331,38 @@ remove_for_destination_inner(DstName, FwdDeleteFun) ->
destination = DstName,
_ = '_'}}),
write)],
- group_bindings_and_auto_delete(
- lists:keysort(#binding.source, DeletedBindings), dict:new()).
-
-post_binding_removal_fun(Grouped) ->
- fun () -> dict:fold(
- fun (_SrcName, {Src, IsDeleted, Bs}, ok) ->
- post_binding_removal(IsDeleted, Src,
- lists:usort(lists:flatten(Bs)))
- end, ok, Grouped)
- end.
-
-post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) ->
- ok = (type_to_module(Type)):remove_bindings(Src, Bs);
-post_binding_removal(deleted, Src = #exchange{ type = Type }, Bs) ->
- ok = (type_to_module(Type)):delete(Src, Bs).
+ group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
+ lists:keysort(#binding.source, Bindings)).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_and_auto_delete([], Acc) ->
+group_bindings_fold(_Fun, Acc, []) ->
Acc;
-group_bindings_and_auto_delete([B = #binding{source = SrcName} | Bs], Acc) ->
- group_bindings_and_auto_delete(SrcName, Bs, [B], Acc).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
-group_bindings_and_auto_delete(SrcName, [B = #binding{source = SrcName} | Bs],
- Bindings, Acc) ->
- group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc);
-group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) ->
+group_bindings_fold(
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
%% Either Removed is [], or its head has a non-matching SrcName.
- group_bindings_and_auto_delete(
- Removed, merge_maybe_auto_delete(SrcName, Bindings, Acc)).
-
-%% Once a binding source is deleted, we'll never revisit it, so we
-%% should never find that the existing entry is {deleted, Bindings}.
-merge_maybe_auto_delete(SrcName, Bindings, Acc) ->
- UpdateFun = fun (NewResult, Src) ->
- dict:update(
- SrcName,
- fun ({Src1, Result, Bindings1}) ->
- {not_undef(Src, Src1),
- boolean_or(deleted, Result, NewResult),
- [Bindings | Bindings1]}
- end, {Src, NewResult, Bindings}, Acc)
- end,
- case mnesia:read({rabbit_exchange, SrcName}) of
- [] -> UpdateFun(deleted, undefined);
- [Src] -> case rabbit_exchange:maybe_auto_delete(Src) of
- not_deleted ->
- UpdateFun(not_deleted, Src);
- {auto_deleted, Acc1} ->
- merge_binding_dicts(UpdateFun(deleted, Src), Acc1)
- end
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+
+maybe_auto_delete(XName, Bindings, Deletions) ->
+ case rabbit_exchange:lookup(XName) of
+ {error, not_found} ->
+ add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
+ {ok, X} ->
+ Deletions1 =
+ add_deletion(XName, {X, not_deleted, Bindings}, Deletions),
+ case rabbit_exchange:maybe_auto_delete(X) of
+ not_deleted -> Deletions1;
+ {deleted, Deletions2} -> combine_deletions(Deletions1,
+ Deletions2)
+ end
end.
-%% Should never find that both have deleted the exchange.
-merge_binding_dicts(LHS, RHS) ->
- dict:merge(
- fun (_SrcName,
- {SrcA, IsDeletedA, BindingsL}, {SrcB, IsDeletedB, BindingsR}) ->
- {not_undef(SrcA, SrcB),
- boolean_or(deleted, IsDeletedA, IsDeletedB),
- [BindingsL | BindingsR]}
- end, LHS, RHS).
-
-not_undef(undefined, undefined) ->
- undefined;
-not_undef(undefined, N) ->
- N;
-not_undef(N, undefined) ->
- N;
-not_undef(N, N) ->
- N.
-
-boolean_or(True, True, _Any) ->
- True;
-boolean_or(True, _Any, True) ->
- True;
-boolean_or(_True, Any, Any) ->
- Any.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
@@ -448,3 +399,50 @@ reverse_binding(#binding{source = SrcName,
destination = DstName,
key = Key,
args = Args}.
+
+%% ----------------------------------------------------------------------------
+%% Binding / exchange deletion abstraction API
+%% ----------------------------------------------------------------------------
+
+anything_but(NotThis, NotThis, NotThis) ->
+ NotThis;
+anything_but(NotThis, NotThis, This) ->
+ This;
+anything_but(NotThis, This, NotThis) ->
+ This;
+anything_but(_NotThis, This, This) ->
+ This.
+
+boolean_or(True, True, _Any) ->
+ True;
+boolean_or(True, _Any, True) ->
+ True;
+boolean_or(_True, Any, Any) ->
+ Any.
+
+new_deletions() ->
+ dict:new().
+
+add_deletion(XName, Init = {X, Deleted, Bindings}, Deletions) ->
+ dict:update(
+ XName, fun ({X1, Deleted1, Bindings1}) ->
+ {anything_but(undefined, X, X1),
+ boolean_or(deleted, Deleted, Deleted1),
+ [Bindings | Bindings1]}
+ end, Init, Deletions).
+
+combine_deletions(Deletions1, Deletions2) ->
+ dict:merge(
+ fun (_XName, {X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
+ {anything_but(undefined, X1, X2),
+ boolean_or(deleted, Deleted1, Deleted2),
+ [Bindings1 | Bindings2]}
+ end, Deletions1, Deletions2).
+
+process_deletions(Deletions) ->
+ dict:fold(
+ fun (_XName, {X = #exchange{ type = Type }, not_deleted, Bindings}, ok) ->
+ (type_to_module(Type)):remove_bindings(X, lists:flatten(Bindings));
+ (_XName, {X = #exchange{ type = Type }, deleted, Bindings}, ok) ->
+ (type_to_module(Type)):delete(X, lists:flatten(Bindings))
+ end, ok, Deletions).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 50f49d0c..0ddeca37 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -84,7 +84,7 @@
rabbit_types:error('in_use')).
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
- -> 'not_deleted' | {'auto_deleted', dict:dictionary()}).
+ -> 'not_deleted' | {'deleted', rabbit_binding:dictionary()}).
-endif.
@@ -278,11 +278,10 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
case call_with_exchange(XName, Fun) of
- {deleted, X, Bs, Grouped} ->
- Grouped1 = dict:update(XName, fun ({_X, _MaybeDeleted, Bs1}) ->
- {X, deleted, [Bs | Bs1]}
- end, {X, deleted, Bs}, Grouped),
- ok = (rabbit_binding:post_binding_removal_fun(Grouped1))();
+ {deleted, X, Bs, Deletions} ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
Error = {error, _InUseOrNotFound} ->
Error
end.
@@ -291,8 +290,8 @@ maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
case conditional_delete(X) of
- {error, in_use} -> not_deleted;
- {deleted, X, [], Res} -> {auto_deleted, Res}
+ {error, in_use} -> not_deleted;
+ {deleted, X, [], Deletions} -> {deleted, Deletions}
end.
conditional_delete(X = #exchange{name = XName}) ->
@@ -302,8 +301,7 @@ conditional_delete(X = #exchange{name = XName}) ->
end.
unconditional_delete(X = #exchange{name = XName}) ->
- Bindings = rabbit_binding:remove_for_source(XName),
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
- rabbit_event:notify(exchange_deleted, [{name, XName}]),
- {deleted, X, Bindings, rabbit_binding:remove_for_destination_inner(XName)}.
+ Bindings = rabbit_binding:remove_for_source(XName),
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.