diff options
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 56 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 19 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 12 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 14 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 6 |
13 files changed, 96 insertions, 39 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9f483c30..99608be4 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,6 +43,7 @@ -record(resource, {virtual_host, kind, name}). -record(exchange, {name, type, durable, auto_delete, internal, arguments}). +-record(exchange_serial, {name, serial}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 45c475d8..8774b6ce 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -17,6 +17,7 @@ -ifdef(use_specs). -spec(description/0 :: () -> [{atom(), any()}]). +-spec(serialise_events/0 :: () -> boolean()). -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7391965..102ea13b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -433,8 +433,8 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> ok = rabbit_binding:process_deletions( - Deletions, Tx) + fun (Tx) -> rabbit_binding:process_deletions( + Deletions, Tx) end end end). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7ddb7814..9aacfaa4 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -124,12 +124,7 @@ add(Binding, InnerFun) -> case mnesia:read({rabbit_route, B}) of [] -> ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:write/3), - fun (Tx) -> - ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) - end; + fun (Tx) -> process_addition(Src, B, Tx) end; [_] -> fun rabbit_misc:const_ok/1 end; {error, _} = Err -> @@ -161,7 +156,7 @@ remove(Binding, InnerFun) -> {error, _} = Err -> rabbit_misc:const(Err); {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + fun (Tx) -> process_deletions(Deletions, Tx) end end end). @@ -404,19 +399,54 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> +process_addition(Src, B, State) -> + Serial = serial(Src, State, fun (_, S) -> S end), + Tx = State =:= transaction, + Arg = case Tx of true -> transaction; _ -> Serial end, + ok = rabbit_exchange:callback(Src, add_binding, [Arg, Src, B]), + rabbit_event:notify_if(not Tx, binding_created, info(B)), + case Tx of true -> Serial; false -> ok end. + +process_deletions(Deletions, State) -> + Tx = State =:= transaction, + Next = dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> + fun (_XName, {X, Deleted, Bindings}, Serials) -> FlatBindings = lists:flatten(Bindings), [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || B <- FlatBindings], case Deleted of not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); + Serial = serial(X, State, fun dict:fetch/2), + Arg = case Tx of true -> transaction; _ -> Serial end, + ok = rabbit_exchange:callback(X, remove_bindings, + [Arg, X, FlatBindings]), + dict:store(X, Serial, Serials); deleted -> rabbit_event:notify_if(not Tx, exchange_deleted, [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) + ok = rabbit_exchange:callback(X, delete, + [Tx, X, FlatBindings]), + Serials end - end, ok, Deletions). + end, dict:new(), Deletions), + case Tx of true -> Next; false -> ok end. + +serial(X, State, Fun) -> + case rabbit_exchange:callback(X, serialise_events, []) of + true -> case State of + transaction -> incr_serial(X); + _ -> Fun(X, State) + end; + false -> none + end. + +incr_serial(#exchange{name = Name}) -> + Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of + [] -> 0; + [#exchange_serial{serial = S}] -> S + end, + Serial = Prev + 1, + mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = Name, serial = Serial}, write), + Serial. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a463e570..09648fcf 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -264,12 +264,13 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, [QName | QNames]}. call_with_exchange(XName, Fun, PrePostCommitFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end - end, PrePostCommitFun). + rabbit_misc:execute_mnesia_tx_with_tail( + fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of + [] -> {error, not_found}; + [X] -> Fun(X) + end, + fun(Tx) -> PrePostCommitFun(Result, Tx) end + end). delete(XName, IfUnused) -> call_with_exchange( @@ -279,9 +280,9 @@ delete(XName, IfUnused) -> false -> fun unconditional_delete/1 end, fun ({deleted, X, Bs, Deletions}, Tx) -> - ok = rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); + rabbit_binding:process_deletions( + rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), Tx); (Error = {error, _InUseOrNotFound}, _Tx) -> Error end). diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 547583e9..b34d1aec 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,6 +21,12 @@ behaviour_info(callbacks) -> [ {description, 0}, + + %% Should Rabbit ensure that all events delivered to this + %% exchange can be serialised (they might still be delivered out + %% of order, but there'll be a serial number). + {serialise_events, 0}, + {route, 2}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 349c2f6e..d1ea62f3 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/0]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"direct">>}, {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = Routes}}) -> rabbit_router:match_routing_key(Name, Routes). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index bc5293c8..9b6e68d8 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/0]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -35,6 +35,8 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index d3529b06..1480afc8 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/0]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,6 +41,8 @@ description() -> [{name, <<"headers">>}, {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + route(#exchange{name = Name}, #delivery{message = #basic_message{content = Content}}) -> Headers = case (Content#content.properties)#'P_basic'.headers of diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index ffd1e583..9a9cbc47 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). --export([description/0, route/2]). +-export([description/0, route/2, serialise_events/0]). -export([validate/1, create/2, recover/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -38,6 +38,8 @@ description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. +serialise_events() -> false. + %% NB: This may return duplicate results in some situations (that's ok) route(#exchange{name = X}, #delivery{message = #basic_message{routing_keys = Routes}}) -> @@ -62,12 +64,12 @@ delete(true, #exchange{name = X}, _Bs) -> delete(false, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> +add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(true, #exchange{name = X}, Bs) -> +remove_bindings(transaction, #exchange{name = X}, Bs) -> %% The remove process is split into two distinct phases. In the %% first phase we gather the lists of bindings and edges to %% delete, then in the second phase we process all the @@ -86,7 +88,7 @@ remove_bindings(true, #exchange{name = X}, Bs) -> [trie_remove_edge(X, Parent, Node, W) || {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], ok; -remove_bindings(false, _X, _Bs) -> +remove_bindings(none, _X, _Bs) -> ok. maybe_add_path(_X, [{root, none}], PathAcc) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1..a869a72c 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -409,13 +409,13 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> TailFun = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - TailFun1(true), - TailFun1 - end), - TailFun(false) + false -> {TailFun, TailRes} = execute_mnesia_transaction( + fun () -> + TailFun1 = TxFun(), + Res1 = TailFun1(transaction), + {TailFun1, Res1} + end), + TailFun(TailRes) end. ensure_ok(ok, _) -> ok; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 66436920..3d010acf 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -208,6 +208,10 @@ table_definitions() -> [{record_name, exchange}, {attributes, record_info(fields, exchange)}, {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, {rabbit_durable_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9dbe418..8b3b833c 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -26,6 +26,7 @@ -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). -rabbit_upgrade({topic_trie, []}). +-rabbit_upgrade({exchange_event_serialisation, []}). %% ------------------------------------------------------------------- @@ -37,6 +38,7 @@ -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). -spec(topic_trie/0 :: () -> 'ok'). +-spec(exchange_event_serialisation/0 :: () -> 'ok'). -endif. @@ -101,6 +103,10 @@ topic_trie() -> {attributes, [trie_binding, value]}, {type, ordered_set}]). +exchange_event_serialisation() -> + create(rabbit_exchange_serial, [{record_name, exchange_serial}, + {attributes, [name, serial]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |