diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-08 16:54:06 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-08 16:54:06 +0100 |
commit | cba6ad4fb962117040d243281dd1c0773eda2b42 (patch) | |
tree | 8cfe3ba75fe9f17e5aea5717a198d68af3f986d8 /src/rabbit_binding.erl | |
parent | 9579f96dd9507569d4e4f6576673137e3a329193 (diff) | |
parent | d76f7ffdf5aa02bb1031c9cf1891791607db25a0 (diff) | |
download | rabbitmq-server-cba6ad4fb962117040d243281dd1c0773eda2b42.tar.gz |
Merge in bug24009
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 73 |
1 files changed, 42 insertions, 31 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e4827526..84584a1c 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -21,7 +21,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/2]). + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -44,9 +44,9 @@ rabbit_types:exchange() | rabbit_types:amqqueue()) -> rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -type(bindings() :: [rabbit_types:binding()]). --type(add_res() :: bind_res() | rabbit_misc:const(bind_res())). +-type(add_res() :: bind_res() | rabbit_misc:thunk(bind_res())). -type(bind_or_error() :: bind_res() | rabbit_types:error('binding_not_found')). --type(remove_res() :: bind_or_error() | rabbit_misc:const(bind_or_error())). +-type(remove_res() :: bind_or_error() | rabbit_misc:thunk(bind_or_error())). -opaque(deletions() :: dict()). @@ -78,7 +78,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). +-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -156,15 +156,18 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_binding(B, Durable, durable(Dst), fun mnesia:write/3), - fun (Tx) -> - ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) - end; + add_notify(Src, B); false -> rabbit_misc:const(not_found) end; - [_] -> fun rabbit_misc:const_ok/1 + [_] -> fun rabbit_misc:const_ok/0 + end. + +add_notify(X, B) -> + ok = rabbit_exchange:callback(X, add_binding, [transaction, X, B]), + Serial = rabbit_exchange:serial(X), + fun () -> + ok = rabbit_exchange:callback(X, add_binding, [Serial, X, B]), + ok = rabbit_event:notify(binding_created, info(B)) end. remove(Binding, InnerFun) -> @@ -189,10 +192,8 @@ remove(Binding, InnerFun) -> end end, case Result of - {error, _} = Err -> - rabbit_misc:const(Err); - {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + {error, _} = Err -> rabbit_misc:const(Err); + {ok, Deletions} -> process_deletions(Deletions) end end). @@ -443,19 +444,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> - dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); - deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) - end - end, ok, Deletions). +process_deletions(Deletions) -> + AugmentedDeletions = + dict:map(fun (_XName, {X, deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, delete, Bs), + {X, deleted, Bs, none}; + (_XName, {X, not_deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, remove_bindings, Bs), + {X, not_deleted, Bs, rabbit_exchange:serial(X)} + end, Deletions), + fun() -> + dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> + ok = rabbit_event:notify( + exchange_deleted, [{name, XName}]), + del_notify(Bs), + x_callback(Serial, X, delete, Bs); + (_XName, {X, not_deleted, Bs, Serial}, ok) -> + del_notify(Bs), + x_callback(Serial, X, remove_bindings, Bs) + end, ok, AugmentedDeletions) + end. + +del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. + +x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). |