summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-13 14:23:37 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-13 14:23:37 +0000
commit801ceca09c3567f181d833f32888aed52ebdf64d (patch)
tree8bdb12f17b6da28a027ad3811f8e1384e87ebf3e
parent26b539bf4ac7e8bff4ff319c955d47e62597ecc0 (diff)
parentc048e064f7d0a69a8ab707ede80acddd339a9d4f (diff)
downloadrabbitmq-server-801ceca09c3567f181d833f32888aed52ebdf64d.tar.gz
Merging default into bug23445
-rw-r--r--include/rabbit_exchange_type_spec.hrl8
-rw-r--r--src/rabbit_amqqueue.erl54
-rw-r--r--src/rabbit_binding.erl102
-rw-r--r--src/rabbit_event.erl6
-rw-r--r--src/rabbit_exchange.erl60
-rw-r--r--src/rabbit_exchange_type.erl8
-rw-r--r--src/rabbit_exchange_type_direct.erl12
-rw-r--r--src/rabbit_exchange_type_fanout.erl12
-rw-r--r--src/rabbit_exchange_type_headers.erl12
-rw-r--r--src/rabbit_exchange_type_topic.erl12
-rw-r--r--src/rabbit_misc.erl54
-rw-r--r--src/rabbit_vhost.erl56
12 files changed, 237 insertions, 159 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index ae326a87..280ffd15 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -34,14 +34,14 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(delete/2 :: (rabbit_types:exchange(),
+-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(add_binding/2 :: (rabbit_types:exchange(),
+-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/2 :: (rabbit_types:exchange(),
+-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(assert_args_equivalence/2 ::
(rabbit_types:exchange(), rabbit_framing:amqp_table())
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 35ed1c94..56fc64fe 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -213,25 +213,31 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
end.
internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
+ {ReturnArg, TailFun} =
case Recover of
true ->
ok = store_queue(Q),
- Q;
+ {Q, fun rabbit_misc:const_ok/1};
false ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue,
QueueName}) of
[] -> ok = store_queue(Q),
- ok = add_default_binding(Q),
- Q;
- [_] -> not_found %% Q exists on stopped node
+ B = add_default_binding(Q),
+ {Q, B};
+ %% Q exists on stopped node
+ [_] -> {not_found, fun rabbit_misc:const_ok/1}
end;
[ExistingQ] ->
- ExistingQ
+ {ExistingQ, fun rabbit_misc:const_ok/1}
end
+ end,
+ fun (Tx) ->
+ TailFun(Tx),
+ ReturnArg
end
end).
@@ -447,16 +453,18 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- case rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] -> internal_delete1(QueueName)
end
- end) of
- {error, _} = Err -> Err;
- Deletions -> ok = rabbit_binding:process_deletions(Deletions)
- end.
+ end,
+ fun ({error, _} = Err, _Tx) ->
+ Err;
+ (Deletions, Tx) ->
+ ok = rabbit_binding:process_deletions(Deletions, Tx)
+ end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
@@ -480,16 +488,20 @@ drop_expired(QPid) ->
gen_server2:cast(QPid, drop_expired).
on_node_down(Node) ->
- rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end))).
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end,
+ fun (Deletions, Tx) ->
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ Deletions),
+ Tx)
+ end).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ccadf5af..93f9dc27 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -36,7 +36,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/1]).
+ process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -91,7 +91,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -119,14 +119,16 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_Src, _Dst, B) ->
+ rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
+ end).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
- case binding_action(
+ binding_action(
Binding,
fun (Src, Dst, B) ->
%% this argument is used to check queue exclusivity;
@@ -138,26 +140,26 @@ add(Binding, InnerFun) ->
[] -> ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:write/3),
- {new, Src, B};
- [_] -> {existing, Src, B}
+ fun (Tx) ->
+ ok = rabbit_exchange:callback(
+ Src, add_binding,
+ [Tx, Src, B]),
+ rabbit_event:notify_if(
+ not Tx,
+ binding_created, info(B))
+ end;
+ [_] -> fun (_Tx) -> ok end
end;
- {error, _} = E ->
- E
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
end
- end) of
- {new, Src = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(Src, B),
- rabbit_event:notify(binding_created, info(B));
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
+ end).
remove(Binding, InnerFun) ->
- case binding_action(
+ binding_action(
Binding,
fun (Src, Dst, B) ->
+ Result =
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] ->
@@ -174,13 +176,16 @@ remove(Binding, InnerFun) ->
{error, _} = E ->
E
end
+ end,
+ fun (Tx) ->
+ case Result of
+ {ok, Deletions} ->
+ ok = process_deletions(Deletions, Tx);
+ {error, _} = Err ->
+ Err
+ end
end
- end) of
- {error, _} = Err ->
- Err;
- {ok, Deletions} ->
- ok = process_deletions(Deletions)
- end.
+ end).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -290,24 +295,22 @@ sync_binding(Binding, Durable, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({SrcTable, SrcName}),
- mnesia:read({DstTable, DstName})} of
- {[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> {error, source_not_found};
- {[_], [] } -> {error, destination_not_found};
- {[], [] } -> {error, source_and_destination_not_found}
- end
+ ErrFun = fun (Err) -> rabbit_misc:const(Err) end,
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> ErrFun({error, source_not_found});
+ {[_], [] } -> ErrFun({error, destination_not_found});
+ {[], [] } -> ErrFun({error,
+ source_and_destination_not_found})
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
-%% Used with atoms from records; e.g., the type is expected to exist.
-type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
-
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
@@ -423,17 +426,18 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions) ->
+process_deletions(Deletions, Tx) ->
dict:fold(
- fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify(binding_deleted, info(B)) ||
- B <- FlatBindings],
- TypeModule = type_to_module(Type),
- case Deleted of
- not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
- deleted -> rabbit_event:notify(exchange_deleted,
- [{name, X#exchange.name}]),
- TypeModule:delete(X, FlatBindings)
- end
+ fun (_XName, {X, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
+ B <- FlatBindings],
+ case Deleted of
+ not_deleted -> rabbit_exchange:callback(X, remove_bindings,
+ [Tx, X, FlatBindings]);
+ deleted -> rabbit_event:notify_if(not Tx, exchange_deleted,
+ [{name, X#exchange.name}]),
+ rabbit_exchange:callback(X, delete,
+ [Tx, X, FlatBindings])
+ end
end, ok, Deletions).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 2b236531..9755654b 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -37,7 +37,7 @@
-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
--export([notify/2]).
+-export([notify/2, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,7 @@
-spec(stats_level/1 :: (state()) -> level()).
-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
-endif.
@@ -140,6 +141,9 @@ if_enabled(_State, Fun) ->
Fun(),
ok.
+notify_if(true, Type, Props) -> notify(Type, Props);
+notify_if(false, _Type, _Props) -> ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a95cf0b1..0ec57564 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,6 +35,7 @@
-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+-export([callback/3]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
@@ -86,6 +87,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
+-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
-endif.
@@ -121,12 +123,9 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- %% We want to upset things if it isn't ok; this is different from
- %% the other hooks invocations, where we tend to ignore the return
- %% value.
- TypeModule = type_to_module(Type),
- ok = TypeModule:validate(X),
- case rabbit_misc:execute_mnesia_transaction(
+ %% We want to upset things if it isn't ok
+ ok = (type_to_module(Type)):validate(X),
+ rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
@@ -142,13 +141,17 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
[ExistingX] ->
{existing, ExistingX}
end
- end) of
- {new, X} -> TypeModule:create(X),
- rabbit_event:notify(exchange_created, info(X)),
- X;
- {existing, X} -> X;
- Err -> Err
- end.
+ end,
+ fun ({new, Exchange}, Tx) ->
+ callback(Exchange, create, [Tx, Exchange]),
+ rabbit_event:notify_if(
+ not Tx, exchange_created, info(Exchange)),
+ Exchange;
+ ({existing, Exchange}, _Tx) ->
+ Exchange;
+ (Err, _Tx) ->
+ Err
+ end).
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
@@ -278,27 +281,27 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun) ->
+call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
- end).
+ end, PrePostCommitFun).
delete(XName, IfUnused) ->
- Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- case call_with_exchange(XName, Fun) of
- {deleted, X, Bs, Deletions} ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
- Error = {error, _InUseOrNotFound} ->
- Error
- end.
+ call_with_exchange(XName,
+ case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
+ fun ({deleted, X, Bs, Deletions}, Tx) ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
+ (Error = {error, _InUseOrNotFound}, _Tx) ->
+ Error
+ end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
@@ -308,6 +311,9 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
+callback(#exchange{type = XType}, Fun, Args) ->
+ apply(type_to_module(XType), Fun, Args).
+
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 742944dc..8b90cbc4 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -42,19 +42,19 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration when previously absent
- {create, 1},
+ {create, 2},
%% called when recovering
{recover, 2},
%% called after exchange deletion.
- {delete, 2},
+ {delete, 3},
%% called after a binding has been added
- {add_binding, 2},
+ {add_binding, 3},
%% called after bindings have been deleted.
- {remove_bindings, 2},
+ {remove_bindings, 3},
%% called when comparing exchanges for equivalence - should return ok or
%% exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d49d0199..adb47cc0 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -55,10 +55,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index e7f75464..5266dd87 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index caf141fe..efe0ec88 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,8 +36,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 44851858..2f0d47a7 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
last_topic_match(P, [BacktrackNext | R], BacktrackList).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 15ba787a..dbc09e7f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -48,6 +48,9 @@
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
+-export([execute_mnesia_transaction/2]).
+-export([execute_mnesia_transaction/3]).
+-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
@@ -67,6 +70,7 @@
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
+-export([const_ok/1, const/1]).
%%----------------------------------------------------------------------------
@@ -142,6 +146,12 @@
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
+-spec(execute_mnesia_transaction/2 ::
+ (thunk(A), fun ((A, boolean()) -> B)) -> B).
+-spec(execute_mnesia_transaction/3 ::
+ (thunk(A), fun ((A) -> B), fun ((A) -> B)) -> B).
+-spec(execute_mnesia_tx_with_tail/1 ::
+ (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
@@ -196,6 +206,8 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+-spec(const_ok/1 :: (any()) -> 'ok').
+-spec(const/1 :: (A) -> fun ((_) -> A)).
-endif.
@@ -377,6 +389,45 @@ execute_mnesia_transaction(TxFun) ->
{aborted, Reason} -> throw({error, Reason})
end.
+
+%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
+%% commit functions
+execute_mnesia_transaction(TxFun, PreCommit, PostCommit) ->
+ case mnesia:is_transaction() of
+ true -> throw(unexpected_transaction);
+ false -> ok
+ end,
+ PostCommit(execute_mnesia_transaction(
+ fun () ->
+ PreCommit(TxFun())
+ end)).
+
+%% Like execute_mnesia_transaction/3 with similar Pre- and PostCommit funs
+execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
+ execute_mnesia_transaction(TxFun,
+ fun (Result) ->
+ PrePostCommitFun(Result, true),
+ Result
+ end,
+ fun (Result) ->
+ PrePostCommitFun(Result, false)
+ end).
+
+%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
+%% TailFun which gets called immediately before and after the tx commit
+execute_mnesia_tx_with_tail(TxFun) ->
+ case mnesia:is_transaction() of
+ true -> execute_mnesia_transaction(TxFun);
+ false -> TailFun =
+ execute_mnesia_transaction(
+ fun () ->
+ TailFun = TxFun(),
+ TailFun(true),
+ TailFun
+ end),
+ TailFun(false)
+ end.
+
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
@@ -805,3 +856,6 @@ lock_file(Path) ->
false -> {ok, Lock} = file:open(Path, [write]),
ok = file:close(Lock)
end.
+
+const_ok(_) -> ok.
+const(X) -> fun (_) -> X end.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index f939a3fe..f5de0cf6 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,37 +53,39 @@ add(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
+ [] -> ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write);
+ [_] -> mnesia:abort({vhost_already_exists, VHostPath})
end
+ end,
+ fun rabbit_misc:const_ok/1,
+ fun (ok) ->
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
delete(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
+ %% FIXME: We are forced to delete the queues and exchanges outside
+ %% the TX below. Queue deletion involves sending messages to the queue
+ %% process, which in turn results in further mnesia actions and
+ %% eventually the termination of that process. Exchange deletion causes
+ %% notifications which must be sent outside the TX
+ [{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
+ Q <- rabbit_amqqueue:list(VHostPath)],
+ [ok = rabbit_exchange:delete(Name, false) ||
+ #exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
@@ -92,10 +94,6 @@ delete(VHostPath) ->
R.
internal_delete(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
lists:foreach(
fun ({Username, _, _, _}) ->
ok = rabbit_auth_backend_internal:clear_permissions(Username,