summaryrefslogtreecommitdiff
path: root/src/rabbit_binding.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-05-04 15:36:03 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-05-04 15:36:03 +0100
commitba5ce65c9a829321b92008fac836f58197d0d81c (patch)
treef9d2493f622425869b651a090f249ed1e91b70e6 /src/rabbit_binding.erl
parentc75b4aaebf0fb1ade23f4fb501584f27acc8c829 (diff)
parent0cabb3204f3683079a24d0f855817fd9b8a29c7f (diff)
downloadrabbitmq-server-ba5ce65c9a829321b92008fac836f58197d0d81c.tar.gz
Merge in default
Diffstat (limited to 'src/rabbit_binding.erl')
-rw-r--r--src/rabbit_binding.erl71
1 files changed, 42 insertions, 29 deletions
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index dc119fbd..1944792e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -21,7 +21,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/2]).
+ process_deletions/1]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -77,7 +77,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
+-spec(process_deletions/1 :: (deletions()) -> rabbit_misc:thunk('ok')).
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -114,12 +114,14 @@ recover(XNames, QNames) ->
end)
end,
fun (R = #route{binding = B = #binding{source = Src}}, Tx) ->
- case Tx of
- true -> ok = sync_transient_route(R, fun mnesia:write/3);
- false -> ok
- end,
{ok, X} = rabbit_exchange:lookup(Src),
- rabbit_exchange:callback(X, add_binding, [Tx, X, B])
+ Serial = case Tx of
+ true -> ok = sync_transient_route(
+ R, fun mnesia:write/3),
+ transaction;
+ false -> rabbit_exchange:serial(X)
+ end,
+ rabbit_exchange:callback(X, add_binding, [Serial, X, B])
end,
rabbit_semi_durable_route),
ok.
@@ -142,7 +144,7 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok -> case mnesia:read({rabbit_route, B}) of
[] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/1
+ [_] -> fun rabbit_misc:const_ok/0
end;
{error, _} = Err -> rabbit_misc:const(Err)
end
@@ -154,10 +156,11 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- fun (Tx) -> ok = rabbit_exchange:callback(Src, add_binding,
- [Tx, Src, B]),
- rabbit_event:notify_if(not Tx, binding_created,
- info(B))
+ ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -181,7 +184,7 @@ remove(Src, Dst, B) ->
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
Deletions = maybe_auto_delete(B#binding.source, [B], new_deletions()),
- fun (Tx) -> ok = process_deletions(Deletions, Tx) end.
+ process_deletions(Deletions).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -407,19 +410,29 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions, Tx) ->
- dict:fold(
- fun (_XName, {X, Deleted, Bindings}, ok) ->
- FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
- B <- FlatBindings],
- case Deleted of
- not_deleted ->
- rabbit_exchange:callback(X, remove_bindings,
- [Tx, X, FlatBindings]);
- deleted ->
- rabbit_event:notify_if(not Tx, exchange_deleted,
- [{name, X#exchange.name}]),
- rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
- end
- end, ok, Deletions).
+process_deletions(Deletions) ->
+ AugmentedDeletions =
+ dict:map(fun (_XName, {X, deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, delete, Bs),
+ {X, deleted, Bs, none};
+ (_XName, {X, not_deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, remove_bindings, Bs),
+ {X, not_deleted, Bs, rabbit_exchange:serial(X)}
+ end, Deletions),
+ fun() ->
+ dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
+ ok = rabbit_event:notify(
+ exchange_deleted, [{name, XName}]),
+ del_notify(Bs),
+ x_callback(Serial, X, delete, Bs);
+ (_XName, {X, not_deleted, Bs, Serial}, ok) ->
+ del_notify(Bs),
+ x_callback(Serial, X, remove_bindings, Bs)
+ end, ok, AugmentedDeletions)
+ end.
+
+del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
+
+x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).