diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-13 14:23:37 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-13 14:23:37 +0000 |
commit | 801ceca09c3567f181d833f32888aed52ebdf64d (patch) | |
tree | 8bdb12f17b6da28a027ad3811f8e1384e87ebf3e | |
parent | 26b539bf4ac7e8bff4ff319c955d47e62597ecc0 (diff) | |
parent | c048e064f7d0a69a8ab707ede80acddd339a9d4f (diff) | |
download | rabbitmq-server-801ceca09c3567f181d833f32888aed52ebdf64d.tar.gz |
Merging default into bug23445
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 54 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 102 | ||||
-rw-r--r-- | src/rabbit_event.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 60 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 8 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 54 | ||||
-rw-r--r-- | src/rabbit_vhost.erl | 56 |
12 files changed, 237 insertions, 159 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index ae326a87..280ffd15 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -34,14 +34,14 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). +-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(delete/2 :: (rabbit_types:exchange(), +-spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/2 :: (rabbit_types:exchange(), +-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/2 :: (rabbit_types:exchange(), +-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), rabbit_framing:amqp_table()) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 35ed1c94..56fc64fe 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -213,25 +213,31 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> end. internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> + {ReturnArg, TailFun} = case Recover of true -> ok = store_queue(Q), - Q; + {Q, fun rabbit_misc:const_ok/1}; false -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), - ok = add_default_binding(Q), - Q; - [_] -> not_found %% Q exists on stopped node + B = add_default_binding(Q), + {Q, B}; + %% Q exists on stopped node + [_] -> {not_found, fun rabbit_misc:const_ok/1} end; [ExistingQ] -> - ExistingQ + {ExistingQ, fun rabbit_misc:const_ok/1} end + end, + fun (Tx) -> + TailFun(Tx), + ReturnArg end end). @@ -447,16 +453,18 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - case rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [_] -> internal_delete1(QueueName) end - end) of - {error, _} = Err -> Err; - Deletions -> ok = rabbit_binding:process_deletions(Deletions) - end. + end, + fun ({error, _} = Err, _Tx) -> + Err; + (Deletions, Tx) -> + ok = rabbit_binding:process_deletions(Deletions, Tx) + end). maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). @@ -480,16 +488,20 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end))). + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end, + fun (Deletions, Tx) -> + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + Deletions), + Tx) + end). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index ccadf5af..93f9dc27 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -36,7 +36,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/1]). + process_deletions/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -91,7 +91,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -119,14 +119,16 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> + rabbit_misc:const(mnesia:read({rabbit_route, B}) /= []) + end). add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> - case binding_action( + binding_action( Binding, fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; @@ -138,26 +140,26 @@ add(Binding, InnerFun) -> [] -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:write/3), - {new, Src, B}; - [_] -> {existing, Src, B} + fun (Tx) -> + ok = rabbit_exchange:callback( + Src, add_binding, + [Tx, Src, B]), + rabbit_event:notify_if( + not Tx, + binding_created, info(B)) + end; + [_] -> fun (_Tx) -> ok end end; - {error, _} = E -> - E + {error, _} = Err -> + rabbit_misc:const(Err) end - end) of - {new, Src = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(Src, B), - rabbit_event:notify(binding_created, info(B)); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. + end). remove(Binding, InnerFun) -> - case binding_action( + binding_action( Binding, fun (Src, Dst, B) -> + Result = case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> @@ -174,13 +176,16 @@ remove(Binding, InnerFun) -> {error, _} = E -> E end + end, + fun (Tx) -> + case Result of + {ok, Deletions} -> + ok = process_deletions(Deletions, Tx); + {error, _} = Err -> + Err + end end - end) of - {error, _} = Err -> - Err; - {ok, Deletions} -> - ok = process_deletions(Deletions) - end. + end). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -290,24 +295,22 @@ sync_binding(Binding, Durable, Fun) -> call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({SrcTable, SrcName}), - mnesia:read({DstTable, DstName})} of - {[Src], [Dst]} -> Fun(Src, Dst); - {[], [_] } -> {error, source_not_found}; - {[_], [] } -> {error, destination_not_found}; - {[], [] } -> {error, source_and_destination_not_found} - end + ErrFun = fun (Err) -> rabbit_misc:const(Err) end, + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> + case {mnesia:read({SrcTable, SrcName}), + mnesia:read({DstTable, DstName})} of + {[Src], [Dst]} -> Fun(Src, Dst); + {[], [_] } -> ErrFun({error, source_not_found}); + {[_], [] } -> ErrFun({error, destination_not_found}); + {[], [] } -> ErrFun({error, + source_and_destination_not_found}) + end end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. - contains(Table, MatchHead) -> continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). @@ -423,17 +426,18 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions) -> +process_deletions(Deletions, Tx) -> dict:fold( - fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify(binding_deleted, info(B)) || - B <- FlatBindings], - TypeModule = type_to_module(Type), - case Deleted of - not_deleted -> TypeModule:remove_bindings(X, FlatBindings); - deleted -> rabbit_event:notify(exchange_deleted, - [{name, X#exchange.name}]), - TypeModule:delete(X, FlatBindings) - end + fun (_XName, {X, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || + B <- FlatBindings], + case Deleted of + not_deleted -> rabbit_exchange:callback(X, remove_bindings, + [Tx, X, FlatBindings]); + deleted -> rabbit_event:notify_if(not Tx, exchange_deleted, + [{name, X#exchange.name}]), + rabbit_exchange:callback(X, delete, + [Tx, X, FlatBindings]) + end end, ok, Deletions). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 2b236531..9755654b 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -37,7 +37,7 @@ -export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). --export([notify/2]). +-export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- @@ -77,6 +77,7 @@ -spec(stats_level/1 :: (state()) -> level()). -spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -140,6 +141,9 @@ if_enabled(_State, Fun) -> Fun(), ok. +notify_if(true, Type, Props) -> notify(Type, Props); +notify_if(false, _Type, _Props) -> ok. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a95cf0b1..0ec57564 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,6 +35,7 @@ -export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +-export([callback/3]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). -export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). @@ -86,6 +87,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). +-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). -endif. @@ -121,12 +123,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> auto_delete = AutoDelete, internal = Internal, arguments = Args}, - %% We want to upset things if it isn't ok; this is different from - %% the other hooks invocations, where we tend to ignore the return - %% value. - TypeModule = type_to_module(Type), - ok = TypeModule:validate(X), - case rabbit_misc:execute_mnesia_transaction( + %% We want to upset things if it isn't ok + ok = (type_to_module(Type)):validate(X), + rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> @@ -142,13 +141,17 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> [ExistingX] -> {existing, ExistingX} end - end) of - {new, X} -> TypeModule:create(X), - rabbit_event:notify(exchange_created, info(X)), - X; - {existing, X} -> X; - Err -> Err - end. + end, + fun ({new, Exchange}, Tx) -> + callback(Exchange, create, [Tx, Exchange]), + rabbit_event:notify_if( + not Tx, exchange_created, info(Exchange)), + Exchange; + ({existing, Exchange}, _Tx) -> + Exchange; + (Err, _Tx) -> + Err + end). %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> @@ -278,27 +281,27 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun) -> +call_with_exchange(XName, Fun, PrePostCommitFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end - end). + end, PrePostCommitFun). delete(XName, IfUnused) -> - Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - case call_with_exchange(XName, Fun) of - {deleted, X, Bs, Deletions} -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions)); - Error = {error, _InUseOrNotFound} -> - Error - end. + call_with_exchange(XName, + case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + fun ({deleted, X, Bs, Deletions}, Tx) -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); + (Error = {error, _InUseOrNotFound}, _Tx) -> + Error + end). maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; @@ -308,6 +311,9 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). + conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 742944dc..8b90cbc4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -42,19 +42,19 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration when previously absent - {create, 1}, + {create, 2}, %% called when recovering {recover, 2}, %% called after exchange deletion. - {delete, 2}, + {delete, 3}, %% called after a binding has been added - {add_binding, 2}, + {add_binding, 3}, %% called after bindings have been deleted. - {remove_bindings, 2}, + {remove_bindings, 3}, %% called when comparing exchanges for equivalence - should return ok or %% exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d49d0199..adb47cc0 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -55,10 +55,10 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index e7f75464..5266dd87 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index caf141fe..efe0ec88 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,8 +36,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 44851858..2f0d47a7 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,8 +35,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> last_topic_match(P, [BacktrackNext | R], BacktrackList). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 15ba787a..dbc09e7f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -48,6 +48,9 @@ -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). +-export([execute_mnesia_transaction/2]). +-export([execute_mnesia_transaction/3]). +-export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). @@ -67,6 +70,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). +-export([const_ok/1, const/1]). %%---------------------------------------------------------------------------- @@ -142,6 +146,12 @@ (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). +-spec(execute_mnesia_transaction/2 :: + (thunk(A), fun ((A, boolean()) -> B)) -> B). +-spec(execute_mnesia_transaction/3 :: + (thunk(A), fun ((A) -> B), fun ((A) -> B)) -> B). +-spec(execute_mnesia_tx_with_tail/1 :: + (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(makenode/1 :: ({string(), string()} | string()) -> node()). -spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). @@ -196,6 +206,8 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). +-spec(const_ok/1 :: (any()) -> 'ok'). +-spec(const/1 :: (A) -> fun ((_) -> A)). -endif. @@ -377,6 +389,45 @@ execute_mnesia_transaction(TxFun) -> {aborted, Reason} -> throw({error, Reason}) end. + +%% Like execute_mnesia_transaction/1 with additional Pre- and Post- +%% commit functions +execute_mnesia_transaction(TxFun, PreCommit, PostCommit) -> + case mnesia:is_transaction() of + true -> throw(unexpected_transaction); + false -> ok + end, + PostCommit(execute_mnesia_transaction( + fun () -> + PreCommit(TxFun()) + end)). + +%% Like execute_mnesia_transaction/3 with similar Pre- and PostCommit funs +execute_mnesia_transaction(TxFun, PrePostCommitFun) -> + execute_mnesia_transaction(TxFun, + fun (Result) -> + PrePostCommitFun(Result, true), + Result + end, + fun (Result) -> + PrePostCommitFun(Result, false) + end). + +%% Like execute_mnesia_transaction/2, but TxFun is expected to return a +%% TailFun which gets called immediately before and after the tx commit +execute_mnesia_tx_with_tail(TxFun) -> + case mnesia:is_transaction() of + true -> execute_mnesia_transaction(TxFun); + false -> TailFun = + execute_mnesia_transaction( + fun () -> + TailFun = TxFun(), + TailFun(true), + TailFun + end), + TailFun(false) + end. + ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). @@ -805,3 +856,6 @@ lock_file(Path) -> false -> {ok, Lock} = file:open(Path, [write]), ok = file:close(Lock) end. + +const_ok(_) -> ok. +const(X) -> fun (_) -> X end. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index f939a3fe..f5de0cf6 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -53,37 +53,39 @@ add(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_vhost, VHostPath}) of - [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), - [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], - ok; - [_] -> - mnesia:abort({vhost_already_exists, VHostPath}) + [] -> ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write); + [_] -> mnesia:abort({vhost_already_exists, VHostPath}) end + end, + fun rabbit_misc:const_ok/1, + fun (ok) -> + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], + ok end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), R. delete(VHostPath) -> - %%FIXME: We are forced to delete the queues outside the TX below - %%because queue deletion involves sending messages to the queue - %%process, which in turn results in further mnesia actions and - %%eventually the termination of that process. - lists:foreach(fun (Q) -> - {ok,_} = rabbit_amqqueue:delete(Q, false, false) - end, - rabbit_amqqueue:list(VHostPath)), + %% FIXME: We are forced to delete the queues and exchanges outside + %% the TX below. Queue deletion involves sending messages to the queue + %% process, which in turn results in further mnesia actions and + %% eventually the termination of that process. Exchange deletion causes + %% notifications which must be sent outside the TX + [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + Q <- rabbit_amqqueue:list(VHostPath)], + [ok = rabbit_exchange:delete(Name, false) || + #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( with(VHostPath, fun () -> ok = internal_delete(VHostPath) @@ -92,10 +94,6 @@ delete(VHostPath) -> R. internal_delete(VHostPath) -> - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), lists:foreach( fun ({Username, _, _, _}) -> ok = rabbit_auth_backend_internal:clear_permissions(Username, |