summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-29 11:29:13 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-29 11:29:13 +0100
commit85c36851e257c454a3e1039160fe7b08eb65f2eb (patch)
tree96b531e8c5608fd035957ad4ae0997beb95c7f46
parente876d31d003d2a234e9848101ef1545720e25a4e (diff)
downloadrabbitmq-server-85c36851e257c454a3e1039160fe7b08eb65f2eb.tar.gz
Don't attempt to provide a serial for the x deleted event, we weren't doing it right anyway. Create the table entry at exchange creation time.
-rw-r--r--include/rabbit_exchange_type_spec.hrl7
-rw-r--r--src/rabbit_binding.erl8
-rw-r--r--src/rabbit_exchange.erl40
3 files changed, 34 insertions, 21 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 9458d2fe..bb23445d 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -16,17 +16,18 @@
-ifdef(use_specs).
--type(serial() :: pos_integer() | 'transaction' | 'none').
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
-spec(description/0 :: () -> [{atom(), any()}]).
-spec(serialise_events/0 :: () -> boolean()).
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/2 :: (serial(), rabbit_types:exchange()) -> 'ok').
+-spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(delete/3 :: (serial(), rabbit_types:exchange(),
+-spec(delete/3 :: (tx(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(add_binding/3 :: (serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok').
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 54fa1a63..6384cf0e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -134,7 +134,7 @@ add(Binding, InnerFun) ->
add_notify(X, B) ->
ok = rabbit_exchange:callback(X, add_binding, [transaction, X, B]),
- Serial = rabbit_exchange:serial(X),
+ Serial = rabbit_exchange:serial(X, binding),
fun () ->
ok = rabbit_exchange:callback(X, add_binding, [Serial, X, B]),
ok = rabbit_event:notify(binding_created, info(B))
@@ -411,7 +411,11 @@ process_deletions(Deletions) ->
fun (XName, {X, Deleted, Bindings}, Acc) ->
FlatBindings = lists:flatten(Bindings),
pd_callback(transaction, X, Deleted, FlatBindings),
- dict:store(XName, rabbit_exchange:serial(X), Acc)
+ dict:store(XName, rabbit_exchange:serial(
+ X, case Deleted of
+ deleted -> exchange;
+ not_deleted -> binding
+ end), Acc)
end, dict:new(), Deletions),
fun() ->
dict:fold(
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index ff15ce3a..067df560 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -24,7 +24,7 @@
info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1]).
+-export([maybe_auto_delete/1, serial/2]).
%%----------------------------------------------------------------------------
@@ -75,7 +75,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/2:: (rabbit_types:exchange(), 'exchange' | 'binding')
+ -> 'none' | pos_integer()).
-endif.
@@ -86,7 +87,7 @@
recover() ->
Xs = rabbit_misc:table_fold(
fun (X, Acc) ->
- ok = mnesia:write(rabbit_exchange, X, write),
+ write_exchange(X),
[X | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
@@ -121,7 +122,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun () ->
case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, X, write),
+ write_exchange(X),
ok = case Durable of
true -> mnesia:write(rabbit_durable_exchange,
X, write);
@@ -135,10 +136,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
fun ({new, Exchange}, Tx) ->
ok = XT:create(case Tx of
true -> transaction;
- false -> case XT:serialise_events() of
- true -> 0;
- false -> none
- end
+ false -> none
end, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
@@ -148,6 +146,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
Err
end).
+write_exchange(X = #exchange{name = Name, type = XT}) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ case (type_to_module(XT)):serialise_events() of
+ true -> S = #exchange_serial{name = Name, serial = 0},
+ ok = mnesia:write(rabbit_exchange_serial, S, write);
+ false -> ok
+ end.
+
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
case rabbit_registry:binary_to_type(TypeBin) of
@@ -318,20 +324,22 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-serial(#exchange{name = XName, type = XType}) ->
+serial(#exchange{}, exchange) ->
+ none;
+
+serial(#exchange{name = XName, type = XType}, binding) ->
case (type_to_module(XType)):serialise_events() of
true -> next_serial(XName);
false -> none
end.
next_serial(XName) ->
- Serial1 = case mnesia:read(rabbit_exchange_serial, XName, write) of
- [] -> 1;
- [#exchange_serial{serial = Serial}] -> Serial + 1
- end,
- mnesia:write(rabbit_exchange_serial,
- #exchange_serial{name = XName, serial = Serial1}, write),
- Serial1.
+ [#exchange_serial{serial = S}] =
+ mnesia:read(rabbit_exchange_serial, XName, write),
+ Serial = S + 1,
+ ok = mnesia:write(rabbit_exchange_serial,
+ #exchange_serial{name = XName, serial = Serial}, write),
+ Serial.
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->