summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-15 17:14:39 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-15 17:14:39 +0100
commit2dab458c133edfbffdf4ea8f2146db79964078d7 (patch)
treedb02a0d12bac619e6f896aad3a99864eed9ba15c
parentaf6eec80bb59b86e644c93f36ac0aa51463d1567 (diff)
downloadrabbitmq-server-2dab458c133edfbffdf4ea8f2146db79964078d7.tar.gz
Two modes for delete, one for when the server is running and we need to be quick going via the reverse route, and another when it is down and we need to clean up durable routes. Not entirely happy with the elegance of this but it gets the tests to pass.bug26347
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_binding.erl58
-rw-r--r--src/rabbit_exchange.erl27
3 files changed, 53 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1aba7ecb..e45e026e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -612,7 +612,7 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-internal_delete1(QueueName) ->
+internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
@@ -622,7 +622,7 @@ internal_delete1(QueueName) ->
end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
- rabbit_binding:remove_for_destination(QueueName).
+ rabbit_binding:remove_for_destination(QueueName, OnlyDurable).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -632,7 +632,7 @@ internal_delete(QueueName) ->
{[], []} ->
rabbit_misc:const({error, not_found});
_ ->
- Deletions = internal_delete1(QueueName),
+ Deletions = internal_delete1(QueueName, false),
T = rabbit_binding:process_deletions(Deletions),
fun() ->
ok = T(),
@@ -651,7 +651,7 @@ forget_all_durable(Node) ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
[rabbit_binding:process_deletions(
- internal_delete1(Name)) ||
+ internal_delete1(Name, true)) ||
#amqqueue{name = Name, pid = Pid} = Q <- Qs,
node(Pid) =:= Node,
rabbit_policy:get(<<"ha-mode">>, Q) =:= undefined],
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7a095e06..d887f26a 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -25,7 +25,7 @@
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
- remove_for_destination/1, remove_transient_for_destination/1]).
+ remove_for_destination/2, remove_transient_for_destination/1]).
%%----------------------------------------------------------------------------
@@ -78,8 +78,8 @@
-> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
--spec(remove_for_destination/1 ::
- (rabbit_types:binding_destination()) -> deletions()).
+-spec(remove_for_destination/2 ::
+ (rabbit_types:binding_destination(), boolean()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
@@ -215,7 +215,8 @@ remove(Binding, InnerFun) ->
remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
- Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
+ Deletions = maybe_auto_delete(
+ B#binding.source, [B], new_deletions(), false),
process_deletions(Deletions).
list(VHostPath) ->
@@ -298,11 +299,11 @@ remove_for_source(SrcName) ->
mnesia:match_object(rabbit_route, Match, write) ++
mnesia:match_object(rabbit_semi_durable_route, Match, write))).
-remove_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_routes/1).
+remove_for_destination(DstName, OnlyDurable) ->
+ remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
remove_transient_for_destination(DstName) ->
- remove_for_destination(DstName, fun remove_transient_routes/1).
+ remove_for_destination(DstName, false, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -428,36 +429,47 @@ remove_transient_routes(Routes) ->
R#route.binding
end || R <- Routes].
-remove_for_destination(DstName, Fun) ->
+remove_for_destination(DstName, OnlyDurable, Fun) ->
lock_route_tables(),
- Match = reverse_route(
- #route{binding = #binding{destination = DstName, _ = '_'}}),
- Routes = [reverse_route(R) || R <- mnesia:match_object(
- rabbit_reverse_route, Match, write)],
+ MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
+ MatchRev = reverse_route(MatchFwd),
+ Routes = case OnlyDurable of
+ false -> [reverse_route(R) ||
+ R <- mnesia:match_object(
+ rabbit_reverse_route, MatchRev, write)];
+ true -> lists:usort(
+ mnesia:match_object(
+ rabbit_durable_route, MatchFwd, write) ++
+ mnesia:match_object(
+ rabbit_semi_durable_route, MatchFwd, write))
+ end,
Bindings = Fun(Routes),
- group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
- lists:keysort(#binding.source, Bindings)).
+ group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
+ lists:keysort(#binding.source, Bindings), OnlyDurable).
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.
-group_bindings_fold(_Fun, Acc, []) ->
+group_bindings_fold(_Fun, Acc, [], _OnlyDurable) ->
Acc;
-group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs]) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B]).
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
group_bindings_fold(
- Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings) ->
- group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings]);
-group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) ->
%% Either Removed is [], or its head has a non-matching SrcName.
- group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed,
+ OnlyDurable).
-maybe_auto_delete(XName, Bindings, Deletions) ->
+maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case mnesia:read({rabbit_exchange, XName}) of
[] -> {{undefined, not_deleted, Bindings}, Deletions};
- [X] -> case rabbit_exchange:maybe_auto_delete(X) of
+ [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of
not_deleted ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, Deletions2} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4d4a2a58..685c311f 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -24,7 +24,7 @@
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
+-export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -86,8 +86,8 @@
-spec(validate_binding/2 ::
(rabbit_types:exchange(), rabbit_types:binding())
-> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
--spec(maybe_auto_delete/1::
- (rabbit_types:exchange())
+-spec(maybe_auto_delete/2::
+ (rabbit_types:exchange(), boolean())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-spec(serial/1 :: (rabbit_types:exchange()) ->
fun((boolean()) -> 'none' | pos_integer())).
@@ -400,13 +400,13 @@ call_with_exchange(XName, Fun) ->
delete(XName, IfUnused) ->
Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
+ true -> fun conditional_delete/2;
+ false -> fun unconditional_delete/2
end,
call_with_exchange(
XName,
fun (X) ->
- case Fun(X) of
+ case Fun(X, false) of
{deleted, X, Bs, Deletions} ->
rabbit_binding:process_deletions(
rabbit_binding:add_deletion(
@@ -420,21 +420,21 @@ validate_binding(X = #exchange{type = XType}, Binding) ->
Module = type_to_module(XType),
Module:validate_binding(X, Binding).
-maybe_auto_delete(#exchange{auto_delete = false}) ->
+maybe_auto_delete(#exchange{auto_delete = false}, _OnlyDurable) ->
not_deleted;
-maybe_auto_delete(#exchange{auto_delete = true} = X) ->
- case conditional_delete(X) of
+maybe_auto_delete(#exchange{auto_delete = true} = X, OnlyDurable) ->
+ case conditional_delete(X, OnlyDurable) of
{error, in_use} -> not_deleted;
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-conditional_delete(X = #exchange{name = XName}) ->
+conditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
case rabbit_binding:has_for_source(XName) of
- false -> unconditional_delete(X);
+ false -> unconditional_delete(X, OnlyDurable);
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = XName}) ->
+unconditional_delete(X = #exchange{name = XName}, OnlyDurable) ->
%% this 'guarded' delete prevents unnecessary writes to the mnesia
%% disk log
case mnesia:wread({rabbit_durable_exchange, XName}) of
@@ -444,7 +444,8 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
- {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+ {deleted, X, Bindings, rabbit_binding:remove_for_destination(
+ XName, OnlyDurable)}.
next_serial(XName) ->
Serial = peek_serial(XName, write),