diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-11-23 14:12:55 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-11-23 14:12:55 +0000 |
commit | 66e84559b9b197fefcc6f9ae52856e178fe94e7b (patch) | |
tree | ad11750728de4249a85f9537168f17d481dfa73d | |
parent | 4ac7ef6ee499b97e3634578b3c5a9243c443e7c3 (diff) | |
download | rabbitmq-server-66e84559b9b197fefcc6f9ae52856e178fe94e7b.tar.gz |
Offer tx and non-tx exchange hooks
-rw-r--r-- | src/rabbit_access_control.erl | 29 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 56 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 33 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_registry.erl | 25 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 2 |
9 files changed, 100 insertions, 55 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index bc588013..f2007137 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -314,23 +314,28 @@ add_vhost(VHostPath) -> ok = mnesia:write(rabbit_vhost, #vhost{virtual_host = VHostPath}, write), - [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], - ok; + {ok, [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, % 0-9-1 pdf + {<<"amq.headers">>, headers}, % 0-9-1 xml + {<<"amq.fanout">>, fanout}]]}; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) end end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), - R. + case R of + {ok, Xs} -> [{rabbit_exchange:maybe_callback(Type, create, [X]), + rabbit_event:notify(exchange_created, + rabbit_exchange:info(X))} || + X = #exchange{type = Type} <- Xs], ok; + _ -> R + end. delete_vhost(VHostPath) -> %%FIXME: We are forced to delete the queues outside the TX below diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 668fb9bb..6a5d6ddd 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -128,7 +128,7 @@ remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> case binding_action( Binding, - fun (Src, Dst, B) -> + fun (Src = #exchange{ type = Type }, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else @@ -138,6 +138,8 @@ add(Binding, InnerFun) -> [] -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:write/3), + rabbit_exchange:maybe_callback( + Type, add_binding, [Src, B]), {new, Src, B}; [_] -> {existing, Src, B} end; @@ -146,7 +148,7 @@ add(Binding, InnerFun) -> end end) of {new, Src = #exchange{ type = Type }, B} -> - ok = (type_to_module(Type)):add_binding(Src, B), + rabbit_exchange:maybe_callback(Type, add_binding, [Src, B]), rabbit_event:notify(binding_created, info(B)); {existing, _, _} -> ok; @@ -168,9 +170,11 @@ remove(Binding, InnerFun) -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:delete_object/3), - {ok, - maybe_auto_delete(B#binding.source, - [B], new_deletions())}; + Deletions = maybe_auto_delete( + B#binding.source, + [B], new_deletions()), + ok = process_deletions(Deletions), + {ok, Deletions}; {error, _} = E -> E end @@ -303,11 +307,6 @@ call_with_source_and_destination(SrcName, DstName, Fun) -> table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), - Module. - contains(Table, MatchHead) -> continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). @@ -424,16 +423,33 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> [Bindings1 | Bindings2]}. process_deletions(Deletions) -> - dict:fold( - fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify(binding_deleted, info(B)) || - B <- FlatBindings], - TypeModule = type_to_module(Type), + Tx = mnesia:is_transaction(), + NonTxFun = + if Tx -> fun(_X, _D, _B) -> ok end; + not Tx -> + fun (#exchange{name = XName}, Deleted, FlatBindings) -> + [rabbit_event:notify(binding_deleted, info(B)) + || B <- FlatBindings], + case Deleted of + not_deleted -> ok; + deleted -> rabbit_event:notify(exchange_deleted, + [{name, XName}]) + end + end + end, + Fun = + fun (X = #exchange{type = Type}, Deleted, FlatBindings) -> case Deleted of - not_deleted -> TypeModule:remove_bindings(X, FlatBindings); - deleted -> rabbit_event:notify(exchange_deleted, - [{name, X#exchange.name}]), - TypeModule:delete(X, FlatBindings) + not_deleted -> rabbit_exchange:maybe_callback( + Type, remove_bindings, [X, FlatBindings]); + deleted -> rabbit_exchange:maybe_callback( + Type, delete, [X, FlatBindings]) end + end, + dict:fold( + fun (_XName, {X, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + ok = Fun(X, Deleted, FlatBindings), + NonTxFun(X, Deleted, FlatBindings) end, ok, Deletions). + diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4c0f341f..752f7935 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% this must be run inside a mnesia tx --export([maybe_auto_delete/1]). +-export([maybe_auto_delete/1, maybe_callback/3]). -export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -85,6 +85,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). +-spec(maybe_callback/3:: (atom(), atom(), [any()]) -> 'ok'). -endif. @@ -124,7 +125,7 @@ declare(XName, Type, Durable, AutoDelete, Args) -> %% value. TypeModule = type_to_module(Type), ok = TypeModule:validate(X), - case rabbit_misc:execute_mnesia_transaction( + case {rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> @@ -136,21 +137,23 @@ declare(XName, Type, Durable, AutoDelete, Args) -> false -> ok end, + maybe_callback(Type, create, [X]), {new, X}; [ExistingX] -> {existing, ExistingX} end - end) of - {new, X} -> TypeModule:create(X), - rabbit_event:notify(exchange_created, info(X)), - X; - {existing, X} -> X; - Err -> Err + end), mnesia:is_transaction()} of + {{new, X}, false} -> maybe_callback(Type, create, [X]), + rabbit_event:notify(exchange_created, info(X)), + X; + {{new, X}, true} -> X; + {{existing, X}, _} -> X; + Err -> Err end. %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, [Module, _Tx]} = rabbit_exchange_type_registry:lookup_module(T), Module. %% Used with binaries sent over the wire; the type may not exist. @@ -303,14 +306,24 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. +%% Possible callback, depending on whether mnesia is in a transaction +%% and whether transactional callbacks were requested by the exchange +maybe_callback(XType, Fun, Args) -> + {ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType), + Tx = mnesia:is_transaction(), + if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args); + true -> ok + end. + conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = XName}) -> +unconditional_delete(X = #exchange{name = XName, type = Type}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), + maybe_callback(Type, delete, [X, lists:flatten(Bindings)]), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 742944dc..25d74043 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -42,18 +42,22 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration when previously absent + %% registration determines whether this is called in a mnesia transaction {create, 1}, %% called when recovering {recover, 2}, %% called after exchange deletion. + %% registration determines whether this is called in a mnesia transaction {delete, 2}, %% called after a binding has been added + %% registration determines whether this is called in a mnesia transaction {add_binding, 2}, %% called after bindings have been deleted. + %% registration determines whether this is called in a mnesia transaction {remove_bindings, 2}, %% called when comparing exchanges for equivalence - should return ok or diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index d934a497..6bfc36d9 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -42,7 +42,7 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"direct">>, ?MODULE]}}, + [<<"direct">>, ?MODULE, false]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 77ca9686..9449b3df 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -42,7 +42,7 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"fanout">>, ?MODULE]}}, + [<<"fanout">>, ?MODULE, false]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index ec9e7ba4..1c615c21 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -43,7 +43,7 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"headers">>, ?MODULE]}}, + [<<"headers">>, ?MODULE, false]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index f15275b5..4f06dcbe 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, binary_to_type/1, lookup_module/1]). +-export([register/2, register/3, binary_to_type/1, lookup_module/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -47,10 +47,12 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(register/3 :: (binary(), atom(), boolean()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). -spec(lookup_module/1 :: - (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). + (atom()) -> rabbit_types:ok_or_error2([atom() | boolean()], + 'not_found')). -endif. @@ -62,7 +64,12 @@ start_link() -> %%--------------------------------------------------------------------------- register(TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, TypeName, ModuleName}). + register(TypeName, ModuleName, false). + +% Tx determines whether exchange and binding creation and deletion callbacks +% are made from within a transaction (true) or outside a transaction (false) +register(TypeName, ModuleName, Tx) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName, Tx}). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it @@ -76,8 +83,8 @@ binary_to_type(TypeBin) when is_binary(TypeBin) -> lookup_module(T) when is_atom(T) -> case ets:lookup(?ETS_NAME, T) of - [{_, Module}] -> - {ok, Module}; + [{_, Module, Tx}] -> + {ok, [Module, Tx]}; [] -> {error, not_found} end. @@ -87,11 +94,11 @@ lookup_module(T) when is_atom(T) -> internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> list_to_atom(binary_to_list(TypeBin)). -internal_register(TypeName, ModuleName) +internal_register(TypeName, ModuleName, Tx) when is_binary(TypeName), is_atom(ModuleName) -> ok = sanity_check_module(ModuleName), true = ets:insert(?ETS_NAME, - {internal_binary_to_type(TypeName), ModuleName}), + {internal_binary_to_type(TypeName), ModuleName, Tx}), ok. sanity_check_module(Module) -> @@ -112,8 +119,8 @@ init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), {ok, none}. -handle_call({register, TypeName, ModuleName}, _From, State) -> - ok = internal_register(TypeName, ModuleName), +handle_call({register, TypeName, ModuleName, Tx}, _From, State) -> + ok = internal_register(TypeName, ModuleName, Tx), {reply, ok, State}; handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index d3ecdd4d..0f2d5242 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -42,7 +42,7 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, {mfa, {rabbit_exchange_type_registry, register, - [<<"topic">>, ?MODULE]}}, + [<<"topic">>, ?MODULE, false]}}, {requires, rabbit_exchange_type_registry}, {enables, kernel_ready}]}). |