diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-24 10:46:34 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-24 10:46:34 +0000 |
commit | c18f07aaa2b4de1442215c8b2d3dda4404ccdecf (patch) | |
tree | 168d052e62e820cab223e505b08b95df69f2a97f /src/rabbit_binding.erl | |
parent | 5d51177b297d5425741b808fb6f78a2712a0376e (diff) | |
parent | c3519792e0c7358da2c108b43e2214f2a9f08875 (diff) | |
download | rabbitmq-server-c18f07aaa2b4de1442215c8b2d3dda4404ccdecf.tar.gz |
Merge from default
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 68 |
1 files changed, 57 insertions, 11 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6167790e..cc7aea33 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -127,8 +127,7 @@ add(Binding, InnerFun) -> fun (Tx) -> ok = rabbit_exchange:callback( Src, add_binding, [Tx, Src, B]), - rabbit_event:notify_if( - not Tx, binding_created, info(B)) + process_addition(Src, B, Tx) end; [_] -> fun rabbit_misc:const_ok/1 end; @@ -161,7 +160,7 @@ remove(Binding, InnerFun) -> {error, _} = Err -> rabbit_misc:const(Err); {ok, Deletions} -> - fun (Tx) -> ok = process_deletions(Deletions, Tx) end + fun (Tx) -> process_deletions(Deletions, Tx) end end end). @@ -405,19 +404,66 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> +process_addition(Src, _B, transaction) -> + serial(Src); + +process_addition(_Src, B, _Serial) -> + ok = rabbit_event:notify(binding_created, info(B)). + +process_deletions(Deletions, transaction) -> + process_deletions( + fun (X, Bindings, Acc) -> + pd_callback(transaction, remove_bindings, X, Bindings), + dict:store(X, serial(X), Acc) + end, + fun (X, Bindings, Acc) -> + pd_callback(transaction, delete, X, Bindings), + dict:store(X, serial(X), Acc) + end, + Deletions, dict:new(), true); + +process_deletions(Deletions, Serials) -> + process_deletions( + fun (X, Bindings, Acc) -> + pd_callback(dict:fetch(X, Serials), remove_bindings, X, Bindings), + Acc + end, + fun (X, Bindings, Acc) -> + pd_callback(dict:fetch(X, Serials), delete, X, Bindings), + rabbit_event:notify(exchange_deleted, [{name, X#exchange.name}]), + Acc + end, + Deletions, ok, false). + +process_deletions(NotDeletedFun, DeletedFun, Deletions, Acc0, Tx) -> dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> + fun (_XName, {X, Deleted, Bindings}, Acc) -> FlatBindings = lists:flatten(Bindings), [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || B <- FlatBindings], case Deleted of not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); + NotDeletedFun(X, FlatBindings, Acc); deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) + DeletedFun(X, FlatBindings, Acc) end - end, ok, Deletions). + end, Acc0, Deletions). + +pd_callback(Arg, CB, X, Bindings) -> + ok = rabbit_exchange:callback(X, CB, [Arg, X, Bindings]). + +serial(X) -> + case rabbit_exchange:callback(X, serialise_events, [X]) of + true -> next_serial(X); + false -> none + end. + +next_serial(#exchange{name = Name}) -> + Prev = case mnesia:read(rabbit_exchange_serial, Name, write) of + [] -> 0; + [#exchange_serial{serial = S}] -> S + end, + Serial = Prev + 1, + mnesia:write(rabbit_exchange_serial, + #exchange_serial{name = Name, serial = Serial}, write), + Serial. |