diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-29 11:29:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-29 11:29:13 +0100 |
commit | 85c36851e257c454a3e1039160fe7b08eb65f2eb (patch) | |
tree | 96b531e8c5608fd035957ad4ae0997beb95c7f46 | |
parent | e876d31d003d2a234e9848101ef1545720e25a4e (diff) | |
download | rabbitmq-server-85c36851e257c454a3e1039160fe7b08eb65f2eb.tar.gz |
Don't attempt to provide a serial for the x deleted event, we weren't doing it right anyway. Create the table entry at exchange creation time.
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 7 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 8 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 40 |
3 files changed, 34 insertions, 21 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 9458d2fe..bb23445d 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -16,17 +16,18 @@ -ifdef(use_specs). --type(serial() :: pos_integer() | 'transaction' | 'none'). +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). -spec(description/0 :: () -> [{atom(), any()}]). -spec(serialise_events/0 :: () -> boolean()). -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/2 :: (serial(), rabbit_types:exchange()) -> 'ok'). +-spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok'). -spec(recover/2 :: (rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(delete/3 :: (serial(), rabbit_types:exchange(), +-spec(delete/3 :: (tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(add_binding/3 :: (serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 54fa1a63..6384cf0e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -134,7 +134,7 @@ add(Binding, InnerFun) -> add_notify(X, B) -> ok = rabbit_exchange:callback(X, add_binding, [transaction, X, B]), - Serial = rabbit_exchange:serial(X), + Serial = rabbit_exchange:serial(X, binding), fun () -> ok = rabbit_exchange:callback(X, add_binding, [Serial, X, B]), ok = rabbit_event:notify(binding_created, info(B)) @@ -411,7 +411,11 @@ process_deletions(Deletions) -> fun (XName, {X, Deleted, Bindings}, Acc) -> FlatBindings = lists:flatten(Bindings), pd_callback(transaction, X, Deleted, FlatBindings), - dict:store(XName, rabbit_exchange:serial(X), Acc) + dict:store(XName, rabbit_exchange:serial( + X, case Deleted of + deleted -> exchange; + not_deleted -> binding + end), Acc) end, dict:new(), Deletions), fun() -> dict:fold( diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index ff15ce3a..067df560 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). %% these must be run inside a mnesia tx --export([maybe_auto_delete/1, serial/1]). +-export([maybe_auto_delete/1, serial/2]). %%---------------------------------------------------------------------------- @@ -75,7 +75,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(serial/1:: (rabbit_types:exchange()) -> 'none' | pos_integer()). +-spec(serial/2:: (rabbit_types:exchange(), 'exchange' | 'binding') + -> 'none' | pos_integer()). -endif. @@ -86,7 +87,7 @@ recover() -> Xs = rabbit_misc:table_fold( fun (X, Acc) -> - ok = mnesia:write(rabbit_exchange, X, write), + write_exchange(X), [X | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), @@ -121,7 +122,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> fun () -> case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, X, write), + write_exchange(X), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, X, write); @@ -135,10 +136,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> fun ({new, Exchange}, Tx) -> ok = XT:create(case Tx of true -> transaction; - false -> case XT:serialise_events() of - true -> 0; - false -> none - end + false -> none end, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; @@ -148,6 +146,14 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). +write_exchange(X = #exchange{name = Name, type = XT}) -> + ok = mnesia:write(rabbit_exchange, X, write), + case (type_to_module(XT)):serialise_events() of + true -> S = #exchange_serial{name = Name, serial = 0}, + ok = mnesia:write(rabbit_exchange_serial, S, write); + false -> ok + end. + %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -318,20 +324,22 @@ unconditional_delete(X = #exchange{name = XName}) -> Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. -serial(#exchange{name = XName, type = XType}) -> +serial(#exchange{}, exchange) -> + none; + +serial(#exchange{name = XName, type = XType}, binding) -> case (type_to_module(XType)):serialise_events() of true -> next_serial(XName); false -> none end. next_serial(XName) -> - Serial1 = case mnesia:read(rabbit_exchange_serial, XName, write) of - [] -> 1; - [#exchange_serial{serial = Serial}] -> Serial + 1 - end, - mnesia:write(rabbit_exchange_serial, - #exchange_serial{name = XName, serial = Serial1}, write), - Serial1. + [#exchange_serial{serial = S}] = + mnesia:read(rabbit_exchange_serial, XName, write), + Serial = S + 1, + ok = mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = XName, serial = Serial}, write), + Serial. %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> |