summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl1
-rw-r--r--include/rabbit_exchange_type_spec.hrl1
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_binding.erl56
-rw-r--r--src/rabbit_exchange.erl19
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl12
-rw-r--r--src/rabbit_misc.erl14
-rw-r--r--src/rabbit_mnesia.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl6
13 files changed, 96 insertions, 39 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 9f483c30..99608be4 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,6 +43,7 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments}).
+-record(exchange_serial, {name, serial}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
arguments, pid}).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 45c475d8..8774b6ce 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -17,6 +17,7 @@
-ifdef(use_specs).
-spec(description/0 :: () -> [{atom(), any()}]).
+-spec(serialise_events/0 :: () -> boolean()).
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7391965..102ea13b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -433,8 +433,8 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- fun (Tx) -> ok = rabbit_binding:process_deletions(
- Deletions, Tx)
+ fun (Tx) -> rabbit_binding:process_deletions(
+ Deletions, Tx)
end
end
end).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7ddb7814..9aacfaa4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -124,12 +124,7 @@ add(Binding, InnerFun) ->
case mnesia:read({rabbit_route, B}) of
[] -> ok = sync_binding(B, all_durable([Src, Dst]),
fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- rabbit_event:notify_if(
- not Tx, binding_created, info(B))
- end;
+ fun (Tx) -> process_addition(Src, B, Tx) end;
[_] -> fun rabbit_misc:const_ok/1
end;
{error, _} = Err ->
@@ -161,7 +156,7 @@ remove(Binding, InnerFun) ->
{error, _} = Err ->
rabbit_misc:const(Err);
{ok, Deletions} ->
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ fun (Tx) -> process_deletions(Deletions, Tx) end
end
end).
@@ -404,19 +399,54 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
+process_addition(Src, B, State) ->
+ Serial = serial(Src, State, fun (_, S) -> S end),
+ Tx = State =:= transaction,
+ Arg = case Tx of true -> transaction; _ -> Serial end,
+ ok = rabbit_exchange:callback(Src, add_binding, [Arg, Src, B]),
+ rabbit_event:notify_if(not Tx, binding_created, info(B)),
+ case Tx of true -> Serial; false -> ok end.
+
+process_deletions(Deletions, State) ->
+ Tx = State =:= transaction,
+ Next =
dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
+ fun (_XName, {X, Deleted, Bindings}, Serials) ->
FlatBindings = lists:flatten(Bindings),
[rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
B <- FlatBindings],
case Deleted of
not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
+ Serial = serial(X, State, fun dict:fetch/2),
+ Arg = case Tx of true -> transaction; _ -> Serial end,
+ ok = rabbit_exchange:callback(X, remove_bindings,
+ [Arg, X, FlatBindings]),
+ dict:store(X, Serial, Serials);
deleted ->
rabbit_event:notify_if(not Tx, exchange_deleted,
[{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
+ ok = rabbit_exchange:callback(X, delete,
+ [Tx, X, FlatBindings]),
+ Serials
end
- end, ok, Deletions).
+ end, dict:new(), Deletions),
+ case Tx of true -> Next; false -> ok end.
+
+serial(X, State, Fun) ->
+ case rabbit_exchange:callback(X, serialise_events, []) of
+ true -> case State of
+ transaction -> incr_serial(X);
+ _ -> Fun(X, State)
+ end;
+ false -> none
+ end.
+
+incr_serial(#exchange{name = Name}) ->
+ Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of
+ [] -> 0;
+ [#exchange_serial{serial = S}] -> S
+ end,
+ Serial = Prev + 1,
+ mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = Name, serial = Serial}, write),
+ Serial.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a463e570..09648fcf 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -264,12 +264,13 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, [QName | QNames]}.
call_with_exchange(XName, Fun, PrePostCommitFun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
- end, PrePostCommitFun).
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end,
+ fun(Tx) -> PrePostCommitFun(Result, Tx) end
+ end).
delete(XName, IfUnused) ->
call_with_exchange(
@@ -279,9 +280,9 @@ delete(XName, IfUnused) ->
false -> fun unconditional_delete/1
end,
fun ({deleted, X, Bs, Deletions}, Tx) ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
+ rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
(Error = {error, _InUseOrNotFound}, _Tx) ->
Error
end).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9..b34d1aec 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,6 +21,12 @@
behaviour_info(callbacks) ->
[
{description, 0},
+
+ %% Should Rabbit ensure that all events delivered to this
+ %% exchange can be serialised (they might still be delivered out
+ %% of order, but there'll be a serial number).
+ {serialise_events, 0},
+
{route, 2},
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6e..d1ea62f3 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/0]).
-export([validate/1, create/2, recover/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"direct">>},
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
rabbit_router:match_routing_key(Name, Routes).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c8..9b6e68d8 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/0]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -35,6 +35,8 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b06..1480afc8 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/0]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,6 +41,8 @@ description() ->
[{name, <<"headers">>},
{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
route(#exchange{name = Name},
#delivery{message = #basic_message{content = Content}}) ->
Headers = case (Content#content.properties)#'P_basic'.headers of
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e583..9a9cbc47 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
+-export([description/0, route/2, serialise_events/0]).
-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -38,6 +38,8 @@ description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+serialise_events() -> false.
+
%% NB: This may return duplicate results in some situations (that's ok)
route(#exchange{name = X},
#delivery{message = #basic_message{routing_keys = Routes}}) ->
@@ -62,12 +64,12 @@ delete(true, #exchange{name = X}, _Bs) ->
delete(false, _Exchange, _Bs) ->
ok.
-add_binding(true, _Exchange, Binding) ->
+add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
-add_binding(false, _Exchange, _Binding) ->
+add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(true, #exchange{name = X}, Bs) ->
+remove_bindings(transaction, #exchange{name = X}, Bs) ->
%% The remove process is split into two distinct phases. In the
%% first phase we gather the lists of bindings and edges to
%% delete, then in the second phase we process all the
@@ -86,7 +88,7 @@ remove_bindings(true, #exchange{name = X}, Bs) ->
[trie_remove_edge(X, Parent, Node, W) ||
{Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
ok;
-remove_bindings(false, _X, _Bs) ->
+remove_bindings(none, _X, _Bs) ->
ok.
maybe_add_path(_X, [{root, none}], PathAcc) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index e79a58a1..a869a72c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -409,13 +409,13 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> TailFun = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- TailFun1(true),
- TailFun1
- end),
- TailFun(false)
+ false -> {TailFun, TailRes} = execute_mnesia_transaction(
+ fun () ->
+ TailFun1 = TxFun(),
+ Res1 = TailFun1(transaction),
+ {TailFun1, Res1}
+ end),
+ TailFun(TailRes)
end.
ensure_ok(ok, _) -> ok;
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 66436920..3d010acf 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -208,6 +208,10 @@ table_definitions() ->
[{record_name, exchange},
{attributes, record_info(fields, exchange)},
{match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
{rabbit_durable_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index b9dbe418..8b3b833c 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -26,6 +26,7 @@
-rabbit_upgrade({internal_exchanges, []}).
-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
-rabbit_upgrade({topic_trie, []}).
+-rabbit_upgrade({exchange_event_serialisation, []}).
%% -------------------------------------------------------------------
@@ -37,6 +38,7 @@
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
-spec(topic_trie/0 :: () -> 'ok').
+-spec(exchange_event_serialisation/0 :: () -> 'ok').
-endif.
@@ -101,6 +103,10 @@ topic_trie() ->
{attributes, [trie_binding, value]},
{type, ordered_set}]).
+exchange_event_serialisation() ->
+ create(rabbit_exchange_serial, [{record_name, exchange_serial},
+ {attributes, [name, serial]}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->