From fc442faac16cc393036a771505420706d91d5e22 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 15 May 2014 14:43:16 +0100 Subject: Notify about policy and parameter removal after the vhost is deleted, not during the attendant tx. --- src/rabbit_channel.erl | 16 +++++++++++----- src/rabbit_policy.erl | 16 +++++++++++----- src/rabbit_runtime_parameters.erl | 21 ++++++++++++++------- src/rabbit_vhost.erl | 21 ++++++++++----------- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74f9cacf..eb9ed4ed 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -187,7 +187,7 @@ force_event_refresh(Ref) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, +init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), @@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, State = #ch{state = starting, protocol = Protocol, channel = Channel, - reader_pid = ReaderPid, + reader_pid = Foo, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, @@ -894,8 +894,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, State), + test(State, ExchangeName), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -1119,7 +1118,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + State1 = test2(State, Msgs), Rev = fun (X) -> lists:reverse(lists:sort(X)) end, lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1); ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter) @@ -1165,6 +1164,13 @@ handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). +test2(State, Msgs) -> + rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs). + +test(State, ExchangeName) -> + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, State). + %%---------------------------------------------------------------------------- %% We get the queue process to send the consume_ok on our behalf. This diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 6e0abd69..0a69fb32 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -213,17 +213,23 @@ notify_clear(VHost, <<"policy">>, Name) -> %% [1] We need to prevent this from becoming O(n^2) in a similar %% manner to rabbit_binding:remove_for_{source,destination}. So see %% the comment in rabbit_binding:lock_route_tables/0 for more rationale. +%% [2] We could be here in a post-tx fun after the vhost has been +%% deleted; in which case it's fine to do nothing. update_policies(VHost) -> Tabs = [rabbit_queue, rabbit_durable_queue, rabbit_exchange, rabbit_durable_exchange], {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( fun() -> [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] - Policies = list(VHost), - {[update_exchange(X, Policies) || - X <- rabbit_exchange:list(VHost)], - [update_queue(Q, Policies) || - Q <- rabbit_amqqueue:list(VHost)]} + case catch list(VHost) of + {error, {no_such_vhost, _}} -> + ok; %% [2] + Policies -> + {[update_exchange(X, Policies) || + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies) || + Q <- rabbit_amqqueue:list(VHost)]} + end end), [catch notify(X) || X <- Xs], [catch notify(Q) || Q <- Qs], diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 7307330b..cf125913 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -29,6 +29,7 @@ -ifdef(use_specs). -type(ok_or_error_string() :: 'ok' | {'error_string', string()}). +-type(ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok')). -spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(), rabbit_types:user() | 'none') -> ok_or_error_string()). @@ -38,9 +39,9 @@ rabbit_types:user() | 'none') -> ok_or_error_string()). -spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) - -> ok_or_error_string()). + -> ok_thunk_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) - -> ok_or_error_string()). + -> ok_thunk_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]). -spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]). @@ -137,16 +138,22 @@ clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). clear_any(VHost, Component, Name) -> - case lookup(VHost, Component, Name) of - not_found -> {error_string, "Parameter does not exist"}; - _ -> mnesia_clear(VHost, Component, Name), + Notify = fun () -> case lookup_component(Component) of {ok, Mod} -> event_notify( - parameter_cleared, VHost, Component, - [{name, Name}]), + parameter_cleared, VHost, Component, + [{name, Name}]), Mod:notify_clear(VHost, Component, Name); _ -> ok end + end, + case lookup(VHost, Component, Name) of + not_found -> {error_string, "Parameter does not exist"}; + _ -> mnesia_clear(VHost, Component, Name), + case mnesia:is_transaction() of + true -> Notify; + false -> Notify() + end end. mnesia_clear(VHost, Component, Name) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index b57627e4..cfa3add4 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -88,12 +88,11 @@ delete(VHostPath) -> #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)], [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], - R = rabbit_misc:execute_mnesia_transaction( - with(VHostPath, fun () -> - ok = internal_delete(VHostPath) - end)), + Funs = rabbit_misc:execute_mnesia_transaction( + with(VHostPath, fun () -> internal_delete(VHostPath) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), - R. + [ok = Fun() || Fun <- Funs], + ok. assert_benign(ok) -> ok; assert_benign({ok, _}) -> ok; @@ -111,14 +110,14 @@ internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], - [ok = rabbit_runtime_parameters:clear(VHostPath, - proplists:get_value(component, Info), - proplists:get_value(name, Info)) + Fs1 = [rabbit_runtime_parameters:clear(VHostPath, + proplists:get_value(component, Info), + proplists:get_value(name, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], - [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) - || Info <- rabbit_policy:list(VHostPath)], + Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) + || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), - ok. + Fs1 ++ Fs2. exists(VHostPath) -> mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. -- cgit v1.2.1