summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-12-23 16:12:01 +0000
committerEmile Joubert <emile@rabbitmq.com>2010-12-23 16:12:01 +0000
commit29527cea333c8ccc57449483da9326d484dfd11b (patch)
tree4e334e15be12a141b486c39361c5a54e8497f1db
parent30f878d2e519fbe872aef0018ba4f758ea549f11 (diff)
downloadrabbitmq-server-29527cea333c8ccc57449483da9326d484dfd11b.tar.gz
Changed exchange callback API for transactions
-rw-r--r--include/rabbit_exchange_type_spec.hrl8
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_binding.erl80
-rw-r--r--src/rabbit_event.erl11
-rw-r--r--src/rabbit_exchange.erl44
-rw-r--r--src/rabbit_exchange_type.erl12
-rw-r--r--src/rabbit_exchange_type_direct.erl14
-rw-r--r--src/rabbit_exchange_type_fanout.erl14
-rw-r--r--src/rabbit_exchange_type_headers.erl14
-rw-r--r--src/rabbit_exchange_type_registry.erl25
-rw-r--r--src/rabbit_exchange_type_topic.erl14
-rw-r--r--src/rabbit_misc.erl56
13 files changed, 180 insertions, 163 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index ae326a87..280ffd15 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -34,14 +34,14 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(delete/2 :: (rabbit_types:exchange(),
+-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(add_binding/2 :: (rabbit_types:exchange(),
+-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/2 :: (rabbit_types:exchange(),
+-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(assert_args_equivalence/2 ::
(rabbit_types:exchange(), rabbit_framing:amqp_table())
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b47e4f1e..d5f03fce 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -316,8 +316,8 @@ add_vhost(VHostPath) ->
[_] -> mnesia:abort({vhost_already_exists, VHostPath})
end
end,
- fun (Arg, true) -> Arg;
- (_Arg, false) ->
+ fun (ok) -> ok end,
+ fun (ok) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
@@ -329,25 +329,20 @@ add_vhost(VHostPath) ->
{<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok
- end
- ),
+ end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
delete_vhost(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
- %%Exchange deletion causes notifications which must be sent outside the TX
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
+ %% FIXME: We are forced to delete the queues and exchanges outside
+ %% the TX below. Queue deletion involves sending messages to the queue
+ %% process, which in turn results in further mnesia actions and
+ %% eventually the termination of that process. Exchange deletion causes
+ %% notifications which must be sent outside the TX
+ [{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
+ Q <- rabbit_amqqueue:list(VHostPath)],
+ [ok = rabbit_exchange:delete(Name, false) ||
+ #exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_vhost(
VHostPath,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e4971958..0af63fb7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -215,29 +215,33 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
end.
internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
- rabbit_misc:execute_mnesia_transaction(
+ EmptyFun = fun (_) -> ok end,
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
+ {ReturnArg, TailFun} =
case Recover of
true ->
ok = store_queue(Q),
- {false, Q};
+ {Q, EmptyFun};
false ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue,
QueueName}) of
[] -> ok = store_queue(Q),
- {true, Q};
+ B = add_default_binding(Q),
+ {Q, B};
%% Q exists on stopped node
- [_] -> {false, not_found}
+ [_] -> {not_found, EmptyFun}
end;
[ExistingQ] ->
- {false, ExistingQ}
+ {ExistingQ, EmptyFun}
end
+ end,
+ fun (Tx) ->
+ TailFun(Tx),
+ ReturnArg
end
- end,
- fun ({true, Q}, false) -> ok = add_default_binding(Q), Q;
- ({_AddBinding, Arg}, _Tx) -> Arg
end).
store_queue(Q = #amqqueue{durable = true}) ->
@@ -463,7 +467,7 @@ internal_delete(QueueName) ->
end
end,
fun ({error, _} = Err, _Tx) -> Err;
- (Deletions, Tx) ->
+ (Deletions, Tx) ->
ok = rabbit_binding:process_deletions(Deletions, Tx)
end).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 19c7faee..aad501ba 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -119,8 +119,11 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end,
- fun (_, _) -> ok end).
+ fun (_Src, _Dst, B) ->
+ fun (_Tx) ->
+ mnesia:read({rabbit_route, B}) /= []
+ end
+ end).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
@@ -139,26 +142,26 @@ add(Binding, InnerFun) ->
[] -> ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:write/3),
- {new, Src, B};
- [_] -> {existing, Src, B}
+ fun (Tx) ->
+ ok = rabbit_exchange:callback(
+ Src, add_binding,
+ [Tx, Src, B]),
+ rabbit_event:notify_if(
+ not(Tx),
+ binding_created, info(B))
+ end;
+ [_] -> fun (_Tx) -> ok end
end;
- {error, _} = E ->
- E
+ {error, _} = Err ->
+ fun (_Tx) -> Err end
end
- end,
- fun({new, Src, B}, Tx) ->
- ok = rabbit_exchange:callback(Src, add_binding, [Src, B], Tx),
- rabbit_event:notify(binding_created, info(B), Tx);
- ({existing, _, _}, _Tx) ->
- ok;
- ({error, _} = Err, _Tx) ->
- Err
end).
remove(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
+ Result =
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] ->
@@ -175,12 +178,15 @@ remove(Binding, InnerFun) ->
{error, _} = E ->
E
end
+ end,
+ fun (Tx) ->
+ case {Result, Tx} of
+ {{ok, Deletions}, _} ->
+ ok = process_deletions(Deletions, Tx);
+ {{error, _} = Err, _} ->
+ Err
+ end
end
- end,
- fun ({error, _} = Err, _Tx) ->
- Err;
- ({ok, Deletions}, Tx) ->
- ok = process_deletions(Deletions, Tx)
end).
list(VHostPath) ->
@@ -269,14 +275,13 @@ all_durable(Resources) ->
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
- args = Arguments}, Fun, TriggerFun) ->
+ args = Arguments}, Fun) ->
call_with_source_and_destination(
SrcName, DstName,
fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
Fun(Src, Dst, Binding#binding{args = SortedArgs})
- end,
- TriggerFun).
+ end).
sync_binding(Binding, Durable, Fun) ->
ok = case Durable of
@@ -289,10 +294,10 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) ->
+call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case {mnesia:read({SrcTable, SrcName}),
mnesia:read({DstTable, DstName})} of
{[Src], [Dst]} -> Fun(Src, Dst);
@@ -300,8 +305,7 @@ call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) ->
{[_], [] } -> {error, destination_not_found};
{[], [] } -> {error, source_and_destination_not_found}
end
- end,
- TriggerFun).
+ end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
@@ -424,17 +428,15 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
process_deletions(Deletions, Tx) ->
dict:fold(
fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify(binding_deleted, info(B), Tx) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted -> rabbit_exchange:callback(X, remove_bindings,
- [X, FlatBindings], Tx);
- deleted -> rabbit_event:notify(
- exchange_deleted,
- [{name, X#exchange.name}],
- Tx),
- rabbit_exchange:callback(X, delete,
- [X, FlatBindings], Tx)
- end
+ FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify_if(not(Tx), binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ case Deleted of
+ not_deleted -> rabbit_exchange:callback(X, remove_bindings,
+ [Tx, X, FlatBindings]);
+ deleted -> rabbit_event:notify_if(not(Tx), exchange_deleted,
+ [{name, X#exchange.name}]),
+ rabbit_exchange:callback(X, delete,
+ [Tx, X, FlatBindings])
+ end
end, ok, Deletions).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index c1d92f73..cf126dcc 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -37,7 +37,7 @@
-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
--export([notify/2, notify/3]).
+-export([notify/2, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -77,7 +77,7 @@
-spec(stats_level/1 :: (state()) -> level()).
-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
--spec(notify/3 :: (event_type(), event_props(), boolean()) -> 'ok').
+-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
-endif.
@@ -141,11 +141,8 @@ if_enabled(_State, Fun) ->
Fun(),
ok.
-notify(Type, Props, Tx) ->
- case Tx of
- false -> notify(Type, Props);
- true -> ok
- end.
+notify_if(false, _Type, _Props) -> ok;
+notify_if(true, Type, Props) -> notify(Type, Props).
notify(Type, Props) ->
try
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 3eb7e0ff..a40dd3df 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,8 +35,9 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+-export([callback/3]).
%% this must be run inside a mnesia tx
--export([maybe_auto_delete/1, callback/4]).
+-export([maybe_auto_delete/1]).
-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -85,7 +86,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(callback/4:: (rabbit_types:exchange(), atom(), [any()], boolean()) -> 'ok').
+-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
-endif.
@@ -121,8 +122,7 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
auto_delete = AutoDelete,
arguments = Args},
%% We want to upset things if it isn't ok
- TypeModule = type_to_module(Type),
- ok = TypeModule:validate(X),
+ ok = (type_to_module(Type)):validate(X),
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
@@ -140,17 +140,20 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
{existing, ExistingX}
end
end,
- fun({Status, Exchange}, Tx)
- when Status =:= new; Status =:= existing ->
- rabbit_exchange:callback(Exchange, create, [Exchange], Tx),
- rabbit_event:notify(exchange_created, info(Exchange), Tx),
+ fun ({new, Exchange}, Tx) ->
+ callback(Exchange, create, [Tx, Exchange]),
+ rabbit_event:notify_if(
+ not(Tx), exchange_created, info(Exchange)),
Exchange;
- (Err, _Tx) -> Err
+ ({existing, Exchange}, _Tx) ->
+ Exchange;
+ (Err, _Tx) ->
+ Err
end).
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
- {ok, [Module, _Tx]} = rabbit_exchange_type_registry:lookup_module(T),
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
Module.
%% Used with binaries sent over the wire; the type may not exist.
@@ -273,13 +276,13 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun, TriggerFun) ->
+call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
- end, TriggerFun).
+ end, PrePostCommitFun).
delete(XName, IfUnused) ->
call_with_exchange(XName,
@@ -287,11 +290,11 @@ delete(XName, IfUnused) ->
true -> fun conditional_delete/1;
false -> fun unconditional_delete/1
end,
- fun({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
- (Error = {error, _InUseOrNotFound}, _Tx) -> Error
+ fun ({deleted, X, Bs, Deletions}, Tx) ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
+ (Error = {error, _InUseOrNotFound}, _Tx) -> Error
end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
@@ -302,11 +305,8 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-callback(#exchange{type = XType}, Fun, Args, Tx) ->
- {ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType),
- if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args);
- true -> ok
- end.
+callback(#exchange{type = XType}, Fun, Args) ->
+ apply(type_to_module(XType), Fun, Args).
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 25d74043..8b90cbc4 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -42,23 +42,19 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration when previously absent
- %% registration determines whether this is called in a mnesia transaction
- {create, 1},
+ {create, 2},
%% called when recovering
{recover, 2},
%% called after exchange deletion.
- %% registration determines whether this is called in a mnesia transaction
- {delete, 2},
+ {delete, 3},
%% called after a binding has been added
- %% registration determines whether this is called in a mnesia transaction
- {add_binding, 2},
+ {add_binding, 3},
%% called after bindings have been deleted.
- %% registration determines whether this is called in a mnesia transaction
- {remove_bindings, 2},
+ {remove_bindings, 3},
%% called when comparing exchanges for equivalence - should return ok or
%% exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 6bfc36d9..7e59e464 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,14 +35,14 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"direct">>, ?MODULE, false]}},
+ [<<"direct">>, ?MODULE]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
@@ -55,10 +55,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 9449b3df..13d5e78a 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,14 +35,14 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"fanout">>, ?MODULE, false]}},
+ [<<"fanout">>, ?MODULE]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
@@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 1c615c21..9aa9c211 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,14 +36,14 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"headers">>, ?MODULE, false]}},
+ [<<"headers">>, ?MODULE]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
@@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 4f06dcbe..f15275b5 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -38,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([register/2, register/3, binary_to_type/1, lookup_module/1]).
+-export([register/2, binary_to_type/1, lookup_module/1]).
-define(SERVER, ?MODULE).
-define(ETS_NAME, ?MODULE).
@@ -47,12 +47,10 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (binary(), atom()) -> 'ok').
--spec(register/3 :: (binary(), atom(), boolean()) -> 'ok').
-spec(binary_to_type/1 ::
(binary()) -> atom() | rabbit_types:error('not_found')).
-spec(lookup_module/1 ::
- (atom()) -> rabbit_types:ok_or_error2([atom() | boolean()],
- 'not_found')).
+ (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
-endif.
@@ -64,12 +62,7 @@ start_link() ->
%%---------------------------------------------------------------------------
register(TypeName, ModuleName) ->
- register(TypeName, ModuleName, false).
-
-% Tx determines whether exchange and binding creation and deletion callbacks
-% are made from within a transaction (true) or outside a transaction (false)
-register(TypeName, ModuleName, Tx) ->
- gen_server:call(?SERVER, {register, TypeName, ModuleName, Tx}).
+ gen_server:call(?SERVER, {register, TypeName, ModuleName}).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
@@ -83,8 +76,8 @@ binary_to_type(TypeBin) when is_binary(TypeBin) ->
lookup_module(T) when is_atom(T) ->
case ets:lookup(?ETS_NAME, T) of
- [{_, Module, Tx}] ->
- {ok, [Module, Tx]};
+ [{_, Module}] ->
+ {ok, Module};
[] ->
{error, not_found}
end.
@@ -94,11 +87,11 @@ lookup_module(T) when is_atom(T) ->
internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
list_to_atom(binary_to_list(TypeBin)).
-internal_register(TypeName, ModuleName, Tx)
+internal_register(TypeName, ModuleName)
when is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(ModuleName),
true = ets:insert(?ETS_NAME,
- {internal_binary_to_type(TypeName), ModuleName, Tx}),
+ {internal_binary_to_type(TypeName), ModuleName}),
ok.
sanity_check_module(Module) ->
@@ -119,8 +112,8 @@ init([]) ->
?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]),
{ok, none}.
-handle_call({register, TypeName, ModuleName, Tx}, _From, State) ->
- ok = internal_register(TypeName, ModuleName, Tx),
+handle_call({register, TypeName, ModuleName}, _From, State) ->
+ ok = internal_register(TypeName, ModuleName),
{reply, ok, State};
handle_call(Request, _From, State) ->
{stop, {unhandled_call, Request}, State}.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 0f2d5242..70de55e2 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,14 +35,14 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
{mfa, {rabbit_exchange_type_registry, register,
- [<<"topic">>, ?MODULE, false]}},
+ [<<"topic">>, ?MODULE]}},
{requires, rabbit_exchange_type_registry},
{enables, kernel_ready}]}).
@@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
last_topic_match(P, [BacktrackNext | R], BacktrackList).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3aed723d..d37af9e3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -49,6 +49,8 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
+-export([execute_mnesia_transaction/3]).
+-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
@@ -150,6 +152,10 @@
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(execute_mnesia_transaction/2 ::
(thunk(A), fun ((A, boolean()) -> B)) -> B).
+-spec(execute_mnesia_transaction/3 ::
+ (thunk(A), fun ((A) -> B), fun ((A) -> B)) -> B).
+-spec(execute_mnesia_tx_with_tail/1 ::
+ (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
@@ -392,20 +398,44 @@ execute_mnesia_transaction(TxFun) ->
{aborted, Reason} -> throw({error, Reason})
end.
-%% Like execute_mnesia_transaction/2, with an additional Fun that gets called
-%% immediately before and after the mnesia tx commit. It gets called with the
-%% result of TxFun and a flag indicating whether mnesia is in a tx.
-execute_mnesia_transaction(TxFun, TriggerFun) ->
- Tx = mnesia:is_transaction(),
- if Tx -> throw(unexpected_transaction);
- true -> ok
+
+%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
+%% commit functions
+execute_mnesia_transaction(TxFun, PreCommit, PostCommit) ->
+ case mnesia:is_transaction() of
+ true -> throw(unexpected_transaction);
+ false -> ok
end,
- Result = execute_mnesia_transaction(
- fun () -> R = TxFun(),
- TriggerFun(R, true),
- R
- end),
- TriggerFun(Result, false).
+ PostCommit(execute_mnesia_transaction(
+ fun () ->
+ PreCommit(TxFun())
+ end)).
+
+%% Like execute_mnesia_transaction/3 with similar Pre- and PostCommit funs
+execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
+ execute_mnesia_transaction(TxFun,
+ fun (Result) ->
+ PrePostCommitFun(Result, true),
+ Result
+ end,
+ fun (Result) ->
+ PrePostCommitFun(Result, false)
+ end).
+
+%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
+%% TailFun which gets called immediately before and after the tx commit
+execute_mnesia_tx_with_tail(TxFun) ->
+ case mnesia:is_transaction() of
+ true -> execute_mnesia_transaction(TxFun);
+ false -> TailFun =
+ execute_mnesia_transaction(
+ fun () ->
+ TailFun = TxFun(),
+ TailFun(true),
+ TailFun
+ end),
+ TailFun(false)
+ end.
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).