summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl177
1 files changed, 113 insertions, 64 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 92259195..afa48355 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,12 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
--export([callback/3]).
-%% this must be run inside a mnesia tx
--export([maybe_auto_delete/1]).
--export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
+-export([recover/0, callback/3, declare/6,
+ assert_equivalence/6, assert_args_equivalence/2, check_type/1,
+ lookup/1, lookup_or_die/1, list/1, update_scratch/2,
+ info_keys/0, info/1, info/2, info_all/1, info_all/2,
+ route/2, delete/2]).
+%% these must be run inside a mnesia tx
+-export([maybe_auto_delete/1, serial/1, peek_serial/1]).
%%----------------------------------------------------------------------------
@@ -33,8 +34,10 @@
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
+-type(fun_name() :: atom()).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> [name()]).
+-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -55,6 +58,7 @@
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
+-spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -62,9 +66,9 @@
-> rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
--spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> {rabbit_router:routing_result(), [pid()]}).
+ -> [rabbit_types:infos()]).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> [rabbit_amqqueue:name()]).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
@@ -72,7 +76,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
+-spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
@@ -81,25 +86,22 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- Xs = rabbit_misc:table_fold(
- fun (X, Acc) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- [X | Acc]
- end, [], rabbit_durable_exchange),
- Bs = rabbit_binding:recover(),
- recover_with_bindings(
- lists:keysort(#binding.source, Bs),
- lists:keysort(#exchange.name, Xs), []).
-
-recover_with_bindings([B = #binding{source = XName} | Rest],
- Xs = [#exchange{name = XName} | _],
- Bindings) ->
- recover_with_bindings(Rest, Xs, [B | Bindings]);
-recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
- (type_to_module(Type)):recover(X, Bindings),
- recover_with_bindings(Bs, Xs, []);
-recover_with_bindings([], [], []) ->
- ok.
+ Xs = rabbit_misc:table_filter(
+ fun (#exchange{name = XName}) ->
+ mnesia:read({rabbit_exchange, XName}) =:= []
+ end,
+ fun (X, Tx) ->
+ case Tx of
+ true -> store(X);
+ false -> ok
+ end,
+ rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
+ end,
+ rabbit_durable_exchange),
+ [XName || #exchange{name = XName} <- Xs].
+
+callback(#exchange{type = XType}, Fun, Args) ->
+ apply(type_to_module(XType), Fun, Args).
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
@@ -108,13 +110,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
+ XT = type_to_module(Type),
%% We want to upset things if it isn't ok
- ok = (type_to_module(Type)):validate(X),
+ ok = XT:validate(X),
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, X, write),
+ store(X),
ok = case Durable of
true -> mnesia:write(rabbit_durable_exchange,
X, write);
@@ -126,7 +129,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- callback(Exchange, create, [Tx, Exchange]),
+ ok = XT:create(map_create_tx(Tx), Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -135,10 +138,16 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
Err
end).
-%% Used with atoms from records; e.g., the type is expected to exist.
-type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
+map_create_tx(true) -> transaction;
+map_create_tx(false) -> none.
+
+store(X = #exchange{name = Name, type = Type}) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ case (type_to_module(Type)):serialise_events() of
+ true -> S = #exchange_serial{name = Name, next = 1},
+ ok = mnesia:write(rabbit_exchange_serial, S, write);
+ false -> ok
+ end.
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -191,6 +200,23 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
+update_scratch(Name, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X = #exchange{durable = Durable, scratch = Scratch}] ->
+ X1 = X#exchange{scratch = Fun(Scratch)},
+ ok = mnesia:write(rabbit_exchange, X1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_exchange,
+ X1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end
+ end).
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -216,21 +242,19 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X = #exchange{name = XName}, Delivery) ->
- rabbit_router:deliver(
- route(Delivery, {queue:from_list([X]), XName, []}),
- Delivery).
+route(X = #exchange{name = XName}, Delivery) ->
+ route1(Delivery, {queue:from_list([X]), XName, []}).
-route(Delivery, {WorkList, SeenXs, QNames}) ->
+route1(Delivery, {WorkList, SeenXs, QNames}) ->
case queue:out(WorkList) of
{empty, _WorkList} ->
lists:usort(QNames);
{{value, X = #exchange{type = Type}}, WorkList1} ->
DstNames = process_alternate(
X, ((type_to_module(Type)):route(X, Delivery))),
- route(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
+ DstNames))
end.
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
@@ -263,27 +287,30 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun, PrePostCommitFun) ->
- rabbit_misc:execute_mnesia_transaction(
+call_with_exchange(XName, Fun) ->
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
- end, PrePostCommitFun).
+ [] -> rabbit_misc:const({error, not_found});
+ [X] -> Fun(X)
+ end
+ end).
delete(XName, IfUnused) ->
+ Fun = case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
call_with_exchange(
XName,
- case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- fun ({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
- (Error = {error, _InUseOrNotFound}, _Tx) ->
- Error
+ fun (X) ->
+ case Fun(X) of
+ {deleted, X, Bs, Deletions} ->
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions));
+ {error, _InUseOrNotFound} = E ->
+ rabbit_misc:const(E)
+ end
end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
@@ -294,9 +321,6 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-callback(#exchange{type = XType}, Fun, Args) ->
- apply(type_to_module(XType), Fun, Args).
-
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
@@ -306,5 +330,30 @@ conditional_delete(X = #exchange{name = XName}) ->
unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_durable_exchange, XName}),
ok = mnesia:delete({rabbit_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+
+serial(#exchange{name = XName, type = Type}) ->
+ case (type_to_module(Type)):serialise_events() of
+ true -> next_serial(XName);
+ false -> none
+ end.
+
+next_serial(XName) ->
+ [#exchange_serial{next = Serial}] =
+ mnesia:read(rabbit_exchange_serial, XName, write),
+ ok = mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = XName, next = Serial + 1}, write),
+ Serial.
+
+peek_serial(XName) ->
+ case mnesia:read({rabbit_exchange_serial, XName}) of
+ [#exchange_serial{next = Serial}] -> Serial;
+ _ -> undefined
+ end.
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
+ Module.