diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-11 15:54:06 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-11 15:54:06 +0100 |
commit | 688ab242b766dca9e67ab08e209e1b52355ff946 (patch) | |
tree | 3c1429c8512efbb5744df394bdc8628a34ac16ec | |
parent | 17e1c2b4e8966eefb9dc6195dfa64d97f495a36c (diff) | |
download | rabbitmq-server-688ab242b766dca9e67ab08e209e1b52355ff946.tar.gz |
Reworked binding / exchange autodeletion with better abstracted and cleaner API
-rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 184 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 20 |
3 files changed, 115 insertions, 117 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3c767eef..d047f7ca 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -450,8 +450,7 @@ internal_delete(QueueName) -> end end) of {error, _} = Err -> Err; - PostHook -> PostHook(), - ok + Deletions -> ok = rabbit_binding:process_deletions(Deletions) end. maybe_run_queue_via_backing_queue(QPid, Fun) -> @@ -470,20 +469,23 @@ maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> - [Hook() || - Hook <- rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end)], - ok. + Deletions = + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)), + ok = rabbit_binding:process_deletions(Deletions). delete_queue(QueueName) -> - Post = rabbit_binding:remove_transient_for_destination(QueueName), + Deletions = rabbit_binding:remove_transient_for_destination(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), - Post. + Deletions. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 2e9c580a..a40671b8 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -35,18 +35,18 @@ -export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). +-export([new_deletions/0, combine_deletions/2, add_deletion/3, + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([post_binding_removal_fun/1]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1, - remove_for_destination_inner/1]). + remove_for_destination/1, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([key/0]). +-export_type([key/0, deletions/0]). -type(key() :: binary()). @@ -60,6 +60,8 @@ rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). +-opaque(deletions() :: dict:dictionary()). + -spec(recover/0 :: () -> [rabbit_types:binding()]). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> bind_res()). @@ -86,12 +88,16 @@ -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> 'ok')). + (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: - (rabbit_types:binding_destination()) -> fun (() -> 'ok')). --spec(remove_for_destination_inner/1 :: - (rabbit_types:binding_destination()) -> dict:dictionary()). --spec(post_binding_removal_fun/1 :: (dict:dictionary()) -> fun (() -> 'ok')). + (rabbit_types:binding_destination()) -> deletions()). +-spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). +-spec(add_deletion/3 :: (rabbit_types:binding_source(), + {'undefined' | rabbit_exchange:name(), + 'deleted' | 'not_deleted', + deletions()}, deletions()) -> deletions()). +-spec(new_deletions/0 :: () -> deletions()). -endif. @@ -162,9 +168,9 @@ remove(Binding, InnerFun) -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:delete_object/3), - {ok, merge_maybe_auto_delete( - Binding#binding.source, [B], - dict:new())}; + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; {error, _} = E -> E end @@ -172,8 +178,8 @@ remove(Binding, InnerFun) -> end) of {error, _} = Err -> Err; - {ok, Grouped} -> - ok = (post_binding_removal_fun(Grouped))() + {ok, Deletions} -> + ok = process_deletions(Deletions) end. list(VHostPath) -> @@ -250,9 +256,6 @@ remove_for_source(SrcName) -> remove_for_destination(DstName) -> remove_for_destination(DstName, fun delete_forward_routes/1). -remove_for_destination_inner(DstName) -> - remove_for_destination_inner(DstName, fun delete_forward_routes/1). - remove_transient_for_destination(DstName) -> remove_for_destination(DstName, fun delete_transient_forward_routes/1). @@ -313,11 +316,7 @@ continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). remove_for_destination(DstName, FwdDeleteFun) -> - post_binding_removal_fun( - remove_for_destination_inner(DstName, FwdDeleteFun)). - -remove_for_destination_inner(DstName, FwdDeleteFun) -> - DeletedBindings = + Bindings = [begin Route = reverse_route(ReverseRoute), ok = FwdDeleteFun(Route), @@ -332,86 +331,38 @@ remove_for_destination_inner(DstName, FwdDeleteFun) -> destination = DstName, _ = '_'}}), write)], - group_bindings_and_auto_delete( - lists:keysort(#binding.source, DeletedBindings), dict:new()). - -post_binding_removal_fun(Grouped) -> - fun () -> dict:fold( - fun (_SrcName, {Src, IsDeleted, Bs}, ok) -> - post_binding_removal(IsDeleted, Src, - lists:usort(lists:flatten(Bs))) - end, ok, Grouped) - end. - -post_binding_removal(not_deleted, Src = #exchange{ type = Type }, Bs) -> - ok = (type_to_module(Type)):remove_bindings(Src, Bs); -post_binding_removal(deleted, Src = #exchange{ type = Type }, Bs) -> - ok = (type_to_module(Type)):delete(Src, Bs). + group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), + lists:keysort(#binding.source, Bindings)). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_and_auto_delete([], Acc) -> +group_bindings_fold(_Fun, Acc, []) -> Acc; -group_bindings_and_auto_delete([B = #binding{source = SrcName} | Bs], Acc) -> - group_bindings_and_auto_delete(SrcName, Bs, [B], Acc). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). -group_bindings_and_auto_delete(SrcName, [B = #binding{source = SrcName} | Bs], - Bindings, Acc) -> - group_bindings_and_auto_delete(SrcName, Bs, [B | Bindings], Acc); -group_bindings_and_auto_delete(SrcName, Removed, Bindings, Acc) -> +group_bindings_fold( + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> %% Either Removed is [], or its head has a non-matching SrcName. - group_bindings_and_auto_delete( - Removed, merge_maybe_auto_delete(SrcName, Bindings, Acc)). - -%% Once a binding source is deleted, we'll never revisit it, so we -%% should never find that the existing entry is {deleted, Bindings}. -merge_maybe_auto_delete(SrcName, Bindings, Acc) -> - UpdateFun = fun (NewResult, Src) -> - dict:update( - SrcName, - fun ({Src1, Result, Bindings1}) -> - {not_undef(Src, Src1), - boolean_or(deleted, Result, NewResult), - [Bindings | Bindings1]} - end, {Src, NewResult, Bindings}, Acc) - end, - case mnesia:read({rabbit_exchange, SrcName}) of - [] -> UpdateFun(deleted, undefined); - [Src] -> case rabbit_exchange:maybe_auto_delete(Src) of - not_deleted -> - UpdateFun(not_deleted, Src); - {auto_deleted, Acc1} -> - merge_binding_dicts(UpdateFun(deleted, Src), Acc1) - end + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + +maybe_auto_delete(XName, Bindings, Deletions) -> + case rabbit_exchange:lookup(XName) of + {error, not_found} -> + add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions); + {ok, X} -> + Deletions1 = + add_deletion(XName, {X, not_deleted, Bindings}, Deletions), + case rabbit_exchange:maybe_auto_delete(X) of + not_deleted -> Deletions1; + {deleted, Deletions2} -> combine_deletions(Deletions1, + Deletions2) + end end. -%% Should never find that both have deleted the exchange. -merge_binding_dicts(LHS, RHS) -> - dict:merge( - fun (_SrcName, - {SrcA, IsDeletedA, BindingsL}, {SrcB, IsDeletedB, BindingsR}) -> - {not_undef(SrcA, SrcB), - boolean_or(deleted, IsDeletedA, IsDeletedB), - [BindingsL | BindingsR]} - end, LHS, RHS). - -not_undef(undefined, undefined) -> - undefined; -not_undef(undefined, N) -> - N; -not_undef(N, undefined) -> - N; -not_undef(N, N) -> - N. - -boolean_or(True, True, _Any) -> - True; -boolean_or(True, _Any, True) -> - True; -boolean_or(_True, Any, Any) -> - Any. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). @@ -448,3 +399,50 @@ reverse_binding(#binding{source = SrcName, destination = DstName, key = Key, args = Args}. + +%% ---------------------------------------------------------------------------- +%% Binding / exchange deletion abstraction API +%% ---------------------------------------------------------------------------- + +anything_but(NotThis, NotThis, NotThis) -> + NotThis; +anything_but(NotThis, NotThis, This) -> + This; +anything_but(NotThis, This, NotThis) -> + This; +anything_but(_NotThis, This, This) -> + This. + +boolean_or(True, True, _Any) -> + True; +boolean_or(True, _Any, True) -> + True; +boolean_or(_True, Any, Any) -> + Any. + +new_deletions() -> + dict:new(). + +add_deletion(XName, Init = {X, Deleted, Bindings}, Deletions) -> + dict:update( + XName, fun ({X1, Deleted1, Bindings1}) -> + {anything_but(undefined, X, X1), + boolean_or(deleted, Deleted, Deleted1), + [Bindings | Bindings1]} + end, Init, Deletions). + +combine_deletions(Deletions1, Deletions2) -> + dict:merge( + fun (_XName, {X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> + {anything_but(undefined, X1, X2), + boolean_or(deleted, Deleted1, Deleted2), + [Bindings1 | Bindings2]} + end, Deletions1, Deletions2). + +process_deletions(Deletions) -> + dict:fold( + fun (_XName, {X = #exchange{ type = Type }, not_deleted, Bindings}, ok) -> + (type_to_module(Type)):remove_bindings(X, lists:flatten(Bindings)); + (_XName, {X = #exchange{ type = Type }, deleted, Bindings}, ok) -> + (type_to_module(Type)):delete(X, lists:flatten(Bindings)) + end, ok, Deletions). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 50f49d0c..0ddeca37 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -84,7 +84,7 @@ rabbit_types:error('in_use')). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) - -> 'not_deleted' | {'auto_deleted', dict:dictionary()}). + -> 'not_deleted' | {'deleted', rabbit_binding:dictionary()}). -endif. @@ -278,11 +278,10 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, case call_with_exchange(XName, Fun) of - {deleted, X, Bs, Grouped} -> - Grouped1 = dict:update(XName, fun ({_X, _MaybeDeleted, Bs1}) -> - {X, deleted, [Bs | Bs1]} - end, {X, deleted, Bs}, Grouped), - ok = (rabbit_binding:post_binding_removal_fun(Grouped1))(); + {deleted, X, Bs, Deletions} -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); Error = {error, _InUseOrNotFound} -> Error end. @@ -291,8 +290,8 @@ maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> case conditional_delete(X) of - {error, in_use} -> not_deleted; - {deleted, X, [], Res} -> {auto_deleted, Res} + {error, in_use} -> not_deleted; + {deleted, X, [], Deletions} -> {deleted, Deletions} end. conditional_delete(X = #exchange{name = XName}) -> @@ -302,8 +301,7 @@ conditional_delete(X = #exchange{name = XName}) -> end. unconditional_delete(X = #exchange{name = XName}) -> - Bindings = rabbit_binding:remove_for_source(XName), ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), - rabbit_event:notify(exchange_deleted, [{name, XName}]), - {deleted, X, Bindings, rabbit_binding:remove_for_destination_inner(XName)}. + Bindings = rabbit_binding:remove_for_source(XName), + {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. |