diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-12-23 16:12:01 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-12-23 16:12:01 +0000 |
commit | 29527cea333c8ccc57449483da9326d484dfd11b (patch) | |
tree | 4e334e15be12a141b486c39361c5a54e8497f1db | |
parent | 30f878d2e519fbe872aef0018ba4f758ea549f11 (diff) | |
download | rabbitmq-server-29527cea333c8ccc57449483da9326d484dfd11b.tar.gz |
Changed exchange callback API for transactions
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 8 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 29 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 80 | ||||
-rw-r--r-- | src/rabbit_event.erl | 11 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 44 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 14 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 14 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 14 | ||||
-rw-r--r-- | src/rabbit_exchange_type_registry.erl | 25 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 14 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 56 |
13 files changed, 180 insertions, 163 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index ae326a87..280ffd15 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -34,14 +34,14 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/1 :: (rabbit_types:exchange()) -> 'ok'). +-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(delete/2 :: (rabbit_types:exchange(), +-spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/2 :: (rabbit_types:exchange(), +-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/2 :: (rabbit_types:exchange(), +-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: (rabbit_types:exchange(), rabbit_framing:amqp_table()) diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b47e4f1e..d5f03fce 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -316,8 +316,8 @@ add_vhost(VHostPath) -> [_] -> mnesia:abort({vhost_already_exists, VHostPath}) end end, - fun (Arg, true) -> Arg; - (_Arg, false) -> + fun (ok) -> ok end, + fun (ok) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -329,25 +329,20 @@ add_vhost(VHostPath) -> {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok - end - ), + end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), R. delete_vhost(VHostPath) -> - %%FIXME: We are forced to delete the queues outside the TX below - %%because queue deletion involves sending messages to the queue - %%process, which in turn results in further mnesia actions and - %%eventually the termination of that process. - lists:foreach(fun (Q) -> - {ok,_} = rabbit_amqqueue:delete(Q, false, false) - end, - rabbit_amqqueue:list(VHostPath)), - %%Exchange deletion causes notifications which must be sent outside the TX - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), + %% FIXME: We are forced to delete the queues and exchanges outside + %% the TX below. Queue deletion involves sending messages to the queue + %% process, which in turn results in further mnesia actions and + %% eventually the termination of that process. Exchange deletion causes + %% notifications which must be sent outside the TX + [{ok,_} = rabbit_amqqueue:delete(Q, false, false) || + Q <- rabbit_amqqueue:list(VHostPath)], + [ok = rabbit_exchange:delete(Name, false) || + #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], R = rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_vhost( VHostPath, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e4971958..0af63fb7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -215,29 +215,33 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> end. internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> - rabbit_misc:execute_mnesia_transaction( + EmptyFun = fun (_) -> ok end, + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> + {ReturnArg, TailFun} = case Recover of true -> ok = store_queue(Q), - {false, Q}; + {Q, EmptyFun}; false -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), - {true, Q}; + B = add_default_binding(Q), + {Q, B}; %% Q exists on stopped node - [_] -> {false, not_found} + [_] -> {not_found, EmptyFun} end; [ExistingQ] -> - {false, ExistingQ} + {ExistingQ, EmptyFun} end + end, + fun (Tx) -> + TailFun(Tx), + ReturnArg end - end, - fun ({true, Q}, false) -> ok = add_default_binding(Q), Q; - ({_AddBinding, Arg}, _Tx) -> Arg end). store_queue(Q = #amqqueue{durable = true}) -> @@ -463,7 +467,7 @@ internal_delete(QueueName) -> end end, fun ({error, _} = Err, _Tx) -> Err; - (Deletions, Tx) -> + (Deletions, Tx) -> ok = rabbit_binding:process_deletions(Deletions, Tx) end). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 19c7faee..aad501ba 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -119,8 +119,11 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end, - fun (_, _) -> ok end). + fun (_Src, _Dst, B) -> + fun (_Tx) -> + mnesia:read({rabbit_route, B}) /= [] + end + end). add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). @@ -139,26 +142,26 @@ add(Binding, InnerFun) -> [] -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:write/3), - {new, Src, B}; - [_] -> {existing, Src, B} + fun (Tx) -> + ok = rabbit_exchange:callback( + Src, add_binding, + [Tx, Src, B]), + rabbit_event:notify_if( + not(Tx), + binding_created, info(B)) + end; + [_] -> fun (_Tx) -> ok end end; - {error, _} = E -> - E + {error, _} = Err -> + fun (_Tx) -> Err end end - end, - fun({new, Src, B}, Tx) -> - ok = rabbit_exchange:callback(Src, add_binding, [Src, B], Tx), - rabbit_event:notify(binding_created, info(B), Tx); - ({existing, _, _}, _Tx) -> - ok; - ({error, _} = Err, _Tx) -> - Err end). remove(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> + Result = case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> @@ -175,12 +178,15 @@ remove(Binding, InnerFun) -> {error, _} = E -> E end + end, + fun (Tx) -> + case {Result, Tx} of + {{ok, Deletions}, _} -> + ok = process_deletions(Deletions, Tx); + {{error, _} = Err, _} -> + Err + end end - end, - fun ({error, _} = Err, _Tx) -> - Err; - ({ok, Deletions}, Tx) -> - ok = process_deletions(Deletions, Tx) end). list(VHostPath) -> @@ -269,14 +275,13 @@ all_durable(Resources) -> binding_action(Binding = #binding{source = SrcName, destination = DstName, - args = Arguments}, Fun, TriggerFun) -> + args = Arguments}, Fun) -> call_with_source_and_destination( SrcName, DstName, fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), Fun(Src, Dst, Binding#binding{args = SortedArgs}) - end, - TriggerFun). + end). sync_binding(Binding, Durable, Fun) -> ok = case Durable of @@ -289,10 +294,10 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) -> +call_with_source_and_destination(SrcName, DstName, Fun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case {mnesia:read({SrcTable, SrcName}), mnesia:read({DstTable, DstName})} of {[Src], [Dst]} -> Fun(Src, Dst); @@ -300,8 +305,7 @@ call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) -> {[_], [] } -> {error, destination_not_found}; {[], [] } -> {error, source_and_destination_not_found} end - end, - TriggerFun). + end). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. @@ -424,17 +428,15 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> process_deletions(Deletions, Tx) -> dict:fold( fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify(binding_deleted, info(B), Tx) || - B <- FlatBindings], - case Deleted of - not_deleted -> rabbit_exchange:callback(X, remove_bindings, - [X, FlatBindings], Tx); - deleted -> rabbit_event:notify( - exchange_deleted, - [{name, X#exchange.name}], - Tx), - rabbit_exchange:callback(X, delete, - [X, FlatBindings], Tx) - end + FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify_if(not(Tx), binding_deleted, info(B)) || + B <- FlatBindings], + case Deleted of + not_deleted -> rabbit_exchange:callback(X, remove_bindings, + [Tx, X, FlatBindings]); + deleted -> rabbit_event:notify_if(not(Tx), exchange_deleted, + [{name, X#exchange.name}]), + rabbit_exchange:callback(X, delete, + [Tx, X, FlatBindings]) + end end, ok, Deletions). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index c1d92f73..cf126dcc 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -37,7 +37,7 @@ -export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). --export([notify/2, notify/3]). +-export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- @@ -77,7 +77,7 @@ -spec(stats_level/1 :: (state()) -> level()). -spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). --spec(notify/3 :: (event_type(), event_props(), boolean()) -> 'ok'). +-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). -endif. @@ -141,11 +141,8 @@ if_enabled(_State, Fun) -> Fun(), ok. -notify(Type, Props, Tx) -> - case Tx of - false -> notify(Type, Props); - true -> ok - end. +notify_if(false, _Type, _Props) -> ok; +notify_if(true, Type, Props) -> notify(Type, Props). notify(Type, Props) -> try diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 3eb7e0ff..a40dd3df 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,8 +35,9 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +-export([callback/3]). %% this must be run inside a mnesia tx --export([maybe_auto_delete/1, callback/4]). +-export([maybe_auto_delete/1]). -export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -85,7 +86,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(callback/4:: (rabbit_types:exchange(), atom(), [any()], boolean()) -> 'ok'). +-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). -endif. @@ -121,8 +122,7 @@ declare(XName, Type, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args}, %% We want to upset things if it isn't ok - TypeModule = type_to_module(Type), - ok = TypeModule:validate(X), + ok = (type_to_module(Type)):validate(X), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of @@ -140,17 +140,20 @@ declare(XName, Type, Durable, AutoDelete, Args) -> {existing, ExistingX} end end, - fun({Status, Exchange}, Tx) - when Status =:= new; Status =:= existing -> - rabbit_exchange:callback(Exchange, create, [Exchange], Tx), - rabbit_event:notify(exchange_created, info(Exchange), Tx), + fun ({new, Exchange}, Tx) -> + callback(Exchange, create, [Tx, Exchange]), + rabbit_event:notify_if( + not(Tx), exchange_created, info(Exchange)), Exchange; - (Err, _Tx) -> Err + ({existing, Exchange}, _Tx) -> + Exchange; + (Err, _Tx) -> + Err end). %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, [Module, _Tx]} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), Module. %% Used with binaries sent over the wire; the type may not exist. @@ -273,13 +276,13 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, TriggerFun) -> +call_with_exchange(XName, Fun, PrePostCommitFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end - end, TriggerFun). + end, PrePostCommitFun). delete(XName, IfUnused) -> call_with_exchange(XName, @@ -287,11 +290,11 @@ delete(XName, IfUnused) -> true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - fun({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> Error + fun ({deleted, X, Bs, Deletions}, Tx) -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); + (Error = {error, _InUseOrNotFound}, _Tx) -> Error end). maybe_auto_delete(#exchange{auto_delete = false}) -> @@ -302,11 +305,8 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. -callback(#exchange{type = XType}, Fun, Args, Tx) -> - {ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType), - if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args); - true -> ok - end. +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 25d74043..8b90cbc4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -42,23 +42,19 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration when previously absent - %% registration determines whether this is called in a mnesia transaction - {create, 1}, + {create, 2}, %% called when recovering {recover, 2}, %% called after exchange deletion. - %% registration determines whether this is called in a mnesia transaction - {delete, 2}, + {delete, 3}, %% called after a binding has been added - %% registration determines whether this is called in a mnesia transaction - {add_binding, 2}, + {add_binding, 3}, %% called after bindings have been deleted. - %% registration determines whether this is called in a mnesia transaction - {remove_bindings, 2}, + {remove_bindings, 3}, %% called when comparing exchanges for equivalence - should return ok or %% exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 6bfc36d9..7e59e464 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,14 +35,14 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"direct">>, ?MODULE, false]}}, + [<<"direct">>, ?MODULE]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). @@ -55,10 +55,10 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, RoutingKey). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 9449b3df..13d5e78a 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,14 +35,14 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"fanout">>, ?MODULE, false]}}, + [<<"fanout">>, ?MODULE]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). @@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, '_'). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 1c615c21..9aa9c211 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,14 +36,14 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"headers">>, ?MODULE, false]}}, + [<<"headers">>, ?MODULE]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). @@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index 4f06dcbe..f15275b5 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, register/3, binary_to_type/1, lookup_module/1]). +-export([register/2, binary_to_type/1, lookup_module/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -47,12 +47,10 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (binary(), atom()) -> 'ok'). --spec(register/3 :: (binary(), atom(), boolean()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). -spec(lookup_module/1 :: - (atom()) -> rabbit_types:ok_or_error2([atom() | boolean()], - 'not_found')). + (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). -endif. @@ -64,12 +62,7 @@ start_link() -> %%--------------------------------------------------------------------------- register(TypeName, ModuleName) -> - register(TypeName, ModuleName, false). - -% Tx determines whether exchange and binding creation and deletion callbacks -% are made from within a transaction (true) or outside a transaction (false) -register(TypeName, ModuleName, Tx) -> - gen_server:call(?SERVER, {register, TypeName, ModuleName, Tx}). + gen_server:call(?SERVER, {register, TypeName, ModuleName}). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it @@ -83,8 +76,8 @@ binary_to_type(TypeBin) when is_binary(TypeBin) -> lookup_module(T) when is_atom(T) -> case ets:lookup(?ETS_NAME, T) of - [{_, Module, Tx}] -> - {ok, [Module, Tx]}; + [{_, Module}] -> + {ok, Module}; [] -> {error, not_found} end. @@ -94,11 +87,11 @@ lookup_module(T) when is_atom(T) -> internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> list_to_atom(binary_to_list(TypeBin)). -internal_register(TypeName, ModuleName, Tx) +internal_register(TypeName, ModuleName) when is_binary(TypeName), is_atom(ModuleName) -> ok = sanity_check_module(ModuleName), true = ets:insert(?ETS_NAME, - {internal_binary_to_type(TypeName), ModuleName, Tx}), + {internal_binary_to_type(TypeName), ModuleName}), ok. sanity_check_module(Module) -> @@ -119,8 +112,8 @@ init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), {ok, none}. -handle_call({register, TypeName, ModuleName, Tx}, _From, State) -> - ok = internal_register(TypeName, ModuleName, Tx), +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), {reply, ok, State}; handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 0f2d5242..70de55e2 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,14 +35,14 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/1, recover/2, delete/2, add_binding/2, - remove_bindings/2, assert_args_equivalence/2]). +-export([validate/1, create/2, recover/2, delete/3, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"topic">>, ?MODULE, false]}}, + [<<"topic">>, ?MODULE]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). @@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> last_topic_match(P, [BacktrackNext | R], BacktrackList). validate(_X) -> ok. -create(_X) -> ok. +create(_Tx, _X) -> ok. recover(_X, _Bs) -> ok. -delete(_X, _Bs) -> ok. -add_binding(_X, _B) -> ok. -remove_bindings(_X, _Bs) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3aed723d..d37af9e3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -49,6 +49,8 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). +-export([execute_mnesia_transaction/3]). +-export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). @@ -150,6 +152,10 @@ -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(execute_mnesia_transaction/2 :: (thunk(A), fun ((A, boolean()) -> B)) -> B). +-spec(execute_mnesia_transaction/3 :: + (thunk(A), fun ((A) -> B), fun ((A) -> B)) -> B). +-spec(execute_mnesia_tx_with_tail/1 :: + (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(makenode/1 :: ({string(), string()} | string()) -> node()). -spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). @@ -392,20 +398,44 @@ execute_mnesia_transaction(TxFun) -> {aborted, Reason} -> throw({error, Reason}) end. -%% Like execute_mnesia_transaction/2, with an additional Fun that gets called -%% immediately before and after the mnesia tx commit. It gets called with the -%% result of TxFun and a flag indicating whether mnesia is in a tx. -execute_mnesia_transaction(TxFun, TriggerFun) -> - Tx = mnesia:is_transaction(), - if Tx -> throw(unexpected_transaction); - true -> ok + +%% Like execute_mnesia_transaction/1 with additional Pre- and Post- +%% commit functions +execute_mnesia_transaction(TxFun, PreCommit, PostCommit) -> + case mnesia:is_transaction() of + true -> throw(unexpected_transaction); + false -> ok end, - Result = execute_mnesia_transaction( - fun () -> R = TxFun(), - TriggerFun(R, true), - R - end), - TriggerFun(Result, false). + PostCommit(execute_mnesia_transaction( + fun () -> + PreCommit(TxFun()) + end)). + +%% Like execute_mnesia_transaction/3 with similar Pre- and PostCommit funs +execute_mnesia_transaction(TxFun, PrePostCommitFun) -> + execute_mnesia_transaction(TxFun, + fun (Result) -> + PrePostCommitFun(Result, true), + Result + end, + fun (Result) -> + PrePostCommitFun(Result, false) + end). + +%% Like execute_mnesia_transaction/2, but TxFun is expected to return a +%% TailFun which gets called immediately before and after the tx commit +execute_mnesia_tx_with_tail(TxFun) -> + case mnesia:is_transaction() of + true -> execute_mnesia_transaction(TxFun); + false -> TailFun = + execute_mnesia_transaction( + fun () -> + TailFun = TxFun(), + TailFun(true), + TailFun + end), + TailFun(false) + end. ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). |