diff options
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r-- | src/rabbit_exchange.erl | 177 |
1 files changed, 113 insertions, 64 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 92259195..afa48355 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,12 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). --export([callback/3]). -%% this must be run inside a mnesia tx --export([maybe_auto_delete/1]). --export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). +-export([recover/0, callback/3, declare/6, + assert_equivalence/6, assert_args_equivalence/2, check_type/1, + lookup/1, lookup_or_die/1, list/1, update_scratch/2, + info_keys/0, info/1, info/2, info_all/1, info_all/2, + route/2, delete/2]). +%% these must be run inside a mnesia tx +-export([maybe_auto_delete/1, serial/1, peek_serial/1]). %%---------------------------------------------------------------------------- @@ -33,8 +34,10 @@ -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). +-type(fun_name() :: atom()). --spec(recover/0 :: () -> 'ok'). +-spec(recover/0 :: () -> [name()]). +-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -55,6 +58,7 @@ (name()) -> rabbit_types:exchange() | rabbit_types:channel_exit()). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]). +-spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok'). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()). -spec(info/2 :: @@ -62,9 +66,9 @@ -> rabbit_types:infos()). -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) - -> [rabbit_types:infos()]). --spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> {rabbit_router:routing_result(), [pid()]}). + -> [rabbit_types:infos()]). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> [rabbit_amqqueue:name()]). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | @@ -72,7 +76,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). +-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined'). -endif. @@ -81,25 +86,22 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> - Xs = rabbit_misc:table_fold( - fun (X, Acc) -> - ok = mnesia:write(rabbit_exchange, X, write), - [X | Acc] - end, [], rabbit_durable_exchange), - Bs = rabbit_binding:recover(), - recover_with_bindings( - lists:keysort(#binding.source, Bs), - lists:keysort(#exchange.name, Xs), []). - -recover_with_bindings([B = #binding{source = XName} | Rest], - Xs = [#exchange{name = XName} | _], - Bindings) -> - recover_with_bindings(Rest, Xs, [B | Bindings]); -recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> - (type_to_module(Type)):recover(X, Bindings), - recover_with_bindings(Bs, Xs, []); -recover_with_bindings([], [], []) -> - ok. + Xs = rabbit_misc:table_filter( + fun (#exchange{name = XName}) -> + mnesia:read({rabbit_exchange, XName}) =:= [] + end, + fun (X, Tx) -> + case Tx of + true -> store(X); + false -> ok + end, + rabbit_exchange:callback(X, create, [map_create_tx(Tx), X]) + end, + rabbit_durable_exchange), + [XName || #exchange{name = XName} <- Xs]. + +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, @@ -108,13 +110,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> auto_delete = AutoDelete, internal = Internal, arguments = Args}, + XT = type_to_module(Type), %% We want to upset things if it isn't ok - ok = (type_to_module(Type)):validate(X), + ok = XT:validate(X), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, X, write), + store(X), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, X, write); @@ -126,7 +129,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - callback(Exchange, create, [Tx, Exchange]), + ok = XT:create(map_create_tx(Tx), Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -135,10 +138,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. +map_create_tx(true) -> transaction; +map_create_tx(false) -> none. + +store(X = #exchange{name = Name, type = Type}) -> + ok = mnesia:write(rabbit_exchange, X, write), + case (type_to_module(Type)):serialise_events() of + true -> S = #exchange_serial{name = Name, next = 1}, + ok = mnesia:write(rabbit_exchange_serial, S, write); + false -> ok + end. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> @@ -191,6 +200,23 @@ list(VHostPath) -> rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). +update_scratch(Name, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:wread({rabbit_exchange, Name}) of + [X = #exchange{durable = Durable, scratch = Scratch}] -> + X1 = X#exchange{scratch = Fun(Scratch)}, + ok = mnesia:write(rabbit_exchange, X1, write), + case Durable of + true -> ok = mnesia:write(rabbit_durable_exchange, + X1, write); + _ -> ok + end; + [] -> + ok + end + end). + info_keys() -> ?INFO_KEYS. map(VHostPath, F) -> @@ -216,21 +242,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X = #exchange{name = XName}, Delivery) -> - rabbit_router:deliver( - route(Delivery, {queue:from_list([X]), XName, []}), - Delivery). +route(X = #exchange{name = XName}, Delivery) -> + route1(Delivery, {queue:from_list([X]), XName, []}). -route(Delivery, {WorkList, SeenXs, QNames}) -> +route1(Delivery, {WorkList, SeenXs, QNames}) -> case queue:out(WorkList) of {empty, _WorkList} -> lists:usort(QNames); {{value, X = #exchange{type = Type}}, WorkList1} -> DstNames = process_alternate( X, ((type_to_module(Type)):route(X, Delivery))), - route(Delivery, - lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, - DstNames)) + route1(Delivery, + lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames}, + DstNames)) end. process_alternate(#exchange{name = XName, arguments = Args}, []) -> @@ -263,27 +287,30 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( +call_with_exchange(XName, Fun) -> + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end - end, PrePostCommitFun). + [] -> rabbit_misc:const({error, not_found}); + [X] -> Fun(X) + end + end). delete(XName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, call_with_exchange( XName, - case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> - Error + fun (X) -> + case Fun(X) of + {deleted, X, Bs, Deletions} -> + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions)); + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end end). maybe_auto_delete(#exchange{auto_delete = false}) -> @@ -294,9 +321,6 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. -callback(#exchange{type = XType}, Fun, Args) -> - apply(type_to_module(XType), Fun, Args). - conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); @@ -306,5 +330,30 @@ conditional_delete(X = #exchange{name = XName}) -> unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_durable_exchange, XName}), ok = mnesia:delete({rabbit_exchange, XName}), + ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + +serial(#exchange{name = XName, type = Type}) -> + case (type_to_module(Type)):serialise_events() of + true -> next_serial(XName); + false -> none + end. + +next_serial(XName) -> + [#exchange_serial{next = Serial}] = + mnesia:read(rabbit_exchange_serial, XName, write), + ok = mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = XName, next = Serial + 1}, write), + Serial. + +peek_serial(XName) -> + case mnesia:read({rabbit_exchange_serial, XName}) of + [#exchange_serial{next = Serial}] -> Serial; + _ -> undefined + end. + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_registry:lookup_module(exchange, T), + Module. |