diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-15 17:14:39 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-15 17:14:39 +0100 |
commit | 2dab458c133edfbffdf4ea8f2146db79964078d7 (patch) | |
tree | db02a0d12bac619e6f896aad3a99864eed9ba15c | |
parent | af6eec80bb59b86e644c93f36ac0aa51463d1567 (diff) | |
download | rabbitmq-server-2dab458c133edfbffdf4ea8f2146db79964078d7.tar.gz |
Two modes for delete, one for when the server is running and we need to be quick going via the reverse route, and another when it is down and we need to clean up durable routes. Not entirely happy with the elegance of this but it gets the tests to pass.bug26347
-rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 58 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 27 |
3 files changed, 53 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1aba7ecb..e45e026e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -612,7 +612,7 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -internal_delete1(QueueName) -> +internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log @@ -622,7 +622,7 @@ internal_delete1(QueueName) -> end, %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. - rabbit_binding:remove_for_destination(QueueName). + rabbit_binding:remove_for_destination(QueueName, OnlyDurable). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -632,7 +632,7 @@ internal_delete(QueueName) -> {[], []} -> rabbit_misc:const({error, not_found}); _ -> - Deletions = internal_delete1(QueueName), + Deletions = internal_delete1(QueueName, false), T = rabbit_binding:process_deletions(Deletions), fun() -> ok = T(), @@ -651,7 +651,7 @@ forget_all_durable(Node) -> Qs = mnesia:match_object(rabbit_durable_queue, #amqqueue{_ = '_'}, write), [rabbit_binding:process_deletions( - internal_delete1(Name)) || + internal_delete1(Name, true)) || #amqqueue{name = Name, pid = Pid} = Q <- Qs, node(Pid) =:= Node, rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined], diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7a095e06..d887f26a 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -25,7 +25,7 @@ -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, - remove_for_destination/1, remove_transient_for_destination/1]). + remove_for_destination/2, remove_transient_for_destination/1]). %%---------------------------------------------------------------------------- @@ -78,8 +78,8 @@ -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). --spec(remove_for_destination/1 :: - (rabbit_types:binding_destination()) -> deletions()). +-spec(remove_for_destination/2 :: + (rabbit_types:binding_destination(), boolean()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). -spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). @@ -215,7 +215,8 @@ remove(Binding, InnerFun) -> remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), - Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), + Deletions = maybe_auto_delete( + B#binding.source, [B], new_deletions(), false), process_deletions(Deletions). list(VHostPath) -> @@ -298,11 +299,11 @@ remove_for_source(SrcName) -> mnesia:match_object(rabbit_route, Match, write) ++ mnesia:match_object(rabbit_semi_durable_route, Match, write))). -remove_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_routes/1). +remove_for_destination(DstName, OnlyDurable) -> + remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). remove_transient_for_destination(DstName) -> - remove_for_destination(DstName, fun remove_transient_routes/1). + remove_for_destination(DstName, false, fun remove_transient_routes/1). %%---------------------------------------------------------------------------- @@ -428,36 +429,47 @@ remove_transient_routes(Routes) -> R#route.binding end || R <- Routes]. -remove_for_destination(DstName, Fun) -> +remove_for_destination(DstName, OnlyDurable, Fun) -> lock_route_tables(), - Match = reverse_route( - #route{binding = #binding{destination = DstName, _ = '_'}}), - Routes = [reverse_route(R) || R <- mnesia:match_object( - rabbit_reverse_route, Match, write)], + MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}}, + MatchRev = reverse_route(MatchFwd), + Routes = case OnlyDurable of + false -> [reverse_route(R) || + R <- mnesia:match_object( + rabbit_reverse_route, MatchRev, write)]; + true -> lists:usort( + mnesia:match_object( + rabbit_durable_route, MatchFwd, write) ++ + mnesia:match_object( + rabbit_semi_durable_route, MatchFwd, write)) + end, Bindings = Fun(Routes), - group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), - lists:keysort(#binding.source, Bindings)). + group_bindings_fold(fun maybe_auto_delete/4, new_deletions(), + lists:keysort(#binding.source, Bindings), OnlyDurable). %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. -group_bindings_fold(_Fun, Acc, []) -> +group_bindings_fold(_Fun, Acc, [], _OnlyDurable) -> Acc; -group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B]). +group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs], + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable). group_bindings_fold( - Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) -> - group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]); -group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) -> + Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings, + OnlyDurable) -> + group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable); +group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) -> %% Either Removed is [], or its head has a non-matching SrcName. - group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed). + group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed, + OnlyDurable). -maybe_auto_delete(XName, Bindings, Deletions) -> +maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) -> {Entry, Deletions1} = case mnesia:read({rabbit_exchange, XName}) of [] -> {{undefined, not_deleted, Bindings}, Deletions}; - [X] -> case rabbit_exchange:maybe_auto_delete(X) of + [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of not_deleted -> {{X, not_deleted, Bindings}, Deletions}; {deleted, Deletions2} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4d4a2a58..685c311f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ info_keys/0, info/1, info/2, info_all/1, info_all/2, route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). +-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). %%---------------------------------------------------------------------------- @@ -86,8 +86,8 @@ -spec(validate_binding/2 :: (rabbit_types:exchange(), rabbit_types:binding()) -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). --spec(maybe_auto_delete/1:: - (rabbit_types:exchange()) +-spec(maybe_auto_delete/2:: + (rabbit_types:exchange(), boolean()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). -spec(serial/1 :: (rabbit_types:exchange()) -> fun((boolean()) -> 'none' | pos_integer())). @@ -400,13 +400,13 @@ call_with_exchange(XName, Fun) -> delete(XName, IfUnused) -> Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 + true -> fun conditional_delete/2; + false -> fun unconditional_delete/2 end, call_with_exchange( XName, fun (X) -> - case Fun(X) of + case Fun(X, false) of {deleted, X, Bs, Deletions} -> rabbit_binding:process_deletions( rabbit_binding:add_deletion( @@ -420,21 +420,21 @@ validate_binding(X = #exchange{type = XType}, Binding) -> Module = type_to_module(XType), Module:validate_binding(X, Binding). -maybe_auto_delete(#exchange{auto_delete = false}) -> +maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = X) -> - case conditional_delete(X) of +maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) -> + case conditional_delete(X, OnlyDurable) of {error, in_use} -> not_deleted; {deleted, X, [], Deletions} -> {deleted, Deletions} end. -conditional_delete(X = #exchange{name = XName}) -> +conditional_delete(X = #exchange{name = XName}, OnlyDurable) -> case rabbit_binding:has_for_source(XName) of - false -> unconditional_delete(X); + false -> unconditional_delete(X, OnlyDurable); true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = XName}) -> +unconditional_delete(X = #exchange{name = XName}, OnlyDurable) -> %% this 'guarded' delete prevents unnecessary writes to the mnesia %% disk log case mnesia:wread({rabbit_durable_exchange, XName}) of @@ -444,7 +444,8 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), - {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + {deleted, X, Bindings, rabbit_binding:remove_for_destination( + XName, OnlyDurable)}. next_serial(XName) -> Serial = peek_serial(XName, write), |