summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-11-29 12:57:55 +0000
committerEmile Joubert <emile@rabbitmq.com>2010-11-29 12:57:55 +0000
commit30f878d2e519fbe872aef0018ba4f758ea549f11 (patch)
treeeeaa6194715dd46f9f3fb951848367f92e8bbefa
parent30e11c31fc7e0149384af3099630ce12b18db44d (diff)
downloadrabbitmq-server-30f878d2e519fbe872aef0018ba4f758ea549f11.tar.gz
Introduce some abstraction, reducing duplicate calls
-rw-r--r--src/rabbit_access_control.erl57
-rw-r--r--src/rabbit_amqqueue.erl48
-rw-r--r--src/rabbit_binding.erl109
-rw-r--r--src/rabbit_event.erl9
-rw-r--r--src/rabbit_exchange.erl61
-rw-r--r--src/rabbit_misc.erl18
6 files changed, 154 insertions, 148 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index f574507b..b47e4f1e 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -310,35 +310,29 @@ add_vhost(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- {ok, [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, % 0-9-1 pdf
- {<<"amq.headers">>, headers}, % 0-9-1 xml
- {<<"amq.fanout">>, fanout}]]};
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
+ [] -> ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write);
+ [_] -> mnesia:abort({vhost_already_exists, VHostPath})
end
- end),
+ end,
+ fun (Arg, true) -> Arg;
+ (_Arg, false) ->
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok
+ end
+ ),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
- case R of
- {ok, Xs} -> lists:map(
- fun(X = #exchange{type = Type}) ->
- rabbit_event:notify(exchange_created,
- rabbit_exchange:info(X))
- rabbit_exchange:maybe_callback(Type, create, [X]),
- end, Xs),
- ok;
- _ -> R
- end.
+ R.
delete_vhost(VHostPath) ->
%%FIXME: We are forced to delete the queues outside the TX below
@@ -349,6 +343,11 @@ delete_vhost(VHostPath) ->
{ok,_} = rabbit_amqqueue:delete(Q, false, false)
end,
rabbit_amqqueue:list(VHostPath)),
+ %%Exchange deletion causes notifications which must be sent outside the TX
+ lists:foreach(fun (#exchange{name = Name}) ->
+ ok = rabbit_exchange:delete(Name, false)
+ end,
+ rabbit_exchange:list(VHostPath)),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_vhost(
VHostPath,
@@ -359,10 +358,6 @@ delete_vhost(VHostPath) ->
R.
internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
lists:foreach(fun ({Username, _, _, _}) ->
ok = clear_permissions(Username, VHostPath)
end,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5cdd0e3c..e4971958 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -220,21 +220,24 @@ internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
case Recover of
true ->
ok = store_queue(Q),
- Q;
+ {false, Q};
false ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue,
QueueName}) of
[] -> ok = store_queue(Q),
- ok = add_default_binding(Q),
- Q;
- [_] -> not_found %% Q exists on stopped node
+ {true, Q};
+ %% Q exists on stopped node
+ [_] -> {false, not_found}
end;
[ExistingQ] ->
- ExistingQ
+ {false, ExistingQ}
end
end
+ end,
+ fun ({true, Q}, false) -> ok = add_default_binding(Q), Q;
+ ({_AddBinding, Arg}, _Tx) -> Arg
end).
store_queue(Q = #amqqueue{durable = true}) ->
@@ -452,16 +455,17 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- case rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] -> internal_delete1(QueueName)
end
- end) of
- {error, _} = Err -> Err;
- Deletions -> ok = rabbit_binding:process_deletions(Deletions)
- end.
+ end,
+ fun ({error, _} = Err, _Tx) -> Err;
+ (Deletions, Tx) ->
+ ok = rabbit_binding:process_deletions(Deletions, Tx)
+ end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
@@ -482,16 +486,20 @@ drop_expired(QPid) ->
gen_server2:cast(QPid, drop_expired).
on_node_down(Node) ->
- rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end))).
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end,
+ fun (Deletions, Tx) ->
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ Deletions),
+ Tx)
+ end).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 5131400c..19c7faee 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -36,7 +36,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/1]).
+ process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -91,7 +91,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -119,16 +119,17 @@ recover() ->
exists(Binding) ->
binding_action(
Binding,
- fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end,
+ fun (_, _) -> ok end).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
- case binding_action(
+ binding_action(
Binding,
- fun (Src = #exchange{type = Type}, Dst, B) ->
+ fun (Src, Dst, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
@@ -138,26 +139,24 @@ add(Binding, InnerFun) ->
[] -> ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:write/3),
- ok = rabbit_exchange:maybe_callback(
- Type, add_binding, [Src, B]),
{new, Src, B};
[_] -> {existing, Src, B}
end;
{error, _} = E ->
E
end
- end) of
- {new, Src = #exchange{type = Type}, B} ->
- ok = rabbit_exchange:maybe_callback(Type, add_binding, [Src, B]),
- rabbit_event:notify(binding_created, info(B));
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
+ end,
+ fun({new, Src, B}, Tx) ->
+ ok = rabbit_exchange:callback(Src, add_binding, [Src, B], Tx),
+ rabbit_event:notify(binding_created, info(B), Tx);
+ ({existing, _, _}, _Tx) ->
+ ok;
+ ({error, _} = Err, _Tx) ->
+ Err
+ end).
remove(Binding, InnerFun) ->
- case binding_action(
+ binding_action(
Binding,
fun (Src, Dst, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
@@ -170,21 +169,19 @@ remove(Binding, InnerFun) ->
ok = sync_binding(
B, all_durable([Src, Dst]),
fun mnesia:delete_object/3),
- Deletions = maybe_auto_delete(
- B#binding.source,
- [B], new_deletions()),
- ok = process_deletions(Deletions),
- {ok, Deletions};
+ {ok,
+ maybe_auto_delete(B#binding.source,
+ [B], new_deletions())};
{error, _} = E ->
E
end
end
- end) of
- {error, _} = Err ->
- Err;
- {ok, Deletions} ->
- ok = process_deletions(Deletions)
- end.
+ end,
+ fun ({error, _} = Err, _Tx) ->
+ Err;
+ ({ok, Deletions}, Tx) ->
+ ok = process_deletions(Deletions, Tx)
+ end).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -272,13 +269,14 @@ all_durable(Resources) ->
binding_action(Binding = #binding{source = SrcName,
destination = DstName,
- args = Arguments}, Fun) ->
+ args = Arguments}, Fun, TriggerFun) ->
call_with_source_and_destination(
SrcName, DstName,
fun (Src, Dst) ->
SortedArgs = rabbit_misc:sort_field_table(Arguments),
Fun(Src, Dst, Binding#binding{args = SortedArgs})
- end).
+ end,
+ TriggerFun).
sync_binding(Binding, Durable, Fun) ->
ok = case Durable of
@@ -291,7 +289,7 @@ sync_binding(Binding, Durable, Fun) ->
ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
-call_with_source_and_destination(SrcName, DstName, Fun) ->
+call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
rabbit_misc:execute_mnesia_transaction(
@@ -302,7 +300,8 @@ call_with_source_and_destination(SrcName, DstName, Fun) ->
{[_], [] } -> {error, destination_not_found};
{[], [] } -> {error, source_and_destination_not_found}
end
- end).
+ end,
+ TriggerFun).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
@@ -422,34 +421,20 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions) ->
- Tx = mnesia:is_transaction(),
- NonTxFun =
- if Tx -> fun(_X, _D, _B) -> ok end;
- not Tx ->
- fun (#exchange{name = XName}, Deleted, FlatBindings) ->
- [rabbit_event:notify(binding_deleted, info(B)) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted -> ok;
- deleted -> rabbit_event:notify(exchange_deleted,
- [{name, XName}])
- end
- end
- end,
- Fun =
- fun (X = #exchange{type = Type}, Deleted, FlatBindings) ->
- case Deleted of
- not_deleted -> rabbit_exchange:maybe_callback(
- Type, remove_bindings, [X, FlatBindings]);
- deleted -> rabbit_exchange:maybe_callback(
- Type, delete, [X, FlatBindings])
- end
- end,
+process_deletions(Deletions, Tx) ->
dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- NonTxFun(X, Deleted, FlatBindings)
- Fun(X, Deleted, FlatBindings),
- end, ok, Deletions).
-
+ fun (_XName, {X, Deleted, Bindings}, ok) ->
+ FlatBindings = lists:flatten(Bindings),
+ [rabbit_event:notify(binding_deleted, info(B), Tx) ||
+ B <- FlatBindings],
+ case Deleted of
+ not_deleted -> rabbit_exchange:callback(X, remove_bindings,
+ [X, FlatBindings], Tx);
+ deleted -> rabbit_event:notify(
+ exchange_deleted,
+ [{name, X#exchange.name}],
+ Tx),
+ rabbit_exchange:callback(X, delete,
+ [X, FlatBindings], Tx)
+ end
+ end, ok, Deletions).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 2b236531..c1d92f73 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -37,7 +37,7 @@
-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
--export([notify/2]).
+-export([notify/2, notify/3]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,7 @@
-spec(stats_level/1 :: (state()) -> level()).
-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(notify/3 :: (event_type(), event_props(), boolean()) -> 'ok').
-endif.
@@ -140,6 +141,12 @@ if_enabled(_State, Fun) ->
Fun(),
ok.
+notify(Type, Props, Tx) ->
+ case Tx of
+ false -> notify(Type, Props);
+ true -> ok
+ end.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 07883118..3eb7e0ff 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
%% this must be run inside a mnesia tx
--export([maybe_auto_delete/1, maybe_callback/3]).
+-export([maybe_auto_delete/1, callback/4]).
-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -85,7 +85,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(maybe_callback/3:: (atom(), atom(), [any()]) -> 'ok').
+-spec(callback/4:: (rabbit_types:exchange(), atom(), [any()], boolean()) -> 'ok').
-endif.
@@ -123,7 +123,7 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
%% We want to upset things if it isn't ok
TypeModule = type_to_module(Type),
ok = TypeModule:validate(X),
- case rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
@@ -135,19 +135,18 @@ declare(XName, Type, Durable, AutoDelete, Args) ->
false ->
ok
end,
- ok = maybe_callback(Type, create, [X]),
- {new, X, mnesia:is_transaction()};
+ {new, X};
[ExistingX] ->
{existing, ExistingX}
end
- end) of
- {new, X, false} -> ok = maybe_callback(Type, create, [X]),
- rabbit_event:notify(exchange_created, info(X)),
- X;
- {new, X, true} -> X;
- {existing, X} -> X;
- Err -> Err
- end.
+ end,
+ fun({Status, Exchange}, Tx)
+ when Status =:= new; Status =:= existing ->
+ rabbit_exchange:callback(Exchange, create, [Exchange], Tx),
+ rabbit_event:notify(exchange_created, info(Exchange), Tx),
+ Exchange;
+ (Err, _Tx) -> Err
+ end).
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
@@ -274,27 +273,26 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun) ->
+call_with_exchange(XName, Fun, TriggerFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
- end).
+ end, TriggerFun).
delete(XName, IfUnused) ->
- Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- case call_with_exchange(XName, Fun) of
- {deleted, X, Bs, Deletions} ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
- Error = {error, _InUseOrNotFound} ->
- Error
- end.
+ call_with_exchange(XName,
+ case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
+ fun({deleted, X, Bs, Deletions}, Tx) ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
+ (Error = {error, _InUseOrNotFound}, _Tx) -> Error
+ end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
@@ -304,12 +302,8 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-%% Possible callback, depending on whether mnesia is in a transaction
-%% and whether transactional callbacks were requested by the exchange
-%% The type is expected to exist.
-maybe_callback(XType, Fun, Args) ->
+callback(#exchange{type = XType}, Fun, Args, Tx) ->
{ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType),
- Tx = mnesia:is_transaction(),
if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args);
true -> ok
end.
@@ -320,9 +314,8 @@ conditional_delete(X = #exchange{name = XName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = XName, type = Type}) ->
+unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
- ok = maybe_callback(Type, delete, [X, lists:flatten(Bindings)]),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 230f4db5..3aed723d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -48,6 +48,7 @@
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
+-export([execute_mnesia_transaction/2]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
@@ -147,6 +148,8 @@
(rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
+-spec(execute_mnesia_transaction/2 ::
+ (thunk(A), fun ((A, boolean()) -> B)) -> B).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
@@ -389,6 +392,21 @@ execute_mnesia_transaction(TxFun) ->
{aborted, Reason} -> throw({error, Reason})
end.
+%% Like execute_mnesia_transaction/2, with an additional Fun that gets called
+%% immediately before and after the mnesia tx commit. It gets called with the
+%% result of TxFun and a flag indicating whether mnesia is in a tx.
+execute_mnesia_transaction(TxFun, TriggerFun) ->
+ Tx = mnesia:is_transaction(),
+ if Tx -> throw(unexpected_transaction);
+ true -> ok
+ end,
+ Result = execute_mnesia_transaction(
+ fun () -> R = TxFun(),
+ TriggerFun(R, true),
+ R
+ end),
+ TriggerFun(Result, false).
+
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).