diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-05-04 15:36:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-05-04 15:36:03 +0100 |
commit | ba5ce65c9a829321b92008fac836f58197d0d81c (patch) | |
tree | f9d2493f622425869b651a090f249ed1e91b70e6 /src/rabbit_binding.erl | |
parent | c75b4aaebf0fb1ade23f4fb501584f27acc8c829 (diff) | |
parent | 0cabb3204f3683079a24d0f855817fd9b8a29c7f (diff) | |
download | rabbitmq-server-ba5ce65c9a829321b92008fac836f58197d0d81c.tar.gz |
Merge in default
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r-- | src/rabbit_binding.erl | 71 |
1 files changed, 42 insertions, 29 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index dc119fbd..1944792e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -21,7 +21,7 @@ -export([list_for_source/1, list_for_destination/1, list_for_source_and_destination/2]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, - process_deletions/2]). + process_deletions/1]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). %% these must all be run inside a mnesia tx -export([has_for_source/1, remove_for_source/1, @@ -77,7 +77,7 @@ (rabbit_types:binding_destination()) -> deletions()). -spec(remove_transient_for_destination/1 :: (rabbit_types:binding_destination()) -> deletions()). --spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok'). +-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')). -spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()). -spec(add_deletion/3 :: (rabbit_exchange:name(), {'undefined' | rabbit_types:exchange(), @@ -114,12 +114,14 @@ recover(XNames, QNames) -> end) end, fun (R = #route{binding = B = #binding{source = Src}}, Tx) -> - case Tx of - true -> ok = sync_transient_route(R, fun mnesia:write/3); - false -> ok - end, {ok, X} = rabbit_exchange:lookup(Src), - rabbit_exchange:callback(X, add_binding, [Tx, X, B]) + Serial = case Tx of + true -> ok = sync_transient_route( + R, fun mnesia:write/3), + transaction; + false -> rabbit_exchange:serial(X) + end, + rabbit_exchange:callback(X, add_binding, [Serial, X, B]) end, rabbit_semi_durable_route), ok. @@ -142,7 +144,7 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/1 + [_] -> fun rabbit_misc:const_ok/0 end; {error, _} = Err -> rabbit_misc:const(Err) end @@ -154,10 +156,11 @@ add(Src, Dst, B) -> mnesia:read({rabbit_durable_route, B}) =:= []) of true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), - fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding, - [Tx, Src, B]), - rabbit_event:notify_if(not Tx, binding_created, - info(B)) + ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]), + Serial = rabbit_exchange:serial(Src), + fun () -> + ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]), + ok = rabbit_event:notify(binding_created, info(B)) end; false -> rabbit_misc:const({error, binding_not_found}) end. @@ -181,7 +184,7 @@ remove(Src, Dst, B) -> ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()), - fun (Tx) -> ok = process_deletions(Deletions, Tx) end. + process_deletions(Deletions). list(VHostPath) -> VHostResource = rabbit_misc:r(VHostPath, '_'), @@ -407,19 +410,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_deletions(Deletions, Tx) -> - dict:fold( - fun (_XName, {X, Deleted, Bindings}, ok) -> - FlatBindings = lists:flatten(Bindings), - [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) || - B <- FlatBindings], - case Deleted of - not_deleted -> - rabbit_exchange:callback(X, remove_bindings, - [Tx, X, FlatBindings]); - deleted -> - rabbit_event:notify_if(not Tx, exchange_deleted, - [{name, X#exchange.name}]), - rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings]) - end - end, ok, Deletions). +process_deletions(Deletions) -> + AugmentedDeletions = + dict:map(fun (_XName, {X, deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, delete, Bs), + {X, deleted, Bs, none}; + (_XName, {X, not_deleted, Bindings}) -> + Bs = lists:flatten(Bindings), + x_callback(transaction, X, remove_bindings, Bs), + {X, not_deleted, Bs, rabbit_exchange:serial(X)} + end, Deletions), + fun() -> + dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) -> + ok = rabbit_event:notify( + exchange_deleted, [{name, XName}]), + del_notify(Bs), + x_callback(Serial, X, delete, Bs); + (_XName, {X, not_deleted, Bs, Serial}, ok) -> + del_notify(Bs), + x_callback(Serial, X, remove_bindings, Bs) + end, ok, AugmentedDeletions) + end. + +del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs]. + +x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]). |