summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-11-23 14:12:55 +0000
committerEmile Joubert <emile@rabbitmq.com>2010-11-23 14:12:55 +0000
commit66e84559b9b197fefcc6f9ae52856e178fe94e7b (patch)
treead11750728de4249a85f9537168f17d481dfa73d
parent4ac7ef6ee499b97e3634578b3c5a9243c443e7c3 (diff)
downloadrabbitmq-server-66e84559b9b197fefcc6f9ae52856e178fe94e7b.tar.gz
Offer tx and non-tx exchange hooks
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_binding.erl56
-rw-r--r--src/rabbit_exchange.erl33
-rw-r--r--src/rabbit_exchange_type.erl4
-rw-r--r--src/rabbit_exchange_type_direct.erl2
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_exchange_type_registry.erl25
-rw-r--r--src/rabbit_exchange_type_topic.erl2
9 files changed, 100 insertions, 55 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index bc588013..f2007137 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -314,23 +314,28 @@ add_vhost(VHostPath) ->
ok = mnesia:write(rabbit_vhost,
#vhost{virtual_host = VHostPath},
write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
+ {ok, [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, % 0-9-1 pdf
+ {<<"amq.headers">>, headers}, % 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]]};
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
end
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
- R.
+ case R of
+ {ok, Xs} -> [{rabbit_exchange:maybe_callback(Type, create, [X]),
+ rabbit_event:notify(exchange_created,
+ rabbit_exchange:info(X))} ||
+ X = #exchange{type = Type} <- Xs], ok;
+ _ -> R
+ end.
delete_vhost(VHostPath) ->
%%FIXME: We are forced to delete the queues outside the TX below
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 668fb9bb..6a5d6ddd 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -128,7 +128,7 @@ remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
case binding_action(
Binding,
- fun (Src, Dst, B) ->
+ fun (Src = #exchange{ type = Type }, Dst, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
@@ -138,6 +138,8 @@ add(Binding, InnerFun) ->
[] -> ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:write/3),
+ rabbit_exchange:maybe_callback(
+ Type, add_binding, [Src, B]),
{new, Src, B};
[_] -> {existing, Src, B}
end;
@@ -146,7 +148,7 @@ add(Binding, InnerFun) ->
end
end) of
{new, Src = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(Src, B),
+ rabbit_exchange:maybe_callback(Type, add_binding, [Src, B]),
rabbit_event:notify(binding_created, info(B));
{existing, _, _} ->
ok;
@@ -168,9 +170,11 @@ remove(Binding, InnerFun) ->
ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:delete_object/3),
- {ok,
- maybe_auto_delete(B#binding.source,
- [B], new_deletions())};
+ Deletions = maybe_auto_delete(
+ B#binding.source,
+ [B], new_deletions()),
+ ok = process_deletions(Deletions),
+ {ok, Deletions};
{error, _} = E ->
E
end
@@ -303,11 +307,6 @@ call_with_source_and_destination(SrcName, DstName, Fun) ->
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
-%% Used with atoms from records; e.g., the type is expected to exist.
-type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
- Module.
-
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
@@ -424,16 +423,33 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
[Bindings1 | Bindings2]}.
process_deletions(Deletions) ->
- dict:fold(
- fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify(binding_deleted, info(B)) ||
- B <- FlatBindings],
- TypeModule = type_to_module(Type),
+ Tx = mnesia:is_transaction(),
+ NonTxFun =
+ if Tx -> fun(_X, _D, _B) -> ok end;
+ not Tx ->
+ fun (#exchange{name = XName}, Deleted, FlatBindings) ->
+ [rabbit_event:notify(binding_deleted, info(B))
+ || B <- FlatBindings],
+ case Deleted of
+ not_deleted -> ok;
+ deleted -> rabbit_event:notify(exchange_deleted,
+ [{name, XName}])
+ end
+ end
+ end,
+ Fun =
+ fun (X = #exchange{type = Type}, Deleted, FlatBindings) ->
case Deleted of
- not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
- deleted -> rabbit_event:notify(exchange_deleted,
- [{name, X#exchange.name}]),
- TypeModule:delete(X, FlatBindings)
+ not_deleted -> rabbit_exchange:maybe_callback(
+ Type, remove_bindings, [X, FlatBindings]);
+ deleted -> rabbit_exchange:maybe_callback(
+ Type, delete, [X, FlatBindings])
end
+ end,
+ dict:fold(
+ fun (_XName, {X, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ ok = Fun(X, Deleted, FlatBindings),
+ NonTxFun(X, Deleted, FlatBindings)
end, ok, Deletions).
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4c0f341f..752f7935 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
%% this must be run inside a mnesia tx
--export([maybe_auto_delete/1]).
+-export([maybe_auto_delete/1, maybe_callback/3]).
-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -85,6 +85,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
+-spec(maybe_callback/3:: (atom(), atom(), [any()]) -> 'ok').
-endif.
@@ -124,7 +125,7 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% value.
TypeModule = type_to_module(Type),
ok = TypeModule:validate(X),
- case rabbit_misc:execute_mnesia_transaction(
+ case {rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
@@ -136,21 +137,23 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
false ->
ok
end,
+ maybe_callback(Type, create, [X]),
{new, X};
[ExistingX] ->
{existing, ExistingX}
end
- end) of
- {new, X} -> TypeModule:create(X),
- rabbit_event:notify(exchange_created, info(X)),
- X;
- {existing, X} -> X;
- Err -> Err
+ end), mnesia:is_transaction()} of
+ {{new, X}, false} -> maybe_callback(Type, create, [X]),
+ rabbit_event:notify(exchange_created, info(X)),
+ X;
+ {{new, X}, true} -> X;
+ {{existing, X}, _} -> X;
+ Err -> Err
end.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, [Module, _Tx]} = rabbit_exchange_type_registry:lookup_module(T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
@@ -303,14 +306,24 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
+%% Possible callback, depending on whether mnesia is in a transaction
+%% and whether transactional callbacks were requested by the exchange
+maybe_callback(XType, Fun, Args) ->
+ {ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType),
+ Tx = mnesia:is_transaction(),
+ if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args);
+ true -> ok
+ end.
+
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = XName}) ->
+unconditional_delete(X = #exchange{name = XName, type = Type}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
+ maybe_callback(Type, delete, [X, lists:flatten(Bindings)]),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 742944dc..25d74043 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -42,18 +42,22 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration when previously absent
+ %% registration determines whether this is called in a mnesia transaction
{create, 1},
%% called when recovering
{recover, 2},
%% called after exchange deletion.
+ %% registration determines whether this is called in a mnesia transaction
{delete, 2},
%% called after a binding has been added
+ %% registration determines whether this is called in a mnesia transaction
{add_binding, 2},
%% called after bindings have been deleted.
+ %% registration determines whether this is called in a mnesia transaction
{remove_bindings, 2},
%% called when comparing exchanges for equivalence - should return ok or
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d934a497..6bfc36d9 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -42,7 +42,7 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE]}},
+ [<<"direct">>, ?MODULE, false]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 77ca9686..9449b3df 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -42,7 +42,7 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE]}},
+ [<<"fanout">>, ?MODULE, false]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index ec9e7ba4..1c615c21 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -43,7 +43,7 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE]}},
+ [<<"headers">>, ?MODULE, false]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index f15275b5..4f06dcbe 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, binary_to_type/1, lookup_module/1]).
+-export([register/2, register/3, binary_to_type/1, lookup_module/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -47,10 +47,12 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (binary(), atom()) -> 'ok').
+-spec(register/3 :: (binary(), atom(), boolean()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
-spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
+ (atom()) -> rabbit_types:ok_or_error2([atom() | boolean()],
+ 'not_found')).
-endif.
@@ -62,7 +64,12 @@ start_link() ->
%%---------------------------------------------------------------------------
register(TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName}).
+ register(TypeName, ModuleName, false).
+
+% Tx determines whether exchange and binding creation and deletion callbacks
+% are made from within a transaction (true) or outside a transaction (false)
+register(TypeName, ModuleName, Tx) ->
+ gen_server:call(?SERVER, {register, TypeName, ModuleName, Tx}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -76,8 +83,8 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
lookup_module(T) when is_atom(T) ->
case ets:lookup(?ETS_NAME, T) of
- [{_, Module}] ->
- {ok, Module};
+ [{_, Module, Tx}] ->
+ {ok, [Module, Tx]};
[] ->
{error, not_found}
end.
@@ -87,11 +94,11 @@ lookup_module(T) when is_atom(T) ->
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName)
+internal_register(TypeName, ModuleName, Tx)
when is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName}),
+ {internal_binary_to_type(TypeName), ModuleName, Tx}),
ok.
sanity_check_module(Module) ->
@@ -112,8 +119,8 @@ init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName}, _From, State) ->
- ok = internal_register(TypeName, ModuleName),
+handle_call({register, TypeName, ModuleName, Tx}, _From, State) ->
+ ok = internal_register(TypeName, ModuleName, Tx),
{reply, ok, State};
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d3ecdd4d..0f2d5242 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -42,7 +42,7 @@
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE]}},
+ [<<"topic">>, ?MODULE, false]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).