diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-11-29 12:57:55 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-11-29 12:57:55 +0000 |
commit | 30f878d2e519fbe872aef0018ba4f758ea549f11 (patch) | |
tree | eeaa6194715dd46f9f3fb951848367f92e8bbefa | |
parent | 30e11c31fc7e0149384af3099630ce12b18db44d (diff) | |
download | rabbitmq-server-30f878d2e519fbe872aef0018ba4f758ea549f11.tar.gz |
Introduce some abstraction, reducing duplicate calls
-rw-r--r-- | src/rabbit_access_control.erl | 57 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 48 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 109 | ||||
-rw-r--r-- | src/rabbit_event.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 61 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 18 |
6 files changed, 154 insertions, 148 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index f574507b..b47e4f1e 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -310,35 +310,29 @@ add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_vhost, VHostPath}) of - [] -> - ok = mnesia:write(rabbit_vhost, - #vhost{virtual_host = VHostPath}, - write), - {ok, [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, % 0-9-1 pdf - {<<"amq.headers">>, headers}, % 0-9-1 xml - {<<"amq.fanout">>, fanout}]]}; - [_] -> - mnesia:abort({vhost_already_exists, VHostPath}) + [] -> ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write); + [_] -> mnesia:abort({vhost_already_exists, VHostPath}) end - end), + end, + fun (Arg, true) -> Arg; + (_Arg, false) -> + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], + ok + end + ), rabbit_log:info("Added vhost ~p~n", [VHostPath]), - case R of - {ok, Xs} -> lists:map( - fun(X = #exchange{type = Type}) -> - rabbit_event:notify(exchange_created, - rabbit_exchange:info(X)) - rabbit_exchange:maybe_callback(Type, create, [X]), - end, Xs), - ok; - _ -> R - end. + R. delete_vhost(VHostPath) -> %%FIXME: We are forced to delete the queues outside the TX below @@ -349,6 +343,11 @@ delete_vhost(VHostPath) -> {ok,_} = rabbit_amqqueue:delete(Q, false, false) end, rabbit_amqqueue:list(VHostPath)), + %%Exchange deletion causes notifications which must be sent outside the TX + lists:foreach(fun (#exchange{name = Name}) -> + ok = rabbit_exchange:delete(Name, false) + end, + rabbit_exchange:list(VHostPath)), R = rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_vhost( VHostPath, @@ -359,10 +358,6 @@ delete_vhost(VHostPath) -> R. internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name = Name}) -> - ok = rabbit_exchange:delete(Name, false) - end, - rabbit_exchange:list(VHostPath)), lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 5cdd0e3c..e4971958 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -220,21 +220,24 @@ internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> case Recover of true -> ok = store_queue(Q), - Q; + {false, Q}; false -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), - ok = add_default_binding(Q), - Q; - [_] -> not_found %% Q exists on stopped node + {true, Q}; + %% Q exists on stopped node + [_] -> {false, not_found} end; [ExistingQ] -> - ExistingQ + {false, ExistingQ} end end + end, + fun ({true, Q}, false) -> ok = add_default_binding(Q), Q; + ({_AddBinding, Arg}, _Tx) -> Arg end). store_queue(Q = #amqqueue{durable = true}) -> @@ -452,16 +455,17 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - case rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [_] -> internal_delete1(QueueName) end - end) of - {error, _} = Err -> Err; - Deletions -> ok = rabbit_binding:process_deletions(Deletions) - end. + end, + fun ({error, _} = Err, _Tx) -> Err; + (Deletions, Tx) -> + ok = rabbit_binding:process_deletions(Deletions, Tx) + end). maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). @@ -482,16 +486,20 @@ drop_expired(QPid) -> gen_server2:cast(QPid, drop_expired). on_node_down(Node) -> - rabbit_binding:process_deletions( - lists:foldl( - fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - rabbit_misc:execute_mnesia_transaction( - fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end))). + rabbit_misc:execute_mnesia_transaction( + fun () -> qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end, + fun (Deletions, Tx) -> + rabbit_binding:process_deletions( + lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), + Deletions), + Tx) + end). delete_queue(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 5131400c..19c7faee 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -36,7 +36,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/1]). + process_deletions/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -91,7 +91,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/1 :: (deletions()) -> 'ok'). +-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -119,16 +119,17 @@ recover() -> exists(Binding) -> binding_action( Binding, - fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end). + fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end, + fun (_, _) -> ok end). add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end). remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). add(Binding, InnerFun) -> - case binding_action( + binding_action( Binding, - fun (Src = #exchange{type = Type}, Dst, B) -> + fun (Src, Dst, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else @@ -138,26 +139,24 @@ add(Binding, InnerFun) -> [] -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:write/3), - ok = rabbit_exchange:maybe_callback( - Type, add_binding, [Src, B]), {new, Src, B}; [_] -> {existing, Src, B} end; {error, _} = E -> E end - end) of - {new, Src = #exchange{type = Type}, B} -> - ok = rabbit_exchange:maybe_callback(Type, add_binding, [Src, B]), - rabbit_event:notify(binding_created, info(B)); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. + end, + fun({new, Src, B}, Tx) -> + ok = rabbit_exchange:callback(Src, add_binding, [Src, B], Tx), + rabbit_event:notify(binding_created, info(B), Tx); + ({existing, _, _}, _Tx) -> + ok; + ({error, _} = Err, _Tx) -> + Err + end). remove(Binding, InnerFun) -> - case binding_action( + binding_action( Binding, fun (Src, Dst, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, @@ -170,21 +169,19 @@ remove(Binding, InnerFun) -> ok = sync_binding( B, all_durable([Src, Dst]), fun mnesia:delete_object/3), - Deletions = maybe_auto_delete( - B#binding.source, - [B], new_deletions()), - ok = process_deletions(Deletions), - {ok, Deletions}; + {ok, + maybe_auto_delete(B#binding.source, + [B], new_deletions())}; {error, _} = E -> E end end - end) of - {error, _} = Err -> - Err; - {ok, Deletions} -> - ok = process_deletions(Deletions) - end. + end, + fun ({error, _} = Err, _Tx) -> + Err; + ({ok, Deletions}, Tx) -> + ok = process_deletions(Deletions, Tx) + end). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -272,13 +269,14 @@ all_durable(Resources) -> binding_action(Binding = #binding{source = SrcName, destination = DstName, - args = Arguments}, Fun) -> + args = Arguments}, Fun, TriggerFun) -> call_with_source_and_destination( SrcName, DstName, fun (Src, Dst) -> SortedArgs = rabbit_misc:sort_field_table(Arguments), Fun(Src, Dst, Binding#binding{args = SortedArgs}) - end). + end, + TriggerFun). sync_binding(Binding, Durable, Fun) -> ok = case Durable of @@ -291,7 +289,7 @@ sync_binding(Binding, Durable, Fun) -> ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. -call_with_source_and_destination(SrcName, DstName, Fun) -> +call_with_source_and_destination(SrcName, DstName, Fun, TriggerFun) -> SrcTable = table_for_resource(SrcName), DstTable = table_for_resource(DstName), rabbit_misc:execute_mnesia_transaction( @@ -302,7 +300,8 @@ call_with_source_and_destination(SrcName, DstName, Fun) -> {[_], [] } -> {error, destination_not_found}; {[], [] } -> {error, source_and_destination_not_found} end - end). + end, + TriggerFun). table_for_resource(#resource{kind = exchange}) -> rabbit_exchange; table_for_resource(#resource{kind = queue}) -> rabbit_queue. @@ -422,34 +421,20 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions) -> - Tx = mnesia:is_transaction(), - NonTxFun = - if Tx -> fun(_X, _D, _B) -> ok end; - not Tx -> - fun (#exchange{name = XName}, Deleted, FlatBindings) -> - [rabbit_event:notify(binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> ok; - deleted -> rabbit_event:notify(exchange_deleted, - [{name, XName}]) - end - end - end, - Fun = - fun (X = #exchange{type = Type}, Deleted, FlatBindings) -> - case Deleted of - not_deleted -> rabbit_exchange:maybe_callback( - Type, remove_bindings, [X, FlatBindings]); - deleted -> rabbit_exchange:maybe_callback( - Type, delete, [X, FlatBindings]) - end - end, +process_deletions(Deletions, Tx) -> dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - NonTxFun(X, Deleted, FlatBindings) - Fun(X, Deleted, FlatBindings), - end, ok, Deletions). - + fun (_XName, {X, Deleted, Bindings}, ok) -> + FlatBindings = lists:flatten(Bindings), + [rabbit_event:notify(binding_deleted, info(B), Tx) || + B <- FlatBindings], + case Deleted of + not_deleted -> rabbit_exchange:callback(X, remove_bindings, + [X, FlatBindings], Tx); + deleted -> rabbit_event:notify( + exchange_deleted, + [{name, X#exchange.name}], + Tx), + rabbit_exchange:callback(X, delete, + [X, FlatBindings], Tx) + end + end, ok, Deletions). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 2b236531..c1d92f73 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -37,7 +37,7 @@ -export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]). -export([reset_stats_timer/1]). -export([stats_level/1, if_enabled/2]). --export([notify/2]). +-export([notify/2, notify/3]). %%---------------------------------------------------------------------------- @@ -77,6 +77,7 @@ -spec(stats_level/1 :: (state()) -> level()). -spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). +-spec(notify/3 :: (event_type(), event_props(), boolean()) -> 'ok'). -endif. @@ -140,6 +141,12 @@ if_enabled(_State, Fun) -> Fun(), ok. +notify(Type, Props, Tx) -> + case Tx of + false -> notify(Type, Props); + true -> ok + end. + notify(Type, Props) -> try %% TODO: switch to os:timestamp() when we drop support for diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 07883118..3eb7e0ff 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% this must be run inside a mnesia tx --export([maybe_auto_delete/1, maybe_callback/3]). +-export([maybe_auto_delete/1, callback/4]). -export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -85,7 +85,7 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(maybe_callback/3:: (atom(), atom(), [any()]) -> 'ok'). +-spec(callback/4:: (rabbit_types:exchange(), atom(), [any()], boolean()) -> 'ok'). -endif. @@ -123,7 +123,7 @@ declare(XName, Type, Durable, AutoDelete, Args) -> %% We want to upset things if it isn't ok TypeModule = type_to_module(Type), ok = TypeModule:validate(X), - case rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> @@ -135,19 +135,18 @@ declare(XName, Type, Durable, AutoDelete, Args) -> false -> ok end, - ok = maybe_callback(Type, create, [X]), - {new, X, mnesia:is_transaction()}; + {new, X}; [ExistingX] -> {existing, ExistingX} end - end) of - {new, X, false} -> ok = maybe_callback(Type, create, [X]), - rabbit_event:notify(exchange_created, info(X)), - X; - {new, X, true} -> X; - {existing, X} -> X; - Err -> Err - end. + end, + fun({Status, Exchange}, Tx) + when Status =:= new; Status =:= existing -> + rabbit_exchange:callback(Exchange, create, [Exchange], Tx), + rabbit_event:notify(exchange_created, info(Exchange), Tx), + Exchange; + (Err, _Tx) -> Err + end). %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> @@ -274,27 +273,26 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun) -> +call_with_exchange(XName, Fun, TriggerFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end - end). + end, TriggerFun). delete(XName, IfUnused) -> - Fun = case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - case call_with_exchange(XName, Fun) of - {deleted, X, Bs, Deletions} -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions)); - Error = {error, _InUseOrNotFound} -> - Error - end. + call_with_exchange(XName, + case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + fun({deleted, X, Bs, Deletions}, Tx) -> + ok = rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); + (Error = {error, _InUseOrNotFound}, _Tx) -> Error + end). maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; @@ -304,12 +302,8 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. -%% Possible callback, depending on whether mnesia is in a transaction -%% and whether transactional callbacks were requested by the exchange -%% The type is expected to exist. -maybe_callback(XType, Fun, Args) -> +callback(#exchange{type = XType}, Fun, Args, Tx) -> {ok, [Module, ModTx]} = rabbit_exchange_type_registry:lookup_module(XType), - Tx = mnesia:is_transaction(), if (ModTx =:= Tx) -> ok = apply(Module, Fun, Args); true -> ok end. @@ -320,9 +314,8 @@ conditional_delete(X = #exchange{name = XName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = XName, type = Type}) -> +unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), - ok = maybe_callback(Type, delete, [X, lists:flatten(Bindings)]), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 230f4db5..3aed723d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -48,6 +48,7 @@ -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). +-export([execute_mnesia_transaction/2]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([upmap/2, map_in_order/2]). @@ -147,6 +148,8 @@ (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). +-spec(execute_mnesia_transaction/2 :: + (thunk(A), fun ((A, boolean()) -> B)) -> B). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(makenode/1 :: ({string(), string()} | string()) -> node()). -spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). @@ -389,6 +392,21 @@ execute_mnesia_transaction(TxFun) -> {aborted, Reason} -> throw({error, Reason}) end. +%% Like execute_mnesia_transaction/2, with an additional Fun that gets called +%% immediately before and after the mnesia tx commit. It gets called with the +%% result of TxFun and a flag indicating whether mnesia is in a tx. +execute_mnesia_transaction(TxFun, TriggerFun) -> + Tx = mnesia:is_transaction(), + if Tx -> throw(unexpected_transaction); + true -> ok + end, + Result = execute_mnesia_transaction( + fun () -> R = TxFun(), + TriggerFun(R, true), + R + end), + TriggerFun(Result, false). + ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). |